refactor(ds): emqx_ds_local_store -> emqx_ds_storage_layer
This commit is contained in:
parent
1159f99432
commit
a4219db163
|
@ -55,6 +55,6 @@ default_shard_config() ->
|
||||||
}
|
}
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
-spec db_options() -> emqx_ds_local_store:db_options().
|
-spec db_options() -> emqx_ds_storage_layer:db_options().
|
||||||
db_options() ->
|
db_options() ->
|
||||||
application:get_env(?APP, db_options, []).
|
application:get_env(?APP, db_options, []).
|
||||||
|
|
|
@ -152,7 +152,7 @@
|
||||||
|
|
||||||
iteration => iteration_options(),
|
iteration => iteration_options(),
|
||||||
|
|
||||||
cf_options => emqx_ds_local_store:db_cf_options()
|
cf_options => emqx_ds_storage_layer:db_cf_options()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type iteration_options() :: #{
|
-type iteration_options() :: #{
|
||||||
|
@ -174,8 +174,8 @@
|
||||||
handle :: rocksdb:db_handle(),
|
handle :: rocksdb:db_handle(),
|
||||||
cf :: rocksdb:cf_handle(),
|
cf :: rocksdb:cf_handle(),
|
||||||
keymapper :: keymapper(),
|
keymapper :: keymapper(),
|
||||||
write_options = [{sync, true}] :: emqx_ds_local_store:db_write_options(),
|
write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(),
|
||||||
read_options = [] :: emqx_ds_local_store:db_write_options()
|
read_options = [] :: emqx_ds_storage_layer:db_write_options()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-record(it, {
|
-record(it, {
|
||||||
|
@ -221,8 +221,8 @@
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
%% Create a new column family for the generation and a serializable representation of the schema
|
%% Create a new column family for the generation and a serializable representation of the schema
|
||||||
-spec create_new(rocksdb:db_handle(), emqx_ds_local_store:gen_id(), options()) ->
|
-spec create_new(rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), options()) ->
|
||||||
{schema(), emqx_ds_local_store:cf_refs()}.
|
{schema(), emqx_ds_storage_layer:cf_refs()}.
|
||||||
create_new(DBHandle, GenId, Options) ->
|
create_new(DBHandle, GenId, Options) ->
|
||||||
CFName = data_cf(GenId),
|
CFName = data_cf(GenId),
|
||||||
CFOptions = maps:get(cf_options, Options, []),
|
CFOptions = maps:get(cf_options, Options, []),
|
||||||
|
@ -234,8 +234,8 @@ create_new(DBHandle, GenId, Options) ->
|
||||||
-spec open(
|
-spec open(
|
||||||
emqx_ds:shard(),
|
emqx_ds:shard(),
|
||||||
rocksdb:db_handle(),
|
rocksdb:db_handle(),
|
||||||
emqx_ds_local_store:gen_id(),
|
emqx_ds_storage_layer:gen_id(),
|
||||||
emqx_ds_local_store:cf_refs(),
|
emqx_ds_storage_layer:cf_refs(),
|
||||||
schema()
|
schema()
|
||||||
) ->
|
) ->
|
||||||
db().
|
db().
|
||||||
|
@ -710,7 +710,7 @@ substring(I, Offset, Size) ->
|
||||||
(I bsr Offset) band ones(Size).
|
(I bsr Offset) band ones(Size).
|
||||||
|
|
||||||
%% @doc Generate a column family ID for the MQTT messages
|
%% @doc Generate a column family ID for the MQTT messages
|
||||||
-spec data_cf(emqx_ds_local_store:gen_id()) -> [char()].
|
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
|
||||||
data_cf(GenId) ->
|
data_cf(GenId) ->
|
||||||
?MODULE_STRING ++ integer_to_list(GenId).
|
?MODULE_STRING ++ integer_to_list(GenId).
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ds_local_store).
|
-module(emqx_ds_storage_layer).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ds_local_store_sup).
|
-module(emqx_ds_storage_layer_sup).
|
||||||
|
|
||||||
-behaviour(supervisor).
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
@ -55,7 +55,7 @@ init([]) ->
|
||||||
shard_child_spec(Shard) ->
|
shard_child_spec(Shard) ->
|
||||||
#{
|
#{
|
||||||
id => Shard,
|
id => Shard,
|
||||||
start => {emqx_ds_local_store, start_link, [Shard]},
|
start => {emqx_ds_storage_layer, start_link, [Shard]},
|
||||||
shutdown => 5_000,
|
shutdown => 5_000,
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
type => worker
|
type => worker
|
|
@ -45,7 +45,7 @@ init([]) ->
|
||||||
shard_sup() ->
|
shard_sup() ->
|
||||||
#{
|
#{
|
||||||
id => local_store_shard_sup,
|
id => local_store_shard_sup,
|
||||||
start => {emqx_ds_local_store_sup, start_link, []},
|
start => {emqx_ds_storage_layer_sup, start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
type => supervisor,
|
type => supervisor,
|
||||||
shutdown => infinity
|
shutdown => infinity
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ds_local_store_SUITE).
|
-module(emqx_ds_storage_layer_SUITE).
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
@ -32,8 +32,8 @@
|
||||||
|
|
||||||
%% Smoke test for opening and reopening the database
|
%% Smoke test for opening and reopening the database
|
||||||
t_open(_Config) ->
|
t_open(_Config) ->
|
||||||
ok = emqx_ds_local_store_sup:stop_shard(?SHARD),
|
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
|
||||||
{ok, _} = emqx_ds_local_store_sup:start_shard(?SHARD).
|
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD).
|
||||||
|
|
||||||
%% Smoke test of store function
|
%% Smoke test of store function
|
||||||
t_store(_Config) ->
|
t_store(_Config) ->
|
||||||
|
@ -41,7 +41,7 @@ t_store(_Config) ->
|
||||||
PublishedAt = 1000,
|
PublishedAt = 1000,
|
||||||
Topic = [<<"foo">>, <<"bar">>],
|
Topic = [<<"foo">>, <<"bar">>],
|
||||||
Payload = <<"message">>,
|
Payload = <<"message">>,
|
||||||
?assertMatch(ok, emqx_ds_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)).
|
?assertMatch(ok, emqx_ds_storage_layer:store(?SHARD, MessageID, PublishedAt, Topic, Payload)).
|
||||||
|
|
||||||
%% Smoke test for iteration through a concrete topic
|
%% Smoke test for iteration through a concrete topic
|
||||||
t_iterate(_Config) ->
|
t_iterate(_Config) ->
|
||||||
|
@ -49,7 +49,7 @@ t_iterate(_Config) ->
|
||||||
Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]],
|
Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]],
|
||||||
Timestamps = lists:seq(1, 10),
|
Timestamps = lists:seq(1, 10),
|
||||||
[
|
[
|
||||||
emqx_ds_local_store:store(
|
emqx_ds_storage_layer:store(
|
||||||
?SHARD,
|
?SHARD,
|
||||||
emqx_guid:gen(),
|
emqx_guid:gen(),
|
||||||
PublishedAt,
|
PublishedAt,
|
||||||
|
@ -61,7 +61,7 @@ t_iterate(_Config) ->
|
||||||
%% Iterate through individual topics:
|
%% Iterate through individual topics:
|
||||||
[
|
[
|
||||||
begin
|
begin
|
||||||
{ok, It} = emqx_ds_local_store:make_iterator(?SHARD, {Topic, 0}),
|
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {Topic, 0}),
|
||||||
Values = iterate(It),
|
Values = iterate(It),
|
||||||
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
|
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
|
||||||
end
|
end
|
||||||
|
@ -136,16 +136,16 @@ t_iterate_long_tail_wildcard(_Config) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
t_create_gen(_Config) ->
|
t_create_gen(_Config) ->
|
||||||
{ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
|
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{error, nonmonotonic},
|
{error, nonmonotonic},
|
||||||
emqx_ds_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
|
emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{error, nonmonotonic},
|
{error, nonmonotonic},
|
||||||
emqx_ds_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
|
emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
|
||||||
),
|
),
|
||||||
{ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||||
Topics = ["foo/bar", "foo/bar/baz"],
|
Topics = ["foo/bar", "foo/bar/baz"],
|
||||||
Timestamps = lists:seq(1, 100),
|
Timestamps = lists:seq(1, 100),
|
||||||
[
|
[
|
||||||
|
@ -154,9 +154,9 @@ t_create_gen(_Config) ->
|
||||||
].
|
].
|
||||||
|
|
||||||
t_iterate_multigen(_Config) ->
|
t_iterate_multigen(_Config) ->
|
||||||
{ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||||
{ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
||||||
{ok, 3} = emqx_ds_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
|
{ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
|
||||||
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
||||||
Timestamps = lists:seq(1, 100),
|
Timestamps = lists:seq(1, 100),
|
||||||
_ = [
|
_ = [
|
||||||
|
@ -180,9 +180,9 @@ t_iterate_multigen(_Config) ->
|
||||||
|
|
||||||
t_iterate_multigen_preserve_restore(_Config) ->
|
t_iterate_multigen_preserve_restore(_Config) ->
|
||||||
ReplayID = atom_to_binary(?FUNCTION_NAME),
|
ReplayID = atom_to_binary(?FUNCTION_NAME),
|
||||||
{ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||||
{ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
||||||
{ok, 3} = emqx_ds_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
|
{ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
|
||||||
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
|
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
|
||||||
Timestamps = lists:seq(1, 100),
|
Timestamps = lists:seq(1, 100),
|
||||||
TopicFilter = "foo/#",
|
TopicFilter = "foo/#",
|
||||||
|
@ -194,12 +194,12 @@ t_iterate_multigen_preserve_restore(_Config) ->
|
||||||
It0 = iterator(?SHARD, TopicFilter, 0),
|
It0 = iterator(?SHARD, TopicFilter, 0),
|
||||||
{It1, Res10} = iterate(It0, 10),
|
{It1, Res10} = iterate(It0, 10),
|
||||||
% preserve mid-generation
|
% preserve mid-generation
|
||||||
ok = emqx_ds_local_store:preserve_iterator(It1, ReplayID),
|
ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID),
|
||||||
{ok, It2} = emqx_ds_local_store:restore_iterator(?SHARD, ReplayID),
|
{ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
|
||||||
{It3, Res100} = iterate(It2, 88),
|
{It3, Res100} = iterate(It2, 88),
|
||||||
% preserve on the generation boundary
|
% preserve on the generation boundary
|
||||||
ok = emqx_ds_local_store:preserve_iterator(It3, ReplayID),
|
ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
|
||||||
{ok, It4} = emqx_ds_local_store:restore_iterator(?SHARD, ReplayID),
|
{ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
|
||||||
{It5, Res200} = iterate(It4, 1000),
|
{It5, Res200} = iterate(It4, 1000),
|
||||||
?assertEqual(none, It5),
|
?assertEqual(none, It5),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -208,22 +208,22 @@ t_iterate_multigen_preserve_restore(_Config) ->
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
ok,
|
ok,
|
||||||
emqx_ds_local_store:discard_iterator(?SHARD, ReplayID)
|
emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{error, not_found},
|
{error, not_found},
|
||||||
emqx_ds_local_store:restore_iterator(?SHARD, ReplayID)
|
emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
|
||||||
).
|
).
|
||||||
|
|
||||||
store(Shard, PublishedAt, Topic, Payload) ->
|
store(Shard, PublishedAt, Topic, Payload) ->
|
||||||
ID = emqx_guid:gen(),
|
ID = emqx_guid:gen(),
|
||||||
emqx_ds_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload).
|
emqx_ds_storage_layer:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload).
|
||||||
|
|
||||||
iterate(DB, TopicFilter, StartTime) ->
|
iterate(DB, TopicFilter, StartTime) ->
|
||||||
iterate(iterator(DB, TopicFilter, StartTime)).
|
iterate(iterator(DB, TopicFilter, StartTime)).
|
||||||
|
|
||||||
iterate(It) ->
|
iterate(It) ->
|
||||||
case emqx_ds_local_store:next(It) of
|
case emqx_ds_storage_layer:next(It) of
|
||||||
{value, Payload, ItNext} ->
|
{value, Payload, ItNext} ->
|
||||||
[Payload | iterate(ItNext)];
|
[Payload | iterate(ItNext)];
|
||||||
none ->
|
none ->
|
||||||
|
@ -233,7 +233,7 @@ iterate(It) ->
|
||||||
iterate(It, 0) ->
|
iterate(It, 0) ->
|
||||||
{It, []};
|
{It, []};
|
||||||
iterate(It, N) ->
|
iterate(It, N) ->
|
||||||
case emqx_ds_local_store:next(It) of
|
case emqx_ds_storage_layer:next(It) of
|
||||||
{value, Payload, ItNext} ->
|
{value, Payload, ItNext} ->
|
||||||
{ItFinal, Ps} = iterate(ItNext, N - 1),
|
{ItFinal, Ps} = iterate(ItNext, N - 1),
|
||||||
{ItFinal, [Payload | Ps]};
|
{ItFinal, [Payload | Ps]};
|
||||||
|
@ -242,7 +242,7 @@ iterate(It, N) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
iterator(DB, TopicFilter, StartTime) ->
|
iterator(DB, TopicFilter, StartTime) ->
|
||||||
{ok, It} = emqx_ds_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
|
{ok, It} = emqx_ds_storage_layer:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
|
||||||
It.
|
It.
|
||||||
|
|
||||||
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
|
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
|
||||||
|
@ -263,11 +263,11 @@ end_per_suite(_Config) ->
|
||||||
|
|
||||||
init_per_testcase(TC, Config) ->
|
init_per_testcase(TC, Config) ->
|
||||||
ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
|
ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
|
||||||
{ok, _} = emqx_ds_local_store_sup:start_shard(shard(TC)),
|
{ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(TC, _Config) ->
|
end_per_testcase(TC, _Config) ->
|
||||||
ok = emqx_ds_local_store_sup:stop_shard(shard(TC)).
|
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
|
||||||
|
|
||||||
shard(TC) ->
|
shard(TC) ->
|
||||||
list_to_binary(lists:concat([?MODULE, "_", TC])).
|
list_to_binary(lists:concat([?MODULE, "_", TC])).
|
Loading…
Reference in New Issue