From 68ca891f410b2b165f1ae8d08ee5de1e666ed45a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 6 May 2024 11:20:57 +0200 Subject: [PATCH] test(ds): Use streams to create traffic --- .../src/emqx_ds_replication_layer.erl | 51 ++- .../src/emqx_ds_replication_layer_shard.erl | 5 +- .../src/emqx_ds_storage_bitfield_lts.erl | 14 +- .../src/emqx_ds_storage_layer.erl | 7 + .../test/emqx_ds_replication_SUITE.erl | 399 +++++++++++++----- .../test/emqx_ds_test_helpers.erl | 2 + apps/emqx_utils/src/emqx_utils_stream.erl | 64 ++- .../test/emqx_utils_stream_tests.erl | 8 + 8 files changed, 421 insertions(+), 129 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 90a26c484..315a276ad 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -88,6 +88,7 @@ ]). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -include("emqx_ds_replication_layer.hrl"). %%================================================================================ @@ -691,37 +692,39 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State + #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> %% NOTE %% Unique timestamp tracking real time closely. %% With microsecond granularity it should be nearly impossible for it to run %% too far ahead than the real time clock. + ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}), {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), - NState = State#{latest := Latest}, + State = State0#{latest := Latest}, + set_ts(DBShard, Latest), %% TODO: Need to measure effects of changing frequency of `release_cursor`. - Effect = {release_cursor, RaftIdx, NState}, - emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), Latest), - {NState, Result, Effect}; + Effect = {release_cursor, RaftIdx, State}, + {State, Result, Effect}; apply( _RaftMeta, #{?tag := add_generation, ?since := Since}, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State0 ) -> - {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), - NState = State#{latest := NLatest}, - {NState, Result}; + State = State0#{latest := Latest}, + set_ts(DBShard, Latest), + {State, Result}; apply( _RaftMeta, #{?tag := update_config, ?since := Since, ?config := Opts}, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State0 ) -> - {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), - NState = State#{latest := NLatest}, - {NState, Result}; + State = State0#{latest := Latest}, + {State, Result}; apply( _RaftMeta, #{?tag := drop_generation, ?generation := GenId}, @@ -730,17 +733,28 @@ apply( Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), {State, Result}; apply( - _RaftMeta, + #{index := RaftIdx}, #{?tag := storage_event, ?payload := CustomEvent}, #{db_shard := DBShard, latest := Latest0} = State ) -> {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0), + set_ts(DBShard, Latest), + ?tp( + debug, + emqx_ds_replication_layer_storage_event, + #{ + shard => DBShard, ts => Timestamp, payload => CustomEvent + } + ), Effects = handle_custom_event(DBShard, Timestamp, CustomEvent), {State#{latest := Latest}, ok, Effects}. -spec tick(integer(), ra_state()) -> ra_machine:effects(). -tick(TimeMs, #{db_shard := DBShard, latest := Latest}) -> +tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> + %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), + ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), + set_ts(DBShard, Latest), handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> @@ -781,6 +795,11 @@ handle_custom_event(DBShard, Latest, Event) -> [{append, #{?tag => storage_event, ?payload => I}} || I <- Events] catch EC:Err:Stacktrace -> - logger:error(#{EC => Err, stacktrace => Stacktrace, msg => "ds_storage_layer_tick"}), + ?tp(error, ds_storage_custom_even_fail, #{ + EC => Err, stacktrace => Stacktrace, event => Event + }), [] end. + +set_ts({DB, Shard}, TS) -> + emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), TS). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index ac495be1c..dca00222b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -309,7 +309,8 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), Servers = shard_servers(DB, Shard), - case ra:restart_server(DB, LocalServer) of + MutableConfig = #{tick_timeout => 100}, + case ra:restart_server(DB, LocalServer, MutableConfig) of {error, name_not_registered} -> Bootstrap = true, Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, @@ -320,7 +321,7 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ], ReplicationOpts ), - ok = ra:start_server(DB, #{ + ok = ra:start_server(DB, MutableConfig#{ id => LocalServer, uid => server_uid(DB, Shard), cluster_name => ClusterName, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 7342b097d..db50e49dd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -482,13 +482,17 @@ delete_next_until( end. handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> - %% Cause replication layer to bump timestamp when idle case ets:lookup(Gvars, ?IDLE_DETECT) of - [{?IDLE_DETECT, false, LastWrittenTs}] when - ?EPOCH(State, LastWrittenTs) > ?EPOCH(State, Time) - -> + [{?IDLE_DETECT, Latch, LastWrittenTs}] -> + ok; + [] -> + Latch = false, + LastWrittenTs = 0 + end, + case Latch of + false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) -> ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}), - [emqx_ds_storage_bitfield_lts_dummy_event]; + [dummy_event]; _ -> [] end; diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index fff3a77f3..175a0d515 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -287,6 +287,13 @@ get_streams(Shard, TopicFilter, StartTime) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), + ?tp(get_streams_get_gen_topic, #{ + gen_id => GenId, + topic => TopicFilter, + start_time => StartTime, + streams => Streams, + gen_data => GenData + }), [ {GenId, ?stream_v2(GenId, InnerStream)} || InnerStream <- Streams diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 3b0e37c7f..b31b9b0c2 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -21,10 +21,18 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). --include_lib("snabbkaffe/include/test_macros.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(DB, testdb). +-define(ON(NODE, BODY), + erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) +). + +-define(diff_opts, #{ + context => 20, window => 1000, max_failures => 1000, compare_fun => fun message_eq/2 +}). + opts() -> opts(#{}). @@ -32,7 +40,8 @@ opts(Overrides) -> maps:merge( #{ backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, + %% storage => {emqx_ds_storage_reference, #{}}, + storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 1}}, n_shards => 16, n_sites => 1, replication_factor => 3, @@ -142,112 +151,140 @@ t_rebalance(init, Config) -> t_rebalance('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). +%% This testcase verifies that the storage rebalancing works correctly: +%% 1. Join/leave operations are applied successfully. +%% 2. Message data survives the rebalancing. +%% 3. Shard cluster membership converges to the target replica allocation. +%% 4. Replication factor is respected. t_rebalance(Config) -> - %% This testcase verifies that the storage rebalancing works correctly: - %% 1. Join/leave operations are applied successfully. - %% 2. Message data survives the rebalancing. - %% 3. Shard cluster membership converges to the target replica allocation. - %% 4. Replication factor is respected. - - NMsgs = 800, + NMsgs = 50, NClients = 5, - Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), - - %% Initialize DB on the first node. - Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), - ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), - ?assertMatch( - Shards when length(Shards) == 16, - shards_online(N1, ?DB) - ), - - %% Open DB on the rest of the nodes. - ?assertEqual( - [{ok, ok} || _ <- [N2, N3, N4]], - erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts]) - ), - - Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes], - ct:pal("Sites: ~p~n", [Sites]), - - %% Only N1 should be responsible for all shards initially. - ?assertEqual( - [[S1] || _ <- Nodes], - [ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes] - ), - - %% Fill the storage with messages and few additional generations. - %% This will force shards to trigger snapshot transfers during rebalance. - ClientMessages = emqx_utils:pmap( - fun(CID) -> - N = lists:nth(1 + (CID rem length(Nodes)), Nodes), - fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)}) - end, - lists:seq(1, NClients), - infinity - ), - Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)), - - %% Join the second site to the DB replication sites. - ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), - %% Should be no-op. - ?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), - - %% Fill in some more messages *during* the rebalance. - MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}), - - ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), - - %% Now join the rest of the sites. - ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), - - %% Fill in some more messages *during* the rebalance. - MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}), - - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - - %% Verify that each node is now responsible for 3/4 of the shards. - ?assertEqual( - [(16 * 3) div length(Nodes) || _ <- Nodes], - [n_shards_online(N, ?DB) || N <- Nodes] - ), - - %% Verify that the set of shard servers matches the target allocation. - Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], - ShardServers = [ - shard_server_info(N, ?DB, Shard, Site, readiness) - || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), - Shard <- Shards + %% List of fake client IDs: + Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], + %% List of streams that generate messages for each "client" in its own topic: + TopicStreams = [ + {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(?FUNCTION_NAME, ClientId))} + || ClientId <- Clients ], - ?assert( - lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), - ShardServers - ), + %% Interleaved list of events: + Stream = emqx_utils_stream:interleave([{2, Stream} || {_ClientId, Stream} <- TopicStreams]), + Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), + ?check_trace( + #{timetrap => 30_000}, + begin + %% 0. Inject schedulings to make sure the messages are + %% written to the storage before, during, and after + %% rebalance: + ?force_ordering(#{?snk_kind := test_push_message, n := 10}, #{ + ?snk_kind := test_start_rebalance + }), + ?force_ordering(#{?snk_kind := test_start_rebalance}, #{ + ?snk_kind := test_push_message, n := 20 + }), + ?force_ordering(#{?snk_kind := test_end_rebalance}, #{ + ?snk_kind := test_push_message, n := 30 + }), + %% 1. Initialize DB on the first node. + Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), - %% Verify that the messages are preserved after the rebalance. - Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2, - MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesN4)), - ?assertEqual(Messages, MessagesN4), + ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))), + ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)), - %% Scale down the cluster by removing the first node. - ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), - ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), + %% 1.1 Open DB on the rest of the nodes: + [ + ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts))) + || Node <- Nodes + ], - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), - %% Verify that each node is now responsible for each shard. - ?assertEqual( - [0, 16, 16, 16], - [n_shards_online(N, ?DB) || N <- Nodes] - ), + %% 1.2 Verify that all nodes have the same view of metadata storage: + [ + ?defer_assert( + ?assertEqual( + [S1], + ?ON(Node, emqx_ds_replication_layer_meta:db_sites(?DB)), + #{ + msg => "Initially, only S1 should be responsible for all shards", + node => Node + } + ) + ) + || Node <- Nodes + ], - %% Verify that the messages are once again preserved after the rebalance. - MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesN3)), - ?assertEqual(Messages, MessagesN3). + %% 2. Start filling the storage: + spawn_link( + fun() -> + NodeStream = emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), + apply_stream(?DB, NodeStream, Stream, 0) + end + ), + + %% 3. Start rebalance in the meanwhile: + ?tp(test_start_rebalance, #{}), + %% 3.1 Join the second site to the DB replication sites. + ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), + %% Should be no-op. + ?assertEqual(ok, ?ON(N2, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), + + ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + + %% Now join the rest of the sites. + ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), + + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + ?tp(test_end_rebalance, #{}), + [ + ?defer_assert( + ?assertEqual( + 16 * 3 div length(Nodes), + n_shards_online(Node, ?DB), + "Each node is now responsible for 3/4 of the shards" + ) + ) + || Node <- Nodes + ], + + %% Verify that the set of shard servers matches the target allocation. + Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], + ShardServers = [ + shard_server_info(N, ?DB, Shard, Site, readiness) + || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), + Shard <- Shards + ], + ?assert( + lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), + ShardServers + ), + + %% Verify that the messages are preserved after the rebalance: + ?block_until(#{?snk_kind := all_done}), + timer:sleep(5000), + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams), + + %% Scale down the cluster by removing the first node. + ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), + ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + %% Verify that each node is now responsible for each shard. + ?defer_assert( + ?assertEqual( + [0, 16, 16, 16], + [n_shards_online(N, ?DB) || N <- Nodes] + ) + ), + + %% Verify that the messages are once again preserved after the rebalance: + verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams) + end, + [] + ). t_join_leave_errors(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -400,6 +437,9 @@ t_rebalance_chaotic_converges(Config) -> ds_repl_meta(N1, db_sites, [?DB]) ), + %% Wait until the LTS timestamp is updated + timer:sleep(5000), + %% Check that all messages are still there. Messages = lists:append(TransitionMessages) ++ Messages0, MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)), @@ -502,14 +542,16 @@ fill_storage(Node, DB, NMsgs, Opts) -> fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs -> PAddGen = maps:get(p_addgen, Opts, 0.001), R1 = push_message(Node, DB, I, Opts), - R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end), + %probably(PAddGen, fun() -> add_generation(Node, DB) end), + R2 = [], R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts); fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) -> []. push_message(Node, DB, I, Opts) -> Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]), - {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)), + %% {Bytes, _} = rand:bytes_s(5, rand:seed_s(default, I)), + Bytes = integer_to_binary(I), ClientId = maps:get(client_id, Opts, <>), Message = message(ClientId, Topic, Bytes, I * 100), ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]), @@ -545,9 +587,14 @@ probably(P, Fun) -> sample(N, List) -> L = length(List), - H = N div 2, - Filler = integer_to_list(L - N) ++ " more", - lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L). + case L =< N of + true -> + L; + false -> + H = N div 2, + Filler = integer_to_list(L - N) ++ " more", + lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L) + end. %% @@ -563,3 +610,145 @@ init_per_testcase(TCName, Config0) -> end_per_testcase(TCName, Config) -> ok = snabbkaffe:stop(), emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config). + +without_extra(L) -> + [I#message{extra = #{}} || I <- L]. + +%% Consume data from the DS storage on a given node as a stream: +-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}). + +%% Create a stream from the topic (wildcards are NOT supported for a +%% good reason: order of messages is implementation-dependent!): +-spec ds_topic_stream(binary(), binary(), node()) -> ds_stream(). +ds_topic_stream(ClientId, TopicBin, Node) -> + Topic = emqx_topic:words(TopicBin), + Shard = shard_of_clientid(Node, ClientId), + {ShardId, DSStreams} = + ?ON( + Node, + begin + DBShard = {?DB, Shard}, + {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)} + end + ), + %% Sort streams by their rank Y, and chain them together: + emqx_utils_stream:chain([ + ds_topic_generation_stream(Node, ShardId, Topic, S) + || {_RankY, S} <- lists:sort(DSStreams) + ]). + +%% Note: produces messages with keys +ds_topic_generation_stream(Node, Shard, Topic, Stream) -> + {ok, Iterator} = ?ON( + Node, + emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0) + ), + do_ds_topic_generation_stream(Node, Shard, Iterator). + +do_ds_topic_generation_stream(Node, Shard, It0) -> + Now = 99999999999999999999, + fun() -> + case ?ON(Node, emqx_ds_storage_layer:next(Shard, It0, 1, Now)) of + {ok, It, []} -> + []; + {ok, It, [KeyMsg]} -> + [KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)] + end + end. + +%% Payload generation: + +apply_stream(DB, NodeStream0, Stream0, N) -> + case emqx_utils_stream:next(Stream0) of + [] -> + ?tp(all_done, #{}); + [Msg = #message{from = From} | Stream] -> + [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + ?tp( + test_push_message, + maps:merge( + emqx_message:to_map(Msg), + #{n => N} + ) + ), + ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), + apply_stream(DB, NodeStream, Stream, N + 1) + end. + +%% @doc Create an infinite list of messages from a given client: +topic_messages(TestCase, ClientId) -> + topic_messages(TestCase, ClientId, 0). + +topic_messages(TestCase, ClientId, N) -> + fun() -> + Msg = #message{ + from = ClientId, + topic = client_topic(TestCase, ClientId), + timestamp = N * 100, + payload = integer_to_binary(N) + }, + [Msg | topic_messages(TestCase, ClientId, N + 1)] + end. + +client_topic(TestCase, ClientId) when is_atom(TestCase) -> + client_topic(atom_to_binary(TestCase, utf8), ClientId); +client_topic(TestCase, ClientId) when is_binary(TestCase) -> + <>. + +message_eq(Msg1, {Key, Msg2}) -> + %% Timestamps can be modified by the replication layer, ignore them: + Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}. + +%% Stream comparison: + +-spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok. +verify_stream_effects(TestCase, Nodes0, L) -> + lists:foreach( + fun({ClientId, Stream}) -> + Nodes = nodes_of_clientid(ClientId, Nodes0), + ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), + ?defer_assert( + ?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId]) + ), + [verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes] + end, + L + ). + +-spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. +verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) -> + ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), + ?defer_assert( + begin + snabbkaffe_diff:assert_lists_eq( + ExpectedStream, + ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node), + ?diff_opts#{comment => #{clientid => ClientId, node => Node}} + ), + ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) + end + ). + +%% Find which nodes from the list contain the shards for the given +%% client ID: +nodes_of_clientid(ClientId, Nodes = [N0 | _]) -> + Shard = shard_of_clientid(N0, ClientId), + SiteNodes = ?ON( + N0, + begin + Sites = emqx_ds_replication_layer_meta:replica_set(?DB, Shard), + lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites) + end + ), + lists:filter( + fun(N) -> + lists:member(N, SiteNodes) + end, + Nodes + ). + +shard_of_clientid(Node, ClientId) -> + ?ON( + Node, + emqx_ds_replication_layer:shard_of_message(?DB, #message{from = ClientId}, clientid) + ). diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index f54752230..26469c685 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -18,6 +18,8 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx_utils/include/emqx_message.hrl"). + %% RPC mocking mock_rpc() -> diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index fac536532..e22f97ed7 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -23,8 +23,11 @@ mqueue/1, map/2, transpose/1, + chain/1, chain/2, - repeat/1 + repeat/1, + interleave/1, + limit_length/2 ]). %% Evaluating @@ -118,6 +121,11 @@ transpose_tail(S, Tail) -> end end. +%% @doc Make a stream by concatenating multiple streams. +-spec chain([stream(X)]) -> stream(X). +chain(L) -> + lists:foldl(fun chain/2, empty(), L). + %% @doc Make a stream by chaining (concatenating) two streams. %% The second stream begins to produce values only after the first one is exhausted. -spec chain(stream(X), stream(Y)) -> stream(X | Y). @@ -144,6 +152,41 @@ repeat(S) -> end end. +%% @doc Interleave the elements of the streams. +%% +%% This function accepts a list of tuples where the first element +%% specifies size of the "batch" to be consumed from the stream at a +%% time (stream is the second tuple element). If element of the list +%% is a plain stream, then the batch size is assumed to be 1. +-spec interleave([stream(X) | {non_neg_integer(), stream(X)}]) -> stream(X). +interleave(L0) -> + L = lists:map( + fun + (Stream) when is_function(Stream) -> + {1, Stream}; + (A = {N, _}) when N >= 0 -> + A + end, + L0 + ), + fun() -> + do_interleave(0, L, []) + end. + +%% @doc Truncate list to the given length +-spec limit_length(non_neg_integer(), stream(X)) -> stream(X). +limit_length(0, _) -> + fun() -> [] end; +limit_length(N, S) when N >= 0 -> + fun() -> + case next(S) of + [] -> + []; + [X | S1] -> + [X | limit_length(N - 1, S1)] + end + end. + %% %% @doc Produce the next value from the stream. @@ -237,3 +280,22 @@ csv_read_line([Line | Lines]) -> {Fields, Lines}; csv_read_line([]) -> eof. + +do_interleave(_, [], []) -> + []; +do_interleave(N, [{N, S} | Rest], Rev) -> + do_interleave(0, Rest, [{N, S} | Rev]); +do_interleave(_, [], Rev) -> + do_interleave(0, lists:reverse(Rev), []); +do_interleave(I, [{N, S} | Rest], Rev) when I < N -> + case next(S) of + [] -> + do_interleave(0, Rest, Rev); + [X | S1] -> + [ + X + | fun() -> + do_interleave(I + 1, [{N, S1} | Rest], Rev) + end + ] + end. diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 60b67a4ff..0b117215e 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -157,6 +157,14 @@ mqueue_test() -> emqx_utils_stream:consume(emqx_utils_stream:mqueue(400)) ). +interleave_test() -> + S1 = emqx_utils_stream:list([1, 2, 3]), + S2 = emqx_utils_stream:list([a, b, c, d]), + ?assertEqual( + [1, 2, a, b, 3, c, d], + emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}])) + ). + csv_test() -> Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, ?assertEqual(