diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index af0aeec7d..a2476ba66 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -581,8 +581,18 @@ list_nodes() -> -define(RA_TIMEOUT, 60 * 1000). %% How often to release Raft logs? -%% Each N log entries mark everything up to the last N entries "releasable". --define(RA_RELEASE_LOG_FREQ, 1000). +%% Each time we have written approximately this number of bytes. +%% Close to RocksDB's default of 64 MiB. +-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000_000). +%% ...Or at least every N log entries. +-define(RA_RELEASE_LOG_MIN_FREQ, 64_000). + +-ifdef(TEST). +-undef(RA_RELEASE_LOG_APPROX_SIZE). +-undef(RA_RELEASE_LOG_MIN_FREQ). +-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000). +-define(RA_RELEASE_LOG_MIN_FREQ, 1_000). +-endif. -define(SAFE_ERPC(EXPR), try @@ -752,6 +762,7 @@ ra_drop_shard(DB, Shard) -> %% -define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release'). +-define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release'). -spec init(_Args :: map()) -> ra_state(). 
init(#{db := DB, shard := Shard}) -> @@ -768,11 +779,11 @@ apply( #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), - {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), + {Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}), State = State0#{latest := Latest}, set_ts(DBShard, Latest), - Effects = try_release_log(RaftMeta, State), + Effects = try_release_log(Stats, RaftMeta, State), Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), {State, Result, Effects}; apply( @@ -847,22 +858,27 @@ apply( Effects = handle_custom_event(DBShard, Latest, CustomEvent), {State#{latest => Latest}, ok, Effects}. -try_release_log(RaftMeta = #{index := CurrentIdx}, State) -> +try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) -> %% NOTE - %% Release everything up to the last log entry, but only if there were more than - %% `?RA_RELEASE_LOG_FREQ` new entries since the last release. - case get_log_need_release(RaftMeta) of - undefined -> - []; - PrevIdx when ?RA_RELEASE_LOG_FREQ > CurrentIdx - PrevIdx -> - []; - _PrevIdx -> - %% TODO - %% Number of log entries is not the best metric. Because cursor release - %% means storage flush (see `emqx_ds_replication_snapshot:write/3`), we - %% should do that not too often (so the storage is happy with L0 SST size) - %% and not too rarely (so we don't accumulate huge Raft logs). - release_log(RaftMeta, State) + %% Because cursor release means storage flush (see + %% `emqx_ds_replication_snapshot:write/3`), we should do that not too often + %% (so the storage is happy with L0 SST sizes) and not too rarely (so we don't + %% accumulate huge Raft logs). 
+ case inc_bytes_need_release(BatchSize) of + AccSize when AccSize > ?RA_RELEASE_LOG_APPROX_SIZE -> + release_log(RaftMeta, State); + _NotYet -> + case get_log_need_release(RaftMeta) of + undefined -> + []; + PrevIdx when CurrentIdx - PrevIdx > ?RA_RELEASE_LOG_MIN_FREQ -> + %% Release everything up to the last log entry, but only if there were + %% more than `?RA_RELEASE_LOG_MIN_FREQ` new entries since the last + %% release. + release_log(RaftMeta, State); + _ -> + [] + end end. release_log(RaftMeta = #{index := CurrentIdx}, State) -> @@ -871,6 +887,7 @@ release_log(RaftMeta = #{index := CurrentIdx}, State) -> %% following `CurrentIdx` should not contribute to `State` (that will be recovered %% from a snapshot). update_log_need_release(RaftMeta), + reset_bytes_need_release(), {release_cursor, CurrentIdx, State}. get_log_need_release(RaftMeta) -> @@ -885,6 +902,17 @@ get_log_need_release(RaftMeta) -> update_log_need_release(#{index := CurrentIdx}) -> erlang:put(?pd_ra_idx_need_release, CurrentIdx). +get_bytes_need_release() -> + emqx_maybe:define(erlang:get(?pd_ra_bytes_need_release), 0). + +inc_bytes_need_release(Size) -> + Acc = get_bytes_need_release() + Size, + erlang:put(?pd_ra_bytes_need_release, Acc), + Acc. + +reset_bytes_need_release() -> + erlang:put(?pd_ra_bytes_need_release, 0). + -spec tick(integer(), ra_state()) -> ra_machine:effects(). tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), @@ -893,19 +921,20 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> - assign_timestamps(Latest, Messages, []). + assign_timestamps(Latest, Messages, [], 0, 0). 
-assign_timestamps(Latest, [MessageIn | Rest], Acc) -> - case emqx_message:timestamp(MessageIn, microsecond) of - TimestampUs when TimestampUs > Latest -> - Message = assign_timestamp(TimestampUs, MessageIn), - assign_timestamps(TimestampUs, Rest, [Message | Acc]); +assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) -> + case emqx_message:timestamp(Message0, microsecond) of + TimestampUs when TimestampUs > Latest0 -> + Latest = TimestampUs, + Message = assign_timestamp(TimestampUs, Message0); _Earlier -> - Message = assign_timestamp(Latest + 1, MessageIn), - assign_timestamps(Latest + 1, Rest, [Message | Acc]) - end; -assign_timestamps(Latest, [], Acc) -> - {Latest, lists:reverse(Acc)}. + Latest = Latest0 + 1, + Message = assign_timestamp(Latest, Message0) + end, + assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0)); +assign_timestamps(Latest, [], Acc, N, Size) -> + {{N, Size}, Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}. @@ -938,3 +967,14 @@ handle_custom_event(DBShard, Latest, Event) -> set_ts({DB, Shard}, TS) -> emqx_ds_builtin_raft_sup:set_gvar(DB, ?gv_timestamp(Shard), TS). + +%% + +approx_message_size(#message{from = ClientID, topic = Topic, payload = Payload}) -> + MinOverhead = 40, + MinOverhead + clientid_size(ClientID) + byte_size(Topic) + byte_size(Payload). + +clientid_size(ClientID) when is_binary(ClientID) -> + byte_size(ClientID); +clientid_size(ClientID) -> + erlang:external_size(ClientID).