diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index a61bf37f9..76d42e8f0 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -46,7 +46,7 @@ %% FIXME -define(DS_SHARD, <<"local">>). --define(DEFAULT_KEYSPACE, <<"#">>). +-define(DEFAULT_KEYSPACE, default). -define(WHEN_ENABLED(DO), case is_store_enabled() of @@ -60,10 +60,16 @@ init() -> ?WHEN_ENABLED(begin ok = emqx_ds:ensure_shard( - ?DS_SHARD, ?DEFAULT_KEYSPACE, + ?DS_SHARD, #{ - dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) + dir => filename:join([ + emqx:data_dir(), + ds, + messages, + atom_to_binary(?DEFAULT_KEYSPACE), + ?DS_SHARD + ]) } ), ok = emqx_persistent_session_ds_router:init_tables(), diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 8e6f5535d..cf93e97a0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -78,7 +78,7 @@ %% Parsed topic: -type topic() :: list(binary()). --type keyspace() :: binary(). +-type keyspace() :: atom(). -type shard() :: binary(). %% Timestamp @@ -98,10 +98,10 @@ %% API funcions %%================================================================================ --spec ensure_shard(shard(), keyspace(), emqx_ds_storage_layer:options()) -> +-spec ensure_shard(keyspace(), shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. -ensure_shard(Shard, Keyspace, Options) -> - case emqx_ds_storage_layer_sup:start_shard(Shard, Keyspace, Options) of +ensure_shard(Keyspace, Shard, Options) -> + case emqx_ds_storage_layer_sup:start_shard(Keyspace, Shard, Options) of {ok, _Pid} -> ok; {error, {already_started, _Pid}} -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index ae292452e..b537e8f0e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -173,8 +173,8 @@ -opaque schema() :: #schema{}. -record(db, { - shard :: emqx_ds:shard(), keyspace :: emqx_ds:keyspace(), + shard :: emqx_ds:shard(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), @@ -236,19 +236,19 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( - emqx_ds:shard(), emqx_ds:keyspace(), + emqx_ds:shard(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), emqx_ds_storage_layer:cf_refs(), schema() ) -> db(). -open(Shard, Keyspace, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> +open(Keyspace, Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ - shard = Shard, keyspace = Keyspace, + shard = Shard, handle = DBHandle, cf = CFHandle, keymapper = Keymapper diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index f34d508d9..1aee14f43 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -7,7 +7,7 @@ %% API: -export([start_link/3]). --export([create_generation/3]). +-export([create_generation/4]). -export([store/5]). -export([delete/4]). @@ -99,7 +99,7 @@ %% 3. `inplace_update_support`? -define(ITERATOR_CF_OPTS, []). --define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}). +-define(REF(Keyspace, Shard), {via, gproc, {n, l, {?MODULE, Keyspace, Shard}}}). %%================================================================================ %% Callbacks @@ -109,8 +109,8 @@ {_Schema, cf_refs()}. -callback open( - emqx_ds:shard(), emqx_ds:keyspace(), + emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), @@ -143,15 +143,17 @@ %% API funcions %%================================================================================ --spec start_link(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> +-spec start_link(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}. -start_link(Shard, Keyspace, Options) -> - gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Keyspace, Options}, []). +start_link(Keyspace, Shard, Options) -> + gen_server:start_link(?REF(Keyspace, Shard), ?MODULE, {Keyspace, Shard, Options}, []). --spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) -> +-spec create_generation( + emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() +) -> {ok, gen_id()} | {error, nonmonotonic}. -create_generation(Shard, Since, Config = {_Module, _Options}) -> - gen_server:call(?REF(Shard), {create_generation, Since, Config}). +create_generation(Keyspace, Shard, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(Keyspace, Shard), {create_generation, Since, Config}). -spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. @@ -258,9 +260,9 @@ foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> %% behaviour callbacks %%================================================================================ -init({Shard, Keyspace, Options}) -> +init({Keyspace, Shard, Options}) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Shard, Keyspace, Options), + {ok, S0} = open_db(Keyspace, Shard, Options), S = ensure_current_generation(S0), ok = populate_metadata(S), {ok, S}. @@ -342,9 +344,10 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_ds:shard(), emqx_ds:keyspace(), options()) -> {ok, state()} | {error, _TODO}. -open_db(Shard, Keyspace, Options) -> - DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)), +-spec open_db(emqx_ds:keyspace(), emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. +open_db(Keyspace, Shard, Options) -> + DefaultDir = filename:join([atom_to_binary(Keyspace), Shard]), + DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} @@ -382,9 +385,9 @@ open_db(Shard, Keyspace, Options) -> open_gen( GenId, Gen = #{module := Mod, data := Data}, - #s{shard = Shard, keyspace = Keyspace, db = DBHandle, cf_generations = CFs} + #s{keyspace = Keyspace, shard = Shard, db = DBHandle, cf_generations = CFs} ) -> - DB = Mod:open(Shard, Keyspace, DBHandle, GenId, CFs, Data), + DB = Mod:open(Keyspace, Shard, DBHandle, GenId, CFs, Data), Gen#{data := DB}. -spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index cf84c905c..6ae651e74 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> +-spec start_shard(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> supervisor:startchild_ret(). -start_shard(Shard, Keyspace, Options) -> - supervisor:start_child(?SUP, shard_child_spec(Shard, Keyspace, Options)). +start_shard(Keyspace, Shard, Options) -> + supervisor:start_child(?SUP, shard_child_spec(Keyspace, Shard, Options)). -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> @@ -52,12 +52,12 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> +-spec shard_child_spec(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> supervisor:child_spec(). -shard_child_spec(Shard, Keyspace, Options) -> +shard_child_spec(Keyspace, Shard, Options) -> #{ id => Shard, - start => {emqx_ds_storage_layer, start_link, [Shard, Keyspace, Options]}, + start => {emqx_ds_storage_layer, start_link, [Keyspace, Shard, Options]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index e19c83dc1..caceaa1b3 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -34,7 +34,7 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, ?KEYSPACE, #{}). + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?KEYSPACE, ?SHARD, #{}). %% Smoke test of store function t_store(_Config) -> @@ -137,16 +137,16 @@ t_iterate_long_tail_wildcard(_Config) -> ). t_create_gen(_Config) -> - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG), ?assertEqual( {error, nonmonotonic}, - emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) + emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1, ?DEFAULT_CONFIG) ), ?assertEqual( {error, nonmonotonic}, - emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) + emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG) ), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), Topics = ["foo/bar", "foo/bar/baz"], Timestamps = lists:seq(1, 100), [ @@ -155,9 +155,9 @@ t_create_gen(_Config) -> ]. t_iterate_multigen(_Config) -> - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1000, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 100), _ = [ @@ -181,9 +181,9 @@ t_iterate_multigen(_Config) -> t_iterate_multigen_preserve_restore(_Config) -> ReplayID = atom_to_binary(?FUNCTION_NAME), - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 100, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a/bar"], Timestamps = lists:seq(1, 100), TopicFilter = "foo/#", @@ -264,17 +264,17 @@ end_per_suite(_Config) -> init_per_testcase(TC, Config) -> ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), keyspace(TC), #{}), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(keyspace(TC), shard(TC), #{}), Config. end_per_testcase(TC, _Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). keyspace(TC) -> - list_to_binary(lists:concat([?MODULE, "_", TC])). + list_to_atom(lists:concat([?MODULE, "_", TC])). shard(TC) -> - <<(keyspace(TC))/binary, "_shard">>. + <<(atom_to_binary(keyspace(TC)))/binary, "_shard">>. set_keyspace_config(Keyspace, Config) -> ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}). diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl index e1981888b..14e76d707 100644 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -10,8 +10,8 @@ -define(WORK_DIR, ["_build", "test"]). -define(RUN_ID, {?MODULE, testrun_id}). --define(KEYSPACE, atom_to_binary(?MODULE)). --define(SHARD, <<(?KEYSPACE)/binary, "_shard">>). +-define(KEYSPACE, ?MODULE). +-define(SHARD, <<(atom_to_binary(?KEYSPACE))/binary, "_shard">>). -define(GEN_ID, 42). %%--------------------------------------------------------------------