428 lines
13 KiB
Erlang
428 lines
13 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_cluster_rpc_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
%% Registered process names used throughout this suite.
%% ?NODE2 and ?NODE3 are the names handed to emqx_cluster_rpc:start_link/3 in
%% start/0; ?NODE1 is only looked up (erlang:whereis/1) or called, never
%% started here -- presumably it is the emqx_cluster_rpc server started with
%% the emqx_conf application (TODO confirm).
-define(NODE1, emqx_cluster_rpc).
|
|
-define(NODE2, emqx_cluster_rpc2).
|
|
-define(NODE3, emqx_cluster_rpc3).
|
|
|
|
%% Common Test callback: the test cases run by this suite, in execution order.
%% The list is assembled from two groups purely for readability; the result is
%% one flat list of test-case names.
all() ->
    CommitCases = [
        t_base_test,
        t_commit_fail_test,
        t_commit_crash_test,
        t_commit_ok_but_apply_fail_on_other_node,
        t_commit_ok_apply_fail_on_other_node_then_recover
    ],
    MaintenanceCases = [
        t_del_stale_mfa,
        t_skip_failed_commit,
        t_fast_forward_commit,
        t_commit_concurrency
    ],
    CommitCases ++ MaintenanceCases.
|
|
%% Common Test callback: suite-wide defaults. Every test case is given a
%% five-minute timetrap.
suite() ->
    Timetrap = {timetrap, {minutes, 5}},
    [Timetrap].
|
|
%% Common Test callback: this suite defines no test-case groups.
groups() ->
    [].
|
|
|
|
%% Common Test callback: suite-wide setup.
%% Starts `emqx' and `emqx_conf' (the latter with an explicit
%% `node.cluster_call' configuration) through emqx_cth_suite, then mocks
%% mria:running_nodes/0 so that it reports the three identities used by this
%% suite. The started applications are stored in Config under `suite_apps'
%% so that end_per_suite/1 can stop them.
init_per_suite(Config) ->
    ClusterCallConf =
        "node.cluster_call {"
        "\n retry_interval = 1s"
        "\n max_history = 100"
        "\n cleanup_interval = 500ms"
        "\n}",
    AppSpecs = [emqx, {emqx_conf, ClusterCallConf}],
    SuiteOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
    Apps = emqx_cth_suite:start(AppSpecs, SuiteOpts),
    meck:new(mria, [non_strict, passthrough, no_link]),
    RunningNodes = [?NODE1, {node(), ?NODE2}, {node(), ?NODE3}],
    meck:expect(mria, running_nodes, 0, RunningNodes),
    [{suite_apps, Apps} | Config].
