wip: reassign timestamp at the time of submission

This commit is contained in:
Andrew Mayorov 2024-02-01 13:03:57 +01:00
parent bb0cf62879
commit fcb5ed346f
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 26 additions and 16 deletions

View File

@ -437,8 +437,7 @@ ra_start_shard(DB, Shard) ->
ra_store_batch(DB, Shard, Messages) -> ra_store_batch(DB, Shard, Messages) ->
Command = #{ Command = #{
?tag => ?BATCH, ?tag => ?BATCH,
?batch_messages => Messages, ?batch_messages => Messages
?timestamp => emqx_ds:timestamp_us()
}, },
case ra:process_command(ra_leader_servers(DB, Shard), Command) of case ra:process_command(ra_leader_servers(DB, Shard), Command) of
{ok, Result, _Leader} -> {ok, Result, _Leader} ->
@ -534,8 +533,7 @@ apply(
#{index := RaftIdx}, #{index := RaftIdx},
#{ #{
?tag := ?BATCH, ?tag := ?BATCH,
?batch_messages := MessagesIn, ?batch_messages := MessagesIn
?timestamp := TimestampLocal
}, },
#{latest := Latest} = State #{latest := Latest} = State
) -> ) ->
@ -543,18 +541,30 @@ apply(
%% Unique timestamp tracking real time closely. %% Unique timestamp tracking real time closely.
%% With microsecond granularity it should be nearly impossible for it to run %% With microsecond granularity it should be nearly impossible for it to run
%% too far ahead than the real time clock. %% too far ahead than the real time clock.
Timestamp = max(Latest + 1, TimestampLocal), {NLatest, Messages} = assign_timestamps(Latest, MessagesIn),
Messages = assign_timestamps(Timestamp, MessagesIn), %% TODO
%% Batch is now reversed, but it should not make a lot of difference.
%% Even if it would be in order, it's still possible to write messages far away
%% in the past, i.e. when replica catches up with the leader. Storage layer
%% currently relies on wall clock time to decide if it's safe to iterate over
%% next epoch, this is likely wrong. Ideally it should rely on consensus clock
%% time instead.
Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}), Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}),
%% NOTE: Last assigned timestamp.
NLatest = Timestamp + length(Messages) - 1,
NState = State#{latest := NLatest}, NState = State#{latest := NLatest},
%% TODO: Need to measure effects of changing frequency of `release_cursor`. %% TODO: Need to measure effects of changing frequency of `release_cursor`.
Effect = {release_cursor, RaftIdx, NState}, Effect = {release_cursor, RaftIdx, NState},
{NState, Result, Effect}. {NState, Result, Effect}.
assign_timestamps(Timestamp, [MessageIn | Rest]) -> assign_timestamps(Latest, Messages) ->
Message = emqx_message:set_timestamp(Timestamp, MessageIn), assign_timestamps(Latest, Messages, []).
[Message | assign_timestamps(Timestamp + 1, Rest)];
assign_timestamps(_Timestamp, []) -> assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
[]. case emqx_message:timestamp(MessageIn) of
Later when Later > Latest ->
assign_timestamps(Later, Rest, [MessageIn | Acc]);
_Earlier ->
Message = emqx_message:set_timestamp(Latest + 1, MessageIn),
assign_timestamps(Latest + 1, Rest, [Message | Acc])
end;
assign_timestamps(Latest, [], Acc) ->
{Latest, Acc}.

View File

@ -64,7 +64,8 @@ start_link(DB, Shard) ->
store_batch(DB, Messages, Opts) -> store_batch(DB, Messages, Opts) ->
Sync = maps:get(sync, Opts, true), Sync = maps:get(sync, Opts, true),
lists:foreach( lists:foreach(
fun(Message) -> fun(MessageIn) ->
Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn),
Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync}) gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
end, end,

View File

@ -93,8 +93,7 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
lists:foreach( lists:foreach(
fun(Msg) -> fun(Msg) ->
Id = erlang:unique_integer([monotonic]), Key = <<(emqx_message:timestamp(Msg)):64>>,
Key = <<Id:64>>,
Val = term_to_binary(Msg), Val = term_to_binary(Msg),
rocksdb:put(DB, CF, Key, Val, []) rocksdb:put(DB, CF, Key, Val, [])
end, end,