diff --git a/apps/emqx_durable_storage/src/emqx_ds_conf.erl b/apps/emqx_durable_storage/src/emqx_ds_conf.erl index f5761e8a0..e748c359e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_conf.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_conf.erl @@ -55,6 +55,6 @@ default_shard_config() -> } }}. --spec db_options() -> emqx_ds_local_store:db_options(). +-spec db_options() -> emqx_ds_storage_layer:db_options(). db_options() -> application:get_env(?APP, db_options, []). diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage.erl index a9427b1f6..9ebb23726 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage.erl @@ -152,7 +152,7 @@ iteration => iteration_options(), - cf_options => emqx_ds_local_store:db_cf_options() + cf_options => emqx_ds_storage_layer:db_cf_options() }. -type iteration_options() :: #{ @@ -174,8 +174,8 @@ handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), - write_options = [{sync, true}] :: emqx_ds_local_store:db_write_options(), - read_options = [] :: emqx_ds_local_store:db_write_options() + write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(), + read_options = [] :: emqx_ds_storage_layer:db_write_options() }). -record(it, { @@ -221,8 +221,8 @@ %%================================================================================ %% Create a new column family for the generation and a serializable representation of the schema --spec create_new(rocksdb:db_handle(), emqx_ds_local_store:gen_id(), options()) -> - {schema(), emqx_ds_local_store:cf_refs()}. +-spec create_new(rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), options()) -> + {schema(), emqx_ds_storage_layer:cf_refs()}. create_new(DBHandle, GenId, Options) -> CFName = data_cf(GenId), CFOptions = maps:get(cf_options, Options, []), @@ -234,8 +234,8 @@ create_new(DBHandle, GenId, Options) -> -spec open( emqx_ds:shard(), rocksdb:db_handle(), - emqx_ds_local_store:gen_id(), - emqx_ds_local_store:cf_refs(), + emqx_ds_storage_layer:gen_id(), + emqx_ds_storage_layer:cf_refs(), schema() ) -> db(). @@ -710,7 +710,7 @@ substring(I, Offset, Size) -> (I bsr Offset) band ones(Size). %% @doc Generate a column family ID for the MQTT messages --spec data_cf(emqx_ds_local_store:gen_id()) -> [char()]. +-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. data_cf(GenId) -> ?MODULE_STRING ++ integer_to_list(GenId). diff --git a/apps/emqx_durable_storage/src/emqx_ds_local_store.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl similarity index 99% rename from apps/emqx_durable_storage/src/emqx_ds_local_store.erl rename to apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 45845d714..43a399a1b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_local_store.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ds_local_store). +-module(emqx_ds_storage_layer). -behaviour(gen_server). diff --git a/apps/emqx_durable_storage/src/emqx_ds_local_store_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl similarity index 95% rename from apps/emqx_durable_storage/src/emqx_ds_local_store_sup.erl rename to apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index aad50992d..ed745df5f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_local_store_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ds_local_store_sup). +-module(emqx_ds_storage_layer_sup). -behaviour(supervisor). @@ -55,7 +55,7 @@ init([]) -> shard_child_spec(Shard) -> #{ id => Shard, - start => {emqx_ds_local_store, start_link, [Shard]}, + start => {emqx_ds_storage_layer, start_link, [Shard]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl index ebd022632..ca939e892 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -45,7 +45,7 @@ init([]) -> shard_sup() -> #{ id => local_store_shard_sup, - start => {emqx_ds_local_store_sup, start_link, []}, + start => {emqx_ds_storage_layer_sup, start_link, []}, restart => permanent, type => supervisor, shutdown => infinity diff --git a/apps/emqx_durable_storage/test/emqx_ds_local_store_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl similarity index 79% rename from apps/emqx_durable_storage/test/emqx_ds_local_store_SUITE.erl rename to apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index d59c4571e..9fd93ecfe 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_local_store_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ds_local_store_SUITE). +-module(emqx_ds_storage_layer_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -32,8 +32,8 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> - ok = emqx_ds_local_store_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_local_store_sup:start_shard(?SHARD). + ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD). %% Smoke test of store function t_store(_Config) -> @@ -41,7 +41,7 @@ t_store(_Config) -> PublishedAt = 1000, Topic = [<<"foo">>, <<"bar">>], Payload = <<"message">>, - ?assertMatch(ok, emqx_ds_local_store:store(?SHARD, MessageID, PublishedAt, Topic, Payload)). + ?assertMatch(ok, emqx_ds_storage_layer:store(?SHARD, MessageID, PublishedAt, Topic, Payload)). %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> @@ -49,7 +49,7 @@ t_iterate(_Config) -> Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]], Timestamps = lists:seq(1, 10), [ - emqx_ds_local_store:store( + emqx_ds_storage_layer:store( ?SHARD, emqx_guid:gen(), PublishedAt, @@ -61,7 +61,7 @@ t_iterate(_Config) -> %% Iterate through individual topics: [ begin - {ok, It} = emqx_ds_local_store:make_iterator(?SHARD, {Topic, 0}), + {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {Topic, 0}), Values = iterate(It), ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) end @@ -136,16 +136,16 @@ t_iterate_long_tail_wildcard(_Config) -> ). t_create_gen(_Config) -> - {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), ?assertEqual( {error, nonmonotonic}, - emqx_ds_local_store:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) + emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) ), ?assertEqual( {error, nonmonotonic}, - emqx_ds_local_store:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) + emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) ), - {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), Topics = ["foo/bar", "foo/bar/baz"], Timestamps = lists:seq(1, 100), [ @@ -154,9 +154,9 @@ t_create_gen(_Config) -> ]. t_iterate_multigen(_Config) -> - {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_local_store:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 100), _ = [ @@ -180,9 +180,9 @@ t_iterate_multigen(_Config) -> t_iterate_multigen_preserve_restore(_Config) -> ReplayID = atom_to_binary(?FUNCTION_NAME), - {ok, 1} = emqx_ds_local_store:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_local_store:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_local_store:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a/bar"], Timestamps = lists:seq(1, 100), TopicFilter = "foo/#", @@ -194,12 +194,12 @@ t_iterate_multigen_preserve_restore(_Config) -> It0 = iterator(?SHARD, TopicFilter, 0), {It1, Res10} = iterate(It0, 10), % preserve mid-generation - ok = emqx_ds_local_store:preserve_iterator(It1, ReplayID), - {ok, It2} = emqx_ds_local_store:restore_iterator(?SHARD, ReplayID), + ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID), + {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), {It3, Res100} = iterate(It2, 88), % preserve on the generation boundary - ok = emqx_ds_local_store:preserve_iterator(It3, ReplayID), - {ok, It4} = emqx_ds_local_store:restore_iterator(?SHARD, ReplayID), + ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID), + {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), {It5, Res200} = iterate(It4, 1000), ?assertEqual(none, It5), ?assertEqual( @@ -208,22 +208,22 @@ t_iterate_multigen_preserve_restore(_Config) -> ), ?assertEqual( ok, - emqx_ds_local_store:discard_iterator(?SHARD, ReplayID) + emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID) ), ?assertEqual( {error, not_found}, - emqx_ds_local_store:restore_iterator(?SHARD, ReplayID) + emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID) ). store(Shard, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), - emqx_ds_local_store:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload). + emqx_ds_storage_layer:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload). iterate(DB, TopicFilter, StartTime) -> iterate(iterator(DB, TopicFilter, StartTime)). iterate(It) -> - case emqx_ds_local_store:next(It) of + case emqx_ds_storage_layer:next(It) of {value, Payload, ItNext} -> [Payload | iterate(ItNext)]; none -> @@ -233,7 +233,7 @@ iterate(It) -> iterate(It, 0) -> {It, []}; iterate(It, N) -> - case emqx_ds_local_store:next(It) of + case emqx_ds_storage_layer:next(It) of {value, Payload, ItNext} -> {ItFinal, Ps} = iterate(ItNext, N - 1), {ItFinal, [Payload | Ps]}; @@ -242,7 +242,7 @@ iterate(It, N) -> end. iterator(DB, TopicFilter, StartTime) -> - {ok, It} = emqx_ds_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), + {ok, It} = emqx_ds_storage_layer:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), It. parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> @@ -263,11 +263,11 @@ end_per_suite(_Config) -> init_per_testcase(TC, Config) -> ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_local_store_sup:start_shard(shard(TC)), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)), Config. end_per_testcase(TC, _Config) -> - ok = emqx_ds_local_store_sup:stop_shard(shard(TC)). + ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). shard(TC) -> list_to_binary(lists:concat([?MODULE, "_", TC])).