From b30bcf32bd93d359b89fed1f7ed12c1027bc8ff4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 14 Sep 2023 15:15:21 -0300 Subject: [PATCH 1/3] feat(ds): introduce keyspace concept Fixes https://emqx.atlassian.net/browse/EMQX-10579 This introduces the concept of "keyspaces" to our durable storage (DS) implementation, and also refactors some places where "shard" and "keyspace" would be mixed up. We might want to tune the storage options differently for distinct sets of topics, the keyspaces. The keyspace is composed by one or more shards. - Keyspaces are identified simply by binary strings. - DS configuration is scoped by keyspaces instead of shards. - Starting a new DS shard requires definining to which keyspace the shard belongs. --- apps/emqx/src/emqx_persistent_session_ds.erl | 11 ++++-- apps/emqx_durable_storage/src/emqx_ds.erl | 10 +++-- .../emqx_durable_storage/src/emqx_ds_conf.erl | 39 +++++++++++-------- .../src/emqx_ds_message_storage_bitmask.erl | 9 +++-- .../src/emqx_ds_storage_layer.erl | 38 +++++++++++------- .../src/emqx_ds_storage_layer_sup.erl | 14 +++---- .../test/emqx_ds_storage_layer_SUITE.erl | 16 +++++--- .../props/prop_replay_message_storage.erl | 5 ++- 8 files changed, 87 insertions(+), 55 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index aed3ece82..a61bf37f9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -46,6 +46,7 @@ %% FIXME -define(DS_SHARD, <<"local">>). +-define(DEFAULT_KEYSPACE, <<"#">>). -define(WHEN_ENABLED(DO), case is_store_enabled() of @@ -58,9 +59,13 @@ init() -> ?WHEN_ENABLED(begin - ok = emqx_ds:ensure_shard(?DS_SHARD, #{ - dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) - }), + ok = emqx_ds:ensure_shard( + ?DS_SHARD, + ?DEFAULT_KEYSPACE, + #{ + dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) + } + ), ok = emqx_persistent_session_ds_router:init_tables(), ok end). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 0a61cad43..8e6f5535d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -19,7 +19,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API: --export([ensure_shard/2]). +-export([ensure_shard/3]). %% Messages: -export([message_store/2, message_store/1, message_stats/0]). %% Iterator: @@ -39,6 +39,7 @@ -export([]). -export_type([ + keyspace/0, message_id/0, message_stats/0, message_store_opts/0, @@ -77,6 +78,7 @@ %% Parsed topic: -type topic() :: list(binary()). +-type keyspace() :: binary(). -type shard() :: binary(). %% Timestamp @@ -96,10 +98,10 @@ %% API funcions %%================================================================================ --spec ensure_shard(shard(), emqx_ds_storage_layer:options()) -> +-spec ensure_shard(shard(), keyspace(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. -ensure_shard(Shard, Options) -> - case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of +ensure_shard(Shard, Keyspace, Options) -> + case emqx_ds_storage_layer_sup:start_shard(Shard, Keyspace, Options) of {ok, _Pid} -> ok; {error, {already_started, _Pid}} -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_conf.erl b/apps/emqx_durable_storage/src/emqx_ds_conf.erl index db8b14b45..5633cdf58 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_conf.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_conf.erl @@ -6,9 +6,9 @@ %% TODO: make a proper HOCON schema and all... %% API: --export([shard_config/1, db_options/0]). +-export([keyspace_config/1, db_options/1]). --export([shard_iteration_options/1]). +-export([iteration_options/1]). -export([default_iteration_options/0]). -type backend_config() :: @@ -23,16 +23,20 @@ -define(APP, emqx_ds). --spec shard_config(emqx_ds:shard()) -> backend_config(). -shard_config(Shard) -> - DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()), - Shards = application:get_env(?APP, shard_config, #{}), - maps:get(Shard, Shards, DefaultShardConfig). +-spec keyspace_config(emqx_ds:keyspace()) -> backend_config(). +keyspace_config(Keyspace) -> + DefaultKeyspaceConfig = application:get_env( + ?APP, + default_keyspace_config, + default_keyspace_config() + ), + Keyspaces = application:get_env(?APP, keyspace_config, #{}), + maps:get(Keyspace, Keyspaces, DefaultKeyspaceConfig). --spec shard_iteration_options(emqx_ds:shard()) -> +-spec iteration_options(emqx_ds:keyspace()) -> emqx_ds_message_storage_bitmask:iteration_options(). -shard_iteration_options(Shard) -> - case shard_config(Shard) of +iteration_options(Keyspace) -> + case keyspace_config(Keyspace) of {emqx_ds_message_storage_bitmask, Config} -> maps:get(iteration, Config, default_iteration_options()); {_Module, _} -> @@ -41,12 +45,13 @@ shard_iteration_options(Shard) -> -spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options(). default_iteration_options() -> - {emqx_ds_message_storage_bitmask, Config} = default_shard_config(), + {emqx_ds_message_storage_bitmask, Config} = default_keyspace_config(), maps:get(iteration, Config). --spec default_shard_config() -> backend_config(). -default_shard_config() -> +-spec default_keyspace_config() -> backend_config(). +default_keyspace_config() -> {emqx_ds_message_storage_bitmask, #{ + db_options => [], timestamp_bits => 64, topic_bits_per_level => [8, 8, 8, 32, 16], epoch => 5, @@ -55,6 +60,8 @@ default_shard_config() -> } }}. --spec db_options() -> emqx_ds_storage_layer:db_options(). -db_options() -> - application:get_env(?APP, db_options, []). +-spec db_options(emqx_ds:keyspace()) -> emqx_ds_storage_layer:db_options(). +db_options(Keyspace) -> + DefaultDBOptions = application:get_env(?APP, default_db_options, []), + Keyspaces = application:get_env(?APP, keyspace_config, #{}), + emqx_utils_maps:deep_get([Keyspace, db_options], Keyspaces, DefaultDBOptions). diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index a97b89580..ae292452e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -80,7 +80,7 @@ -behaviour(emqx_ds_storage_layer). %% API: --export([create_new/3, open/5]). +-export([create_new/3, open/6]). -export([make_keymapper/1]). -export([store/5]). @@ -174,6 +174,7 @@ -record(db, { shard :: emqx_ds:shard(), + keyspace :: emqx_ds:keyspace(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), @@ -236,16 +237,18 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( emqx_ds:shard(), + emqx_ds:keyspace(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), emqx_ds_storage_layer:cf_refs(), schema() ) -> db(). -open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> +open(Shard, Keyspace, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ shard = Shard, + keyspace = Keyspace, handle = DBHandle, cf = CFHandle, keymapper = Keymapper @@ -289,7 +292,7 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic -spec make_iterator(db(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> - Options = emqx_ds_conf:shard_iteration_options(DB#db.shard), + Options = emqx_ds_conf:iteration_options(DB#db.keyspace), make_iterator(DB, Replay, Options). -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index a16c9b476..f34d508d9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -6,7 +6,7 @@ -behaviour(gen_server). %% API: --export([start_link/2]). +-export([start_link/3]). -export([create_generation/3]). -export([store/5]). @@ -64,6 +64,7 @@ -record(s, { shard :: emqx_ds:shard(), + keyspace :: emqx_ds:keyspace(), db :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle(), cf_generations :: cf_refs() @@ -107,7 +108,14 @@ -callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) -> {_Schema, cf_refs()}. --callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> +-callback open( + emqx_ds:shard(), + emqx_ds:keyspace(), + rocksdb:db_handle(), + gen_id(), + cf_refs(), + _Schema +) -> term(). -callback store( @@ -135,9 +143,10 @@ %% API funcions %%================================================================================ --spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}. -start_link(Shard, Options) -> - gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). +-spec start_link(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> + {ok, pid()}. +start_link(Shard, Keyspace, Options) -> + gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Keyspace, Options}, []). -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) -> {ok, gen_id()} | {error, nonmonotonic}. @@ -249,9 +258,9 @@ foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> %% behaviour callbacks %%================================================================================ -init({Shard, Options}) -> +init({Shard, Keyspace, Options}) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Shard, Options), + {ok, S0} = open_db(Shard, Keyspace, Options), S = ensure_current_generation(S0), ok = populate_metadata(S), {ok, S}. @@ -294,10 +303,10 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> meta_register_gen(Shard, GenId, Gen). -spec ensure_current_generation(state()) -> state(). -ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) -> +ensure_current_generation(S = #s{keyspace = Keyspace, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> - Config = emqx_ds_conf:shard_config(Shard), + Config = emqx_ds_conf:keyspace_config(Keyspace), {ok, _, NS} = create_new_gen(0, Config, S), NS; _GenId -> @@ -333,13 +342,13 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. -open_db(Shard, Options) -> +-spec open_db(emqx_ds:shard(), emqx_ds:keyspace(), options()) -> {ok, state()} | {error, _TODO}. +open_db(Shard, Keyspace, Options) -> DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} - | emqx_ds_conf:db_options() + | emqx_ds_conf:db_options(Keyspace) ], _ = filelib:ensure_dir(DBDir), ExistingCFs = @@ -360,6 +369,7 @@ open_db(Shard, Options) -> {CFNames, _} = lists:unzip(ExistingCFs), {ok, #s{ shard = Shard, + keyspace = Keyspace, db = DBHandle, cf_iterator = CFIterator, cf_generations = lists:zip(CFNames, CFRefs) @@ -372,9 +382,9 @@ open_db(Shard, Options) -> open_gen( GenId, Gen = #{module := Mod, data := Data}, - #s{shard = Shard, db = DBHandle, cf_generations = CFs} + #s{shard = Shard, keyspace = Keyspace, db = DBHandle, cf_generations = CFs} ) -> - DB = Mod:open(Shard, DBHandle, GenId, CFs, Data), + DB = Mod:open(Shard, Keyspace, DBHandle, GenId, CFs, Data), Gen#{data := DB}. -spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index 56c8c760a..cf84c905c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -6,7 +6,7 @@ -behaviour(supervisor). %% API: --export([start_link/0, start_shard/2, stop_shard/1]). +-export([start_link/0, start_shard/3, stop_shard/1]). %% behaviour callbacks: -export([init/1]). @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec start_shard(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> supervisor:startchild_ret(). -start_shard(Shard, Options) -> - supervisor:start_child(?SUP, shard_child_spec(Shard, Options)). +start_shard(Shard, Keyspace, Options) -> + supervisor:start_child(?SUP, shard_child_spec(Shard, Keyspace, Options)). -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> @@ -52,12 +52,12 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec shard_child_spec(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> supervisor:child_spec(). -shard_child_spec(Shard, Options) -> +shard_child_spec(Shard, Keyspace, Options) -> #{ id => Shard, - start => {emqx_ds_storage_layer, start_link, [Shard, Options]}, + start => {emqx_ds_storage_layer, start_link, [Shard, Keyspace, Options]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index c5c227333..e19c83dc1 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("stdlib/include/assert.hrl"). -define(SHARD, shard(?FUNCTION_NAME)). +-define(KEYSPACE, keyspace(?FUNCTION_NAME)). -define(DEFAULT_CONFIG, {emqx_ds_message_storage_bitmask, #{ @@ -33,7 +34,7 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}). + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, ?KEYSPACE, #{}). %% Smoke test of store function t_store(_Config) -> @@ -262,15 +263,18 @@ end_per_suite(_Config) -> ok = application:stop(emqx_durable_storage). init_per_testcase(TC, Config) -> - ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}), + ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), keyspace(TC), #{}), Config. end_per_testcase(TC, _Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). -shard(TC) -> +keyspace(TC) -> list_to_binary(lists:concat([?MODULE, "_", TC])). -set_shard_config(Shard, Config) -> - ok = application:set_env(emqx_ds, shard_config, #{Shard => Config}). +shard(TC) -> + <<(keyspace(TC))/binary, "_shard">>. + +set_keyspace_config(Keyspace, Config) -> + ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}). diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl index 7452906b8..e1981888b 100644 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -10,7 +10,8 @@ -define(WORK_DIR, ["_build", "test"]). -define(RUN_ID, {?MODULE, testrun_id}). --define(ZONE, ?MODULE). +-define(KEYSPACE, atom_to_binary(?MODULE)). +-define(SHARD, <<(?KEYSPACE)/binary, "_shard">>). -define(GEN_ID, 42). %%-------------------------------------------------------------------- @@ -255,7 +256,7 @@ iterate_shim(Shim, Iteration) -> open_db(Filepath, Options) -> {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options), - DB = emqx_ds_message_storage_bitmask:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), + DB = emqx_ds_message_storage_bitmask:open(?SHARD, ?KEYSPACE, Handle, ?GEN_ID, CFRefs, Schema), {DB, Handle}. close_db(Handle) -> From a511088fd42dbd1dfe78e3b3829116d33050f8ca Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 18 Sep 2023 17:38:12 -0300 Subject: [PATCH 2/3] refactor: address review comments --- apps/emqx/src/emqx_persistent_session_ds.erl | 12 +++++-- apps/emqx_durable_storage/src/emqx_ds.erl | 8 ++--- .../src/emqx_ds_message_storage_bitmask.erl | 8 ++--- .../src/emqx_ds_storage_layer.erl | 35 ++++++++++--------- .../src/emqx_ds_storage_layer_sup.erl | 12 +++---- .../test/emqx_ds_storage_layer_SUITE.erl | 28 +++++++-------- .../props/prop_replay_message_storage.erl | 4 +-- 7 files changed, 58 insertions(+), 49 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index a61bf37f9..76d42e8f0 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -46,7 +46,7 @@ %% FIXME -define(DS_SHARD, <<"local">>). --define(DEFAULT_KEYSPACE, <<"#">>). +-define(DEFAULT_KEYSPACE, default). -define(WHEN_ENABLED(DO), case is_store_enabled() of @@ -60,10 +60,16 @@ init() -> ?WHEN_ENABLED(begin ok = emqx_ds:ensure_shard( - ?DS_SHARD, ?DEFAULT_KEYSPACE, + ?DS_SHARD, #{ - dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) + dir => filename:join([ + emqx:data_dir(), + ds, + messages, + atom_to_binary(?DEFAULT_KEYSPACE), + ?DS_SHARD + ]) } ), ok = emqx_persistent_session_ds_router:init_tables(), diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 8e6f5535d..cf93e97a0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -78,7 +78,7 @@ %% Parsed topic: -type topic() :: list(binary()). --type keyspace() :: binary(). +-type keyspace() :: atom(). -type shard() :: binary(). %% Timestamp @@ -98,10 +98,10 @@ %% API funcions %%================================================================================ --spec ensure_shard(shard(), keyspace(), emqx_ds_storage_layer:options()) -> +-spec ensure_shard(keyspace(), shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. -ensure_shard(Shard, Keyspace, Options) -> - case emqx_ds_storage_layer_sup:start_shard(Shard, Keyspace, Options) of +ensure_shard(Keyspace, Shard, Options) -> + case emqx_ds_storage_layer_sup:start_shard(Keyspace, Shard, Options) of {ok, _Pid} -> ok; {error, {already_started, _Pid}} -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index ae292452e..b537e8f0e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -173,8 +173,8 @@ -opaque schema() :: #schema{}. -record(db, { - shard :: emqx_ds:shard(), keyspace :: emqx_ds:keyspace(), + shard :: emqx_ds:shard(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), @@ -236,19 +236,19 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( - emqx_ds:shard(), emqx_ds:keyspace(), + emqx_ds:shard(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), emqx_ds_storage_layer:cf_refs(), schema() ) -> db(). -open(Shard, Keyspace, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> +open(Keyspace, Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ - shard = Shard, keyspace = Keyspace, + shard = Shard, handle = DBHandle, cf = CFHandle, keymapper = Keymapper diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index f34d508d9..1aee14f43 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -7,7 +7,7 @@ %% API: -export([start_link/3]). --export([create_generation/3]). +-export([create_generation/4]). -export([store/5]). -export([delete/4]). @@ -99,7 +99,7 @@ %% 3. `inplace_update_support`? -define(ITERATOR_CF_OPTS, []). --define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}). +-define(REF(Keyspace, Shard), {via, gproc, {n, l, {?MODULE, Keyspace, Shard}}}). %%================================================================================ %% Callbacks @@ -109,8 +109,8 @@ {_Schema, cf_refs()}. -callback open( - emqx_ds:shard(), emqx_ds:keyspace(), + emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), @@ -143,15 +143,17 @@ %% API funcions %%================================================================================ --spec start_link(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> +-spec start_link(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}. -start_link(Shard, Keyspace, Options) -> - gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Keyspace, Options}, []). +start_link(Keyspace, Shard, Options) -> + gen_server:start_link(?REF(Keyspace, Shard), ?MODULE, {Keyspace, Shard, Options}, []). --spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) -> +-spec create_generation( + emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() +) -> {ok, gen_id()} | {error, nonmonotonic}. -create_generation(Shard, Since, Config = {_Module, _Options}) -> - gen_server:call(?REF(Shard), {create_generation, Since, Config}). +create_generation(Keyspace, Shard, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(Keyspace, Shard), {create_generation, Since, Config}). -spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. @@ -258,9 +260,9 @@ foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> %% behaviour callbacks %%================================================================================ -init({Shard, Keyspace, Options}) -> +init({Keyspace, Shard, Options}) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Shard, Keyspace, Options), + {ok, S0} = open_db(Keyspace, Shard, Options), S = ensure_current_generation(S0), ok = populate_metadata(S), {ok, S}. @@ -342,9 +344,10 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_ds:shard(), emqx_ds:keyspace(), options()) -> {ok, state()} | {error, _TODO}. -open_db(Shard, Keyspace, Options) -> - DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)), +-spec open_db(emqx_ds:keyspace(), emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. +open_db(Keyspace, Shard, Options) -> + DefaultDir = filename:join([atom_to_binary(Keyspace), Shard]), + DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} @@ -382,9 +385,9 @@ open_db(Shard, Keyspace, Options) -> open_gen( GenId, Gen = #{module := Mod, data := Data}, - #s{shard = Shard, keyspace = Keyspace, db = DBHandle, cf_generations = CFs} + #s{keyspace = Keyspace, shard = Shard, db = DBHandle, cf_generations = CFs} ) -> - DB = Mod:open(Shard, Keyspace, DBHandle, GenId, CFs, Data), + DB = Mod:open(Keyspace, Shard, DBHandle, GenId, CFs, Data), Gen#{data := DB}. -spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index cf84c905c..6ae651e74 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> +-spec start_shard(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> supervisor:startchild_ret(). -start_shard(Shard, Keyspace, Options) -> - supervisor:start_child(?SUP, shard_child_spec(Shard, Keyspace, Options)). +start_shard(Keyspace, Shard, Options) -> + supervisor:start_child(?SUP, shard_child_spec(Keyspace, Shard, Options)). -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> @@ -52,12 +52,12 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> +-spec shard_child_spec(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> supervisor:child_spec(). -shard_child_spec(Shard, Keyspace, Options) -> +shard_child_spec(Keyspace, Shard, Options) -> #{ id => Shard, - start => {emqx_ds_storage_layer, start_link, [Shard, Keyspace, Options]}, + start => {emqx_ds_storage_layer, start_link, [Keyspace, Shard, Options]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index e19c83dc1..caceaa1b3 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -34,7 +34,7 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, ?KEYSPACE, #{}). + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?KEYSPACE, ?SHARD, #{}). %% Smoke test of store function t_store(_Config) -> @@ -137,16 +137,16 @@ t_iterate_long_tail_wildcard(_Config) -> ). t_create_gen(_Config) -> - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG), ?assertEqual( {error, nonmonotonic}, - emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) + emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1, ?DEFAULT_CONFIG) ), ?assertEqual( {error, nonmonotonic}, - emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) + emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG) ), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), Topics = ["foo/bar", "foo/bar/baz"], Timestamps = lists:seq(1, 100), [ @@ -155,9 +155,9 @@ t_create_gen(_Config) -> ]. t_iterate_multigen(_Config) -> - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1000, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 100), _ = [ @@ -181,9 +181,9 @@ t_iterate_multigen(_Config) -> t_iterate_multigen_preserve_restore(_Config) -> ReplayID = atom_to_binary(?FUNCTION_NAME), - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 100, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a/bar"], Timestamps = lists:seq(1, 100), TopicFilter = "foo/#", @@ -264,17 +264,17 @@ end_per_suite(_Config) -> init_per_testcase(TC, Config) -> ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), keyspace(TC), #{}), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(keyspace(TC), shard(TC), #{}), Config. end_per_testcase(TC, _Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). keyspace(TC) -> - list_to_binary(lists:concat([?MODULE, "_", TC])). + list_to_atom(lists:concat([?MODULE, "_", TC])). shard(TC) -> - <<(keyspace(TC))/binary, "_shard">>. + <<(atom_to_binary(keyspace(TC)))/binary, "_shard">>. set_keyspace_config(Keyspace, Config) -> ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}). diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl index e1981888b..14e76d707 100644 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -10,8 +10,8 @@ -define(WORK_DIR, ["_build", "test"]). -define(RUN_ID, {?MODULE, testrun_id}). --define(KEYSPACE, atom_to_binary(?MODULE)). --define(SHARD, <<(?KEYSPACE)/binary, "_shard">>). +-define(KEYSPACE, ?MODULE). +-define(SHARD, <<(atom_to_binary(?KEYSPACE))/binary, "_shard">>). -define(GEN_ID, 42). %%-------------------------------------------------------------------- From 5ed5ac48ee3aa43146e6ed945294d5df6f544091 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 19 Sep 2023 09:33:59 -0300 Subject: [PATCH 3/3] refactor: combine shard id and keyspace into a single value --- apps/emqx/integration_test/emqx_ds_SUITE.erl | 4 +- apps/emqx/src/emqx_persistent_session_ds.erl | 12 +++--- .../test/emqx_persistent_messages_SUITE.erl | 4 +- apps/emqx_durable_storage/src/emqx_ds.erl | 20 +++++----- apps/emqx_durable_storage/src/emqx_ds_app.erl | 4 +- apps/emqx_durable_storage/src/emqx_ds_int.hrl | 2 +- .../src/emqx_ds_message_storage_bitmask.erl | 10 ++--- .../src/emqx_ds_storage_layer.erl | 37 +++++++++---------- .../src/emqx_ds_storage_layer_sup.erl | 14 +++---- .../test/emqx_ds_storage_layer_SUITE.erl | 30 ++++++++------- .../props/prop_replay_message_storage.erl | 5 ++- 11 files changed, 74 insertions(+), 68 deletions(-) diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl index 264cbde14..fa30661e2 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -11,7 +11,9 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). --define(DS_SHARD, <<"local">>). +-define(DEFAULT_KEYSPACE, default). +-define(DS_SHARD_ID, <<"local">>). +-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). -import(emqx_common_test_helpers, [on_exit/1]). diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 76d42e8f0..6b25dd185 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -45,8 +45,9 @@ ]). %% FIXME --define(DS_SHARD, <<"local">>). +-define(DS_SHARD_ID, <<"local">>). -define(DEFAULT_KEYSPACE, default). +-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). -define(WHEN_ENABLED(DO), case is_store_enabled() of @@ -60,15 +61,14 @@ init() -> ?WHEN_ENABLED(begin ok = emqx_ds:ensure_shard( - ?DEFAULT_KEYSPACE, ?DS_SHARD, #{ dir => filename:join([ emqx:data_dir(), ds, messages, - atom_to_binary(?DEFAULT_KEYSPACE), - ?DS_SHARD + ?DEFAULT_KEYSPACE, + ?DS_SHARD_ID ]) } ), @@ -97,7 +97,9 @@ store_message(Msg) -> ID = emqx_message:id(Msg), Timestamp = emqx_guid:timestamp(ID), Topic = emqx_topic:words(emqx_message:topic(Msg)), - emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)). + emqx_ds_storage_layer:store( + ?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg) + ). has_subscribers(#message{topic = Topic}) -> emqx_persistent_session_ds_router:has_any_route(Topic). diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index a04a0d4de..9d0f42424 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -26,7 +26,9 @@ -import(emqx_common_test_helpers, [on_exit/1]). --define(DS_SHARD, <<"local">>). +-define(DEFAULT_KEYSPACE, default). +-define(DS_SHARD_ID, <<"local">>). +-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). all() -> emqx_common_test_helpers:all(?MODULE). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index cf93e97a0..697dd88a8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -19,7 +19,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API: --export([ensure_shard/3]). +-export([ensure_shard/2]). %% Messages: -export([message_store/2, message_store/1, message_stats/0]). %% Iterator: @@ -49,6 +49,7 @@ iterator_id/0, iterator/0, shard/0, + shard_id/0, topic/0, time/0 ]). @@ -79,7 +80,8 @@ -type topic() :: list(binary()). -type keyspace() :: atom(). --type shard() :: binary(). +-type shard_id() :: binary(). +-type shard() :: {keyspace(), shard_id()}. %% Timestamp %% Earliest possible timestamp is 0. @@ -98,10 +100,10 @@ %% API funcions %%================================================================================ --spec ensure_shard(keyspace(), shard(), emqx_ds_storage_layer:options()) -> +-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. -ensure_shard(Keyspace, Shard, Options) -> - case emqx_ds_storage_layer_sup:start_shard(Keyspace, Shard, Options) of +ensure_shard(Shard, Options) -> + case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of {ok, _Pid} -> ok; {error, {already_started, _Pid}} -> @@ -142,7 +144,7 @@ message_stats() -> -spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}. session_open(ClientID) -> {atomic, Res} = - mria:transaction(?DS_SHARD, fun() -> + mria:transaction(?DS_MRIA_SHARD, fun() -> case mnesia:read(?SESSION_TAB, ClientID, write) of [#session{}] -> {false, ClientID}; @@ -159,7 +161,7 @@ session_open(ClientID) -> -spec session_drop(emqx_types:clientid()) -> ok. session_drop(ClientID) -> {atomic, ok} = mria:transaction( - ?DS_SHARD, + ?DS_MRIA_SHARD, fun() -> %% TODO: ensure all iterators from this clientid are closed? mnesia:delete({?SESSION_TAB, ClientID}) @@ -180,7 +182,7 @@ session_suspend(_SessionId) -> session_add_iterator(DSSessionId, TopicFilter) -> IteratorRefId = {DSSessionId, TopicFilter}, {atomic, Res} = - mria:transaction(?DS_SHARD, fun() -> + mria:transaction(?DS_MRIA_SHARD, fun() -> case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of [] -> {IteratorId, StartMS} = new_iterator_id(DSSessionId), @@ -223,7 +225,7 @@ session_get_iterator_id(DSSessionId, TopicFilter) -> session_del_iterator(DSSessionId, TopicFilter) -> IteratorRefId = {DSSessionId, TopicFilter}, {atomic, ok} = - mria:transaction(?DS_SHARD, fun() -> + mria:transaction(?DS_MRIA_SHARD, fun() -> mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write) end), ok. diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 3c9d2652e..09856df3c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -18,7 +18,7 @@ init_mnesia() -> ok = mria:create_table( ?SESSION_TAB, [ - {rlog_shard, ?DS_SHARD}, + {rlog_shard, ?DS_MRIA_SHARD}, {type, set}, {storage, storage()}, {record_name, session}, @@ -28,7 +28,7 @@ init_mnesia() -> ok = mria:create_table( ?ITERATOR_REF_TAB, [ - {rlog_shard, ?DS_SHARD}, + {rlog_shard, ?DS_MRIA_SHARD}, {type, ordered_set}, {storage, storage()}, {record_name, iterator_ref}, diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index 47493bd0b..28a0db429 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -18,7 +18,7 @@ -define(SESSION_TAB, emqx_ds_session). -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). --define(DS_SHARD, emqx_ds_shard). +-define(DS_MRIA_SHARD, emqx_ds_shard). -record(session, { %% same as clientid diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index b537e8f0e..437cc5b06 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -80,7 +80,7 @@ -behaviour(emqx_ds_storage_layer). %% API: --export([create_new/3, open/6]). +-export([create_new/3, open/5]). -export([make_keymapper/1]). -export([store/5]). @@ -173,7 +173,6 @@ -opaque schema() :: #schema{}. -record(db, { - keyspace :: emqx_ds:keyspace(), shard :: emqx_ds:shard(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), @@ -236,7 +235,6 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( - emqx_ds:keyspace(), emqx_ds:shard(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), @@ -244,10 +242,9 @@ create_new(DBHandle, GenId, Options) -> schema() ) -> db(). -open(Keyspace, Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> +open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ - keyspace = Keyspace, shard = Shard, handle = DBHandle, cf = CFHandle, @@ -292,7 +289,8 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic -spec make_iterator(db(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> - Options = emqx_ds_conf:iteration_options(DB#db.keyspace), + {Keyspace, _ShardId} = DB#db.shard, + Options = emqx_ds_conf:iteration_options(Keyspace), make_iterator(DB, Replay, Options). -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 1aee14f43..6137a1ed7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -6,8 +6,8 @@ -behaviour(gen_server). %% API: --export([start_link/3]). --export([create_generation/4]). +-export([start_link/2]). +-export([create_generation/3]). -export([store/5]). -export([delete/4]). @@ -64,7 +64,6 @@ -record(s, { shard :: emqx_ds:shard(), - keyspace :: emqx_ds:keyspace(), db :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle(), cf_generations :: cf_refs() @@ -99,7 +98,7 @@ %% 3. `inplace_update_support`? -define(ITERATOR_CF_OPTS, []). --define(REF(Keyspace, Shard), {via, gproc, {n, l, {?MODULE, Keyspace, Shard}}}). +-define(REF(Keyspace, ShardId), {via, gproc, {n, l, {?MODULE, Keyspace, ShardId}}}). %%================================================================================ %% Callbacks @@ -109,7 +108,6 @@ {_Schema, cf_refs()}. -callback open( - emqx_ds:keyspace(), emqx_ds:shard(), rocksdb:db_handle(), gen_id(), @@ -143,17 +141,17 @@ %% API funcions %%================================================================================ --spec start_link(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}. -start_link(Keyspace, Shard, Options) -> - gen_server:start_link(?REF(Keyspace, Shard), ?MODULE, {Keyspace, Shard, Options}, []). +start_link(Shard = {Keyspace, ShardId}, Options) -> + gen_server:start_link(?REF(Keyspace, ShardId), ?MODULE, {Shard, Options}, []). -spec create_generation( - emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() + emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() ) -> {ok, gen_id()} | {error, nonmonotonic}. -create_generation(Keyspace, Shard, Since, Config = {_Module, _Options}) -> - gen_server:call(?REF(Keyspace, Shard), {create_generation, Since, Config}). +create_generation({Keyspace, ShardId}, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(Keyspace, ShardId), {create_generation, Since, Config}). -spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. @@ -260,9 +258,9 @@ foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> %% behaviour callbacks %%================================================================================ -init({Keyspace, Shard, Options}) -> +init({Shard, Options}) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Keyspace, Shard, Options), + {ok, S0} = open_db(Shard, Options), S = ensure_current_generation(S0), ok = populate_metadata(S), {ok, S}. @@ -305,7 +303,7 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> meta_register_gen(Shard, GenId, Gen). -spec ensure_current_generation(state()) -> state(). -ensure_current_generation(S = #s{keyspace = Keyspace, db = DBHandle}) -> +ensure_current_generation(S = #s{shard = {Keyspace, _ShardId}, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> Config = emqx_ds_conf:keyspace_config(Keyspace), @@ -344,9 +342,9 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_ds:keyspace(), emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. -open_db(Keyspace, Shard, Options) -> - DefaultDir = filename:join([atom_to_binary(Keyspace), Shard]), +-spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. +open_db(Shard = {Keyspace, ShardId}, Options) -> + DefaultDir = filename:join([atom_to_binary(Keyspace), ShardId]), DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), DBOptions = [ {create_if_missing, true}, @@ -372,7 +370,6 @@ open_db(Keyspace, Shard, Options) -> {CFNames, _} = lists:unzip(ExistingCFs), {ok, #s{ shard = Shard, - keyspace = Keyspace, db = DBHandle, cf_iterator = CFIterator, cf_generations = lists:zip(CFNames, CFRefs) @@ -385,9 +382,9 @@ open_db(Keyspace, Shard, Options) -> open_gen( GenId, Gen = #{module := Mod, data := Data}, - #s{keyspace = Keyspace, shard = Shard, db = DBHandle, cf_generations = CFs} + #s{shard = Shard, db = DBHandle, cf_generations = CFs} ) -> - DB = Mod:open(Keyspace, Shard, DBHandle, GenId, CFs, Data), + DB = Mod:open(Shard, DBHandle, GenId, CFs, Data), Gen#{data := DB}. -spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index 6ae651e74..56c8c760a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -6,7 +6,7 @@ -behaviour(supervisor). %% API: --export([start_link/0, start_shard/3, stop_shard/1]). +-export([start_link/0, start_shard/2, stop_shard/1]). %% behaviour callbacks: -export([init/1]). @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> supervisor:startchild_ret(). -start_shard(Keyspace, Shard, Options) -> - supervisor:start_child(?SUP, shard_child_spec(Keyspace, Shard, Options)). +start_shard(Shard, Options) -> + supervisor:start_child(?SUP, shard_child_spec(Shard, Options)). -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> @@ -52,12 +52,12 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> supervisor:child_spec(). -shard_child_spec(Keyspace, Shard, Options) -> +shard_child_spec(Shard, Options) -> #{ id => Shard, - start => {emqx_ds_storage_layer, start_link, [Keyspace, Shard, Options]}, + start => {emqx_ds_storage_layer, start_link, [Shard, Options]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index caceaa1b3..3a872934f 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -10,7 +10,6 @@ -include_lib("stdlib/include/assert.hrl"). -define(SHARD, shard(?FUNCTION_NAME)). --define(KEYSPACE, keyspace(?FUNCTION_NAME)). -define(DEFAULT_CONFIG, {emqx_ds_message_storage_bitmask, #{ @@ -34,7 +33,7 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?KEYSPACE, ?SHARD, #{}). + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}). %% Smoke test of store function t_store(_Config) -> @@ -137,16 +136,16 @@ t_iterate_long_tail_wildcard(_Config) -> ). t_create_gen(_Config) -> - {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), ?assertEqual( {error, nonmonotonic}, - emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1, ?DEFAULT_CONFIG) + emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) ), ?assertEqual( {error, nonmonotonic}, - emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG) + emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) ), - {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), Topics = ["foo/bar", "foo/bar/baz"], Timestamps = lists:seq(1, 100), [ @@ -155,9 +154,9 @@ t_create_gen(_Config) -> ]. t_iterate_multigen(_Config) -> - {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1000, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 100), _ = [ @@ -181,9 +180,9 @@ t_iterate_multigen(_Config) -> t_iterate_multigen_preserve_restore(_Config) -> ReplayID = atom_to_binary(?FUNCTION_NAME), - {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 100, ?DEFAULT_CONFIG), + {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), Topics = ["foo/bar", "foo/bar/baz", "a/bar"], Timestamps = lists:seq(1, 100), TopicFilter = "foo/#", @@ -264,7 +263,7 @@ end_per_suite(_Config) -> init_per_testcase(TC, Config) -> ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(keyspace(TC), shard(TC), #{}), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}), Config. end_per_testcase(TC, _Config) -> @@ -273,8 +272,11 @@ end_per_testcase(TC, _Config) -> keyspace(TC) -> list_to_atom(lists:concat([?MODULE, "_", TC])). +shard_id(_TC) -> + <<"shard">>. + shard(TC) -> - <<(atom_to_binary(keyspace(TC)))/binary, "_shard">>. + {keyspace(TC), shard_id(TC)}. set_keyspace_config(Keyspace, Config) -> ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}). diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl index 14e76d707..f9964bebe 100644 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -11,7 +11,8 @@ -define(RUN_ID, {?MODULE, testrun_id}). -define(KEYSPACE, ?MODULE). --define(SHARD, <<(atom_to_binary(?KEYSPACE))/binary, "_shard">>). +-define(SHARD_ID, <<"shard">>). +-define(SHARD, {?KEYSPACE, ?SHARD_ID}). -define(GEN_ID, 42). %%-------------------------------------------------------------------- @@ -256,7 +257,7 @@ iterate_shim(Shim, Iteration) -> open_db(Filepath, Options) -> {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options), - DB = emqx_ds_message_storage_bitmask:open(?SHARD, ?KEYSPACE, Handle, ?GEN_ID, CFRefs, Schema), + DB = emqx_ds_message_storage_bitmask:open(?SHARD, Handle, ?GEN_ID, CFRefs, Schema), {DB, Handle}. close_db(Handle) ->