|
|
|
|
%% Common Test callback: suite-wide teardown.
%% Unloads every meck mock and stops the applications recorded under
%% `suite_apps' by init_per_suite/1.
end_per_suite(Config) ->
    _ = meck:unload(),
    Apps = ?config(suite_apps, Config),
    ok = emqx_cth_suite:stop(Apps).
|
|
|
|
%% Common Test callback: per-testcase setup.
%% Kills any ?NODE2/?NODE3 processes left over from a previous case and then
%% starts fresh ones (start/0 also resets emqx_cluster_rpc). Config is
%% returned unchanged.
init_per_testcase(_TestCase, Config) ->
    _ = stop(),
    ok = start(),
    Config.
|
|
|
|
%% Common Test callback: per-testcase teardown.
%% Common Test invokes end_per_testcase/2 (TestCase, Config); the suite used
%% to define only an arity-1 function, which the framework never calls, so the
%% extra cluster-rpc processes were not stopped after a test case. The arity-2
%% callback delegates to the arity-1 function, which is kept for backward
%% compatibility.
end_per_testcase(_TestCase, Config) ->
    end_per_testcase(Config).

%% Stops the ?NODE2/?NODE3 processes started for the test case.
end_per_testcase(_Config) ->
    stop(),
    ok.
|
|
|
|
%% Happy path: a multicall of echo/2 is committed, can be queried by its
%% transaction id, and is applied three times (three `test' messages arrive).
%% After a second multicall every status entry must be at tnx_id 2.
t_base_test(_Config) ->
    ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
    Pid = self(),
    MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
    {ok, TnxId, ok} = multicall(M, F, A),
    {atomic, Query} = emqx_cluster_rpc:query(TnxId),
    ?assertEqual(MFA, maps:get(mfa, Query)),
    ?assertEqual(node(), maps:get(initiator, Query)),
    ?assert(maps:is_key(created_at, Query)),
    ?assertEqual(ok, receive_msg(3, test)),
    ?assertEqual({ok, 2, ok}, multicall(M, F, A)),
    {atomic, Status} = emqx_cluster_rpc:status(),
    AllAtTnx2 = fun(Entries) ->
        lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Entries)
    end,
    case length(Status) =:= 3 of
        true ->
            ?assert(AllAtTnx2(Status));
        false ->
            %% wait for mnesia to write in.
            ct:sleep(42),
            {atomic, Status1} = emqx_cluster_rpc:status(),
            %% ct:pal/2 takes a LIST of format arguments; the status list
            %% must be wrapped, otherwise its elements are taken as the
            %% individual arguments for the single ~p.
            ct:pal("status: ~p", [Status]),
            ct:pal("status1: ~p", [Status1]),
            ?assertEqual(3, length(Status1)),
            %% Check the re-read status, not the stale first snapshot.
            ?assert(AllAtTnx2(Status1))
    end,
    ok.
