diff --git a/apps/emqx/include/emqx_cluster_rpc.hrl b/apps/emqx/include/emqx_cluster_rpc.hrl new file mode 100644 index 000000000..5c04346b7 --- /dev/null +++ b/apps/emqx/include/emqx_cluster_rpc.hrl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQ_X_CLUSTER_RPC_HRL). +-define(EMQ_X_CLUSTER_RPC_HRL, true). + +-define(CLUSTER_MFA, cluster_rpc_mfa). +-define(CLUSTER_COMMIT, cluster_rpc_commit). + +-record(cluster_rpc_mfa, { + tnx_id :: pos_integer(), + mfa :: mfa(), + created_at :: calendar:datetime(), + initiator :: node() +}). + +-record(cluster_rpc_commit, { + node :: node(), + tnx_id :: pos_integer() +}). + +-endif. diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx/src/emqx_cluster_rpc.erl new file mode 100644 index 000000000..301603c63 --- /dev/null +++ b/apps/emqx/src/emqx_cluster_rpc.erl @@ -0,0 +1,321 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_cluster_rpc). +-behaviour(gen_statem). + +%% API +-export([start_link/0, mnesia/1]). +-export([multicall/3, multicall/4, query/1, reset/0, status/0]). + +-export([init/1, format_status/2, handle_event/4, terminate/3, + code_change/4, callback_mode/0]). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-export([start_link/2]). +-endif. + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-include("emqx.hrl"). +-include("logger.hrl"). +-include("emqx_cluster_rpc.hrl"). + +-rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}). +-rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}). + +-define(CATCH_UP, catch_up). +-define(REALTIME, realtime). +-define(CATCH_UP_AFTER(_Sec_), {state_timeout, timer:seconds(_Sec_), catch_up_delay}). + +%%%=================================================================== +%%% API +%%%=================================================================== +mnesia(boot) -> + ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ + {type, ordered_set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, cluster_rpc_mfa}, + {attributes, record_info(fields, cluster_rpc_mfa)}]), + ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ + {type, set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, cluster_rpc_commit}, + {attributes, record_info(fields, cluster_rpc_commit)}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(cluster_rpc_mfa, disc_copies), + ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). + +start_link() -> + start_link(node(), ?MODULE). +start_link(Node, Name) -> + gen_statem:start_link({local, Name}, ?MODULE, [Node], []). + +multicall(M, F, A) -> + multicall(M, F, A, timer:minutes(2)). + +-spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId} |{error, Reason} when + Module :: module(), + Function :: atom(), + Args :: [term()], + TnxId :: pos_integer(), + Timeout :: timeout(), + Reason :: term(). +multicall(M, F, A, Timeout) -> + MFA = {initiate, {M, F, A}}, + case ekka_rlog:role() of + core -> gen_statem:call(?MODULE, MFA, Timeout); + replicant -> + case ekka_rlog_status:upstream_node(?COMMON_SHARD) of + {ok, Node} -> gen_statem:call({?MODULE, Node}, MFA, Timeout); + disconnected -> {error, disconnected} + end + end. + +-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. +query(TnxId) -> + Fun = fun() -> + case mnesia:read(?CLUSTER_MFA, TnxId) of + [] -> mnesia:abort(not_found); + [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> + #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} + end + end, + transaction(Fun). + +-spec reset() -> reset. +reset() -> gen_statem:call(?MODULE, reset). + +-spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. +status() -> + Fun = fun() -> + mnesia:foldl(fun(Rec, Acc) -> + #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, + case mnesia:read(?CLUSTER_MFA, TnxId) of + [MFARec] -> + #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, + [#{ + node => Node, + tnx_id => TnxId, + initiator => InitNode, + mfa => MFA, + created_at => CreatedAt + } | Acc]; + [] -> Acc + end end, [], ?CLUSTER_COMMIT) + end, + transaction(Fun). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +init([Node]) -> + {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), + {ok, ?CATCH_UP, Node, ?CATCH_UP_AFTER(0)}. + +callback_mode() -> + handle_event_function. + +%% @private +format_status(Opt, [_PDict, StateName, Node]) -> + #{state => StateName, node => Node, opt => Opt}. + +%% @private +handle_event(state_timeout, catch_up_delay, _State, Node) -> + catch_up(Node); + +handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{} = MFARec, _AId}}, ?REALTIME, Node) -> + handle_mfa_write_event(MFARec, Node); +handle_event(info, {mnesia_table_event, {write, #cluster_rpc_mfa{}, _ActivityId}}, ?CATCH_UP, _Node) -> + {keep_state_and_data, [postpone, ?CATCH_UP_AFTER(0)]}; +%% we should catch up as soon as possible when we reset all. +handle_event(info, {mnesia_table_event, {delete,{schema, ?CLUSTER_MFA}, _Tid}}, _, _Node) -> + {keep_state_and_data, [?CATCH_UP_AFTER(0)]}; + +handle_event({call, From}, reset, _State, _Node) -> + _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), + _ = ekka_mnesia:clear_table(?CLUSTER_MFA), + {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]}; + +handle_event({call, From}, {initiate, MFA}, ?REALTIME, Node) -> + case transaction(fun() -> init_mfa(Node, MFA) end) of + {atomic, {ok, TnxId}} -> + {keep_state, Node, [{reply, From, {ok, TnxId}}]}; + {aborted, Reason} -> + {keep_state, Node, [{reply, From, {error, Reason}}]} + end; +handle_event({call, From}, {initiate, _MFA}, ?CATCH_UP, Node) -> + case catch_up(Node) of + {next_state, ?REALTIME, Node} -> + {next_state, ?REALTIME, Node, [{postpone, true}]}; + _ -> + Reason = "There are still transactions that have not been executed.", + {keep_state, Node, [{reply, From, {error, Reason}}, ?CATCH_UP_AFTER(1)]} + end; + +handle_event(_EventType, _EventContent, ?CATCH_UP, _Node) -> + {keep_state_and_data, [?CATCH_UP_AFTER(10)]}; +handle_event(_EventType, _EventContent, _StateName, _Node) -> + keep_state_and_data. + +terminate(_Reason, _StateName, _Node) -> + ok. + +code_change(_OldVsn, StateName, Node, _Extra) -> + {ok, StateName, Node}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +catch_up(Node) -> + case get_next_mfa(Node) of + {atomic, caught_up} -> {next_state, ?REALTIME, Node}; + {atomic, {still_lagging, NextId, MFA}} -> + case apply_mfa(NextId, MFA) of + ok -> + case transaction(fun() -> commit(Node, NextId) end) of + {atomic, ok} -> catch_up(Node); + _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end; + _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end; + {aborted, _Reason} -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end. + +get_next_mfa(Node) -> + Fun = + fun() -> + NextId = + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> + LatestId = get_latest_id(), + TnxId = max(LatestId - 1, 0), + commit(Node, TnxId), + ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]), + TnxId; + [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 + end, + case mnesia:read(?CLUSTER_MFA, NextId) of + [] -> caught_up; + [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} + end + end, + transaction(Fun). + +do_catch_up(ToTnxId, Node) -> + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> + commit(Node, ToTnxId), + caught_up; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId =:= LastAppliedId -> + caught_up; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId > LastAppliedId -> + CurTnxId = LastAppliedId + 1, + [#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId), + case apply_mfa(CurTnxId, MFA) of + ok -> ok = commit(Node, CurTnxId); + {error, Reason} -> mnesia:abort(Reason); + Other -> mnesia:abort(Other) + end; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> + Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", + [Node, LastAppliedId, ToTnxId])), + ?LOG(error, Reason), + {error, Reason} + end. + +commit(Node, TnxId) -> + ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). + +get_latest_id() -> + case mnesia:last(?CLUSTER_MFA) of + '$end_of_table' -> 0; + Id -> Id + end. + +handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Node) -> + {atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end), + if LastAppliedId + 1 =:= EventId -> + case apply_mfa(EventId, MFA) of + ok -> + case transaction(fun() -> commit(Node, EventId) end) of + {atomic, ok} -> + {next_state, ?REALTIME, Node}; + _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end; + _ -> {next_state, ?CATCH_UP, Node, [?CATCH_UP_AFTER(1)]} + end; + LastAppliedId >= EventId -> %% It's means the initiator receive self event or other receive stale event. + keep_state_and_data; + true -> + ?LOG(error, "LastAppliedId+1 + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId; + [] -> + commit(Node, Default), + Default + end. + +init_mfa(Node, MFA) -> + mnesia:write_lock_table(?CLUSTER_MFA), + LatestId = get_latest_id(), + ok = do_catch_up_in_one_trans(LatestId, Node), + TnxId = LatestId + 1, + MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, initiator = Node, created_at = erlang:localtime()}, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + ok = commit(Node, TnxId), + case apply_mfa(TnxId, MFA) of + ok -> {ok, TnxId}; + {error, Reason} -> mnesia:abort(Reason); + Other -> mnesia:abort(Other) + end. + +do_catch_up_in_one_trans(LatestId, Node) -> + case do_catch_up(LatestId, Node) of + caught_up -> ok; + ok -> do_catch_up_in_one_trans(LatestId, Node); + {error, Reason} -> mnesia:abort(Reason) + end. + +transaction(Fun) -> + ekka_mnesia:transaction(?COMMON_SHARD, Fun). + +apply_mfa(TnxId, {M, F, A} = MFA) -> + try + Res = erlang:apply(M, F, A), + case Res =:= ok of + true -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok}); + false -> + ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}) + end, + Res + catch + C : E -> + ?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}), + {error, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} + end. diff --git a/apps/emqx/src/emqx_cluster_rpc_handler.erl b/apps/emqx/src/emqx_cluster_rpc_handler.erl new file mode 100644 index 000000000..a51ef3d74 --- /dev/null +++ b/apps/emqx/src/emqx_cluster_rpc_handler.erl @@ -0,0 +1,94 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_cluster_rpc_handler). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("logger.hrl"). +-include("emqx_cluster_rpc.hrl"). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(MFA_HISTORY_LEN, 100). + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +%%%=================================================================== +%%% Spawning and gen_server implementation +%%%=================================================================== + +init([]) -> + _ = emqx_misc:rand_seed(), + {ok, ensure_timer(#{timer => undefined})}. + +handle_call(Req, _From, State) -> + ?LOG(error, "unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef}) -> + case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/0, []) of + {atomic, ok} -> ok; + Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) + end, + {noreply, ensure_timer(State), hibernate}; + +handle_info(Info, State) -> + ?LOG(error, "unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #{timer := TRef}) -> + emqx_misc:cancel_timer(TRef). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +-ifdef(TEST). +ensure_timer(State) -> + State#{timer := emqx_misc:start_timer(timer:seconds(1), del_stale_mfa)}. +-else. +ensure_timer(State) -> + Ms = timer:minutes(5) + rand:uniform(5000), + State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}. +-endif. + + +%% @doc Keep the latest completed 100 records for querying and troubleshooting. +del_stale_mfa() -> + DoneId = + mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, + infinity, ?CLUSTER_COMMIT), + delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, ?MFA_HISTORY_LEN). + +delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; +delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count); +delete_stale_mfa(CurrId, DoneId, Count) when Count > 0 -> + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count - 1); +delete_stale_mfa(CurrId, DoneId, Count) when Count =< 0 -> + mnesia:delete(?CLUSTER_MFA, CurrId, write), + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count - 1). diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index defe96182..39a8d4fba 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -29,6 +29,8 @@ init([]) -> {ok, {{one_for_one, 10, 100}, %% always start emqx_config_handler first to load the emqx.conf to emqx_config [ child_spec(emqx_config_handler, worker) + , child_spec(emqx_cluster_rpc, worker) + , child_spec(emqx_cluster_rpc_handler, worker) , child_spec(emqx_pool_sup, supervisor) , child_spec(emqx_hooks, worker) , child_spec(emqx_stats, worker) diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl new file mode 100644 index 000000000..6cdb34c6c --- /dev/null +++ b/apps/emqx/test/emqx_cluster_rpc_SUITE.erl @@ -0,0 +1,247 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_rpc_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-define(NODE1, emqx_cluster_rpc). +-define(NODE2, emqx_cluster_rpc2). +-define(NODE3, emqx_cluster_rpc3). + +all() -> [ + t_base_test, + t_commit_fail_test, + t_commit_crash_test, + t_commit_ok_but_apply_fail_on_other_node, + t_commit_ok_apply_fail_on_other_node_then_recover, + t_del_stale_mfa +]. +suite() -> [{timetrap, {minutes, 3}}]. +groups() -> []. + +init_per_suite(Config) -> + application:load(emqx), + ok = ekka:start(), + emqx_cluster_rpc:mnesia(copy), + %%dbg:tracer(), + %%dbg:p(all, c), + %%dbg:tpl(emqx_cluster_rpc, cx), + %%dbg:tpl(gen_statem, loop_receive, cx), + %%dbg:tpl(gen_statem, loop_state_callback, cx), + %%dbg:tpl(gen_statem, loop_callback_mode_result, cx), + Config. + +end_per_suite(_Config) -> + ekka:stop(), + ekka_mnesia:ensure_stopped(), + ekka_mnesia:delete_schema(), + %%dbg:stop(), + ok. + +init_per_testcase(_TestCase, Config) -> + start(), + Config. + +end_per_testcase(_Config) -> + stop(), + ok. + +t_base_test(_Config) -> + ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), + Pid = self(), + MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, + {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + {atomic, Query} = emqx_cluster_rpc:query(TnxId), + ?assertEqual(MFA, maps:get(mfa, Query)), + ?assertEqual(node(), maps:get(initiator, Query)), + ?assert(maps:is_key(created_at, Query)), + ?assertEqual(ok, receive_msg(3, test)), + SysStatus = lists:last(lists:last(element(4,sys:get_status(?NODE1)))), + ?assertEqual(#{node => node(), opt => normal, state => realtime}, SysStatus), + sleep(400), + {atomic, Status} = emqx_cluster_rpc:status(), + ?assertEqual(3, length(Status)), + ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)), + ok. + +t_commit_fail_test(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]}, + {error, "MFA return not ok"} = emqx_cluster_rpc:multicall(M, F, A), + ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), + ok. + +t_commit_crash_test(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, no_exist_function, []}, + Error = emqx_cluster_rpc:multicall(M, F, A), + ?assertEqual({error, "TnxId(1) apply MFA({emqx_cluster_rpc_SUITE,no_exist_function,[]}) crash"}, Error), + ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), + ok. + +t_commit_ok_but_apply_fail_on_other_node(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, + {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + {atomic, [Status]} = emqx_cluster_rpc:status(), + ?assertEqual(MFA, maps:get(mfa, Status)), + ?assertEqual(node(), maps:get(node, Status)), + ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE3))), + erlang:send(?NODE2, test), + Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}), + ?assertEqual({error, "There are still transactions that have not been executed."}, Res), + ok. + +t_catch_up_status_handle_next_commit(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, + {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), + {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), + ok. + +t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + Now = erlang:system_time(second), + {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, + {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _} = emqx_cluster_rpc:multicall(io, format, ["test"]), + {atomic, [Status]} = emqx_cluster_rpc:status(), + ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), + ?assertEqual(node(), maps:get(node, Status)), + ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE2))), + ?assertEqual(catch_up, element(1, sys:get_state(?NODE3))), + sleep(4000), + {atomic, [Status1]} = emqx_cluster_rpc:status(), + ?assertEqual(Status, Status1), + sleep(1600), + {atomic, NewStatus} = emqx_cluster_rpc:status(), + ?assertEqual(realtime, element(1, sys:get_state(?NODE1))), + ?assertEqual(realtime, element(1, sys:get_state(?NODE2))), + ?assertEqual(realtime, element(1, sys:get_state(?NODE3))), + ?assertEqual(3, length(NewStatus)), + Pid = self(), + MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, + {ok, TnxId} = emqx_cluster_rpc:multicall(M1, F1, A1), + {atomic, Query} = emqx_cluster_rpc:query(TnxId), + ?assertEqual(MFAEcho, maps:get(mfa, Query)), + ?assertEqual(node(), maps:get(initiator, Query)), + ?assert(maps:is_key(created_at, Query)), + ?assertEqual(ok, receive_msg(3, test)), + ok. + +t_del_stale_mfa(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + MFA = {M, F, A} = {io, format, ["test"]}, + Keys = lists:seq(1, 50), + Keys2 = lists:seq(51, 150), + Ids = + [begin + {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + TnxId end || _ <- Keys], + ?assertEqual(Keys, Ids), + Ids2 = + [begin + {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + TnxId end || _ <- Keys2], + ?assertEqual(Keys2, Ids2), + sleep(1200), + [begin + ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I)) + end || I <- lists:seq(1, 50)], + [begin + {atomic, Map} = emqx_cluster_rpc:query(I), + ?assertEqual(MFA, maps:get(mfa, Map)), + ?assertEqual(node(), maps:get(initiator, Map)), + ?assert(maps:is_key(created_at, Map)) + end || I <- lists:seq(51, 150)], + ok. + +start() -> + {ok, Pid1} = emqx_cluster_rpc:start_link(), + {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2), + {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3), + {ok, Pid4} = emqx_cluster_rpc_handler:start_link(), + {ok, [Pid1, Pid2, Pid3, Pid4]}. + +stop() -> + [begin + case erlang:whereis(N) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end end || N <- [?NODE1, ?NODE2, ?NODE3]]. + +receive_msg(0, _Msg) -> ok; +receive_msg(Count, Msg) when Count > 0 -> + receive Msg -> + receive_msg(Count - 1, Msg) + after 800 -> + timeout + end. + +echo(Pid, Msg) -> + erlang:send(Pid, Msg), + ok. + +failed_on_node(Pid) -> + case Pid =:= self() of + true -> ok; + false -> "MFA return not ok" + end. + +failed_on_node_by_odd(Pid) -> + case Pid =:= self() of + true -> ok; + false -> + catch ets:new(test, [named_table, set, public]), + Num = ets:update_counter(test, self(), {2, 1}, {self(), 1}), + case Num rem 2 =:= 0 of + false -> "MFA return not ok"; + true -> ok + end + end. + +failed_on_other_recover_after_5_second(Pid, CreatedAt) -> + Now = erlang:system_time(second), + case Pid =:= self() of + true -> ok; + false -> + case Now < CreatedAt + 5 of + true -> "MFA return not ok"; + false -> ok + end + end. + +sleep(Second) -> + receive _ -> ok + after Second -> timeout + end.