refactor(replay): zone -> shard
Also bump erlang-rocksdb version
This commit is contained in:
parent
b5443c2981
commit
f79dd16672
|
@ -18,7 +18,7 @@
|
||||||
%% API:
|
%% API:
|
||||||
-export([]).
|
-export([]).
|
||||||
|
|
||||||
-export_type([topic/0, time/0]).
|
-export_type([topic/0, time/0, shard/0]).
|
||||||
-export_type([replay_id/0, replay/0]).
|
-export_type([replay_id/0, replay/0]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -28,6 +28,8 @@
|
||||||
%% parsed
|
%% parsed
|
||||||
-type topic() :: list(binary()).
|
-type topic() :: list(binary()).
|
||||||
|
|
||||||
|
-type shard() :: binary().
|
||||||
|
|
||||||
%% Timestamp
|
%% Timestamp
|
||||||
%% Earliest possible timestamp is 0.
|
%% Earliest possible timestamp is 0.
|
||||||
%% TODO granularity?
|
%% TODO granularity?
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
%% TODO: make a proper HOCON schema and all...
|
%% TODO: make a proper HOCON schema and all...
|
||||||
|
|
||||||
%% API:
|
%% API:
|
||||||
-export([zone_config/1, db_options/0]).
|
-export([shard_config/1, db_options/0]).
|
||||||
|
|
||||||
-export([zone_iteration_options/1]).
|
-export([shard_iteration_options/1]).
|
||||||
-export([default_iteration_options/0]).
|
-export([default_iteration_options/0]).
|
||||||
|
|
||||||
-type backend_config() ::
|
-type backend_config() ::
|
||||||
|
@ -35,17 +35,16 @@
|
||||||
|
|
||||||
-define(APP, emqx_replay).
|
-define(APP, emqx_replay).
|
||||||
|
|
||||||
-type zone() :: emqx_types:zone().
|
-spec shard_config(emqx_replay:shard()) -> backend_config().
|
||||||
|
shard_config(Shard) ->
|
||||||
|
DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()),
|
||||||
|
Shards = application:get_env(?APP, shard_config, #{}),
|
||||||
|
maps:get(Shard, Shards, DefaultShardConfig).
|
||||||
|
|
||||||
-spec zone_config(zone()) -> backend_config().
|
-spec shard_iteration_options(emqx_replay:shard()) ->
|
||||||
zone_config(Zone) ->
|
emqx_replay_message_storage:iteration_options().
|
||||||
DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()),
|
shard_iteration_options(Shard) ->
|
||||||
Zones = application:get_env(?APP, zone_config, #{}),
|
case shard_config(Shard) of
|
||||||
maps:get(Zone, Zones, DefaultZoneConfig).
|
|
||||||
|
|
||||||
-spec zone_iteration_options(zone()) -> emqx_replay_message_storage:iteration_options().
|
|
||||||
zone_iteration_options(Zone) ->
|
|
||||||
case zone_config(Zone) of
|
|
||||||
{emqx_replay_message_storage, Config} ->
|
{emqx_replay_message_storage, Config} ->
|
||||||
maps:get(iteration, Config, default_iteration_options());
|
maps:get(iteration, Config, default_iteration_options());
|
||||||
{_Module, _} ->
|
{_Module, _} ->
|
||||||
|
@ -54,11 +53,11 @@ zone_iteration_options(Zone) ->
|
||||||
|
|
||||||
-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options().
|
-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options().
|
||||||
default_iteration_options() ->
|
default_iteration_options() ->
|
||||||
{emqx_replay_message_storage, Config} = default_zone_config(),
|
{emqx_replay_message_storage, Config} = default_shard_config(),
|
||||||
maps:get(iteration, Config).
|
maps:get(iteration, Config).
|
||||||
|
|
||||||
-spec default_zone_config() -> backend_config().
|
-spec default_shard_config() -> backend_config().
|
||||||
default_zone_config() ->
|
default_shard_config() ->
|
||||||
{emqx_replay_message_storage, #{
|
{emqx_replay_message_storage, #{
|
||||||
timestamp_bits => 64,
|
timestamp_bits => 64,
|
||||||
topic_bits_per_level => [8, 8, 8, 32, 16],
|
topic_bits_per_level => [8, 8, 8, 32, 16],
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
%% behavior callbacks:
|
%% behavior callbacks:
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||||
|
|
||||||
-export_type([cf_refs/0, gen_id/0, db_write_options/0]).
|
-export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]).
|
||||||
|
|
||||||
-compile({inline, [meta_lookup/2]}).
|
-compile({inline, [meta_lookup/2]}).
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
%% see rocksdb:db_options()
|
%% see rocksdb:db_options()
|
||||||
-type options() :: proplists:proplist().
|
% -type options() :: proplists:proplist().
|
||||||
|
|
||||||
-type db_write_options() :: proplists:proplist().
|
-type db_write_options() :: proplists:proplist().
|
||||||
|
|
||||||
|
@ -59,14 +59,14 @@
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-record(s, {
|
-record(s, {
|
||||||
zone :: emqx_types:zone(),
|
shard :: emqx_replay:shard(),
|
||||||
db :: rocksdb:db_handle(),
|
db :: rocksdb:db_handle(),
|
||||||
cf_iterator :: rocksdb:cf_handle(),
|
cf_iterator :: rocksdb:cf_handle(),
|
||||||
cf_generations :: cf_refs()
|
cf_generations :: cf_refs()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-record(it, {
|
-record(it, {
|
||||||
zone :: emqx_types:zone(),
|
shard :: emqx_replay:shard(),
|
||||||
gen :: gen_id(),
|
gen :: gen_id(),
|
||||||
replay :: emqx_replay:replay(),
|
replay :: emqx_replay:replay(),
|
||||||
module :: module(),
|
module :: module(),
|
||||||
|
@ -94,33 +94,35 @@
|
||||||
%% 3. `inplace_update_support`?
|
%% 3. `inplace_update_support`?
|
||||||
-define(ITERATOR_CF_OPTS, []).
|
-define(ITERATOR_CF_OPTS, []).
|
||||||
|
|
||||||
-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}).
|
-define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec start_link(emqx_types:zone()) -> {ok, pid()}.
|
-spec start_link(emqx_replay:shard()) -> {ok, pid()}.
|
||||||
start_link(Zone) ->
|
start_link(Shard) ->
|
||||||
gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []).
|
gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []).
|
||||||
|
|
||||||
-spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) ->
|
-spec create_generation(emqx_replay:shard(), emqx_replay:time(), emqx_replay_conf:backend_config()) ->
|
||||||
{ok, gen_id()} | {error, nonmonotonic}.
|
{ok, gen_id()} | {error, nonmonotonic}.
|
||||||
create_generation(Zone, Since, Config = {_Module, _Options}) ->
|
create_generation(Shard, Since, Config = {_Module, _Options}) ->
|
||||||
gen_server:call(?REF(Zone), {create_generation, Since, Config}).
|
gen_server:call(?REF(Shard), {create_generation, Since, Config}).
|
||||||
|
|
||||||
-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) ->
|
-spec store(
|
||||||
|
emqx_replay:shard(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()
|
||||||
|
) ->
|
||||||
ok | {error, _TODO}.
|
ok | {error, _TODO}.
|
||||||
store(Zone, GUID, Time, Topic, Msg) ->
|
store(Shard, GUID, Time, Topic, Msg) ->
|
||||||
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time),
|
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
|
||||||
Mod:store(Data, GUID, Time, Topic, Msg).
|
Mod:store(Data, GUID, Time, Topic, Msg).
|
||||||
|
|
||||||
-spec make_iterator(emqx_types:zone(), emqx_replay:replay()) ->
|
-spec make_iterator(emqx_replay:shard(), emqx_replay:replay()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(Zone, Replay = {_, StartTime}) ->
|
make_iterator(Shard, Replay = {_, StartTime}) ->
|
||||||
{GenId, Gen} = meta_lookup_gen(Zone, StartTime),
|
{GenId, Gen} = meta_lookup_gen(Shard, StartTime),
|
||||||
open_iterator(Gen, #it{
|
open_iterator(Gen, #it{
|
||||||
zone = Zone,
|
shard = Shard,
|
||||||
gen = GenId,
|
gen = GenId,
|
||||||
replay = Replay
|
replay = Replay
|
||||||
}).
|
}).
|
||||||
|
@ -148,30 +150,30 @@ next(It = #it{module = Mod, data = ItData}) ->
|
||||||
preserve_iterator(It = #it{}, ReplayID) ->
|
preserve_iterator(It = #it{}, ReplayID) ->
|
||||||
iterator_put_state(ReplayID, It).
|
iterator_put_state(ReplayID, It).
|
||||||
|
|
||||||
-spec restore_iterator(emqx_types:zone(), emqx_replay:replay_id()) ->
|
-spec restore_iterator(emqx_replay:shard(), emqx_replay:replay_id()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
restore_iterator(Zone, ReplayID) ->
|
restore_iterator(Shard, ReplayID) ->
|
||||||
case iterator_get_state(Zone, ReplayID) of
|
case iterator_get_state(Shard, ReplayID) of
|
||||||
{ok, Serial} ->
|
{ok, Serial} ->
|
||||||
restore_iterator_state(Zone, Serial);
|
restore_iterator_state(Shard, Serial);
|
||||||
not_found ->
|
not_found ->
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
{error, _Reason} = Error ->
|
{error, _Reason} = Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec discard_iterator(emqx_types:zone(), emqx_replay:replay_id()) ->
|
-spec discard_iterator(emqx_replay:shard(), emqx_replay:replay_id()) ->
|
||||||
ok | {error, _TODO}.
|
ok | {error, _TODO}.
|
||||||
discard_iterator(Zone, ReplayID) ->
|
discard_iterator(Shard, ReplayID) ->
|
||||||
iterator_delete(Zone, ReplayID).
|
iterator_delete(Shard, ReplayID).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% behavior callbacks
|
%% behavior callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
init([Zone]) ->
|
init([Shard]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
{ok, S0} = open_db(Zone),
|
{ok, S0} = open_db(Shard),
|
||||||
S = ensure_current_generation(S0),
|
S = ensure_current_generation(S0),
|
||||||
ok = populate_metadata(S),
|
ok = populate_metadata(S),
|
||||||
{ok, S}.
|
{ok, S}.
|
||||||
|
@ -192,8 +194,8 @@ handle_cast(_Cast, S) ->
|
||||||
handle_info(_Info, S) ->
|
handle_info(_Info, S) ->
|
||||||
{noreply, S}.
|
{noreply, S}.
|
||||||
|
|
||||||
terminate(_Reason, #s{db = DB, zone = Zone}) ->
|
terminate(_Reason, #s{db = DB, shard = Shard}) ->
|
||||||
meta_erase(Zone),
|
meta_erase(Shard),
|
||||||
ok = rocksdb:close(DB).
|
ok = rocksdb:close(DB).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -203,21 +205,21 @@ terminate(_Reason, #s{db = DB, zone = Zone}) ->
|
||||||
-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
|
-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
|
||||||
|
|
||||||
-spec populate_metadata(state()) -> ok.
|
-spec populate_metadata(state()) -> ok.
|
||||||
populate_metadata(S = #s{zone = Zone, db = DBHandle, cf_iterator = CFIterator}) ->
|
populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) ->
|
||||||
ok = meta_put(Zone, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
|
ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
|
||||||
Current = schema_get_current(DBHandle),
|
Current = schema_get_current(DBHandle),
|
||||||
lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
|
lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
|
||||||
|
|
||||||
-spec populate_metadata(gen_id(), state()) -> ok.
|
-spec populate_metadata(gen_id(), state()) -> ok.
|
||||||
populate_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) ->
|
populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
|
||||||
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
|
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
|
||||||
meta_register_gen(Zone, GenId, Gen).
|
meta_register_gen(Shard, GenId, Gen).
|
||||||
|
|
||||||
-spec ensure_current_generation(state()) -> state().
|
-spec ensure_current_generation(state()) -> state().
|
||||||
ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
|
ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) ->
|
||||||
case schema_get_current(DBHandle) of
|
case schema_get_current(DBHandle) of
|
||||||
undefined ->
|
undefined ->
|
||||||
Config = emqx_replay_conf:zone_config(Zone),
|
Config = emqx_replay_conf:shard_config(Shard),
|
||||||
{ok, _, NS} = create_new_gen(0, Config, S),
|
{ok, _, NS} = create_new_gen(0, Config, S),
|
||||||
NS;
|
NS;
|
||||||
_GenId ->
|
_GenId ->
|
||||||
|
@ -226,16 +228,16 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
|
||||||
|
|
||||||
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
|
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
|
||||||
{ok, gen_id(), state()} | {error, nonmonotonic}.
|
{ok, gen_id(), state()} | {error, nonmonotonic}.
|
||||||
create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
|
create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
|
||||||
GenId = get_next_id(meta_get_current(Zone)),
|
GenId = get_next_id(meta_get_current(Shard)),
|
||||||
GenId = get_next_id(schema_get_current(DBHandle)),
|
GenId = get_next_id(schema_get_current(DBHandle)),
|
||||||
case is_gen_valid(Zone, GenId, Since) of
|
case is_gen_valid(Shard, GenId, Since) of
|
||||||
ok ->
|
ok ->
|
||||||
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
|
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
|
||||||
%% TODO: Transaction? Column family creation can't be transactional, anyway.
|
%% TODO: Transaction? Column family creation can't be transactional, anyway.
|
||||||
ok = schema_put_gen(DBHandle, GenId, Gen),
|
ok = schema_put_gen(DBHandle, GenId, Gen),
|
||||||
ok = schema_put_current(DBHandle, GenId),
|
ok = schema_put_current(DBHandle, GenId),
|
||||||
ok = meta_register_gen(Zone, GenId, open_gen(GenId, Gen, NS)),
|
ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)),
|
||||||
{ok, GenId, NS};
|
{ok, GenId, NS};
|
||||||
{error, _} = Error ->
|
{error, _} = Error ->
|
||||||
Error
|
Error
|
||||||
|
@ -253,9 +255,9 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
|
||||||
},
|
},
|
||||||
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
|
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
|
||||||
|
|
||||||
-spec open_db(emqx_types:zone()) -> {ok, state()} | {error, _TODO}.
|
-spec open_db(emqx_replay:shard()) -> {ok, state()} | {error, _TODO}.
|
||||||
open_db(Zone) ->
|
open_db(Shard) ->
|
||||||
Filename = atom_to_list(Zone),
|
Filename = binary_to_list(Shard),
|
||||||
DBOptions = [
|
DBOptions = [
|
||||||
{create_if_missing, true},
|
{create_if_missing, true},
|
||||||
{create_missing_column_families, true}
|
{create_missing_column_families, true}
|
||||||
|
@ -278,7 +280,7 @@ open_db(Zone) ->
|
||||||
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
|
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
|
||||||
{CFNames, _} = lists:unzip(ExistingCFs),
|
{CFNames, _} = lists:unzip(ExistingCFs),
|
||||||
{ok, #s{
|
{ok, #s{
|
||||||
zone = Zone,
|
shard = Shard,
|
||||||
db = DBHandle,
|
db = DBHandle,
|
||||||
cf_iterator = CFIterator,
|
cf_iterator = CFIterator,
|
||||||
cf_generations = lists:zip(CFNames, CFRefs)
|
cf_generations = lists:zip(CFNames, CFRefs)
|
||||||
|
@ -291,14 +293,14 @@ open_db(Zone) ->
|
||||||
open_gen(
|
open_gen(
|
||||||
GenId,
|
GenId,
|
||||||
Gen = #{module := Mod, data := Data},
|
Gen = #{module := Mod, data := Data},
|
||||||
#s{zone = Zone, db = DBHandle, cf_generations = CFs}
|
#s{shard = Shard, db = DBHandle, cf_generations = CFs}
|
||||||
) ->
|
) ->
|
||||||
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
|
DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
|
||||||
Gen#{data := DB}.
|
Gen#{data := DB}.
|
||||||
|
|
||||||
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
|
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
|
||||||
open_next_iterator(It = #it{zone = Zone, gen = GenId}) ->
|
open_next_iterator(It = #it{shard = Shard, gen = GenId}) ->
|
||||||
open_next_iterator(meta_get_gen(Zone, GenId + 1), It#it{gen = GenId + 1}).
|
open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}).
|
||||||
|
|
||||||
open_next_iterator(undefined, _It) ->
|
open_next_iterator(undefined, _It) ->
|
||||||
none;
|
none;
|
||||||
|
@ -331,17 +333,17 @@ open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay},
|
||||||
-define(ITERATION_WRITE_OPTS, []).
|
-define(ITERATION_WRITE_OPTS, []).
|
||||||
-define(ITERATION_READ_OPTS, []).
|
-define(ITERATION_READ_OPTS, []).
|
||||||
|
|
||||||
iterator_get_state(Zone, ReplayID) ->
|
iterator_get_state(Shard, ReplayID) ->
|
||||||
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
|
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
|
||||||
rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
|
rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
|
||||||
|
|
||||||
iterator_put_state(ID, It = #it{zone = Zone}) ->
|
iterator_put_state(ID, It = #it{shard = Shard}) ->
|
||||||
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
|
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
|
||||||
Serial = preserve_iterator_state(It),
|
Serial = preserve_iterator_state(It),
|
||||||
rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
|
rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
|
||||||
|
|
||||||
iterator_delete(Zone, ID) ->
|
iterator_delete(Shard, ID) ->
|
||||||
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
|
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
|
||||||
rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
|
rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
|
||||||
|
|
||||||
preserve_iterator_state(#it{
|
preserve_iterator_state(#it{
|
||||||
|
@ -358,10 +360,10 @@ preserve_iterator_state(#it{
|
||||||
st => Mod:preserve_iterator(ItData)
|
st => Mod:preserve_iterator(ItData)
|
||||||
}).
|
}).
|
||||||
|
|
||||||
restore_iterator_state(Zone, Serial) when is_binary(Serial) ->
|
restore_iterator_state(Shard, Serial) when is_binary(Serial) ->
|
||||||
restore_iterator_state(Zone, binary_to_term(Serial));
|
restore_iterator_state(Shard, binary_to_term(Serial));
|
||||||
restore_iterator_state(
|
restore_iterator_state(
|
||||||
Zone,
|
Shard,
|
||||||
#{
|
#{
|
||||||
v := 1,
|
v := 1,
|
||||||
gen := Gen,
|
gen := Gen,
|
||||||
|
@ -370,8 +372,8 @@ restore_iterator_state(
|
||||||
st := State
|
st := State
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
It = #it{zone = Zone, gen = Gen, replay = {TopicFilter, StartTime}},
|
It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
|
||||||
open_restore_iterator(meta_get_gen(Zone, Gen), It, State).
|
open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
|
||||||
|
|
||||||
%% Functions for dealing with the metadata stored persistently in rocksdb
|
%% Functions for dealing with the metadata stored persistently in rocksdb
|
||||||
|
|
||||||
|
@ -409,27 +411,27 @@ schema_gen_key(N) ->
|
||||||
-undef(SCHEMA_WRITE_OPTS).
|
-undef(SCHEMA_WRITE_OPTS).
|
||||||
-undef(SCHEMA_READ_OPTS).
|
-undef(SCHEMA_READ_OPTS).
|
||||||
|
|
||||||
%% Functions for dealing with the runtime zone metadata:
|
%% Functions for dealing with the runtime shard metadata:
|
||||||
|
|
||||||
-define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}).
|
-define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}).
|
||||||
|
|
||||||
-spec meta_register_gen(emqx_types:zone(), gen_id(), generation()) -> ok.
|
-spec meta_register_gen(emqx_replay:shard(), gen_id(), generation()) -> ok.
|
||||||
meta_register_gen(Zone, GenId, Gen) ->
|
meta_register_gen(Shard, GenId, Gen) ->
|
||||||
Gs =
|
Gs =
|
||||||
case GenId > 0 of
|
case GenId > 0 of
|
||||||
true -> meta_lookup(Zone, GenId - 1);
|
true -> meta_lookup(Shard, GenId - 1);
|
||||||
false -> []
|
false -> []
|
||||||
end,
|
end,
|
||||||
ok = meta_put(Zone, GenId, [Gen | Gs]),
|
ok = meta_put(Shard, GenId, [Gen | Gs]),
|
||||||
ok = meta_put(Zone, current, GenId).
|
ok = meta_put(Shard, current, GenId).
|
||||||
|
|
||||||
-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> {gen_id(), generation()}.
|
-spec meta_lookup_gen(emqx_replay:shard(), emqx_replay:time()) -> {gen_id(), generation()}.
|
||||||
meta_lookup_gen(Zone, Time) ->
|
meta_lookup_gen(Shard, Time) ->
|
||||||
% TODO
|
% TODO
|
||||||
% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
|
% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
|
||||||
% towards a "no".
|
% towards a "no".
|
||||||
Current = meta_lookup(Zone, current),
|
Current = meta_lookup(Shard, current),
|
||||||
Gens = meta_lookup(Zone, Current),
|
Gens = meta_lookup(Shard, Current),
|
||||||
find_gen(Time, Current, Gens).
|
find_gen(Time, Current, Gens).
|
||||||
|
|
||||||
find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
|
find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
|
||||||
|
@ -437,34 +439,34 @@ find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
|
||||||
find_gen(Time, GenId, [_Gen | Rest]) ->
|
find_gen(Time, GenId, [_Gen | Rest]) ->
|
||||||
find_gen(Time, GenId - 1, Rest).
|
find_gen(Time, GenId - 1, Rest).
|
||||||
|
|
||||||
-spec meta_get_gen(emqx_types:zone(), gen_id()) -> generation() | undefined.
|
-spec meta_get_gen(emqx_replay:shard(), gen_id()) -> generation() | undefined.
|
||||||
meta_get_gen(Zone, GenId) ->
|
meta_get_gen(Shard, GenId) ->
|
||||||
case meta_lookup(Zone, GenId, []) of
|
case meta_lookup(Shard, GenId, []) of
|
||||||
[Gen | _Older] -> Gen;
|
[Gen | _Older] -> Gen;
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined.
|
-spec meta_get_current(emqx_replay:shard()) -> gen_id() | undefined.
|
||||||
meta_get_current(Zone) ->
|
meta_get_current(Shard) ->
|
||||||
meta_lookup(Zone, current, undefined).
|
meta_lookup(Shard, current, undefined).
|
||||||
|
|
||||||
-spec meta_lookup(emqx_types:zone(), _K) -> _V.
|
-spec meta_lookup(emqx_replay:shard(), _K) -> _V.
|
||||||
meta_lookup(Zone, K) ->
|
meta_lookup(Shard, K) ->
|
||||||
persistent_term:get(?PERSISTENT_TERM(Zone, K)).
|
persistent_term:get(?PERSISTENT_TERM(Shard, K)).
|
||||||
|
|
||||||
-spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default.
|
-spec meta_lookup(emqx_replay:shard(), _K, Default) -> _V | Default.
|
||||||
meta_lookup(Zone, K, Default) ->
|
meta_lookup(Shard, K, Default) ->
|
||||||
persistent_term:get(?PERSISTENT_TERM(Zone, K), Default).
|
persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).
|
||||||
|
|
||||||
-spec meta_put(emqx_types:zone(), _K, _V) -> ok.
|
-spec meta_put(emqx_replay:shard(), _K, _V) -> ok.
|
||||||
meta_put(Zone, K, V) ->
|
meta_put(Shard, K, V) ->
|
||||||
persistent_term:put(?PERSISTENT_TERM(Zone, K), V).
|
persistent_term:put(?PERSISTENT_TERM(Shard, K), V).
|
||||||
|
|
||||||
-spec meta_erase(emqx_types:zone()) -> ok.
|
-spec meta_erase(emqx_replay:shard()) -> ok.
|
||||||
meta_erase(Zone) ->
|
meta_erase(Shard) ->
|
||||||
[
|
[
|
||||||
persistent_term:erase(K)
|
persistent_term:erase(K)
|
||||||
|| {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Zone
|
|| {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard
|
||||||
],
|
],
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -473,15 +475,15 @@ meta_erase(Zone) ->
|
||||||
get_next_id(undefined) -> 0;
|
get_next_id(undefined) -> 0;
|
||||||
get_next_id(GenId) -> GenId + 1.
|
get_next_id(GenId) -> GenId + 1.
|
||||||
|
|
||||||
is_gen_valid(Zone, GenId, Since) when GenId > 0 ->
|
is_gen_valid(Shard, GenId, Since) when GenId > 0 ->
|
||||||
[GenPrev | _] = meta_lookup(Zone, GenId - 1),
|
[GenPrev | _] = meta_lookup(Shard, GenId - 1),
|
||||||
case GenPrev of
|
case GenPrev of
|
||||||
#{since := SincePrev} when Since > SincePrev ->
|
#{since := SincePrev} when Since > SincePrev ->
|
||||||
ok;
|
ok;
|
||||||
#{} ->
|
#{} ->
|
||||||
{error, nonmonotonic}
|
{error, nonmonotonic}
|
||||||
end;
|
end;
|
||||||
is_gen_valid(_Zone, 0, 0) ->
|
is_gen_valid(_Shard, 0, 0) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
|
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
-behavior(supervisor).
|
-behavior(supervisor).
|
||||||
|
|
||||||
%% API:
|
%% API:
|
||||||
-export([start_link/0, start_zone/1, stop_zone/1]).
|
-export([start_link/0, start_shard/1, stop_shard/1]).
|
||||||
|
|
||||||
%% behavior callbacks:
|
%% behavior callbacks:
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
@ -37,14 +37,14 @@
|
||||||
start_link() ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?SUP}, ?MODULE, []).
|
supervisor:start_link({local, ?SUP}, ?MODULE, []).
|
||||||
|
|
||||||
-spec start_zone(emqx_types:zone()) -> supervisor:startchild_ret().
|
-spec start_shard(emqx_replay:shard()) -> supervisor:startchild_ret().
|
||||||
start_zone(Zone) ->
|
start_shard(Shard) ->
|
||||||
supervisor:start_child(?SUP, zone_child_spec(Zone)).
|
supervisor:start_child(?SUP, shard_child_spec(Shard)).
|
||||||
|
|
||||||
-spec stop_zone(emqx_types:zone()) -> ok | {error, _}.
|
-spec stop_shard(emqx_replay:shard()) -> ok | {error, _}.
|
||||||
stop_zone(Zone) ->
|
stop_shard(Shard) ->
|
||||||
ok = supervisor:terminate_child(?SUP, Zone),
|
ok = supervisor:terminate_child(?SUP, Shard),
|
||||||
ok = supervisor:delete_child(?SUP, Zone).
|
ok = supervisor:delete_child(?SUP, Shard).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% behavior callbacks
|
%% behavior callbacks
|
||||||
|
@ -63,11 +63,11 @@ init([]) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec zone_child_spec(emqx_types:zone()) -> supervisor:child_spec().
|
-spec shard_child_spec(emqx_replay:shard()) -> supervisor:child_spec().
|
||||||
zone_child_spec(Zone) ->
|
shard_child_spec(Shard) ->
|
||||||
#{
|
#{
|
||||||
id => Zone,
|
id => Shard,
|
||||||
start => {emqx_replay_local_store, start_link, [Zone]},
|
start => {emqx_replay_local_store, start_link, [Shard]},
|
||||||
shutdown => 5_000,
|
shutdown => 5_000,
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
type => worker
|
type => worker
|
||||||
|
|
|
@ -182,7 +182,7 @@
|
||||||
-opaque schema() :: #schema{}.
|
-opaque schema() :: #schema{}.
|
||||||
|
|
||||||
-record(db, {
|
-record(db, {
|
||||||
zone :: emqx_types:zone(),
|
shard :: emqx_replay:shard(),
|
||||||
handle :: rocksdb:db_handle(),
|
handle :: rocksdb:db_handle(),
|
||||||
cf :: rocksdb:cf_handle(),
|
cf :: rocksdb:cf_handle(),
|
||||||
keymapper :: keymapper(),
|
keymapper :: keymapper(),
|
||||||
|
@ -244,17 +244,17 @@ create_new(DBHandle, GenId, Options) ->
|
||||||
|
|
||||||
%% Reopen the database
|
%% Reopen the database
|
||||||
-spec open(
|
-spec open(
|
||||||
emqx_types:zone(),
|
emqx_replay:shard(),
|
||||||
rocksdb:db_handle(),
|
rocksdb:db_handle(),
|
||||||
emqx_replay_local_store:gen_id(),
|
emqx_replay_local_store:gen_id(),
|
||||||
emqx_replay_local_store:cf_refs(),
|
emqx_replay_local_store:cf_refs(),
|
||||||
schema()
|
schema()
|
||||||
) ->
|
) ->
|
||||||
db().
|
db().
|
||||||
open(Zone, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
|
open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
|
||||||
{value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
|
{value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
|
||||||
#db{
|
#db{
|
||||||
zone = Zone,
|
shard = Shard,
|
||||||
handle = DBHandle,
|
handle = DBHandle,
|
||||||
cf = CFHandle,
|
cf = CFHandle,
|
||||||
keymapper = Keymapper
|
keymapper = Keymapper
|
||||||
|
@ -292,7 +292,7 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
|
||||||
-spec make_iterator(db(), emqx_replay:replay()) ->
|
-spec make_iterator(db(), emqx_replay:replay()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(DB, Replay) ->
|
make_iterator(DB, Replay) ->
|
||||||
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
|
Options = emqx_replay_conf:shard_iteration_options(DB#db.shard),
|
||||||
make_iterator(DB, Replay, Options).
|
make_iterator(DB, Replay, Options).
|
||||||
|
|
||||||
-spec make_iterator(db(), emqx_replay:replay(), iteration_options()) ->
|
-spec make_iterator(db(), emqx_replay:replay(), iteration_options()) ->
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -42,7 +42,7 @@ start_link() ->
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
Children = [zone_sup()],
|
Children = [shard_sup()],
|
||||||
SupFlags = #{
|
SupFlags = #{
|
||||||
strategy => one_for_all,
|
strategy => one_for_all,
|
||||||
intensity => 0,
|
intensity => 0,
|
||||||
|
@ -54,9 +54,9 @@ init([]) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
zone_sup() ->
|
shard_sup() ->
|
||||||
#{
|
#{
|
||||||
id => local_store_zone_sup,
|
id => local_store_shard_sup,
|
||||||
start => {emqx_replay_local_store_sup, start_link, []},
|
start => {emqx_replay_local_store_sup, start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
type => supervisor,
|
type => supervisor,
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
-define(ZONE, zone(?FUNCTION_NAME)).
|
-define(SHARD, shard(?FUNCTION_NAME)).
|
||||||
|
|
||||||
-define(DEFAULT_CONFIG,
|
-define(DEFAULT_CONFIG,
|
||||||
{emqx_replay_message_storage, #{
|
{emqx_replay_message_storage, #{
|
||||||
|
@ -44,8 +44,8 @@
|
||||||
|
|
||||||
%% Smoke test for opening and reopening the database
|
%% Smoke test for opening and reopening the database
|
||||||
t_open(_Config) ->
|
t_open(_Config) ->
|
||||||
ok = emqx_replay_local_store_sup:stop_zone(?ZONE),
|
ok = emqx_replay_local_store_sup:stop_shard(?SHARD),
|
||||||
{ok, _} = emqx_replay_local_store_sup:start_zone(?ZONE).
|
{ok, _} = emqx_replay_local_store_sup:start_shard(?SHARD).
|
||||||
|
|
||||||
%% Smoke test of store function
|
%% Smoke test of store function
|
||||||
t_store(_Config) ->
|
t_store(_Config) ->
|
||||||
|
@ -53,7 +53,7 @@ t_store(_Config) ->
|
||||||
PublishedAt = 1000,
|
PublishedAt = 1000,
|
||||||
Topic = [<<"foo">>, <<"bar">>],
|
Topic = [<<"foo">>, <<"bar">>],
|
||||||
Payload = <<"message">>,
|
Payload = <<"message">>,
|
||||||
?assertMatch(ok, emqx_replay_local_store:store(?ZONE, MessageID, PublishedAt, Topic, Payload)).
|
?assertMatch(ok, emqx_replay_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)).
|
||||||
|
|
||||||
%% Smoke test for iteration through a concrete topic
|
%% Smoke test for iteration through a concrete topic
|
||||||
t_iterate(_Config) ->
|
t_iterate(_Config) ->
|
||||||
|
@ -62,7 +62,7 @@ t_iterate(_Config) ->
|
||||||
Timestamps = lists:seq(1, 10),
|
Timestamps = lists:seq(1, 10),
|
||||||
[
|
[
|
||||||
emqx_replay_local_store:store(
|
emqx_replay_local_store:store(
|
||||||
?ZONE,
|
?SHARD,
|
||||||
emqx_guid:gen(),
|
emqx_guid:gen(),
|
||||||
PublishedAt,
|
PublishedAt,
|
||||||
Topic,
|
Topic,
|
||||||
|
@ -73,7 +73,7 @@ t_iterate(_Config) ->
|
||||||
%% Iterate through individual topics:
|
%% Iterate through individual topics:
|
||||||
[
|
[
|
||||||
begin
|
begin
|
||||||
{ok, It} = emqx_replay_local_store:make_iterator(?ZONE, {Topic, 0}),
|
{ok, It} = emqx_replay_local_store:make_iterator(?SHARD, {Topic, 0}),
|
||||||
Values = iterate(It),
|
Values = iterate(It),
|
||||||
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
|
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
|
||||||
end
|
end
|
||||||
|
@ -87,50 +87,50 @@ t_iterate_wildcard(_Config) ->
|
||||||
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
||||||
Timestamps = lists:seq(1, 10),
|
Timestamps = lists:seq(1, 10),
|
||||||
_ = [
|
_ = [
|
||||||
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
||||||
|| Topic <- Topics, PublishedAt <- Timestamps
|
|| Topic <- Topics, PublishedAt <- Timestamps
|
||||||
],
|
],
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
|
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 0)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[],
|
[],
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 10 + 1)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
|
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 5)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort([
|
lists:sort([
|
||||||
{Topic, PublishedAt}
|
{Topic, PublishedAt}
|
||||||
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
|
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
|
||||||
]),
|
]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
|
lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+", 0)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[],
|
[],
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+/bar", 0)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort([
|
lists:sort([
|
||||||
{Topic, PublishedAt}
|
{Topic, PublishedAt}
|
||||||
|| Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
|
|| Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
|
||||||
]),
|
]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "+/bar/#", 0)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
|
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 0)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[],
|
[],
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/+/+", 0)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)])
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -139,40 +139,40 @@ t_iterate_long_tail_wildcard(_Config) ->
|
||||||
TopicFilter = "b/c/d/e/+/+",
|
TopicFilter = "b/c/d/e/+/+",
|
||||||
Timestamps = lists:seq(1, 100),
|
Timestamps = lists:seq(1, 100),
|
||||||
_ = [
|
_ = [
|
||||||
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
||||||
|| PublishedAt <- Timestamps
|
|| PublishedAt <- Timestamps
|
||||||
],
|
],
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]),
|
lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, TopicFilter, 50)])
|
||||||
).
|
).
|
||||||
|
|
||||||
t_create_gen(_Config) ->
|
t_create_gen(_Config) ->
|
||||||
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG),
|
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{error, nonmonotonic},
|
{error, nonmonotonic},
|
||||||
emqx_replay_local_store:create_generation(?ZONE, 1, ?DEFAULT_CONFIG)
|
emqx_replay_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{error, nonmonotonic},
|
{error, nonmonotonic},
|
||||||
emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG)
|
emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
|
||||||
),
|
),
|
||||||
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
|
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||||
Topics = ["foo/bar", "foo/bar/baz"],
|
Topics = ["foo/bar", "foo/bar/baz"],
|
||||||
Timestamps = lists:seq(1, 100),
|
Timestamps = lists:seq(1, 100),
|
||||||
[
|
[
|
||||||
?assertEqual(ok, store(?ZONE, PublishedAt, Topic, <<>>))
|
?assertEqual(ok, store(?SHARD, PublishedAt, Topic, <<>>))
|
||||||
|| Topic <- Topics, PublishedAt <- Timestamps
|
|| Topic <- Topics, PublishedAt <- Timestamps
|
||||||
].
|
].
|
||||||
|
|
||||||
t_iterate_multigen(_Config) ->
|
t_iterate_multigen(_Config) ->
|
||||||
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
|
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||||
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
|
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
||||||
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 1000, ?DEFAULT_CONFIG),
|
{ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
|
||||||
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
||||||
Timestamps = lists:seq(1, 100),
|
Timestamps = lists:seq(1, 100),
|
||||||
_ = [
|
_ = [
|
||||||
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
||||||
|| Topic <- Topics, PublishedAt <- Timestamps
|
|| Topic <- Topics, PublishedAt <- Timestamps
|
||||||
],
|
],
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -180,38 +180,38 @@ t_iterate_multigen(_Config) ->
|
||||||
{Topic, PublishedAt}
|
{Topic, PublishedAt}
|
||||||
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
|
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
|
||||||
]),
|
]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort([
|
lists:sort([
|
||||||
{Topic, PublishedAt}
|
{Topic, PublishedAt}
|
||||||
|| Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
|
|| Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
|
||||||
]),
|
]),
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
|
||||||
).
|
).
|
||||||
|
|
||||||
t_iterate_multigen_preserve_restore(_Config) ->
|
t_iterate_multigen_preserve_restore(_Config) ->
|
||||||
ReplayID = atom_to_binary(?FUNCTION_NAME),
|
ReplayID = atom_to_binary(?FUNCTION_NAME),
|
||||||
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
|
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||||
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
|
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
||||||
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 100, ?DEFAULT_CONFIG),
|
{ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
|
||||||
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
|
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
|
||||||
Timestamps = lists:seq(1, 100),
|
Timestamps = lists:seq(1, 100),
|
||||||
TopicFilter = "foo/#",
|
TopicFilter = "foo/#",
|
||||||
TopicsMatching = ["foo/bar", "foo/bar/baz"],
|
TopicsMatching = ["foo/bar", "foo/bar/baz"],
|
||||||
_ = [
|
_ = [
|
||||||
store(?ZONE, TS, Topic, term_to_binary({Topic, TS}))
|
store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
|
||||||
|| Topic <- Topics, TS <- Timestamps
|
|| Topic <- Topics, TS <- Timestamps
|
||||||
],
|
],
|
||||||
It0 = iterator(?ZONE, TopicFilter, 0),
|
It0 = iterator(?SHARD, TopicFilter, 0),
|
||||||
{It1, Res10} = iterate(It0, 10),
|
{It1, Res10} = iterate(It0, 10),
|
||||||
% preserve mid-generation
|
% preserve mid-generation
|
||||||
ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID),
|
ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID),
|
||||||
{ok, It2} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
|
{ok, It2} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID),
|
||||||
{It3, Res100} = iterate(It2, 88),
|
{It3, Res100} = iterate(It2, 88),
|
||||||
% preserve on the generation boundary
|
% preserve on the generation boundary
|
||||||
ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID),
|
ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID),
|
||||||
{ok, It4} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
|
{ok, It4} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID),
|
||||||
{It5, Res200} = iterate(It4, 1000),
|
{It5, Res200} = iterate(It4, 1000),
|
||||||
?assertEqual(none, It5),
|
?assertEqual(none, It5),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -220,16 +220,16 @@ t_iterate_multigen_preserve_restore(_Config) ->
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
ok,
|
ok,
|
||||||
emqx_replay_local_store:discard_iterator(?ZONE, ReplayID)
|
emqx_replay_local_store:discard_iterator(?SHARD, ReplayID)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{error, not_found},
|
{error, not_found},
|
||||||
emqx_replay_local_store:restore_iterator(?ZONE, ReplayID)
|
emqx_replay_local_store:restore_iterator(?SHARD, ReplayID)
|
||||||
).
|
).
|
||||||
|
|
||||||
store(Zone, PublishedAt, Topic, Payload) ->
|
store(Shard, PublishedAt, Topic, Payload) ->
|
||||||
ID = emqx_guid:gen(),
|
ID = emqx_guid:gen(),
|
||||||
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
|
emqx_replay_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload).
|
||||||
|
|
||||||
iterate(DB, TopicFilter, StartTime) ->
|
iterate(DB, TopicFilter, StartTime) ->
|
||||||
iterate(iterator(DB, TopicFilter, StartTime)).
|
iterate(iterator(DB, TopicFilter, StartTime)).
|
||||||
|
@ -274,15 +274,15 @@ end_per_suite(_Config) ->
|
||||||
ok = application:stop(emqx_replay).
|
ok = application:stop(emqx_replay).
|
||||||
|
|
||||||
init_per_testcase(TC, Config) ->
|
init_per_testcase(TC, Config) ->
|
||||||
ok = set_zone_config(zone(TC), ?DEFAULT_CONFIG),
|
ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
|
||||||
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
|
{ok, _} = emqx_replay_local_store_sup:start_shard(shard(TC)),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(TC, _Config) ->
|
end_per_testcase(TC, _Config) ->
|
||||||
ok = emqx_replay_local_store_sup:stop_zone(zone(TC)).
|
ok = emqx_replay_local_store_sup:stop_shard(shard(TC)).
|
||||||
|
|
||||||
zone(TC) ->
|
shard(TC) ->
|
||||||
list_to_atom(lists:concat([?MODULE, "_", TC])).
|
list_to_binary(lists:concat([?MODULE, "_", TC])).
|
||||||
|
|
||||||
set_zone_config(Zone, Config) ->
|
set_shard_config(Shard, Config) ->
|
||||||
ok = application:set_env(emqx_replay, zone_config, #{Zone => Config}).
|
ok = application:set_env(emqx_replay, shard_config, #{Shard => Config}).
|
||||||
|
|
2
mix.exs
2
mix.exs
|
@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
||||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
||||||
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
|
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
|
||||||
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true},
|
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true},
|
||||||
{:ekka, github: "emqx/ekka", tag: "0.15.1", override: true},
|
{:ekka, github: "emqx/ekka", tag: "0.15.1", override: true},
|
||||||
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
|
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
|
||||||
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
|
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
|
||||||
|
|
|
@ -61,7 +61,7 @@
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
|
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
|
||||||
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}}
|
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}}
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
|
||||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
|
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
|
||||||
|
|
Loading…
Reference in New Issue