From 53e386ad4ece13d10690057ef963825371c55bff Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 19 Aug 2021 22:55:38 +0800 Subject: [PATCH 01/11] feat(emqx_cluster_call): ensure the consistency of resources When EMQX updates the cluster resources via HTTP API, it first updates the local node resources, and then updates all other nodes via RPC Multi Call to ensure the consistency of resources (configuration) in the cluster. --- apps/emqx/include/emqx_cluster_rpc.hrl | 35 +++ apps/emqx/src/emqx_cluster_rpc.erl | 321 +++++++++++++++++++++ apps/emqx/src/emqx_cluster_rpc_handler.erl | 94 ++++++ apps/emqx/src/emqx_kernel_sup.erl | 2 + apps/emqx/test/emqx_cluster_rpc_SUITE.erl | 247 ++++++++++++++++ 5 files changed, 699 insertions(+) create mode 100644 apps/emqx/include/emqx_cluster_rpc.hrl create mode 100644 apps/emqx/src/emqx_cluster_rpc.erl create mode 100644 apps/emqx/src/emqx_cluster_rpc_handler.erl create mode 100644 apps/emqx/test/emqx_cluster_rpc_SUITE.erl diff --git a/apps/emqx/include/emqx_cluster_rpc.hrl b/apps/emqx/include/emqx_cluster_rpc.hrl new file mode 100644 index 000000000..5c04346b7 --- /dev/null +++ b/apps/emqx/include/emqx_cluster_rpc.hrl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQ_X_CLUSTER_RPC_HRL). +-define(EMQ_X_CLUSTER_RPC_HRL, true). + +-define(CLUSTER_MFA, cluster_rpc_mfa). +-define(CLUSTER_COMMIT, cluster_rpc_commit). + +-record(cluster_rpc_mfa, { + tnx_id :: pos_integer(), + mfa :: mfa(), + created_at :: calendar:datetime(), + initiator :: node() +}). + +-record(cluster_rpc_commit, { + node :: node(), + tnx_id :: pos_integer() +}). + +-endif. diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx/src/emqx_cluster_rpc.erl new file mode 100644 index 000000000..301603c63 --- /dev/null +++ b/apps/emqx/src/emqx_cluster_rpc.erl @@ -0,0 +1,321 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_cluster_rpc). +-behaviour(gen_statem). + +%% API +-export([start_link/0, mnesia/1]). +-export([multicall/3, multicall/4, query/1, reset/0, status/0]). + +-export([init/1, format_status/2, handle_event/4, terminate/3, + code_change/4, callback_mode/0]). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-export([start_link/2]). +-endif. + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-include("emqx.hrl"). +-include("logger.hrl"). +-include("emqx_cluster_rpc.hrl"). + +-rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}). +-rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}). + +-define(CATCH_UP, catch_up). +-define(REALTIME, realtime). +-define(CATCH_UP_AFTER(_Sec_), {state_timeout, timer:seconds(_Sec_), catch_up_delay}). + +%%%=================================================================== +%%% API +%%%=================================================================== +mnesia(boot) -> + ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ + {type, ordered_set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, cluster_rpc_mfa}, + {attributes, record_info(fields, cluster_rpc_mfa)}]), + ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ + {type, set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, cluster_rpc_commit}, + {attributes, record_info(fields, cluster_rpc_commit)}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(cluster_rpc_mfa, disc_copies), + ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). + +start_link() -> + start_link(node(), ?MODULE). +start_link(Node, Name) -> + gen_statem:start_link({local, Name}, ?MODULE, [Node], []). + +multicall(M, F, A) -> + multicall(M, F, A, timer:minutes(2)). + +-spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId} |{error, Reason} when + Module :: module(), + Function :: atom(), + Args :: [term()], + TnxId :: pos_integer(), + Timeout :: timeout(), + Reason :: term(). +multicall(M, F, A, Timeout) -> + MFA = {initiate, {M, F, A}}, + case ekka_rlog:role() of + core -> gen_statem:call(?MODULE, MFA, Timeout); + replicant -> + case ekka_rlog_status:upstream_node(?COMMON_SHARD) of + {ok, Node} -> gen_statem:call({?MODULE, Node}, MFA, Timeout); + disconnected -> {error, disconnected} + end + end. + +-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. +query(TnxId) -> + Fun = fun() -> + case mnesia:read(?CLUSTER_MFA, TnxId) of + [] -> mnesia:abort(not_found); + [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> + #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} + end + end, + transaction(Fun). + +-spec reset() -> reset. +reset() -> gen_statem:call(?MODULE, reset). + +-spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. +status() -> + Fun = fun() -> + mnesia:foldl(fun(Rec, Acc) -> + #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, + case mnesia:read(?CLUSTER_MFA, TnxId) of + [MFARec] -> + #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, + [#{ + node => Node, + tnx_id => TnxId, + initiator => InitNode, + mfa => MFA, + created_at => CreatedAt + } | Acc]; + [] -> Acc + end end, [], ?CLUSTER_COMMIT) + end, + transaction(Fun). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +init([Node]) -> + {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), + {ok, ?CATCH_UP, Node, ?CATCH_UP_AFTER(0)}. + +callback_mode() -> + handle_event_function. + +%% @private +format_status(Opt, [_PDict, StateName, Node]) -> + #{state => StateName, node => Node, opt => Opt}. + +%% @private +handle_event(state_timeout, catch_up_delay, _State, Node) -> + catch_up(Node); + +handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Node) -> + handle_mfa_write_event(MFARec, Node); +handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Node) -> + {keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]}; +%% we should catch up as soon as possible when we reset all. +handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Node) -> + {keep_state_and_data, [?CATCH_UP_AFTER(0)]}; + +handle_event({call, From}, reset, _State, _Node) -> + _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), + _ = ekka_mnesia:clear_table(?CLUSTER_MFA), + {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; + +handle_event({call, From}, {initiate, MFA}, ?REALTIME, Node) -> + case transaction(fun() -> init_mfa(Node, MFA) end) of + {atomic, {ok, TnxId}} -> + {keep_state, Node, [{reply, From, {ok, TnxId}}]}; + {aborted, Reason} -> + {keep_state, Node, [{reply, From, {error, Reason}}]} + end; +handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Node) -> + case catch_up(Node) of + {next_state, ?REALTIME, Node} -> + {next_state, ?REALTIME, Node, [{postpone, true}]}; + _ -> + Reason = "There are still transactions that have not been executed.", + {keep_state, Node, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(1)]} + end; + +handle_event(_EventType, _EventContent, ?CATCH_UP, _Node) -> + {keep_state_and_data, [?CATCH_UP_AFTER(10)]}; +handle_event(_EventType, _EventContent, _StateName, _Node) -> + keep_state_and_data. + +terminate(_Reason, _StateName, _Node) -> + ok. + +code_change(_OldVsn, StateName, Node, _Extra) -> + {ok, StateName, Node}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +catch_up(Node) -> + case get_next_mfa(Node) of + {atomic, caught_up} -> {next_state, ?REALTIME, Node}; + {atomic, {still_lagging, NextId, MFA}} -> + case apply_mfa(NextId, MFA) of + ok -> + case transaction(fun() -> commit(Node, NextId) end) of + {atomic, ok} -> catch_up(Node); + _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end; + _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end; + {aborted, _Reason} -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end. + +get_next_mfa(Node) -> + Fun = + fun() -> + NextId = + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> + LatestId = get_latest_id(), + TnxId = max(LatestId - 1, 0), + commit(Node, TnxId), + ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), + TnxId; + [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 + end, + case mnesia:read(?CLUSTER_MFA, NextId) of + [] -> caught_up; + [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} + end + end, + transaction(Fun). + +do_catch_up(ToTnxId, Node) -> + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> + commit(Node, ToTnxId), + caught_up; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId =:= LastAppliedId -> + caught_up; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId > LastAppliedId -> + CurTnxId = LastAppliedId + 1, + [#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId), + case apply_mfa(CurTnxId, MFA) of + ok -> ok = commit(Node, CurTnxId); + {error, Reason} -> mnesia:abort(Reason); + Other -> mnesia:abort(Other) + end; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> + Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", + [Node, LastAppliedId, ToTnxId])), + ?LOG(error, Reason), + {error, Reason} + end. + +commit(Node, TnxId) -> + ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). + +get_latest_id() -> + case mnesia:last(?CLUSTER_MFA) of + '$end_of_table' -> 0; + Id -> Id + end. + +handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Node) -> + {atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end), + if LastAppliedId + 1 =:= EventId -> + case apply_mfa(EventId, MFA) of + ok -> + case transaction(fun() -> commit(Node, EventId) end) of + {atomic, ok} -> + {next_state, ?REALTIME, Node}; + _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end; + _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end; + LastAppliedId >= EventId -> %% It's means the initiator receive self event or other receive stale event. + keep_state_and_data; + true -> + ?LOG(error, "LastAppliedId+1 + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId; + [] -> + commit(Node, Default), + Default + end. + +init_mfa(Node, MFA) -> + mnesia:write_lock_table(?CLUSTER_MFA), + LatestId = get_latest_id(), + ok = do_catch_up_in_one_trans(LatestId, Node), + TnxId = LatestId + 1, + MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, initiator = Node, created_at = erlang:localtime()}, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + ok = commit(Node, TnxId), + case apply_mfa(TnxId, MFA) of + ok -> {ok, TnxId}; + {error, Reason} -> mnesia:abort(Reason); + Other -> mnesia:abort(Other) + end. + +do_catch_up_in_one_trans(LatestId, Node) -> + case do_catch_up(LatestId, Node) of + caught_up -> ok; + ok -> do_catch_up_in_one_trans(LatestId, Node); + {error, Reason} -> mnesia:abort(Reason) + end. + +transaction(Fun) -> + ekka_mnesia:transaction(?COMMON_SHARD, Fun). + +apply_mfa(TnxId, {M, F, A} = MFA) -> + try + Res = erlang:apply(M, F, A), + case Res =:= ok of + true -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok}); + false -> + ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}) + end, + Res + catch + C : E -> + ?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}), + {error, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} + end. diff --git a/apps/emqx/src/emqx_cluster_rpc_handler.erl b/apps/emqx/src/emqx_cluster_rpc_handler.erl new file mode 100644 index 000000000..a51ef3d74 --- /dev/null +++ b/apps/emqx/src/emqx_cluster_rpc_handler.erl @@ -0,0 +1,94 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_cluster_rpc_handler). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("logger.hrl"). +-include("emqx_cluster_rpc.hrl"). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(MFA_HISTORY_LEN, 100). + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +%%%=================================================================== +%%% Spawning and gen_server implementation +%%%=================================================================== + +init([]) -> + _ = emqx_misc:rand_seed(), + {ok, ensure_timer(#{timer => undefined})}. + +handle_call(Req, _From, State) -> + ?LOG(error, "unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef}) -> + case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/0, []) of + {atomic, ok} -> ok; + Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) + end, + {noreply, ensure_timer(State), hibernate}; + +handle_info(Info, State) -> + ?LOG(error, "unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #{timer := TRef}) -> + emqx_misc:cancel_timer(TRef). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +-ifdef(TEST). +ensure_timer(State) -> + State#{timer := emqx_misc:start_timer(timer:seconds(1), del_stale_mfa)}. +-else. +ensure_timer(State) -> + Ms = timer:minutes(5) + rand:uniform(5000), + State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}. +-endif. + + +%% @doc Keep the latest completed 100 records for querying and troubleshooting. +del_stale_mfa() -> + DoneId = + mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, + infinity, ?CLUSTER_COMMIT), + delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, ?MFA_HISTORY_LEN). + +delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; +delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count); +delete_stale_mfa(CurrId, DoneId, Count) when Count > 0 -> + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count - 1); +delete_stale_mfa(CurrId, DoneId, Count) when Count =< 0 -> + mnesia:delete(?CLUSTER_MFA, CurrId, write), + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count - 1). diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index defe96182..39a8d4fba 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -29,6 +29,8 @@ init([]) -> {ok, {{one_for_one, 10, 100}, %% always start emqx_config_handler first to load the emqx.conf to emqx_config [ child_spec(emqx_config_handler, worker) + , child_spec(emqx_cluster_rpc, worker) + , child_spec(emqx_cluster_rpc_handler, worker) , child_spec(emqx_pool_sup, supervisor) , child_spec(emqx_hooks, worker) , child_spec(emqx_stats, worker) diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl new file mode 100644 index 000000000..6cdb34c6c --- /dev/null +++ b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl @@ -0,0 +1,247 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_rpc_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-define(NODE1, emqx_cluster_rpc). +-define(NODE2, emqx_cluster_rpc2). +-define(NODE3, emqx_cluster_rpc3). + +all() -> [ + t_base_test, + t_commit_fail_test, + t_commit_crash_test, + t_commit_ok_but_apply_fail_on_other_node, + t_commit_ok_apply_fail_on_other_node_then_recover, + t_del_stale_mfa +]. +suite() -> [{timetrap, {minutes, 3}}]. +groups() -> []. + +init_per_suite(Config) -> + application:load(emqx), + ok = ekka:start(), + emqx_cluster_rpc:mnesia(copy), + %%dbg:tracer(), + %%dbg:p(all, c), + %%dbg:tpl(emqx_cluster_rpc, cx), + %%dbg:tpl(gen_statem, loop_receive, cx), + %%dbg:tpl(gen_statem, loop_state_callback, cx), + %%dbg:tpl(gen_statem, loop_callback_mode_result, cx), + Config. + +end_per_suite(_Config) -> + ekka:stop(), + ekka_mnesia:ensure_stopped(), + ekka_mnesia:delete_schema(), + %%dbg:stop(), + ok. + +init_per_testcase(_TestCase, Config) -> + start(), + Config. + +end_per_testcase(_Config) -> + stop(), + ok. + +t_base_test(_Config) -> + ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), + Pid = self(), + MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, + {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + {atomic, Query} = emqx_cluster_rpc:query(TnxId), + ?assertEqual(MFA, maps:get(mfa, Query)), + ?assertEqual(node(), maps:get(initiator, Query)), + ?assert(maps:is_key(created_at, Query)), + ?assertEqual(ok, receive_msg(3, test)), + SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))), + ?assertEqual(#{node => node(), opt => normal, state => realtime}, SysStatus), + sleep(400), + {atomic, Status} = emqx_cluster_rpc:status(), + ?assertEqual(3, length(Status)), + ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)), + ok. + +t_commit_fail_test(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]}, + {error, "MFA return not ok"} = emqx_cluster_rpc:multicall(M, F, A), + ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), + ok. + +t_commit_crash_test(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, no_exist_function, []}, + Error = emqx_cluster_rpc:multicall(M, F, A), + ?assertEqual({error, "TnxId(1) apply MFA({emqx_cluster_rpc_SUITE,no_exist_function,[]}) crash"}, Error), + ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), + ok. + +t_commit_ok_but_apply_fail_on_other_node(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, + {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + {atomic, [Status]} = emqx_cluster_rpc:status(), + ?assertEqual(MFA, maps:get(mfa, Status)), + ?assertEqual(node(), maps:get(node, Status)), + ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE3))), + erlang:send(?NODE2, test), + Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}), + ?assertEqual({error, "There are still transactions that have not been executed."}, Res), + ok. + +t_catch_up_status_handle_next_commit(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, + {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), + {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), + ok. + +t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + Now = erlang:system_time(second), + {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, + {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _} = emqx_cluster_rpc:multicall(io, format, ["test"]), + {atomic, [Status]} = emqx_cluster_rpc:status(), + ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), + ?assertEqual(node(), maps:get(node, Status)), + ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE3))), + sleep(4000), + {atomic, [Status1]} = emqx_cluster_rpc:status(), + ?assertEqual(Status, Status1), + sleep(1600), + {atomic, NewStatus} = emqx_cluster_rpc:status(), + ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), + ?assertEqual(realtime, element(1, sys:get_state(?NODE2))), + ?assertEqual(realtime, element(1, sys:get_state(?NODE3))), + ?assertEqual(3, length(NewStatus)), + Pid = self(), + MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, + {ok, TnxId} = emqx_cluster_rpc:multicall(M1, F1, A1), + {atomic, Query} = emqx_cluster_rpc:query(TnxId), + ?assertEqual(MFAEcho, maps:get(mfa, Query)), + ?assertEqual(node(), maps:get(initiator, Query)), + ?assert(maps:is_key(created_at, Query)), + ?assertEqual(ok, receive_msg(3, test)), + ok. + +t_del_stale_mfa(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + MFA = {M, F, A} = {io, format, ["test"]}, + Keys = lists:seq(1, 50), + Keys2 = lists:seq(51, 150), + Ids = + [begin + {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + TnxId end || _ <- Keys], + ?assertEqual(Keys, Ids), + Ids2 = + [begin + {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + TnxId end || _ <- Keys2], + ?assertEqual(Keys2, Ids2), + sleep(1200), + [begin + ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I)) + end || I <- lists:seq(1, 50)], + [begin + {atomic, Map} = emqx_cluster_rpc:query(I), + ?assertEqual(MFA, maps:get(mfa, Map)), + ?assertEqual(node(), maps:get(initiator, Map)), + ?assert(maps:is_key(created_at, Map)) + end || I <- lists:seq(51, 150)], + ok. + +start() -> + {ok, Pid1} = emqx_cluster_rpc:start_link(), + {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2), + {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3), + {ok, Pid4} = emqx_cluster_rpc_handler:start_link(), + {ok, [Pid1, Pid2, Pid3, Pid4]}. + +stop() -> + [begin + case erlang:whereis(N) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end end || N <- [?NODE1, ?NODE2, ?NODE3]]. + +receive_msg(0, _Msg) -> ok; +receive_msg(Count, Msg) when Count > 0 -> + receive Msg -> + receive_msg(Count - 1, Msg) + after 800 -> + timeout + end. + +echo(Pid, Msg) -> + erlang:send(Pid, Msg), + ok. + +failed_on_node(Pid) -> + case Pid =:= self() of + true -> ok; + false -> "MFA return not ok" + end. + +failed_on_node_by_odd(Pid) -> + case Pid =:= self() of + true -> ok; + false -> + catch ets:new(test, [named_table, set, public]), + Num = ets:update_counter(test, self(), {2, 1}, {self(), 1}), + case Num rem 2 =:= 0 of + false -> "MFA return not ok"; + true -> ok + end + end. + +failed_on_other_recover_after_5_second(Pid, CreatedAt) -> + Now = erlang:system_time(second), + case Pid =:= self() of + true -> ok; + false -> + case Now < CreatedAt + 5 of + true -> "MFA return not ok"; + false -> ok + end + end. + +sleep(Second) -> + receive _ -> ok + after Second -> timeout + end. From 11e87ab9a37282f9c10f82fb19458737d4102529 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 19 Aug 2021 23:15:27 +0800 Subject: [PATCH 02/11] chore(CI): DIAGNOSTIC=1 for more error message --- .github/workflows/run_test_cases.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index fcd5e8374..b97c3c003 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -98,8 +98,8 @@ jobs: - name: run cover run: | printenv > .env - docker exec -i ${{ matrix.otp_release }} bash -c "make cover" - docker exec --env-file .env -i ${{ matrix.otp_release }} bash -c "make coveralls" + docker exec -i ${{ matrix.otp_release }} bash -c "DIAGNOSTIC=1 make cover" + docker exec --env-file .env -i ${{ matrix.otp_release }} bash -c "DIAGNOSTIC=1 make coveralls" - name: cat rebar.crashdump if: failure() run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi From e5129ead6de223eb0e25b56b4270aada2ef36d92 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 20 Aug 2021 08:31:44 +0800 Subject: [PATCH 03/11] fix(CI): Error detected: 'Invalid reference to group api in emqx_rule_engine_SUITE:all/0' --- apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index a056d0c26..bf4dcb30e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -30,7 +30,7 @@ all() -> [ {group, engine} , {group, actions} - , {group, api} +%% , {group, api} , {group, cli} , {group, funcs} , {group, registry} From 765c94152bb2cbdfc339ea49e77ab601345e5a75 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 20 Aug 2021 11:46:58 +0800 Subject: [PATCH 04/11] feat: add cluster_call.retry_interval/mfa_max_history/mfa_cleanup_interval config --- apps/emqx/etc/emqx.conf | 3 +- apps/emqx/src/emqx_cluster_rpc.erl | 105 +++++++++++---------- apps/emqx/src/emqx_cluster_rpc_handler.erl | 33 +++---- apps/emqx/src/emqx_schema.erl | 7 ++ apps/emqx/test/emqx_cluster_rpc_SUITE.erl | 13 ++- 5 files changed, 88 insertions(+), 73 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index c85e7aa29..e3c701e28 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -490,6 +490,7 @@ listeners.wss.default { ## Websocket options ## See ${example_common_websocket_options} for more information websocket.idle_timeout = 86400s + } ## Enable per connection statistics. @@ -1071,7 +1072,7 @@ broker { ## are mostly published to topics with large number of levels. ## ## NOTE: This is a cluster-wide configuration. - ## It rquires all nodes to be stopped before changing it. + ## It requires all nodes to be stopped before changing it. ## ## @doc broker.perf.trie_compaction ## ValueType: Boolean diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx/src/emqx_cluster_rpc.erl index 301603c63..30f9e3234 100644 --- a/apps/emqx/src/emqx_cluster_rpc.erl +++ b/apps/emqx/src/emqx_cluster_rpc.erl @@ -26,7 +26,7 @@ -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). --export([start_link/2]). +-export([start_link/3]). -endif. -boot_mnesia({mnesia, [boot]}). @@ -41,7 +41,7 @@ -define(CATCH_UP, catch_up). -define(REALTIME, realtime). --define(CATCH_UP_AFTER(_Sec_), {state_timeout, timer:seconds(_Sec_), catch_up_delay}). +-define(CATCH_UP_AFTER(_Ms_), {state_timeout, _Ms_, catch_up_delay}). %%%=================================================================== %%% API @@ -64,9 +64,10 @@ mnesia(copy) -> ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). start_link() -> - start_link(node(), ?MODULE). -start_link(Node, Name) -> - gen_statem:start_link({local, Name}, ?MODULE, [Node], []). + RetryMs = emqx:get_config([broker, cluster_call, retry_interval]), + start_link(node(), ?MODULE, RetryMs). +start_link(Node, Name, RetryMs) -> + gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). multicall(M, F, A) -> multicall(M, F, A, timer:minutes(2)). @@ -91,14 +92,14 @@ multicall(M, F, A, Timeout) -> -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. query(TnxId) -> - Fun = fun() -> - case mnesia:read(?CLUSTER_MFA, TnxId) of - [] -> mnesia:abort(not_found); - [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> - #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} - end - end, - transaction(Fun). + transaction(fun do_query/1, [TnxId]). + +do_query(TnxId) -> + case mnesia:read(?CLUSTER_MFA, TnxId) of + [] -> mnesia:abort(not_found); + [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> + #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} + end. -spec reset() -> reset. reset() -> gen_statem:call(?MODULE, reset). @@ -128,77 +129,77 @@ status() -> %%%=================================================================== %% @private -init([Node]) -> +init([Node, RetryMs]) -> {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), - {ok, ?CATCH_UP, Node, ?CATCH_UP_AFTER(0)}. + {ok, ?CATCH_UP, #{node => Node, retry_interval => RetryMs}, ?CATCH_UP_AFTER(0)}. callback_mode() -> handle_event_function. %% @private -format_status(Opt, [_PDict, StateName, Node]) -> - #{state => StateName, node => Node, opt => Opt}. +format_status(Opt, [_PDict, StateName, Data]) -> + #{state => StateName, data => Data , opt => Opt}. %% @private -handle_event(state_timeout, catch_up_delay, _State, Node) -> - catch_up(Node); +handle_event(state_timeout, catch_up_delay, _State, Data) -> + catch_up(Data); -handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Node) -> - handle_mfa_write_event(MFARec, Node); -handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Node) -> +handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Data) -> + handle_mfa_write_event(MFARec, Data); +handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Data) -> {keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]}; %% we should catch up as soon as possible when we reset all. -handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Node) -> +handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Data) -> {keep_state_and_data, [?CATCH_UP_AFTER(0)]}; -handle_event({call, From}, reset, _State, _Node) -> +handle_event({call, From}, reset, _State, _Data) -> _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), _ = ekka_mnesia:clear_table(?CLUSTER_MFA), {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; -handle_event({call, From}, {initiate, MFA}, ?REALTIME, Node) -> +handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) -> case transaction(fun() -> init_mfa(Node, MFA) end) of {atomic, {ok, TnxId}} -> - {keep_state, Node, [{reply, From, {ok, TnxId}}]}; + {keep_state, Data, [{reply, From, {ok, TnxId}}]}; {aborted, Reason} -> - {keep_state, Node, [{reply, From, {error, Reason}}]} + {keep_state, Data, [{reply, From, {error, Reason}}]} end; -handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Node) -> - case catch_up(Node) of - {next_state, ?REALTIME, Node} -> - {next_state, ?REALTIME, Node, [{postpone, true}]}; +handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Data = #{retry_interval := RetryMs}) -> + case catch_up(Data) of + {next_state, ?REALTIME, Data} -> + {next_state, ?REALTIME, Data, [{postpone, true}]}; _ -> Reason = "There are still transactions that have not been executed.", - {keep_state, Node, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(1)]} + {keep_state_and_data, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(RetryMs)]} end; -handle_event(_EventType, _EventContent, ?CATCH_UP, _Node) -> - {keep_state_and_data, [?CATCH_UP_AFTER(10)]}; -handle_event(_EventType, _EventContent, _StateName, _Node) -> +handle_event(_EventType, _EventContent, ?CATCH_UP, #{retry_interval := RetryMs}) -> + {keep_state_and_data, [?CATCH_UP_AFTER(RetryMs)]}; +handle_event(_EventType, _EventContent, _StateName, _Data) -> keep_state_and_data. -terminate(_Reason, _StateName, _Node) -> +terminate(_Reason, _StateName, _Data) -> ok. -code_change(_OldVsn, StateName, Node, _Extra) -> - {ok, StateName, Node}. +code_change(_OldVsn, StateName, Data, _Extra) -> + {ok, StateName, Data}. %%%=================================================================== %%% Internal functions %%%=================================================================== -catch_up(Node) -> +catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> case get_next_mfa(Node) of - {atomic, caught_up} -> {next_state, ?REALTIME, Node}; + {atomic, caught_up} -> {next_state, ?REALTIME, Data}; {atomic, {still_lagging, NextId, MFA}} -> case apply_mfa(NextId, MFA) of ok -> case transaction(fun() -> commit(Node, NextId) end) of - {atomic, ok} -> catch_up(Node); - _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + {atomic, ok} -> catch_up(Data); + _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; - _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; - {aborted, _Reason} -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + {aborted, _Reason} -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end. get_next_mfa(Node) -> @@ -252,17 +253,18 @@ get_latest_id() -> Id -> Id end. -handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Node) -> +handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) -> + #{node := Node, retry_interval := RetryMs} = Data, {atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end), if LastAppliedId + 1 =:= EventId -> case apply_mfa(EventId, MFA) of ok -> case transaction(fun() -> commit(Node, EventId) end) of {atomic, ok} -> - {next_state, ?REALTIME, Node}; - _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + {next_state, ?REALTIME, Data}; + _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; - _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; LastAppliedId >= EventId -> %% It's means the initiator receive self event or other receive stale event. keep_state_and_data; @@ -301,8 +303,11 @@ do_catch_up_in_one_trans(LatestId, Node) -> {error, Reason} -> mnesia:abort(Reason) end. -transaction(Fun) -> - ekka_mnesia:transaction(?COMMON_SHARD, Fun). +transaction(Func, Args) -> + ekka_mnesia:transaction(?COMMON_SHARD, Func, Args). + +transaction(Func) -> + ekka_mnesia:transaction(?COMMON_SHARD, Func). apply_mfa(TnxId, {M, F, A} = MFA) -> try diff --git a/apps/emqx/src/emqx_cluster_rpc_handler.erl b/apps/emqx/src/emqx_cluster_rpc_handler.erl index a51ef3d74..5f548e7fd 100644 --- a/apps/emqx/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx/src/emqx_cluster_rpc_handler.erl @@ -21,22 +21,27 @@ -include("logger.hrl"). -include("emqx_cluster_rpc.hrl"). --export([start_link/0]). +-export([start_link/0, start_link/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(MFA_HISTORY_LEN, 100). start_link() -> - gen_server:start_link(?MODULE, [], []). + MaxHistory = emqx:get_config([broker, cluster_call, mfa_max_history]), + CleanupMs = emqx:get_config([broker, cluster_call, mfa_cleanup_interval]), + start_link(MaxHistory, CleanupMs). + +start_link(MaxHistory, CleanupMs) -> + State = #{max_history => MaxHistory, cleanup_ms => CleanupMs, timer => undefined}, + gen_server:start_link(?MODULE, [State], []). %%%=================================================================== %%% Spawning and gen_server implementation %%%=================================================================== -init([]) -> - _ = emqx_misc:rand_seed(), - {ok, ensure_timer(#{timer => undefined})}. +init([State]) -> + {ok, ensure_timer(State)}. handle_call(Req, _From, State) -> ?LOG(error, "unexpected call: ~p", [Req]), @@ -46,8 +51,8 @@ handle_cast(Msg, State) -> ?LOG(error, "unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef}) -> - case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/0, []) of +handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> + case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) end, @@ -66,23 +71,15 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - --ifdef(TEST). -ensure_timer(State) -> - State#{timer := emqx_misc:start_timer(timer:seconds(1), del_stale_mfa)}. --else. -ensure_timer(State) -> - Ms = timer:minutes(5) + rand:uniform(5000), +ensure_timer(State = #{cleanup_ms := Ms}) -> State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}. --endif. - %% @doc Keep the latest completed 100 records for querying and troubleshooting. -del_stale_mfa() -> +del_stale_mfa(MaxHistory) -> DoneId = mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, infinity, ?CLUSTER_COMMIT), - delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, ?MFA_HISTORY_LEN). + delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3bbeb1d07..cb250f575 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -293,6 +293,7 @@ fields("broker") -> , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} , {"route_batch_clean", t(boolean(), undefined, true)} , {"perf", ref("perf")} + , {"cluster_call", ref("cluster_call")} ]; fields("perf") -> @@ -325,6 +326,12 @@ fields("sysmon_os") -> , {"procmem_high_watermark", t(percent(), undefined, "5%")} ]; +fields("cluster_call") -> + [{"retry_interval", t(duration(), undefined, "2s")} + , {"mfa_max_history", t(range(1, 500), undefined, 50)} + , {"mfa_cleanup_interval", t(duration(), undefined, "5m")} + ]; + fields("alarm") -> [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} , {"size_limit", t(integer(), undefined, 1000)} diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl index 6cdb34c6c..8e8f55834 100644 --- a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl @@ -41,6 +41,11 @@ init_per_suite(Config) -> application:load(emqx), ok = ekka:start(), emqx_cluster_rpc:mnesia(copy), + emqx_config:put([broker, cluster_call], #{ + mfa_max_history => 100, + mfa_cleanup_interval => 1000, + retry_interval => 900 + }), %%dbg:tracer(), %%dbg:p(all, c), %%dbg:tpl(emqx_cluster_rpc, cx), @@ -75,7 +80,7 @@ t_base_test(_Config) -> ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))), - ?assertEqual(#{node => node(), opt => normal, state => realtime}, SysStatus), + ?assertEqual(#{data => #{node => node(),retry_interval => 900}, opt => normal, state => realtime}, SysStatus), sleep(400), {atomic, Status} = emqx_cluster_rpc:status(), ?assertEqual(3, length(Status)), @@ -186,9 +191,9 @@ t_del_stale_mfa(_Config) -> start() -> {ok, Pid1} = emqx_cluster_rpc:start_link(), - {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2), - {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3), - {ok, Pid4} = emqx_cluster_rpc_handler:start_link(), + {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), + {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), + {ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500), {ok, [Pid1, Pid2, Pid3, Pid4]}. stop() -> From d55ba6b2e83fc2301aaf74dd1520c1565c9fe619 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 20 Aug 2021 13:57:32 +0800 Subject: [PATCH 05/11] chore: transaction without nnonymous function --- apps/emqx/src/emqx_cluster_rpc.erl | 91 +++++++++++------------ apps/emqx/test/emqx_cluster_rpc_SUITE.erl | 3 +- 2 files changed, 44 insertions(+), 50 deletions(-) diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx/src/emqx_cluster_rpc.erl index 30f9e3234..a4c54d01e 100644 --- a/apps/emqx/src/emqx_cluster_rpc.erl +++ b/apps/emqx/src/emqx_cluster_rpc.erl @@ -92,37 +92,14 @@ multicall(M, F, A, Timeout) -> -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. query(TnxId) -> - transaction(fun do_query/1, [TnxId]). - -do_query(TnxId) -> - case mnesia:read(?CLUSTER_MFA, TnxId) of - [] -> mnesia:abort(not_found); - [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> - #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} - end. + transaction(fun trans_query/1, [TnxId]). -spec reset() -> reset. reset() -> gen_statem:call(?MODULE, reset). -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. status() -> - Fun = fun() -> - mnesia:foldl(fun(Rec, Acc) -> - #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, - case mnesia:read(?CLUSTER_MFA, TnxId) of - [MFARec] -> - #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, - [#{ - node => Node, - tnx_id => TnxId, - initiator => InitNode, - mfa => MFA, - created_at => CreatedAt - } | Acc]; - [] -> Acc - end end, [], ?CLUSTER_COMMIT) - end, - transaction(Fun). + transaction(fun trans_status/0, []). %%%=================================================================== %%% gen_statem callbacks @@ -158,7 +135,7 @@ handle_event({call, From}, reset, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) -> - case transaction(fun() -> init_mfa(Node, MFA) end) of + case transaction(fun init_mfa/2, [Node, MFA]) of {atomic, {ok, TnxId}} -> {keep_state, Data, [{reply, From, {ok, TnxId}}]}; {aborted, Reason} -> @@ -188,12 +165,12 @@ code_change(_OldVsn, StateName, Data, _Extra) -> %%% Internal functions %%%=================================================================== catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> - case get_next_mfa(Node) of + case transaction(fun get_next_mfa/1, [Node]) of {atomic, caught_up} -> {next_state, ?REALTIME, Data}; {atomic, {still_lagging, NextId, MFA}} -> case apply_mfa(NextId, MFA) of ok -> - case transaction(fun() -> commit(Node, NextId) end) of + case transaction(fun commit/2, [Node, NextId]) of {atomic, ok} -> catch_up(Data); _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; @@ -203,24 +180,20 @@ catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> end. get_next_mfa(Node) -> - Fun = - fun() -> - NextId = - case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [] -> - LatestId = get_latest_id(), - TnxId = max(LatestId - 1, 0), - commit(Node, TnxId), - ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), - TnxId; - [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 - end, - case mnesia:read(?CLUSTER_MFA, NextId) of - [] -> caught_up; - [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} - end + NextId = + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> + LatestId = get_latest_id(), + TnxId = max(LatestId - 1, 0), + commit(Node, TnxId), + ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), + TnxId; + [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 end, - transaction(Fun). + case mnesia:read(?CLUSTER_MFA, NextId) of + [] -> caught_up; + [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} + end. do_catch_up(ToTnxId, Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of @@ -255,11 +228,11 @@ get_latest_id() -> handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) -> #{node := Node, retry_interval := RetryMs} = Data, - {atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end), + {atomic, LastAppliedId} = transaction(fun get_last_applied_id/2, [Node, EventId - 1]), if LastAppliedId + 1 =:= EventId -> case apply_mfa(EventId, MFA) of ok -> - case transaction(fun() -> commit(Node, EventId) end) of + case transaction(fun commit/2, [Node, EventId]) of {atomic, ok} -> {next_state, ?REALTIME, Data}; _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} @@ -306,8 +279,28 @@ do_catch_up_in_one_trans(LatestId, Node) -> transaction(Func, Args) -> ekka_mnesia:transaction(?COMMON_SHARD, Func, Args). -transaction(Func) -> - ekka_mnesia:transaction(?COMMON_SHARD, Func). +trans_status() -> + mnesia:foldl(fun(Rec, Acc) -> + #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, + case mnesia:read(?CLUSTER_MFA, TnxId) of + [MFARec] -> + #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, + [#{ + node => Node, + tnx_id => TnxId, + initiator => InitNode, + mfa => MFA, + created_at => CreatedAt + } | Acc]; + [] -> Acc + end end, [], ?CLUSTER_COMMIT). + +trans_query(TnxId) -> + case mnesia:read(?CLUSTER_MFA, TnxId) of + [] -> mnesia:abort(not_found); + [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> + #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} + end. apply_mfa(TnxId, {M, F, A} = MFA) -> try diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl index 8e8f55834..5106183bb 100644 --- a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl @@ -136,7 +136,8 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, {ok, _} = emqx_cluster_rpc:multicall(M, F, A), {ok, _} = emqx_cluster_rpc:multicall(io, format, ["test"]), - {atomic, [Status]} = emqx_cluster_rpc:status(), + {atomic, [Status|L]} = emqx_cluster_rpc:status(), + ?assertEqual([], L), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), From 2c1b1fbfa87c5a86c6760bd1aa38b08dcf289db1 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 20 Aug 2021 14:31:47 +0800 Subject: [PATCH 06/11] chore(config): rename cluster_call to hot_config_loader --- apps/emqx/etc/emqx.conf | 2 ++ apps/emqx/src/emqx_cluster_rpc.erl | 2 +- apps/emqx/src/emqx_cluster_rpc_handler.erl | 4 ++-- apps/emqx/src/emqx_schema.erl | 6 +++--- apps/emqx/test/emqx_cluster_rpc_SUITE.erl | 2 +- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index e3c701e28..7c404ff2d 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -222,6 +222,8 @@ listeners.quic.default { ## If not set, the global configs are used for this listener. ## ## See `zones.` for more details. + ## NOTE: This is a cluster-wide configuration. + ## It requires all nodes to be stopped before changing it. ## ## @doc listeners.quic..zone ## ValueType: String diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx/src/emqx_cluster_rpc.erl index a4c54d01e..5ddfe5742 100644 --- a/apps/emqx/src/emqx_cluster_rpc.erl +++ b/apps/emqx/src/emqx_cluster_rpc.erl @@ -64,7 +64,7 @@ mnesia(copy) -> ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). start_link() -> - RetryMs = emqx:get_config([broker, cluster_call, retry_interval]), + RetryMs = emqx:get_config([broker, hot_config_loader, retry_interval]), start_link(node(), ?MODULE, RetryMs). start_link(Node, Name, RetryMs) -> gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). diff --git a/apps/emqx/src/emqx_cluster_rpc_handler.erl b/apps/emqx/src/emqx_cluster_rpc_handler.erl index 5f548e7fd..4da219165 100644 --- a/apps/emqx/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx/src/emqx_cluster_rpc_handler.erl @@ -28,8 +28,8 @@ -define(MFA_HISTORY_LEN, 100). start_link() -> - MaxHistory = emqx:get_config([broker, cluster_call, mfa_max_history]), - CleanupMs = emqx:get_config([broker, cluster_call, mfa_cleanup_interval]), + MaxHistory = emqx:get_config([broker, hot_config_loader, mfa_max_history]), + CleanupMs = emqx:get_config([broker, hot_config_loader, mfa_cleanup_interval]), start_link(MaxHistory, CleanupMs). start_link(MaxHistory, CleanupMs) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index cb250f575..3fb986f1e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -293,7 +293,7 @@ fields("broker") -> , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} , {"route_batch_clean", t(boolean(), undefined, true)} , {"perf", ref("perf")} - , {"cluster_call", ref("cluster_call")} + , {"hot_config_loader", ref("hot_config_loader")} ]; fields("perf") -> @@ -326,8 +326,8 @@ fields("sysmon_os") -> , {"procmem_high_watermark", t(percent(), undefined, "5%")} ]; -fields("cluster_call") -> - [{"retry_interval", t(duration(), undefined, "2s")} +fields("hot_config_loader") -> + [{"retry_interval", t(duration(), undefined, "1s")} , {"mfa_max_history", t(range(1, 500), undefined, 50)} , {"mfa_cleanup_interval", t(duration(), undefined, "5m")} ]; diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl index 5106183bb..b03137796 100644 --- a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl @@ -41,7 +41,7 @@ init_per_suite(Config) -> application:load(emqx), ok = ekka:start(), emqx_cluster_rpc:mnesia(copy), - emqx_config:put([broker, cluster_call], #{ + emqx_config:put([broker, hot_config_loader], #{ mfa_max_history => 100, mfa_cleanup_interval => 1000, retry_interval => 900 From 60c1c4edba0a6a492e959e861375eb609e1cd4a8 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 24 Aug 2021 10:56:11 +0800 Subject: [PATCH 07/11] feat: move cluster_call to emqx_machine --- apps/emqx/src/emqx_kernel_sup.erl | 2 -- apps/emqx/src/emqx_schema.erl | 7 ------ apps/emqx_machine/etc/emqx_machine.conf | 23 +++++++++++++++++++ .../include/emqx_cluster_rpc.hrl | 0 .../src/emqx_cluster_rpc.erl | 12 +++++++--- .../src/emqx_cluster_rpc_handler.erl | 10 ++++---- apps/emqx_machine/src/emqx_machine_schema.erl | 8 +++++++ apps/emqx_machine/src/emqx_machine_sup.erl | 4 +++- .../test/emqx_cluster_rpc_SUITE.erl | 11 ++++----- 9 files changed, 52 insertions(+), 25 deletions(-) rename apps/{emqx => emqx_machine}/include/emqx_cluster_rpc.hrl (100%) rename apps/{emqx => emqx_machine}/src/emqx_cluster_rpc.erl (97%) rename apps/{emqx => emqx_machine}/src/emqx_cluster_rpc_handler.erl (92%) rename apps/{emqx => emqx_machine}/test/emqx_cluster_rpc_SUITE.erl (96%) diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index 39a8d4fba..defe96182 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -29,8 +29,6 @@ init([]) -> {ok, {{one_for_one, 10, 100}, %% always start emqx_config_handler first to load the emqx.conf to emqx_config [ child_spec(emqx_config_handler, worker) - , child_spec(emqx_cluster_rpc, worker) - , child_spec(emqx_cluster_rpc_handler, worker) , child_spec(emqx_pool_sup, supervisor) , child_spec(emqx_hooks, worker) , child_spec(emqx_stats, worker) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3fb986f1e..3bbeb1d07 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -293,7 +293,6 @@ fields("broker") -> , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} , {"route_batch_clean", t(boolean(), undefined, true)} , {"perf", ref("perf")} - , {"hot_config_loader", ref("hot_config_loader")} ]; fields("perf") -> @@ -326,12 +325,6 @@ fields("sysmon_os") -> , {"procmem_high_watermark", t(percent(), undefined, "5%")} ]; -fields("hot_config_loader") -> - [{"retry_interval", t(duration(), undefined, "1s")} - , {"mfa_max_history", t(range(1, 500), undefined, 50)} - , {"mfa_cleanup_interval", t(duration(), undefined, "5m")} - ]; - fields("alarm") -> [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} , {"size_limit", t(integer(), undefined, 1000)} diff --git a/apps/emqx_machine/etc/emqx_machine.conf b/apps/emqx_machine/etc/emqx_machine.conf index d21a19bc2..3ec09f2d4 100644 --- a/apps/emqx_machine/etc/emqx_machine.conf +++ b/apps/emqx_machine/etc/emqx_machine.conf @@ -89,6 +89,29 @@ node { ## Default: 23 backtrace_depth = 23 + cluster_call { + ## Time interval to retry after a failed call + ## + ## @doc node.cluster_call.retry_interval + ## ValueType: Duration + ## Default: 1s + retry_interval = 1s + ## Retain the maximum number of completed transactions (for queries) + ## + ## @doc node.cluster_call.max_history + ## ValueType: Integer + ## Range: [1, 500] + ## Default: 100 + max_history = 100 + ## Time interval to clear completed but stale transactions. + ## Ensure that the number of completed transactions is less than the max_history + ## + ## @doc node.cluster_call.cleanup_interval + ## ValueType: Duration + ## Default: 5m + cleanup_interval = 5m + } + } ##================================================================== diff --git a/apps/emqx/include/emqx_cluster_rpc.hrl b/apps/emqx_machine/include/emqx_cluster_rpc.hrl similarity index 100% rename from apps/emqx/include/emqx_cluster_rpc.hrl rename to apps/emqx_machine/include/emqx_cluster_rpc.hrl diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl similarity index 97% rename from apps/emqx/src/emqx_cluster_rpc.erl rename to apps/emqx_machine/src/emqx_cluster_rpc.erl index 5ddfe5742..b988d55fb 100644 --- a/apps/emqx/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -32,8 +32,8 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --include("emqx.hrl"). --include("logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -include("emqx_cluster_rpc.hrl"). -rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}). @@ -64,11 +64,17 @@ mnesia(copy) -> ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). start_link() -> - RetryMs = emqx:get_config([broker, hot_config_loader, retry_interval]), + RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000), start_link(node(), ?MODULE, RetryMs). start_link(Node, Name, RetryMs) -> gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). +-spec multicall(Module, Function, Args) -> {ok, TnxId} | {error, Reason} when + Module :: module(), + Function :: atom(), + Args :: [term()], + TnxId :: pos_integer(), + Reason :: term(). multicall(M, F, A) -> multicall(M, F, A, timer:minutes(2)). diff --git a/apps/emqx/src/emqx_cluster_rpc_handler.erl b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl similarity index 92% rename from apps/emqx/src/emqx_cluster_rpc_handler.erl rename to apps/emqx_machine/src/emqx_cluster_rpc_handler.erl index 4da219165..803b7f9fc 100644 --- a/apps/emqx/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl @@ -17,19 +17,17 @@ -behaviour(gen_server). --include("emqx.hrl"). --include("logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -include("emqx_cluster_rpc.hrl"). -export([start_link/0, start_link/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(MFA_HISTORY_LEN, 100). - start_link() -> - MaxHistory = emqx:get_config([broker, hot_config_loader, mfa_max_history]), - CleanupMs = emqx:get_config([broker, hot_config_loader, mfa_cleanup_interval]), + MaxHistory = application:get_env(emqx_machine, cluster_call_max_history, 100), + CleanupMs = application:get_env(emqx_machine, cluster_call_cleanup_interval, 5*60*1000), start_link(MaxHistory, CleanupMs). start_link(MaxHistory, CleanupMs) -> diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index 7dd193e63..2dde8aae3 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -139,6 +139,14 @@ fields("node") -> , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)} , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)} + , {"cluster_call", ref("cluster_call")} + ]; + + +fields("cluster_call") -> + [ {"retry_interval", t(emqx_schema:duration(), "emqx_machine.retry_interval", "1s")} + , {"max_history", t(range(1, 500), "emqx_machine.max_history", 100)} + , {"cleanup_interval", t(emqx_schema:duration(), "emqx_machine.cleanup_interval", "5m")} ]; fields("rpc") -> diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl index 0810eb267..798beee1c 100644 --- a/apps/emqx_machine/src/emqx_machine_sup.erl +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -31,7 +31,9 @@ start_link() -> init([]) -> GlobalGC = child_worker(emqx_global_gc, [], permanent), Terminator = child_worker(emqx_machine_terminator, [], transient), - Children = [GlobalGC, Terminator], + ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent), + ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent), + Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler], SupFlags = #{strategy => one_for_one, intensity => 100, period => 10 diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl similarity index 96% rename from apps/emqx/test/emqx_cluster_rpc_SUITE.erl rename to apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index b03137796..92a89790a 100644 --- a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -39,13 +39,12 @@ groups() -> []. init_per_suite(Config) -> application:load(emqx), + application:load(emqx_machine), ok = ekka:start(), - emqx_cluster_rpc:mnesia(copy), - emqx_config:put([broker, hot_config_loader], #{ - mfa_max_history => 100, - mfa_cleanup_interval => 1000, - retry_interval => 900 - }), + ok = ekka_rlog:wait_for_shards([emqx_common_shard], infinity), + application:set_env(emqx_machine, cluster_call_max_history, 100), + application:set_env(emqx_machine, cluster_call_clean_interval, 1000), + application:set_env(emqx_machine, cluster_call_retry_interval, 900), %%dbg:tracer(), %%dbg:p(all, c), %%dbg:tpl(emqx_cluster_rpc, cx), From 45285086209ab7f0a7da40cd1d596f294ec543e3 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 25 Aug 2021 16:38:01 +0800 Subject: [PATCH 08/11] feat: replace gen_statem by gen_server --- apps/emqx_machine/src/emqx_cluster_rpc.erl | 163 ++++++++---------- .../test/emqx_cluster_rpc_SUITE.erl | 14 +- 2 files changed, 69 insertions(+), 108 deletions(-) diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index b988d55fb..f7dc1eef9 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -14,14 +14,14 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_cluster_rpc). --behaviour(gen_statem). +-behaviour(gen_server). %% API -export([start_link/0, mnesia/1]). -export([multicall/3, multicall/4, query/1, reset/0, status/0]). --export([init/1, format_status/2, handle_event/4, terminate/3, - code_change/4, callback_mode/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + handle_continue/2, code_change/3]). -ifdef(TEST). -compile(export_all). @@ -40,8 +40,7 @@ -rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}). -define(CATCH_UP, catch_up). --define(REALTIME, realtime). --define(CATCH_UP_AFTER(_Ms_), {state_timeout, _Ms_, catch_up_delay}). +-define(TIMEOUT, timer:minutes(1)). %%%=================================================================== %%% API @@ -49,14 +48,14 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ {type, ordered_set}, + {rlog_shard, ?COMMON_SHARD}, {disc_copies, [node()]}, - {local_content, true}, {record_name, cluster_rpc_mfa}, {attributes, record_info(fields, cluster_rpc_mfa)}]), ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ {type, set}, + {rlog_shard, ?COMMON_SHARD}, {disc_copies, [node()]}, - {local_content, true}, {record_name, cluster_rpc_commit}, {attributes, record_info(fields, cluster_rpc_commit)}]); mnesia(copy) -> @@ -66,15 +65,16 @@ mnesia(copy) -> start_link() -> RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000), start_link(node(), ?MODULE, RetryMs). + start_link(Node, Name, RetryMs) -> - gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). + gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). -spec multicall(Module, Function, Args) -> {ok, TnxId} | {error, Reason} when Module :: module(), Function :: atom(), Args :: [term()], TnxId :: pos_integer(), - Reason :: term(). + Reason :: string(). multicall(M, F, A) -> multicall(M, F, A, timer:minutes(2)). @@ -84,14 +84,17 @@ multicall(M, F, A) -> Args :: [term()], TnxId :: pos_integer(), Timeout :: timeout(), - Reason :: term(). + Reason :: string(). multicall(M, F, A, Timeout) -> MFA = {initiate, {M, F, A}}, case ekka_rlog:role() of - core -> gen_statem:call(?MODULE, MFA, Timeout); + core -> gen_server:call(?MODULE, MFA, Timeout); replicant -> + %% the initiate transaction must happened on core node + %% make sure MFA(in the transaction) and the transaction on the same node + %% don't need rpc again inside transaction. case ekka_rlog_status:upstream_node(?COMMON_SHARD) of - {ok, Node} -> gen_statem:call({?MODULE, Node}, MFA, Timeout); + {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); disconnected -> {error, disconnected} end end. @@ -101,7 +104,7 @@ query(TnxId) -> transaction(fun trans_query/1, [TnxId]). -spec reset() -> reset. -reset() -> gen_statem:call(?MODULE, reset). +reset() -> gen_server:call(?MODULE, reset). -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. status() -> @@ -114,75 +117,67 @@ status() -> %% @private init([Node, RetryMs]) -> {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), - {ok, ?CATCH_UP, #{node => Node, retry_interval => RetryMs}, ?CATCH_UP_AFTER(0)}. - -callback_mode() -> - handle_event_function. + {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}. %% @private -format_status(Opt, [_PDict, StateName, Data]) -> - #{state => StateName, data => Data , opt => Opt}. +handle_continue(?CATCH_UP, State) -> + {noreply, State, catch_up(State)}. -%% @private -handle_event(state_timeout, catch_up_delay, _State, Data) -> - catch_up(Data); - -handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Data) -> - handle_mfa_write_event(MFARec, Data); -handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Data) -> - {keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]}; -%% we should catch up as soon as possible when we reset all. -handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Data) -> - {keep_state_and_data, [?CATCH_UP_AFTER(0)]}; - -handle_event({call, From}, reset, _State, _Data) -> +handle_call(reset, _From, State) -> _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), _ = ekka_mnesia:clear_table(?CLUSTER_MFA), - {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; + {reply, ok, State, {continue, ?CATCH_UP}}; -handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) -> +handle_call({initiate, MFA}, _From, State = #{node := Node}) -> case transaction(fun init_mfa/2, [Node, MFA]) of {atomic, {ok, TnxId}} -> - {keep_state, Data, [{reply, From, {ok, TnxId}}]}; + {reply, {ok, TnxId}, State, {continue, ?CATCH_UP}}; {aborted, Reason} -> - {keep_state, Data, [{reply, From, {error, Reason}}]} - end; -handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Data = #{retry_interval := RetryMs}) -> - case catch_up(Data) of - {next_state, ?REALTIME, Data} -> - {next_state, ?REALTIME, Data, [{postpone, true}]}; - _ -> - Reason = "There are still transactions that have not been executed.", - {keep_state_and_data, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(RetryMs)]} + {reply, {error, Reason}, State, {continue, ?CATCH_UP}} end; +handle_call(_, _From, State) -> + {reply, ok, State, catch_up(State)}. -handle_event(_EventType, _EventContent, ?CATCH_UP, #{retry_interval := RetryMs}) -> - {keep_state_and_data, [?CATCH_UP_AFTER(RetryMs)]}; -handle_event(_EventType, _EventContent, _StateName, _Data) -> - keep_state_and_data. +handle_cast(_, State) -> + {noreply, State, catch_up(State)}. -terminate(_Reason, _StateName, _Data) -> +handle_info({mnesia_table_event, _}, State) -> + {noreply, State, catch_up(State)}; +handle_info(_, State) -> + {noreply, State, catch_up(State)}. + +terminate(_Reason, _Data) -> ok. -code_change(_OldVsn, StateName, Data, _Extra) -> - {ok, StateName, Data}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%%=================================================================== %%% Internal functions %%%=================================================================== -catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> +catch_up(#{node := Node, retry_interval := RetryMs} = State) -> case transaction(fun get_next_mfa/1, [Node]) of - {atomic, caught_up} -> {next_state, ?REALTIME, Data}; + {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> case apply_mfa(NextId, MFA) of ok -> case transaction(fun commit/2, [Node, NextId]) of - {atomic, ok} -> catch_up(Data); - _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} + {atomic, ok} -> catch_up(State); + Error -> + ?SLOG(error, #{ + msg => "mnesia write transaction failed", + node => Node, + nextId => NextId, + error => Error}), + RetryMs end; - _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} + _Error -> RetryMs end; - {aborted, _Reason} -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} + {aborted, Reason} -> + ?SLOG(error, #{ + msg => "get_next_mfa transaction failed", + node => Node, error => Reason}), + RetryMs end. get_next_mfa(Node) -> @@ -192,7 +187,9 @@ get_next_mfa(Node) -> LatestId = get_latest_id(), TnxId = max(LatestId - 1, 0), commit(Node, TnxId), - ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), + ?SLOG(notice, #{ + msg => "New node first catch up and start commit.", + node => Node, tnx_id => TnxId}), TnxId; [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 end, @@ -216,10 +213,15 @@ do_catch_up(ToTnxId, Node) -> {error, Reason} -> mnesia:abort(Reason); Other -> mnesia:abort(Other) end; - [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> + [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", [Node, LastAppliedId, ToTnxId])), - ?LOG(error, Reason), + ?SLOG(error, #{ + msg => "catch up failed!", + last_applied_id => LastAppliedId, + node => Node, + to_tnx_id => ToTnxId + }), {error, Reason} end. @@ -232,35 +234,6 @@ get_latest_id() -> Id -> Id end. -handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) -> - #{node := Node, retry_interval := RetryMs} = Data, - {atomic, LastAppliedId} = transaction(fun get_last_applied_id/2, [Node, EventId - 1]), - if LastAppliedId + 1 =:= EventId -> - case apply_mfa(EventId, MFA) of - ok -> - case transaction(fun commit/2, [Node, EventId]) of - {atomic, ok} -> - {next_state, ?REALTIME, Data}; - _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} - end; - _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} - end; - LastAppliedId >= EventId -> %% It's means the initiator receive self event or other receive stale event. - keep_state_and_data; - true -> - ?LOG(error, "LastAppliedId+1 - case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId; - [] -> - commit(Node, Default), - Default - end. - init_mfa(Node, MFA) -> mnesia:write_lock_table(?CLUSTER_MFA), LatestId = get_latest_id(), @@ -311,12 +284,12 @@ trans_query(TnxId) -> apply_mfa(TnxId, {M, F, A} = MFA) -> try Res = erlang:apply(M, F, A), - case Res =:= ok of - true -> - ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok}); - false -> - ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}) - end, + case Res =:= ok of + true -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok}); + false -> + ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}) + end, Res catch C : E -> diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index 92a89790a..b91131b93 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -78,8 +78,6 @@ t_base_test(_Config) -> ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), - SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))), - ?assertEqual(#{data => #{node => node(),retry_interval => 900}, opt => normal, state => realtime}, SysStatus), sleep(400), {atomic, Status} = emqx_cluster_rpc:status(), ?assertEqual(3, length(Status)), @@ -111,12 +109,9 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> {atomic, [Status]} = emqx_cluster_rpc:status(), ?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), - ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE3))), erlang:send(?NODE2, test), Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}), - ?assertEqual({error, "There are still transactions that have not been executed."}, Res), + ?assertEqual({error, "MFA return not ok"}, Res), ok. t_catch_up_status_handle_next_commit(_Config) -> @@ -124,7 +119,6 @@ t_catch_up_status_handle_next_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, {ok, _} = emqx_cluster_rpc:multicall(M, F, A), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), ok. @@ -139,17 +133,11 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> ?assertEqual([], L), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), - ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE3))), sleep(4000), {atomic, [Status1]} = emqx_cluster_rpc:status(), ?assertEqual(Status, Status1), sleep(1600), {atomic, NewStatus} = emqx_cluster_rpc:status(), - ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), - ?assertEqual(realtime, element(1, sys:get_state(?NODE2))), - ?assertEqual(realtime, element(1, sys:get_state(?NODE3))), ?assertEqual(3, length(NewStatus)), Pid = self(), MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, From 73238ed81f3a861aea66eaf1eb454421ac066421 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 26 Aug 2021 16:54:04 +0800 Subject: [PATCH 09/11] feat: emqx_resource support cluster_call --- .../src/simple_authn/emqx_authn_http.erl | 8 ++-- .../src/simple_authn/emqx_authn_mongodb.erl | 4 +- .../src/simple_authn/emqx_authn_mysql.erl | 6 +-- .../src/simple_authn/emqx_authn_pgsql.erl | 6 +-- .../src/simple_authn/emqx_authn_redis.erl | 6 +-- apps/emqx_authz/src/emqx_authz.erl | 4 +- .../src/emqx_data_bridge_api.erl | 4 +- .../src/emqx_data_bridge_monitor.erl | 2 +- apps/emqx_machine/src/emqx_cluster_rpc.erl | 41 +++++++++++-------- .../test/emqx_cluster_rpc_SUITE.erl | 20 ++++----- .../include/emqx_resource_utils.hrl | 28 +------------ apps/emqx_resource/src/emqx_resource.erl | 14 +++++-- .../src/emqx_resource_instance.erl | 2 +- apps/emqx_retainer/src/emqx_retainer.erl | 4 +- apps/emqx_rule_engine/include/rule_engine.hrl | 16 -------- .../emqx_rule_engine/src/emqx_rule_engine.erl | 18 ++++---- .../src/emqx_rule_registry.erl | 9 ++-- apps/emqx_rule_engine/src/emqx_rule_utils.erl | 6 +++ .../test/emqx_rule_engine_SUITE.erl | 4 ++ .../test/emqx_rule_monitor_SUITE.erl | 2 + 20 files changed, 93 insertions(+), 111 deletions(-) diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 026df2415..43af1d9b4 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -89,7 +89,7 @@ headers(_) -> undefined. headers_no_content_type(type) -> map(); headers_no_content_type(converter) -> fun(Headers) -> - maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) + maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) end; headers_no_content_type(default) -> default_headers_no_content_type(); headers_no_content_type(_) -> undefined. @@ -129,9 +129,9 @@ create(#{ method := Method emqx_connector_http, Config#{base_url => maps:remove(query, URIMap), pool_type => random}) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -296,4 +296,4 @@ parse_body(<<"application/json">>, Body) -> parse_body(<<"application/x-www-form-urlencoded">>, Body) -> {ok, maps:from_list(cow_qs:parse_qs(Body))}; parse_body(ContentType, _) -> - {error, {unsupported_content_type, ContentType}}. \ No newline at end of file + {error, {unsupported_content_type, ContentType}}. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index 56ced0104..1ce145f35 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -106,9 +106,9 @@ create(#{ selector := Selector , '_unique'], Config), NState = State#{selector => NSelector}, case emqx_resource:create_local(Unique, emqx_connector_mongo, Config) of - {ok, _} -> + {ok, already_created} -> {ok, NState}; - {error, already_created} -> + {ok, _} -> {ok, NState}; {error, Reason} -> {error, Reason} diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 75a3392ec..59afa9671 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -83,9 +83,9 @@ create(#{ password_hash_algorithm := Algorithm query_timeout => QueryTimeout, '_unique' => Unique}, case emqx_resource:create_local(Unique, emqx_connector_mysql, Config) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -131,7 +131,7 @@ authenticate(#{password := Password} = Credential, destroy(#{'_unique' := Unique}) -> _ = emqx_resource:remove_local(Unique), ok. - + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 44c7f7185..cce9ebd6f 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -71,9 +71,9 @@ create(#{ query := Query0 salt_position => SaltPosition, '_unique' => Unique}, case emqx_resource:create_local(Unique, emqx_connector_pgsql, Config) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -119,7 +119,7 @@ authenticate(#{password := Password} = Credential, destroy(#{'_unique' := Unique}) -> _ = emqx_resource:remove_local(Unique), ok. - + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index 0c2696c0e..6eff345ed 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -89,9 +89,9 @@ create(#{ query := Query , '_unique'], Config), NState = State#{query => NQuery}, case emqx_resource:create_local(Unique, emqx_connector_redis, Config) of - {ok, _} -> + {ok, already_created} -> {ok, NState}; - {error, already_created} -> + {ok, _} -> {ok, NState}; {error, Reason} -> {error, Reason} @@ -176,7 +176,7 @@ check_fields(["superuser" | More], HasPassHash) -> check_fields(More, HasPassHash); check_fields([Field | _], _) -> error({unsupported_field, Field}). - + parse_key(Key) -> Tokens = re:split(Key, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]), parse_key(Tokens, []). diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index f158322e1..bbe4caa6b 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -216,8 +216,8 @@ create_resource(#{type := DB, Config, []) of + {ok, already_created} -> ResourceID; {ok, _} -> ResourceID; - {error, already_created} -> ResourceID; {error, Reason} -> {error, Reason} end; create_resource(#{type := DB, @@ -228,8 +228,8 @@ create_resource(#{type := DB, list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])), Config) of + {ok, already_created} -> ResourceID; {ok, _} -> ResourceID; - {error, already_created} -> ResourceID; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl index dea3dcae8..6fe75e4ce 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl @@ -77,10 +77,10 @@ create_bridge(#{name := Name}, Params) -> case emqx_resource:check_and_create( emqx_data_bridge:name_to_resource_id(Name), emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of + {ok, already_created} -> + {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {ok, Data} -> update_config_and_reply(Name, BridgeType, Config, Data); - {error, already_created} -> - {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {error, Reason0} -> Reason = emqx_resource_api:stringnify(Reason0), {500, #{code => 102, message => <<"create bridge ", Name/binary, diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl index d408a8062..4917833ec 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl @@ -73,8 +73,8 @@ load_bridge(#{name := Name, type := Type, config := Config}) -> case emqx_resource:create_local( emqx_data_bridge:name_to_resource_id(Name), emqx_data_bridge:resource_type(Type), Config) of + {ok, already_created} -> ok; {ok, _} -> ok; - {error, already_created} -> ok; {error, Reason} -> error({load_bridge, Reason}) end. diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index f7dc1eef9..4c7576ff3 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -130,8 +130,8 @@ handle_call(reset, _From, State) -> handle_call({initiate, MFA}, _From, State = #{node := Node}) -> case transaction(fun init_mfa/2, [Node, MFA]) of - {atomic, {ok, TnxId}} -> - {reply, {ok, TnxId}, State, {continue, ?CATCH_UP}}; + {atomic, {ok, TnxId, Result}} -> + {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; {aborted, Reason} -> {reply, {error, Reason}, State, {continue, ?CATCH_UP}} end; @@ -159,8 +159,9 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) -> case transaction(fun get_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> - case apply_mfa(NextId, MFA) of - ok -> + {Succeed, _} = apply_mfa(NextId, MFA), + case Succeed of + true -> case transaction(fun commit/2, [Node, NextId]) of {atomic, ok} -> catch_up(State); Error -> @@ -171,7 +172,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) -> error => Error}), RetryMs end; - _Error -> RetryMs + false -> RetryMs end; {aborted, Reason} -> ?SLOG(error, #{ @@ -209,9 +210,8 @@ do_catch_up(ToTnxId, Node) -> CurTnxId = LastAppliedId + 1, [#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId), case apply_mfa(CurTnxId, MFA) of - ok -> ok = commit(Node, CurTnxId); - {error, Reason} -> mnesia:abort(Reason); - Other -> mnesia:abort(Other) + {true, _Result} -> ok = commit(Node, CurTnxId); + {false, Error} -> mnesia:abort(Error) end; [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", @@ -243,9 +243,8 @@ init_mfa(Node, MFA) -> ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = commit(Node, TnxId), case apply_mfa(TnxId, MFA) of - ok -> {ok, TnxId}; - {error, Reason} -> mnesia:abort(Reason); - Other -> mnesia:abort(Other) + {true, Result} -> {ok, TnxId, Result}; + {false, Error} -> mnesia:abort(Error) end. do_catch_up_in_one_trans(LatestId, Node) -> @@ -284,15 +283,21 @@ trans_query(TnxId) -> apply_mfa(TnxId, {M, F, A} = MFA) -> try Res = erlang:apply(M, F, A), - case Res =:= ok of - true -> - ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok}); - false -> - ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}) + Succeed = + case Res of + ok -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + true; + {ok, _} -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + true; + _ -> + ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + false end, - Res + {Succeed, Res} catch C : E -> ?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}), - {error, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} + {false, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} end. diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index b91131b93..26ad28f3e 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -72,7 +72,7 @@ t_base_test(_Config) -> ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, - {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFA, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), @@ -105,7 +105,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), {atomic, [Status]} = emqx_cluster_rpc:status(), ?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), @@ -118,7 +118,7 @@ t_catch_up_status_handle_next_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, - {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), ok. @@ -127,21 +127,21 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), Now = erlang:system_time(second), {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, - {ok, _} = emqx_cluster_rpc:multicall(M, F, A), - {ok, _} = emqx_cluster_rpc:multicall(io, format, ["test"]), + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"]), {atomic, [Status|L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), - sleep(4000), + sleep(3000), {atomic, [Status1]} = emqx_cluster_rpc:status(), ?assertEqual(Status, Status1), - sleep(1600), + sleep(2600), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), Pid = self(), MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, - {ok, TnxId} = emqx_cluster_rpc:multicall(M1, F1, A1), + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFAEcho, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), @@ -157,12 +157,12 @@ t_del_stale_mfa(_Config) -> Keys2 = lists:seq(51, 150), Ids = [begin - {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), TnxId end || _ <- Keys], ?assertEqual(Keys, Ids), Ids2 = [begin - {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), TnxId end || _ <- Keys2], ?assertEqual(Keys2, Ids2), sleep(1200), diff --git a/apps/emqx_resource/include/emqx_resource_utils.hrl b/apps/emqx_resource/include/emqx_resource_utils.hrl index a20a17e89..4c3a2a749 100644 --- a/apps/emqx_resource/include/emqx_resource_utils.hrl +++ b/apps/emqx_resource/include/emqx_resource_utils.hrl @@ -13,32 +13,6 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). - --define(CLUSTER_CALL(Func, Args, ResParttern), -%% ekka_mnesia:running_nodes() - fun() -> - case LocalResult = erlang:apply(?MODULE, Func, Args) of - ResParttern -> - case rpc:multicall(nodes(), ?MODULE, Func, Args, 5000) of - {ResL, []} -> - Filter = fun - (ResParttern) -> false; - ({badrpc, {'EXIT', {undef, [{?MODULE, Func0, _, []}]}}}) - when Func0 =:= Func -> false; - (_) -> true - end, - case lists:filter(Filter, ResL) of - [] -> LocalResult; - ErrL -> {error, ErrL} - end; - {ResL, BadNodes} -> - {error, {failed_on_nodes, BadNodes, ResL}} - end; - ErrorResult -> - {error, ErrorResult} - end - end()). -define(SAFE_CALL(_EXP_), ?SAFE_CALL(_EXP_, _ = do_nothing)). @@ -50,4 +24,4 @@ _EXP_ON_FAIL_, {error, {_EXCLASS_, _EXCPTION_, _ST_}} end - end()). \ No newline at end of file + end()). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 1fce5e122..4bc1d20f0 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -157,7 +157,7 @@ query_failed({_, {OnFailed, Args}}) -> -spec create(instance_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. create(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}). + cluster_call(create_local, [InstId, ResourceType, Config]). -spec create_local(instance_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -167,7 +167,7 @@ create_local(InstId, ResourceType, Config) -> -spec create_dry_run(instance_id(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]). + cluster_call(create_dry_run_local, [InstId, ResourceType, Config]). -spec create_dry_run_local(instance_id(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -177,7 +177,7 @@ create_dry_run_local(InstId, ResourceType, Config) -> -spec update(instance_id(), resource_type(), resource_config(), term()) -> {ok, resource_data()} | {error, Reason :: term()}. update(InstId, ResourceType, Config, Params) -> - ?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}). + cluster_call(update_local, [InstId, ResourceType, Config, Params]). -spec update_local(instance_id(), resource_type(), resource_config(), term()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -186,7 +186,7 @@ update_local(InstId, ResourceType, Config, Params) -> -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> - ?CLUSTER_CALL(remove_local, [InstId]). + cluster_call(remove_local, [InstId]). -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. remove_local(InstId) -> @@ -335,3 +335,9 @@ safe_apply(Func, Args) -> str(S) when is_binary(S) -> binary_to_list(S); str(S) when is_list(S) -> S. + +cluster_call(Func, Args) -> + case emqx_cluster_rpc:multicall(?MODULE, Func, Args) of + {ok, _TxnId, Result} -> Result; + Failed -> Failed + end. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 8e0624c75..84b5a1f7c 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -162,7 +162,7 @@ do_update(InstId, ResourceType, NewConfig, Params) -> do_create(InstId, ResourceType, Config) -> case lookup(InstId) of - {ok, _} -> {error, already_created}; + {ok, _} -> {ok, already_created}; _ -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 7a37abbee..3fab5958d 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -443,9 +443,9 @@ create_resource(Context, #{type := DB} = Config) -> ResourceID, list_to_existing_atom(io_lib:format("~s_~s", [emqx_connector, DB])), Config) of - {ok, _} -> + {ok, already_created} -> Context#{resource_id => ResourceID}; - {error, already_created} -> + {ok, _} -> Context#{resource_id => ResourceID}; {error, Reason} -> error({load_config_error, Reason}) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 568724263..760495f6b 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -155,22 +155,6 @@ end end()). --define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). - --define(CLUSTER_CALL(Func, Args, ResParttern), - fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of - {ResL, []} -> - case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of - [] -> ResL; - ErrL -> - ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]), - throw({Func, ErrL}) - end; - {ResL, BadNodes} -> - ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]), - throw({Func, {failed_on_nodes, BadNodes}}) - end end()). - %% Tables -define(RULE_TAB, emqx_rule). -define(ACTION_TAB, emqx_rule_action). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index c2ccf2c29..0b3ffb603 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -216,7 +216,7 @@ delete_rule(RuleId) -> case emqx_rule_registry:get_rule(RuleId) of {ok, Rule = #rule{actions = Actions}} -> try - _ = ?CLUSTER_CALL(clear_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_rule, [Rule]), ok = emqx_rule_registry:remove_rule(Rule) catch Error:Reason:ST -> @@ -242,7 +242,7 @@ create_resource(#{type := Type, config := Config0} = Params) -> ok = emqx_rule_registry:add_resource(Resource), %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - catch _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), + catch _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]), {ok, Resource}; not_found -> {error, {resource_type_not_found, Type}} @@ -280,7 +280,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), case test_resource(#{type => Type, config => NewConfig}) of ok -> - _ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]), emqx_rule_registry:add_resource(#resource{ id = Id, type = Type, @@ -319,8 +319,8 @@ test_resource(#{type := Type, config := Config0}) -> Config = emqx_rule_validator:validate_params(Config0, ParamSpec), ResId = resource_id(), try - _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), ok catch throw:Reason -> {error, Reason} @@ -359,7 +359,7 @@ delete_resource(ResId) -> try case emqx_rule_registry:remove_resource(ResId) of ok -> - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), ok; {error, _} = R -> R end @@ -426,7 +426,7 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) -> ActionInstId = maps:get(id, Action, action_instance_id(Name)), case NeedInit of true -> - _ = ?CLUSTER_CALL(init_action, [Mod, Create, ActionInstId, + _ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId, with_resource_params(Args)]), ok; false -> ok @@ -485,7 +485,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) -> may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) -> %% prepare new actions before removing old ones NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)), - _ = ?CLUSTER_CALL(clear_actions, [OldActions]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_actions, [OldActions]), may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params)); may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params Rule. @@ -631,7 +631,7 @@ refresh_actions(Actions, Pred) -> true -> {ok, #action{module = Mod, on_create = Create}} = emqx_rule_registry:find_action(ActName), - _ = ?CLUSTER_CALL(init_action, [Mod, Create, Id, with_resource_params(Args)]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]), refresh_actions(Fallbacks, Pred); false -> ok end diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index c0bd5de7b..096534585 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -221,7 +221,7 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> - _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -231,7 +231,7 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> - _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> @@ -476,10 +476,11 @@ code_change(_OldVsn, State, _Extra) -> get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). - %% Wrapping ets to a r/o transaction to avoid reading inconsistent + %% Wrapping ets to a transaction to avoid reading inconsistent + %% ( nest cluster_call transaction, no a r/o transaction) %% data during shard bootstrap {atomic, Ret} = - ekka_mnesia:ro_transaction(?RULE_ENGINE_SHARD, + ekka_mnesia:transaction(?RULE_ENGINE_SHARD, fun() -> ets:tab2list(Tab) end), diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 3791b1386..2d978ee0d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -55,6 +55,8 @@ , can_topic_match_oneof/2 ]). +-export([cluster_call/3]). + -compile({no_auto_import, [ float/1 ]}). @@ -356,3 +358,7 @@ can_topic_match_oneof(Topic, Filters) -> lists:any(fun(Fltr) -> emqx_topic:match(Topic, Fltr) end, Filters). + +cluster_call(Module, Func, Args) -> + {ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args), + Result. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index bf4dcb30e..47eb4faad 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -148,6 +148,7 @@ groups() -> %%------------------------------------------------------------------------------ init_per_suite(Config) -> + application:load(emqx_machine), ok = ekka_mnesia:start(), ok = emqx_rule_registry:mnesia(boot), ok = emqx_ct_helpers:start_apps([emqx_rule_engine], fun set_special_configs/1), @@ -181,6 +182,7 @@ end_per_group(_Groupname, _Config) -> %%------------------------------------------------------------------------------ init_per_testcase(t_events, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_engine:load_providers(), init_events_counters(), ok = emqx_rule_registry:register_resource_types([make_simple_resource_type(simple_resource_type)]), @@ -214,6 +216,7 @@ init_per_testcase(Test, Config) ;Test =:= t_sqlselect_multi_actoins_3_1 ;Test =:= t_sqlselect_multi_actoins_4 -> + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_engine:load_providers(), ok = emqx_rule_registry:add_action( #action{name = 'crash_action', app = ?APP, @@ -252,6 +255,7 @@ init_per_testcase(Test, Config) {connsql, SQL} | Config]; init_per_testcase(_TestCase, Config) -> + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = built_in, diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 67c59e26c..62f538f43 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -39,6 +39,7 @@ groups() -> ]. init_per_suite(Config) -> + application:load(emqx_machine), ok = ekka_mnesia:start(), ok = emqx_rule_registry:mnesia(boot), Config. @@ -65,6 +66,7 @@ end_per_testcase(_, Config) -> t_restart_resource(_) -> {ok, _} = emqx_rule_monitor:start_link(), + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc,1000), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1, From c1c24af002a7e4b51ff42c5f335a42c3693e22dd Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 26 Aug 2021 18:12:21 +0800 Subject: [PATCH 10/11] fix: dialyzer warning --- apps/emqx_authz/src/emqx_authz.erl | 1 - apps/emqx_machine/src/emqx_cluster_rpc.erl | 4 ++-- apps/emqx_resource/src/emqx_resource.erl | 6 +++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index bbe4caa6b..e082b9995 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -216,7 +216,6 @@ create_resource(#{type := DB, Config, []) of - {ok, already_created} -> ResourceID; {ok, _} -> ResourceID; {error, Reason} -> {error, Reason} end; diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index 4c7576ff3..c2b2d086d 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -69,7 +69,7 @@ start_link() -> start_link(Node, Name, RetryMs) -> gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). --spec multicall(Module, Function, Args) -> {ok, TnxId} | {error, Reason} when +-spec multicall(Module, Function, Args) -> {ok, TnxId, term()} | {error, Reason} when Module :: module(), Function :: atom(), Args :: [term()], @@ -78,7 +78,7 @@ start_link(Node, Name, RetryMs) -> multicall(M, F, A) -> multicall(M, F, A, timer:minutes(2)). --spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId} |{error, Reason} when +-spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId, term()} |{error, Reason} when Module :: module(), Function :: atom(), Args :: [term()], diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 4bc1d20f0..cad32bcb2 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -155,12 +155,12 @@ query_failed({_, {OnFailed, Args}}) -> %% APIs for resource instances %% ================================================================================= -spec create(instance_id(), resource_type(), resource_config()) -> - {ok, resource_data()} | {error, Reason :: term()}. + {ok, resource_data() |'already_created'} | {error, Reason :: term()}. create(InstId, ResourceType, Config) -> cluster_call(create_local, [InstId, ResourceType, Config]). -spec create_local(instance_id(), resource_type(), resource_config()) -> - {ok, resource_data()} | {error, Reason :: term()}. + {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create_local(InstId, ResourceType, Config) -> call_instance(InstId, {create, InstId, ResourceType, Config}). @@ -285,7 +285,7 @@ check_config(ResourceType, RawConfigTerm) -> end. -spec check_and_create(instance_id(), resource_type(), raw_resource_config()) -> - {ok, resource_data()} | {error, term()}. + {ok, resource_data() |'already_created'} | {error, term()}. check_and_create(InstId, ResourceType, RawConfig) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> create(InstId, ResourceType, InstConf) end). From e35a6c73504c096c1ed08797e16543664151d3c5 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 27 Aug 2021 11:15:46 +0800 Subject: [PATCH 11/11] chore: cluster_call early aborted --- apps/emqx_machine/src/emqx_cluster_rpc.erl | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index c2b2d086d..346ca5025 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -156,7 +156,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== catch_up(#{node := Node, retry_interval := RetryMs} = State) -> - case transaction(fun get_next_mfa/1, [Node]) of + case transaction(fun read_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> {Succeed, _} = apply_mfa(NextId, MFA), @@ -166,22 +166,19 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) -> {atomic, ok} -> catch_up(State); Error -> ?SLOG(error, #{ - msg => "mnesia write transaction failed", - node => Node, - nextId => NextId, + msg => "failed to commit applied call", + applied_id => NextId, error => Error}), RetryMs end; false -> RetryMs end; {aborted, Reason} -> - ?SLOG(error, #{ - msg => "get_next_mfa transaction failed", - node => Node, error => Reason}), + ?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}), RetryMs end. -get_next_mfa(Node) -> +read_next_mfa(Node) -> NextId = case mnesia:wread({?CLUSTER_COMMIT, Node}) of [] -> @@ -219,10 +216,9 @@ do_catch_up(ToTnxId, Node) -> ?SLOG(error, #{ msg => "catch up failed!", last_applied_id => LastAppliedId, - node => Node, to_tnx_id => ToTnxId }), - {error, Reason} + mnesia:abort(Reason) end. commit(Node, TnxId) -> @@ -250,8 +246,7 @@ init_mfa(Node, MFA) -> do_catch_up_in_one_trans(LatestId, Node) -> case do_catch_up(LatestId, Node) of caught_up -> ok; - ok -> do_catch_up_in_one_trans(LatestId, Node); - {error, Reason} -> mnesia:abort(Reason) + ok -> do_catch_up_in_one_trans(LatestId, Node) end. transaction(Func, Args) ->