feat: replace gen_statem by gen_server

This commit is contained in:
zhongwencool 2021-08-25 16:38:01 +08:00
parent 60c1c4edba
commit 4528508620
2 changed files with 69 additions and 108 deletions

View File

@ -14,14 +14,14 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_cluster_rpc). -module(emqx_cluster_rpc).
-behaviour(gen_statem). -behaviour(gen_server).
%% API %% API
-export([start_link/0, mnesia/1]). -export([start_link/0, mnesia/1]).
-export([multicall/3, multicall/4, query/1, reset/0, status/0]). -export([multicall/3, multicall/4, query/1, reset/0, status/0]).
-export([init/1, format_status/2, handle_event/4, terminate/3, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/4, callback_mode/0]). handle_continue/2, code_change/3]).
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
@ -40,8 +40,7 @@
-rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}). -rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}).
-define(CATCH_UP, catch_up). -define(CATCH_UP, catch_up).
-define(REALTIME, realtime). -define(TIMEOUT, timer:minutes(1)).
-define(CATCH_UP_AFTER(_Ms_), {state_timeout, _Ms_, catch_up_delay}).
%%%=================================================================== %%%===================================================================
%%% API %%% API
@ -49,14 +48,14 @@
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ ok = ekka_mnesia:create_table(?CLUSTER_MFA, [
{type, ordered_set}, {type, ordered_set},
{rlog_shard, ?COMMON_SHARD},
{disc_copies, [node()]}, {disc_copies, [node()]},
{local_content, true},
{record_name, cluster_rpc_mfa}, {record_name, cluster_rpc_mfa},
{attributes, record_info(fields, cluster_rpc_mfa)}]), {attributes, record_info(fields, cluster_rpc_mfa)}]),
ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [
{type, set}, {type, set},
{rlog_shard, ?COMMON_SHARD},
{disc_copies, [node()]}, {disc_copies, [node()]},
{local_content, true},
{record_name, cluster_rpc_commit}, {record_name, cluster_rpc_commit},
{attributes, record_info(fields, cluster_rpc_commit)}]); {attributes, record_info(fields, cluster_rpc_commit)}]);
mnesia(copy) -> mnesia(copy) ->
@ -66,15 +65,16 @@ mnesia(copy) ->
start_link() -> start_link() ->
RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000), RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000),
start_link(node(), ?MODULE, RetryMs). start_link(node(), ?MODULE, RetryMs).
start_link(Node, Name, RetryMs) -> start_link(Node, Name, RetryMs) ->
gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []).
-spec multicall(Module, Function, Args) -> {ok, TnxId} | {error, Reason} when -spec multicall(Module, Function, Args) -> {ok, TnxId} | {error, Reason} when
Module :: module(), Module :: module(),
Function :: atom(), Function :: atom(),
Args :: [term()], Args :: [term()],
TnxId :: pos_integer(), TnxId :: pos_integer(),
Reason :: term(). Reason :: string().
multicall(M, F, A) -> multicall(M, F, A) ->
multicall(M, F, A, timer:minutes(2)). multicall(M, F, A, timer:minutes(2)).
@ -84,14 +84,17 @@ multicall(M, F, A) ->
Args :: [term()], Args :: [term()],
TnxId :: pos_integer(), TnxId :: pos_integer(),
Timeout :: timeout(), Timeout :: timeout(),
Reason :: term(). Reason :: string().
multicall(M, F, A, Timeout) -> multicall(M, F, A, Timeout) ->
MFA = {initiate, {M, F, A}}, MFA = {initiate, {M, F, A}},
case ekka_rlog:role() of case ekka_rlog:role() of
core -> gen_statem:call(?MODULE, MFA, Timeout); core -> gen_server:call(?MODULE, MFA, Timeout);
replicant -> replicant ->
%% The initiate transaction must happen on a core node.
%% This makes sure the MFA (inside the transaction) and the transaction run on the same node,
%% so no further rpc is needed inside the transaction.
case ekka_rlog_status:upstream_node(?COMMON_SHARD) of case ekka_rlog_status:upstream_node(?COMMON_SHARD) of
{ok, Node} -> gen_statem:call({?MODULE, Node}, MFA, Timeout); {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
disconnected -> {error, disconnected} disconnected -> {error, disconnected}
end end
end. end.
@ -101,7 +104,7 @@ query(TnxId) ->
transaction(fun trans_query/1, [TnxId]). transaction(fun trans_query/1, [TnxId]).
-spec reset() -> reset. -spec reset() -> reset.
reset() -> gen_statem:call(?MODULE, reset). reset() -> gen_server:call(?MODULE, reset).
-spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}.
status() -> status() ->
@ -114,75 +117,67 @@ status() ->
%% @private %% @private
init([Node, RetryMs]) -> init([Node, RetryMs]) ->
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
{ok, ?CATCH_UP, #{node => Node, retry_interval => RetryMs}, ?CATCH_UP_AFTER(0)}. {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}.
callback_mode() ->
handle_event_function.
%% @private %% @private
format_status(Opt, [_PDict, StateName, Data]) -> handle_continue(?CATCH_UP, State) ->
#{state => StateName, data => Data , opt => Opt}. {noreply, State, catch_up(State)}.
%% @private handle_call(reset, _From, State) ->
handle_event(state_timeout, catch_up_delay, _State, Data) ->
catch_up(Data);
handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Data) ->
handle_mfa_write_event(MFARec, Data);
handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Data) ->
{keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]};
%% we should catch up as soon as possible when we reset all.
handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Data) ->
{keep_state_and_data, [?CATCH_UP_AFTER(0)]};
handle_event({call, From}, reset, _State, _Data) ->
_ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT),
_ = ekka_mnesia:clear_table(?CLUSTER_MFA), _ = ekka_mnesia:clear_table(?CLUSTER_MFA),
{keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; {reply, ok, State, {continue, ?CATCH_UP}};
handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) -> handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
case transaction(fun init_mfa/2, [Node, MFA]) of case transaction(fun init_mfa/2, [Node, MFA]) of
{atomic, {ok, TnxId}} -> {atomic, {ok, TnxId}} ->
{keep_state, Data, [{reply, From, {ok, TnxId}}]}; {reply, {ok, TnxId}, State, {continue, ?CATCH_UP}};
{aborted, Reason} -> {aborted, Reason} ->
{keep_state, Data, [{reply, From, {error, Reason}}]} {reply, {error, Reason}, State, {continue, ?CATCH_UP}}
end;
handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Data = #{retry_interval := RetryMs}) ->
case catch_up(Data) of
{next_state, ?REALTIME, Data} ->
{next_state, ?REALTIME, Data, [{postpone, true}]};
_ ->
Reason = "There are still transactions that have not been executed.",
{keep_state_and_data, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(RetryMs)]}
end; end;
handle_call(_, _From, State) ->
{reply, ok, State, catch_up(State)}.
handle_event(_EventType, _EventContent, ?CATCH_UP, #{retry_interval := RetryMs}) -> handle_cast(_, State) ->
{keep_state_and_data, [?CATCH_UP_AFTER(RetryMs)]}; {noreply, State, catch_up(State)}.
handle_event(_EventType, _EventContent, _StateName, _Data) ->
keep_state_and_data.
terminate(_Reason, _StateName, _Data) -> handle_info({mnesia_table_event, _}, State) ->
{noreply, State, catch_up(State)};
handle_info(_, State) ->
{noreply, State, catch_up(State)}.
terminate(_Reason, _Data) ->
ok. ok.
code_change(_OldVsn, StateName, Data, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, StateName, Data}. {ok, State}.
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> catch_up(#{node := Node, retry_interval := RetryMs} = State) ->
case transaction(fun get_next_mfa/1, [Node]) of case transaction(fun get_next_mfa/1, [Node]) of
{atomic, caught_up} -> {next_state, ?REALTIME, Data}; {atomic, caught_up} -> ?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
case apply_mfa(NextId, MFA) of case apply_mfa(NextId, MFA) of
ok -> ok ->
case transaction(fun commit/2, [Node, NextId]) of case transaction(fun commit/2, [Node, NextId]) of
{atomic, ok} -> catch_up(Data); {atomic, ok} -> catch_up(State);
_ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} Error ->
?SLOG(error, #{
msg => "mnesia write transaction failed",
node => Node,
nextId => NextId,
error => Error}),
RetryMs
end; end;
_ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} _Error -> RetryMs
end; end;
{aborted, _Reason} -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} {aborted, Reason} ->
?SLOG(error, #{
msg => "get_next_mfa transaction failed",
node => Node, error => Reason}),
RetryMs
end. end.
get_next_mfa(Node) -> get_next_mfa(Node) ->
@ -192,7 +187,9 @@ get_next_mfa(Node) ->
LatestId = get_latest_id(), LatestId = get_latest_id(),
TnxId = max(LatestId - 1, 0), TnxId = max(LatestId - 1, 0),
commit(Node, TnxId), commit(Node, TnxId),
?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), ?SLOG(notice, #{
msg => "New node first catch up and start commit.",
node => Node, tnx_id => TnxId}),
TnxId; TnxId;
[#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1
end, end,
@ -219,7 +216,12 @@ do_catch_up(ToTnxId, Node) ->
[#cluster_rpc_commit{tnx_id = LastAppliedId}] -> [#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
[Node, LastAppliedId, ToTnxId])), [Node, LastAppliedId, ToTnxId])),
?LOG(error, Reason), ?SLOG(error, #{
msg => "catch up failed!",
last_applied_id => LastAppliedId,
node => Node,
to_tnx_id => ToTnxId
}),
{error, Reason} {error, Reason}
end. end.
@ -232,35 +234,6 @@ get_latest_id() ->
Id -> Id Id -> Id
end. end.
handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) ->
#{node := Node, retry_interval := RetryMs} = Data,
{atomic, LastAppliedId} = transaction(fun get_last_applied_id/2, [Node, EventId - 1]),
if LastAppliedId + 1 =:= EventId ->
case apply_mfa(EventId, MFA) of
ok ->
case transaction(fun commit/2, [Node, EventId]) of
{atomic, ok} ->
{next_state, ?REALTIME, Data};
_ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
end;
_ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
end;
LastAppliedId >= EventId -> %% It means the initiator received its own event, or another node received a stale event.
keep_state_and_data;
true ->
?LOG(error, "LastAppliedId+1<EventId, maybe the mnesia event'order is messed up! restart process:~p",
[{LastAppliedId, EventId, MFA, Node}]),
{stop, {"LastAppliedId+1<EventId", {LastAppliedId, EventId, MFA, Node}}}
end.
get_last_applied_id(Node, Default) ->
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
[#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId;
[] ->
commit(Node, Default),
Default
end.
init_mfa(Node, MFA) -> init_mfa(Node, MFA) ->
mnesia:write_lock_table(?CLUSTER_MFA), mnesia:write_lock_table(?CLUSTER_MFA),
LatestId = get_latest_id(), LatestId = get_latest_id(),

View File

@ -78,8 +78,6 @@ t_base_test(_Config) ->
?assertEqual(node(), maps:get(initiator, Query)), ?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)), ?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)), ?assertEqual(ok, receive_msg(3, test)),
SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))),
?assertEqual(#{data => #{node => node(),retry_interval => 900}, opt => normal, state => realtime}, SysStatus),
sleep(400), sleep(400),
{atomic, Status} = emqx_cluster_rpc:status(), {atomic, Status} = emqx_cluster_rpc:status(),
?assertEqual(3, length(Status)), ?assertEqual(3, length(Status)),
@ -111,12 +109,9 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
{atomic, [Status]} = emqx_cluster_rpc:status(), {atomic, [Status]} = emqx_cluster_rpc:status(),
?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(MFA, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)), ?assertEqual(node(), maps:get(node, Status)),
?assertEqual(realtime, element(1, sys:get_state(?NODE1))),
?assertEqual(catch_up, element(1, sys:get_state(?NODE2))),
?assertEqual(catch_up, element(1, sys:get_state(?NODE3))),
erlang:send(?NODE2, test), erlang:send(?NODE2, test),
Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}), Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}),
?assertEqual({error, "There are still transactions that have not been executed."}, Res), ?assertEqual({error, "MFA return not ok"}, Res),
ok. ok.
t_catch_up_status_handle_next_commit(_Config) -> t_catch_up_status_handle_next_commit(_Config) ->
@ -124,7 +119,6 @@ t_catch_up_status_handle_next_commit(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
{M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
{ok, _} = emqx_cluster_rpc:multicall(M, F, A), {ok, _} = emqx_cluster_rpc:multicall(M, F, A),
?assertEqual(catch_up, element(1, sys:get_state(?NODE2))),
{ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}),
ok. ok.
@ -139,17 +133,11 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
?assertEqual([], L), ?assertEqual([], L),
?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)), ?assertEqual(node(), maps:get(node, Status)),
?assertEqual(realtime, element(1, sys:get_state(?NODE1))),
?assertEqual(catch_up, element(1, sys:get_state(?NODE2))),
?assertEqual(catch_up, element(1, sys:get_state(?NODE3))),
sleep(4000), sleep(4000),
{atomic, [Status1]} = emqx_cluster_rpc:status(), {atomic, [Status1]} = emqx_cluster_rpc:status(),
?assertEqual(Status, Status1), ?assertEqual(Status, Status1),
sleep(1600), sleep(1600),
{atomic, NewStatus} = emqx_cluster_rpc:status(), {atomic, NewStatus} = emqx_cluster_rpc:status(),
?assertEqual(realtime, element(1, sys:get_state(?NODE1))),
?assertEqual(realtime, element(1, sys:get_state(?NODE2))),
?assertEqual(realtime, element(1, sys:get_state(?NODE3))),
?assertEqual(3, length(NewStatus)), ?assertEqual(3, length(NewStatus)),
Pid = self(), Pid = self(),
MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},