chore(mria): ekka_mnesia:running_nodes -> mria:running_nodes

This commit is contained in:
k32 2021-10-14 18:30:22 +02:00 committed by x1001100011
parent fd482e2ec0
commit ae71bdc0fe
13 changed files with 38 additions and 40 deletions

View File

@ -150,7 +150,7 @@ handle_info({timeout, TRef, tick},
State = #state{ticker = TRef, version = Version, sysdescr = Descr}) ->
publish_any(version, Version),
publish_any(sysdescr, Descr),
publish_any(brokers, ekka_mnesia:running_nodes()),
publish_any(brokers, mria_mnesia:running_nodes()),
publish_any(stats, emqx_stats:getstats()),
publish_any(metrics, emqx_metrics:all()),
{noreply, tick(State), hibernate};

View File

@ -153,7 +153,7 @@ param_path_operation()->
}.
list_bridges(get, _Params) ->
{200, lists:append([list_local_bridges(Node) || Node <- ekka_mnesia:running_nodes()])}.
{200, lists:append([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_local_bridges(Node) when Node =:= node() ->
[format_resp(Data) || Data <- emqx_bridge:list_bridges()];
@ -161,7 +161,7 @@ list_local_bridges(Node) ->
rpc_call(Node, list_local_bridges, [Node]).
crud_bridges_cluster(Method, Params) ->
Results = [crud_bridges(Node, Method, Params) || Node <- ekka_mnesia:running_nodes()],
Results = [crud_bridges(Node, Method, Params) || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of
[] ->
case Results of

View File

@ -158,8 +158,8 @@ counters(get, #{bindings := #{counter := Counter}}) ->
lookup([{<<"counter">>, Counter}]).
current_counters(get, _Params) ->
Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()],
Nodes = length(ekka_mnesia:running_nodes()),
Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()],
Nodes = length(mria_mnesia:running_nodes()),
{Received, Sent, Sub, Conn} = format_current_metrics(Data),
Response = #{
nodes => Nodes,
@ -194,16 +194,16 @@ lookup_(#{node := Node, counter := Counter}) ->
lookup_(#{node := Node}) ->
{200, sampling(Node)};
lookup_(#{counter := Counter}) ->
CounterData = merger_counters([sampling(Node, Counter) || Node <- ekka_mnesia:running_nodes()]),
CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]),
Data = hd(maps:values(CounterData)),
{200, Data}.
list_collect(Aggregate) ->
case Aggregate of
<<"true">> ->
[maps:put(node, Node, sampling(Node)) || Node <- ekka_mnesia:running_nodes()];
[maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()];
_ ->
Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()],
Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()],
merger_counters(Counters)
end.

View File

@ -209,7 +209,7 @@ confexp({error, already_exist}) ->
emqx_type:clientid(), {atom(), atom()}) -> list().
lookup_client(GwName, ClientId, FormatFun) ->
lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
|| Node <- ekka_mnesia:running_nodes()]).
|| Node <- mria_mnesia:running_nodes()]).
lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() ->
ChanTab = emqx_gateway_cm:tabname(chan, GwName),
@ -229,7 +229,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
| ok.
kickout_client(GwName, ClientId) ->
Results = [kickout_client(Node, GwName, ClientId)
|| Node <- ekka_mnesia:running_nodes()],
|| Node <- mria_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)

View File

