diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index ef1600500..06e925c1b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -118,6 +118,7 @@ which_dbs() -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), + emqx_ds_builtin_sup:clean_gvars(DB), emqx_ds_builtin_metrics:init_for_db(DB), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), ok = start_ra_system(DB, Opts), diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 45e81bdc9..30b72e5a8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -23,6 +23,7 @@ %% API: -export([start_db/2, stop_db/1]). +-export([set_gvar/3, get_gvar/3, clean_gvars/1]). %% behavior callbacks: -export([init/1]). @@ -39,6 +40,13 @@ -define(top, ?MODULE). -define(databases, emqx_ds_builtin_databases_sup). +-define(gvar_tab, emqx_ds_builtin_gvar). + +-record(gvar, { + k :: {emqx_ds:db(), _Key}, + v :: _Value +}). + %%================================================================================ %% API functions %%================================================================================ @@ -61,11 +69,30 @@ stop_db(DB) -> Pid when is_pid(Pid) -> _ = supervisor:terminate_child(?databases, DB), _ = supervisor:delete_child(?databases, DB), - ok; + clean_gvars(DB); undefined -> ok end. +%% @doc Set a DB-global variable. Please don't abuse this API. +-spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok. +set_gvar(DB, Key, Val) -> + ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}). + +-spec get_gvar(emqx_ds:db(), _Key, Val) -> Val. +get_gvar(DB, Key, Default) -> + case ets:lookup(?gvar_tab, {DB, Key}) of + [#gvar{v = Val}] -> + Val; + [] -> + Default + end. + +-spec clean_gvars(emqx_ds:db()) -> ok. +clean_gvars(DB) -> + ets:match_delete(?gvar_tab, #gvar{k = {DB, '_'}, _ = '_'}), + ok. + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -96,6 +123,7 @@ init(?top) -> type => supervisor, shutdown => infinity }, + ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), %% SupFlags = #{ strategy => one_for_all, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 3ff87ab44..9f9f28676 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -363,6 +363,7 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> end, integer_to_binary(Hash). +-spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok. foreach_shard(DB, Fun) -> lists:foreach(Fun, list_shards(DB)).