fix(dsbackend): unify timestamp resolution in operations / preconditions
This commit is contained in:
parent
810a4d3cf9
commit
58b9ab0210
|
@ -231,9 +231,9 @@ flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) ->
|
||||||
make_batch(_ForceMonotonic = true, Latest, Messages) ->
|
make_batch(_ForceMonotonic = true, Latest, Messages) ->
|
||||||
assign_monotonic_timestamps(Latest, Messages, []);
|
assign_monotonic_timestamps(Latest, Messages, []);
|
||||||
make_batch(false, Latest, Messages) ->
|
make_batch(false, Latest, Messages) ->
|
||||||
assign_message_timestamps(Latest, Messages, []).
|
assign_operation_timestamps(Latest, Messages, []).
|
||||||
|
|
||||||
assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
|
assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
|
||||||
case emqx_message:timestamp(Message, microsecond) of
|
case emqx_message:timestamp(Message, microsecond) of
|
||||||
TimestampUs when TimestampUs > Latest0 ->
|
TimestampUs when TimestampUs > Latest0 ->
|
||||||
Latest = TimestampUs;
|
Latest = TimestampUs;
|
||||||
|
@ -242,15 +242,21 @@ assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
|
||||||
end,
|
end,
|
||||||
Acc = [assign_timestamp(Latest, Message) | Acc0],
|
Acc = [assign_timestamp(Latest, Message) | Acc0],
|
||||||
assign_monotonic_timestamps(Latest, Rest, Acc);
|
assign_monotonic_timestamps(Latest, Rest, Acc);
|
||||||
|
assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) ->
|
||||||
|
Acc = [Operation | Acc0],
|
||||||
|
assign_monotonic_timestamps(Latest, Rest, Acc);
|
||||||
assign_monotonic_timestamps(Latest, [], Acc) ->
|
assign_monotonic_timestamps(Latest, [], Acc) ->
|
||||||
{Latest, lists:reverse(Acc)}.
|
{Latest, lists:reverse(Acc)}.
|
||||||
|
|
||||||
assign_message_timestamps(Latest0, [Message | Rest], Acc0) ->
|
assign_operation_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
|
||||||
TimestampUs = emqx_message:timestamp(Message, microsecond),
|
TimestampUs = emqx_message:timestamp(Message),
|
||||||
Latest = max(TimestampUs, Latest0),
|
Latest = max(TimestampUs, Latest0),
|
||||||
Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
|
Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
|
||||||
assign_message_timestamps(Latest, Rest, Acc);
|
assign_operation_timestamps(Latest, Rest, Acc);
|
||||||
assign_message_timestamps(Latest, [], Acc) ->
|
assign_operation_timestamps(Latest, [Operation | Rest], Acc0) ->
|
||||||
|
Acc = [Operation | Acc0],
|
||||||
|
assign_operation_timestamps(Latest, Rest, Acc);
|
||||||
|
assign_operation_timestamps(Latest, [], Acc) ->
|
||||||
{Latest, lists:reverse(Acc)}.
|
{Latest, lists:reverse(Acc)}.
|
||||||
|
|
||||||
assign_timestamp(TimestampUs, Message) ->
|
assign_timestamp(TimestampUs, Message) ->
|
||||||
|
|
|
@ -1036,9 +1036,9 @@ assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
|
||||||
MSize = approx_message_size(Message0),
|
MSize = approx_message_size(Message0),
|
||||||
assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
|
assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
|
||||||
assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
|
assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
|
||||||
Timestamp = emqx_message:timestamp(Message0),
|
TimestampUs = emqx_message:timestamp(Message0),
|
||||||
Latest = max(Latest0, Timestamp),
|
Latest = max(Latest0, TimestampUs),
|
||||||
Message = assign_timestamp(Timestamp, Message0),
|
Message = assign_timestamp(TimestampUs, Message0),
|
||||||
MSize = approx_message_size(Message0),
|
MSize = approx_message_size(Message0),
|
||||||
assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
|
assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
|
||||||
assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
|
assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
|
||||||
|
|
Loading…
Reference in New Issue