@ -164,7 +164,7 @@ stopped_node_info(Node) ->
%%--------------------------------------------------------------------
list_brokers() ->
[{Node, broker_info(Node)} || Node <- ekka_mnesia:running_nodes()].
[{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()].
lookup_broker(Node) ->
broker_info(Node).
@ -181,7 +181,7 @@ broker_info(Node) ->
%%--------------------------------------------------------------------
get_metrics() ->
nodes_info_count([get_metrics(Node) || Node <- ekka_mnesia:running_nodes()]).
nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]).
get_metrics(Node) when Node =:= node() ->
emqx_metrics:all();
@ -201,7 +201,7 @@ get_stats() ->
begin
Stats = get_stats(Node),
delete_keys(Stats, GlobalStatsKeys)
end || Node <- ekka_mnesia:running_nodes()]),
end || Node <- mria_mnesia:running_nodes()]),
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
maps:merge(CountStats, GlobalStats).
@ -232,10 +232,10 @@ nodes_info_count(PropList) ->
%%--------------------------------------------------------------------
lookup_client({clientid, ClientId}, FormatFun) ->
lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- ekka_mnesia:running_nodes()]);
lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]);
lookup_client({username, Username}, FormatFun) ->
lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]).
lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
lists:append(lists:map(
@ -257,7 +257,7 @@ lookup_client(Node, {username, Username}, FormatFun) ->
rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
kickout_client(ClientId) ->
Results = [kickout_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
@ -273,7 +273,7 @@ list_authz_cache(ClientId) ->
call_client(ClientId, list_authz_cache).
list_client_subscriptions(ClientId) ->
Results = [client_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
Expected = lists:filter(fun({error, _}) -> false;
([]) -> false;
(_) -> true
@ -290,7 +290,7 @@ client_subscriptions(Node, ClientId) ->
rpc_call(Node, client_subscriptions, [Node, ClientId]).
clean_authz_cache(ClientId) ->
Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
@ -308,7 +308,7 @@ clean_authz_cache(Node, ClientId) ->
rpc_call(Node, clean_authz_cache, [Node, ClientId]).
clean_authz_cache_all() ->
Results = [{Node, clean_authz_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()],
Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of
[] -> ok;
BadNodes -> {error, BadNodes}
@ -328,7 +328,7 @@ set_quota_policy(ClientId, Policy) ->
%% @private
call_client(ClientId, Req) ->
Results = [call_client(Node, ClientId, Req) || Node <- ekka_mnesia:running_nodes()],
Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()],
Expected = lists:filter(fun({error, _}) -> false;
(_) -> true
end, Results),
@ -366,7 +366,7 @@ list_subscriptions(Node) ->
rpc_call(Node, list_subscriptions, [Node]).
list_subscriptions_via_topic(Topic, FormatFun) ->
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]).
list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
@ -376,7 +376,7 @@ list_subscriptions_via_topic(Node, Topic, FormatFun) ->
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
lookup_subscriptions(ClientId) ->
lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]).
lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
lookup_subscriptions(Node, ClientId) when Node =:= node() ->
case ets:lookup(emqx_subid, ClientId) of
@ -400,7 +400,7 @@ lookup_routes(Topic) ->
%%--------------------------------------------------------------------
subscribe(ClientId, TopicTables) ->
subscribe(ekka_mnesia:running_nodes(), ClientId, TopicTables).
subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
subscribe([Node | Nodes], ClientId, TopicTables) ->
case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of
@ -424,7 +424,7 @@ publish(Msg) ->
emqx:publish(Msg).
unsubscribe(ClientId, Topic) ->
unsubscribe(ekka_mnesia:running_nodes(), ClientId, Topic).
unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic).
unsubscribe([Node | Nodes], ClientId, Topic) ->
case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of
@ -447,7 +447,7 @@ do_unsubscribe(ClientId, Topic) ->
%%--------------------------------------------------------------------
list_plugins() ->
[{Node, list_plugins(Node)} || Node <- ekka_mnesia:running_nodes()].
[{Node, list_plugins(Node)} || Node <- mria_mnesia:running_nodes()].
list_plugins(Node) when Node =:= node() ->
emqx_plugins:list();
@ -474,7 +474,7 @@ reload_plugin(Node, Plugin) ->
%%--------------------------------------------------------------------
list_listeners() ->
lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]).
lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
list_listeners(Node) when Node =:= node() ->
[Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()];
@ -505,7 +505,7 @@ manage_listener(Operation, Param = #{node := Node}) ->
rpc_call(Node, manage_listener, [Operation, Param]).
update_listener(Id, Config) ->
[update_listener(Node, Id, Config) || Node <- ekka_mnesia:running_nodes()].
[update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
update_listener(Node, Id, Config) when Node =:= node() ->
case emqx_listeners:parse_listener_id(Id) of
@ -523,7 +523,7 @@ update_listener(Node, Id, Config) ->
rpc_call(Node, update_listener, [Node, Id, Config]).
remove_listener(Id) ->
[remove_listener(Node, Id) || Node <- ekka_mnesia:running_nodes()].
[remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
remove_listener(Node, Id) when Node =:= node() ->
{Type, Name} = emqx_listeners:parse_listener_id(Id),
@ -540,7 +540,7 @@ remove_listener(Node, Id) ->
%%--------------------------------------------------------------------
get_alarms(Type) ->
[{Node, get_alarms(Node, Type)} || Node <- ekka_mnesia:running_nodes()].
[{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()].
get_alarms(Node, Type) when Node =:= node() ->
add_duration_field(emqx_alarm:get_alarms(Type));
@ -553,7 +553,7 @@ deactivate(Node, Name) ->
rpc_call(Node, deactivate, [Node, Name]).
delete_all_deactivated_alarms() ->
[delete_all_deactivated_alarms(Node) || Node <- ekka_mnesia:running_nodes()].
[delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()].
delete_all_deactivated_alarms(Node) when Node =:= node() ->
emqx_alarm:delete_all_deactivated_alarms();
@ -621,5 +621,3 @@ max_row_limit() ->
?MAX_ROW_LIMIT.
table_size(Tab) -> ets:info(Tab, size).

View File

@ -167,7 +167,7 @@ cluster_query(Params, Tab, QsSchema, QueryFun) ->
{_CodCnt, Qs} = params2qs(Params, QsSchema),
Limit = b2i(limit(Params)),
Page = b2i(page(Params)),
Nodes = ekka_mnesia:running_nodes(),
Nodes = mria_mnesia:running_nodes(),
Meta = #{page => Page, limit => Limit, count => 0},
page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}).

View File

@ -300,7 +300,7 @@ manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}})
Result;
manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) ->
Results = [do_manage_listeners(Node, Id, Oper) || Node <- ekka_mnesia:running_nodes()],
Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of
[] -> {200};
Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}}

