fix(sessds): Schedule poll immediately upon receiving an ack
This commit affects the flow control, and improves the throughput by removing a delay between freeing up space in the in-flight window and polling new messages.
This commit is contained in:
parent
38800c0260
commit
4717e56fb6
|
@ -338,9 +338,9 @@ publish(_PacketId, Msg, Session) ->
|
||||||
puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) ->
|
puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) ->
|
||||||
case emqx_persistent_message_ds_replayer:commit_offset(Id, ack, PacketId, Inflight0) of
|
case emqx_persistent_message_ds_replayer:commit_offset(Id, ack, PacketId, Inflight0) of
|
||||||
{true, Inflight} ->
|
{true, Inflight} ->
|
||||||
%% TODO
|
%% TODO: we pass a bogus message into the hook:
|
||||||
Msg = emqx_message:make(Id, <<>>, <<>>),
|
Msg = emqx_message:make(Id, <<>>, <<>>),
|
||||||
{ok, Msg, [], Session#{inflight => Inflight}};
|
{ok, Msg, [], pull_now(Session#{inflight => Inflight})};
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
%% Invalid Packet Id
|
%% Invalid Packet Id
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
|
@ -356,9 +356,9 @@ puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) ->
|
||||||
pubrec(PacketId, Session = #{id := Id, inflight := Inflight0}) ->
|
pubrec(PacketId, Session = #{id := Id, inflight := Inflight0}) ->
|
||||||
case emqx_persistent_message_ds_replayer:commit_offset(Id, rec, PacketId, Inflight0) of
|
case emqx_persistent_message_ds_replayer:commit_offset(Id, rec, PacketId, Inflight0) of
|
||||||
{true, Inflight} ->
|
{true, Inflight} ->
|
||||||
%% TODO
|
%% TODO: we pass a bogus message into the hook:
|
||||||
Msg = emqx_message:make(Id, <<>>, <<>>),
|
Msg = emqx_message:make(Id, <<>>, <<>>),
|
||||||
{ok, Msg, Session#{inflight => Inflight}};
|
{ok, Msg, pull_now(Session#{inflight => Inflight})};
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
%% Invalid Packet Id
|
%% Invalid Packet Id
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
|
@ -967,6 +967,10 @@ ensure_timers(Session0) ->
|
||||||
Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1),
|
Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1),
|
||||||
emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2).
|
emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2).
|
||||||
|
|
||||||
|
-spec pull_now(session()) -> session().
|
||||||
|
pull_now(Session) ->
|
||||||
|
emqx_session:reset_timer(?TIMER_PULL, 0, Session).
|
||||||
|
|
||||||
-spec receive_maximum(conninfo()) -> pos_integer().
|
-spec receive_maximum(conninfo()) -> pos_integer().
|
||||||
receive_maximum(ConnInfo) ->
|
receive_maximum(ConnInfo) ->
|
||||||
%% Note: the default value should be always set by the channel
|
%% Note: the default value should be always set by the channel
|
||||||
|
|
Loading…
Reference in New Issue