test(ds): Refactor replication suite
This commit is contained in:
parent
63e51fca66
commit
07aa708894
|
@ -29,10 +29,6 @@
|
||||||
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
|
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
|
||||||
).
|
).
|
||||||
|
|
||||||
-define(diff_opts, #{
|
|
||||||
context => 20, window => 1000, compare_fun => fun message_eq/2
|
|
||||||
}).
|
|
||||||
|
|
||||||
opts() ->
|
opts() ->
|
||||||
opts(#{}).
|
opts(#{}).
|
||||||
|
|
||||||
|
@ -78,7 +74,9 @@ t_replication_transfers_snapshots('end', Config) ->
|
||||||
t_replication_transfers_snapshots(Config) ->
|
t_replication_transfers_snapshots(Config) ->
|
||||||
NMsgs = 400,
|
NMsgs = 400,
|
||||||
NClients = 5,
|
NClients = 5,
|
||||||
{Stream, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
|
{Stream, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
|
||||||
|
?FUNCTION_NAME, NClients, NMsgs
|
||||||
|
),
|
||||||
|
|
||||||
Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
|
Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
|
||||||
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
|
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
|
||||||
|
@ -100,7 +98,7 @@ t_replication_transfers_snapshots(Config) ->
|
||||||
ok = emqx_cth_cluster:stop_node(NodeOffline),
|
ok = emqx_cth_cluster:stop_node(NodeOffline),
|
||||||
|
|
||||||
%% Fill the storage with messages and few additional generations.
|
%% Fill the storage with messages and few additional generations.
|
||||||
apply_stream(?DB, Nodes -- [NodeOffline], Stream),
|
emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),
|
||||||
|
|
||||||
%% Restart the node.
|
%% Restart the node.
|
||||||
[NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
|
[NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
|
||||||
|
@ -126,7 +124,7 @@ t_replication_transfers_snapshots(Config) ->
|
||||||
ok = timer:sleep(3_000),
|
ok = timer:sleep(3_000),
|
||||||
|
|
||||||
%% Check that the DB has been restored:
|
%% Check that the DB has been restored:
|
||||||
verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
|
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
|
||||||
end,
|
end,
|
||||||
[]
|
[]
|
||||||
).
|
).
|
||||||
|
@ -154,7 +152,9 @@ t_rebalance('end', Config) ->
|
||||||
t_rebalance(Config) ->
|
t_rebalance(Config) ->
|
||||||
NMsgs = 50,
|
NMsgs = 50,
|
||||||
NClients = 5,
|
NClients = 5,
|
||||||
{Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
|
{Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
|
||||||
|
?FUNCTION_NAME, NClients, NMsgs
|
||||||
|
),
|
||||||
Nodes = [N1, N2 | _] = ?config(nodes, Config),
|
Nodes = [N1, N2 | _] = ?config(nodes, Config),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
#{timetrap => 30_000},
|
#{timetrap => 30_000},
|
||||||
|
@ -205,9 +205,9 @@ t_rebalance(Config) ->
|
||||||
],
|
],
|
||||||
|
|
||||||
%% 2. Start filling the storage:
|
%% 2. Start filling the storage:
|
||||||
apply_stream(?DB, Nodes, Stream),
|
emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
|
||||||
timer:sleep(5000),
|
timer:sleep(5000),
|
||||||
verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams),
|
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams),
|
||||||
[
|
[
|
||||||
?defer_assert(
|
?defer_assert(
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -233,8 +233,10 @@ t_rebalance(Config) ->
|
||||||
|
|
||||||
%% Scale down the cluster by removing the first node.
|
%% Scale down the cluster by removing the first node.
|
||||||
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
|
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
|
||||||
ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]),
|
ct:pal("Transitions (~p -> ~p): ~p~n", [
|
||||||
?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
|
Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB)
|
||||||
|
]),
|
||||||
|
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))),
|
||||||
|
|
||||||
%% Verify that at the end each node is now responsible for each shard.
|
%% Verify that at the end each node is now responsible for each shard.
|
||||||
?defer_assert(
|
?defer_assert(
|
||||||
|
@ -245,7 +247,7 @@ t_rebalance(Config) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% Verify that the messages are once again preserved after the rebalance:
|
%% Verify that the messages are once again preserved after the rebalance:
|
||||||
verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
|
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
|
||||||
end,
|
end,
|
||||||
[]
|
[]
|
||||||
).
|
).
|
||||||
|
@ -294,7 +296,7 @@ t_join_leave_errors(Config) ->
|
||||||
|
|
||||||
%% Should be no-op.
|
%% Should be no-op.
|
||||||
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
|
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
|
||||||
?assertEqual([], transitions(N1, ?DB)),
|
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)),
|
||||||
|
|
||||||
%% Impossible to leave the last site.
|
%% Impossible to leave the last site.
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -305,12 +307,12 @@ t_join_leave_errors(Config) ->
|
||||||
%% "Move" the DB to the other node.
|
%% "Move" the DB to the other node.
|
||||||
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
|
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
|
||||||
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
|
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
|
||||||
?assertMatch([_ | _], transitions(N1, ?DB)),
|
?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)),
|
||||||
?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
|
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
|
||||||
|
|
||||||
%% Should be no-op.
|
%% Should be no-op.
|
||||||
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
|
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
|
||||||
?assertEqual([], transitions(N1, ?DB)).
|
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)).
|
||||||
|
|
||||||
t_rebalance_chaotic_converges(init, Config) ->
|
t_rebalance_chaotic_converges(init, Config) ->
|
||||||
Apps = [appspec(emqx_durable_storage)],
|
Apps = [appspec(emqx_durable_storage)],
|
||||||
|
@ -335,7 +337,9 @@ t_rebalance_chaotic_converges(Config) ->
|
||||||
Nodes = [N1, N2, N3] = ?config(nodes, Config),
|
Nodes = [N1, N2, N3] = ?config(nodes, Config),
|
||||||
|
|
||||||
NClients = 5,
|
NClients = 5,
|
||||||
{Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
|
{Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
|
||||||
|
?FUNCTION_NAME, NClients, NMsgs
|
||||||
|
),
|
||||||
|
|
||||||
?check_trace(
|
?check_trace(
|
||||||
#{},
|
#{},
|
||||||
|
@ -385,10 +389,10 @@ t_rebalance_chaotic_converges(Config) ->
|
||||||
"Initially, the DB is assigned to [S1, S2]"
|
"Initially, the DB is assigned to [S1, S2]"
|
||||||
),
|
),
|
||||||
|
|
||||||
apply_stream(?DB, Nodes, Stream),
|
emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
|
||||||
|
|
||||||
%% Wait for the last transition to complete.
|
%% Wait for the last transition to complete.
|
||||||
?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))),
|
?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
|
||||||
|
|
||||||
?defer_assert(
|
?defer_assert(
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -401,7 +405,7 @@ t_rebalance_chaotic_converges(Config) ->
|
||||||
timer:sleep(5000),
|
timer:sleep(5000),
|
||||||
|
|
||||||
%% Check that all messages are still there.
|
%% Check that all messages are still there.
|
||||||
verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
|
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
|
||||||
end,
|
end,
|
||||||
[]
|
[]
|
||||||
).
|
).
|
||||||
|
@ -447,7 +451,7 @@ t_rebalance_offline_restarts(Config) ->
|
||||||
%% Shut down N3 and then remove it from the DB.
|
%% Shut down N3 and then remove it from the DB.
|
||||||
ok = emqx_cth_cluster:stop_node(N3),
|
ok = emqx_cth_cluster:stop_node(N3),
|
||||||
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
|
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
|
||||||
Transitions = transitions(N1, ?DB),
|
Transitions = emqx_ds_test_helpers:transitions(N1, ?DB),
|
||||||
ct:pal("Transitions: ~p~n", [Transitions]),
|
ct:pal("Transitions: ~p~n", [Transitions]),
|
||||||
|
|
||||||
%% Wait until at least one transition completes.
|
%% Wait until at least one transition completes.
|
||||||
|
@ -462,7 +466,7 @@ t_rebalance_offline_restarts(Config) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% Target state should still be reached eventually.
|
%% Target state should still be reached eventually.
|
||||||
?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))),
|
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
|
||||||
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
|
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
@ -491,10 +495,6 @@ ds_repl_meta(Node, Fun, Args) ->
|
||||||
ds_repl_shard(Node, Fun, Args) ->
|
ds_repl_shard(Node, Fun, Args) ->
|
||||||
erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
|
erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
|
||||||
|
|
||||||
transitions(Node, DB) ->
|
|
||||||
Shards = shards(Node, DB),
|
|
||||||
[{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])].
|
|
||||||
|
|
||||||
shards(Node, DB) ->
|
shards(Node, DB) ->
|
||||||
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
|
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
|
||||||
|
|
||||||
|
@ -557,196 +557,3 @@ init_per_testcase(TCName, Config0) ->
|
||||||
end_per_testcase(TCName, Config) ->
|
end_per_testcase(TCName, Config) ->
|
||||||
ok = snabbkaffe:stop(),
|
ok = snabbkaffe:stop(),
|
||||||
emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
|
emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
|
||||||
|
|
||||||
without_extra(L) ->
|
|
||||||
[I#message{extra = #{}} || I <- L].
|
|
||||||
|
|
||||||
%% Consume data from the DS storage on a given node as a stream:
|
|
||||||
-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}).
|
|
||||||
|
|
||||||
%% Create a stream from the topic (wildcards are NOT supported for a
|
|
||||||
%% good reason: order of messages is implementation-dependent!).
|
|
||||||
%%
|
|
||||||
%% Note: stream produces messages with keys
|
|
||||||
-spec ds_topic_stream(binary(), binary(), node()) -> ds_stream().
|
|
||||||
ds_topic_stream(ClientId, TopicBin, Node) ->
|
|
||||||
Topic = emqx_topic:words(TopicBin),
|
|
||||||
Shard = shard_of_clientid(Node, ClientId),
|
|
||||||
{ShardId, DSStreams} =
|
|
||||||
?ON(
|
|
||||||
Node,
|
|
||||||
begin
|
|
||||||
DBShard = {?DB, Shard},
|
|
||||||
{DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)}
|
|
||||||
end
|
|
||||||
),
|
|
||||||
%% Sort streams by their rank Y, and chain them together:
|
|
||||||
emqx_utils_stream:chain([
|
|
||||||
ds_topic_generation_stream(Node, ShardId, Topic, S)
|
|
||||||
|| {_RankY, S} <- lists:sort(DSStreams)
|
|
||||||
]).
|
|
||||||
|
|
||||||
ds_topic_generation_stream(Node, Shard, Topic, Stream) ->
|
|
||||||
{ok, Iterator} = ?ON(
|
|
||||||
Node,
|
|
||||||
emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0)
|
|
||||||
),
|
|
||||||
do_ds_topic_generation_stream(Node, Shard, Iterator).
|
|
||||||
|
|
||||||
do_ds_topic_generation_stream(Node, Shard, It0) ->
|
|
||||||
fun() ->
|
|
||||||
case
|
|
||||||
?ON(
|
|
||||||
Node,
|
|
||||||
begin
|
|
||||||
Now = emqx_ds_replication_layer:current_timestamp(?DB, Shard),
|
|
||||||
emqx_ds_storage_layer:next(Shard, It0, 1, Now)
|
|
||||||
end
|
|
||||||
)
|
|
||||||
of
|
|
||||||
{ok, _It, []} ->
|
|
||||||
[];
|
|
||||||
{ok, end_of_stream} ->
|
|
||||||
[];
|
|
||||||
{ok, It, [KeyMsg]} ->
|
|
||||||
[KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)]
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% Payload generation:
|
|
||||||
|
|
||||||
apply_stream(DB, Nodes, Stream) ->
|
|
||||||
apply_stream(
|
|
||||||
DB,
|
|
||||||
emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
|
|
||||||
Stream,
|
|
||||||
0
|
|
||||||
).
|
|
||||||
|
|
||||||
apply_stream(DB, NodeStream0, Stream0, N) ->
|
|
||||||
case emqx_utils_stream:next(Stream0) of
|
|
||||||
[] ->
|
|
||||||
?tp(all_done, #{});
|
|
||||||
[Msg = #message{} | Stream] ->
|
|
||||||
[Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
|
|
||||||
?tp(
|
|
||||||
test_push_message,
|
|
||||||
maps:merge(
|
|
||||||
emqx_message:to_map(Msg),
|
|
||||||
#{n => N}
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
|
|
||||||
apply_stream(DB, NodeStream, Stream, N + 1);
|
|
||||||
[add_generation | Stream] ->
|
|
||||||
%% FIXME:
|
|
||||||
[_Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
|
|
||||||
%% add_generation(Node, DB),
|
|
||||||
apply_stream(DB, NodeStream, Stream, N);
|
|
||||||
[{Node, Operation, Arg} | Stream] when
|
|
||||||
Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites
|
|
||||||
->
|
|
||||||
?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
|
|
||||||
%% Apply the transition.
|
|
||||||
?assertEqual(ok, ds_repl_meta(Node, Operation, [DB, Arg])),
|
|
||||||
%% Give some time for at least one transition to complete.
|
|
||||||
Transitions = transitions(Node, ?DB),
|
|
||||||
ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
|
|
||||||
?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))),
|
|
||||||
apply_stream(DB, NodeStream0, Stream, N);
|
|
||||||
[Fun | Stream] when is_function(Fun) ->
|
|
||||||
Fun(),
|
|
||||||
apply_stream(DB, NodeStream0, Stream, N)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Create an infinite list of messages from a given client:
|
|
||||||
interleaved_topic_messages(TestCase, NClients, NMsgs) ->
|
|
||||||
%% List of fake client IDs:
|
|
||||||
Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
|
|
||||||
TopicStreams = [
|
|
||||||
{ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))}
|
|
||||||
|| ClientId <- Clients
|
|
||||||
],
|
|
||||||
%% Interleaved stream of messages:
|
|
||||||
Stream = emqx_utils_stream:interleave(
|
|
||||||
[{2, Stream} || {_ClientId, Stream} <- TopicStreams], true
|
|
||||||
),
|
|
||||||
{Stream, TopicStreams}.
|
|
||||||
|
|
||||||
topic_messages(TestCase, ClientId) ->
|
|
||||||
topic_messages(TestCase, ClientId, 0).
|
|
||||||
|
|
||||||
topic_messages(TestCase, ClientId, N) ->
|
|
||||||
fun() ->
|
|
||||||
Msg = #message{
|
|
||||||
from = ClientId,
|
|
||||||
topic = client_topic(TestCase, ClientId),
|
|
||||||
timestamp = N * 100,
|
|
||||||
payload = integer_to_binary(N)
|
|
||||||
},
|
|
||||||
[Msg | topic_messages(TestCase, ClientId, N + 1)]
|
|
||||||
end.
|
|
||||||
|
|
||||||
client_topic(TestCase, ClientId) when is_atom(TestCase) ->
|
|
||||||
client_topic(atom_to_binary(TestCase, utf8), ClientId);
|
|
||||||
client_topic(TestCase, ClientId) when is_binary(TestCase) ->
|
|
||||||
<<TestCase/binary, "/", ClientId/binary>>.
|
|
||||||
|
|
||||||
message_eq(Msg1, {_Key, Msg2}) ->
|
|
||||||
%% Timestamps can be modified by the replication layer, ignore them:
|
|
||||||
Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}.
|
|
||||||
|
|
||||||
%% Stream comparison:
|
|
||||||
|
|
||||||
-spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok.
|
|
||||||
verify_stream_effects(TestCase, Nodes0, L) ->
|
|
||||||
Checked = lists:flatmap(
|
|
||||||
fun({ClientId, Stream}) ->
|
|
||||||
Nodes = nodes_of_clientid(ClientId, Nodes0),
|
|
||||||
ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]),
|
|
||||||
?defer_assert(
|
|
||||||
?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId])
|
|
||||||
),
|
|
||||||
[verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes]
|
|
||||||
end,
|
|
||||||
L
|
|
||||||
),
|
|
||||||
?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")).
|
|
||||||
|
|
||||||
-spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok.
|
|
||||||
verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) ->
|
|
||||||
ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
|
|
||||||
?defer_assert(
|
|
||||||
begin
|
|
||||||
snabbkaffe_diff:assert_lists_eq(
|
|
||||||
ExpectedStream,
|
|
||||||
ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node),
|
|
||||||
?diff_opts
|
|
||||||
),
|
|
||||||
ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
|
|
||||||
end
|
|
||||||
).
|
|
||||||
|
|
||||||
%% Find which nodes from the list contain the shards for the given
|
|
||||||
%% client ID:
|
|
||||||
nodes_of_clientid(ClientId, Nodes = [N0 | _]) ->
|
|
||||||
Shard = shard_of_clientid(N0, ClientId),
|
|
||||||
SiteNodes = ?ON(
|
|
||||||
N0,
|
|
||||||
begin
|
|
||||||
Sites = emqx_ds_replication_layer_meta:replica_set(?DB, Shard),
|
|
||||||
lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites)
|
|
||||||
end
|
|
||||||
),
|
|
||||||
lists:filter(
|
|
||||||
fun(N) ->
|
|
||||||
lists:member(N, SiteNodes)
|
|
||||||
end,
|
|
||||||
Nodes
|
|
||||||
).
|
|
||||||
|
|
||||||
shard_of_clientid(Node, ClientId) ->
|
|
||||||
?ON(
|
|
||||||
Node,
|
|
||||||
emqx_ds_replication_layer:shard_of_message(?DB, #message{from = ClientId}, clientid)
|
|
||||||
).
|
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
opts() ->
|
opts() ->
|
||||||
#{storage => {emqx_ds_storage_bitfield_lts, #{}}}.
|
#{storage => {emqx_ds_storage_reference, #{}}}.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
|
-define(ON(NODE, BODY),
|
||||||
|
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
|
||||||
|
).
|
||||||
|
|
||||||
%% RPC mocking
|
%% RPC mocking
|
||||||
|
|
||||||
|
@ -59,8 +65,220 @@ mock_rpc_result(gen_rpc, ExpectFun) ->
|
||||||
end
|
end
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
%% Consume data from the DS storage on a given node as a stream:
|
||||||
|
-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}).
|
||||||
|
|
||||||
|
%% @doc Create an infinite list of messages from a given client:
|
||||||
|
interleaved_topic_messages(TestCase, NClients, NMsgs) ->
|
||||||
|
%% List of fake client IDs:
|
||||||
|
Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
|
||||||
|
TopicStreams = [
|
||||||
|
{ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))}
|
||||||
|
|| ClientId <- Clients
|
||||||
|
],
|
||||||
|
%% Interleaved stream of messages:
|
||||||
|
Stream = emqx_utils_stream:interleave(
|
||||||
|
[{2, Stream} || {_ClientId, Stream} <- TopicStreams], true
|
||||||
|
),
|
||||||
|
{Stream, TopicStreams}.
|
||||||
|
|
||||||
|
topic_messages(TestCase, ClientId) ->
|
||||||
|
topic_messages(TestCase, ClientId, 0).
|
||||||
|
|
||||||
|
topic_messages(TestCase, ClientId, N) ->
|
||||||
|
fun() ->
|
||||||
|
Msg = #message{
|
||||||
|
from = ClientId,
|
||||||
|
topic = client_topic(TestCase, ClientId),
|
||||||
|
timestamp = N * 100,
|
||||||
|
payload = integer_to_binary(N)
|
||||||
|
},
|
||||||
|
[Msg | topic_messages(TestCase, ClientId, N + 1)]
|
||||||
|
end.
|
||||||
|
|
||||||
|
client_topic(TestCase, ClientId) when is_atom(TestCase) ->
|
||||||
|
client_topic(atom_to_binary(TestCase, utf8), ClientId);
|
||||||
|
client_topic(TestCase, ClientId) when is_binary(TestCase) ->
|
||||||
|
<<TestCase/binary, "/", ClientId/binary>>.
|
||||||
|
|
||||||
|
ds_topic_generation_stream(DB, Node, Shard, Topic, Stream) ->
|
||||||
|
{ok, Iterator} = ?ON(
|
||||||
|
Node,
|
||||||
|
emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0)
|
||||||
|
),
|
||||||
|
do_ds_topic_generation_stream(DB, Node, Shard, Iterator).
|
||||||
|
|
||||||
|
do_ds_topic_generation_stream(DB, Node, Shard, It0) ->
|
||||||
|
fun() ->
|
||||||
|
case
|
||||||
|
?ON(
|
||||||
|
Node,
|
||||||
|
begin
|
||||||
|
Now = emqx_ds_replication_layer:current_timestamp(DB, Shard),
|
||||||
|
emqx_ds_storage_layer:next(Shard, It0, 1, Now)
|
||||||
|
end
|
||||||
|
)
|
||||||
|
of
|
||||||
|
{ok, _It, []} ->
|
||||||
|
[];
|
||||||
|
{ok, end_of_stream} ->
|
||||||
|
[];
|
||||||
|
{ok, It, [KeyMsg]} ->
|
||||||
|
[KeyMsg | do_ds_topic_generation_stream(DB, Node, Shard, It)]
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Payload generation:
|
||||||
|
|
||||||
|
apply_stream(DB, Nodes, Stream) ->
|
||||||
|
apply_stream(
|
||||||
|
DB,
|
||||||
|
emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
|
||||||
|
Stream,
|
||||||
|
0
|
||||||
|
).
|
||||||
|
|
||||||
|
apply_stream(DB, NodeStream0, Stream0, N) ->
|
||||||
|
case emqx_utils_stream:next(Stream0) of
|
||||||
|
[] ->
|
||||||
|
?tp(all_done, #{});
|
||||||
|
[Msg = #message{} | Stream] ->
|
||||||
|
[Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
|
||||||
|
?tp(
|
||||||
|
test_push_message,
|
||||||
|
maps:merge(
|
||||||
|
emqx_message:to_map(Msg),
|
||||||
|
#{n => N}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
|
||||||
|
apply_stream(DB, NodeStream, Stream, N + 1);
|
||||||
|
[add_generation | Stream] ->
|
||||||
|
%% FIXME:
|
||||||
|
[_Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
|
||||||
|
%% add_generation(Node, DB),
|
||||||
|
apply_stream(DB, NodeStream, Stream, N);
|
||||||
|
[{Node, Operation, Arg} | Stream] when
|
||||||
|
Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites
|
||||||
|
->
|
||||||
|
?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
|
||||||
|
%% Apply the transition.
|
||||||
|
?assertEqual(
|
||||||
|
ok,
|
||||||
|
?ON(
|
||||||
|
Node,
|
||||||
|
emqx_ds_replication_layer_meta:Operation(DB, Arg)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
%% Give some time for at least one transition to complete.
|
||||||
|
Transitions = transitions(Node, DB),
|
||||||
|
ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
|
||||||
|
?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))),
|
||||||
|
apply_stream(DB, NodeStream0, Stream, N);
|
||||||
|
[Fun | Stream] when is_function(Fun) ->
|
||||||
|
Fun(),
|
||||||
|
apply_stream(DB, NodeStream0, Stream, N)
|
||||||
|
end.
|
||||||
|
|
||||||
|
transitions(Node, DB) ->
|
||||||
|
?ON(
|
||||||
|
Node,
|
||||||
|
begin
|
||||||
|
Shards = emqx_ds_replication_layer_meta:shards(DB),
|
||||||
|
[
|
||||||
|
{S, T}
|
||||||
|
|| S <- Shards, T <- emqx_ds_replication_layer_meta:replica_set_transitions(DB, S)
|
||||||
|
]
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
|
%% Stream comparison
|
||||||
|
|
||||||
|
message_eq(Msg1, {_Key, Msg2}) ->
|
||||||
|
%% Timestamps can be modified by the replication layer, ignore them:
|
||||||
|
Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}.
|
||||||
|
|
||||||
%% Consuming streams and iterators
|
%% Consuming streams and iterators
|
||||||
|
|
||||||
|
-spec verify_stream_effects(atom(), binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) ->
|
||||||
|
ok.
|
||||||
|
verify_stream_effects(DB, TestCase, Nodes0, L) ->
|
||||||
|
Checked = lists:flatmap(
|
||||||
|
fun({ClientId, Stream}) ->
|
||||||
|
Nodes = nodes_of_clientid(DB, ClientId, Nodes0),
|
||||||
|
ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]),
|
||||||
|
?defer_assert(
|
||||||
|
?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId])
|
||||||
|
),
|
||||||
|
[verify_stream_effects(DB, TestCase, Node, ClientId, Stream) || Node <- Nodes]
|
||||||
|
end,
|
||||||
|
L
|
||||||
|
),
|
||||||
|
?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")).
|
||||||
|
|
||||||
|
-spec verify_stream_effects(atom(), binary(), node(), emqx_types:clientid(), ds_stream()) -> ok.
|
||||||
|
verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) ->
|
||||||
|
ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
|
||||||
|
DiffOpts = #{context => 20, window => 1000, compare_fun => fun message_eq/2},
|
||||||
|
?defer_assert(
|
||||||
|
begin
|
||||||
|
snabbkaffe_diff:assert_lists_eq(
|
||||||
|
ExpectedStream,
|
||||||
|
ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node),
|
||||||
|
DiffOpts
|
||||||
|
),
|
||||||
|
ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
|
%% Create a stream from the topic (wildcards are NOT supported for a
|
||||||
|
%% good reason: order of messages is implementation-dependent!).
|
||||||
|
%%
|
||||||
|
%% Note: stream produces messages with keys
|
||||||
|
-spec ds_topic_stream(atom(), binary(), binary(), node()) -> ds_stream().
|
||||||
|
ds_topic_stream(DB, ClientId, TopicBin, Node) ->
|
||||||
|
Topic = emqx_topic:words(TopicBin),
|
||||||
|
Shard = shard_of_clientid(DB, Node, ClientId),
|
||||||
|
{ShardId, DSStreams} =
|
||||||
|
?ON(
|
||||||
|
Node,
|
||||||
|
begin
|
||||||
|
DBShard = {DB, Shard},
|
||||||
|
{DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)}
|
||||||
|
end
|
||||||
|
),
|
||||||
|
%% Sort streams by their rank Y, and chain them together:
|
||||||
|
emqx_utils_stream:chain([
|
||||||
|
ds_topic_generation_stream(DB, Node, ShardId, Topic, S)
|
||||||
|
|| {_RankY, S} <- lists:sort(DSStreams)
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Find which nodes from the list contain the shards for the given
|
||||||
|
%% client ID:
|
||||||
|
nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) ->
|
||||||
|
Shard = shard_of_clientid(DB, N0, ClientId),
|
||||||
|
SiteNodes = ?ON(
|
||||||
|
N0,
|
||||||
|
begin
|
||||||
|
Sites = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
|
||||||
|
lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites)
|
||||||
|
end
|
||||||
|
),
|
||||||
|
lists:filter(
|
||||||
|
fun(N) ->
|
||||||
|
lists:member(N, SiteNodes)
|
||||||
|
end,
|
||||||
|
Nodes
|
||||||
|
).
|
||||||
|
|
||||||
|
shard_of_clientid(DB, Node, ClientId) ->
|
||||||
|
?ON(
|
||||||
|
Node,
|
||||||
|
emqx_ds_replication_layer:shard_of_message(DB, #message{from = ClientId}, clientid)
|
||||||
|
).
|
||||||
|
|
||||||
|
%% Consume eagerly:
|
||||||
|
|
||||||
consume(DB, TopicFilter) ->
|
consume(DB, TopicFilter) ->
|
||||||
consume(DB, TopicFilter, 0).
|
consume(DB, TopicFilter, 0).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue