%% emqx/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl
%%
%% 428 lines
%% 13 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_cluster_rpc_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(NODE1, emqx_cluster_rpc).
-define(NODE2, emqx_cluster_rpc2).
-define(NODE3, emqx_cluster_rpc3).
%% Test cases returned to Common Test for this suite.
%% NOTE(review): t_catch_up_status_handle_next_commit/1 and
%% t_cleaner_unexpected_msg/1 are defined in this module but are not listed
%% here -- confirm whether leaving them out is intentional.
all() ->
    CommitCases = [
        t_base_test,
        t_commit_fail_test,
        t_commit_crash_test,
        t_commit_ok_but_apply_fail_on_other_node,
        t_commit_ok_apply_fail_on_other_node_then_recover
    ],
    MaintenanceCases = [
        t_del_stale_mfa,
        t_skip_failed_commit,
        t_fast_forward_commit
    ],
    CommitCases ++ MaintenanceCases ++ [t_commit_concurrency].
%% Suite-level Common Test info: a timetrap of five minutes.
suite() ->
    Timetrap = {timetrap, {minutes, 5}},
    [Timetrap].
%% This suite declares no Common Test groups.
groups() ->
    NoGroups = [],
    NoGroups.
%% Suite setup.
%% Starts `emqx' and `emqx_conf' through emqx_cth_suite:start/2, giving
%% emqx_conf a `node.cluster_call' configuration, then mocks
%% mria:running_nodes/0 so that it reports ?NODE1 plus two `{node(), Name}'
%% entries for ?NODE2 and ?NODE3. The started apps are stored in Config under
%% `suite_apps' for end_per_suite/1.
init_per_suite(Config) ->
    ClusterCallConf =
        "node.cluster_call {"
        "\n retry_interval = 1s"
        "\n max_history = 100"
        "\n cleanup_interval = 500ms"
        "\n}",
    AppSpecs = [emqx, {emqx_conf, ClusterCallConf}],
    SuiteOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
    Apps = emqx_cth_suite:start(AppSpecs, SuiteOpts),
    meck:new(mria, [non_strict, passthrough, no_link]),
    RunningNodes = [?NODE1, {node(), ?NODE2}, {node(), ?NODE3}],
    meck:expect(mria, running_nodes, 0, RunningNodes),
    [{suite_apps, Apps} | Config].
%% Suite teardown: unloads all meck mocks, then stops the applications that
%% init_per_suite/1 stored under `suite_apps'.
end_per_suite(Config) ->
    _ = meck:unload(),
    Apps = ?config(suite_apps, Config),
    ok = emqx_cth_suite:stop(Apps).
%% Per-testcase setup: kills any ?NODE2/?NODE3 servers left over from a
%% previous case and starts fresh ones (see stop/0 and start/0).
init_per_testcase(_TestCase, Config) ->
    _ = stop(),
    ok = start(),
    Config.
%% Per-testcase teardown invoked by Common Test.
%% Common Test calls end_per_testcase/2 (TestCase, Config); the arity-1
%% definition alone is never invoked by the framework, so the extra servers
%% were not being stopped after each case.
end_per_testcase(_TestCase, Config) ->
    end_per_testcase(Config).

%% Arity-1 variant kept so any existing direct caller keeps working.
%% Stops the ?NODE2/?NODE3 servers and returns ok.
end_per_testcase(_Config) ->
    stop(),
    ok.
