refactor(ds): emqx_replay -> emqx_ds

This commit is contained in:
ieQu1 2023-05-17 15:46:29 +02:00
parent 8d6bcc1414
commit 1159f99432
13 changed files with 131 additions and 131 deletions

View File

@ -1,6 +1,6 @@
# EMQX Replay # EMQX Replay
`emqx_replay` is a durable storage for MQTT messages within EMQX. `emqx_ds` is a durable storage for MQTT messages within EMQX.
It implements the following scenarios: It implements the following scenarios:
- Persisting messages published by clients - Persisting messages published by clients
- -

View File

@ -1,11 +1,11 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_replay, [ {application, emqx_ds, [
{description, "Message persistence and subscription replays for EMQX"}, {description, "Message persistence and subscription replays for EMQX"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "0.1.0"}, {vsn, "0.1.0"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, rocksdb, gproc]}, {applications, [kernel, stdlib, rocksdb, gproc]},
{mod, {emqx_replay_app, []}}, {mod, {emqx_ds_app, []}},
{env, []} {env, []}
]}. ]}.

View File

@ -2,9 +2,9 @@
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_app). -module(emqx_ds_app).
-export([start/2]). -export([start/2]).
start(_Type, _Args) -> start(_Type, _Args) ->
emqx_replay_sup:start_link(). emqx_ds_sup:start_link().

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_conf). -module(emqx_ds_conf).
%% TODO: make a proper HOCON schema and all... %% TODO: make a proper HOCON schema and all...
@ -12,7 +12,7 @@
-export([default_iteration_options/0]). -export([default_iteration_options/0]).
-type backend_config() :: -type backend_config() ::
{emqx_replay_message_storage, emqx_replay_message_storage:options()} {emqx_ds_message_storage, emqx_ds_message_storage:options()}
| {module(), _Options}. | {module(), _Options}.
-export_type([backend_config/0]). -export_type([backend_config/0]).
@ -21,32 +21,32 @@
%% API funcions %% API funcions
%%================================================================================ %%================================================================================
-define(APP, emqx_replay). -define(APP, emqx_ds).
-spec shard_config(emqx_replay:shard()) -> backend_config(). -spec shard_config(emqx_ds:shard()) -> backend_config().
shard_config(Shard) -> shard_config(Shard) ->
DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()), DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()),
Shards = application:get_env(?APP, shard_config, #{}), Shards = application:get_env(?APP, shard_config, #{}),
maps:get(Shard, Shards, DefaultShardConfig). maps:get(Shard, Shards, DefaultShardConfig).
-spec shard_iteration_options(emqx_replay:shard()) -> -spec shard_iteration_options(emqx_ds:shard()) ->
emqx_replay_message_storage:iteration_options(). emqx_ds_message_storage:iteration_options().
shard_iteration_options(Shard) -> shard_iteration_options(Shard) ->
case shard_config(Shard) of case shard_config(Shard) of
{emqx_replay_message_storage, Config} -> {emqx_ds_message_storage, Config} ->
maps:get(iteration, Config, default_iteration_options()); maps:get(iteration, Config, default_iteration_options());
{_Module, _} -> {_Module, _} ->
default_iteration_options() default_iteration_options()
end. end.
-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options(). -spec default_iteration_options() -> emqx_ds_message_storage:iteration_options().
default_iteration_options() -> default_iteration_options() ->
{emqx_replay_message_storage, Config} = default_shard_config(), {emqx_ds_message_storage, Config} = default_shard_config(),
maps:get(iteration, Config). maps:get(iteration, Config).
-spec default_shard_config() -> backend_config(). -spec default_shard_config() -> backend_config().
default_shard_config() -> default_shard_config() ->
{emqx_replay_message_storage, #{ {emqx_ds_message_storage, #{
timestamp_bits => 64, timestamp_bits => 64,
topic_bits_per_level => [8, 8, 8, 32, 16], topic_bits_per_level => [8, 8, 8, 32, 16],
epoch => 5, epoch => 5,
@ -55,6 +55,6 @@ default_shard_config() ->
} }
}}. }}.
-spec db_options() -> emqx_replay_local_store:db_options(). -spec db_options() -> emqx_ds_local_store:db_options().
db_options() -> db_options() ->
application:get_env(?APP, db_options, []). application:get_env(?APP, db_options, []).

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_local_store). -module(emqx_ds_local_store).
-behaviour(gen_server). -behaviour(gen_server).
@ -43,20 +43,20 @@
%% When should this generation become active? %% When should this generation become active?
%% This generation should only contain messages timestamped no earlier than that. %% This generation should only contain messages timestamped no earlier than that.
%% The very first generation will have `since` equal 0. %% The very first generation will have `since` equal 0.
since := emqx_replay:time() since := emqx_ds:time()
}. }.
-record(s, { -record(s, {
shard :: emqx_replay:shard(), shard :: emqx_ds:shard(),
db :: rocksdb:db_handle(), db :: rocksdb:db_handle(),
cf_iterator :: rocksdb:cf_handle(), cf_iterator :: rocksdb:cf_handle(),
cf_generations :: cf_refs() cf_generations :: cf_refs()
}). }).
-record(it, { -record(it, {
shard :: emqx_replay:shard(), shard :: emqx_ds:shard(),
gen :: gen_id(), gen :: gen_id(),
replay :: emqx_replay:replay(), replay :: emqx_ds:replay(),
module :: module(), module :: module(),
data :: term() data :: term()
}). }).
@ -91,16 +91,16 @@
-callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) -> -callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
{_Schema, cf_refs()}. {_Schema, cf_refs()}.
-callback open(emqx_replay:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> -callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
term(). term().
-callback store(_Schema, binary(), emqx_replay:time(), emqx_replay:topic(), binary()) -> -callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
ok | {error, _}. ok | {error, _}.
-callback make_iterator(_Schema, emqx_replay:replay()) -> -callback make_iterator(_Schema, emqx_ds:replay()) ->
{ok, _It} | {error, _}. {ok, _It} | {error, _}.
-callback restore_iterator(_Schema, emqx_replay:replay(), binary()) -> {ok, _It} | {error, _}. -callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}.
-callback preserve_iterator(_Schema, _It) -> term(). -callback preserve_iterator(_Schema, _It) -> term().
@ -110,24 +110,24 @@
%% API funcions %% API funcions
%%================================================================================ %%================================================================================
-spec start_link(emqx_replay:shard()) -> {ok, pid()}. -spec start_link(emqx_ds:shard()) -> {ok, pid()}.
start_link(Shard) -> start_link(Shard) ->
gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []). gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []).
-spec create_generation(emqx_replay:shard(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
{ok, gen_id()} | {error, nonmonotonic}. {ok, gen_id()} | {error, nonmonotonic}.
create_generation(Shard, Since, Config = {_Module, _Options}) -> create_generation(Shard, Since, Config = {_Module, _Options}) ->
gen_server:call(?REF(Shard), {create_generation, Since, Config}). gen_server:call(?REF(Shard), {create_generation, Since, Config}).
-spec store( -spec store(
emqx_replay:shard(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary() emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()
) -> ) ->
ok | {error, _}. ok | {error, _}.
store(Shard, GUID, Time, Topic, Msg) -> store(Shard, GUID, Time, Topic, Msg) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
Mod:store(Data, GUID, Time, Topic, Msg). Mod:store(Data, GUID, Time, Topic, Msg).
-spec make_iterator(emqx_replay:shard(), emqx_replay:replay()) -> -spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) ->
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
make_iterator(Shard, Replay = {_, StartTime}) -> make_iterator(Shard, Replay = {_, StartTime}) ->
{GenId, Gen} = meta_lookup_gen(Shard, StartTime), {GenId, Gen} = meta_lookup_gen(Shard, StartTime),
@ -155,12 +155,12 @@ next(It = #it{module = Mod, data = ItData}) ->
end end
end. end.
-spec preserve_iterator(iterator(), emqx_replay:replay_id()) -> -spec preserve_iterator(iterator(), emqx_ds:replay_id()) ->
ok | {error, _TODO}. ok | {error, _TODO}.
preserve_iterator(It = #it{}, ReplayID) -> preserve_iterator(It = #it{}, ReplayID) ->
iterator_put_state(ReplayID, It). iterator_put_state(ReplayID, It).
-spec restore_iterator(emqx_replay:shard(), emqx_replay:replay_id()) -> -spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
restore_iterator(Shard, ReplayID) -> restore_iterator(Shard, ReplayID) ->
case iterator_get_state(Shard, ReplayID) of case iterator_get_state(Shard, ReplayID) of
@ -172,7 +172,7 @@ restore_iterator(Shard, ReplayID) ->
Error Error
end. end.
-spec discard_iterator(emqx_replay:shard(), emqx_replay:replay_id()) -> -spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
ok | {error, _TODO}. ok | {error, _TODO}.
discard_iterator(Shard, ReplayID) -> discard_iterator(Shard, ReplayID) ->
iterator_delete(Shard, ReplayID). iterator_delete(Shard, ReplayID).
@ -229,14 +229,14 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) -> ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) ->
case schema_get_current(DBHandle) of case schema_get_current(DBHandle) of
undefined -> undefined ->
Config = emqx_replay_conf:shard_config(Shard), Config = emqx_ds_conf:shard_config(Shard),
{ok, _, NS} = create_new_gen(0, Config, S), {ok, _, NS} = create_new_gen(0, Config, S),
NS; NS;
_GenId -> _GenId ->
S S
end. end.
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> -spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
{ok, gen_id(), state()} | {error, nonmonotonic}. {ok, gen_id(), state()} | {error, nonmonotonic}.
create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) -> create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
GenId = get_next_id(meta_get_current(Shard)), GenId = get_next_id(meta_get_current(Shard)),
@ -253,7 +253,7 @@ create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
Error Error
end. end.
-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> -spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
{ok, generation(), state()}. {ok, generation(), state()}.
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) -> create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
% TODO: Backend implementation should ensure idempotency. % TODO: Backend implementation should ensure idempotency.
@ -265,13 +265,13 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
}, },
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
-spec open_db(emqx_replay:shard()) -> {ok, state()} | {error, _TODO}. -spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}.
open_db(Shard) -> open_db(Shard) ->
Filename = binary_to_list(Shard), Filename = binary_to_list(Shard),
DBOptions = [ DBOptions = [
{create_if_missing, true}, {create_if_missing, true},
{create_missing_column_families, true} {create_missing_column_families, true}
| emqx_replay_conf:db_options() | emqx_ds_conf:db_options()
], ],
ExistingCFs = ExistingCFs =
case rocksdb:list_column_families(Filename, DBOptions) of case rocksdb:list_column_families(Filename, DBOptions) of
@ -425,7 +425,7 @@ schema_gen_key(N) ->
-define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}). -define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}).
-spec meta_register_gen(emqx_replay:shard(), gen_id(), generation()) -> ok. -spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok.
meta_register_gen(Shard, GenId, Gen) -> meta_register_gen(Shard, GenId, Gen) ->
Gs = Gs =
case GenId > 0 of case GenId > 0 of
@ -435,7 +435,7 @@ meta_register_gen(Shard, GenId, Gen) ->
ok = meta_put(Shard, GenId, [Gen | Gs]), ok = meta_put(Shard, GenId, [Gen | Gs]),
ok = meta_put(Shard, current, GenId). ok = meta_put(Shard, current, GenId).
-spec meta_lookup_gen(emqx_replay:shard(), emqx_replay:time()) -> {gen_id(), generation()}. -spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Shard, Time) -> meta_lookup_gen(Shard, Time) ->
% TODO % TODO
% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
@ -449,30 +449,30 @@ find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
find_gen(Time, GenId, [_Gen | Rest]) -> find_gen(Time, GenId, [_Gen | Rest]) ->
find_gen(Time, GenId - 1, Rest). find_gen(Time, GenId - 1, Rest).
-spec meta_get_gen(emqx_replay:shard(), gen_id()) -> generation() | undefined. -spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined.
meta_get_gen(Shard, GenId) -> meta_get_gen(Shard, GenId) ->
case meta_lookup(Shard, GenId, []) of case meta_lookup(Shard, GenId, []) of
[Gen | _Older] -> Gen; [Gen | _Older] -> Gen;
[] -> undefined [] -> undefined
end. end.
-spec meta_get_current(emqx_replay:shard()) -> gen_id() | undefined. -spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined.
meta_get_current(Shard) -> meta_get_current(Shard) ->
meta_lookup(Shard, current, undefined). meta_lookup(Shard, current, undefined).
-spec meta_lookup(emqx_replay:shard(), _K) -> _V. -spec meta_lookup(emqx_ds:shard(), _K) -> _V.
meta_lookup(Shard, K) -> meta_lookup(Shard, K) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K)). persistent_term:get(?PERSISTENT_TERM(Shard, K)).
-spec meta_lookup(emqx_replay:shard(), _K, Default) -> _V | Default. -spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default.
meta_lookup(Shard, K, Default) -> meta_lookup(Shard, K, Default) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K), Default). persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).
-spec meta_put(emqx_replay:shard(), _K, _V) -> ok. -spec meta_put(emqx_ds:shard(), _K, _V) -> ok.
meta_put(Shard, K, V) -> meta_put(Shard, K, V) ->
persistent_term:put(?PERSISTENT_TERM(Shard, K), V). persistent_term:put(?PERSISTENT_TERM(Shard, K), V).
-spec meta_erase(emqx_replay:shard()) -> ok. -spec meta_erase(emqx_ds:shard()) -> ok.
meta_erase(Shard) -> meta_erase(Shard) ->
[ [
persistent_term:erase(K) persistent_term:erase(K)

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_local_store_sup). -module(emqx_ds_local_store_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -25,11 +25,11 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []). supervisor:start_link({local, ?SUP}, ?MODULE, []).
-spec start_shard(emqx_replay:shard()) -> supervisor:startchild_ret(). -spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret().
start_shard(Shard) -> start_shard(Shard) ->
supervisor:start_child(?SUP, shard_child_spec(Shard)). supervisor:start_child(?SUP, shard_child_spec(Shard)).
-spec stop_shard(emqx_replay:shard()) -> ok | {error, _}. -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
stop_shard(Shard) -> stop_shard(Shard) ->
ok = supervisor:terminate_child(?SUP, Shard), ok = supervisor:terminate_child(?SUP, Shard),
ok = supervisor:delete_child(?SUP, Shard). ok = supervisor:delete_child(?SUP, Shard).
@ -51,11 +51,11 @@ init([]) ->
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
-spec shard_child_spec(emqx_replay:shard()) -> supervisor:child_spec(). -spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec().
shard_child_spec(Shard) -> shard_child_spec(Shard) ->
#{ #{
id => Shard, id => Shard,
start => {emqx_replay_local_store, start_link, [Shard]}, start => {emqx_ds_local_store, start_link, [Shard]},
shutdown => 5_000, shutdown => 5_000,
restart => permanent, restart => permanent,
type => worker type => worker

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_message_storage). -module(emqx_ds_message_storage).
%%================================================================================ %%================================================================================
%% @doc Description of the schema %% @doc Description of the schema
@ -128,8 +128,8 @@
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-type topic() :: emqx_replay:topic(). -type topic() :: emqx_ds:topic().
-type time() :: emqx_replay:time(). -type time() :: emqx_ds:time().
%% Number of bits %% Number of bits
-type bits() :: non_neg_integer(). -type bits() :: non_neg_integer().
@ -152,7 +152,7 @@
iteration => iteration_options(), iteration => iteration_options(),
cf_options => emqx_replay_local_store:db_cf_options() cf_options => emqx_ds_local_store:db_cf_options()
}. }.
-type iteration_options() :: #{ -type iteration_options() :: #{
@ -170,12 +170,12 @@
-opaque schema() :: #schema{}. -opaque schema() :: #schema{}.
-record(db, { -record(db, {
shard :: emqx_replay:shard(), shard :: emqx_ds:shard(),
handle :: rocksdb:db_handle(), handle :: rocksdb:db_handle(),
cf :: rocksdb:cf_handle(), cf :: rocksdb:cf_handle(),
keymapper :: keymapper(), keymapper :: keymapper(),
write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(), write_options = [{sync, true}] :: emqx_ds_local_store:db_write_options(),
read_options = [] :: emqx_replay_local_store:db_write_options() read_options = [] :: emqx_ds_local_store:db_write_options()
}). }).
-record(it, { -record(it, {
@ -221,8 +221,8 @@
%%================================================================================ %%================================================================================
%% Create a new column family for the generation and a serializable representation of the schema %% Create a new column family for the generation and a serializable representation of the schema
-spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) -> -spec create_new(rocksdb:db_handle(), emqx_ds_local_store:gen_id(), options()) ->
{schema(), emqx_replay_local_store:cf_refs()}. {schema(), emqx_ds_local_store:cf_refs()}.
create_new(DBHandle, GenId, Options) -> create_new(DBHandle, GenId, Options) ->
CFName = data_cf(GenId), CFName = data_cf(GenId),
CFOptions = maps:get(cf_options, Options, []), CFOptions = maps:get(cf_options, Options, []),
@ -232,10 +232,10 @@ create_new(DBHandle, GenId, Options) ->
%% Reopen the database %% Reopen the database
-spec open( -spec open(
emqx_replay:shard(), emqx_ds:shard(),
rocksdb:db_handle(), rocksdb:db_handle(),
emqx_replay_local_store:gen_id(), emqx_ds_local_store:gen_id(),
emqx_replay_local_store:cf_refs(), emqx_ds_local_store:cf_refs(),
schema() schema()
) -> ) ->
db(). db().
@ -277,13 +277,13 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
Value = make_message_value(Topic, MessagePayload), Value = make_message_value(Topic, MessagePayload),
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
-spec make_iterator(db(), emqx_replay:replay()) -> -spec make_iterator(db(), emqx_ds:replay()) ->
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
make_iterator(DB, Replay) -> make_iterator(DB, Replay) ->
Options = emqx_replay_conf:shard_iteration_options(DB#db.shard), Options = emqx_ds_conf:shard_iteration_options(DB#db.shard),
make_iterator(DB, Replay, Options). make_iterator(DB, Replay, Options).
-spec make_iterator(db(), emqx_replay:replay(), iteration_options()) -> -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->
% {error, invalid_start_time}? might just start from the beginning of time % {error, invalid_start_time}? might just start from the beginning of time
% and call it a day: client violated the contract anyway. % and call it a day: client violated the contract anyway.
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
@ -337,7 +337,7 @@ preserve_iterator(#it{cursor = Cursor}) ->
}, },
term_to_binary(State). term_to_binary(State).
-spec restore_iterator(db(), emqx_replay:replay(), binary()) -> -spec restore_iterator(db(), emqx_ds:replay(), binary()) ->
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
restore_iterator(DB, Replay, Serial) when is_binary(Serial) -> restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
State = binary_to_term(Serial), State = binary_to_term(Serial),
@ -419,7 +419,7 @@ hash(Input, Bits) ->
% at most 32 bits % at most 32 bits
erlang:phash2(Input, 1 bsl Bits). erlang:phash2(Input, 1 bsl Bits).
-spec make_keyspace_filter(emqx_replay:replay(), keymapper()) -> keyspace_filter(). -spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter().
make_keyspace_filter({TopicFilter, StartTime}, Keymapper) -> make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
@ -710,7 +710,7 @@ substring(I, Offset, Size) ->
(I bsr Offset) band ones(Size). (I bsr Offset) band ones(Size).
%% @doc Generate a column family ID for the MQTT messages %% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_replay_local_store:gen_id()) -> [char()]. -spec data_cf(emqx_ds_local_store:gen_id()) -> [char()].
data_cf(GenId) -> data_cf(GenId) ->
?MODULE_STRING ++ integer_to_list(GenId). ?MODULE_STRING ++ integer_to_list(GenId).

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay). -module(emqx_ds_replay).
%% API: %% API:
-export([]). -export([]).

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_sup). -module(emqx_ds_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -45,7 +45,7 @@ init([]) ->
shard_sup() -> shard_sup() ->
#{ #{
id => local_store_shard_sup, id => local_store_shard_sup,
start => {emqx_replay_local_store_sup, start_link, []}, start => {emqx_ds_local_store_sup, start_link, []},
restart => permanent, restart => permanent,
type => supervisor, type => supervisor,
shutdown => infinity shutdown => infinity

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_local_store_SUITE). -module(emqx_ds_local_store_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -12,7 +12,7 @@
-define(SHARD, shard(?FUNCTION_NAME)). -define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG, -define(DEFAULT_CONFIG,
{emqx_replay_message_storage, #{ {emqx_ds_message_storage, #{
timestamp_bits => 64, timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16], topic_bits_per_level => [8, 8, 32, 16],
epoch => 5, epoch => 5,
@ -23,7 +23,7 @@
). ).
-define(COMPACT_CONFIG, -define(COMPACT_CONFIG,
{emqx_replay_message_storage, #{ {emqx_ds_message_storage, #{
timestamp_bits => 16, timestamp_bits => 16,
topic_bits_per_level => [16, 16], topic_bits_per_level => [16, 16],
epoch => 10 epoch => 10
@ -32,8 +32,8 @@
%% Smoke test for opening and reopening the database %% Smoke test for opening and reopening the database
t_open(_Config) -> t_open(_Config) ->
ok = emqx_replay_local_store_sup:stop_shard(?SHARD), ok = emqx_ds_local_store_sup:stop_shard(?SHARD),
{ok, _} = emqx_replay_local_store_sup:start_shard(?SHARD). {ok, _} = emqx_ds_local_store_sup:start_shard(?SHARD).
%% Smoke test of store function %% Smoke test of store function
t_store(_Config) -> t_store(_Config) ->
@ -41,7 +41,7 @@ t_store(_Config) ->
PublishedAt = 1000, PublishedAt = 1000,
Topic = [<<"foo">>, <<"bar">>], Topic = [<<"foo">>, <<"bar">>],
Payload = <<"message">>, Payload = <<"message">>,
?assertMatch(ok, emqx_replay_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)). ?assertMatch(ok, emqx_ds_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)).
%% Smoke test for iteration through a concrete topic %% Smoke test for iteration through a concrete topic
t_iterate(_Config) -> t_iterate(_Config) ->
@ -49,7 +49,7 @@ t_iterate(_Config) ->
Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]], Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]],
Timestamps = lists:seq(1, 10), Timestamps = lists:seq(1, 10),
[ [
emqx_replay_local_store:store( emqx_ds_local_store:store(
?SHARD, ?SHARD,
emqx_guid:gen(), emqx_guid:gen(),
PublishedAt, PublishedAt,
@ -61,7 +61,7 @@ t_iterate(_Config) ->
%% Iterate through individual topics: %% Iterate through individual topics:
[ [
begin begin
{ok, It} = emqx_replay_local_store:make_iterator(?SHARD, {Topic, 0}), {ok, It} = emqx_ds_local_store:make_iterator(?SHARD, {Topic, 0}),
Values = iterate(It), Values = iterate(It),
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
end end
@ -136,16 +136,16 @@ t_iterate_long_tail_wildcard(_Config) ->
). ).
t_create_gen(_Config) -> t_create_gen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
?assertEqual( ?assertEqual(
{error, nonmonotonic}, {error, nonmonotonic},
emqx_replay_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) emqx_ds_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
), ),
?assertEqual( ?assertEqual(
{error, nonmonotonic}, {error, nonmonotonic},
emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) emqx_ds_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
), ),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz"], Topics = ["foo/bar", "foo/bar/baz"],
Timestamps = lists:seq(1, 100), Timestamps = lists:seq(1, 100),
[ [
@ -154,9 +154,9 @@ t_create_gen(_Config) ->
]. ].
t_iterate_multigen(_Config) -> t_iterate_multigen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), {ok, 3} = emqx_ds_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 100), Timestamps = lists:seq(1, 100),
_ = [ _ = [
@ -180,9 +180,9 @@ t_iterate_multigen(_Config) ->
t_iterate_multigen_preserve_restore(_Config) -> t_iterate_multigen_preserve_restore(_Config) ->
ReplayID = atom_to_binary(?FUNCTION_NAME), ReplayID = atom_to_binary(?FUNCTION_NAME),
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), {ok, 3} = emqx_ds_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a/bar"], Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
Timestamps = lists:seq(1, 100), Timestamps = lists:seq(1, 100),
TopicFilter = "foo/#", TopicFilter = "foo/#",
@ -194,12 +194,12 @@ t_iterate_multigen_preserve_restore(_Config) ->
It0 = iterator(?SHARD, TopicFilter, 0), It0 = iterator(?SHARD, TopicFilter, 0),
{It1, Res10} = iterate(It0, 10), {It1, Res10} = iterate(It0, 10),
% preserve mid-generation % preserve mid-generation
ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID), ok = emqx_ds_local_store:preserve_iterator(It1, ReplayID),
{ok, It2} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID), {ok, It2} = emqx_ds_local_store:restore_iterator(?SHARD, ReplayID),
{It3, Res100} = iterate(It2, 88), {It3, Res100} = iterate(It2, 88),
% preserve on the generation boundary % preserve on the generation boundary
ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID), ok = emqx_ds_local_store:preserve_iterator(It3, ReplayID),
{ok, It4} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID), {ok, It4} = emqx_ds_local_store:restore_iterator(?SHARD, ReplayID),
{It5, Res200} = iterate(It4, 1000), {It5, Res200} = iterate(It4, 1000),
?assertEqual(none, It5), ?assertEqual(none, It5),
?assertEqual( ?assertEqual(
@ -208,22 +208,22 @@ t_iterate_multigen_preserve_restore(_Config) ->
), ),
?assertEqual( ?assertEqual(
ok, ok,
emqx_replay_local_store:discard_iterator(?SHARD, ReplayID) emqx_ds_local_store:discard_iterator(?SHARD, ReplayID)
), ),
?assertEqual( ?assertEqual(
{error, not_found}, {error, not_found},
emqx_replay_local_store:restore_iterator(?SHARD, ReplayID) emqx_ds_local_store:restore_iterator(?SHARD, ReplayID)
). ).
store(Shard, PublishedAt, Topic, Payload) -> store(Shard, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(), ID = emqx_guid:gen(),
emqx_replay_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload). emqx_ds_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload).
iterate(DB, TopicFilter, StartTime) -> iterate(DB, TopicFilter, StartTime) ->
iterate(iterator(DB, TopicFilter, StartTime)). iterate(iterator(DB, TopicFilter, StartTime)).
iterate(It) -> iterate(It) ->
case emqx_replay_local_store:next(It) of case emqx_ds_local_store:next(It) of
{value, Payload, ItNext} -> {value, Payload, ItNext} ->
[Payload | iterate(ItNext)]; [Payload | iterate(ItNext)];
none -> none ->
@ -233,7 +233,7 @@ iterate(It) ->
iterate(It, 0) -> iterate(It, 0) ->
{It, []}; {It, []};
iterate(It, N) -> iterate(It, N) ->
case emqx_replay_local_store:next(It) of case emqx_ds_local_store:next(It) of
{value, Payload, ItNext} -> {value, Payload, ItNext} ->
{ItFinal, Ps} = iterate(ItNext, N - 1), {ItFinal, Ps} = iterate(ItNext, N - 1),
{ItFinal, [Payload | Ps]}; {ItFinal, [Payload | Ps]};
@ -242,7 +242,7 @@ iterate(It, N) ->
end. end.
iterator(DB, TopicFilter, StartTime) -> iterator(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), {ok, It} = emqx_ds_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
It. It.
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
@ -255,22 +255,22 @@ parse_topic(Topic) ->
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(emqx_replay), {ok, _} = application:ensure_all_started(emqx_ds),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = application:stop(emqx_replay). ok = application:stop(emqx_ds).
init_per_testcase(TC, Config) -> init_per_testcase(TC, Config) ->
ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
{ok, _} = emqx_replay_local_store_sup:start_shard(shard(TC)), {ok, _} = emqx_ds_local_store_sup:start_shard(shard(TC)),
Config. Config.
end_per_testcase(TC, _Config) -> end_per_testcase(TC, _Config) ->
ok = emqx_replay_local_store_sup:stop_shard(shard(TC)). ok = emqx_ds_local_store_sup:stop_shard(shard(TC)).
shard(TC) -> shard(TC) ->
list_to_binary(lists:concat([?MODULE, "_", TC])). list_to_binary(lists:concat([?MODULE, "_", TC])).
set_shard_config(Shard, Config) -> set_shard_config(Shard, Config) ->
ok = application:set_env(emqx_replay, shard_config, #{Shard => Config}). ok = application:set_env(emqx_ds, shard_config, #{Shard => Config}).

View File

@ -1,14 +1,14 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_message_storage_SUITE). -module(emqx_ds_message_storage_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-import(emqx_replay_message_storage, [ -import(emqx_ds_message_storage, [
make_keymapper/1, make_keymapper/1,
keymapper_info/1, keymapper_info/1,
compute_topic_bitmask/2, compute_topic_bitmask/2,

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_replay_message_storage_shim). -module(emqx_ds_message_storage_shim).
-export([open/0]). -export([open/0]).
-export([close/1]). -export([close/1]).
@ -29,7 +29,7 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) ->
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
ok. ok.
-spec iterate(t(), emqx_replay:replay()) -> -spec iterate(t(), emqx_ds:replay()) ->
[binary()]. [binary()].
iterate(Tab, {TopicFilter, StartTime}) -> iterate(Tab, {TopicFilter, StartTime}) ->
ets:foldr( ets:foldr(

View File

@ -22,7 +22,7 @@ prop_bitstring_computes() ->
Keymapper, Keymapper,
keymapper(), keymapper(),
?FORALL({Topic, Timestamp}, {topic(), integer()}, begin ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), BS = emqx_ds_message_storage:compute_bitstring(Topic, Timestamp, Keymapper),
is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper))) is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper)))
end) end)
). ).
@ -30,7 +30,7 @@ prop_bitstring_computes() ->
prop_topic_bitmask_computes() -> prop_topic_bitmask_computes() ->
Keymapper = make_keymapper(16, [8, 12, 16], 100), Keymapper = make_keymapper(16, [8, 12, 16], 100),
?FORALL(TopicFilter, topic_filter(), begin ?FORALL(TopicFilter, topic_filter(), begin
Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), Mask = emqx_ds_message_storage:compute_topic_bitmask(TopicFilter, Keymapper),
% topic bits + timestamp LSBs % topic bits + timestamp LSBs
is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
end). end).
@ -40,14 +40,14 @@ prop_next_seek_monotonic() ->
{TopicFilter, StartTime, Keymapper}, {TopicFilter, StartTime, Keymapper},
{topic_filter(), pos_integer(), keymapper()}, {topic_filter(), pos_integer(), keymapper()},
begin begin
Filter = emqx_replay_message_storage:make_keyspace_filter( Filter = emqx_ds_message_storage:make_keyspace_filter(
{TopicFilter, StartTime}, {TopicFilter, StartTime},
Keymapper Keymapper
), ),
?FORALL( ?FORALL(
Bitstring, Bitstring,
bitstr(get_keymapper_bitsize(Keymapper)), bitstr(get_keymapper_bitsize(Keymapper)),
emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring emqx_ds_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring
) )
end end
). ).
@ -56,8 +56,8 @@ prop_next_seek_eq_initial_seek() ->
?FORALL( ?FORALL(
Filter, Filter,
keyspace_filter(), keyspace_filter(),
emqx_replay_message_storage:compute_initial_seek(Filter) =:= emqx_ds_message_storage:compute_initial_seek(Filter) =:=
emqx_replay_message_storage:compute_next_seek(0, Filter) emqx_ds_message_storage:compute_next_seek(0, Filter)
). ).
prop_iterate_messages() -> prop_iterate_messages() ->
@ -72,7 +72,7 @@ prop_iterate_messages() ->
?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin
Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)), Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)),
{DB, Handle} = open_db(Filepath, Options), {DB, Handle} = open_db(Filepath, Options),
Shim = emqx_replay_message_storage_shim:open(), Shim = emqx_ds_message_storage_shim:open(),
ok = store_db(DB, Stream), ok = store_db(DB, Stream),
ok = store_shim(Shim, Stream), ok = store_shim(Shim, Stream),
?FORALL( ?FORALL(
@ -92,7 +92,7 @@ prop_iterate_messages() ->
Messages = iterate_db(DB, Iteration), Messages = iterate_db(DB, Iteration),
Reference = iterate_shim(Shim, Iteration), Reference = iterate_shim(Shim, Iteration),
ok = close_db(Handle), ok = close_db(Handle),
ok = emqx_replay_message_storage_shim:close(Shim), ok = emqx_ds_message_storage_shim:close(Shim),
?WHENFAIL( ?WHENFAIL(
begin begin
io:format(user, " *** Filepath = ~s~n", [Filepath]), io:format(user, " *** Filepath = ~s~n", [Filepath]),
@ -182,7 +182,7 @@ prop_iterate_eq_iterate_with_refresh() ->
% PublishedAt = ChunkNum, % PublishedAt = ChunkNum,
% MessageID, PublishedAt, Topic % MessageID, PublishedAt, Topic
% ]), % ]),
% ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload), % ok = emqx_ds_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload),
% store_message_stream(DB, payload_gen:next(Rest)); % store_message_stream(DB, payload_gen:next(Rest));
% store_message_stream(_Zone, []) -> % store_message_stream(_Zone, []) ->
% ok. % ok.
@ -191,7 +191,7 @@ store_db(DB, Messages) ->
lists:foreach( lists:foreach(
fun({Topic, Payload = {MessageID, Timestamp, _}}) -> fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
Bin = term_to_binary(Payload), Bin = term_to_binary(Payload),
emqx_replay_message_storage:store(DB, MessageID, Timestamp, Topic, Bin) emqx_ds_message_storage:store(DB, MessageID, Timestamp, Topic, Bin)
end, end,
Messages Messages
). ).
@ -200,7 +200,7 @@ iterate_db(DB, Iteration) ->
iterate_db(make_iterator(DB, Iteration)). iterate_db(make_iterator(DB, Iteration)).
iterate_db(It) -> iterate_db(It) ->
case emqx_replay_message_storage:next(It) of case emqx_ds_message_storage:next(It) of
{value, Payload, ItNext} -> {value, Payload, ItNext} ->
[binary_to_term(Payload) | iterate_db(ItNext)]; [binary_to_term(Payload) | iterate_db(ItNext)];
none -> none ->
@ -208,15 +208,15 @@ iterate_db(It) ->
end. end.
make_iterator(DB, Replay) -> make_iterator(DB, Replay) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay), {ok, It} = emqx_ds_message_storage:make_iterator(DB, Replay),
It. It.
make_iterator(DB, Replay, Options) -> make_iterator(DB, Replay, Options) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay, Options), {ok, It} = emqx_ds_message_storage:make_iterator(DB, Replay, Options),
It. It.
run_iterator_commands([iterate | Rest], It, Ctx) -> run_iterator_commands([iterate | Rest], It, Ctx) ->
case emqx_replay_message_storage:next(It) of case emqx_ds_message_storage:next(It) of
{value, Payload, ItNext} -> {value, Payload, ItNext} ->
[binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)]; [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)];
none -> none ->
@ -227,8 +227,8 @@ run_iterator_commands([{preserve, restore} | Rest], It, Ctx) ->
db := DB, db := DB,
replay := Replay replay := Replay
} = Ctx, } = Ctx,
Serial = emqx_replay_message_storage:preserve_iterator(It), Serial = emqx_ds_message_storage:preserve_iterator(It),
{ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Replay, Serial), {ok, ItNext} = emqx_ds_message_storage:restore_iterator(DB, Replay, Serial),
run_iterator_commands(Rest, ItNext, Ctx); run_iterator_commands(Rest, ItNext, Ctx);
run_iterator_commands([], It, _Ctx) -> run_iterator_commands([], It, _Ctx) ->
iterate_db(It). iterate_db(It).
@ -237,7 +237,7 @@ store_shim(Shim, Messages) ->
lists:foreach( lists:foreach(
fun({Topic, Payload = {MessageID, Timestamp, _}}) -> fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
Bin = term_to_binary(Payload), Bin = term_to_binary(Payload),
emqx_replay_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin) emqx_ds_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin)
end, end,
Messages Messages
). ).
@ -245,7 +245,7 @@ store_shim(Shim, Messages) ->
iterate_shim(Shim, Iteration) -> iterate_shim(Shim, Iteration) ->
lists:map( lists:map(
fun binary_to_term/1, fun binary_to_term/1,
emqx_replay_message_storage_shim:iterate(Shim, Iteration) emqx_ds_message_storage_shim:iterate(Shim, Iteration)
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -254,8 +254,8 @@ iterate_shim(Shim, Iteration) ->
open_db(Filepath, Options) -> open_db(Filepath, Options) ->
{ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
{Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options), {Schema, CFRefs} = emqx_ds_message_storage:create_new(Handle, ?GEN_ID, Options),
DB = emqx_replay_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), DB = emqx_ds_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
{DB, Handle}. {DB, Handle}.
close_db(Handle) -> close_db(Handle) ->
@ -379,7 +379,7 @@ keyspace_filter() ->
?LET( ?LET(
{TopicFilter, StartTime, Keymapper}, {TopicFilter, StartTime, Keymapper},
{topic_filter(), pos_integer(), keymapper()}, {topic_filter(), pos_integer(), keymapper()},
emqx_replay_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper) emqx_ds_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper)
). ).
messages(Topic) -> messages(Topic) ->
@ -426,14 +426,14 @@ flat(T) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> make_keymapper(TimestampBits, TopicBits, MaxEpoch) ->
emqx_replay_message_storage:make_keymapper(#{ emqx_ds_message_storage:make_keymapper(#{
timestamp_bits => TimestampBits, timestamp_bits => TimestampBits,
topic_bits_per_level => TopicBits, topic_bits_per_level => TopicBits,
epoch => MaxEpoch epoch => MaxEpoch
}). }).
get_keymapper_bitsize(Keymapper) -> get_keymapper_bitsize(Keymapper) ->
maps:get(bitsize, emqx_replay_message_storage:keymapper_info(Keymapper)). maps:get(bitsize, emqx_ds_message_storage:keymapper_info(Keymapper)).
-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}). -spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}).
interleave(Seqs, Rng) -> interleave(Seqs, Rng) ->