diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 34cb5fc4d..fe7da3f8f 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -579,6 +579,10 @@ list_nodes() -> %% Too large for normal operation, need better backpressure mechanism. -define(RA_TIMEOUT, 60 * 1000). +%% How often to release Raft logs? +%% Each N log entries mark everything up to the last N entries "releasable". +-define(RA_RELEASE_LOG_FREQ, 1000). + -define(SAFE_ERPC(EXPR), try EXPR @@ -746,6 +750,8 @@ ra_drop_shard(DB, Shard) -> %% +-define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release'). + -spec init(_Args :: map()) -> ra_state(). init(#{db := DB, shard := Shard}) -> #{db_shard => {DB, Shard}, latest => 0}. @@ -753,7 +759,7 @@ init(#{db := DB, shard := Shard}) -> -spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) -> {ra_state(), _Reply, _Effects}. apply( - #{index := RaftIdx}, + RaftMeta, #{ ?tag := ?BATCH, ?batch_messages := MessagesIn @@ -765,17 +771,17 @@ apply( Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), State = State0#{latest := Latest}, set_ts(DBShard, Latest), - %% TODO: Need to measure effects of changing frequency of `release_cursor`. - Effect = {release_cursor, RaftIdx, State}, - {State, Result, Effect}; + Effects = try_release_log(RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( - _RaftMeta, + RaftMeta, #{?tag := add_generation, ?since := Since}, #{db_shard := DBShard, latest := Latest0} = State0 ) -> ?tp( info, - ds_replication_layer_add_generation, + ds_ra_add_generation, #{ shard => DBShard, since => Since @@ -785,15 +791,17 @@ apply( Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), State = State0#{latest := Latest}, set_ts(DBShard, Latest), - {State, Result}; + Effects = release_log(RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( - _RaftMeta, + RaftMeta, #{?tag := update_config, ?since := Since, ?config := Opts}, #{db_shard := DBShard, latest := Latest0} = State0 ) -> ?tp( notice, - ds_replication_layer_update_config, + ds_ra_update_config, #{ shard => DBShard, config => Opts, @@ -803,7 +811,9 @@ apply( {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), State = State0#{latest := Latest}, - {State, Result}; + Effects = release_log(RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( _RaftMeta, #{?tag := drop_generation, ?generation := GenId}, @@ -811,7 +821,7 @@ apply( ) -> ?tp( info, - ds_replication_layer_drop_generation, + ds_ra_drop_generation, #{ shard => DBShard, generation => GenId @@ -828,7 +838,7 @@ apply( set_ts(DBShard, Latest), ?tp( debug, - emqx_ds_replication_layer_storage_event, + ds_ra_storage_event, #{ shard => DBShard, payload => CustomEvent, latest => Latest } @@ -836,6 +846,44 @@ apply( Effects = handle_custom_event(DBShard, Latest, CustomEvent), {State#{latest => Latest}, ok, Effects}. +try_release_log(RaftMeta = #{index := CurrentIdx}, State) -> + %% NOTE + %% Release everything up to the last log entry, but only if there were more than + %% `?RA_RELEASE_LOG_FREQ` new entries since the last release. + case get_log_need_release(RaftMeta) of + undefined -> + []; + PrevIdx when ?RA_RELEASE_LOG_FREQ > CurrentIdx - PrevIdx -> + []; + _PrevIdx -> + %% TODO + %% Number of log entries is not the best metric. Because cursor release + %% means storage flush (see `emqx_ds_replication_snapshot:write/3`), we + %% should do that not too often (so the storage is happy with L0 SST size) + %% and not too rarely (so we don't accumulate huge Raft logs). + release_log(RaftMeta, State) + end. + +release_log(RaftMeta = #{index := CurrentIdx}, State) -> + %% NOTE + %% Release everything up to the last log entry. This is important: any log entries + %% following `CurrentIdx` should not contribute to `State` (that will be recovered + %% from a snapshot). + update_log_need_release(RaftMeta), + {release_cursor, CurrentIdx, State}. + +get_log_need_release(RaftMeta) -> + case erlang:get(?pd_ra_idx_need_release) of + undefined -> + update_log_need_release(RaftMeta), + undefined; + LastIdx -> + LastIdx + end. + +update_log_need_release(#{index := CurrentIdx}) -> + erlang:put(?pd_ra_idx_need_release, CurrentIdx). + -spec tick(integer(), ra_state()) -> ra_machine:effects(). tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),