wip: handle errors gracefully in shard egress process

Also add cooldown on timeout / unavailability.
This commit is contained in:
Andrew Mayorov 2024-02-19 19:09:22 +01:00
parent 77c65266d0
commit 83dd6a2896
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 43 additions and 16 deletions

View File

@ -381,6 +381,10 @@ list_nodes() ->
%%
%% TODO
%% Too large for normal operation, need better backpressure mechanism.
-define(RA_TIMEOUT, 60 * 1000).
-define(SAFERPC(SERVER, EXPR), make_safe_rpc(SERVER, fun() -> EXPR end)).
-define(SAFERPC(SERVER, EXPR, RET), make_safe_rpc(SERVER, fun() -> EXPR end, RET)).
-define(GENRPC(SERVER, EXPR), make_gen_rpc(SERVER, fun() -> EXPR end)).
@ -391,17 +395,17 @@ ra_store_batch(DB, Shard, Messages) ->
?batch_messages => Messages
},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command) of
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
Error
end.
ra_add_generation(DB, Shard) ->
Command = #{?tag => add_generation},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command) of
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
@ -411,7 +415,7 @@ ra_add_generation(DB, Shard) ->
ra_update_config(DB, Shard, Opts) ->
Command = #{?tag => update_config, ?config => Opts},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command) of
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
@ -421,7 +425,7 @@ ra_update_config(DB, Shard, Opts) ->
ra_drop_generation(DB, Shard, GenId) ->
Command = #{?tag => drop_generation, ?generation => GenId},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command) of
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->

View File

@ -55,6 +55,8 @@
%% API functions
%%================================================================================
-define(STORE_TIMEOUT, 60 * 1000).
-spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}.
start_link(DB, Shard) ->
gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
@ -67,7 +69,11 @@ store_batch(DB, Messages, Opts) ->
fun(MessageIn) ->
Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn),
Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
ok = gen_server:call(
?via(DB, Shard),
#enqueue_req{message = Message, sync = Sync},
?STORE_TIMEOUT
)
end,
Messages
).
@ -119,22 +125,39 @@ terminate(_Reason, _S) ->
%% Internal functions
%%================================================================================
-define(COOLDOWN_MIN, 1000).
-define(COOLDOWN_MAX, 5000).
do_flush(S = #s{batch = []}) ->
S#s{tref = start_timer()};
do_flush(
S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
) ->
%% FIXME
ok = emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)),
[gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}),
erlang:garbage_collect(),
S#s{
n = 0,
batch = [],
pending_replies = [],
tref = start_timer()
}.
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
ok ->
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
?tp(
emqx_ds_replication_layer_egress_flush,
#{db => DB, shard => Shard}
),
true = erlang:garbage_collect(),
S#s{
n = 0,
batch = [],
pending_replies = [],
tref = start_timer()
};
{error, Reason} ->
?tp(
warning,
emqx_ds_replication_layer_egress_flush_failed,
#{db => DB, shard => Shard, reason => Reason}
),
Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
ok = timer:sleep(Cooldown),
S#s{tref = start_timer()}
end.
do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),