feat: add cluster_call.retry_interval/mfa_max_history/mfa_cleanup_interval config

This commit is contained in:
zhongwencool 2021-08-20 11:46:58 +08:00
parent e5129ead6d
commit 765c94152b
5 changed files with 88 additions and 73 deletions

View File

@ -490,6 +490,7 @@ listeners.wss.default {
## Websocket options ## Websocket options
## See ${example_common_websocket_options} for more information ## See ${example_common_websocket_options} for more information
websocket.idle_timeout = 86400s websocket.idle_timeout = 86400s
} }
## Enable per connection statistics. ## Enable per connection statistics.
@ -1071,7 +1072,7 @@ broker {
## are mostly published to topics with large number of levels. ## are mostly published to topics with large number of levels.
## ##
## NOTE: This is a cluster-wide configuration. ## NOTE: This is a cluster-wide configuration.
## It rquires all nodes to be stopped before changing it. ## It requires all nodes to be stopped before changing it.
## ##
## @doc broker.perf.trie_compaction ## @doc broker.perf.trie_compaction
## ValueType: Boolean ## ValueType: Boolean

View File

@ -26,7 +26,7 @@
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-export([start_link/2]). -export([start_link/3]).
-endif. -endif.
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
@ -41,7 +41,7 @@
-define(CATCH_UP, catch_up). -define(CATCH_UP, catch_up).
-define(REALTIME, realtime). -define(REALTIME, realtime).
-define(CATCH_UP_AFTER(_Sec_), {state_timeout, timer:seconds(_Sec_), catch_up_delay}). -define(CATCH_UP_AFTER(_Ms_), {state_timeout, _Ms_, catch_up_delay}).
%%%=================================================================== %%%===================================================================
%%% API %%% API
@ -64,9 +64,10 @@ mnesia(copy) ->
ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies).
start_link() -> start_link() ->
start_link(node(), ?MODULE). RetryMs = emqx:get_config([broker, cluster_call, retry_interval]),
start_link(Node, Name) -> start_link(node(), ?MODULE, RetryMs).
gen_statem:start_link({local, Name}, ?MODULE, [Node], []). start_link(Node, Name, RetryMs) ->
gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []).
multicall(M, F, A) -> multicall(M, F, A) ->
multicall(M, F, A, timer:minutes(2)). multicall(M, F, A, timer:minutes(2)).
@ -91,14 +92,14 @@ multicall(M, F, A, Timeout) ->
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
query(TnxId) -> query(TnxId) ->
Fun = fun() -> transaction(fun do_query/1, [TnxId]).
case mnesia:read(?CLUSTER_MFA, TnxId) of
[] -> mnesia:abort(not_found); do_query(TnxId) ->
[#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> case mnesia:read(?CLUSTER_MFA, TnxId) of
#{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} [] -> mnesia:abort(not_found);
end [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] ->
end, #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt}
transaction(Fun). end.
-spec reset() -> reset. -spec reset() -> reset.
reset() -> gen_statem:call(?MODULE, reset). reset() -> gen_statem:call(?MODULE, reset).
@ -128,77 +129,77 @@ status() ->
%%%=================================================================== %%%===================================================================
%% @private %% @private
init([Node]) -> init([Node, RetryMs]) ->
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
{ok, ?CATCH_UP, Node, ?CATCH_UP_AFTER(0)}. {ok, ?CATCH_UP, #{node => Node, retry_interval => RetryMs}, ?CATCH_UP_AFTER(0)}.
callback_mode() -> callback_mode() ->
handle_event_function. handle_event_function.
%% @private %% @private
format_status(Opt, [_PDict, StateName, Node]) -> format_status(Opt, [_PDict, StateName, Data]) ->
#{state => StateName, node => Node, opt => Opt}. #{state => StateName, data => Data , opt => Opt}.
%% @private %% @private
handle_event(state_timeout, catch_up_delay, _State, Node) -> handle_event(state_timeout, catch_up_delay, _State, Data) ->
catch_up(Node); catch_up(Data);
handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Node) -> handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Data) ->
handle_mfa_write_event(MFARec, Node); handle_mfa_write_event(MFARec, Data);
handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Node) -> handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Data) ->
{keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]}; {keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]};
%% we should catch up as soon as possible when we reset all. %% we should catch up as soon as possible when we reset all.
handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Node) -> handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Data) ->
{keep_state_and_data, [?CATCH_UP_AFTER(0)]}; {keep_state_and_data, [?CATCH_UP_AFTER(0)]};
handle_event({call, From}, reset, _State, _Node) -> handle_event({call, From}, reset, _State, _Data) ->
_ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT),
_ = ekka_mnesia:clear_table(?CLUSTER_MFA), _ = ekka_mnesia:clear_table(?CLUSTER_MFA),
{keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]};
handle_event({call, From}, {initiate, MFA}, ?REALTIME, Node) -> handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) ->
case transaction(fun() -> init_mfa(Node, MFA) end) of case transaction(fun() -> init_mfa(Node, MFA) end) of
{atomic, {ok, TnxId}} -> {atomic, {ok, TnxId}} ->
{keep_state, Node, [{reply, From, {ok, TnxId}}]}; {keep_state, Data, [{reply, From, {ok, TnxId}}]};
{aborted, Reason} -> {aborted, Reason} ->
{keep_state, Node, [{reply, From, {error, Reason}}]} {keep_state, Data, [{reply, From, {error, Reason}}]}
end; end;
handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Node) -> handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Data = #{retry_interval := RetryMs}) ->
case catch_up(Node) of case catch_up(Data) of
{next_state, ?REALTIME, Node} -> {next_state, ?REALTIME, Data} ->
{next_state, ?REALTIME, Node, [{postpone, true}]}; {next_state, ?REALTIME, Data, [{postpone, true}]};
_ -> _ ->
Reason = "There are still transactions that have not been executed.", Reason = "There are still transactions that have not been executed.",
{keep_state, Node, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(1)]} {keep_state_and_data, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(RetryMs)]}
end; end;
handle_event(_EventType, _EventContent, ?CATCH_UP, _Node) -> handle_event(_EventType, _EventContent, ?CATCH_UP, #{retry_interval := RetryMs}) ->
{keep_state_and_data, [?CATCH_UP_AFTER(10)]}; {keep_state_and_data, [?CATCH_UP_AFTER(RetryMs)]};
handle_event(_EventType, _EventContent, _StateName, _Node) -> handle_event(_EventType, _EventContent, _StateName, _Data) ->
keep_state_and_data. keep_state_and_data.
terminate(_Reason, _StateName, _Node) -> terminate(_Reason, _StateName, _Data) ->
ok. ok.
code_change(_OldVsn, StateName, Node, _Extra) -> code_change(_OldVsn, StateName, Data, _Extra) ->
{ok, StateName, Node}. {ok, StateName, Data}.
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
catch_up(Node) -> catch_up(#{node := Node, retry_interval := RetryMs} = Data) ->
case get_next_mfa(Node) of case get_next_mfa(Node) of
{atomic, caught_up} -> {next_state, ?REALTIME, Node}; {atomic, caught_up} -> {next_state, ?REALTIME, Data};
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
case apply_mfa(NextId, MFA) of case apply_mfa(NextId, MFA) of
ok -> ok ->
case transaction(fun() -> commit(Node, NextId) end) of case transaction(fun() -> commit(Node, NextId) end) of
{atomic, ok} -> catch_up(Node); {atomic, ok} -> catch_up(Data);
_ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
end; end;
_ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
end; end;
{aborted, _Reason} -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} {aborted, _Reason} -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
end. end.
get_next_mfa(Node) -> get_next_mfa(Node) ->
@ -252,17 +253,18 @@ get_latest_id() ->
Id -> Id Id -> Id
end. end.
handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Node) -> handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) ->
#{node := Node, retry_interval := RetryMs} = Data,
{atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end), {atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end),
if LastAppliedId + 1 =:= EventId -> if LastAppliedId + 1 =:= EventId ->
case apply_mfa(EventId, MFA) of case apply_mfa(EventId, MFA) of
ok -> ok ->
case transaction(fun() -> commit(Node, EventId) end) of case transaction(fun() -> commit(Node, EventId) end) of
{atomic, ok} -> {atomic, ok} ->
{next_state, ?REALTIME, Node}; {next_state, ?REALTIME, Data};
_ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
end; end;
_ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
end; end;
LastAppliedId >= EventId -> %% It's means the initiator receive self event or other receive stale event. LastAppliedId >= EventId -> %% It's means the initiator receive self event or other receive stale event.
keep_state_and_data; keep_state_and_data;
@ -301,8 +303,11 @@ do_catch_up_in_one_trans(LatestId, Node) ->
{error, Reason} -> mnesia:abort(Reason) {error, Reason} -> mnesia:abort(Reason)
end. end.
transaction(Fun) -> transaction(Func, Args) ->
ekka_mnesia:transaction(?COMMON_SHARD, Fun). ekka_mnesia:transaction(?COMMON_SHARD, Func, Args).
transaction(Func) ->
ekka_mnesia:transaction(?COMMON_SHARD, Func).
apply_mfa(TnxId, {M, F, A} = MFA) -> apply_mfa(TnxId, {M, F, A} = MFA) ->
try try

