diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 61126c164..3ff87ab44 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -672,22 +672,15 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State ) -> %% NOTE %% Unique timestamp tracking real time closely. %% With microsecond granularity it should be nearly impossible for it to run %% too far ahead than the real time clock. - {NLatest, Messages} = assign_timestamps(Latest, MessagesIn), - %% TODO - %% Batch is now reversed, but it should not make a lot of difference. - %% Even if it would be in order, it's still possible to write messages far away - %% in the past, i.e. when replica catches up with the leader. Storage layer - %% currently relies on wall clock time to decide if it's safe to iterate over - %% next epoch, this is likely wrong. Ideally it should rely on consensus clock - %% time instead. + {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), - NState = State#{latest := NLatest}, + NState = State#{latest := Latest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, {NState, Result, Effect}; @@ -730,7 +723,7 @@ assign_timestamps(Latest, [MessageIn | Rest], Acc) -> assign_timestamps(Latest + 1, Rest, [Message | Acc]) end; assign_timestamps(Latest, [], Acc) -> - {Latest, Acc}. + {Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}.