refactor: address review comments
This commit is contained in:
parent
b30bcf32bd
commit
a511088fd4
|
@ -46,7 +46,7 @@
|
|||
|
||||
%% FIXME
|
||||
-define(DS_SHARD, <<"local">>).
|
||||
-define(DEFAULT_KEYSPACE, <<"#">>).
|
||||
-define(DEFAULT_KEYSPACE, default).
|
||||
|
||||
-define(WHEN_ENABLED(DO),
|
||||
case is_store_enabled() of
|
||||
|
@ -60,10 +60,16 @@
|
|||
init() ->
|
||||
?WHEN_ENABLED(begin
|
||||
ok = emqx_ds:ensure_shard(
|
||||
?DS_SHARD,
|
||||
?DEFAULT_KEYSPACE,
|
||||
?DS_SHARD,
|
||||
#{
|
||||
dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD])
|
||||
dir => filename:join([
|
||||
emqx:data_dir(),
|
||||
ds,
|
||||
messages,
|
||||
atom_to_binary(?DEFAULT_KEYSPACE),
|
||||
?DS_SHARD
|
||||
])
|
||||
}
|
||||
),
|
||||
ok = emqx_persistent_session_ds_router:init_tables(),
|
||||
|
|
|
@ -78,7 +78,7 @@
|
|||
%% Parsed topic:
|
||||
-type topic() :: list(binary()).
|
||||
|
||||
-type keyspace() :: binary().
|
||||
-type keyspace() :: atom().
|
||||
-type shard() :: binary().
|
||||
|
||||
%% Timestamp
|
||||
|
@ -98,10 +98,10 @@
|
|||
%% API funcions
|
||||
%%================================================================================
|
||||
|
||||
-spec ensure_shard(shard(), keyspace(), emqx_ds_storage_layer:options()) ->
|
||||
-spec ensure_shard(keyspace(), shard(), emqx_ds_storage_layer:options()) ->
|
||||
ok | {error, _Reason}.
|
||||
ensure_shard(Shard, Keyspace, Options) ->
|
||||
case emqx_ds_storage_layer_sup:start_shard(Shard, Keyspace, Options) of
|
||||
ensure_shard(Keyspace, Shard, Options) ->
|
||||
case emqx_ds_storage_layer_sup:start_shard(Keyspace, Shard, Options) of
|
||||
{ok, _Pid} ->
|
||||
ok;
|
||||
{error, {already_started, _Pid}} ->
|
||||
|
|
|
@ -173,8 +173,8 @@
|
|||
-opaque schema() :: #schema{}.
|
||||
|
||||
-record(db, {
|
||||
shard :: emqx_ds:shard(),
|
||||
keyspace :: emqx_ds:keyspace(),
|
||||
shard :: emqx_ds:shard(),
|
||||
handle :: rocksdb:db_handle(),
|
||||
cf :: rocksdb:cf_handle(),
|
||||
keymapper :: keymapper(),
|
||||
|
@ -236,19 +236,19 @@ create_new(DBHandle, GenId, Options) ->
|
|||
|
||||
%% Reopen the database
|
||||
-spec open(
|
||||
emqx_ds:shard(),
|
||||
emqx_ds:keyspace(),
|
||||
emqx_ds:shard(),
|
||||
rocksdb:db_handle(),
|
||||
emqx_ds_storage_layer:gen_id(),
|
||||
emqx_ds_storage_layer:cf_refs(),
|
||||
schema()
|
||||
) ->
|
||||
db().
|
||||
open(Shard, Keyspace, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
|
||||
open(Keyspace, Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
|
||||
{value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
|
||||
#db{
|
||||
shard = Shard,
|
||||
keyspace = Keyspace,
|
||||
shard = Shard,
|
||||
handle = DBHandle,
|
||||
cf = CFHandle,
|
||||
keymapper = Keymapper
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
%% API:
|
||||
-export([start_link/3]).
|
||||
-export([create_generation/3]).
|
||||
-export([create_generation/4]).
|
||||
|
||||
-export([store/5]).
|
||||
-export([delete/4]).
|
||||
|
@ -99,7 +99,7 @@
|
|||
%% 3. `inplace_update_support`?
|
||||
-define(ITERATOR_CF_OPTS, []).
|
||||
|
||||
-define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}).
|
||||
-define(REF(Keyspace, Shard), {via, gproc, {n, l, {?MODULE, Keyspace, Shard}}}).
|
||||
|
||||
%%================================================================================
|
||||
%% Callbacks
|
||||
|
@ -109,8 +109,8 @@
|
|||
{_Schema, cf_refs()}.
|
||||
|
||||
-callback open(
|
||||
emqx_ds:shard(),
|
||||
emqx_ds:keyspace(),
|
||||
emqx_ds:shard(),
|
||||
rocksdb:db_handle(),
|
||||
gen_id(),
|
||||
cf_refs(),
|
||||
|
@ -143,15 +143,17 @@
|
|||
%% API funcions
|
||||
%%================================================================================
|
||||
|
||||
-spec start_link(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) ->
|
||||
-spec start_link(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
|
||||
{ok, pid()}.
|
||||
start_link(Shard, Keyspace, Options) ->
|
||||
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Keyspace, Options}, []).
|
||||
start_link(Keyspace, Shard, Options) ->
|
||||
gen_server:start_link(?REF(Keyspace, Shard), ?MODULE, {Keyspace, Shard, Options}, []).
|
||||
|
||||
-spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
|
||||
-spec create_generation(
|
||||
emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
|
||||
) ->
|
||||
{ok, gen_id()} | {error, nonmonotonic}.
|
||||
create_generation(Shard, Since, Config = {_Module, _Options}) ->
|
||||
gen_server:call(?REF(Shard), {create_generation, Since, Config}).
|
||||
create_generation(Keyspace, Shard, Since, Config = {_Module, _Options}) ->
|
||||
gen_server:call(?REF(Keyspace, Shard), {create_generation, Since, Config}).
|
||||
|
||||
-spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
|
||||
ok | {error, _}.
|
||||
|
@ -258,9 +260,9 @@ foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
|
|||
%% behaviour callbacks
|
||||
%%================================================================================
|
||||
|
||||
init({Shard, Keyspace, Options}) ->
|
||||
init({Keyspace, Shard, Options}) ->
|
||||
process_flag(trap_exit, true),
|
||||
{ok, S0} = open_db(Shard, Keyspace, Options),
|
||||
{ok, S0} = open_db(Keyspace, Shard, Options),
|
||||
S = ensure_current_generation(S0),
|
||||
ok = populate_metadata(S),
|
||||
{ok, S}.
|
||||
|
@ -342,9 +344,10 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
|
|||
},
|
||||
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
|
||||
|
||||
-spec open_db(emqx_ds:shard(), emqx_ds:keyspace(), options()) -> {ok, state()} | {error, _TODO}.
|
||||
open_db(Shard, Keyspace, Options) ->
|
||||
DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)),
|
||||
-spec open_db(emqx_ds:keyspace(), emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
|
||||
open_db(Keyspace, Shard, Options) ->
|
||||
DefaultDir = filename:join([atom_to_binary(Keyspace), Shard]),
|
||||
DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)),
|
||||
DBOptions = [
|
||||
{create_if_missing, true},
|
||||
{create_missing_column_families, true}
|
||||
|
@ -382,9 +385,9 @@ open_db(Shard, Keyspace, Options) ->
|
|||
open_gen(
|
||||
GenId,
|
||||
Gen = #{module := Mod, data := Data},
|
||||
#s{shard = Shard, keyspace = Keyspace, db = DBHandle, cf_generations = CFs}
|
||||
#s{keyspace = Keyspace, shard = Shard, db = DBHandle, cf_generations = CFs}
|
||||
) ->
|
||||
DB = Mod:open(Shard, Keyspace, DBHandle, GenId, CFs, Data),
|
||||
DB = Mod:open(Keyspace, Shard, DBHandle, GenId, CFs, Data),
|
||||
Gen#{data := DB}.
|
||||
|
||||
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
|
||||
|
|
|
@ -25,10 +25,10 @@
|
|||
start_link() ->
|
||||
supervisor:start_link({local, ?SUP}, ?MODULE, []).
|
||||
|
||||
-spec start_shard(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) ->
|
||||
-spec start_shard(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
|
||||
supervisor:startchild_ret().
|
||||
start_shard(Shard, Keyspace, Options) ->
|
||||
supervisor:start_child(?SUP, shard_child_spec(Shard, Keyspace, Options)).
|
||||
start_shard(Keyspace, Shard, Options) ->
|
||||
supervisor:start_child(?SUP, shard_child_spec(Keyspace, Shard, Options)).
|
||||
|
||||
-spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
|
||||
stop_shard(Shard) ->
|
||||
|
@ -52,12 +52,12 @@ init([]) ->
|
|||
%% Internal functions
|
||||
%%================================================================================
|
||||
|
||||
-spec shard_child_spec(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) ->
|
||||
-spec shard_child_spec(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
|
||||
supervisor:child_spec().
|
||||
shard_child_spec(Shard, Keyspace, Options) ->
|
||||
shard_child_spec(Keyspace, Shard, Options) ->
|
||||
#{
|
||||
id => Shard,
|
||||
start => {emqx_ds_storage_layer, start_link, [Shard, Keyspace, Options]},
|
||||
start => {emqx_ds_storage_layer, start_link, [Keyspace, Shard, Options]},
|
||||
shutdown => 5_000,
|
||||
restart => permanent,
|
||||
type => worker
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
%% Smoke test for opening and reopening the database
|
||||
t_open(_Config) ->
|
||||
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
|
||||
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, ?KEYSPACE, #{}).
|
||||
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?KEYSPACE, ?SHARD, #{}).
|
||||
|
||||
%% Smoke test of store function
|
||||
t_store(_Config) ->
|
||||
|
@ -137,16 +137,16 @@ t_iterate_long_tail_wildcard(_Config) ->
|
|||
).
|
||||
|
||||
t_create_gen(_Config) ->
|
||||
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
|
||||
{ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG),
|
||||
?assertEqual(
|
||||
{error, nonmonotonic},
|
||||
emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
|
||||
emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1, ?DEFAULT_CONFIG)
|
||||
),
|
||||
?assertEqual(
|
||||
{error, nonmonotonic},
|
||||
emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
|
||||
emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG)
|
||||
),
|
||||
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||
{ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG),
|
||||
Topics = ["foo/bar", "foo/bar/baz"],
|
||||
Timestamps = lists:seq(1, 100),
|
||||
[
|
||||
|
@ -155,9 +155,9 @@ t_create_gen(_Config) ->
|
|||
].
|
||||
|
||||
t_iterate_multigen(_Config) ->
|
||||
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
||||
{ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
|
||||
{ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG),
|
||||
{ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG),
|
||||
{ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1000, ?DEFAULT_CONFIG),
|
||||
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
||||
Timestamps = lists:seq(1, 100),
|
||||
_ = [
|
||||
|
@ -181,9 +181,9 @@ t_iterate_multigen(_Config) ->
|
|||
|
||||
t_iterate_multigen_preserve_restore(_Config) ->
|
||||
ReplayID = atom_to_binary(?FUNCTION_NAME),
|
||||
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
||||
{ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
|
||||
{ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG),
|
||||
{ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG),
|
||||
{ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 100, ?DEFAULT_CONFIG),
|
||||
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
|
||||
Timestamps = lists:seq(1, 100),
|
||||
TopicFilter = "foo/#",
|
||||
|
@ -264,17 +264,17 @@ end_per_suite(_Config) ->
|
|||
|
||||
init_per_testcase(TC, Config) ->
|
||||
ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG),
|
||||
{ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), keyspace(TC), #{}),
|
||||
{ok, _} = emqx_ds_storage_layer_sup:start_shard(keyspace(TC), shard(TC), #{}),
|
||||
Config.
|
||||
|
||||
end_per_testcase(TC, _Config) ->
|
||||
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
|
||||
|
||||
keyspace(TC) ->
|
||||
list_to_binary(lists:concat([?MODULE, "_", TC])).
|
||||
list_to_atom(lists:concat([?MODULE, "_", TC])).
|
||||
|
||||
shard(TC) ->
|
||||
<<(keyspace(TC))/binary, "_shard">>.
|
||||
<<(atom_to_binary(keyspace(TC)))/binary, "_shard">>.
|
||||
|
||||
set_keyspace_config(Keyspace, Config) ->
|
||||
ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).
|
||||
|
|
|
@ -10,8 +10,8 @@
|
|||
-define(WORK_DIR, ["_build", "test"]).
|
||||
-define(RUN_ID, {?MODULE, testrun_id}).
|
||||
|
||||
-define(KEYSPACE, atom_to_binary(?MODULE)).
|
||||
-define(SHARD, <<(?KEYSPACE)/binary, "_shard">>).
|
||||
-define(KEYSPACE, ?MODULE).
|
||||
-define(SHARD, <<(atom_to_binary(?KEYSPACE))/binary, "_shard">>).
|
||||
-define(GEN_ID, 42).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue