From 45285086209ab7f0a7da40cd1d596f294ec543e3 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 25 Aug 2021 16:38:01 +0800 Subject: [PATCH] feat: replace gen_statem by gen_server --- apps/emqx_machine/src/emqx_cluster_rpc.erl | 163 ++++++++---------- .../test/emqx_cluster_rpc_SUITE.erl | 14 +- 2 files changed, 69 insertions(+), 108 deletions(-) diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index b988d55fb..f7dc1eef9 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -14,14 +14,14 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_cluster_rpc). --behaviour(gen_statem). +-behaviour(gen_server). %% API -export([start_link/0, mnesia/1]). -export([multicall/3, multicall/4, query/1, reset/0, status/0]). --export([init/1, format_status/2, handle_event/4, terminate/3, - code_change/4, callback_mode/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + handle_continue/2, code_change/3]). -ifdef(TEST). -compile(export_all). @@ -40,8 +40,7 @@ -rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}). -define(CATCH_UP, catch_up). --define(REALTIME, realtime). --define(CATCH_UP_AFTER(_Ms_), {state_timeout, _Ms_, catch_up_delay}). +-define(TIMEOUT, timer:minutes(1)). %%%=================================================================== %%% API @@ -49,14 +48,14 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ {type, ordered_set}, + {rlog_shard, ?COMMON_SHARD}, {disc_copies, [node()]}, - {local_content, true}, {record_name, cluster_rpc_mfa}, {attributes, record_info(fields, cluster_rpc_mfa)}]), ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ {type, set}, + {rlog_shard, ?COMMON_SHARD}, {disc_copies, [node()]}, - {local_content, true}, {record_name, cluster_rpc_commit}, {attributes, record_info(fields, cluster_rpc_commit)}]); mnesia(copy) -> @@ -66,15 +65,16 @@ mnesia(copy) -> start_link() -> RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000), start_link(node(), ?MODULE, RetryMs). + start_link(Node, Name, RetryMs) -> - gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). + gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). -spec multicall(Module, Function, Args) -> {ok, TnxId} | {error, Reason} when Module :: module(), Function :: atom(), Args :: [term()], TnxId :: pos_integer(), - Reason :: term(). + Reason :: string(). multicall(M, F, A) -> multicall(M, F, A, timer:minutes(2)). @@ -84,14 +84,17 @@ multicall(M, F, A) -> Args :: [term()], TnxId :: pos_integer(), Timeout :: timeout(), - Reason :: term(). + Reason :: string(). multicall(M, F, A, Timeout) -> MFA = {initiate, {M, F, A}}, case ekka_rlog:role() of - core -> gen_statem:call(?MODULE, MFA, Timeout); + core -> gen_server:call(?MODULE, MFA, Timeout); replicant -> + %% the initiate transaction must happened on core node + %% make sure MFA(in the transaction) and the transaction on the same node + %% don't need rpc again inside transaction. case ekka_rlog_status:upstream_node(?COMMON_SHARD) of - {ok, Node} -> gen_statem:call({?MODULE, Node}, MFA, Timeout); + {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); disconnected -> {error, disconnected} end end. @@ -101,7 +104,7 @@ query(TnxId) -> transaction(fun trans_query/1, [TnxId]). -spec reset() -> reset. -reset() -> gen_statem:call(?MODULE, reset). +reset() -> gen_server:call(?MODULE, reset). -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. status() -> @@ -114,75 +117,67 @@ status() -> %% @private init([Node, RetryMs]) -> {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), - {ok, ?CATCH_UP, #{node => Node, retry_interval => RetryMs}, ?CATCH_UP_AFTER(0)}. - -callback_mode() -> - handle_event_function. + {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}. %% @private -format_status(Opt, [_PDict, StateName, Data]) -> - #{state => StateName, data => Data , opt => Opt}. +handle_continue(?CATCH_UP, State) -> + {noreply, State, catch_up(State)}. -%% @private -handle_event(state_timeout, catch_up_delay, _State, Data) -> - catch_up(Data); - -handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Data) -> - handle_mfa_write_event(MFARec, Data); -handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Data) -> - {keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]}; -%% we should catch up as soon as possible when we reset all. -handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Data) -> - {keep_state_and_data, [?CATCH_UP_AFTER(0)]}; - -handle_event({call, From}, reset, _State, _Data) -> +handle_call(reset, _From, State) -> _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), _ = ekka_mnesia:clear_table(?CLUSTER_MFA), - {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; + {reply, ok, State, {continue, ?CATCH_UP}}; -handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) -> +handle_call({initiate, MFA}, _From, State = #{node := Node}) -> case transaction(fun init_mfa/2, [Node, MFA]) of {atomic, {ok, TnxId}} -> - {keep_state, Data, [{reply, From, {ok, TnxId}}]}; + {reply, {ok, TnxId}, State, {continue, ?CATCH_UP}}; {aborted, Reason} -> - {keep_state, Data, [{reply, From, {error, Reason}}]} - end; -handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Data = #{retry_interval := RetryMs}) -> - case catch_up(Data) of - {next_state, ?REALTIME, Data} -> - {next_state, ?REALTIME, Data, [{postpone, true}]}; - _ -> - Reason = "There are still transactions that have not been executed.", - {keep_state_and_data, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(RetryMs)]} + {reply, {error, Reason}, State, {continue, ?CATCH_UP}} end; +handle_call(_, _From, State) -> + {reply, ok, State, catch_up(State)}. -handle_event(_EventType, _EventContent, ?CATCH_UP, #{retry_interval := RetryMs}) -> - {keep_state_and_data, [?CATCH_UP_AFTER(RetryMs)]}; -handle_event(_EventType, _EventContent, _StateName, _Data) -> - keep_state_and_data. +handle_cast(_, State) -> + {noreply, State, catch_up(State)}. -terminate(_Reason, _StateName, _Data) -> +handle_info({mnesia_table_event, _}, State) -> + {noreply, State, catch_up(State)}; +handle_info(_, State) -> + {noreply, State, catch_up(State)}. + +terminate(_Reason, _Data) -> ok. -code_change(_OldVsn, StateName, Data, _Extra) -> - {ok, StateName, Data}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%%=================================================================== %%% Internal functions %%%=================================================================== -catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> +catch_up(#{node := Node, retry_interval := RetryMs} = State) -> case transaction(fun get_next_mfa/1, [Node]) of - {atomic, caught_up} -> {next_state, ?REALTIME, Data}; + {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> case apply_mfa(NextId, MFA) of ok -> case transaction(fun commit/2, [Node, NextId]) of - {atomic, ok} -> catch_up(Data); - _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} + {atomic, ok} -> catch_up(State); + Error -> + ?SLOG(error, #{ + msg => "mnesia write transaction failed", + node => Node, + nextId => NextId, + error => Error}), + RetryMs end; - _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} + _Error -> RetryMs end; - {aborted, _Reason} -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} + {aborted, Reason} -> + ?SLOG(error, #{ + msg => "get_next_mfa transaction failed", + node => Node, error => Reason}), + RetryMs end. get_next_mfa(Node) -> @@ -192,7 +187,9 @@ get_next_mfa(Node) -> LatestId = get_latest_id(), TnxId = max(LatestId - 1, 0), commit(Node, TnxId), - ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), + ?SLOG(notice, #{ + msg => "New node first catch up and start commit.", + node => Node, tnx_id => TnxId}), TnxId; [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 end, @@ -216,10 +213,15 @@ do_catch_up(ToTnxId, Node) -> {error, Reason} -> mnesia:abort(Reason); Other -> mnesia:abort(Other) end; - [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> + [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", [Node, LastAppliedId, ToTnxId])), - ?LOG(error, Reason), + ?SLOG(error, #{ + msg => "catch up failed!", + last_applied_id => LastAppliedId, + node => Node, + to_tnx_id => ToTnxId + }), {error, Reason} end. @@ -232,35 +234,6 @@ get_latest_id() -> Id -> Id end. -handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) -> - #{node := Node, retry_interval := RetryMs} = Data, - {atomic, LastAppliedId} = transaction(fun get_last_applied_id/2, [Node, EventId - 1]), - if LastAppliedId + 1 =:= EventId -> - case apply_mfa(EventId, MFA) of - ok -> - case transaction(fun commit/2, [Node, EventId]) of - {atomic, ok} -> - {next_state, ?REALTIME, Data}; - _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} - end; - _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} - end; - LastAppliedId >= EventId -> %% It's means the initiator receive self event or other receive stale event. - keep_state_and_data; - true -> - ?LOG(error, "LastAppliedId+1 - case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId; - [] -> - commit(Node, Default), - Default - end. - init_mfa(Node, MFA) -> mnesia:write_lock_table(?CLUSTER_MFA), LatestId = get_latest_id(), @@ -311,12 +284,12 @@ trans_query(TnxId) -> apply_mfa(TnxId, {M, F, A} = MFA) -> try Res = erlang:apply(M, F, A), - case Res =:= ok of - true -> - ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok}); - false -> - ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}) - end, + case Res =:= ok of + true -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok}); + false -> + ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}) + end, Res catch C : E -> diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index 92a89790a..b91131b93 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -78,8 +78,6 @@ t_base_test(_Config) -> ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), - SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))), - ?assertEqual(#{data => #{node => node(),retry_interval => 900}, opt => normal, state => realtime}, SysStatus), sleep(400), {atomic, Status} = emqx_cluster_rpc:status(), ?assertEqual(3, length(Status)), @@ -111,12 +109,9 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> {atomic, [Status]} = emqx_cluster_rpc:status(), ?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), - ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE3))), erlang:send(?NODE2, test), Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}), - ?assertEqual({error, "There are still transactions that have not been executed."}, Res), + ?assertEqual({error, "MFA return not ok"}, Res), ok. t_catch_up_status_handle_next_commit(_Config) -> @@ -124,7 +119,6 @@ t_catch_up_status_handle_next_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, {ok, _} = emqx_cluster_rpc:multicall(M, F, A), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), ok. @@ -139,17 +133,11 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> ?assertEqual([], L), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), - ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), - ?assertEqual(catch_up, element(1, sys:get_state(?NODE3))), sleep(4000), {atomic, [Status1]} = emqx_cluster_rpc:status(), ?assertEqual(Status, Status1), sleep(1600), {atomic, NewStatus} = emqx_cluster_rpc:status(), - ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), - ?assertEqual(realtime, element(1, sys:get_state(?NODE2))), - ?assertEqual(realtime, element(1, sys:get_state(?NODE3))), ?assertEqual(3, length(NewStatus)), Pid = self(), MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},