From 34f5a886cea0caa0d9bfa5631ac36fdcf261b9ec Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 Jul 2024 12:06:48 -0300 Subject: [PATCH] refactor(cluster link api): return erpc errors in status and metrics responses --- .../src/emqx_cluster_link_api.erl | 151 ++++++++++-------- .../src/emqx_cluster_link_metrics.erl | 15 +- .../src/emqx_cluster_link_mqtt.erl | 23 +-- .../test/emqx_cluster_link_api_SUITE.erl | 24 ++- 4 files changed, 109 insertions(+), 104 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index 5eca0a944..257a19854 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -201,9 +201,15 @@ fields(node_metrics) -> handle_list() -> Links = get_raw(), - NodeResults = get_all_link_status_cluster(), - NameToStatus = collect_all_status(NodeResults), - EmptyStatus = #{status => inconsistent, node_status => []}, + NodeRPCResults = emqx_cluster_link_mqtt:get_all_resources_cluster(), + {NameToStatus, Errors} = collect_all_status(NodeRPCResults), + NodeErrors = lists:map( + fun({Node, Error}) -> + #{node => Node, status => inconsistent, reason => Error} + end, + Errors + ), + EmptyStatus = #{status => inconsistent, node_status => NodeErrors}, Response = lists:map( fun(#{<<"name">> := Name} = Link) -> @@ -227,25 +233,32 @@ handle_lookup(Name, Link) -> ?OK(add_status(Name, Link)). handle_metrics(Name) -> - case emqx_cluster_link_metrics:get_metrics(Name) of - {error, BadResults} -> + Results = emqx_cluster_link_metrics:get_metrics(Name), + {NodeMetrics0, NodeErrors} = + lists:foldl( + fun + ({Node, {ok, Metrics}}, {OkAccIn, ErrAccIn}) -> + {[format_metrics(Node, Metrics) | OkAccIn], ErrAccIn}; + ({Node, Error}, {OkAccIn, ErrAccIn}) -> + {OkAccIn, [{Node, Error} | ErrAccIn]} + end, + {[], []}, + Results + ), + case NodeErrors of + [] -> + ok; + [_ | _] -> ?SLOG(warning, #{ msg => "cluster_link_api_metrics_bad_erpc_results", - results => BadResults - }), - ?OK(#{metrics => #{}, node_metrics => []}); - {ok, NodeResults} -> - NodeMetrics = - lists:map( - fun({Node, Metrics}) -> - format_metrics(Node, Metrics) - end, - NodeResults - ), - AggregatedMetrics = aggregate_metrics(NodeMetrics), - Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics}, - ?OK(Response) - end. + errors => maps:from_list(NodeErrors) + }) + end, + NodeMetrics1 = lists:map(fun({Node, _Error}) -> format_metrics(Node, #{}) end, NodeErrors), + NodeMetrics = NodeMetrics1 ++ NodeMetrics0, + AggregatedMetrics = aggregate_metrics(NodeMetrics), + Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics}, + ?OK(Response). aggregate_metrics(NodeMetrics) -> ErrorLogger = fun(_) -> ok end, @@ -267,8 +280,8 @@ format_metrics(Node, Metrics) -> }. add_status(Name, Link) -> - NodeResults = get_link_status_cluster(Name), - Status = collect_single_status(NodeResults), + NodeRPCResults = emqx_cluster_link_mqtt:get_resource_cluster(Name), + Status = collect_single_status(NodeRPCResults), maps:merge(Link, Status). handle_update(Name, Params0) -> @@ -289,64 +302,62 @@ get_raw() -> ), Links. -get_all_link_status_cluster() -> - case emqx_cluster_link_mqtt:get_all_resources_cluster() of - {error, BadResults} -> - ?SLOG(warning, #{ - msg => "cluster_link_api_all_status_bad_erpc_results", - results => BadResults - }), - []; - {ok, NodeResults} -> - NodeResults - end. - -get_link_status_cluster(Name) -> - case emqx_cluster_link_mqtt:get_resource_cluster(Name) of - {error, BadResults} -> - ?SLOG(warning, #{ - msg => "cluster_link_api_lookup_status_bad_erpc_results", - results => BadResults - }), - []; - {ok, NodeResults} -> - NodeResults - end. - --spec collect_all_status([{node(), #{cluster_name() => _}}]) -> - #{ +-spec collect_all_status([{node(), {ok, #{cluster_name() => _}} | _Error}]) -> + {ClusterToStatus, Errors} +when + ClusterToStatus :: #{ cluster_name() => #{ node := node(), status := emqx_resource:resource_status() | inconsistent } - }. + }, + Errors :: [{node(), term()}]. collect_all_status(NodeResults) -> - Reindexed = lists:foldl( - fun({Node, AllLinkData}, Acc) -> - maps:fold( - fun(Name, Data, AccIn) -> - collect_all_status1(Node, Name, Data, AccIn) - end, - Acc, - AllLinkData - ) + {Reindexed, Errors} = lists:foldl( + fun + ({Node, {ok, AllLinkData}}, {OkAccIn, ErrAccIn}) -> + OkAcc = maps:fold( + fun(Name, Data, AccIn) -> + collect_all_status1(Node, Name, Data, AccIn) + end, + OkAccIn, + AllLinkData + ), + {OkAcc, ErrAccIn}; + ({Node, Error}, {OkAccIn, ErrAccIn}) -> + {OkAccIn, [{Node, Error} | ErrAccIn]} end, - #{}, + {#{}, []}, NodeResults ), - maps:fold( + NoErrors = + case Errors of + [] -> + true; + [_ | _] -> + ?SLOG(warning, #{ + msg => "cluster_link_api_lookup_status_bad_erpc_results", + errors => Errors + }), + false + end, + ClusterToStatus = maps:fold( fun(Name, NodeToData, Acc) -> OnlyStatus = [S || #{status := S} <- maps:values(NodeToData)], SummaryStatus = case lists:usort(OnlyStatus) of - [SameStatus] -> SameStatus; + [SameStatus] when NoErrors -> SameStatus; _ -> inconsistent end, NodeStatus = lists:map( - fun({Node, #{status := S}}) -> - #{node => Node, status => S} + fun + ({Node, #{status := S}}) -> + #{node => Node, status => S}; + ({Node, Error0}) -> + Error = emqx_logger_jsonfmt:best_effort_json(Error0), + #{node => Node, status => inconsistent, reason => Error} end, - maps:to_list(NodeToData) + maps:to_list(NodeToData) ++ Errors ), Acc#{ Name => #{ @@ -357,7 +368,8 @@ collect_all_status(NodeResults) -> end, #{}, Reindexed - ). + ), + {ClusterToStatus, Errors}. collect_all_status1(Node, Name, Data, Acc) -> maps:update_with( @@ -371,12 +383,13 @@ collect_single_status(NodeResults) -> NodeStatus = lists:map( fun - ({Node, {ok, #{status := S}}}) -> + ({Node, {ok, {ok, #{status := S}}}}) -> #{node => Node, status => S}; - ({Node, {error, _}}) -> + ({Node, {ok, {error, _}}}) -> #{node => Node, status => ?status_disconnected}; - ({Node, _}) -> - #{node => Node, status => inconsistent} + ({Node, Error0}) -> + Error = emqx_logger_jsonfmt:best_effort_json(Error0), + #{node => Node, status => inconsistent, reason => Error} end, NodeResults ), diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl index 3d6f1edc8..695419c50 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl @@ -31,7 +31,7 @@ get_metrics(ClusterName) -> Nodes = emqx:running_nodes(), Timeout = 15_000, Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout), - sequence_multicall_results(Nodes, Results). + lists:zip(Nodes, Results). maybe_create_metrics(ClusterName) -> case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of @@ -52,16 +52,3 @@ routes_inc(ClusterName, Val) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - --spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) -> - {ok, [{node(), term()}]} | {error, [term()]}. -sequence_multicall_results(Nodes, Results) -> - case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of - {OkResults, []} -> - {ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]}; - {_OkResults, BadResults} -> - {error, BadResults} - end. - -is_ok({_Node, {ok, _}}) -> true; -is_ok(_) -> false. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 3b37a304e..65e02f53e 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -106,20 +106,18 @@ remove_msg_fwd_resource(ClusterName) -> emqx_resource:remove_local(?MSG_RES_ID(ClusterName)). -spec get_all_resources_cluster() -> - {ok, [{node(), #{cluster_name() => emqx_resource:resource_data()}}]} - | {error, [term()]}. + [{node(), emqx_rpc:erpc(#{cluster_name() => emqx_resource:resource_data()})}]. get_all_resources_cluster() -> Nodes = emqx:running_nodes(), Results = emqx_cluster_link_proto_v1:get_all_resources(Nodes), - sequence_multicall_results(Nodes, Results). + lists:zip(Nodes, Results). -spec get_resource_cluster(cluster_name()) -> - {ok, [{node(), {ok, emqx_resource:resource_data()} | {error, not_found}}]} - | {error, [term()]}. + [{node(), {ok, {ok, emqx_resource:resource_data()} | {error, not_found}} | _Error}]. get_resource_cluster(ClusterName) -> Nodes = emqx:running_nodes(), Results = emqx_cluster_link_proto_v1:get_resource(Nodes, ClusterName), - sequence_multicall_results(Nodes, Results). + lists:zip(Nodes, Results). %% RPC Target in `emqx_cluster_link_proto_v1'. -spec get_resource_local_v1(cluster_name()) -> @@ -478,16 +476,3 @@ emqtt_client_opts(ClientIdSuffix, ClusterConf) -> #{clientid := BaseClientId} = Opts = emqx_cluster_link_config:mk_emqtt_options(ClusterConf), ClientId = emqx_bridge_mqtt_lib:clientid_base([BaseClientId, ClientIdSuffix]), Opts#{clientid => ClientId}. - --spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) -> - {ok, [{node(), term()}]} | {error, [term()]}. -sequence_multicall_results(Nodes, Results) -> - case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of - {OkResults, []} -> - {ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]}; - {_OkResults, BadResults} -> - {error, BadResults} - end. - -is_ok({_Node, {ok, _}}) -> true; -is_ok(_) -> false. diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl index 0e5143865..6b469272a 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -409,7 +409,17 @@ t_status(Config) -> {200, [ #{ <<"status">> := <<"inconsistent">>, - <<"node_status">> := [] + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"inconsistent">>, + <<"reason">> := _ + } + ] } ]}, list() @@ -417,7 +427,17 @@ t_status(Config) -> ?assertMatch( {200, #{ <<"status">> := <<"inconsistent">>, - <<"node_status">> := [] + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"inconsistent">>, + <<"reason">> := _ + } + ] }}, get_link(Name) ),