Merge pull request #12762 from thalesmg/ds-fix-sync-egress-reply-m-20240321
fix(ds): add caller to pending replies before flushing and reply failures to sync callers
This commit is contained in:
commit
23ad37f566
|
@ -161,12 +161,13 @@ do_flush(
|
||||||
) ->
|
) ->
|
||||||
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
|
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
|
||||||
ok ->
|
ok ->
|
||||||
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
|
|
||||||
true = erlang:garbage_collect(),
|
|
||||||
?tp(
|
?tp(
|
||||||
emqx_ds_replication_layer_egress_flush,
|
emqx_ds_replication_layer_egress_flush,
|
||||||
#{db => DB, shard => Shard, batch => Messages}
|
#{db => DB, shard => Shard, batch => Messages}
|
||||||
);
|
),
|
||||||
|
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
|
||||||
|
true = erlang:garbage_collect(),
|
||||||
|
ok;
|
||||||
Error ->
|
Error ->
|
||||||
true = erlang:garbage_collect(),
|
true = erlang:garbage_collect(),
|
||||||
?tp(
|
?tp(
|
||||||
|
@ -175,7 +176,11 @@ do_flush(
|
||||||
#{db => DB, shard => Shard, reason => Error}
|
#{db => DB, shard => Shard, reason => Error}
|
||||||
),
|
),
|
||||||
Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
|
Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
|
||||||
ok = timer:sleep(Cooldown)
|
ok = timer:sleep(Cooldown),
|
||||||
|
%% Since we drop the entire batch here, we at least reply callers with an
|
||||||
|
%% error so they don't hang indefinitely in the `gen_server' call with
|
||||||
|
%% `infinity' timeout.
|
||||||
|
lists:foreach(fun(From) -> gen_server:reply(From, {error, Error}) end, Replies)
|
||||||
end,
|
end,
|
||||||
S#s{
|
S#s{
|
||||||
n = 0,
|
n = 0,
|
||||||
|
@ -193,14 +198,6 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies
|
||||||
Msg ->
|
Msg ->
|
||||||
S0#s{n = N + 1, batch = [Msg | Batch]}
|
S0#s{n = N + 1, batch = [Msg | Batch]}
|
||||||
end,
|
end,
|
||||||
S2 =
|
|
||||||
case N >= NMax of
|
|
||||||
true ->
|
|
||||||
_ = erlang:cancel_timer(S0#s.tref),
|
|
||||||
do_flush(S1);
|
|
||||||
false ->
|
|
||||||
S1
|
|
||||||
end,
|
|
||||||
%% TODO: later we may want to delay the reply until the message is
|
%% TODO: later we may want to delay the reply until the message is
|
||||||
%% replicated, but it requies changes to the PUBACK/PUBREC flow to
|
%% replicated, but it requies changes to the PUBACK/PUBREC flow to
|
||||||
%% allow for async replies. For now, we ack when the message is
|
%% allow for async replies. For now, we ack when the message is
|
||||||
|
@ -208,12 +205,20 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies
|
||||||
%%
|
%%
|
||||||
%% Otherwise, the client would freeze for at least flush interval,
|
%% Otherwise, the client would freeze for at least flush interval,
|
||||||
%% or until the buffer is filled.
|
%% or until the buffer is filled.
|
||||||
S =
|
S2 =
|
||||||
case Sync of
|
case Sync of
|
||||||
true ->
|
true ->
|
||||||
S2#s{pending_replies = [From | Replies]};
|
S1#s{pending_replies = [From | Replies]};
|
||||||
false ->
|
false ->
|
||||||
gen_server:reply(From, ok),
|
gen_server:reply(From, ok),
|
||||||
|
S1
|
||||||
|
end,
|
||||||
|
S =
|
||||||
|
case N >= NMax of
|
||||||
|
true ->
|
||||||
|
_ = erlang:cancel_timer(S2#s.tref),
|
||||||
|
do_flush(S2);
|
||||||
|
false ->
|
||||||
S2
|
S2
|
||||||
end,
|
end,
|
||||||
%% TODO: add a backpressure mechanism for the server to avoid
|
%% TODO: add a backpressure mechanism for the server to avoid
|
||||||
|
|
Loading…
Reference in New Issue