feat(dsrepl): rely on accumulated log size to decide when to flush

This commit is contained in:
Andrew Mayorov 2024-06-14 18:41:44 +02:00
parent b6a249baa9
commit 8ff48ac5ea
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 70 additions and 30 deletions

View File

@ -581,8 +581,18 @@ list_nodes() ->
-define(RA_TIMEOUT, 60 * 1000).
%% How often to release Raft logs?
%% Each N log entries mark everything up to the last N entries "releasable".
-define(RA_RELEASE_LOG_FREQ, 1000).
%% Each time we written approximately this number of bytes.
%% Close to the RocksDB's default of 64 MiB.
-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000_000).
%% ...Or at least each N log entries.
-define(RA_RELEASE_LOG_MIN_FREQ, 64_000).
-ifdef(TEST).
-undef(RA_RELEASE_LOG_APPROX_SIZE).
-undef(RA_RELEASE_LOG_MIN_FREQ).
-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000).
-define(RA_RELEASE_LOG_MIN_FREQ, 1_000).
-endif.
-define(SAFE_ERPC(EXPR),
try
@ -752,6 +762,7 @@ ra_drop_shard(DB, Shard) ->
%%
-define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release').
-define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release').
-spec init(_Args :: map()) -> ra_state().
init(#{db := DB, shard := Shard}) ->
@ -768,11 +779,11 @@ apply(
#{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
) ->
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}),
{Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
{Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}),
State = State0#{latest := Latest},
set_ts(DBShard, Latest),
Effects = try_release_log(RaftMeta, State),
Effects = try_release_log(Stats, RaftMeta, State),
Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
{State, Result, Effects};
apply(
@ -847,22 +858,27 @@ apply(
Effects = handle_custom_event(DBShard, Latest, CustomEvent),
{State#{latest => Latest}, ok, Effects}.
try_release_log(RaftMeta = #{index := CurrentIdx}, State) ->
try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) ->
%% NOTE
%% Release everything up to the last log entry, but only if there were more than
%% `?RA_RELEASE_LOG_FREQ` new entries since the last release.
case get_log_need_release(RaftMeta) of
undefined ->
[];
PrevIdx when ?RA_RELEASE_LOG_FREQ > CurrentIdx - PrevIdx ->
[];
_PrevIdx ->
%% TODO
%% Number of log entries is not the best metric. Because cursor release
%% means storage flush (see `emqx_ds_replication_snapshot:write/3`), we
%% should do that not too often (so the storage is happy with L0 SST size)
%% and not too rarely (so we don't accumulate huge Raft logs).
release_log(RaftMeta, State)
%% Because cursor release means storage flush (see
%% `emqx_ds_replication_snapshot:write/3`), we should do that not too often
%% (so the storage is happy with L0 SST sizes) and not too rarely (so we don't
%% accumulate huge Raft logs).
case inc_bytes_need_release(BatchSize) of
AccSize when AccSize > ?RA_RELEASE_LOG_APPROX_SIZE ->
release_log(RaftMeta, State);
_NotYet ->
case get_log_need_release(RaftMeta) of
undefined ->
[];
PrevIdx when CurrentIdx - PrevIdx > ?RA_RELEASE_LOG_MIN_FREQ ->
%% Release everything up to the last log entry, but only if there were
%% more than %% `?RA_RELEASE_LOG_MIN_FREQ` new entries since the last
%% release.
release_log(RaftMeta, State);
_ ->
[]
end
end.
release_log(RaftMeta = #{index := CurrentIdx}, State) ->
@ -871,6 +887,7 @@ release_log(RaftMeta = #{index := CurrentIdx}, State) ->
%% following `CurrentIdx` should not contribute to `State` (that will be recovered
%% from a snapshot).
update_log_need_release(RaftMeta),
reset_bytes_need_release(),
{release_cursor, CurrentIdx, State}.
get_log_need_release(RaftMeta) ->
@ -885,6 +902,17 @@ get_log_need_release(RaftMeta) ->
update_log_need_release(#{index := CurrentIdx}) ->
erlang:put(?pd_ra_idx_need_release, CurrentIdx).
get_bytes_need_release() ->
emqx_maybe:define(erlang:get(?pd_ra_bytes_need_release), 0).
inc_bytes_need_release(Size) ->
Acc = get_bytes_need_release() + Size,
erlang:put(?pd_ra_bytes_need_release, Acc),
Acc.
reset_bytes_need_release() ->
erlang:put(?pd_ra_bytes_need_release, 0).
-spec tick(integer(), ra_state()) -> ra_machine:effects().
tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
%% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),
@ -893,19 +921,20 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
handle_custom_event(DBShard, Timestamp, tick).
assign_timestamps(Latest, Messages) ->
assign_timestamps(Latest, Messages, []).
assign_timestamps(Latest, Messages, [], 0, 0).
assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
case emqx_message:timestamp(MessageIn, microsecond) of
TimestampUs when TimestampUs > Latest ->
Message = assign_timestamp(TimestampUs, MessageIn),
assign_timestamps(TimestampUs, Rest, [Message | Acc]);
assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) ->
case emqx_message:timestamp(Message0, microsecond) of
TimestampUs when TimestampUs > Latest0 ->
Latest = TimestampUs,
Message = assign_timestamp(TimestampUs, Message0);
_Earlier ->
Message = assign_timestamp(Latest + 1, MessageIn),
assign_timestamps(Latest + 1, Rest, [Message | Acc])
end;
assign_timestamps(Latest, [], Acc) ->
{Latest, lists:reverse(Acc)}.
Latest = Latest0 + 1,
Message = assign_timestamp(Latest, Message0)
end,
assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0));
assign_timestamps(Latest, [], Acc, N, Size) ->
{{N, Size}, Latest, lists:reverse(Acc)}.
assign_timestamp(TimestampUs, Message) ->
{TimestampUs, Message}.
@ -938,3 +967,14 @@ handle_custom_event(DBShard, Latest, Event) ->
set_ts({DB, Shard}, TS) ->
emqx_ds_builtin_raft_sup:set_gvar(DB, ?gv_timestamp(Shard), TS).
%%
approx_message_size(#message{from = ClientID, topic = Topic, payload = Payload}) ->
MinOverhead = 40,
MinOverhead + clientid_size(ClientID) + byte_size(Topic) + byte_size(Payload).
clientid_size(ClientID) when is_binary(ClientID) ->
byte_size(ClientID);
clientid_size(ClientID) ->
erlang:external_size(ClientID).