refactor(cluster link api): return erpc errors in status and metrics responses

This commit is contained in:
Thales Macedo Garitezi 2024-07-24 12:06:48 -03:00
parent 79db2e6d7f
commit 34f5a886ce
4 changed files with 109 additions and 104 deletions

View File

@ -201,9 +201,15 @@ fields(node_metrics) ->
handle_list() -> handle_list() ->
Links = get_raw(), Links = get_raw(),
NodeResults = get_all_link_status_cluster(), NodeRPCResults = emqx_cluster_link_mqtt:get_all_resources_cluster(),
NameToStatus = collect_all_status(NodeResults), {NameToStatus, Errors} = collect_all_status(NodeRPCResults),
EmptyStatus = #{status => inconsistent, node_status => []}, NodeErrors = lists:map(
fun({Node, Error}) ->
#{node => Node, status => inconsistent, reason => Error}
end,
Errors
),
EmptyStatus = #{status => inconsistent, node_status => NodeErrors},
Response = Response =
lists:map( lists:map(
fun(#{<<"name">> := Name} = Link) -> fun(#{<<"name">> := Name} = Link) ->
@ -227,25 +233,32 @@ handle_lookup(Name, Link) ->
?OK(add_status(Name, Link)). ?OK(add_status(Name, Link)).
handle_metrics(Name) -> handle_metrics(Name) ->
case emqx_cluster_link_metrics:get_metrics(Name) of Results = emqx_cluster_link_metrics:get_metrics(Name),
{error, BadResults} -> {NodeMetrics0, NodeErrors} =
lists:foldl(
fun
({Node, {ok, Metrics}}, {OkAccIn, ErrAccIn}) ->
{[format_metrics(Node, Metrics) | OkAccIn], ErrAccIn};
({Node, Error}, {OkAccIn, ErrAccIn}) ->
{OkAccIn, [{Node, Error} | ErrAccIn]}
end,
{[], []},
Results
),
case NodeErrors of
[] ->
ok;
[_ | _] ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "cluster_link_api_metrics_bad_erpc_results", msg => "cluster_link_api_metrics_bad_erpc_results",
results => BadResults errors => maps:from_list(NodeErrors)
}), })
?OK(#{metrics => #{}, node_metrics => []}); end,
{ok, NodeResults} -> NodeMetrics1 = lists:map(fun({Node, _Error}) -> format_metrics(Node, #{}) end, NodeErrors),
NodeMetrics = NodeMetrics = NodeMetrics1 ++ NodeMetrics0,
lists:map( AggregatedMetrics = aggregate_metrics(NodeMetrics),
fun({Node, Metrics}) -> Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics},
format_metrics(Node, Metrics) ?OK(Response).
end,
NodeResults
),
AggregatedMetrics = aggregate_metrics(NodeMetrics),
Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics},
?OK(Response)
end.
aggregate_metrics(NodeMetrics) -> aggregate_metrics(NodeMetrics) ->
ErrorLogger = fun(_) -> ok end, ErrorLogger = fun(_) -> ok end,
@ -267,8 +280,8 @@ format_metrics(Node, Metrics) ->
}. }.
add_status(Name, Link) -> add_status(Name, Link) ->
NodeResults = get_link_status_cluster(Name), NodeRPCResults = emqx_cluster_link_mqtt:get_resource_cluster(Name),
Status = collect_single_status(NodeResults), Status = collect_single_status(NodeRPCResults),
maps:merge(Link, Status). maps:merge(Link, Status).
handle_update(Name, Params0) -> handle_update(Name, Params0) ->
@ -289,64 +302,62 @@ get_raw() ->
), ),
Links. Links.
get_all_link_status_cluster() -> -spec collect_all_status([{node(), {ok, #{cluster_name() => _}} | _Error}]) ->
case emqx_cluster_link_mqtt:get_all_resources_cluster() of {ClusterToStatus, Errors}
{error, BadResults} -> when
?SLOG(warning, #{ ClusterToStatus :: #{
msg => "cluster_link_api_all_status_bad_erpc_results",
results => BadResults
}),
[];
{ok, NodeResults} ->
NodeResults
end.
get_link_status_cluster(Name) ->
case emqx_cluster_link_mqtt:get_resource_cluster(Name) of
{error, BadResults} ->
?SLOG(warning, #{
msg => "cluster_link_api_lookup_status_bad_erpc_results",
results => BadResults
}),
[];
{ok, NodeResults} ->
NodeResults
end.
-spec collect_all_status([{node(), #{cluster_name() => _}}]) ->
#{
cluster_name() => #{ cluster_name() => #{
node := node(), node := node(),
status := emqx_resource:resource_status() | inconsistent status := emqx_resource:resource_status() | inconsistent
} }
}. },
Errors :: [{node(), term()}].
collect_all_status(NodeResults) -> collect_all_status(NodeResults) ->
Reindexed = lists:foldl( {Reindexed, Errors} = lists:foldl(
fun({Node, AllLinkData}, Acc) -> fun
maps:fold( ({Node, {ok, AllLinkData}}, {OkAccIn, ErrAccIn}) ->
fun(Name, Data, AccIn) -> OkAcc = maps:fold(
collect_all_status1(Node, Name, Data, AccIn) fun(Name, Data, AccIn) ->
end, collect_all_status1(Node, Name, Data, AccIn)
Acc, end,
AllLinkData OkAccIn,
) AllLinkData
),
{OkAcc, ErrAccIn};
({Node, Error}, {OkAccIn, ErrAccIn}) ->
{OkAccIn, [{Node, Error} | ErrAccIn]}
end, end,
#{}, {#{}, []},
NodeResults NodeResults
), ),
maps:fold( NoErrors =
case Errors of
[] ->
true;
[_ | _] ->
?SLOG(warning, #{
msg => "cluster_link_api_lookup_status_bad_erpc_results",
errors => Errors
}),
false
end,
ClusterToStatus = maps:fold(
fun(Name, NodeToData, Acc) -> fun(Name, NodeToData, Acc) ->
OnlyStatus = [S || #{status := S} <- maps:values(NodeToData)], OnlyStatus = [S || #{status := S} <- maps:values(NodeToData)],
SummaryStatus = SummaryStatus =
case lists:usort(OnlyStatus) of case lists:usort(OnlyStatus) of
[SameStatus] -> SameStatus; [SameStatus] when NoErrors -> SameStatus;
_ -> inconsistent _ -> inconsistent
end, end,
NodeStatus = lists:map( NodeStatus = lists:map(
fun({Node, #{status := S}}) -> fun
#{node => Node, status => S} ({Node, #{status := S}}) ->
#{node => Node, status => S};
({Node, Error0}) ->
Error = emqx_logger_jsonfmt:best_effort_json(Error0),
#{node => Node, status => inconsistent, reason => Error}
end, end,
maps:to_list(NodeToData) maps:to_list(NodeToData) ++ Errors
), ),
Acc#{ Acc#{
Name => #{ Name => #{
@ -357,7 +368,8 @@ collect_all_status(NodeResults) ->
end, end,
#{}, #{},
Reindexed Reindexed
). ),
{ClusterToStatus, Errors}.
collect_all_status1(Node, Name, Data, Acc) -> collect_all_status1(Node, Name, Data, Acc) ->
maps:update_with( maps:update_with(
@ -371,12 +383,13 @@ collect_single_status(NodeResults) ->
NodeStatus = NodeStatus =
lists:map( lists:map(
fun fun
({Node, {ok, #{status := S}}}) -> ({Node, {ok, {ok, #{status := S}}}}) ->
#{node => Node, status => S}; #{node => Node, status => S};
({Node, {error, _}}) -> ({Node, {ok, {error, _}}}) ->
#{node => Node, status => ?status_disconnected}; #{node => Node, status => ?status_disconnected};
({Node, _}) -> ({Node, Error0}) ->
#{node => Node, status => inconsistent} Error = emqx_logger_jsonfmt:best_effort_json(Error0),
#{node => Node, status => inconsistent, reason => Error}
end, end,
NodeResults NodeResults
), ),

View File

@ -31,7 +31,7 @@ get_metrics(ClusterName) ->
Nodes = emqx:running_nodes(), Nodes = emqx:running_nodes(),
Timeout = 15_000, Timeout = 15_000,
Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout), Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout),
sequence_multicall_results(Nodes, Results). lists:zip(Nodes, Results).
maybe_create_metrics(ClusterName) -> maybe_create_metrics(ClusterName) ->
case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of
@ -52,16 +52,3 @@ routes_inc(ClusterName, Val) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) ->
{ok, [{node(), term()}]} | {error, [term()]}.
sequence_multicall_results(Nodes, Results) ->
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
{OkResults, []} ->
{ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]};
{_OkResults, BadResults} ->
{error, BadResults}
end.
is_ok({_Node, {ok, _}}) -> true;
is_ok(_) -> false.

View File

@ -106,20 +106,18 @@ remove_msg_fwd_resource(ClusterName) ->
emqx_resource:remove_local(?MSG_RES_ID(ClusterName)). emqx_resource:remove_local(?MSG_RES_ID(ClusterName)).
-spec get_all_resources_cluster() -> -spec get_all_resources_cluster() ->
{ok, [{node(), #{cluster_name() => emqx_resource:resource_data()}}]} [{node(), emqx_rpc:erpc(#{cluster_name() => emqx_resource:resource_data()})}].
| {error, [term()]}.
get_all_resources_cluster() -> get_all_resources_cluster() ->
Nodes = emqx:running_nodes(), Nodes = emqx:running_nodes(),
Results = emqx_cluster_link_proto_v1:get_all_resources(Nodes), Results = emqx_cluster_link_proto_v1:get_all_resources(Nodes),
sequence_multicall_results(Nodes, Results). lists:zip(Nodes, Results).
-spec get_resource_cluster(cluster_name()) -> -spec get_resource_cluster(cluster_name()) ->
{ok, [{node(), {ok, emqx_resource:resource_data()} | {error, not_found}}]} [{node(), {ok, {ok, emqx_resource:resource_data()} | {error, not_found}} | _Error}].
| {error, [term()]}.
get_resource_cluster(ClusterName) -> get_resource_cluster(ClusterName) ->
Nodes = emqx:running_nodes(), Nodes = emqx:running_nodes(),
Results = emqx_cluster_link_proto_v1:get_resource(Nodes, ClusterName), Results = emqx_cluster_link_proto_v1:get_resource(Nodes, ClusterName),
sequence_multicall_results(Nodes, Results). lists:zip(Nodes, Results).
%% RPC Target in `emqx_cluster_link_proto_v1'. %% RPC Target in `emqx_cluster_link_proto_v1'.
-spec get_resource_local_v1(cluster_name()) -> -spec get_resource_local_v1(cluster_name()) ->
@ -478,16 +476,3 @@ emqtt_client_opts(ClientIdSuffix, ClusterConf) ->
#{clientid := BaseClientId} = Opts = emqx_cluster_link_config:mk_emqtt_options(ClusterConf), #{clientid := BaseClientId} = Opts = emqx_cluster_link_config:mk_emqtt_options(ClusterConf),
ClientId = emqx_bridge_mqtt_lib:clientid_base([BaseClientId, ClientIdSuffix]), ClientId = emqx_bridge_mqtt_lib:clientid_base([BaseClientId, ClientIdSuffix]),
Opts#{clientid => ClientId}. Opts#{clientid => ClientId}.
-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) ->
{ok, [{node(), term()}]} | {error, [term()]}.
sequence_multicall_results(Nodes, Results) ->
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
{OkResults, []} ->
{ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]};
{_OkResults, BadResults} ->
{error, BadResults}
end.
is_ok({_Node, {ok, _}}) -> true;
is_ok(_) -> false.

View File

@ -409,7 +409,17 @@ t_status(Config) ->
{200, [ {200, [
#{ #{
<<"status">> := <<"inconsistent">>, <<"status">> := <<"inconsistent">>,
<<"node_status">> := [] <<"node_status">> := [
#{
<<"node">> := _,
<<"status">> := <<"connected">>
},
#{
<<"node">> := _,
<<"status">> := <<"inconsistent">>,
<<"reason">> := _
}
]
} }
]}, ]},
list() list()
@ -417,7 +427,17 @@ t_status(Config) ->
?assertMatch( ?assertMatch(
{200, #{ {200, #{
<<"status">> := <<"inconsistent">>, <<"status">> := <<"inconsistent">>,
<<"node_status">> := [] <<"node_status">> := [
#{
<<"node">> := _,
<<"status">> := <<"connected">>
},
#{
<<"node">> := _,
<<"status">> := <<"inconsistent">>,
<<"reason">> := _
}
]
}}, }},
get_link(Name) get_link(Name)
), ),