refactor(ds): zone -> shard

Also bump erlang-rocksdb version
This commit is contained in:
ieQu1 2023-05-14 23:22:39 +02:00
parent 8ac0bba958
commit 04adb65c09
7 changed files with 177 additions and 174 deletions

View File

@ -18,7 +18,7 @@
%% API:
-export([]).
-export_type([topic/0, time/0]).
-export_type([topic/0, time/0, shard/0]).
-export_type([replay_id/0, replay/0]).
%%================================================================================
@ -28,6 +28,8 @@
%% parsed
-type topic() :: list(binary()).
-type shard() :: binary().
%% Timestamp
%% Earliest possible timestamp is 0.
%% TODO granularity?

View File

@ -18,9 +18,9 @@
%% TODO: make a proper HOCON schema and all...
%% API:
-export([zone_config/1, db_options/0]).
-export([shard_config/1, db_options/0]).
-export([zone_iteration_options/1]).
-export([shard_iteration_options/1]).
-export([default_iteration_options/0]).
-type backend_config() ::
@ -35,17 +35,16 @@
-define(APP, emqx_replay).
-type zone() :: emqx_types:zone().
-spec shard_config(emqx_replay:shard()) -> backend_config().
shard_config(Shard) ->
DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()),
Shards = application:get_env(?APP, shard_config, #{}),
maps:get(Shard, Shards, DefaultShardConfig).
-spec zone_config(zone()) -> backend_config().
zone_config(Zone) ->
DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()),
Zones = application:get_env(?APP, zone_config, #{}),
maps:get(Zone, Zones, DefaultZoneConfig).
-spec zone_iteration_options(zone()) -> emqx_replay_message_storage:iteration_options().
zone_iteration_options(Zone) ->
case zone_config(Zone) of
-spec shard_iteration_options(emqx_replay:shard()) ->
emqx_replay_message_storage:iteration_options().
shard_iteration_options(Shard) ->
case shard_config(Shard) of
{emqx_replay_message_storage, Config} ->
maps:get(iteration, Config, default_iteration_options());
{_Module, _} ->
@ -54,11 +53,11 @@ zone_iteration_options(Zone) ->
-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options().
default_iteration_options() ->
{emqx_replay_message_storage, Config} = default_zone_config(),
{emqx_replay_message_storage, Config} = default_shard_config(),
maps:get(iteration, Config).
-spec default_zone_config() -> backend_config().
default_zone_config() ->
-spec default_shard_config() -> backend_config().
default_shard_config() ->
{emqx_replay_message_storage, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 8, 32, 16],

View File

@ -30,7 +30,7 @@
%% behavior callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export_type([cf_refs/0, gen_id/0, db_write_options/0]).
-export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]).
-compile({inline, [meta_lookup/2]}).
@ -39,7 +39,7 @@
%%================================================================================
%% see rocksdb:db_options()
-type options() :: proplists:proplist().
% -type options() :: proplists:proplist().
-type db_write_options() :: proplists:proplist().
@ -59,14 +59,14 @@
}.
-record(s, {
zone :: emqx_types:zone(),
shard :: emqx_replay:shard(),
db :: rocksdb:db_handle(),
cf_iterator :: rocksdb:cf_handle(),
cf_generations :: cf_refs()
}).
-record(it, {
zone :: emqx_types:zone(),
shard :: emqx_replay:shard(),
gen :: gen_id(),
replay :: emqx_replay:replay(),
module :: module(),
@ -94,33 +94,35 @@
%% 3. `inplace_update_support`?
-define(ITERATOR_CF_OPTS, []).
-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}).
-define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}).
%%================================================================================
%% API funcions
%%================================================================================
-spec start_link(emqx_types:zone()) -> {ok, pid()}.
start_link(Zone) ->
gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []).
-spec start_link(emqx_replay:shard()) -> {ok, pid()}.
start_link(Shard) ->
gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []).
-spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) ->
-spec create_generation(emqx_replay:shard(), emqx_replay:time(), emqx_replay_conf:backend_config()) ->
{ok, gen_id()} | {error, nonmonotonic}.
create_generation(Zone, Since, Config = {_Module, _Options}) ->
gen_server:call(?REF(Zone), {create_generation, Since, Config}).
create_generation(Shard, Since, Config = {_Module, _Options}) ->
gen_server:call(?REF(Shard), {create_generation, Since, Config}).
-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) ->
-spec store(
emqx_replay:shard(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()
) ->
ok | {error, _TODO}.
store(Zone, GUID, Time, Topic, Msg) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time),
store(Shard, GUID, Time, Topic, Msg) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
Mod:store(Data, GUID, Time, Topic, Msg).
-spec make_iterator(emqx_types:zone(), emqx_replay:replay()) ->
-spec make_iterator(emqx_replay:shard(), emqx_replay:replay()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(Zone, Replay = {_, StartTime}) ->
{GenId, Gen} = meta_lookup_gen(Zone, StartTime),
make_iterator(Shard, Replay = {_, StartTime}) ->
{GenId, Gen} = meta_lookup_gen(Shard, StartTime),
open_iterator(Gen, #it{
zone = Zone,
shard = Shard,
gen = GenId,
replay = Replay
}).
@ -148,30 +150,30 @@ next(It = #it{module = Mod, data = ItData}) ->
preserve_iterator(It = #it{}, ReplayID) ->
iterator_put_state(ReplayID, It).
-spec restore_iterator(emqx_types:zone(), emqx_replay:replay_id()) ->
-spec restore_iterator(emqx_replay:shard(), emqx_replay:replay_id()) ->
{ok, iterator()} | {error, _TODO}.
restore_iterator(Zone, ReplayID) ->
case iterator_get_state(Zone, ReplayID) of
restore_iterator(Shard, ReplayID) ->
case iterator_get_state(Shard, ReplayID) of
{ok, Serial} ->
restore_iterator_state(Zone, Serial);
restore_iterator_state(Shard, Serial);
not_found ->
{error, not_found};
{error, _Reason} = Error ->
Error
end.
-spec discard_iterator(emqx_types:zone(), emqx_replay:replay_id()) ->
-spec discard_iterator(emqx_replay:shard(), emqx_replay:replay_id()) ->
ok | {error, _TODO}.
discard_iterator(Zone, ReplayID) ->
iterator_delete(Zone, ReplayID).
discard_iterator(Shard, ReplayID) ->
iterator_delete(Shard, ReplayID).
%%================================================================================
%% behavior callbacks
%%================================================================================
init([Zone]) ->
init([Shard]) ->
process_flag(trap_exit, true),
{ok, S0} = open_db(Zone),
{ok, S0} = open_db(Shard),
S = ensure_current_generation(S0),
ok = populate_metadata(S),
{ok, S}.
@ -192,8 +194,8 @@ handle_cast(_Cast, S) ->
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, #s{db = DB, zone = Zone}) ->
meta_erase(Zone),
terminate(_Reason, #s{db = DB, shard = Shard}) ->
meta_erase(Shard),
ok = rocksdb:close(DB).
%%================================================================================
@ -203,21 +205,21 @@ terminate(_Reason, #s{db = DB, zone = Zone}) ->
-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
-spec populate_metadata(state()) -> ok.
populate_metadata(S = #s{zone = Zone, db = DBHandle, cf_iterator = CFIterator}) ->
ok = meta_put(Zone, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) ->
ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
Current = schema_get_current(DBHandle),
lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
-spec populate_metadata(gen_id(), state()) -> ok.
populate_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) ->
populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
meta_register_gen(Zone, GenId, Gen).
meta_register_gen(Shard, GenId, Gen).
-spec ensure_current_generation(state()) -> state().
ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) ->
case schema_get_current(DBHandle) of
undefined ->
Config = emqx_replay_conf:zone_config(Zone),
Config = emqx_replay_conf:shard_config(Shard),
{ok, _, NS} = create_new_gen(0, Config, S),
NS;
_GenId ->
@ -226,16 +228,16 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
{ok, gen_id(), state()} | {error, nonmonotonic}.
create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
GenId = get_next_id(meta_get_current(Zone)),
create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
GenId = get_next_id(meta_get_current(Shard)),
GenId = get_next_id(schema_get_current(DBHandle)),
case is_gen_valid(Zone, GenId, Since) of
case is_gen_valid(Shard, GenId, Since) of
ok ->
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
%% TODO: Transaction? Column family creation can't be transactional, anyway.
ok = schema_put_gen(DBHandle, GenId, Gen),
ok = schema_put_current(DBHandle, GenId),
ok = meta_register_gen(Zone, GenId, open_gen(GenId, Gen, NS)),
ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)),
{ok, GenId, NS};
{error, _} = Error ->
Error
@ -253,9 +255,9 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
},
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
-spec open_db(emqx_types:zone()) -> {ok, state()} | {error, _TODO}.
open_db(Zone) ->
Filename = atom_to_list(Zone),
-spec open_db(emqx_replay:shard()) -> {ok, state()} | {error, _TODO}.
open_db(Shard) ->
Filename = binary_to_list(Shard),
DBOptions = [
{create_if_missing, true},
{create_missing_column_families, true}
@ -278,7 +280,7 @@ open_db(Zone) ->
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
{CFNames, _} = lists:unzip(ExistingCFs),
{ok, #s{
zone = Zone,
shard = Shard,
db = DBHandle,
cf_iterator = CFIterator,
cf_generations = lists:zip(CFNames, CFRefs)
@ -291,14 +293,14 @@ open_db(Zone) ->
open_gen(
GenId,
Gen = #{module := Mod, data := Data},
#s{zone = Zone, db = DBHandle, cf_generations = CFs}
#s{shard = Shard, db = DBHandle, cf_generations = CFs}
) ->
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
Gen#{data := DB}.
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
open_next_iterator(It = #it{zone = Zone, gen = GenId}) ->
open_next_iterator(meta_get_gen(Zone, GenId + 1), It#it{gen = GenId + 1}).
open_next_iterator(It = #it{shard = Shard, gen = GenId}) ->
open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}).
open_next_iterator(undefined, _It) ->
none;
@ -331,17 +333,17 @@ open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay},
-define(ITERATION_WRITE_OPTS, []).
-define(ITERATION_READ_OPTS, []).
iterator_get_state(Zone, ReplayID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
iterator_get_state(Shard, ReplayID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
iterator_put_state(ID, It = #it{zone = Zone}) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
iterator_put_state(ID, It = #it{shard = Shard}) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
Serial = preserve_iterator_state(It),
rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
iterator_delete(Zone, ID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
iterator_delete(Shard, ID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
preserve_iterator_state(#it{
@ -358,10 +360,10 @@ preserve_iterator_state(#it{
st => Mod:preserve_iterator(ItData)
}).
restore_iterator_state(Zone, Serial) when is_binary(Serial) ->
restore_iterator_state(Zone, binary_to_term(Serial));
restore_iterator_state(Shard, Serial) when is_binary(Serial) ->
restore_iterator_state(Shard, binary_to_term(Serial));
restore_iterator_state(
Zone,
Shard,
#{
v := 1,
gen := Gen,
@ -370,8 +372,8 @@ restore_iterator_state(
st := State
}
) ->
It = #it{zone = Zone, gen = Gen, replay = {TopicFilter, StartTime}},
open_restore_iterator(meta_get_gen(Zone, Gen), It, State).
It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
%% Functions for dealing with the metadata stored persistently in rocksdb
@ -409,27 +411,27 @@ schema_gen_key(N) ->
-undef(SCHEMA_WRITE_OPTS).
-undef(SCHEMA_READ_OPTS).
%% Functions for dealing with the runtime zone metadata:
%% Functions for dealing with the runtime shard metadata:
-define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}).
-define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}).
-spec meta_register_gen(emqx_types:zone(), gen_id(), generation()) -> ok.
meta_register_gen(Zone, GenId, Gen) ->
-spec meta_register_gen(emqx_replay:shard(), gen_id(), generation()) -> ok.
meta_register_gen(Shard, GenId, Gen) ->
Gs =
case GenId > 0 of
true -> meta_lookup(Zone, GenId - 1);
true -> meta_lookup(Shard, GenId - 1);
false -> []
end,
ok = meta_put(Zone, GenId, [Gen | Gs]),
ok = meta_put(Zone, current, GenId).
ok = meta_put(Shard, GenId, [Gen | Gs]),
ok = meta_put(Shard, current, GenId).
-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Zone, Time) ->
-spec meta_lookup_gen(emqx_replay:shard(), emqx_replay:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Shard, Time) ->
% TODO
% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
% towards a "no".
Current = meta_lookup(Zone, current),
Gens = meta_lookup(Zone, Current),
Current = meta_lookup(Shard, current),
Gens = meta_lookup(Shard, Current),
find_gen(Time, Current, Gens).
find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
@ -437,34 +439,34 @@ find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
find_gen(Time, GenId, [_Gen | Rest]) ->
find_gen(Time, GenId - 1, Rest).
-spec meta_get_gen(emqx_types:zone(), gen_id()) -> generation() | undefined.
meta_get_gen(Zone, GenId) ->
case meta_lookup(Zone, GenId, []) of
-spec meta_get_gen(emqx_replay:shard(), gen_id()) -> generation() | undefined.
meta_get_gen(Shard, GenId) ->
case meta_lookup(Shard, GenId, []) of
[Gen | _Older] -> Gen;
[] -> undefined
end.
-spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined.
meta_get_current(Zone) ->
meta_lookup(Zone, current, undefined).
-spec meta_get_current(emqx_replay:shard()) -> gen_id() | undefined.
meta_get_current(Shard) ->
meta_lookup(Shard, current, undefined).
-spec meta_lookup(emqx_types:zone(), _K) -> _V.
meta_lookup(Zone, K) ->
persistent_term:get(?PERSISTENT_TERM(Zone, K)).
-spec meta_lookup(emqx_replay:shard(), _K) -> _V.
meta_lookup(Shard, K) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K)).
-spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default.
meta_lookup(Zone, K, Default) ->
persistent_term:get(?PERSISTENT_TERM(Zone, K), Default).
-spec meta_lookup(emqx_replay:shard(), _K, Default) -> _V | Default.
meta_lookup(Shard, K, Default) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).
-spec meta_put(emqx_types:zone(), _K, _V) -> ok.
meta_put(Zone, K, V) ->
persistent_term:put(?PERSISTENT_TERM(Zone, K), V).
-spec meta_put(emqx_replay:shard(), _K, _V) -> ok.
meta_put(Shard, K, V) ->
persistent_term:put(?PERSISTENT_TERM(Shard, K), V).
-spec meta_erase(emqx_types:zone()) -> ok.
meta_erase(Zone) ->
-spec meta_erase(emqx_replay:shard()) -> ok.
meta_erase(Shard) ->
[
persistent_term:erase(K)
|| {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Zone
|| {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard
],
ok.
@ -473,15 +475,15 @@ meta_erase(Zone) ->
get_next_id(undefined) -> 0;
get_next_id(GenId) -> GenId + 1.
is_gen_valid(Zone, GenId, Since) when GenId > 0 ->
[GenPrev | _] = meta_lookup(Zone, GenId - 1),
is_gen_valid(Shard, GenId, Since) when GenId > 0 ->
[GenPrev | _] = meta_lookup(Shard, GenId - 1),
case GenPrev of
#{since := SincePrev} when Since > SincePrev ->
ok;
#{} ->
{error, nonmonotonic}
end;
is_gen_valid(_Zone, 0, 0) ->
is_gen_valid(_Shard, 0, 0) ->
ok.
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.

View File

@ -18,7 +18,7 @@
-behavior(supervisor).
%% API:
-export([start_link/0, start_zone/1, stop_zone/1]).
-export([start_link/0, start_shard/1, stop_shard/1]).
%% behavior callbacks:
-export([init/1]).
@ -37,14 +37,14 @@
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []).
-spec start_zone(emqx_types:zone()) -> supervisor:startchild_ret().
start_zone(Zone) ->
supervisor:start_child(?SUP, zone_child_spec(Zone)).
-spec start_shard(emqx_replay:shard()) -> supervisor:startchild_ret().
start_shard(Shard) ->
supervisor:start_child(?SUP, shard_child_spec(Shard)).
-spec stop_zone(emqx_types:zone()) -> ok | {error, _}.
stop_zone(Zone) ->
ok = supervisor:terminate_child(?SUP, Zone),
ok = supervisor:delete_child(?SUP, Zone).
-spec stop_shard(emqx_replay:shard()) -> ok | {error, _}.
stop_shard(Shard) ->
ok = supervisor:terminate_child(?SUP, Shard),
ok = supervisor:delete_child(?SUP, Shard).
%%================================================================================
%% behavior callbacks
@ -63,11 +63,11 @@ init([]) ->
%% Internal functions
%%================================================================================
-spec zone_child_spec(emqx_types:zone()) -> supervisor:child_spec().
zone_child_spec(Zone) ->
-spec shard_child_spec(emqx_replay:shard()) -> supervisor:child_spec().
shard_child_spec(Shard) ->
#{
id => Zone,
start => {emqx_replay_local_store, start_link, [Zone]},
id => Shard,
start => {emqx_replay_local_store, start_link, [Shard]},
shutdown => 5_000,
restart => permanent,
type => worker

View File

@ -182,7 +182,7 @@
-opaque schema() :: #schema{}.
-record(db, {
zone :: emqx_types:zone(),
shard :: emqx_replay:shard(),
handle :: rocksdb:db_handle(),
cf :: rocksdb:cf_handle(),
keymapper :: keymapper(),
@ -244,17 +244,17 @@ create_new(DBHandle, GenId, Options) ->
%% Reopen the database
-spec open(
emqx_types:zone(),
emqx_replay:shard(),
rocksdb:db_handle(),
emqx_replay_local_store:gen_id(),
emqx_replay_local_store:cf_refs(),
schema()
) ->
db().
open(Zone, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
{value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
#db{
zone = Zone,
shard = Shard,
handle = DBHandle,
cf = CFHandle,
keymapper = Keymapper
@ -292,7 +292,7 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
-spec make_iterator(db(), emqx_replay:replay()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(DB, Replay) ->
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
Options = emqx_replay_conf:shard_iteration_options(DB#db.shard),
make_iterator(DB, Replay, Options).
-spec make_iterator(db(), emqx_replay:replay(), iteration_options()) ->

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -42,7 +42,7 @@ start_link() ->
%%================================================================================
init([]) ->
Children = [zone_sup()],
Children = [shard_sup()],
SupFlags = #{
strategy => one_for_all,
intensity => 0,
@ -54,9 +54,9 @@ init([]) ->
%% Internal functions
%%================================================================================
zone_sup() ->
shard_sup() ->
#{
id => local_store_zone_sup,
id => local_store_shard_sup,
start => {emqx_replay_local_store_sup, start_link, []},
restart => permanent,
type => supervisor,

View File

@ -21,7 +21,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(ZONE, zone(?FUNCTION_NAME)).
-define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG,
{emqx_replay_message_storage, #{
@ -44,8 +44,8 @@
%% Smoke test for opening and reopening the database
t_open(_Config) ->
ok = emqx_replay_local_store_sup:stop_zone(?ZONE),
{ok, _} = emqx_replay_local_store_sup:start_zone(?ZONE).
ok = emqx_replay_local_store_sup:stop_shard(?SHARD),
{ok, _} = emqx_replay_local_store_sup:start_shard(?SHARD).
%% Smoke test of store function
t_store(_Config) ->
@ -53,7 +53,7 @@ t_store(_Config) ->
PublishedAt = 1000,
Topic = [<<"foo">>, <<"bar">>],
Payload = <<"message">>,
?assertMatch(ok, emqx_replay_local_store:store(?ZONE, MessageID, PublishedAt, Topic, Payload)).
?assertMatch(ok, emqx_replay_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)).
%% Smoke test for iteration through a concrete topic
t_iterate(_Config) ->
@ -62,7 +62,7 @@ t_iterate(_Config) ->
Timestamps = lists:seq(1, 10),
[
emqx_replay_local_store:store(
?ZONE,
?SHARD,
emqx_guid:gen(),
PublishedAt,
Topic,
@ -73,7 +73,7 @@ t_iterate(_Config) ->
%% Iterate through individual topics:
[
begin
{ok, It} = emqx_replay_local_store:make_iterator(?ZONE, {Topic, 0}),
{ok, It} = emqx_replay_local_store:make_iterator(?SHARD, {Topic, 0}),
Values = iterate(It),
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
end
@ -87,50 +87,50 @@ t_iterate_wildcard(_Config) ->
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 10),
_ = [
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 10 + 1)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "#", 5)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
),
?assertEqual(
lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/+/bar", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "+/bar/#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/+/+", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)])
),
ok.
@ -139,40 +139,40 @@ t_iterate_long_tail_wildcard(_Config) ->
TopicFilter = "b/c/d/e/+/+",
Timestamps = lists:seq(1, 100),
_ = [
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, TopicFilter, 50)])
).
t_create_gen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG),
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
?assertEqual(
{error, nonmonotonic},
emqx_replay_local_store:create_generation(?ZONE, 1, ?DEFAULT_CONFIG)
emqx_replay_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
),
?assertEqual(
{error, nonmonotonic},
emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG)
emqx_replay_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
),
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz"],
Timestamps = lists:seq(1, 100),
[
?assertEqual(ok, store(?ZONE, PublishedAt, Topic, <<>>))
?assertEqual(ok, store(?SHARD, PublishedAt, Topic, <<>>))
|| Topic <- Topics, PublishedAt <- Timestamps
].
t_iterate_multigen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 1000, ?DEFAULT_CONFIG),
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 100),
_ = [
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
@ -180,38 +180,38 @@ t_iterate_multigen(_Config) ->
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)])
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
).
t_iterate_multigen_preserve_restore(_Config) ->
ReplayID = atom_to_binary(?FUNCTION_NAME),
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 100, ?DEFAULT_CONFIG),
{ok, 1} = emqx_replay_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
Timestamps = lists:seq(1, 100),
TopicFilter = "foo/#",
TopicsMatching = ["foo/bar", "foo/bar/baz"],
_ = [
store(?ZONE, TS, Topic, term_to_binary({Topic, TS}))
store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
|| Topic <- Topics, TS <- Timestamps
],
It0 = iterator(?ZONE, TopicFilter, 0),
It0 = iterator(?SHARD, TopicFilter, 0),
{It1, Res10} = iterate(It0, 10),
% preserve mid-generation
ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID),
{ok, It2} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
{ok, It2} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID),
{It3, Res100} = iterate(It2, 88),
% preserve on the generation boundary
ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID),
{ok, It4} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
{ok, It4} = emqx_replay_local_store:restore_iterator(?SHARD, ReplayID),
{It5, Res200} = iterate(It4, 1000),
?assertEqual(none, It5),
?assertEqual(
@ -220,16 +220,16 @@ t_iterate_multigen_preserve_restore(_Config) ->
),
?assertEqual(
ok,
emqx_replay_local_store:discard_iterator(?ZONE, ReplayID)
emqx_replay_local_store:discard_iterator(?SHARD, ReplayID)
),
?assertEqual(
{error, not_found},
emqx_replay_local_store:restore_iterator(?ZONE, ReplayID)
emqx_replay_local_store:restore_iterator(?SHARD, ReplayID)
).
store(Zone, PublishedAt, Topic, Payload) ->
store(Shard, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
emqx_replay_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload).
iterate(DB, TopicFilter, StartTime) ->
iterate(iterator(DB, TopicFilter, StartTime)).
@ -274,15 +274,15 @@ end_per_suite(_Config) ->
ok = application:stop(emqx_replay).
init_per_testcase(TC, Config) ->
ok = set_zone_config(zone(TC), ?DEFAULT_CONFIG),
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
{ok, _} = emqx_replay_local_store_sup:start_shard(shard(TC)),
Config.
end_per_testcase(TC, _Config) ->
ok = emqx_replay_local_store_sup:stop_zone(zone(TC)).
ok = emqx_replay_local_store_sup:stop_shard(shard(TC)).
zone(TC) ->
list_to_atom(lists:concat([?MODULE, "_", TC])).
shard(TC) ->
list_to_binary(lists:concat([?MODULE, "_", TC])).
set_zone_config(Zone, Config) ->
ok = application:set_env(emqx_replay, zone_config, #{Zone => Config}).
set_shard_config(Shard, Config) ->
ok = application:set_env(emqx_replay, shard_config, #{Shard => Config}).