Merge pull request #5623 from zhongwencool/cluster-call-api

feat(cluster-call): support confirm success after all mfa apply ok
This commit is contained in:
zhongwencool 2021-09-13 16:03:47 +08:00 committed by GitHub
commit 86eb6605f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 182 additions and 81 deletions

View File

@ -408,6 +408,8 @@ normalize_message(high_cpu_usage, #{usage := Usage}) ->
list_to_binary(io_lib:format("~s cpu usage", [Usage])); list_to_binary(io_lib:format("~s cpu usage", [Usage]));
normalize_message(too_many_processes, #{usage := Usage}) -> normalize_message(too_many_processes, #{usage := Usage}) ->
list_to_binary(io_lib:format("~s process usage", [Usage])); list_to_binary(io_lib:format("~s process usage", [Usage]));
normalize_message(cluster_rpc_apply_failed, #{tnx_id := TnxId}) ->
list_to_binary(io_lib:format("cluster_rpc_apply_failed:~w", [TnxId]));
normalize_message(partition, #{occurred := Node}) -> normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!-- <!--
FILE INFORMATION FILE INFORMATION

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<!-- <!--
FILE INFORMATION FILE INFORMATION

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!-- <!--
FILE INFORMATION FILE INFORMATION

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<!-- <!--
FILE INFORMATION FILE INFORMATION

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!-- <!--
FILE INFORMATION FILE INFORMATION

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!-- <!--
FILE INFORMATION FILE INFORMATION

View File

@ -29,7 +29,7 @@
-record(cluster_rpc_commit, { -record(cluster_rpc_commit, {
node :: node(), node :: node(),
tnx_id :: pos_integer() tnx_id :: pos_integer() | '$1'
}). }).
-endif. -endif.

View File

@ -18,7 +18,7 @@
%% API %% API
-export([start_link/0, mnesia/1]). -export([start_link/0, mnesia/1]).
-export([multicall/3, multicall/4, query/1, reset/0, status/0]). -export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
handle_continue/2, code_change/3]). handle_continue/2, code_change/3]).
@ -63,8 +63,7 @@ mnesia(copy) ->
ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies).
start_link() -> start_link() ->
RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000), start_link(node(), ?MODULE, get_retry_ms()).
start_link(node(), ?MODULE, RetryMs).
start_link(Node, Name, RetryMs) -> start_link(Node, Name, RetryMs) ->
gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []).
@ -76,27 +75,48 @@ start_link(Node, Name, RetryMs) ->
TnxId :: pos_integer(), TnxId :: pos_integer(),
Reason :: string(). Reason :: string().
multicall(M, F, A) -> multicall(M, F, A) ->
multicall(M, F, A, timer:minutes(2)). multicall(M, F, A, all, timer:minutes(2)).
-spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId, term()} |{error, Reason} when -spec multicall(Module, Function, Args, SucceedNum, Timeout) -> {ok, TnxId, term()} |{error, Reason} when
Module :: module(), Module :: module(),
Function :: atom(), Function :: atom(),
Args :: [term()], Args :: [term()],
SucceedNum :: pos_integer() | all,
TnxId :: pos_integer(), TnxId :: pos_integer(),
Timeout :: timeout(), Timeout :: timeout(),
Reason :: string(). Reason :: string().
multicall(M, F, A, Timeout) -> multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 ->
MFA = {initiate, {M, F, A}}, MFA = {initiate, {M, F, A}},
case ekka_rlog:role() of Begin = erlang:monotonic_time(),
core -> gen_server:call(?MODULE, MFA, Timeout); InitRes =
replicant -> case ekka_rlog:role() of
%% the initiate transaction must happened on core node core -> gen_server:call(?MODULE, MFA, Timeout);
%% make sure MFA(in the transaction) and the transaction on the same node replicant ->
%% don't need rpc again inside transaction. %% the initiate transaction must happened on core node
case ekka_rlog_status:upstream_node(?COMMON_SHARD) of %% make sure MFA(in the transaction) and the transaction on the same node
{ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); %% don't need rpc again inside transaction.
disconnected -> {error, disconnected} case ekka_rlog_status:upstream_node(?COMMON_SHARD) of
end {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
disconnected -> {error, disconnected}
end
end,
End = erlang:monotonic_time(),
MinDelay = erlang:convert_time_unit(Begin - End, native, millisecond) + 50,
%% Fail after 3 attempts.
RetryTimeout = 3 * max(MinDelay, get_retry_ms()),
OkOrFailed =
case InitRes of
{ok, _TnxId, _} when RequireNum =:= 1 ->
ok;
{ok, TnxId, _} when RequireNum =:= all ->
wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout);
{ok, TnxId, _} when is_integer(RequireNum) ->
wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout);
Error -> Error
end,
case OkOrFailed of
ok -> InitRes;
_ -> OkOrFailed
end. end.
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@ -110,8 +130,15 @@ reset() -> gen_server:call(?MODULE, reset).
status() -> status() ->
transaction(fun trans_status/0, []). transaction(fun trans_status/0, []).
%% Regardless of what MFA is returned, consider it a success),
%% then move to the next tnxId.
%% if the next TnxId failed, need call the function again to skip.
-spec skip_failed_commit(node()) -> pos_integer().
skip_failed_commit(Node) ->
gen_server:call({?MODULE, Node}, skip_failed_commit).
%%%=================================================================== %%%===================================================================
%%% gen_statem callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
%% @private %% @private
@ -135,6 +162,8 @@ handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
{aborted, Reason} -> {aborted, Reason} ->
{reply, {error, Reason}, State, {continue, ?CATCH_UP}} {reply, {error, Reason}, State, {continue, ?CATCH_UP}}
end; end;
handle_call(skip_failed_commit, _From, State) ->
{reply, ok, State, catch_up(State, true)};
handle_call(_, _From, State) -> handle_call(_, _From, State) ->
{reply, ok, State, catch_up(State)}. {reply, ok, State, catch_up(State)}.
@ -155,15 +184,17 @@ code_change(_OldVsn, State, _Extra) ->
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
catch_up(#{node := Node, retry_interval := RetryMs} = State) -> catch_up(State) -> catch_up(State, false).
catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
case transaction(fun read_next_mfa/1, [Node]) of case transaction(fun read_next_mfa/1, [Node]) of
{atomic, caught_up} -> ?TIMEOUT; {atomic, caught_up} -> ?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA), {Succeed, _} = apply_mfa(NextId, MFA),
case Succeed of case Succeed orelse SkipResult of
true -> true ->
case transaction(fun commit/2, [Node, NextId]) of case transaction(fun commit/2, [Node, NextId]) of
{atomic, ok} -> catch_up(State); {atomic, ok} -> catch_up(State, false);
Error -> Error ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed to commit applied call", msg => "failed to commit applied call",
@ -275,24 +306,68 @@ trans_query(TnxId) ->
#{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt}
end. end.
apply_mfa(TnxId, {M, F, A} = MFA) -> -define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))).
try
Res = erlang:apply(M, F, A), apply_mfa(TnxId, {M, F, A}) ->
Succeed = Res =
case Res of try erlang:apply(M, F, A)
ok -> catch
?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), Class:Reason:Stacktrace ->
true; {error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}}
{ok, _} ->
?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
true;
_ ->
?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
false
end, end,
{Succeed, Res} Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)},
catch IsSuccess = is_success(Res),
C : E -> log_and_alarm(IsSuccess, Res, Meta),
?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}), {IsSuccess, Res}.
{false, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))}
is_success(ok) -> true;
is_success({ok, _}) -> true;
is_success(_) -> false.
log_and_alarm(true, Res, Meta) ->
OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => Res},
?SLOG(debug, OkMeta),
emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta#{result => ?TO_BIN(Res)});
log_and_alarm(false, Res, Meta) ->
NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => Res},
?SLOG(error, NotOkMeta),
emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta#{result => ?TO_BIN(Res)}).
wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
ok = timer:sleep(Delay),
case lagging_node(TnxId) of
[_ | _] when Remain > 0 ->
wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
[] -> ok;
Nodes -> {error, Nodes}
end. end.
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
ok = timer:sleep(Delay),
case length(synced_nodes(TnxId)) >= RequiredNum of
true -> ok;
false when Remain > 0 ->
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay);
false ->
case lagging_node(TnxId) of
[] -> ok; %% All commit but The succeedNum > length(nodes()).
Nodes -> {error, Nodes}
end
end.
lagging_node(TnxId) ->
{atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]),
Nodes.
synced_nodes(TnxId) ->
{atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]),
Nodes.
commit_status_trans(Operator, TnxId) ->
MatchHead = #cluster_rpc_commit{tnx_id = '$1', node = '$2', _ = '_'},
Guard = {Operator, '$1', TnxId},
Result = '$2',
mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]).
get_retry_ms() ->
application:get_env(emqx_machine, cluster_call_retry_interval, 1000).

