diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index c85e7aa29..e3c701e28 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -490,6 +490,7 @@ listeners.wss.default { ## Websocket options ## See ${example_common_websocket_options} for more information websocket.idle_timeout = 86400s + } ## Enable per connection statistics. @@ -1071,7 +1072,7 @@ broker { ## are mostly published to topics with large number of levels. ## ## NOTE: This is a cluster-wide configuration. - ## It rquires all nodes to be stopped before changing it. + ## It requires all nodes to be stopped before changing it. ## ## @doc broker.perf.trie_compaction ## ValueType: Boolean diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx/src/emqx_cluster_rpc.erl index 301603c63..30f9e3234 100644 --- a/apps/emqx/src/emqx_cluster_rpc.erl +++ b/apps/emqx/src/emqx_cluster_rpc.erl @@ -26,7 +26,7 @@ -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). --export([start_link/2]). +-export([start_link/3]). -endif. -boot_mnesia({mnesia, [boot]}). @@ -41,7 +41,7 @@ -define(CATCH_UP, catch_up). -define(REALTIME, realtime). --define(CATCH_UP_AFTER(_Sec_), {state_timeout, timer:seconds(_Sec_), catch_up_delay}). +-define(CATCH_UP_AFTER(_Ms_), {state_timeout, _Ms_, catch_up_delay}). %%%=================================================================== %%% API @@ -64,9 +64,10 @@ mnesia(copy) -> ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). start_link() -> - start_link(node(), ?MODULE). -start_link(Node, Name) -> - gen_statem:start_link({local, Name}, ?MODULE, [Node], []). + RetryMs = emqx:get_config([broker, cluster_call, retry_interval]), + start_link(node(), ?MODULE, RetryMs). +start_link(Node, Name, RetryMs) -> + gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). multicall(M, F, A) -> multicall(M, F, A, timer:minutes(2)). @@ -91,14 +92,14 @@ multicall(M, F, A, Timeout) -> -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. query(TnxId) -> - Fun = fun() -> - case mnesia:read(?CLUSTER_MFA, TnxId) of - [] -> mnesia:abort(not_found); - [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> - #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} - end - end, - transaction(Fun). + transaction(fun do_query/1, [TnxId]). + +do_query(TnxId) -> + case mnesia:read(?CLUSTER_MFA, TnxId) of + [] -> mnesia:abort(not_found); + [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> + #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} + end. -spec reset() -> reset. reset() -> gen_statem:call(?MODULE, reset). @@ -128,77 +129,77 @@ status() -> %%%=================================================================== %% @private -init([Node]) -> +init([Node, RetryMs]) -> {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), - {ok, ?CATCH_UP, Node, ?CATCH_UP_AFTER(0)}. + {ok, ?CATCH_UP, #{node => Node, retry_interval => RetryMs}, ?CATCH_UP_AFTER(0)}. callback_mode() -> handle_event_function. %% @private -format_status(Opt, [_PDict, StateName, Node]) -> - #{state => StateName, node => Node, opt => Opt}. +format_status(Opt, [_PDict, StateName, Data]) -> + #{state => StateName, data => Data , opt => Opt}. %% @private -handle_event(state_timeout, catch_up_delay, _State, Node) -> - catch_up(Node); +handle_event(state_timeout, catch_up_delay, _State, Data) -> + catch_up(Data); -handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Node) -> - handle_mfa_write_event(MFARec, Node); -handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Node) -> +handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Data) -> + handle_mfa_write_event(MFARec, Data); +handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Data) -> {keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]}; %% we should catch up as soon as possible when we reset all. -handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Node) -> +handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Data) -> {keep_state_and_data, [?CATCH_UP_AFTER(0)]}; -handle_event({call, From}, reset, _State, _Node) -> +handle_event({call, From}, reset, _State, _Data) -> _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), _ = ekka_mnesia:clear_table(?CLUSTER_MFA), {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; -handle_event({call, From}, {initiate, MFA}, ?REALTIME, Node) -> +handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) -> case transaction(fun() -> init_mfa(Node, MFA) end) of {atomic, {ok, TnxId}} -> - {keep_state, Node, [{reply, From, {ok, TnxId}}]}; + {keep_state, Data, [{reply, From, {ok, TnxId}}]}; {aborted, Reason} -> - {keep_state, Node, [{reply, From, {error, Reason}}]} + {keep_state, Data, [{reply, From, {error, Reason}}]} end; -handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Node) -> - case catch_up(Node) of - {next_state, ?REALTIME, Node} -> - {next_state, ?REALTIME, Node, [{postpone, true}]}; +handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Data = #{retry_interval := RetryMs}) -> + case catch_up(Data) of + {next_state, ?REALTIME, Data} -> + {next_state, ?REALTIME, Data, [{postpone, true}]}; _ -> Reason = "There are still transactions that have not been executed.", - {keep_state, Node, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(1)]} + {keep_state_and_data, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(RetryMs)]} end; -handle_event(_EventType, _EventContent, ?CATCH_UP, _Node) -> - {keep_state_and_data, [?CATCH_UP_AFTER(10)]}; -handle_event(_EventType, _EventContent, _StateName, _Node) -> +handle_event(_EventType, _EventContent, ?CATCH_UP, #{retry_interval := RetryMs}) -> + {keep_state_and_data, [?CATCH_UP_AFTER(RetryMs)]}; +handle_event(_EventType, _EventContent, _StateName, _Data) -> keep_state_and_data. -terminate(_Reason, _StateName, _Node) -> +terminate(_Reason, _StateName, _Data) -> ok. -code_change(_OldVsn, StateName, Node, _Extra) -> - {ok, StateName, Node}. +code_change(_OldVsn, StateName, Data, _Extra) -> + {ok, StateName, Data}. %%%=================================================================== %%% Internal functions %%%=================================================================== -catch_up(Node) -> +catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> case get_next_mfa(Node) of - {atomic, caught_up} -> {next_state, ?REALTIME, Node}; + {atomic, caught_up} -> {next_state, ?REALTIME, Data}; {atomic, {still_lagging, NextId, MFA}} -> case apply_mfa(NextId, MFA) of ok -> case transaction(fun() -> commit(Node, NextId) end) of - {atomic, ok} -> catch_up(Node); - _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + {atomic, ok} -> catch_up(Data); + _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; - _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; - {aborted, _Reason} -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + {aborted, _Reason} -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end. get_next_mfa(Node) -> @@ -252,17 +253,18 @@ get_latest_id() -> Id -> Id end. -handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Node) -> +handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) -> + #{node := Node, retry_interval := RetryMs} = Data, {atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end), if LastAppliedId + 1 =:= EventId -> case apply_mfa(EventId, MFA) of ok -> case transaction(fun() -> commit(Node, EventId) end) of {atomic, ok} -> - {next_state, ?REALTIME, Node}; - _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + {next_state, ?REALTIME, Data}; + _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; - _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; LastAppliedId >= EventId -> %% It's means the initiator receive self event or other receive stale event. keep_state_and_data; @@ -301,8 +303,11 @@ do_catch_up_in_one_trans(LatestId, Node) -> {error, Reason} -> mnesia:abort(Reason) end. -transaction(Fun) -> - ekka_mnesia:transaction(?COMMON_SHARD, Fun). +transaction(Func, Args) -> + ekka_mnesia:transaction(?COMMON_SHARD, Func, Args). + +transaction(Func) -> + ekka_mnesia:transaction(?COMMON_SHARD, Func). apply_mfa(TnxId, {M, F, A} = MFA) -> try diff --git a/apps/emqx/src/emqx_cluster_rpc_handler.erl b/apps/emqx/src/emqx_cluster_rpc_handler.erl index a51ef3d74..5f548e7fd 100644 --- a/apps/emqx/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx/src/emqx_cluster_rpc_handler.erl @@ -21,22 +21,27 @@ -include("logger.hrl"). -include("emqx_cluster_rpc.hrl"). --export([start_link/0]). +-export([start_link/0, start_link/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(MFA_HISTORY_LEN, 100). start_link() -> - gen_server:start_link(?MODULE, [], []). + MaxHistory = emqx:get_config([broker, cluster_call, mfa_max_history]), + CleanupMs = emqx:get_config([broker, cluster_call, mfa_cleanup_interval]), + start_link(MaxHistory, CleanupMs). + +start_link(MaxHistory, CleanupMs) -> + State = #{max_history => MaxHistory, cleanup_ms => CleanupMs, timer => undefined}, + gen_server:start_link(?MODULE, [State], []). %%%=================================================================== %%% Spawning and gen_server implementation %%%=================================================================== -init([]) -> - _ = emqx_misc:rand_seed(), - {ok, ensure_timer(#{timer => undefined})}. +init([State]) -> + {ok, ensure_timer(State)}. handle_call(Req, _From, State) -> ?LOG(error, "unexpected call: ~p", [Req]), @@ -46,8 +51,8 @@ handle_cast(Msg, State) -> ?LOG(error, "unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef}) -> - case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/0, []) of +handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> + case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) end, @@ -66,23 +71,15 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - --ifdef(TEST). -ensure_timer(State) -> - State#{timer := emqx_misc:start_timer(timer:seconds(1), del_stale_mfa)}. --else. -ensure_timer(State) -> - Ms = timer:minutes(5) + rand:uniform(5000), +ensure_timer(State = #{cleanup_ms := Ms}) -> State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}. --endif. - %% @doc Keep the latest completed 100 records for querying and troubleshooting. -del_stale_mfa() -> +del_stale_mfa(MaxHistory) -> DoneId = mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, infinity, ?CLUSTER_COMMIT), - delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, ?MFA_HISTORY_LEN). + delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3bbeb1d07..cb250f575 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -293,6 +293,7 @@ fields("broker") -> , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} , {"route_batch_clean", t(boolean(), undefined, true)} , {"perf", ref("perf")} + , {"cluster_call", ref("cluster_call")} ]; fields("perf") -> @@ -325,6 +326,12 @@ fields("sysmon_os") -> , {"procmem_high_watermark", t(percent(), undefined, "5%")} ]; +fields("cluster_call") -> + [{"retry_interval", t(duration(), undefined, "2s")} + , {"mfa_max_history", t(range(1, 500), undefined, 50)} + , {"mfa_cleanup_interval", t(duration(), undefined, "5m")} + ]; + fields("alarm") -> [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} , {"size_limit", t(integer(), undefined, 1000)} diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl index 6cdb34c6c..8e8f55834 100644 --- a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl @@ -41,6 +41,11 @@ init_per_suite(Config) -> application:load(emqx), ok = ekka:start(), emqx_cluster_rpc:mnesia(copy), + emqx_config:put([broker, cluster_call], #{ + mfa_max_history => 100, + mfa_cleanup_interval => 1000, + retry_interval => 900 + }), %%dbg:tracer(), %%dbg:p(all, c), %%dbg:tpl(emqx_cluster_rpc, cx), @@ -75,7 +80,7 @@ t_base_test(_Config) -> ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))), - ?assertEqual(#{node => node(), opt => normal, state => realtime}, SysStatus), + ?assertEqual(#{data => #{node => node(),retry_interval => 900}, opt => normal, state => realtime}, SysStatus), sleep(400), {atomic, Status} = emqx_cluster_rpc:status(), ?assertEqual(3, length(Status)), @@ -186,9 +191,9 @@ t_del_stale_mfa(_Config) -> start() -> {ok, Pid1} = emqx_cluster_rpc:start_link(), - {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2), - {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3), - {ok, Pid4} = emqx_cluster_rpc_handler:start_link(), + {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), + {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), + {ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500), {ok, [Pid1, Pid2, Pid3, Pid4]}. stop() ->