diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index ef870685a..ffee5fba7 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -30,6 +30,12 @@ stop/0 ]). +%% Cluster API +-export([ + cluster_nodes/1, + running_nodes/0 +]). + %% PubSub API -export([ subscribe/1, @@ -102,6 +108,18 @@ is_running() -> _ -> true end. +%%-------------------------------------------------------------------- +%% Cluster API +%%-------------------------------------------------------------------- + +-spec running_nodes() -> [node()]. +running_nodes() -> + mria:running_nodes(). + +-spec cluster_nodes(all | running | cores | stopped) -> [node()]. +cluster_nodes(Type) -> + mria:cluster_nodes(Type). + %%-------------------------------------------------------------------- %% PubSub API %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_SUITE.erl b/apps/emqx/test/emqx_SUITE.erl index 09d5d8017..64ed2ea19 100644 --- a/apps/emqx/test/emqx_SUITE.erl +++ b/apps/emqx/test/emqx_SUITE.erl @@ -148,6 +148,14 @@ t_run_hook(_) -> ?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)), ?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)). +t_cluster_nodes(_) -> + Expected = [node()], + ?assertEqual(Expected, emqx:running_nodes()), + ?assertEqual(Expected, emqx:cluster_nodes(running)), + ?assertEqual(Expected, emqx:cluster_nodes(all)), + ?assertEqual(Expected, emqx:cluster_nodes(cores)), + ?assertEqual([], emqx:cluster_nodes(stopped)). + %%-------------------------------------------------------------------- %% Hook fun %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 5ba12646f..0b91817f0 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -112,8 +112,8 @@ %%-------------------------------------------------------------------- list_nodes() -> - Running = mria:cluster_nodes(running), - Stopped = mria:cluster_nodes(stopped), + Running = emqx:cluster_nodes(running), + Stopped = emqx:cluster_nodes(stopped), DownNodes = lists:map(fun stopped_node_info/1, Stopped), [{Node, Info} || #{node := Node} = Info <- node_info(Running)] ++ DownNodes. @@ -199,7 +199,7 @@ vm_stats() -> %%-------------------------------------------------------------------- list_brokers() -> - Running = mria:running_nodes(), + Running = emqx:running_nodes(), [{Node, Broker} || #{node := Node} = Broker <- broker_info(Running)]. lookup_broker(Node) -> @@ -223,7 +223,7 @@ broker_info(Nodes) -> %%-------------------------------------------------------------------- get_metrics() -> - nodes_info_count([get_metrics(Node) || Node <- mria:running_nodes()]). + nodes_info_count([get_metrics(Node) || Node <- emqx:running_nodes()]). get_metrics(Node) -> unwrap_rpc(emqx_proto_v1:get_metrics(Node)). @@ -238,13 +238,20 @@ get_stats() -> 'subscriptions.shared.count', 'subscriptions.shared.max' ], - CountStats = nodes_info_count([ - begin - Stats = get_stats(Node), - delete_keys(Stats, GlobalStatsKeys) - end - || Node <- mria:running_nodes() - ]), + CountStats = nodes_info_count( + lists:foldl( + fun(Node, Acc) -> + case get_stats(Node) of + {error, _} -> + Acc; + Stats -> + [delete_keys(Stats, GlobalStatsKeys) | Acc] + end + end, + [], + emqx:running_nodes() + ) + ), GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))), maps:merge(CountStats, GlobalStats). @@ -275,12 +282,12 @@ nodes_info_count(PropList) -> lookup_client({clientid, ClientId}, FormatFun) -> lists:append([ lookup_client(Node, {clientid, ClientId}, FormatFun) - || Node <- mria:running_nodes() + || Node <- emqx:running_nodes() ]); lookup_client({username, Username}, FormatFun) -> lists:append([ lookup_client(Node, {username, Username}, FormatFun) - || Node <- mria:running_nodes() + || Node <- emqx:running_nodes() ]). lookup_client(Node, Key, FormatFun) -> @@ -307,7 +314,7 @@ kickout_client(ClientId) -> [] -> {error, not_found}; _ -> - Results = [kickout_client(Node, ClientId) || Node <- mria:running_nodes()], + Results = [kickout_client(Node, ClientId) || Node <- emqx:running_nodes()], check_results(Results) end. @@ -322,7 +329,7 @@ list_client_subscriptions(ClientId) -> [] -> {error, not_found}; _ -> - Results = [client_subscriptions(Node, ClientId) || Node <- mria:running_nodes()], + Results = [client_subscriptions(Node, ClientId) || Node <- emqx:running_nodes()], Filter = fun ({error, _}) -> @@ -340,18 +347,18 @@ client_subscriptions(Node, ClientId) -> {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}. clean_authz_cache(ClientId) -> - Results = [clean_authz_cache(Node, ClientId) || Node <- mria:running_nodes()], + Results = [clean_authz_cache(Node, ClientId) || Node <- emqx:running_nodes()], check_results(Results). clean_authz_cache(Node, ClientId) -> unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)). clean_authz_cache_all() -> - Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria:running_nodes()], + Results = [{Node, clean_authz_cache_all(Node)} || Node <- emqx:running_nodes()], wrap_results(Results). clean_pem_cache_all() -> - Results = [{Node, clean_pem_cache_all(Node)} || Node <- mria:running_nodes()], + Results = [{Node, clean_pem_cache_all(Node)} || Node <- emqx:running_nodes()], wrap_results(Results). wrap_results(Results) -> @@ -379,7 +386,7 @@ set_keepalive(_ClientId, _Interval) -> %% @private call_client(ClientId, Req) -> - Results = [call_client(Node, ClientId, Req) || Node <- mria:running_nodes()], + Results = [call_client(Node, ClientId, Req) || Node <- emqx:running_nodes()], Expected = lists:filter( fun ({error, _}) -> false; @@ -428,7 +435,7 @@ list_subscriptions(Node) -> list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([ list_subscriptions_via_topic(Node, Topic, FormatFun) - || Node <- mria:running_nodes() + || Node <- emqx:running_nodes() ]). list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) -> @@ -442,7 +449,7 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) -> %%-------------------------------------------------------------------- subscribe(ClientId, TopicTables) -> - subscribe(mria:running_nodes(), ClientId, TopicTables). + subscribe(emqx:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of @@ -467,7 +474,7 @@ publish(Msg) -> -spec unsubscribe(emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe(ClientId, Topic) -> - unsubscribe(mria:running_nodes(), ClientId, Topic). + unsubscribe(emqx:running_nodes(), ClientId, Topic). -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, channel_not_found}. @@ -490,7 +497,7 @@ do_unsubscribe(ClientId, Topic) -> -spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe_batch(ClientId, Topics) -> - unsubscribe_batch(mria:running_nodes(), ClientId, Topics). + unsubscribe_batch(emqx:running_nodes(), ClientId, Topics). -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, channel_not_found}. @@ -515,7 +522,7 @@ do_unsubscribe_batch(ClientId, Topics) -> %%-------------------------------------------------------------------- get_alarms(Type) -> - [{Node, get_alarms(Node, Type)} || Node <- mria:running_nodes()]. + [{Node, get_alarms(Node, Type)} || Node <- emqx:running_nodes()]. get_alarms(Node, Type) -> add_duration_field(unwrap_rpc(emqx_proto_v1:get_alarms(Node, Type))). @@ -524,7 +531,7 @@ deactivate(Node, Name) -> unwrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)). delete_all_deactivated_alarms() -> - [delete_all_deactivated_alarms(Node) || Node <- mria:running_nodes()]. + [delete_all_deactivated_alarms(Node) || Node <- emqx:running_nodes()]. delete_all_deactivated_alarms(Node) -> unwrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)). diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index c77752f7d..8365b983c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -163,7 +163,7 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) -> {error, page_limit_invalid}; Meta -> {_CodCnt, NQString} = parse_qstring(QString, QSchema), - Nodes = mria:running_nodes(), + Nodes = emqx:running_nodes(), ResultAcc = init_query_result(), QueryState = init_query_state(Tab, NQString, MsFun, Meta), NResultAcc = do_cluster_query( diff --git a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl index 68bb6c81d..e74b6c362 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl @@ -101,7 +101,7 @@ cluster_info(get, _) -> ClusterName = application:get_env(ekka, cluster_name, emqxcl), Info = #{ name => ClusterName, - nodes => mria:running_nodes(), + nodes => emqx:running_nodes(), self => node() }, {200, Info}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index 41bca45cf..af203dfe9 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -258,7 +258,7 @@ configs(get, Params, _Req) -> QS = maps:get(query_string, Params, #{}), Node = maps:get(<<"node">>, QS, node()), case - lists:member(Node, mria:running_nodes()) andalso + lists:member(Node, emqx:running_nodes()) andalso emqx_management_proto_v2:get_full_config(Node) of false -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index d7f3ff321..de86700ef 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -483,7 +483,7 @@ err_msg_str(Reason) -> io_lib:format("~p", [Reason]). list_listeners() -> - [list_listeners(Node) || Node <- mria:running_nodes()]. + [list_listeners(Node) || Node <- emqx:running_nodes()]. list_listeners(Node) -> wrap_rpc(emqx_management_proto_v2:list_listeners(Node)). diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index 1c5c8f62a..0fcc45d8e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -59,7 +59,7 @@ metrics(get, #{query_string := Qs}) -> maps:from_list( emqx_mgmt:get_metrics(Node) ++ [{node, Node}] ) - || Node <- mria:running_nodes() + || Node <- emqx:running_nodes() ], {200, Data} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index 1e752aaac..5f4bbce65 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -127,21 +127,21 @@ list(get, #{query_string := Qs}) -> true -> {200, emqx_mgmt:get_stats()}; _ -> - Data = [ - maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) - || Node <- running_nodes() - ], + Data = lists:foldl( + fun(Node, Acc) -> + case emqx_mgmt:get_stats(Node) of + {error, _Err} -> + Acc; + Stats when is_list(Stats) -> + Data = maps:from_list([{node, Node} | Stats]), + [Data | Acc] + end + end, + [], + emqx:running_nodes() + ), {200, Data} end. %%%============================================================================================== %% Internal - -running_nodes() -> - Nodes = erlang:nodes([visible, this]), - RpcResults = emqx_proto_v2:are_running(Nodes), - [ - Node - || {Node, IsRunning} <- lists:zip(Nodes, RpcResults), - IsRunning =:= {ok, true} - ]. diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 5df641add..25cc2734f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -390,7 +390,7 @@ trace(get, _Params) -> fun(#{start_at := A}, #{start_at := B}) -> A > B end, emqx_trace:format(List0) ), - Nodes = mria:running_nodes(), + Nodes = emqx:running_nodes(), TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v2:get_trace_size(Nodes)), AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize), Now = erlang:system_time(second), @@ -464,7 +464,7 @@ format_trace(Trace0) -> LogSize = lists:foldl( fun(Node, Acc) -> Acc#{Node => 0} end, #{}, - mria:running_nodes() + emqx:running_nodes() ), Trace2 = maps:without([enable, filter], Trace1), Trace2#{ @@ -560,13 +560,13 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) -> ). collect_trace_file(undefined, TraceLog) -> - Nodes = mria:running_nodes(), + Nodes = emqx:running_nodes(), wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file(Nodes, TraceLog)); collect_trace_file(Node, TraceLog) -> wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file([Node], TraceLog)). collect_trace_file_detail(TraceLog) -> - Nodes = mria:running_nodes(), + Nodes = emqx:running_nodes(), wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file_detail(Nodes, TraceLog)). wrap_rpc({GoodRes, BadNodes}) -> @@ -696,7 +696,7 @@ parse_node(Query, Default) -> {ok, Default}; {ok, NodeBin} -> Node = binary_to_existing_atom(NodeBin), - true = lists:member(Node, mria:running_nodes()), + true = lists:member(Node, emqx:running_nodes()), {ok, Node} end catch diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index 71b51b67c..3eb37060e 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -36,16 +36,16 @@ end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]). init_per_testcase(TestCase, Config) -> - meck:expect(mria, running_nodes, 0, [node()]), + meck:expect(emqx, running_nodes, 0, [node()]), emqx_common_test_helpers:init_per_testcase(?MODULE, TestCase, Config). end_per_testcase(TestCase, Config) -> - meck:unload(mria), + meck:unload(emqx), emqx_common_test_helpers:end_per_testcase(?MODULE, TestCase, Config). t_list_nodes(init, Config) -> meck:expect( - mria, + emqx, cluster_nodes, fun (running) -> [node()]; @@ -125,7 +125,7 @@ t_lookup_client(_Config) -> emqx_mgmt:lookup_client({username, <<"user1">>}, ?FORMATFUN) ), ?assertEqual([], emqx_mgmt:lookup_client({clientid, <<"notfound">>}, ?FORMATFUN)), - meck:expect(mria, running_nodes, 0, [node(), 'fake@nonode']), + meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']), ?assertMatch( [_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN) ). @@ -188,7 +188,7 @@ t_clean_cache(_Config) -> {error, _}, emqx_mgmt:clean_pem_cache_all() ), - meck:expect(mria, running_nodes, 0, [node(), 'fake@nonode']), + meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']), ?assertMatch( {error, [{'fake@nonode', {error, _}}]}, emqx_mgmt:clean_authz_cache_all() diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index bacec718d..a53ffc9c4 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -179,14 +179,14 @@ t_bad_rpc(_) -> ClientLs1 = [start_emqtt_client(node(), I, 1883) || I <- lists:seq(1, 10)], Path = emqx_mgmt_api_test_util:api_path(["clients?limit=2&page=2"]), try - meck:expect(mria, running_nodes, 0, ['fake@nohost']), + meck:expect(emqx, running_nodes, 0, ['fake@nohost']), {error, {_, 500, _}} = emqx_mgmt_api_test_util:request_api(get, Path), %% good cop, bad cop - meck:expect(mria, running_nodes, 0, [node(), 'fake@nohost']), + meck:expect(emqx, running_nodes, 0, [node(), 'fake@nohost']), {error, {_, 500, _}} = emqx_mgmt_api_test_util:request_api(get, Path) after _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1), - meck:unload(mria), + meck:unload(emqx), emqx_mgmt_api_test_util:end_suite() end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 1638b815a..5a0116a4d 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -246,7 +246,7 @@ t_dashboard(_Config) -> t_configs_node({'init', Config}) -> Node = node(), - meck:expect(mria, running_nodes, fun() -> [Node, bad_node, other_node] end), + meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end), meck:expect( emqx_management_proto_v2, get_full_config, @@ -258,7 +258,7 @@ t_configs_node({'init', Config}) -> ), Config; t_configs_node({'end', _}) -> - meck:unload([mria, emqx_management_proto_v2]); + meck:unload([emqx, emqx_management_proto_v2]); t_configs_node(_) -> Node = atom_to_list(node()), diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index 62f689a84..33cb66eb2 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -168,8 +168,8 @@ t_api_listeners_list_not_ready(Config) when is_list(Config) -> L3 = get_tcp_listeners(Node2), Comment = #{ - node1 => rpc:call(Node1, mria, running_nodes, []), - node2 => rpc:call(Node2, mria, running_nodes, []) + node1 => rpc:call(Node1, emqx, running_nodes, []), + node2 => rpc:call(Node2, emqx, running_nodes, []) }, ?assert(length(L1) > length(L2), Comment), diff --git a/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl index 4099426b8..2550bdbe2 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl @@ -24,10 +24,12 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + meck:expect(emqx, running_nodes, 0, [node(), 'fake@node']), emqx_mgmt_api_test_util:init_suite(), Config. end_per_suite(_) -> + meck:unload(emqx), emqx_mgmt_api_test_util:end_suite(). t_stats_api(_) -> diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 05d9508b6..d999f294e 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -599,8 +599,8 @@ emqx_cluster() -> ]. emqx_cluster_data() -> - Running = mria:cluster_nodes(running), - Stopped = mria:cluster_nodes(stopped), + Running = emqx:cluster_nodes(running), + Stopped = emqx:cluster_nodes(stopped), [ {nodes_running, length(Running)}, {nodes_stopped, length(Stopped)} diff --git a/changes/ce/fix-10369.en.md b/changes/ce/fix-10369.en.md new file mode 100644 index 000000000..3c32d33f3 --- /dev/null +++ b/changes/ce/fix-10369.en.md @@ -0,0 +1,6 @@ +Fix error in `/api/v5/monitor_current` API endpoint that happens when some EMQX nodes are down. + +Prior to this fix, sometimes the request returned HTTP code 500 and the following message: +``` +{"code":"INTERNAL_ERROR","message":"error, badarg, [{erlang,'++',[{error,nodedown},[{node,'emqx@10.42.0.150'}]], ... +```