From 86e8de4737edcbd240f72314cbaa3d9b27bb4fed Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 17 May 2022 09:49:19 +0800 Subject: [PATCH 1/6] fix: merge main-4.3 --- .../src/emqx_rule_engine.appup.src | 6 ++- .../emqx_rule_engine/src/emqx_rule_engine.erl | 52 ++++++++++++++++--- .../src/emqx_rule_engine_api.erl | 16 +++--- 3 files changed, 59 insertions(+), 15 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 25438c828..24a4a5358 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -6,7 +6,9 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -136,6 +138,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 477c5ad9a..53bd4c1b6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -38,6 +38,8 @@ , start_resource/1 , get_resource_status/1 , is_source_alive/1 + , is_source_alive/2 + , is_source_alive/3 , get_resource_params/1 , ensure_resource_deleted/1 , delete_resource/1 @@ -340,7 +342,7 @@ test_resource(#{type := Type} = Params) -> try case create_resource(maps:put(id, ResId, Params), no_retry) of {ok, _} -> - case is_source_alive(ResId) of + case is_source_alive(ResId, #{fetch => true}) of true -> ok; false -> @@ -363,15 +365,53 @@ test_resource(#{type := Type} = Params) -> end. is_source_alive(ResId) -> - case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of - {ResL, []} -> - is_source_alive_(ResL); - {_, _Errors} -> - false + is_source_alive(ResId, #{fetch => false}). + +is_source_alive(ResId, Opts) -> + is_source_alive(ekka_mnesia:running_nodes(), ResId, Opts). + +-spec(is_source_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()). +is_source_alive(Node, ResId, Opts) when is_atom(Node) -> + is_source_alive([Node], ResId, Opts); +is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> + try + case emqx_rule_registry:find_resource(ResId) of + {ok, #resource{type = ResType}} -> + {ok, #resource_type{on_status = {Mod, OnStatus}}} + = emqx_rule_registry:find_resource_type(ResType), + case rpc:multicall(Nodes, + ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of + {ResL, []} -> + is_source_alive_(ResL); + {_, _Error} -> + false + end; + not_found -> + false + end + catch E:R:S -> + ?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), + false + end; +is_source_alive(Nodes, ResId, _Opts = #{fetch := false}) -> + try + case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of + {ResL, []} -> + is_source_alive_(ResL); + {_, _Errors} -> + false + end + catch E:R:S -> + ?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), + false end. +%% fetch_resource_status -> #{is_alive => boolean()} +%% get_resource_status -> {ok #{is_alive => boolean()}} is_source_alive_([]) -> true; +is_source_alive_([#{is_alive := true} | ResL]) -> is_source_alive_(ResL); is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); +is_source_alive_([#{is_alive := false} | _ResL]) -> false; is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; is_source_alive_([_Error | _ResL]) -> false. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 3430bc98d..c196e03f7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -332,14 +332,14 @@ list_resources_by_type(#{type := Type}, _Params) -> show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} -> - Status = - lists:concat( - [ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of - {badrpc, _} -> []; - {ok, St} -> [maps:put(node, Node, St)]; - {error, _} -> [maps:put(node, Node, #{is_alive => false})] - end - || Node <- ekka_mnesia:running_nodes()]), + StatusFun = + fun(Node) -> + #{ + node => Node, + is_alive => emqx_rule_engine:is_source_alive(Node, Id, #{fetch => false}) + } + end, + Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()], return({ok, maps:put(status, Status, record_to_map(R))}); not_found -> return({error, 404, <<"Not Found">>}) From eb5956316a39ffb7c39a957005e4660f667f530d Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 17 May 2022 11:27:55 +0800 Subject: [PATCH 2/6] fix(emqx_rule_engine): export func for rpc --- apps/emqx_rule_engine/src/emqx_rule_engine.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 53bd4c1b6..c3f9728bd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -57,6 +57,9 @@ -export([ restore_action_metrics/2 ]). +-export([ fetch_resource_status/3 + ]). + -type(rule() :: #rule{}). -type(action() :: #action{}). -type(resource() :: #resource{}). @@ -407,7 +410,7 @@ is_source_alive(Nodes, ResId, _Opts = #{fetch := false}) -> end. %% fetch_resource_status -> #{is_alive => boolean()} -%% get_resource_status -> {ok #{is_alive => boolean()}} +%% get_resource_status -> {ok, #{is_alive => boolean()}} is_source_alive_([]) -> true; is_source_alive_([#{is_alive := true} | ResL]) -> is_source_alive_(ResL); is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); From a5716318b6391606233122bb609843409e840529 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 18 May 2022 09:59:35 +0800 Subject: [PATCH 3/6] fix: better code format --- apps/emqx_rule_engine/src/emqx_rule_engine.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index c3f9728bd..238ce91d3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -413,8 +413,8 @@ is_source_alive(Nodes, ResId, _Opts = #{fetch := false}) -> %% get_resource_status -> {ok, #{is_alive => boolean()}} is_source_alive_([]) -> true; is_source_alive_([#{is_alive := true} | ResL]) -> is_source_alive_(ResL); -is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); is_source_alive_([#{is_alive := false} | _ResL]) -> false; +is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; is_source_alive_([_Error | _ResL]) -> false. From f3bef3c81c56d5ff0009dd7731978d2a10dd9616 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 18 May 2022 10:37:54 +0800 Subject: [PATCH 4/6] fix(rule_engine): remove resource with clean alarms --- apps/emqx_rule_engine/src/emqx_rule_engine.erl | 16 ++++++++++++---- .../test/emqx_rule_engine_SUITE.erl | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 238ce91d3..24c0bed56 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -48,7 +48,7 @@ -export([ init_resource/4 , init_action/4 - , clear_resource/3 + , clear_resource/4 , clear_rule/1 , clear_actions/1 , clear_action/3 @@ -60,6 +60,10 @@ -export([ fetch_resource_status/3 ]). +-ifdef(TEST). +-export([alarm_name_of_resource_down/2]). +-endif. + -type(rule() :: #rule{}). -type(action() :: #action{}). -type(resource() :: #resource{}). @@ -450,7 +454,7 @@ delete_resource(ResId) -> try case emqx_rule_registry:remove_resource(ResId) of ok -> - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId, ResType]), ok; {error, _} = R -> R end @@ -652,9 +656,13 @@ init_action(Module, OnCreate, ActionInstId, Params) -> #action_instance_params{id = ActionInstId, params = Params, apply = Apply}) end. -clear_resource(_Module, undefined, ResId) -> +clear_resource(_Module, undefined, Type, ResId) -> + Name = alarm_name_of_resource_down(Type, ResId), + _ = emqx_alarm:deactivate(Name), ok = emqx_rule_registry:remove_resource_params(ResId); -clear_resource(Module, Destroy, ResId) -> +clear_resource(Module, Destroy, Type, ResId) -> + Name = alarm_name_of_resource_down(Type, ResId), + _ = emqx_alarm:deactivate(Name), case emqx_rule_registry:find_resource_params(ResId) of {ok, #resource_params{params = Params}} -> ?RAISE(Module:Destroy(ResId, Params), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 442865bb4..9d40b9876 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -327,6 +327,24 @@ t_create_resource(_Config) -> emqx_rule_registry:remove_resource(ResId), ok. +t_clean_resource_alarms(_Config) -> + ok = emqx_rule_engine:load_providers(), + {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( + #{type => built_in, + config => #{}, + description => <<"debug resource">>}), + ?assert(true, is_binary(ResId)), + Name = emqx_rule_engine:alarm_name_of_resource_down(built_in, ResId), + _ = emqx_alarm:activate(Name, #{id => ResId, type => built_in}), + AlarmExist = fun(#{name := AName}) -> AName == Name end, + Len = length(lists:filter(AlarmExist, emqx_alarm:get_alarms())), + ?assert(Len == 1), + ok = emqx_rule_engine:unload_providers(), + emqx_rule_registry:remove_resource(ResId), + LenAfterRemove = length(lists:filter(AlarmExist, emqx_alarm:get_alarms())), + ?assert(LenAfterRemove == 0), + ok. + %%------------------------------------------------------------------------------ %% Test cases for rule actions %%------------------------------------------------------------------------------ From a67dff456807dc68d75845146a9bb13cb155a337 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 18 May 2022 11:14:12 +0800 Subject: [PATCH 5/6] fix(rule_engine): better function name for resource --- .../emqx_rule_engine/src/emqx_rule_engine.erl | 48 +++++++++---------- .../src/emqx_rule_engine_api.erl | 4 +- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 24c0bed56..312fc9e7d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -37,9 +37,9 @@ , test_resource/1 , start_resource/1 , get_resource_status/1 - , is_source_alive/1 - , is_source_alive/2 - , is_source_alive/3 + , is_resource_alive/1 + , is_resource_alive/2 + , is_resource_alive/3 , get_resource_params/1 , ensure_resource_deleted/1 , delete_resource/1 @@ -349,11 +349,11 @@ test_resource(#{type := Type} = Params) -> try case create_resource(maps:put(id, ResId, Params), no_retry) of {ok, _} -> - case is_source_alive(ResId, #{fetch => true}) of + case is_resource_alive(ResId, #{fetch => true}) of true -> ok; false -> - %% in is_source_alive, the cluster-call RPC logs errors + %% in is_resource_alive, the cluster-call RPC logs errors %% so we do not log anything here {error, {resource_down, ResId}} end; @@ -371,16 +371,16 @@ test_resource(#{type := Type} = Params) -> {error, {resource_type_not_found, Type}} end. -is_source_alive(ResId) -> - is_source_alive(ResId, #{fetch => false}). +is_resource_alive(ResId) -> + is_resource_alive(ResId, #{fetch => false}). -is_source_alive(ResId, Opts) -> - is_source_alive(ekka_mnesia:running_nodes(), ResId, Opts). +is_resource_alive(ResId, Opts) -> + is_resource_alive(ekka_mnesia:running_nodes(), ResId, Opts). --spec(is_source_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()). -is_source_alive(Node, ResId, Opts) when is_atom(Node) -> - is_source_alive([Node], ResId, Opts); -is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> +-spec(is_resource_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()). +is_resource_alive(Node, ResId, Opts) when is_atom(Node) -> + is_resource_alive([Node], ResId, Opts); +is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) -> try case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = ResType}} -> @@ -389,7 +389,7 @@ is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> case rpc:multicall(Nodes, ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of {ResL, []} -> - is_source_alive_(ResL); + is_resource_alive_(ResL); {_, _Error} -> false end; @@ -397,30 +397,30 @@ is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> false end catch E:R:S -> - ?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), + ?LOG(warning, "is_resource_alive failed, ~0p:~0p ~0p", [E, R, S]), false end; -is_source_alive(Nodes, ResId, _Opts = #{fetch := false}) -> +is_resource_alive(Nodes, ResId, _Opts = #{fetch := false}) -> try case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of {ResL, []} -> - is_source_alive_(ResL); + is_resource_alive_(ResL); {_, _Errors} -> false end catch E:R:S -> - ?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), + ?LOG(warning, "is_resource_alive failed, ~0p:~0p ~0p", [E, R, S]), false end. %% fetch_resource_status -> #{is_alive => boolean()} %% get_resource_status -> {ok, #{is_alive => boolean()}} -is_source_alive_([]) -> true; -is_source_alive_([#{is_alive := true} | ResL]) -> is_source_alive_(ResL); -is_source_alive_([#{is_alive := false} | _ResL]) -> false; -is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); -is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; -is_source_alive_([_Error | _ResL]) -> false. +is_resource_alive_([]) -> true; +is_resource_alive_([#{is_alive := true} | ResL]) -> is_resource_alive_(ResL); +is_resource_alive_([#{is_alive := false} | _ResL]) -> false; +is_resource_alive_([{ok, #{is_alive := true}} | ResL]) -> is_resource_alive_(ResL); +is_resource_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; +is_resource_alive_([_Error | _ResL]) -> false. -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). get_resource_status(ResId) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index c196e03f7..6587f9c5f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -321,7 +321,7 @@ do_create_resource(Create, ParsedParams) -> list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data = lists:map(fun(Res = #{id := ResId}) -> - Status = emqx_rule_engine:is_source_alive(ResId), + Status = emqx_rule_engine:is_resource_alive(ResId), maps:put(status, Status, Res) end, Data0), return({ok, Data}). @@ -336,7 +336,7 @@ show_resource(#{id := Id}, _Params) -> fun(Node) -> #{ node => Node, - is_alive => emqx_rule_engine:is_source_alive(Node, Id, #{fetch => false}) + is_alive => emqx_rule_engine:is_resource_alive(Node, Id, #{fetch => false}) } end, Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()], From 71a7d71f686b47a5ccec2d0af63213c40e999977 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 18 May 2022 15:18:12 +0800 Subject: [PATCH 6/6] fix(rule_engine): bad status SUITE --- apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 9d40b9876..f4c76f6e0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -165,7 +165,7 @@ end_per_suite(_Config) -> on_resource_create(_id, _) -> #{}. on_resource_destroy(_id, _) -> ok. -on_get_resource_status(_id, _) -> #{}. +on_get_resource_status(_id, _) -> #{is_alive => true}. %%------------------------------------------------------------------------------ %% Group specific setup/teardown