From ae71bdc0febe6eeff64615f386d795aea3c40dcc Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 14 Oct 2021 18:30:22 +0200 Subject: [PATCH] chore(mria): ekka_mnesia:running_nodes -> mria:running_nodes --- apps/emqx/src/emqx_sys.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 4 +- .../src/emqx_dashboard_monitor_api.erl | 10 ++--- apps/emqx_gateway/src/emqx_gateway_http.erl | 4 +- apps/emqx_management/src/emqx_mgmt.erl | 42 +++++++++---------- apps/emqx_management/src/emqx_mgmt_api.erl | 2 +- .../src/emqx_mgmt_api_listeners.erl | 2 +- .../src/emqx_mgmt_api_metrics.erl | 2 +- .../src/emqx_mgmt_api_stats.erl | 2 +- apps/emqx_modules/src/emqx_delayed_api.erl | 2 +- apps/emqx_modules/src/emqx_telemetry.erl | 2 +- apps/emqx_modules/src/emqx_telemetry_api.erl | 2 +- .../src/emqx_rule_engine_api.erl | 2 +- 13 files changed, 38 insertions(+), 40 deletions(-) diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 692d2bd0a..7882abc19 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -150,7 +150,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Version, sysdescr = Descr}) -> publish_any(version, Version), publish_any(sysdescr, Descr), - publish_any(brokers, ekka_mnesia:running_nodes()), + publish_any(brokers, mria_mnesia:running_nodes()), publish_any(stats, emqx_stats:getstats()), publish_any(metrics, emqx_metrics:all()), {noreply, tick(State), hibernate}; diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index e4805d7eb..40a101640 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -153,7 +153,7 @@ param_path_operation()-> }. list_bridges(get, _Params) -> - {200, lists:append([list_local_bridges(Node) || Node <- ekka_mnesia:running_nodes()])}. + {200, lists:append([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}. list_local_bridges(Node) when Node =:= node() -> [format_resp(Data) || Data <- emqx_bridge:list_bridges()]; @@ -161,7 +161,7 @@ list_local_bridges(Node) -> rpc_call(Node, list_local_bridges, [Node]). crud_bridges_cluster(Method, Params) -> - Results = [crud_bridges(Node, Method, Params) || Node <- ekka_mnesia:running_nodes()], + Results = [crud_bridges(Node, Method, Params) || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of [] -> case Results of diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index c00310211..ba2fc6dbe 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -158,8 +158,8 @@ counters(get, #{bindings := #{counter := Counter}}) -> lookup([{<<"counter">>, Counter}]). current_counters(get, _Params) -> - Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()], - Nodes = length(ekka_mnesia:running_nodes()), + Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()], + Nodes = length(mria_mnesia:running_nodes()), {Received, Sent, Sub, Conn} = format_current_metrics(Data), Response = #{ nodes => Nodes, @@ -194,16 +194,16 @@ lookup_(#{node := Node, counter := Counter}) -> lookup_(#{node := Node}) -> {200, sampling(Node)}; lookup_(#{counter := Counter}) -> - CounterData = merger_counters([sampling(Node, Counter) || Node <- ekka_mnesia:running_nodes()]), + CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]), Data = hd(maps:values(CounterData)), {200, Data}. list_collect(Aggregate) -> case Aggregate of <<"true">> -> - [maps:put(node, Node, sampling(Node)) || Node <- ekka_mnesia:running_nodes()]; + [maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()]; _ -> - Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], + Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()], merger_counters(Counters) end. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 07a5ae3c4..da440dba8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -209,7 +209,7 @@ confexp({error, already_exist}) -> emqx_type:clientid(), {atom(), atom()}) -> list(). lookup_client(GwName, ClientId, FormatFun) -> lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) - || Node <- ekka_mnesia:running_nodes()]). + || Node <- mria_mnesia:running_nodes()]). lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() -> ChanTab = emqx_gateway_cm:tabname(chan, GwName), @@ -229,7 +229,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> | ok. kickout_client(GwName, ClientId) -> Results = [kickout_client(Node, GwName, ClientId) - || Node <- ekka_mnesia:running_nodes()], + || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 3b1fd5903..eca5b3e56 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -164,7 +164,7 @@ stopped_node_info(Node) -> %%-------------------------------------------------------------------- list_brokers() -> - [{Node, broker_info(Node)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()]. lookup_broker(Node) -> broker_info(Node). @@ -181,7 +181,7 @@ broker_info(Node) -> %%-------------------------------------------------------------------- get_metrics() -> - nodes_info_count([get_metrics(Node) || Node <- ekka_mnesia:running_nodes()]). + nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]). get_metrics(Node) when Node =:= node() -> emqx_metrics:all(); @@ -201,7 +201,7 @@ get_stats() -> begin Stats = get_stats(Node), delete_keys(Stats, GlobalStatsKeys) - end || Node <- ekka_mnesia:running_nodes()]), + end || Node <- mria_mnesia:running_nodes()]), GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))), maps:merge(CountStats, GlobalStats). @@ -232,10 +232,10 @@ nodes_info_count(PropList) -> %%-------------------------------------------------------------------- lookup_client({clientid, ClientId}, FormatFun) -> - lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- ekka_mnesia:running_nodes()]); + lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]); lookup_client({username, Username}, FormatFun) -> - lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). + lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]). lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> lists:append(lists:map( @@ -257,7 +257,7 @@ lookup_client(Node, {username, Username}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). kickout_client(ClientId) -> - Results = [kickout_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) @@ -273,7 +273,7 @@ list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). list_client_subscriptions(ClientId) -> - Results = [client_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()], Expected = lists:filter(fun({error, _}) -> false; ([]) -> false; (_) -> true @@ -290,7 +290,7 @@ client_subscriptions(Node, ClientId) -> rpc_call(Node, client_subscriptions, [Node, ClientId]). clean_authz_cache(ClientId) -> - Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) @@ -308,7 +308,7 @@ clean_authz_cache(Node, ClientId) -> rpc_call(Node, clean_authz_cache, [Node, ClientId]). clean_authz_cache_all() -> - Results = [{Node, clean_authz_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()], + Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of [] -> ok; BadNodes -> {error, BadNodes} @@ -328,7 +328,7 @@ set_quota_policy(ClientId, Policy) -> %% @private call_client(ClientId, Req) -> - Results = [call_client(Node, ClientId, Req) || Node <- ekka_mnesia:running_nodes()], + Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()], Expected = lists:filter(fun({error, _}) -> false; (_) -> true end, Results), @@ -366,7 +366,7 @@ list_subscriptions(Node) -> rpc_call(Node, list_subscriptions, [Node]). list_subscriptions_via_topic(Topic, FormatFun) -> - lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- ekka_mnesia:running_nodes()]). + lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]). list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}], @@ -376,7 +376,7 @@ list_subscriptions_via_topic(Node, Topic, FormatFun) -> rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]). lookup_subscriptions(ClientId) -> - lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]). + lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]). lookup_subscriptions(Node, ClientId) when Node =:= node() -> case ets:lookup(emqx_subid, ClientId) of @@ -400,7 +400,7 @@ lookup_routes(Topic) -> %%-------------------------------------------------------------------- subscribe(ClientId, TopicTables) -> - subscribe(ekka_mnesia:running_nodes(), ClientId, TopicTables). + subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of @@ -424,7 +424,7 @@ publish(Msg) -> emqx:publish(Msg). unsubscribe(ClientId, Topic) -> - unsubscribe(ekka_mnesia:running_nodes(), ClientId, Topic). + unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic). unsubscribe([Node | Nodes], ClientId, Topic) -> case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of @@ -447,7 +447,7 @@ do_unsubscribe(ClientId, Topic) -> %%-------------------------------------------------------------------- list_plugins() -> - [{Node, list_plugins(Node)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, list_plugins(Node)} || Node <- mria_mnesia:running_nodes()]. list_plugins(Node) when Node =:= node() -> emqx_plugins:list(); @@ -474,7 +474,7 @@ reload_plugin(Node, Plugin) -> %%-------------------------------------------------------------------- list_listeners() -> - lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]). + lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]). list_listeners(Node) when Node =:= node() -> [Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()]; @@ -505,7 +505,7 @@ manage_listener(Operation, Param = #{node := Node}) -> rpc_call(Node, manage_listener, [Operation, Param]). update_listener(Id, Config) -> - [update_listener(Node, Id, Config) || Node <- ekka_mnesia:running_nodes()]. + [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()]. update_listener(Node, Id, Config) when Node =:= node() -> case emqx_listeners:parse_listener_id(Id) of @@ -523,7 +523,7 @@ update_listener(Node, Id, Config) -> rpc_call(Node, update_listener, [Node, Id, Config]). remove_listener(Id) -> - [remove_listener(Node, Id) || Node <- ekka_mnesia:running_nodes()]. + [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()]. remove_listener(Node, Id) when Node =:= node() -> {Type, Name} = emqx_listeners:parse_listener_id(Id), @@ -540,7 +540,7 @@ remove_listener(Node, Id) -> %%-------------------------------------------------------------------- get_alarms(Type) -> - [{Node, get_alarms(Node, Type)} || Node <- ekka_mnesia:running_nodes()]. + [{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()]. get_alarms(Node, Type) when Node =:= node() -> add_duration_field(emqx_alarm:get_alarms(Type)); @@ -553,7 +553,7 @@ deactivate(Node, Name) -> rpc_call(Node, deactivate, [Node, Name]). delete_all_deactivated_alarms() -> - [delete_all_deactivated_alarms(Node) || Node <- ekka_mnesia:running_nodes()]. + [delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()]. delete_all_deactivated_alarms(Node) when Node =:= node() -> emqx_alarm:delete_all_deactivated_alarms(); @@ -621,5 +621,3 @@ max_row_limit() -> ?MAX_ROW_LIMIT. table_size(Tab) -> ets:info(Tab, size). - - diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 9afb6090e..d79eab587 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -167,7 +167,7 @@ cluster_query(Params, Tab, QsSchema, QueryFun) -> {_CodCnt, Qs} = params2qs(Params, QsSchema), Limit = b2i(limit(Params)), Page = b2i(page(Params)), - Nodes = ekka_mnesia:running_nodes(), + Nodes = mria_mnesia:running_nodes(), Meta = #{page => Page, limit => Limit, count => 0}, page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}). diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 9e12cbe3a..52c7a8709 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -300,7 +300,7 @@ manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}}) Result; manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) -> - Results = [do_manage_listeners(Node, Id, Oper) || Node <- ekka_mnesia:running_nodes()], + Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()], case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of [] -> {200}; Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}} diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index 2795fe342..8c0b364c0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -154,6 +154,6 @@ list(get, #{query_string := Qs}) -> {200, emqx_mgmt:get_metrics()}; _ -> Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) || - Node <- ekka_mnesia:running_nodes()], + Node <- mria_mnesia:running_nodes()], {200, Data} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index 470b5fda1..da8e643d8 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -96,6 +96,6 @@ list(get, #{query_string := Qs}) -> {200, emqx_mgmt:get_stats()}; _ -> Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) || - Node <- ekka_mnesia:running_nodes()], + Node <- mria_mnesia:running_nodes()], {200, Data} end. diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 94a388767..768ef4590 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -210,7 +210,7 @@ generate_max_delayed_messages(Config) -> update_config_(Config) -> lists:foreach(fun(Node) -> update_config_(Node, Config) - end, ekka_mnesia:running_nodes()). + end, mria_mnesia:running_nodes()). update_config_(Node, Config) when Node =:= node() -> _ = emqx_delayed:update_config(Config), diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 04586e8fc..3c3f9bd6a 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -268,7 +268,7 @@ uptime() -> element(1, erlang:statistics(wall_clock)). nodes_uuid() -> - Nodes = lists:delete(node(), ekka_mnesia:running_nodes()), + Nodes = lists:delete(node(), mria_mnesia:running_nodes()), lists:foldl(fun(Node, Acc) -> case rpc:call(Node, ?MODULE, get_uuid, []) of {badrpc, _Reason} -> diff --git a/apps/emqx_modules/src/emqx_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl index 5d1cffdcd..7dd36654a 100644 --- a/apps/emqx_modules/src/emqx_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -152,7 +152,7 @@ data(get, _Request) -> enable_telemetry(Enable) -> lists:foreach(fun(Node) -> enable_telemetry(Node, Enable) - end, ekka_mnesia:running_nodes()). + end, mria_mnesia:running_nodes()). enable_telemetry(Node, Enable) when Node =:= node() -> case Enable of diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 5e6b28b61..b9a3b16f7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -338,4 +338,4 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> get_rule_metrics(Id) -> [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. + || Node <- mria_mnesia:running_nodes()].