diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index a7fe9c60a..a54afb7a7 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -154,6 +154,22 @@ end end()). +-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). + +-define(CLUSTER_CALL(Func, Args, ResParttern), + fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of + {ResL, []} -> + case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of + [] -> ResL; + ErrL -> + ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]), + throw({Func, ErrL}) + end; + {ResL, BadNodes} -> + ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]), + throw({Func, {failed_on_nodes, BadNodes}}) + end end()). + %% Tables -define(RULE_TAB, emqx_rule). -define(ACTION_TAB, emqx_rule_action). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 12f00c191..c301367fd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -215,7 +215,7 @@ delete_rule(RuleId) -> case emqx_rule_registry:get_rule(RuleId) of {ok, Rule = #rule{actions = Actions}} -> try - cluster_call(clear_rule, [Rule]), + _ = ?CLUSTER_CALL(clear_rule, [Rule]), ok = emqx_rule_registry:remove_rule(Rule) catch Error:Reason:ST -> @@ -241,7 +241,7 @@ create_resource(#{type := Type, config := Config0} = Params) -> ok = emqx_rule_registry:add_resource(Resource), %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - catch cluster_call(init_resource, [M, F, ResId, Config]), + catch _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), {ok, Resource}; not_found -> {error, {resource_type_not_found, Type}} @@ -289,15 +289,14 @@ do_update_resource(#{id := Id, type := Type, description := NewDescription, conf Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), case test_resource(#{type => Type, config => NewConfig}) of ok -> - Resource = #resource{ + _ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]), + emqx_rule_registry:add_resource(#resource{ id = Id, type = Type, config = Config, description = NewDescription, created_at = erlang:system_time(millisecond) - }, - cluster_call(init_resource, [Module, Create, Id, Config]), - emqx_rule_registry:add_resource(Resource); + }); {error, Reason} -> error({error, Reason}) end @@ -328,8 +327,9 @@ test_resource(#{type := Type, config := Config0}) -> Config = emqx_rule_validator:validate_params(Config0, ParamSpec), ResId = resource_id(), try - cluster_call(init_resource, [ModC, Create, ResId, Config]), - cluster_call(clear_resource, [ModD, Destroy, ResId]) + _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), + _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + ok catch throw:Reason -> {error, Reason} end; @@ -366,7 +366,8 @@ delete_resource(ResId) -> = emqx_rule_registry:find_resource_type(ResType), try ok = emqx_rule_registry:remove_resource(ResId), - cluster_call(clear_resource, [ModD, Destroy, ResId]) + _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + ok catch throw:Reason -> {error, Reason} end; @@ -430,7 +431,13 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) -> {ok, #action{module = Mod, on_create = Create, params_spec = ParamSpec}} -> Args = emqx_rule_validator:validate_params(Args0, ParamSpec), ActionInstId = maps:get(id, Action, action_instance_id(Name)), - NeedInit andalso cluster_call(init_action, [Mod, Create, ActionInstId, with_resource_params(Args)]), + case NeedInit of + true -> + _ = ?CLUSTER_CALL(init_action, [Mod, Create, ActionInstId, + with_resource_params(Args)]), + ok; + false -> ok + end, #action_instance{ id = ActionInstId, name = Name, args = Args, fallbacks = prepare_actions(maps:get(fallbacks, Action, []), NeedInit) @@ -481,7 +488,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) -> may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) -> %% prepare new actions before removing old ones NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)), - cluster_call(clear_actions, [OldActions]), + _ = ?CLUSTER_CALL(clear_actions, [OldActions]), may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params)); may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params Rule. @@ -512,20 +519,6 @@ gen_id(Prefix, TestFun) -> action_instance_id(ActionName) -> iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]). -cluster_call(Func, Args) -> - case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of - {ResL, []} -> - case lists:filter(fun(ok) -> false; (_) -> true end, ResL) of - [] -> ok; - ErrL -> - ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]), - throw({func_fail(Func), ErrL}) - end; - {ResL, BadNodes} -> - ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]), - throw({func_fail(Func), {failed_on_nodes, BadNodes}}) - end. - init_resource(Module, OnCreate, ResId, Config) -> Params = ?RAISE(Module:OnCreate(ResId, Config), {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}), @@ -642,15 +635,12 @@ refresh_actions(Actions, Pred) -> true -> {ok, #action{module = Mod, on_create = Create}} = emqx_rule_registry:find_action(ActName), - cluster_call(init_action, [Mod, Create, Id, with_resource_params(Args)]), + _ = ?CLUSTER_CALL(init_action, [Mod, Create, Id, with_resource_params(Args)]), refresh_actions(Fallbacks, Pred); false -> ok end end, Actions). -func_fail(Func) when is_atom(Func) -> - list_to_atom(atom_to_list(Func) ++ "_failure"). - find_type(ResId) -> {ok, #resource{type = Type}} = emqx_rule_registry:find_resource(ResId), {ok, Type}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index e7287d98d..82b4c2682 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -273,14 +273,13 @@ do_create_resource(Create, ParsedParams) -> return({ok, record_to_map(Resource)}); {error, {resource_type_not_found, Type}} -> return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)}); - {error, {init_resource_failure, _}} -> + {error, {init_resource, _}} -> return({error, 500, <<"Init resource failure!">>}); {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. - list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data = lists:map(fun(Res = #{id := Id}) -> @@ -345,7 +344,7 @@ update_resource(#{id := Id}, NewParams) -> {error, not_found} -> ?LOG(error, "Resource not found: ~0p", [Id]), return({error, 400, <<"Resource not found:", Id/binary>>}); - {error, {init_resource_failure, _}} -> + {error, {init_resource, _}} -> ?LOG(error, "Init resource failure: ~0p", [Id]), return({error, 500, <<"Init resource failure:", Id/binary>>}); {error, {dependency_exists, RuleId}} -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 80667f995..9285b0ad5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -67,6 +67,10 @@ , unregister_resource_types_of/1 ]). +-export([ load_hooks_for_rule/1 + , unload_hooks_for_rule/1 + ]). + %% for debug purposes -export([dump/0]). @@ -216,8 +220,8 @@ remove_rules(Rules) -> gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). %% @private -insert_rule(Rule = #rule{for = Topics}) -> - lists:foreach(fun emqx_rule_events:load/1, Topics), +insert_rule(Rule) -> + _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -226,15 +230,21 @@ delete_rule(RuleId) when is_binary(RuleId) -> {ok, Rule} -> delete_rule(Rule); not_found -> ok end; -delete_rule(Rule = #rule{id = Id, for = Topics}) -> +delete_rule(Rule) -> + _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), + mnesia:delete_object(?RULE_TAB, Rule, write). + +load_hooks_for_rule(#rule{for = Topics}) -> + lists:foreach(fun emqx_rule_events:load/1, Topics). + +unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of [#rule{id = Id}] -> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end - end, Topics), - mnesia:delete_object(?RULE_TAB, Rule, write). + end, Topics). %%------------------------------------------------------------------------------ %% Action Management