fix(sn): update calling to session APIs
This commit is contained in:
parent
b69dca4f08
commit
e220810b90
|
@ -527,7 +527,7 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
|
||||||
clientinfo = ClientInfo = #{clientid := ClientId}}) ->
|
clientinfo = ClientInfo = #{clientid := ClientId}}) ->
|
||||||
case ReturnCode of
|
case ReturnCode of
|
||||||
?SN_RC_ACCEPTED ->
|
?SN_RC_ACCEPTED ->
|
||||||
case emqx_session:puback(MsgId, Session) of
|
case emqx_session:puback(ClientInfo, MsgId, Session) of
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Channel),
|
ok = after_message_acked(ClientInfo, Msg, Channel),
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, Channel#channel{session = NSession}};
|
||||||
|
@ -574,7 +574,7 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
|
||||||
Channel = #channel{ctx = Ctx,
|
Channel = #channel{ctx = Ctx,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo}) ->
|
||||||
case emqx_session:pubrec(MsgId, Session) of
|
case emqx_session:pubrec(ClientInfo, MsgId, Session) of
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Channel),
|
ok = after_message_acked(ClientInfo, Msg, Channel),
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = Channel#channel{session = NSession},
|
||||||
|
@ -596,8 +596,8 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
|
handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
|
||||||
Channel = #channel{ctx = Ctx, session = Session}) ->
|
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) ->
|
||||||
case emqx_session:pubrel(MsgId, Session) of
|
case emqx_session:pubrel(ClientInfo, MsgId, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = Channel#channel{session = NSession},
|
||||||
handle_out(pubcomp, MsgId, NChannel);
|
handle_out(pubcomp, MsgId, NChannel);
|
||||||
|
@ -611,8 +611,8 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
|
handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
|
||||||
Channel = #channel{ctx = Ctx, session = Session}) ->
|
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) ->
|
||||||
case emqx_session:pubcomp(MsgId, Session) of
|
case emqx_session:pubcomp(ClientInfo, MsgId, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, Channel#channel{session = NSession}};
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
|
@ -823,8 +823,8 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_1}, Channel) ->
|
||||||
handle_out(puback, {TopicId, MsgId, ?SN_RC_ACCEPTED}, Channel);
|
handle_out(puback, {TopicId, MsgId, ?SN_RC_ACCEPTED}, Channel);
|
||||||
|
|
||||||
do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2},
|
do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2},
|
||||||
Channel = #channel{ctx = Ctx, session = Session}) ->
|
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) ->
|
||||||
case emqx_session:publish(MsgId, Msg, Session) of
|
case emqx_session:publish(ClientInfo, MsgId, Msg, Session) of
|
||||||
{ok, _PubRes, NSession} ->
|
{ok, _PubRes, NSession} ->
|
||||||
NChannel1 = ensure_timer(await_timer,
|
NChannel1 = ensure_timer(await_timer,
|
||||||
Channel#channel{session = NSession}
|
Channel#channel{session = NSession}
|
||||||
|
@ -1012,9 +1012,9 @@ do_unsubscribe(TopicFilters,
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Awake & Asleep
|
%% Awake & Asleep
|
||||||
|
|
||||||
awake(Channel = #channel{session = Session}) ->
|
awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
|
||||||
{ok, Publishes, Session1} = emqx_session:replay(Session),
|
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
|
||||||
{NPublishes, NSession} = case emqx_session:deliver([], Session1) of
|
{NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of
|
||||||
{ok, Session2} ->
|
{ok, Session2} ->
|
||||||
{Publishes, Session2};
|
{Publishes, Session2};
|
||||||
{ok, More, Session2} ->
|
{ok, More, Session2} ->
|
||||||
|
@ -1109,9 +1109,9 @@ maybe_resume_session(#channel{resuming = false}) ->
|
||||||
ignore;
|
ignore;
|
||||||
maybe_resume_session(#channel{session = Session,
|
maybe_resume_session(#channel{session = Session,
|
||||||
resuming = true,
|
resuming = true,
|
||||||
pendings = Pendings}) ->
|
pendings = Pendings, clientinfo = ClientInfo}) ->
|
||||||
{ok, Publishes, Session1} = emqx_session:replay(Session),
|
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
|
||||||
case emqx_session:deliver(Pendings, Session1) of
|
case emqx_session:deliver(ClientInfo, Pendings, Session1) of
|
||||||
{ok, Session2} ->
|
{ok, Session2} ->
|
||||||
{ok, Publishes, Session2};
|
{ok, Publishes, Session2};
|
||||||
{ok, More, Session2} ->
|
{ok, More, Session2} ->
|
||||||
|
@ -1335,10 +1335,10 @@ handle_deliver(Delivers, Channel = #channel{
|
||||||
ctx = Ctx,
|
ctx = Ctx,
|
||||||
conn_state = ConnState,
|
conn_state = ConnState,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = #{clientid := ClientId}})
|
clientinfo = ClientInfo = #{clientid := ClientId}})
|
||||||
when ConnState =:= disconnected;
|
when ConnState =:= disconnected;
|
||||||
ConnState =:= asleep ->
|
ConnState =:= asleep ->
|
||||||
NSession = emqx_session:enqueue(
|
NSession = emqx_session:enqueue(ClientInfo,
|
||||||
ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx),
|
ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx),
|
||||||
Session
|
Session
|
||||||
),
|
),
|
||||||
|
@ -1359,8 +1359,8 @@ handle_deliver(Delivers, Channel = #channel{
|
||||||
handle_deliver(Delivers, Channel = #channel{
|
handle_deliver(Delivers, Channel = #channel{
|
||||||
ctx = Ctx,
|
ctx = Ctx,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = ClientInfo = #{clientid := ClientId}}) ->
|
||||||
case emqx_session:deliver(
|
case emqx_session:deliver(ClientInfo,
|
||||||
ignore_local(Delivers, ClientId, Session, Ctx),
|
ignore_local(Delivers, ClientId, Session, Ctx),
|
||||||
Session
|
Session
|
||||||
) of
|
) of
|
||||||
|
@ -1427,8 +1427,8 @@ handle_timeout(_TRef, retry_delivery,
|
||||||
Channel = #channel{conn_state = asleep}) ->
|
Channel = #channel{conn_state = asleep}) ->
|
||||||
{ok, reset_timer(retry_timer, Channel)};
|
{ok, reset_timer(retry_timer, Channel)};
|
||||||
handle_timeout(_TRef, retry_delivery,
|
handle_timeout(_TRef, retry_delivery,
|
||||||
Channel = #channel{session = Session}) ->
|
Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
|
||||||
case emqx_session:retry(Session) of
|
case emqx_session:retry(ClientInfo, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
||||||
{ok, Publishes, Timeout, NSession} ->
|
{ok, Publishes, Timeout, NSession} ->
|
||||||
|
@ -1443,8 +1443,8 @@ handle_timeout(_TRef, expire_awaiting_rel,
|
||||||
Channel = #channel{conn_state = asleep}) ->
|
Channel = #channel{conn_state = asleep}) ->
|
||||||
{ok, reset_timer(await_timer, Channel)};
|
{ok, reset_timer(await_timer, Channel)};
|
||||||
handle_timeout(_TRef, expire_awaiting_rel,
|
handle_timeout(_TRef, expire_awaiting_rel,
|
||||||
Channel = #channel{session = Session}) ->
|
Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
|
||||||
case emqx_session:expire(awaiting_rel, Session) of
|
case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
|
||||||
{ok, Timeout, NSession} ->
|
{ok, Timeout, NSession} ->
|
||||||
|
|
Loading…
Reference in New Issue