View File

@ -21,22 +21,27 @@
-include("logger.hrl"). -include("logger.hrl").
-include("emqx_cluster_rpc.hrl"). -include("emqx_cluster_rpc.hrl").
-export([start_link/0]). -export([start_link/0, start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
-define(MFA_HISTORY_LEN, 100). -define(MFA_HISTORY_LEN, 100).
start_link() -> start_link() ->
gen_server:start_link(?MODULE, [], []). MaxHistory = emqx:get_config([broker, cluster_call, mfa_max_history]),
CleanupMs = emqx:get_config([broker, cluster_call, mfa_cleanup_interval]),
start_link(MaxHistory, CleanupMs).
start_link(MaxHistory, CleanupMs) ->
State = #{max_history => MaxHistory, cleanup_ms => CleanupMs, timer => undefined},
gen_server:start_link(?MODULE, [State], []).
%%%=================================================================== %%%===================================================================
%%% Spawning and gen_server implementation %%% Spawning and gen_server implementation
%%%=================================================================== %%%===================================================================
init([]) -> init([State]) ->
_ = emqx_misc:rand_seed(), {ok, ensure_timer(State)}.
{ok, ensure_timer(#{timer => undefined})}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(error, "unexpected call: ~p", [Req]), ?LOG(error, "unexpected call: ~p", [Req]),
@ -46,8 +51,8 @@ handle_cast(Msg, State) ->
?LOG(error, "unexpected msg: ~p", [Msg]), ?LOG(error, "unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef}) -> handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) ->
case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/0, []) of case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error])
end, end,
@ -66,23 +71,15 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
ensure_timer(State = #{cleanup_ms := Ms}) ->
-ifdef(TEST).
ensure_timer(State) ->
State#{timer := emqx_misc:start_timer(timer:seconds(1), del_stale_mfa)}.
-else.
ensure_timer(State) ->
Ms = timer:minutes(5) + rand:uniform(5000),
State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}. State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}.
-endif.
%% @doc Keep the latest completed 100 records for querying and troubleshooting. %% @doc Keep the latest completed 100 records for querying and troubleshooting.
del_stale_mfa() -> del_stale_mfa(MaxHistory) ->
DoneId = DoneId =
mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end,
infinity, ?CLUSTER_COMMIT), infinity, ?CLUSTER_COMMIT),
delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, ?MFA_HISTORY_LEN). delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory).
delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok;
delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId ->

