From be793e4735d044ad98b3fe98de939b2da4eda429 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 1 Feb 2024 13:03:57 +0100 Subject: [PATCH] fix(dsrepl): reassign timestamp at the time of submission This is needed to ensure total message order for a shard, and guarantee that no messages will be written "in the past", which may break replay consistency. --- .../src/emqx_ds_replication_layer.erl | 36 ++++++++++++------- .../src/emqx_ds_replication_layer_egress.erl | 11 ++++-- .../src/emqx_ds_storage_reference.erl | 3 +- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index badad7fc8..cb2001d6e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -555,8 +555,7 @@ ra_start_shard(DB, Shard) -> ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, - ?batch_messages => Messages, - ?timestamp => emqx_ds:timestamp_us() + ?batch_messages => Messages }, case ra:process_command(ra_leader_servers(DB, Shard), Command) of {ok, Result, _Leader} -> @@ -652,8 +651,7 @@ apply( #{index := RaftIdx}, #{ ?tag := ?BATCH, - ?batch_messages := MessagesIn, - ?timestamp := TimestampLocal + ?batch_messages := MessagesIn }, #{latest := Latest} = State ) -> @@ -661,18 +659,30 @@ apply( %% Unique timestamp tracking real time closely. %% With microsecond granularity it should be nearly impossible for it to run %% too far ahead than the real time clock. - Timestamp = max(Latest + 1, TimestampLocal), - Messages = assign_timestamps(Timestamp, MessagesIn), + {NLatest, Messages} = assign_timestamps(Latest, MessagesIn), + %% TODO + %% Batch is now reversed, but it should not make a lot of difference. + %% Even if it would be in order, it's still possible to write messages far away + %% in the past, i.e. when replica catches up with the leader. 
Storage layer + %% currently relies on wall clock time to decide if it's safe to iterate over + %% next epoch, this is likely wrong. Ideally it should rely on consensus clock + %% time instead. Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}), - %% NOTE: Last assigned timestamp. - NLatest = Timestamp + length(Messages) - 1, NState = State#{latest := NLatest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, {NState, Result, Effect}. -assign_timestamps(Timestamp, [MessageIn | Rest]) -> - Message = emqx_message:set_timestamp(Timestamp, MessageIn), - [Message | assign_timestamps(Timestamp + 1, Rest)]; -assign_timestamps(_Timestamp, []) -> - []. +assign_timestamps(Latest, Messages) -> + assign_timestamps(Latest, Messages, []). + +assign_timestamps(Latest, [MessageIn | Rest], Acc) -> + case emqx_message:timestamp(MessageIn) of + Later when Later > Latest -> + assign_timestamps(Later, Rest, [MessageIn | Acc]); + _Earlier -> + Message = emqx_message:set_timestamp(Latest + 1, MessageIn), + assign_timestamps(Latest + 1, Rest, [Message | Acc]) + end; +assign_timestamps(Latest, [], Acc) -> + {Latest, Acc}. 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 515bae1b6..c130b8b2f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -67,8 +67,9 @@ store_batch(DB, Messages, Opts) -> case maps:get(atomic, Opts, false) of false -> lists:foreach( - fun(Message) -> - Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + fun(MessageIn) -> + Shard = emqx_ds_replication_layer:shard_of_message(DB, MessageIn, clientid), + Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn), gen_server:call( ?via(DB, Shard), #enqueue_req{ @@ -83,10 +84,14 @@ store_batch(DB, Messages, Opts) -> true -> maps:foreach( fun(Shard, Batch) -> + Timestamp = emqx_ds:timestamp_us(), gen_server:call( ?via(DB, Shard), #enqueue_atomic_req{ - batch = Batch, + batch = [ + emqx_message:set_timestamp(Timestamp, Message) + || Message <- Batch + ], sync = Sync }, infinity diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index c9b5bad60..9c217ef48 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -118,8 +118,7 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> lists:foreach( fun(Msg) -> - Id = erlang:unique_integer([monotonic]), - Key = <>, + Key = <<(emqx_message:timestamp(Msg)):64>>, Val = term_to_binary(Msg), rocksdb:put(DB, CF, Key, Val, []) end,