diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl index 29fc98083..e55f4807e 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl @@ -231,9 +231,9 @@ flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) -> make_batch(_ForceMonotonic = true, Latest, Messages) -> assign_monotonic_timestamps(Latest, Messages, []); make_batch(false, Latest, Messages) -> - assign_message_timestamps(Latest, Messages, []). + assign_operation_timestamps(Latest, Messages, []). -assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) -> +assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) -> case emqx_message:timestamp(Message, microsecond) of TimestampUs when TimestampUs > Latest0 -> Latest = TimestampUs; @@ -242,15 +242,21 @@ assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) -> end, Acc = [assign_timestamp(Latest, Message) | Acc0], assign_monotonic_timestamps(Latest, Rest, Acc); +assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) -> + Acc = [Operation | Acc0], + assign_monotonic_timestamps(Latest, Rest, Acc); assign_monotonic_timestamps(Latest, [], Acc) -> {Latest, lists:reverse(Acc)}. -assign_message_timestamps(Latest0, [Message | Rest], Acc0) -> - TimestampUs = emqx_message:timestamp(Message, microsecond), +assign_operation_timestamps(Latest0, [Message = #message{} | Rest], Acc0) -> + TimestampUs = emqx_message:timestamp(Message), Latest = max(TimestampUs, Latest0), Acc = [assign_timestamp(TimestampUs, Message) | Acc0], - assign_message_timestamps(Latest, Rest, Acc); -assign_message_timestamps(Latest, [], Acc) -> + assign_operation_timestamps(Latest, Rest, Acc); +assign_operation_timestamps(Latest, [Operation | Rest], Acc0) -> + Acc = [Operation | Acc0], + assign_operation_timestamps(Latest, Rest, Acc); +assign_operation_timestamps(Latest, [], Acc) -> {Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 3430a3bda..6acbf94ca 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -1036,9 +1036,9 @@ assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) -> MSize = approx_message_size(Message0), assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize); assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) -> - Timestamp = emqx_message:timestamp(Message0), - Latest = max(Latest0, Timestamp), - Message = assign_timestamp(Timestamp, Message0), + TimestampUs = emqx_message:timestamp(Message0), + Latest = max(Latest0, TimestampUs), + Message = assign_timestamp(TimestampUs, Message0), MSize = approx_message_size(Message0), assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize); assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->