fix(ds): Fix code review remark

This commit is contained in:
ieQu1 2024-05-20 12:56:53 +02:00
parent 60edf5e9b8
commit b3ded7edce
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
3 changed files with 71 additions and 24 deletions

View File

@ -33,6 +33,12 @@
]).
-export([which_dbs/0, which_shards/1]).
%% Debug:
-export([
get_egress_workers/1,
get_shard_workers/1
]).
%% behaviour callbacks:
-export([init/1]).
@ -111,6 +117,28 @@ which_dbs() ->
Key = {n, l, #?db_sup{_ = '_', db = '$1'}},
gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]).
%% @doc Get pids of all local egress servers for the given DB.
-spec get_egress_workers(emqx_ds:db()) -> #{_Shard => pid()}.
get_egress_workers(DB) ->
Children = supervisor:which_children(?via(#?egress_sup{db = DB})),
L = [{Shard, Child} || {Shard, Child, _, _} <- Children, is_pid(Child)],
maps:from_list(L).
%% @doc Get pids of all local shard servers for the given DB.
-spec get_shard_workers(emqx_ds:db()) -> #{_Shard => pid()}.
get_shard_workers(DB) ->
Shards = supervisor:which_children(?via(#?shards_sup{db = DB})),
L = lists:flatmap(
fun
({_Shard, Sup, _, _}) when is_pid(Sup) ->
[{Id, Pid} || {Id, Pid, _, _} <- supervisor:which_children(Sup), is_pid(Pid)];
(_) ->
[]
end,
Shards
),
maps:from_list(L).
%%================================================================================
%% behaviour callbacks
%%================================================================================

View File

@ -129,12 +129,20 @@ init([DB, Shard]) ->
},
{ok, S}.
format_status(#s{db = DB, shard = Shard, queue = Q}) ->
format_status(Status) ->
maps:map(
fun
(state, #s{db = DB, shard = Shard, queue = Q}) ->
#{
db => DB,
shard => Shard,
queue => queue:len(Q)
}.
};
(_, Val) ->
Val
end,
Status
).
handle_call(
#enqueue_req{

View File

@ -589,23 +589,16 @@ init({ShardId, Options}) ->
commit_metadata(S),
{ok, S}.
format_status(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) ->
#{
id => ShardId,
db => DB,
cf_refs => CFRefs,
schema => Schema,
shard =>
format_status(Status) ->
maps:map(
fun
(?GEN_KEY(_), _Schema) ->
'...';
(_K, Val) ->
(state, State) ->
format_state(State);
(_, Val) ->
Val
end,
Shard
)
}.
Status
).
handle_call(#call_update_config{since = Since, options = Options}, _From, S0) ->
case handle_update_config(S0, Since, Options) of
@ -791,7 +784,7 @@ handle_drop_generation(S0, GenId) ->
EC => Err,
stacktrace => Stack,
generation => GenId,
s => format_status(S0)
s => format_state(S0)
}
)
end,
@ -994,6 +987,24 @@ generations_since(Shard, Since) ->
Schema
).
format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) ->
#{
id => ShardId,
db => DB,
cf_refs => CFRefs,
schema => Schema,
shard =>
maps:map(
fun
(?GEN_KEY(_), _Schema) ->
'...';
(_K, Val) ->
Val
end,
Shard
)
}.
-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
-spec get_schema_runtime(shard_id()) -> shard().