%% Happy path: a multicall of echo/2 is committed, can be queried by its
%% transaction id, and delivers `test' to this process three times. After a
%% second multicall every status entry is expected to be at tnx_id 2.
t_base_test(_Config) ->
    ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
    Pid = self(),
    MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
    {ok, TnxId, ok} = multicall(M, F, A),
    {atomic, Query} = emqx_cluster_rpc:query(TnxId),
    ?assertEqual(MFA, maps:get(mfa, Query)),
    ?assertEqual(node(), maps:get(initiator, Query)),
    ?assert(maps:is_key(created_at, Query)),
    ?assertEqual(ok, receive_msg(3, test)),
    ?assertEqual({ok, 2, ok}, multicall(M, F, A)),
    AllAtTnx2 = fun(Entries) ->
        lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Entries)
    end,
    {atomic, Status} = emqx_cluster_rpc:status(),
    case length(Status) =:= 3 of
        true ->
            ?assert(AllAtTnx2(Status));
        false ->
            %% wait for mnesia to write in.
            ct:sleep(42),
            {atomic, Status1} = emqx_cluster_rpc:status(),
            %% ct:pal/2 takes a list of format arguments, so the status
            %% lists must be wrapped to be printed by a single ~p.
            ct:pal("status: ~p", [Status]),
            ct:pal("status1: ~p", [Status1]),
            ?assertEqual(3, length(Status1)),
            %% Check the freshly read status, not the stale first read.
            ?assert(AllAtTnx2(Status1))
    end,
    ok.
%% A multicall whose MFA does not return ok must fail with `init_failure' and
%% leave the status table empty. failed_on_node/1 only returns ok inside the
%% process whose pid it is given; here it is given the pid registered as
%% ?NODE2.
t_commit_fail_test(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    Node2Pid = erlang:whereis(?NODE2),
    Result = multicall(?MODULE, failed_on_node, [Node2Pid]),
    {init_failure, "MFA return not ok"} = Result,
    ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
    ok.
%% A multicall of a function that does not exist must fail with
%% `init_failure', report an `undef' error with a stacktrace in the returned
%% metadata, and leave the status table empty.
t_commit_crash_test(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    {init_failure, {error, Meta}} = multicall(?MODULE, no_exist_function, []),
    lists:foreach(
        fun({Key, Expected}) ->
            ?assertEqual(Expected, maps:get(Key, Meta))
        end,
        [{reason, undef}, {exception, error}]
    ),
    ?assert(maps:is_key(stacktrace, Meta)),
    ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
    ok.
%% The second commit succeeds for the ?NODE1 process only (failed_on_node/1
%% returns ok solely in the process whose pid it receives), so the two other
%% servers are expected to stay at tnx_id 1. Initiating a new call through
%% either lagging server must then be rejected as a stale view.
t_commit_ok_but_apply_fail_on_other_node(_Config) ->
    emqx_cluster_rpc:reset(),
    {atomic, []} = emqx_cluster_rpc:status(),
    Self = self(),
    {ok, _TnxId, ok} = multicall(?MODULE, echo, [Self, test]),
    ?assertEqual(ok, receive_msg(3, test)),
    FailingArgs = [erlang:whereis(?NODE1)],
    {ok, _, ok} = multicall(?MODULE, failed_on_node, FailingArgs, 1, 1000),
    {atomic, AllStatus} = emqx_cluster_rpc:status(),
    Node = node(),
    ExpectedStatus = [
        {1, {Node, ?NODE2}},
        {1, {Node, ?NODE3}},
        {2, Node}
    ],
    ActualStatus = lists:sort([{T, N} || #{tnx_id := T, node := N} <- AllStatus]),
    ?assertEqual(ExpectedStatus, ActualStatus),
    erlang:send(?NODE2, test),
    Call = emqx_cluster_rpc:make_initiate_call_req(?MODULE, failed_on_node, FailingArgs),
    Res1 = gen_server:call(?NODE2, Call),
    Res2 = gen_server:call(?NODE3, Call),
    %% ?NODE2 is still retrying tnx_id 1 and must not run the next MFA.
    StaleView =
        {init_failure, #{
            msg => stale_view_of_cluster_state,
            retry_times => 2,
            cluster_tnx_id => 2,
            node_tnx_id => 1
        }},
    ?assertEqual(StaleView, Res1),
    ?assertEqual(Res1, Res2),
    ok.
%% Initiates 256 echo_delay/2 calls through each of the three servers from
%% separate linked processes and checks that every call succeeds, that the
%% expected number of echo messages arrives, and that all status entries end
%% at the same final tnx_id.
t_commit_concurrency(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    Self = self(),
    {ok, _TnxId, ok} = multicall(?MODULE, echo, [Self, test]),
    ?assertEqual(ok, receive_msg(3, test)),
    %% Call concurrently; none of the calls may hit a stale tnx_id error.
    Workers = lists:seq(1, 256),
    Servers = [?NODE1, ?NODE2, ?NODE3],
    SpawnCallers = fun(Seq) ->
        Call = emqx_cluster_rpc:make_initiate_call_req(?MODULE, echo_delay, [Self, Seq]),
        lists:foreach(
            fun(Server) ->
                spawn_link(fun() ->
                    ?assertMatch({ok, _, ok}, gen_server:call(Server, Call, infinity))
                end)
            end,
            Servers
        )
    end,
    lists:foreach(SpawnCallers, Workers),
    %% Collect the {Seq, Time, Pid} notifications sent by echo_delay/2;
    %% sorting orders them by sequence number first.
    Received = lists:sort(receive_seq_msg([])),
    %% 256 sequence numbers, each initiated through 3 servers, and each of
    %% those calls is expected to produce 3 echo messages.
    ?assertEqual(256 * 3 * 3, length(Received), Received),
    {atomic, Status} = emqx_cluster_rpc:status(),
    lists:foreach(
        fun(#{tnx_id := TnxId} = Entry) ->
            %% One initial echo commit plus 256 * 3 concurrent commits.
            ?assertEqual(256 * 3 + 1, TnxId, Entry)
        end,
        Status
    ),
    %% Every sequence number must be present exactly nine times: remove one
    %% occurrence per expected index and require nothing to be left over.
    ExpectedSeqs = lists:append(lists:duplicate(9, Workers)),
    Leftover =
        lists:foldl(
            fun(Index, Remaining) ->
                ?assertEqual(true, lists:keymember(Index, 1, Remaining), {Index, Remaining}),
                lists:keydelete(Index, 1, Remaining)
            end,
            Received,
            ExpectedSeqs
        ),
    ?assertEqual([], Leftover),
    %% No other message may be pending in the mailbox.
    receive
        Unknown -> throw({receive_unknown_msg, Unknown})
    after 1000 -> ok
    end,
    ok.
%% Drains `{msg, Seq, Time, Pid}' messages from the caller's mailbox,
%% prepending each as `{Seq, Time, Pid}' to the accumulator. Returns the
%% accumulator (most recent message first) once no such message has arrived
%% for 3000 ms.
receive_seq_msg(Collected) ->
    receive
        {msg, Seq, Time, Pid} ->
            Entry = {Seq, Time, Pid},
            receive_seq_msg([Entry | Collected])
    after 3000 ->
        Collected
    end.
%% Commits failed_on_node_by_odd/1 once via multicall, then initiates the
%% same call through ?NODE2 and expects `{ok, 2}' back.
%% NOTE(review): this case is not listed in all/0.
t_catch_up_status_handle_next_commit(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    Args = [erlang:whereis(?NODE1)],
    {ok, 1, ok} = multicall(?MODULE, failed_on_node_by_odd, Args, 1, 1000),
    Call = emqx_cluster_rpc:make_initiate_call_req(?MODULE, failed_on_node_by_odd, Args),
    {ok, 2} = gen_server:call(?NODE2, Call),
    ok.
%% The MFA result for processes other than ?NODE1 is read from the `test' ETS
%% table. It starts as `failed', so only one status entry is expected after
%% the first commit; after flipping it to `ok' and committing again, all
%% three status entries are expected, and a further echo multicall must work
%% as in t_base_test/1.
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    ets:new(test, [named_table, public]),
    ets:insert(test, {other_mfa_result, failed}),
    ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]),
    Node1Pid = erlang:whereis(?NODE1),
    {ok, 1, ok} =
        multicall(?MODULE, failed_on_other_recover_after_retry, [Node1Pid], 1, 1000),
    ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
    ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
    {atomic, [_First | Others]} = emqx_cluster_rpc:status(),
    ?assertEqual([], Others),
    %% Let the other servers succeed from now on.
    ets:insert(test, {other_mfa_result, ok}),
    {ok, 2, ok} = multicall(io, format, ["test"], 1, 1000),
    ct:sleep(1000),
    {atomic, NewStatus} = emqx_cluster_rpc:status(),
    ?assertEqual(3, length(NewStatus)),
    Self = self(),
    EchoMFA = {EchoM, EchoF, EchoA} = {?MODULE, echo, [Self, test]},
    {ok, TnxId, ok} = multicall(EchoM, EchoF, EchoA),
    {atomic, Query} = emqx_cluster_rpc:query(TnxId),
    ?assertEqual(EchoMFA, maps:get(mfa, Query)),
    ?assertEqual(node(), maps:get(initiator, Query)),
    ?assert(maps:is_key(created_at, Query)),
    ?assertEqual(ok, receive_msg(3, test)),
    ok.
%% Commits 150 transactions and, after a pause, expects ids 1..50 to be gone
%% while ids 51..150 are still queryable.
%% NOTE(review): presumably this relies on `max_history = 100' and
%% `cleanup_interval = 500ms' from init_per_suite/1 -- confirm.
t_del_stale_mfa(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    MFA = {io, format, ["test"]},
    {M, F, A} = MFA,
    StaleIds = lists:seq(1, 50),
    KeptIds = lists:seq(51, 150),
    Commit = fun() ->
        {ok, TnxId, ok} = multicall(M, F, A),
        TnxId
    end,
    %% Transaction ids are expected to be handed out consecutively.
    ?assertEqual(StaleIds, [Commit() || _ <- StaleIds]),
    ?assertEqual(KeptIds, [Commit() || _ <- KeptIds]),
    ct:sleep(1200),
    lists:foreach(
        fun(I) ->
            ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I))
        end,
        StaleIds
    ),
    lists:foreach(
        fun(I) ->
            {atomic, Map} = emqx_cluster_rpc:query(I),
            ?assertEqual(MFA, maps:get(mfa, Map)),
            ?assertEqual(node(), maps:get(initiator, Map)),
            ?assert(maps:is_key(created_at, Map))
        end,
        KeptIds
    ),
    ok.
%% After a commit that only succeeds in the ?NODE1 process, asking ?NODE2 to
%% `skip_failed_commit' must move ?NODE2 to tnx_id 2 while ?NODE3 stays at 1.
t_skip_failed_commit(_Config) ->
    Node = node(),
    Peer2 = {Node, ?NODE2},
    Peer3 = {Node, ?NODE3},
    {atomic, []} = emqx_cluster_rpc:status(),
    {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
    ct:sleep(180),
    {atomic, StatusBefore} = emqx_cluster_rpc:status(),
    ?assertEqual(
        [{Node, 1}, {Peer2, 1}, {Peer3, 1}],
        tnx_ids(StatusBefore)
    ),
    FailingArgs = [erlang:whereis(?NODE1)],
    {ok, 2, ok} = multicall(?MODULE, failed_on_node, FailingArgs, 1, 1000),
    2 = gen_server:call(?NODE2, skip_failed_commit, 5000),
    {atomic, StatusAfter} = emqx_cluster_rpc:status(),
    ?assertEqual(
        [{Node, 2}, {Peer2, 2}, {Peer3, 1}],
        tnx_ids(StatusAfter)
    ),
    ok.
%% Builds up commits 2..6 that only succeed in the ?NODE1 process, then
%% fast-forwards ?NODE2 and ?NODE3 with `fast_forward_to_commit' requests and
%% checks the tnx_id each request returns and the final status.
t_fast_forward_commit(_Config) ->
    Node = node(),
    Peer2 = {Node, ?NODE2},
    Peer3 = {Node, ?NODE3},
    {atomic, []} = emqx_cluster_rpc:status(),
    {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
    ct:sleep(180),
    {atomic, StatusBefore} = emqx_cluster_rpc:status(),
    ?assertEqual(
        [{Node, 1}, {Peer2, 1}, {Peer3, 1}],
        tnx_ids(StatusBefore)
    ),
    {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
    lists:foreach(
        fun(ExpectedId) ->
            {ok, ExpectedId, ok} = multicall(M, F, A, 1, 1000)
        end,
        [2, 3, 4, 5]
    ),
    {peers_lagging, 6, ok, _} = multicall(M, F, A, 2, 1000),
    %% {Server, RequestedTnxId, ExpectedReply}; requesting 7 on ?NODE2 is
    %% expected to answer 6, the latest commit made above.
    FastForwards = [
        {?NODE2, 3, 3},
        {?NODE2, 4, 4},
        {?NODE2, 7, 6},
        {?NODE3, 2, 2}
    ],
    lists:foreach(
        fun({Server, Requested, ExpectedReply}) ->
            ExpectedReply = gen_server:call(Server, {fast_forward_to_commit, Requested}, 5000)
        end,
        FastForwards
    ),
    {atomic, StatusAfter} = emqx_cluster_rpc:status(),
    ?assertEqual(
        [{Node, 6}, {Peer2, 6}, {Peer3, 2}],
        tnx_ids(StatusAfter)
    ),
    ok.
%% Sends an unexpected cast, call and info message to the process registered
%% as `emqx_cluster_cleaner' and checks that the same pid is still registered
%% under that name afterwards.
%% NOTE(review): this case is not listed in all/0.
t_cleaner_unexpected_msg(_Config) ->
    Cleaner = emqx_cluster_cleaner,
    PidBefore = erlang:whereis(Cleaner),
    ok = gen_server:cast(Cleaner, unexpected_cast_msg),
    ignore = gen_server:call(Cleaner, unexpected_cast_msg),
    Cleaner ! unexpected_info_msg,
    PidAfter = erlang:whereis(Cleaner),
    ?assertEqual(PidBefore, PidAfter),
    ok.
%% Converts a list of status maps into a sorted list of `{Node, TnxId}'
%% pairs. Every element must be a map with `tnx_id' and `node' keys; anything
%% else crashes with function_clause.
tnx_ids(Status) ->
    ToPair = fun(#{tnx_id := TnxId, node := Node}) -> {Node, TnxId} end,
    Pairs = [ToPair(Entry) || Entry <- Status],
    lists:sort(Pairs).
%% Starts two additional emqx_cluster_rpc servers registered as ?NODE2 and
%% ?NODE3, each identified as `{node(), Name}', then resets the cluster RPC
%% state.
%% NOTE(review): the third start_link argument (500) is presumably a retry
%% interval in milliseconds -- confirm against emqx_cluster_rpc.
start() ->
    lists:foreach(
        fun(Name) ->
            {ok, _Pid} = emqx_cluster_rpc:start_link({node(), Name}, Name, 500)
        end,
        [?NODE2, ?NODE3]
    ),
    ok = emqx_cluster_rpc:reset(),
    ok.
%% Kills the processes registered as ?NODE2 and ?NODE3, if any. Each process
%% is unlinked first so the `kill' exit does not propagate to the caller.
%% Returns one element per name: `ok' when nothing was registered, otherwise
%% the result of erlang:exit/2.
stop() ->
    KillIfAlive = fun
        (undefined) ->
            ok;
        (Pid) ->
            erlang:unlink(Pid),
            erlang:exit(Pid, kill)
    end,
    [KillIfAlive(erlang:whereis(Name)) || Name <- [?NODE2, ?NODE3]].
%% Waits until the given message has been received the given number of times.
%% Returns `ok' once all copies have arrived, or `timeout' if any single
%% copy takes longer than 1000 ms to show up.
receive_msg(0, _Expected) ->
    ok;
receive_msg(Remaining, Expected) when Remaining > 0 ->
    receive
        %% Expected is already bound, so only an identical term matches.
        Expected ->
            StillMissing = Remaining - 1,
            receive_msg(StillMissing, Expected)
    after 1000 ->
        timeout
    end.
%% Sends Msg to Pid and returns ok.
echo(Pid, Msg) ->
    Pid ! Msg,
    ok.
%% Sleeps for a random 1..150 ms, then sends
%% `{msg, Msg, erlang:system_time(), self()}' to Pid and returns ok.
echo_delay(Pid, Msg) ->
    DelayMs = rand:uniform(150),
    timer:sleep(DelayMs),
    Pid ! {msg, Msg, erlang:system_time(), self()},
    ok.
%% Returns ok only when evaluated in the process identified by Pid; in any
%% other process it returns the string "MFA return not ok".
failed_on_node(Pid) when Pid =:= self() ->
    ok;
failed_on_node(_OtherPid) ->
    "MFA return not ok".
%% Returns ok when evaluated in the process identified by Pid. In any other
%% process it bumps a per-process counter in the named ETS table `test' and
%% returns ok when the counter is even, "MFA return not ok" when it is odd.
%% The counter defaults to 1 and is incremented before being read, so the
%% first call in a process sees 2 (ok), the second sees 3 (not ok), and so on.
failed_on_node_by_odd(Pid) ->
    case Pid =:= self() of
        true ->
            ok;
        false ->
            %% Create the table on first use. Only the badarg raised for an
            %% already existing named table is tolerated; an old-style
            %% `catch' would hide every other failure as well.
            _ =
                try
                    ets:new(test, [named_table, set, public])
                catch
                    error:badarg -> already_exists
                end,
            Num = ets:update_counter(test, self(), {2, 1}, {self(), 1}),
            case Num rem 2 =:= 0 of
                false -> "MFA return not ok";
                true -> ok
            end
    end.
%% Returns ok when evaluated in the process identified by Pid. In any other
%% process it returns whatever value is stored under `other_mfa_result' in
%% the named ETS table `test'; exactly one such entry must exist.
failed_on_other_recover_after_retry(Pid) ->
    IsTargetProcess = Pid =:= self(),
    if
        IsTargetProcess ->
            ok;
        true ->
            [{_Key, Result}] = ets:lookup(test, other_mfa_result),
            Result
    end.
%% Thin wrapper around emqx_cluster_rpc:do_multicall/5.
%% NOTE(review): in this suite the fourth argument is `all', 1 or 2 and the
%% fifth is 1000 or timer:minutes(2); they presumably are the number of
%% servers that must have applied the call and a timeout in milliseconds --
%% confirm against emqx_cluster_rpc.
multicall(Mod, Fun, Args, Required, Timeout) ->
    emqx_cluster_rpc:do_multicall(Mod, Fun, Args, Required, Timeout).
%% Convenience form of multicall/5 that passes `all' as the fourth argument
%% and timer:minutes(2) as the fifth.
multicall(Mod, Fun, Args) ->
    TwoMinutes = timer:minutes(2),
    multicall(Mod, Fun, Args, all, TwoMinutes).