diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 5dc42cc88..4a0a49b91 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -527,7 +527,7 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), clientinfo = ClientInfo = #{clientid := ClientId}}) -> case ReturnCode of ?SN_RC_ACCEPTED -> - case emqx_session:puback(MsgId, Session) of + case emqx_session:puback(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), {ok, Channel#channel{session = NSession}}; @@ -574,7 +574,7 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId), Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> - case emqx_session:pubrec(MsgId, Session) of + case emqx_session:pubrec(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), NChannel = Channel#channel{session = NSession}, @@ -596,8 +596,8 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId), end; handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId), - Channel = #channel{ctx = Ctx, session = Session}) -> - case emqx_session:pubrel(MsgId, Session) of + Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> + case emqx_session:pubrel(ClientInfo, MsgId, Session) of {ok, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, MsgId, NChannel); @@ -611,8 +611,8 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId), end; handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), - Channel = #channel{ctx = Ctx, session = Session}) -> - case emqx_session:pubcomp(MsgId, Session) of + Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> + case emqx_session:pubcomp(ClientInfo, MsgId, Session) of {ok, NSession} -> {ok, Channel#channel{session = NSession}}; {ok, Publishes, NSession} -> @@ -823,8 +823,8 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_1}, Channel) -> handle_out(puback, {TopicId, MsgId, ?SN_RC_ACCEPTED}, Channel); do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2}, - Channel = #channel{ctx = Ctx, session = Session}) -> - case emqx_session:publish(MsgId, Msg, Session) of + Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> + case emqx_session:publish(ClientInfo, MsgId, Msg, Session) of {ok, _PubRes, NSession} -> NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession} @@ -1012,9 +1012,9 @@ do_unsubscribe(TopicFilters, %%-------------------------------------------------------------------- %% Awake & Asleep -awake(Channel = #channel{session = Session}) -> - {ok, Publishes, Session1} = emqx_session:replay(Session), - {NPublishes, NSession} = case emqx_session:deliver([], Session1) of +awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + {NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of {ok, Session2} -> {Publishes, Session2}; {ok, More, Session2} -> @@ -1109,9 +1109,9 @@ maybe_resume_session(#channel{resuming = false}) -> ignore; maybe_resume_session(#channel{session = Session, resuming = true, - pendings = Pendings}) -> - {ok, Publishes, Session1} = emqx_session:replay(Session), - case emqx_session:deliver(Pendings, Session1) of + pendings = Pendings, clientinfo = ClientInfo}) -> + {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + case emqx_session:deliver(ClientInfo, Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; {ok, More, Session2} -> @@ -1335,10 +1335,10 @@ handle_deliver(Delivers, Channel = #channel{ ctx = Ctx, conn_state = ConnState, session = Session, - clientinfo = #{clientid := ClientId}}) + clientinfo = ClientInfo = #{clientid := ClientId}}) when ConnState =:= disconnected; ConnState =:= asleep -> - NSession = emqx_session:enqueue( + NSession = emqx_session:enqueue(ClientInfo, ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx), Session ), @@ -1359,8 +1359,8 @@ handle_deliver(Delivers, Channel = #channel{ handle_deliver(Delivers, Channel = #channel{ ctx = Ctx, session = Session, - clientinfo = #{clientid := ClientId}}) -> - case emqx_session:deliver( + clientinfo = ClientInfo = #{clientid := ClientId}}) -> + case emqx_session:deliver(ClientInfo, ignore_local(Delivers, ClientId, Session, Ctx), Session ) of @@ -1427,8 +1427,8 @@ handle_timeout(_TRef, retry_delivery, Channel = #channel{conn_state = asleep}) -> {ok, reset_timer(retry_timer, Channel)}; handle_timeout(_TRef, retry_delivery, - Channel = #channel{session = Session}) -> - case emqx_session:retry(Session) of + Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + case emqx_session:retry(ClientInfo, Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; {ok, Publishes, Timeout, NSession} -> @@ -1443,8 +1443,8 @@ handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{conn_state = asleep}) -> {ok, reset_timer(await_timer, Channel)}; handle_timeout(_TRef, expire_awaiting_rel, - Channel = #channel{session = Session}) -> - case emqx_session:expire(awaiting_rel, Session) of + Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + case emqx_session:expire(ClientInfo, awaiting_rel, Session) of {ok, NSession} -> {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; {ok, Timeout, NSession} ->