fix(dsbackend): unify timestamp resolution in operations / preconditions

This commit is contained in:
Andrew Mayorov 2024-07-31 18:41:28 +02:00 committed by Thales Macedo Garitezi
parent 1559aac486
commit 109ffe7a70
2 changed files with 15 additions and 9 deletions

View File

@ -231,9 +231,9 @@ flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) ->
make_batch(_ForceMonotonic = true, Latest, Messages) ->
assign_monotonic_timestamps(Latest, Messages, []);
make_batch(false, Latest, Messages) ->
assign_message_timestamps(Latest, Messages, []).
assign_operation_timestamps(Latest, Messages, []).
assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
case emqx_message:timestamp(Message, microsecond) of
TimestampUs when TimestampUs > Latest0 ->
Latest = TimestampUs;
@ -242,15 +242,21 @@ assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
end,
Acc = [assign_timestamp(Latest, Message) | Acc0],
assign_monotonic_timestamps(Latest, Rest, Acc);
assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) ->
Acc = [Operation | Acc0],
assign_monotonic_timestamps(Latest, Rest, Acc);
assign_monotonic_timestamps(Latest, [], Acc) ->
{Latest, lists:reverse(Acc)}.
assign_message_timestamps(Latest0, [Message | Rest], Acc0) ->
TimestampUs = emqx_message:timestamp(Message, microsecond),
assign_operation_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
TimestampUs = emqx_message:timestamp(Message),
Latest = max(TimestampUs, Latest0),
Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
assign_message_timestamps(Latest, Rest, Acc);
assign_message_timestamps(Latest, [], Acc) ->
assign_operation_timestamps(Latest, Rest, Acc);
assign_operation_timestamps(Latest, [Operation | Rest], Acc0) ->
Acc = [Operation | Acc0],
assign_operation_timestamps(Latest, Rest, Acc);
assign_operation_timestamps(Latest, [], Acc) ->
{Latest, lists:reverse(Acc)}.
assign_timestamp(TimestampUs, Message) ->

View File

@ -1036,9 +1036,9 @@ assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
MSize = approx_message_size(Message0),
assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
Timestamp = emqx_message:timestamp(Message0),
Latest = max(Latest0, Timestamp),
Message = assign_timestamp(Timestamp, Message0),
TimestampUs = emqx_message:timestamp(Message0),
Latest = max(Latest0, TimestampUs),
Message = assign_timestamp(TimestampUs, Message0),
MSize = approx_message_size(Message0),
assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->