View File

@ -32,7 +32,8 @@ all() -> [
t_commit_crash_test, t_commit_crash_test,
t_commit_ok_but_apply_fail_on_other_node, t_commit_ok_but_apply_fail_on_other_node,
t_commit_ok_apply_fail_on_other_node_then_recover, t_commit_ok_apply_fail_on_other_node_then_recover,
t_del_stale_mfa t_del_stale_mfa,
t_skip_failed_commit
]. ].
suite() -> [{timetrap, {minutes, 3}}]. suite() -> [{timetrap, {minutes, 3}}].
groups() -> []. groups() -> [].
@ -45,19 +46,16 @@ init_per_suite(Config) ->
application:set_env(emqx_machine, cluster_call_max_history, 100), application:set_env(emqx_machine, cluster_call_max_history, 100),
application:set_env(emqx_machine, cluster_call_clean_interval, 1000), application:set_env(emqx_machine, cluster_call_clean_interval, 1000),
application:set_env(emqx_machine, cluster_call_retry_interval, 900), application:set_env(emqx_machine, cluster_call_retry_interval, 900),
%%dbg:tracer(), meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
%%dbg:p(all, c), meck:expect(emqx_alarm, activate, 2, ok),
%%dbg:tpl(emqx_cluster_rpc, cx), meck:expect(emqx_alarm, deactivate, 2, ok),
%%dbg:tpl(gen_statem, loop_receive, cx),
%%dbg:tpl(gen_statem, loop_state_callback, cx),
%%dbg:tpl(gen_statem, loop_callback_mode_result, cx),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ekka:stop(), ekka:stop(),
ekka_mnesia:ensure_stopped(), ekka_mnesia:ensure_stopped(),
ekka_mnesia:delete_schema(), ekka_mnesia:delete_schema(),
%%dbg:stop(), meck:unload(emqx_alarm),
ok. ok.
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
@ -78,7 +76,6 @@ t_base_test(_Config) ->
?assertEqual(node(), maps:get(initiator, Query)), ?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)), ?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)), ?assertEqual(ok, receive_msg(3, test)),
sleep(400),
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
?assertEqual(3, length(Status)), ?assertEqual(3, length(Status)),
?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)), ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)),
@ -96,8 +93,10 @@ t_commit_crash_test(_Config) ->
emqx_cluster_rpc:reset(), emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, no_exist_function, []}, {M, F, A} = {?MODULE, no_exist_function, []},
Error = emqx_cluster_rpc:multicall(M, F, A), {error, {error, Meta}} = emqx_cluster_rpc:multicall(M, F, A),
?assertEqual({error, "TnxId(1) apply MFA({emqx_cluster_rpc_SUITE,no_exist_function,[]}) crash"}, Error), ?assertEqual(undef, maps:get(reason, Meta)),
?assertEqual(error, maps:get(exception, Meta)),
?assertEqual(true, maps:is_key(stacktrace, Meta)),
?assertEqual({atomic, []}, emqx_cluster_rpc:status()), ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
ok. ok.
@ -105,12 +104,12 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
emqx_cluster_rpc:reset(), emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{atomic, [Status]} = emqx_cluster_rpc:status(), {atomic, [Status]} = emqx_cluster_rpc:status(),
?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(MFA, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)), ?assertEqual(node(), maps:get(node, Status)),
erlang:send(?NODE2, test), erlang:send(?NODE2, test),
Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}), Res = gen_server:call(?NODE2, {initiate, {M, F, A}}),
?assertEqual({error, "MFA return not ok"}, Res), ?assertEqual({error, "MFA return not ok"}, Res),
ok. ok.
@ -118,8 +117,8 @@ t_catch_up_status_handle_next_commit(_Config) ->
emqx_cluster_rpc:reset(), emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), {ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}),
ok. ok.
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
@ -127,8 +126,8 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
Now = erlang:system_time(second), Now = erlang:system_time(second),
{M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"]), {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
{atomic, [Status|L]} = emqx_cluster_rpc:status(), {atomic, [Status|L]} = emqx_cluster_rpc:status(),
?assertEqual([], L), ?assertEqual([], L),
?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)),
@ -177,6 +176,17 @@ t_del_stale_mfa(_Config) ->
end || I <- lists:seq(51, 150)], end || I <- lists:seq(51, 150)],
ok. ok.
t_skip_failed_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
ok = gen_server:call(?NODE2, skip_failed_commit, 5000),
{atomic, List} = emqx_cluster_rpc:status(),
?assertEqual([1, 2, 2], lists:sort(lists:map(fun(#{tnx_id := TnxId}) -> TnxId end, List))),
ok.
start() -> start() ->
{ok, Pid1} = emqx_cluster_rpc:start_link(), {ok, Pid1} = emqx_cluster_rpc:start_link(),
{ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),

View File

@ -220,31 +220,45 @@ remove_rules(Rules) ->
gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
%% @private %% @private
insert_rule(Rule) ->
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rule]), insert_rules([]) -> ok;
mnesia:write(?RULE_TAB, Rule, write). insert_rules(Rules) ->
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rules]),
[mnesia:write(?RULE_TAB, Rule, write) ||Rule <- Rules].
%% @private %% @private
delete_rule(RuleId) when is_binary(RuleId) -> delete_rules([]) -> ok;
case get_rule(RuleId) of delete_rules(Rules = [R|_]) when is_binary(R) ->
{ok, Rule} -> delete_rule(Rule); RuleRecs =
not_found -> ok lists:foldl(fun(RuleId, Acc) ->
end; case get_rule(RuleId) of
delete_rule(Rule) -> {ok, Rule} -> [Rule|Acc];
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]), not_found -> Acc
mnesia:delete_object(?RULE_TAB, Rule, write). end
end, [], Rules),
delete_rules_unload_hooks(RuleRecs);
delete_rules(Rules = [Rule|_]) when is_record(Rule, rule) ->
delete_rules_unload_hooks(Rules).
load_hooks_for_rule(#rule{for = Topics}) -> delete_rules_unload_hooks(Rules) ->
lists:foreach(fun emqx_rule_events:load/1, Topics). _ = emqx_plugin_libs_rule:cluster_call(?MODULE, unload_hooks_for_rule, [Rules]),
[mnesia:delete_object(?RULE_TAB, Rule, write) ||Rule <- Rules].
unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> load_hooks_for_rule(Rules) ->
lists:foreach(fun(Topic) -> lists:foreach(fun(#rule{for = Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics)
end, Rules).
unload_hooks_for_rule(Rules) ->
lists:foreach(fun(#rule{id = Id, for = Topics}) ->
lists:foreach(fun(Topic) ->
case get_rules_with_same_event(Topic) of case get_rules_with_same_event(Topic) of
[#rule{id = Id}] -> %% we are now deleting the last rule [#rule{id = Id}] -> %% we are now deleting the last rule
emqx_rule_events:unload(Topic); emqx_rule_events:unload(Topic);
_ -> ok _ -> ok
end end
end, Topics). end, Topics)
end, Rules).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Action Management %% Action Management
@ -445,11 +459,11 @@ init([]) ->
{ok, #{}}. {ok, #{}}.
handle_call({add_rules, Rules}, _From, State) -> handle_call({add_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), trans(fun insert_rules/1, [Rules]),
{reply, ok, State}; {reply, ok, State};
handle_call({remove_rules, Rules}, _From, State) -> handle_call({remove_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), trans(fun delete_rules/1, [Rules]),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->