diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx/src/emqx_cluster_rpc.erl index 30f9e3234..a4c54d01e 100644 --- a/apps/emqx/src/emqx_cluster_rpc.erl +++ b/apps/emqx/src/emqx_cluster_rpc.erl @@ -92,37 +92,14 @@ multicall(M, F, A, Timeout) -> -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. query(TnxId) -> - transaction(fun do_query/1, [TnxId]). - -do_query(TnxId) -> - case mnesia:read(?CLUSTER_MFA, TnxId) of - [] -> mnesia:abort(not_found); - [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> - #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} - end. + transaction(fun trans_query/1, [TnxId]). -spec reset() -> reset. reset() -> gen_statem:call(?MODULE, reset). -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. status() -> - Fun = fun() -> - mnesia:foldl(fun(Rec, Acc) -> - #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, - case mnesia:read(?CLUSTER_MFA, TnxId) of - [MFARec] -> - #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, - [#{ - node => Node, - tnx_id => TnxId, - initiator => InitNode, - mfa => MFA, - created_at => CreatedAt - } | Acc]; - [] -> Acc - end end, [], ?CLUSTER_COMMIT) - end, - transaction(Fun). + transaction(fun trans_status/0, []). %%%=================================================================== %%% gen_statem callbacks @@ -158,7 +135,7 @@ handle_event({call, From}, reset, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) -> - case transaction(fun() -> init_mfa(Node, MFA) end) of + case transaction(fun init_mfa/2, [Node, MFA]) of {atomic, {ok, TnxId}} -> {keep_state, Data, [{reply, From, {ok, TnxId}}]}; {aborted, Reason} -> @@ -188,12 +165,12 @@ code_change(_OldVsn, StateName, Data, _Extra) -> %%% Internal functions %%%=================================================================== catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> - case get_next_mfa(Node) of + case transaction(fun get_next_mfa/1, [Node]) of {atomic, caught_up} -> {next_state, ?REALTIME, Data}; {atomic, {still_lagging, NextId, MFA}} -> case apply_mfa(NextId, MFA) of ok -> - case transaction(fun() -> commit(Node, NextId) end) of + case transaction(fun commit/2, [Node, NextId]) of {atomic, ok} -> catch_up(Data); _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} end; @@ -203,24 +180,20 @@ catch_up(#{node := Node, retry_interval := RetryMs} = Data) -> end. get_next_mfa(Node) -> - Fun = - fun() -> - NextId = - case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [] -> - LatestId = get_latest_id(), - TnxId = max(LatestId - 1, 0), - commit(Node, TnxId), - ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), - TnxId; - [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 - end, - case mnesia:read(?CLUSTER_MFA, NextId) of - [] -> caught_up; - [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} - end + NextId = + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> + LatestId = get_latest_id(), + TnxId = max(LatestId - 1, 0), + commit(Node, TnxId), + ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), + TnxId; + [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 end, - transaction(Fun). + case mnesia:read(?CLUSTER_MFA, NextId) of + [] -> caught_up; + [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} + end. do_catch_up(ToTnxId, Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of @@ -255,11 +228,11 @@ get_latest_id() -> handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) -> #{node := Node, retry_interval := RetryMs} = Data, - {atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end), + {atomic, LastAppliedId} = transaction(fun get_last_applied_id/2, [Node, EventId - 1]), if LastAppliedId + 1 =:= EventId -> case apply_mfa(EventId, MFA) of ok -> - case transaction(fun() -> commit(Node, EventId) end) of + case transaction(fun commit/2, [Node, EventId]) of {atomic, ok} -> {next_state, ?REALTIME, Data}; _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]} @@ -306,8 +279,28 @@ do_catch_up_in_one_trans(LatestId, Node) -> transaction(Func, Args) -> ekka_mnesia:transaction(?COMMON_SHARD, Func, Args). -transaction(Func) -> - ekka_mnesia:transaction(?COMMON_SHARD, Func). +trans_status() -> + mnesia:foldl(fun(Rec, Acc) -> + #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, + case mnesia:read(?CLUSTER_MFA, TnxId) of + [MFARec] -> + #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, + [#{ + node => Node, + tnx_id => TnxId, + initiator => InitNode, + mfa => MFA, + created_at => CreatedAt + } | Acc]; + [] -> Acc + end end, [], ?CLUSTER_COMMIT). + +trans_query(TnxId) -> + case mnesia:read(?CLUSTER_MFA, TnxId) of + [] -> mnesia:abort(not_found); + [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> + #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} + end. apply_mfa(TnxId, {M, F, A} = MFA) -> try diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl index 8e8f55834..5106183bb 100644 --- a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl @@ -136,7 +136,8 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, {ok, _} = emqx_cluster_rpc:multicall(M, F, A), {ok, _} = emqx_cluster_rpc:multicall(io, format, ["test"]), - {atomic, [Status]} = emqx_cluster_rpc:status(), + {atomic, [Status|L]} = emqx_cluster_rpc:status(), + ?assertEqual([], L), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), ?assertEqual(realtime, element(1, sys:get_state(?NODE1))),