diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 6b94f3d74..11801b098 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -338,9 +338,9 @@ publish(_PacketId, Msg, Session) -> puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> case emqx_persistent_message_ds_replayer:commit_offset(Id, ack, PacketId, Inflight0) of {true, Inflight} -> - %% TODO + %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, [], Session#{inflight => Inflight}}; + {ok, Msg, [], pull_now(Session#{inflight => Inflight})}; {false, _} -> %% Invalid Packet Id {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -356,9 +356,9 @@ puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> pubrec(PacketId, Session = #{id := Id, inflight := Inflight0}) -> case emqx_persistent_message_ds_replayer:commit_offset(Id, rec, PacketId, Inflight0) of {true, Inflight} -> - %% TODO + %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, Session#{inflight => Inflight}}; + {ok, Msg, pull_now(Session#{inflight => Inflight})}; {false, _} -> %% Invalid Packet Id {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -967,6 +967,10 @@ ensure_timers(Session0) -> Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1), emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2). +-spec pull_now(session()) -> session(). +pull_now(Session) -> + emqx_session:reset_timer(?TIMER_PULL, 0, Session). + -spec receive_maximum(conninfo()) -> pos_integer(). receive_maximum(ConnInfo) -> %% Note: the default value should be always set by the channel