From a0a39770434f20c497de9e55357116d5fa3c944b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 8 May 2024 23:15:46 +0200 Subject: [PATCH] feat(ds): Assign latest timestamp deterministically --- .../src/emqx_ds_replication_layer.erl | 12 ++-- .../src/emqx_ds_replication_layer.hrl | 1 + .../test/emqx_ds_replication_SUITE.erl | 68 +++++++++++++------ apps/emqx_utils/src/emqx_utils_stream.erl | 32 +++++---- 4 files changed, 76 insertions(+), 37 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index afc8db40d..c6fdc69aa 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -734,20 +734,20 @@ apply( {State, Result}; apply( _RaftMeta, - #{?tag := storage_event, ?payload := CustomEvent}, + #{?tag := storage_event, ?payload := CustomEvent, ?now := Now}, #{db_shard := DBShard, latest := Latest0} = State ) -> - {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0), + Latest = max(Latest0, Now), set_ts(DBShard, Latest), ?tp( debug, emqx_ds_replication_layer_storage_event, #{ - shard => DBShard, ts => Timestamp, payload => CustomEvent + shard => DBShard, payload => CustomEvent, latest => Latest } ), - Effects = handle_custom_event(DBShard, Timestamp, CustomEvent), - {State#{latest := Latest}, ok, Effects}. + Effects = handle_custom_event(DBShard, Latest, CustomEvent), + {State#{latest => Latest}, ok, Effects}. -spec tick(integer(), ra_state()) -> ra_machine:effects(). tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> @@ -791,7 +791,7 @@ snapshot_module() -> handle_custom_event(DBShard, Latest, Event) -> try Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event), - [{append, #{?tag => storage_event, ?payload => I}} || I <- Events] + [{append, #{?tag => storage_event, ?payload => I, ?now => Latest}} || I <- Events] catch EC:Err:Stacktrace -> ?tp(error, ds_storage_custom_even_fail, #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 960824143..4472b5a47 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -43,5 +43,6 @@ %% custom events -define(payload, 2). +-define(now, 3). -endif. diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 4670dfeb0..f3a61d377 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -41,7 +41,7 @@ opts(Overrides) -> #{ backend => builtin, %% storage => {emqx_ds_storage_reference, #{}}, - storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 1}}, + storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}}, n_shards => 16, n_sites => 1, replication_factor => 3, @@ -159,7 +159,6 @@ t_rebalance('end', Config) -> t_rebalance(Config) -> NMsgs = 50, NClients = 5, - NEvents = NMsgs * NClients, %% List of fake client IDs: Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], %% List of streams that generate messages for each "client" in its own topic: @@ -168,7 +167,16 @@ t_rebalance(Config) -> || ClientId <- Clients ], %% Interleaved list of events: - Stream = emqx_utils_stream:interleave([{2, Stream} || {_ClientId, Stream} <- TopicStreams]), + Stream0 = emqx_utils_stream:interleave( + [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true + ), + Stream = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:const(add_generation) + ], + false + ), Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), ?check_trace( #{timetrap => 30_000}, @@ -176,15 +184,22 @@ t_rebalance(Config) -> %% 0. Inject schedulings to make sure the messages are %% written to the storage before, during, and after %% rebalance: - ?force_ordering(#{?snk_kind := test_push_message, n := 10}, #{ - ?snk_kind := test_start_rebalance - }), - ?force_ordering(#{?snk_kind := test_start_rebalance}, #{ - ?snk_kind := test_push_message, n := 20 - }), - ?force_ordering(#{?snk_kind := test_end_rebalance}, #{ - ?snk_kind := test_push_message, n := 30 - }), + ?force_ordering( + #{?snk_kind := test_push_message, n := 10}, + #{?snk_kind := test_start_rebalance} + ), + ?force_ordering( + #{?snk_kind := test_start_rebalance1}, + #{?snk_kind := test_push_message, n := 20} + ), + ?force_ordering( + #{?snk_kind := test_push_message, n := 30}, + #{?snk_kind := test_start_rebalance2} + ), + ?force_ordering( + #{?snk_kind := test_end_rebalance}, + #{?snk_kind := test_push_message, n := 40} + ), %% 1. Initialize DB on the first node. Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), @@ -224,7 +239,7 @@ t_rebalance(Config) -> ), %% 3. Start rebalance in the meanwhile: - ?tp(test_start_rebalance, #{}), + ?tp(test_start_rebalance1, #{}), %% 3.1 Join the second site to the DB replication sites. ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))), %% Should be no-op. @@ -233,6 +248,7 @@ t_rebalance(Config) -> ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + ?tp(test_start_rebalance2, #{}), %% Now join the rest of the sites. ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), @@ -619,7 +635,9 @@ without_extra(L) -> -type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}). %% Create a stream from the topic (wildcards are NOT supported for a -%% good reason: order of messages is implementation-dependent!): +%% good reason: order of messages is implementation-dependent!). +%% +%% Note: stream produces messages with keys -spec ds_topic_stream(binary(), binary(), node()) -> ds_stream(). ds_topic_stream(ClientId, TopicBin, Node) -> Topic = emqx_topic:words(TopicBin), @@ -638,7 +656,6 @@ ds_topic_stream(ClientId, TopicBin, Node) -> || {_RankY, S} <- lists:sort(DSStreams) ]). -%% Note: produces messages with keys ds_topic_generation_stream(Node, Shard, Topic, Stream) -> {ok, Iterator} = ?ON( Node, @@ -647,11 +664,20 @@ ds_topic_generation_stream(Node, Shard, Topic, Stream) -> do_ds_topic_generation_stream(Node, Shard, Iterator). do_ds_topic_generation_stream(Node, Shard, It0) -> - Now = 99999999999999999999, fun() -> - case ?ON(Node, emqx_ds_storage_layer:next(Shard, It0, 1, Now)) of + case + ?ON( + Node, + begin + Now = emqx_ds_replication_layer:current_timestamp(?DB, Shard), + emqx_ds_storage_layer:next(Shard, It0, 1, Now) + end + ) + of {ok, It, []} -> []; + {ok, end_of_stream} -> + []; {ok, It, [KeyMsg]} -> [KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)] end @@ -673,7 +699,11 @@ apply_stream(DB, NodeStream0, Stream0, N) -> ) ), ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), - apply_stream(DB, NodeStream, Stream, N + 1) + apply_stream(DB, NodeStream, Stream, N + 1); + [add_generation | Stream] -> + [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + %% add_generation(Node, DB), + apply_stream(DB, NodeStream, Stream, N) end. %% @doc Create an infinite list of messages from a given client: @@ -724,7 +754,7 @@ verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) -> snabbkaffe_diff:assert_lists_eq( ExpectedStream, ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node), - ?diff_opts#{comment => #{clientid => ClientId, node => Node}} + ?diff_opts ), ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) end diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index e22f97ed7..a38deceeb 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -20,13 +20,14 @@ -export([ empty/0, list/1, + const/1, mqueue/1, map/2, transpose/1, chain/1, chain/2, repeat/1, - interleave/1, + interleave/2, limit_length/2 ]). @@ -72,6 +73,11 @@ list([]) -> list([X | Rest]) -> fun() -> [X | list(Rest)] end. +%% @doc Make a stream with a single element infinitely repeated +-spec const(T) -> stream(T). +const(T) -> + fun() -> [T | const(T)] end. + %% @doc Make a stream out of process message queue. -spec mqueue(timeout()) -> stream(any()). mqueue(Timeout) -> @@ -158,8 +164,8 @@ repeat(S) -> %% specifies size of the "batch" to be consumed from the stream at a %% time (stream is the second tuple element). If element of the list %% is a plain stream, then the batch size is assumed to be 1. --spec interleave([stream(X) | {non_neg_integer(), stream(X)}]) -> stream(X). -interleave(L0) -> +-spec interleave([stream(X) | {non_neg_integer(), stream(X)}], boolean()) -> stream(X). +interleave(L0, ContinueAtEmpty) -> L = lists:map( fun (Stream) when is_function(Stream) -> @@ -170,7 +176,7 @@ interleave(L0) -> L0 ), fun() -> - do_interleave(0, L, []) + do_interleave(ContinueAtEmpty, 0, L, []) end. %% @doc Truncate list to the given length @@ -281,21 +287,23 @@ csv_read_line([Line | Lines]) -> csv_read_line([]) -> eof. -do_interleave(_, [], []) -> +do_interleave(_Cont, _, [], []) -> []; -do_interleave(N, [{N, S} | Rest], Rev) -> - do_interleave(0, Rest, [{N, S} | Rev]); -do_interleave(_, [], Rev) -> - do_interleave(0, lists:reverse(Rev), []); -do_interleave(I, [{N, S} | Rest], Rev) when I < N -> +do_interleave(Cont, N, [{N, S} | Rest], Rev) -> + do_interleave(Cont, 0, Rest, [{N, S} | Rev]); +do_interleave(Cont, _, [], Rev) -> + do_interleave(Cont, 0, lists:reverse(Rev), []); +do_interleave(Cont, I, [{N, S} | Rest], Rev) when I < N -> case next(S) of + [] when Cont -> + do_interleave(Cont, 0, Rest, Rev); [] -> - do_interleave(0, Rest, Rev); + []; [X | S1] -> [ X | fun() -> - do_interleave(I + 1, [{N, S1} | Rest], Rev) + do_interleave(Cont, I + 1, [{N, S1} | Rest], Rev) end ] end.