From 96b26bf4ce7ce471eca1a6bf6c070820f30fcefb Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 8 Apr 2022 10:12:40 +0800 Subject: [PATCH] fix: discard rpc call failed data --- .../src/emqx_rule_engine_api.erl | 30 ++++++++++++------- .../src/emqx_rule_engine_cli.erl | 27 ++++++++++++----- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index a6220ee9f..c8d073a28 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -323,13 +323,13 @@ show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} -> Status = - [begin - St = case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of - {ok, St0} -> St0; - {error, _} -> #{is_alive => false} - end, - maps:put(node, Node, St) - end || Node <- ekka_mnesia:running_nodes()], + lists:flatten( + [ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of + {badrpc, _} -> []; + {ok, St} -> [maps:put(node, Node, St)]; + {error, _} -> [maps:put(node, Node, #{is_alive => false})] + end + || Node <- ekka_mnesia:running_nodes()]), return({ok, maps:put(status, Status, record_to_map(R))}); not_found -> return({error, 404, <<"Not Found">>}) @@ -575,9 +575,17 @@ sort_by(Pos, TplList) -> end, TplList). get_rule_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. + lists:flatten( + [ case rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]) of + {badrpc, _} -> []; + Res -> [maps:put(node, Node, Res)] + end + || Node <- ekka_mnesia:running_nodes()]). get_action_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. + lists:flatten( + [ case rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]) of + {badrpc, _} -> []; + Res -> [maps:put(node, Node, Res)] + end + || Node <- ekka_mnesia:running_nodes()]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl index bcb869aec..a58bfc0d1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl @@ -273,10 +273,13 @@ format(#resource{id = Id, config = Config, description = Descr}) -> Status = - [begin - {ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]), - maps:put(node, Node, St) - end || Node <- [node()| nodes()]], + lists:flatten( + [ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of + {badrpc, _} -> []; + {ok, St} -> [maps:put(node, Node, St)]; + {error, _} -> [maps:put(node, Node, #{is_alive => false})] + end + || Node <- ekka_mnesia:running_nodes()]), lists:flatten(io_lib:format("resource(id='~s', type='~s', config=~0p, status=~0p, description='~s')~n", [Id, Type, Config, Status, Descr])); format(#resource_type{name = Name, @@ -369,12 +372,20 @@ get_actions() -> emqx_rule_registry:get_actions(). get_rule_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) - || Node <- [node()| nodes()]]. + lists:flatten( + [ case rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]) of + {badrpc, _} -> []; + Res -> [maps:put(node, Node, Res)] + end + || Node <- ekka_mnesia:running_nodes()]). get_action_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id])) - || Node <- [node()| nodes()]]. + lists:flatten( + [ case rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]) of + {badrpc, _} -> []; + Res -> [maps:put(node, Node, Res)] + end + || Node <- ekka_mnesia:running_nodes()]). on_failed(continue) -> continue; on_failed(stop) -> stop;