From b3ded7edce091f7d8e7905feb14a3f226ed24180 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 20 May 2024 12:56:53 +0200 Subject: [PATCH] fix(ds): Fix code review remark --- .../src/emqx_ds_builtin_db_sup.erl | 28 +++++++++++ .../src/emqx_ds_replication_layer_egress.erl | 20 +++++--- .../src/emqx_ds_storage_layer.erl | 47 ++++++++++++------- 3 files changed, 71 insertions(+), 24 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 06e925c1b..b2a461e7a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -33,6 +33,12 @@ ]). -export([which_dbs/0, which_shards/1]). +%% Debug: +-export([ + get_egress_workers/1, + get_shard_workers/1 +]). + %% behaviour callbacks: -export([init/1]). @@ -111,6 +117,28 @@ which_dbs() -> Key = {n, l, #?db_sup{_ = '_', db = '$1'}}, gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]). +%% @doc Get pids of all local egress servers for the given DB. +-spec get_egress_workers(emqx_ds:db()) -> #{_Shard => pid()}. +get_egress_workers(DB) -> + Children = supervisor:which_children(?via(#?egress_sup{db = DB})), + L = [{Shard, Child} || {Shard, Child, _, _} <- Children, is_pid(Child)], + maps:from_list(L). + +%% @doc Get pids of all local shard servers for the given DB. +-spec get_shard_workers(emqx_ds:db()) -> #{_Shard => pid()}. +get_shard_workers(DB) -> + Shards = supervisor:which_children(?via(#?shards_sup{db = DB})), + L = lists:flatmap( + fun + ({_Shard, Sup, _, _}) when is_pid(Sup) -> + [{Id, Pid} || {Id, Pid, _, _} <- supervisor:which_children(Sup), is_pid(Pid)]; + (_) -> + [] + end, + Shards + ), + maps:from_list(L). + %%================================================================================ %% behaviour callbacks %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index dc76aecf0..1d0efca6f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -129,12 +129,20 @@ init([DB, Shard]) -> }, {ok, S}. -format_status(#s{db = DB, shard = Shard, queue = Q}) -> - #{ - db => DB, - shard => Shard, - queue => queue:len(Q) - }. +format_status(Status) -> + maps:map( + fun + (state, #s{db = DB, shard = Shard, queue = Q}) -> + #{ + db => DB, + shard => Shard, + queue => queue:len(Q) + }; + (_, Val) -> + Val + end, + Status + ). handle_call( #enqueue_req{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 71bf6fa6e..7d1fffbcb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -589,23 +589,16 @@ init({ShardId, Options}) -> commit_metadata(S), {ok, S}. -format_status(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) -> - #{ - id => ShardId, - db => DB, - cf_refs => CFRefs, - schema => Schema, - shard => - maps:map( - fun - (?GEN_KEY(_), _Schema) -> - '...'; - (_K, Val) -> - Val - end, - Shard - ) - }. +format_status(Status) -> + maps:map( + fun + (state, State) -> + format_state(State); + (_, Val) -> + Val + end, + Status + ). handle_call(#call_update_config{since = Since, options = Options}, _From, S0) -> case handle_update_config(S0, Since, Options) of @@ -791,7 +784,7 @@ handle_drop_generation(S0, GenId) -> EC => Err, stacktrace => Stack, generation => GenId, - s => format_status(S0) + s => format_state(S0) } ) end, @@ -994,6 +987,24 @@ generations_since(Shard, Since) -> Schema ). +format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) -> + #{ + id => ShardId, + db => DB, + cf_refs => CFRefs, + schema => Schema, + shard => + maps:map( + fun + (?GEN_KEY(_), _Schema) -> + '...'; + (_K, Val) -> + Val + end, + Shard + ) + }. + -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). -spec get_schema_runtime(shard_id()) -> shard().