|
|
|
|
%% A multicall whose MFA does not return `ok' for the initiating call must be
%% rejected with `init_failure' and leave the status table empty.
%% failed_on_node/1 returns `ok' only inside the process whose pid it is
%% given; here that is the ?NODE2 process.
t_commit_fail_test(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    Node2Pid = erlang:whereis(?NODE2),
    Result = multicall(?MODULE, failed_on_node, [Node2Pid]),
    {init_failure, "MFA return not ok"} = Result,
    ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
    ok.
|
|
|
|
%% A multicall of a function that does not exist must fail with
%% `init_failure' carrying the exception details (class `error', reason
%% `undef', plus a stacktrace) and must not leave any status entry behind.
t_commit_crash_test(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    Outcome = multicall(?MODULE, no_exist_function, []),
    {init_failure, {error, Meta}} = Outcome,
    ?assertEqual(undef, maps:get(reason, Meta)),
    ?assertEqual(error, maps:get(exception, Meta)),
    ?assert(maps:is_key(stacktrace, Meta)),
    ?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
    ok.
|
|
|
|
%% The second commit succeeds only in the ?NODE1 process (failed_on_node/1 is
%% given ?NODE1's pid), so the other two identities stay at tnx_id 1 while the
%% local node reaches 2. A new initiate-call sent to either lagging server
%% must then be refused with `stale_view_of_cluster_state'.
t_commit_ok_but_apply_fail_on_other_node(_Config) ->
    emqx_cluster_rpc:reset(),
    {atomic, []} = emqx_cluster_rpc:status(),
    Self = self(),
    {ok, _TnxId, ok} = multicall(?MODULE, echo, [Self, test]),
    ?assertEqual(ok, receive_msg(3, test)),

    FailArgs = [erlang:whereis(?NODE1)],
    {ok, _, ok} = multicall(?MODULE, failed_on_node, FailArgs, 1, 1000),
    {atomic, AllStatus} = emqx_cluster_rpc:status(),
    Node = node(),
    ExpectedProgress = [
        {1, {Node, ?NODE2}},
        {1, {Node, ?NODE3}},
        {2, Node}
    ],
    ActualProgress = lists:sort([{T, N} || #{tnx_id := T, node := N} <- AllStatus]),
    ?assertEqual(ExpectedProgress, ActualProgress),
    erlang:send(?NODE2, test),
    Call = emqx_cluster_rpc:make_initiate_call_req(?MODULE, failed_on_node, FailArgs),
    Res1 = gen_server:call(?NODE2, Call),
    Res2 = gen_server:call(?NODE3, Call),
    %% Node2 is still retrying tnx_id 1 and must not run the next MFA.
    StaleView =
        {init_failure, #{
            msg => stale_view_of_cluster_state,
            retry_times => 2,
            cluster_tnx_id => 2,
            node_tnx_id => 1
        }},
    ?assertEqual(StaleView, Res1),
    ?assertEqual(Res1, Res2),
    ok.
|
|
|
|
%% Fires 256 initiate-call requests at each of the three servers concurrently
%% and checks that none is rejected, that every request's MFA ran nine times
%% in total (256 * 3 requests, each echoed three times), and that every status
%% entry ends at tnx_id 256 * 3 + 1.
t_commit_concurrency(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    Pid = self(),
    {ok, _TnxId, ok} = multicall(?MODULE, echo, [Pid, test]),
    ?assertEqual(ok, receive_msg(3, test)),

    %% call concurrently without stale tnx_id error
    Workers = lists:seq(1, 256),
    Servers = [?NODE1, ?NODE2, ?NODE3],
    lists:foreach(
        fun(Seq) ->
            Call = emqx_cluster_rpc:make_initiate_call_req(?MODULE, echo_delay, [Pid, Seq]),
            lists:foreach(
                fun(Server) ->
                    spawn_link(fun() ->
                        ?assertMatch({ok, _, ok}, gen_server:call(Server, Call, infinity))
                    end)
                end,
                Servers
            )
        end,
        Workers
    ),
    %% collect the echoed messages and sort them by sequence number
    List = lists:sort(receive_seq_msg([])),
    ?assertEqual(256 * 3 * 3, length(List), List),
    {atomic, Status} = emqx_cluster_rpc:status(),
    lists:foreach(
        fun(#{tnx_id := TnxId} = S) ->
            ?assertEqual(256 * 3 + 1, TnxId, S)
        end,
        Status
    ),
    %% every sequence number must be present exactly nine times
    AllMsgIndex = [Seq || _ <- lists:seq(1, 9), Seq <- Workers],
    RemoveOne = fun(Index, Remaining) ->
        ?assertEqual(true, lists:keymember(Index, 1, Remaining), {Index, Remaining}),
        lists:keydelete(Index, 1, Remaining)
    end,
    Leftover = lists:foldl(RemoveOne, List, AllMsgIndex),
    ?assertEqual([], Leftover),
    %% nothing else may be left in (or arrive at) the mailbox
    receive
        Unknown -> throw({receive_unknown_msg, Unknown})
    after 1000 -> ok
    end,
    ok.
|
|
|
|
%% Drains `{msg, Seq, Time, Pid}' messages from the caller's mailbox,
%% prepending each as `{Seq, Time, Pid}' to the accumulator. Returns the
%% accumulator (newest message first) once no such message has arrived for
%% three seconds.
receive_seq_msg(Collected) ->
    receive
        {msg, Seq, Time, Pid} ->
            Entry = {Seq, Time, Pid},
            receive_seq_msg([Entry | Collected])
    after 3000 ->
        Collected
    end.
|
|
|
|
%% Commits an MFA (failed_on_node_by_odd/1) that returns `ok' in the ?NODE1
%% process and alternates between ok / not-ok elsewhere, then sends a second
%% initiate-call to ?NODE2 and expects `{ok, 2}'.
%% NOTE(review): this case is not listed in all/0, so it is not run.
t_catch_up_status_handle_next_commit(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    Args = [erlang:whereis(?NODE1)],
    {ok, 1, ok} = multicall(?MODULE, failed_on_node_by_odd, Args, 1, 1000),
    Call = emqx_cluster_rpc:make_initiate_call_req(?MODULE, failed_on_node_by_odd, Args),
    {ok, 2} = gen_server:call(?NODE2, Call),
    ok.
|
|
|
|
%% The MFA's result outside the ?NODE1 process is read from the `test' ETS
%% table (see failed_on_other_recover_after_retry/1). While it is `failed'
%% only one status entry exists; after it is switched to `ok' and another
%% commit is made, all three entries must show up, and a regular echo
%% multicall must work again.
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    ets:new(test, [named_table, public]),
    ets:insert(test, {other_mfa_result, failed}),
    ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]),
    RecoverArgs = [erlang:whereis(?NODE1)],
    {ok, 1, ok} = multicall(?MODULE, failed_on_other_recover_after_retry, RecoverArgs, 1, 1000),
    ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
    ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
    {atomic, [_Status | OtherEntries]} = emqx_cluster_rpc:status(),
    ?assertEqual([], OtherEntries),
    %% let the pending MFA succeed on its next retry
    ets:insert(test, {other_mfa_result, ok}),
    {ok, 2, ok} = multicall(io, format, ["test"], 1, 1000),
    ct:sleep(1000),
    {atomic, NewStatus} = emqx_cluster_rpc:status(),
    ?assertEqual(3, length(NewStatus)),
    Self = self(),
    EchoMFA = {?MODULE, echo, [Self, test]},
    {ok, TnxId, ok} = multicall(?MODULE, echo, [Self, test]),
    {atomic, Query} = emqx_cluster_rpc:query(TnxId),
    ?assertEqual(EchoMFA, maps:get(mfa, Query)),
    ?assertEqual(node(), maps:get(initiator, Query)),
    ?assert(maps:is_key(created_at, Query)),
    ?assertEqual(ok, receive_msg(3, test)),
    ok.
|
|
|
|
%% Commits 150 MFAs (transaction ids 1..150), waits 1.2 s, and then expects
%% ids 1..50 to be gone while ids 51..150 can still be queried.
%% NOTE(review): presumably driven by `max_history = 100' and
%% `cleanup_interval = 500ms' from init_per_suite/1 -- confirm.
t_del_stale_mfa(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    MFA = {M, F, A} = {io, format, ["test"]},
    Commit = fun(_Key) ->
        {ok, TnxId, ok} = multicall(M, F, A),
        TnxId
    end,
    Keys = lists:seq(1, 50),
    Keys2 = lists:seq(51, 150),
    Ids = lists:map(Commit, Keys),
    ?assertEqual(Keys, Ids),
    Ids2 = lists:map(Commit, Keys2),
    ?assertEqual(Keys2, Ids2),
    ct:sleep(1200),
    lists:foreach(
        fun(I) ->
            ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I))
        end,
        lists:seq(1, 50)
    ),
    lists:foreach(
        fun(I) ->
            {atomic, Map} = emqx_cluster_rpc:query(I),
            ?assertEqual(MFA, maps:get(mfa, Map)),
            ?assertEqual(node(), maps:get(initiator, Map)),
            ?assert(maps:is_key(created_at, Map))
        end,
        lists:seq(51, 150)
    ),
    ok.
|
|
|
|
%% After a commit that succeeds only in the ?NODE1 process, ?NODE2 is asked
%% to `skip_failed_commit' and must move on to tnx_id 2, while ?NODE3 stays
%% at tnx_id 1.
t_skip_failed_commit(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
    ct:sleep(180),
    {atomic, StatusBefore} = emqx_cluster_rpc:status(),
    Node = node(),
    Peer2 = {Node, ?NODE2},
    Peer3 = {Node, ?NODE3},
    ?assertEqual([{Node, 1}, {Peer2, 1}, {Peer3, 1}], tnx_ids(StatusBefore)),
    FailArgs = [erlang:whereis(?NODE1)],
    {ok, 2, ok} = multicall(?MODULE, failed_on_node, FailArgs, 1, 1000),
    2 = gen_server:call(?NODE2, skip_failed_commit, 5000),
    {atomic, StatusAfter} = emqx_cluster_rpc:status(),
    ?assertEqual([{Node, 2}, {Peer2, 2}, {Peer3, 1}], tnx_ids(StatusAfter)),
    ok.
|
|
|
|
%% Builds up transactions 2..6 that succeed only in the ?NODE1 process, then
%% fast-forwards ?NODE2 (to 3, 4, and "7", which is answered with 6) and
%% ?NODE3 (to 2), and checks the resulting per-node transaction ids.
t_fast_forward_commit(_Config) ->
    {atomic, []} = emqx_cluster_rpc:status(),
    {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
    ct:sleep(180),
    {atomic, StatusBefore} = emqx_cluster_rpc:status(),
    Node = node(),
    Peer2 = {Node, ?NODE2},
    Peer3 = {Node, ?NODE3},
    ?assertEqual([{Node, 1}, {Peer2, 1}, {Peer3, 1}], tnx_ids(StatusBefore)),
    FailArgs = [erlang:whereis(?NODE1)],
    lists:foreach(
        fun(ExpectedId) ->
            {ok, ExpectedId, ok} = multicall(?MODULE, failed_on_node, FailArgs, 1, 1000)
        end,
        [2, 3, 4, 5]
    ),
    {peers_lagging, 6, ok, _} = multicall(?MODULE, failed_on_node, FailArgs, 2, 1000),
    %% {Server, requested target, expected reply}
    FastForwards = [
        {?NODE2, 3, 3},
        {?NODE2, 4, 4},
        {?NODE2, 7, 6},
        {?NODE3, 2, 2}
    ],
    lists:foreach(
        fun({Server, Target, Reached}) ->
            Reached = gen_server:call(Server, {fast_forward_to_commit, Target}, 5000)
        end,
        FastForwards
    ),
    {atomic, StatusAfter} = emqx_cluster_rpc:status(),
    ?assertEqual([{Node, 6}, {Peer2, 6}, {Peer3, 2}], tnx_ids(StatusAfter)),
    ok.
|
|
|
|
%% Sends an unexpected cast, call and plain message to the registered
%% `emqx_cluster_cleaner' process and checks that the same pid is still
%% registered afterwards.
%% NOTE(review): this case is not listed in all/0, so it is not run.
t_cleaner_unexpected_msg(_Config) ->
    Cleaner = emqx_cluster_cleaner,
    PidBefore = erlang:whereis(Cleaner),
    ok = gen_server:cast(Cleaner, unexpected_cast_msg),
    ignore = gen_server:call(Cleaner, unexpected_cast_msg),
    erlang:send(Cleaner, unexpected_info_msg),
    PidAfter = erlang:whereis(Cleaner),
    ?assertEqual(PidBefore, PidAfter),
    ok.
|
|
|
|
%% Turns a list of status maps into a sorted list of `{Node, TnxId}' pairs.
%% Every element must be a map with `tnx_id' and `node' keys.
tnx_ids(Status) ->
    ToPair = fun(#{tnx_id := TnxId, node := Node}) -> {Node, TnxId} end,
    Pairs = lists:map(ToPair, Status),
    lists:sort(Pairs).
|
|
|
|
%% Starts (and links to) one emqx_cluster_rpc process for each of ?NODE2 and
%% ?NODE3, using `{node(), Name}' as first argument, the name itself as second
%% and 500 as third, then resets emqx_cluster_rpc. Returns `ok'.
start() ->
    lists:foreach(
        fun(Name) ->
            {ok, _Pid} = emqx_cluster_rpc:start_link({node(), Name}, Name, 500)
        end,
        [?NODE2, ?NODE3]
    ),
    ok = emqx_cluster_rpc:reset(),
    ok.
|
|
|
|
%% Kills the processes registered as ?NODE2 and ?NODE3, if any.
%% Returns a two-element list: `ok' for a name that was not registered,
%% `true' for a process that was killed.
%%
%% erlang:exit/2 only sends an exit signal; init_per_testcase/2 calls start/0
%% immediately afterwards, which registers the same names again. To avoid a
%% race with a still-registered dying process, each victim is monitored and
%% its 'DOWN' message is awaited before returning.
stop() ->
    KillAndWait = fun(Name) ->
        case erlang:whereis(Name) of
            undefined ->
                ok;
            Pid ->
                MRef = erlang:monitor(process, Pid),
                %% unlink first so the kill does not propagate to the caller
                erlang:unlink(Pid),
                Killed = erlang:exit(Pid, kill),
                receive
                    {'DOWN', MRef, process, Pid, _Reason} ->
                        Killed
                after 5000 ->
                    %% best effort, as before: do not fail the teardown
                    erlang:demonitor(MRef, [flush]),
                    Killed
                end
        end
    end,
    lists:map(KillAndWait, [?NODE2, ?NODE3]).
|
|
|
|
%% Waits until `Msg' has been received `Count' times.
%% Returns `ok' once all copies have arrived, or `timeout' if one of them
%% does not arrive within one second of the previous one.
receive_msg(Count, Msg) when Count > 0 ->
    Remaining = Count - 1,
    receive
        Msg -> receive_msg(Remaining, Msg)
    after 1000 -> timeout
    end;
receive_msg(0, _Msg) ->
    ok.
|
|
|
|
%% Sends `Msg' to `Pid' and returns `ok'.
echo(Pid, Msg) ->
    Pid ! Msg,
    ok.
|
|
|
|
%% Sleeps for a random 1..150 ms, then sends
%% `{msg, Msg, SystemTime, CallerPid}' to `Pid' and returns `ok'.
echo_delay(Pid, Msg) ->
    Delay = rand:uniform(150),
    timer:sleep(Delay),
    Pid ! {msg, Msg, erlang:system_time(), self()},
    ok.
|
|
|
|
%% Returns `ok' when executed by the process `Pid'; in any other process it
%% returns the string "MFA return not ok".
failed_on_node(Pid) when Pid =:= self() ->
    ok;
failed_on_node(_Pid) ->
    "MFA return not ok".
|
|
|
|
%% Returns `ok' when executed by the process `Pid'. In any other process it
%% bumps a per-caller counter in the named ETS table `test' (creating the
%% table if it does not exist yet) and returns `ok' when the new counter
%% value is even, "MFA return not ok" otherwise. The counter row is created
%% as `{self(), 1}' before the increment, so the first value seen is 2.
failed_on_node_by_odd(Pid) when Pid =:= self() ->
    ok;
failed_on_node_by_odd(_Pid) ->
    %% ets:new/2 fails if the table already exists; that failure is ignored
    catch ets:new(test, [named_table, set, public]),
    Caller = self(),
    Count = ets:update_counter(test, Caller, {2, 1}, {Caller, 1}),
    case Count rem 2 of
        0 -> ok;
        _ -> "MFA return not ok"
    end.
|
|
|
|
%% Returns `ok' when executed by the process `Pid'. In any other process it
%% returns whatever value is currently stored under `other_mfa_result' in the
%% ETS table `test', so a test can flip the outcome between retries.
failed_on_other_recover_after_retry(Pid) when Pid =:= self() ->
    ok;
failed_on_other_recover_after_retry(_Pid) ->
    [{_Key, Outcome}] = ets:lookup(test, other_mfa_result),
    Outcome.
|
|
|
|
%% Thin wrapper around emqx_cluster_rpc:do_multicall/5; all five arguments
%% are passed through unchanged.
multicall(Mod, Fun, Args, RequiredSyncs, Timeout) ->
    emqx_cluster_rpc:do_multicall(Mod, Fun, Args, RequiredSyncs, Timeout).
|
|
|
|
%% multicall/5 with the fourth argument fixed to `all' and a two-minute
%% timeout.
multicall(Mod, Fun, Args) ->
    TwoMinutes = timer:minutes(2),
    multicall(Mod, Fun, Args, all, TwoMinutes).
|