View File

@ -293,6 +293,7 @@ fields("broker") ->
, {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)}
, {"route_batch_clean", t(boolean(), undefined, true)} , {"route_batch_clean", t(boolean(), undefined, true)}
, {"perf", ref("perf")} , {"perf", ref("perf")}
, {"cluster_call", ref("cluster_call")}
]; ];
fields("perf") -> fields("perf") ->
@ -325,6 +326,12 @@ fields("sysmon_os") ->
, {"procmem_high_watermark", t(percent(), undefined, "5%")} , {"procmem_high_watermark", t(percent(), undefined, "5%")}
]; ];
fields("cluster_call") ->
[{"retry_interval", t(duration(), undefined, "2s")}
, {"mfa_max_history", t(range(1, 500), undefined, 50)}
, {"mfa_cleanup_interval", t(duration(), undefined, "5m")}
];
fields("alarm") -> fields("alarm") ->
[ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])}
, {"size_limit", t(integer(), undefined, 1000)} , {"size_limit", t(integer(), undefined, 1000)}

View File

@ -41,6 +41,11 @@ init_per_suite(Config) ->
application:load(emqx), application:load(emqx),
ok = ekka:start(), ok = ekka:start(),
emqx_cluster_rpc:mnesia(copy), emqx_cluster_rpc:mnesia(copy),
emqx_config:put([broker, cluster_call], #{
mfa_max_history => 100,
mfa_cleanup_interval => 1000,
retry_interval => 900
}),
%%dbg:tracer(), %%dbg:tracer(),
%%dbg:p(all, c), %%dbg:p(all, c),
%%dbg:tpl(emqx_cluster_rpc, cx), %%dbg:tpl(emqx_cluster_rpc, cx),
@ -75,7 +80,7 @@ t_base_test(_Config) ->
?assert(maps:is_key(created_at, Query)), ?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)), ?assertEqual(ok, receive_msg(3, test)),
SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))), SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))),
?assertEqual(#{node => node(), opt => normal, state => realtime}, SysStatus), ?assertEqual(#{data => #{node => node(),retry_interval => 900}, opt => normal, state => realtime}, SysStatus),
sleep(400), sleep(400),
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
?assertEqual(3, length(Status)), ?assertEqual(3, length(Status)),
@ -186,9 +191,9 @@ t_del_stale_mfa(_Config) ->
start() -> start() ->
{ok, Pid1} = emqx_cluster_rpc:start_link(), {ok, Pid1} = emqx_cluster_rpc:start_link(),
{ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2), {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
{ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3), {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
{ok, Pid4} = emqx_cluster_rpc_handler:start_link(), {ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500),
{ok, [Pid1, Pid2, Pid3, Pid4]}. {ok, [Pid1, Pid2, Pid3, Pid4]}.
stop() -> stop() ->