diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 19cf43474..cf0390012 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -381,6 +381,10 @@ list_nodes() -> %% +%% TODO +%% Too large for normal operation, need better backpressure mechanism. +-define(RA_TIMEOUT, 60 * 1000). + -define(SAFERPC(SERVER, EXPR), make_safe_rpc(SERVER, fun() -> EXPR end)). -define(SAFERPC(SERVER, EXPR, RET), make_safe_rpc(SERVER, fun() -> EXPR end, RET)). -define(GENRPC(SERVER, EXPR), make_gen_rpc(SERVER, fun() -> EXPR end)). @@ -391,17 +395,17 @@ ra_store_batch(DB, Shard, Messages) -> ?batch_messages => Messages }, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command) of + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; Error -> - error(Error, [DB, Shard]) + Error end. 
ra_add_generation(DB, Shard) -> Command = #{?tag => add_generation}, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command) of + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; Error -> @@ -411,7 +415,7 @@ ra_add_generation(DB, Shard) -> ra_update_config(DB, Shard, Opts) -> Command = #{?tag => update_config, ?config => Opts}, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command) of + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; Error -> @@ -421,7 +425,7 @@ ra_update_config(DB, Shard, Opts) -> ra_drop_generation(DB, Shard, GenId) -> Command = #{?tag => drop_generation, ?generation => GenId}, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command) of + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; Error -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 9b24f63ae..e2fc17089 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -55,6 +55,8 @@ %% API functions %%================================================================================ +-define(STORE_TIMEOUT, 60 * 1000). + -spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}. start_link(DB, Shard) -> gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). 
@@ -67,7 +69,11 @@ store_batch(DB, Messages, Opts) ->
         fun(MessageIn) ->
             Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn),
             Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
-            gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
+            ok = gen_server:call(
+                ?via(DB, Shard),
+                #enqueue_req{message = Message, sync = Sync},
+                ?STORE_TIMEOUT
+            )
         end,
         Messages
     ).
@@ -119,22 +125,57 @@ terminate(_Reason, _S) ->
 %% Internal functions
 %%================================================================================
 
+%% Bounds, in milliseconds, of the randomized pause taken after a failed flush.
+-define(COOLDOWN_MIN, 1000).
+-define(COOLDOWN_MAX, 5000).
+
+%% Hands the buffered batch over to `emqx_ds_replication_layer:ra_store_batch/3`.
+%% An empty batch only obtains a fresh `tref` from `start_timer/0`. After a
+%% successful store every pending caller is answered with `ok` and the buffer
+%% is reset. After a failure the state keeps the batch and the pending replies,
+%% so the same messages are submitted again by the next flush.
 do_flush(S = #s{batch = []}) ->
     S#s{tref = start_timer()};
 do_flush(
     S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
 ) ->
     %% FIXME
-    ok = emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)),
-    [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
-    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}),
-    erlang:garbage_collect(),
-    S#s{
-        n = 0,
-        batch = [],
-        pending_replies = [],
-        tref = start_timer()
-    }.
+    case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
+        ok ->
+            lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
+            ?tp(
+                emqx_ds_replication_layer_egress_flush,
+                #{db => DB, shard => Shard}
+            ),
+            true = erlang:garbage_collect(),
+            S#s{
+                n = 0,
+                batch = [],
+                pending_replies = [],
+                tref = start_timer()
+            };
+        {error, Reason} ->
+            flush_failed(S, Reason);
+        {timeout, _ServerId} = Timeout ->
+            %% `ra:process_command/3` signals an expired deadline as
+            %% `{timeout, ServerId}` rather than `{error, _}`; treat it as
+            %% one more retryable failure instead of crashing with
+            %% `case_clause`.
+            flush_failed(S, Timeout)
+    end.
+
+%% Emits the failure trace point, pauses the calling process for a random
+%% cooldown in the (?COOLDOWN_MIN, ?COOLDOWN_MAX] range and returns the state
+%% with a new `tref` from `start_timer/0`; the buffered batch is left as it was.
+flush_failed(S = #s{db = DB, shard = Shard}, Reason) ->
+    ?tp(
+        warning,
+        emqx_ds_replication_layer_egress_flush_failed,
+        #{db => DB, shard => Shard, reason => Reason}
+    ),
+    Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
+    ok = timer:sleep(Cooldown),
+    S#s{tref = start_timer()}.
 
 do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
     NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),