View File

@ -154,6 +154,6 @@ list(get, #{query_string := Qs}) ->
{200, emqx_mgmt:get_metrics()};
_ ->
Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) ||
Node <- ekka_mnesia:running_nodes()],
Node <- mria_mnesia:running_nodes()],
{200, Data}
end.

View File

@ -96,6 +96,6 @@ list(get, #{query_string := Qs}) ->
{200, emqx_mgmt:get_stats()};
_ ->
Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) ||
Node <- ekka_mnesia:running_nodes()],
Node <- mria_mnesia:running_nodes()],
{200, Data}
end.

View File

@ -210,7 +210,7 @@ generate_max_delayed_messages(Config) ->
update_config_(Config) ->
lists:foreach(fun(Node) ->
update_config_(Node, Config)
end, ekka_mnesia:running_nodes()).
end, mria_mnesia:running_nodes()).
update_config_(Node, Config) when Node =:= node() ->
_ = emqx_delayed:update_config(Config),

View File

@ -268,7 +268,7 @@ uptime() ->
element(1, erlang:statistics(wall_clock)).
nodes_uuid() ->
Nodes = lists:delete(node(), ekka_mnesia:running_nodes()),
Nodes = lists:delete(node(), mria_mnesia:running_nodes()),
lists:foldl(fun(Node, Acc) ->
case rpc:call(Node, ?MODULE, get_uuid, []) of
{badrpc, _Reason} ->

View File

@ -152,7 +152,7 @@ data(get, _Request) ->
enable_telemetry(Enable) ->
lists:foreach(fun(Node) ->
enable_telemetry(Node, Enable)
end, ekka_mnesia:running_nodes()).
end, mria_mnesia:running_nodes()).
enable_telemetry(Node, Enable) when Node =:= node() ->
case Enable of

View File

@ -338,4 +338,4 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
get_rule_metrics(Id) ->
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
|| Node <- ekka_mnesia:running_nodes()].
|| Node <- mria_mnesia:running_nodes()].