fix(ds): add caller to pending replies before flushing
This commit is contained in:
parent
e2a2295c99
commit
70737a437a
|
@ -193,14 +193,6 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies
|
|||
Msg ->
|
||||
S0#s{n = N + 1, batch = [Msg | Batch]}
|
||||
end,
|
||||
S2 =
|
||||
case N >= NMax of
|
||||
true ->
|
||||
_ = erlang:cancel_timer(S0#s.tref),
|
||||
do_flush(S1);
|
||||
false ->
|
||||
S1
|
||||
end,
|
||||
%% TODO: later we may want to delay the reply until the message is
|
||||
%% replicated, but it requies changes to the PUBACK/PUBREC flow to
|
||||
%% allow for async replies. For now, we ack when the message is
|
||||
|
@ -208,12 +200,20 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies
|
|||
%%
|
||||
%% Otherwise, the client would freeze for at least flush interval,
|
||||
%% or until the buffer is filled.
|
||||
S =
|
||||
S2 =
|
||||
case Sync of
|
||||
true ->
|
||||
S2#s{pending_replies = [From | Replies]};
|
||||
S1#s{pending_replies = [From | Replies]};
|
||||
false ->
|
||||
gen_server:reply(From, ok),
|
||||
S1
|
||||
end,
|
||||
S =
|
||||
case N >= NMax of
|
||||
true ->
|
||||
_ = erlang:cancel_timer(S2#s.tref),
|
||||
do_flush(S2);
|
||||
false ->
|
||||
S2
|
||||
end,
|
||||
%% TODO: add a backpressure mechanism for the server to avoid
|
||||
|
|
Loading…
Reference in New Issue