refactor: use set_session in preparation for persistent sessions
This commit is contained in:
parent
566f24e5d8
commit
24e870672c
|
@ -33,8 +33,6 @@
|
||||||
, get_mqtt_conf/2
|
, get_mqtt_conf/2
|
||||||
, get_mqtt_conf/3
|
, get_mqtt_conf/3
|
||||||
, set_conn_state/2
|
, set_conn_state/2
|
||||||
, get_session/1
|
|
||||||
, set_session/2
|
|
||||||
, stats/1
|
, stats/1
|
||||||
, caps/1
|
, caps/1
|
||||||
]).
|
]).
|
||||||
|
@ -180,9 +178,6 @@ info(timers, #channel{timers = Timers}) -> Timers.
|
||||||
set_conn_state(ConnState, Channel) ->
|
set_conn_state(ConnState, Channel) ->
|
||||||
Channel#channel{conn_state = ConnState}.
|
Channel#channel{conn_state = ConnState}.
|
||||||
|
|
||||||
get_session(#channel{session = Session}) ->
|
|
||||||
Session.
|
|
||||||
|
|
||||||
set_session(Session, Channel) ->
|
set_session(Session, Channel) ->
|
||||||
Channel#channel{session = Session}.
|
Channel#channel{session = Session}.
|
||||||
|
|
||||||
|
@ -369,10 +364,10 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
|
||||||
case emqx_session:puback(PacketId, Session) of
|
case emqx_session:puback(PacketId, Session) of
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Properties),
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, set_session(NSession, Channel)};
|
||||||
{ok, Msg, Publishes, NSession} ->
|
{ok, Msg, Publishes, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Properties),
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
||||||
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
handle_out(publish, Publishes, set_session(NSession, Channel));
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
|
?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
|
||||||
ok = emqx_metrics:inc('packets.puback.inuse'),
|
ok = emqx_metrics:inc('packets.puback.inuse'),
|
||||||
|
@ -388,7 +383,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
|
||||||
case emqx_session:pubrec(PacketId, Session) of
|
case emqx_session:pubrec(PacketId, Session) of
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Properties),
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = set_session(NSession, Channel),
|
||||||
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
||||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
|
?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
|
||||||
|
@ -403,7 +398,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
|
||||||
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:pubrel(PacketId, Session) of
|
case emqx_session:pubrel(PacketId, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = set_session(NSession, Channel),
|
||||||
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
|
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
|
||||||
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||||
?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
|
?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
|
||||||
|
@ -414,9 +409,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
|
||||||
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:pubcomp(PacketId, Session) of
|
case emqx_session:pubcomp(PacketId, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, set_session(NSession, Channel)};
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
handle_out(publish, Publishes, set_session(NSession, Channel));
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
ok = emqx_metrics:inc('packets.pubcomp.inuse'),
|
ok = emqx_metrics:inc('packets.pubcomp.inuse'),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
@ -624,7 +619,8 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
||||||
case emqx_session:publish(PacketId, Msg, Session) of
|
case emqx_session:publish(PacketId, Msg, Session) of
|
||||||
{ok, PubRes, NSession} ->
|
{ok, PubRes, NSession} ->
|
||||||
RC = puback_reason_code(PubRes),
|
RC = puback_reason_code(PubRes),
|
||||||
NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}),
|
NChannel0 = set_session(NSession, Channel),
|
||||||
|
NChannel1 = ensure_timer(await_timer, NChannel0),
|
||||||
NChannel2 = ensure_quota(PubRes, NChannel1),
|
NChannel2 = ensure_quota(PubRes, NChannel1),
|
||||||
handle_out(pubrec, {PacketId, RC}, NChannel2);
|
handle_out(pubrec, {PacketId, RC}, NChannel2);
|
||||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
|
@ -698,7 +694,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
||||||
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
||||||
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{QoS, Channel#channel{session = NSession}};
|
{QoS, set_session(NSession, Channel)};
|
||||||
{error, RC} ->
|
{error, RC} ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "cannot_subscribe_topic_filter",
|
msg => "cannot_subscribe_topic_filter",
|
||||||
|
@ -728,7 +724,7 @@ do_unsubscribe(TopicFilter, SubOpts, Channel =
|
||||||
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
||||||
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
|
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{?RC_SUCCESS, Channel#channel{session = NSession}};
|
{?RC_SUCCESS, set_session(NSession, Channel)};
|
||||||
{error, RC} -> {RC, Channel}
|
{error, RC} -> {RC, Channel}
|
||||||
end.
|
end.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -765,8 +761,11 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
|
||||||
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
|
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId}}) ->
|
||||||
NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
|
Delivers1 = maybe_nack(Delivers),
|
||||||
{ok, Channel#channel{session = NSession}};
|
Delivers2 = ignore_local(Delivers1, ClientId, Session),
|
||||||
|
NSession = emqx_session:enqueue(Delivers2, Session),
|
||||||
|
NChannel = set_session(NSession, Channel),
|
||||||
|
{ok, NChannel};
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{takeover = true,
|
handle_deliver(Delivers, Channel = #channel{takeover = true,
|
||||||
pendings = Pendings,
|
pendings = Pendings,
|
||||||
|
@ -779,10 +778,10 @@ handle_deliver(Delivers, Channel = #channel{session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId}}) ->
|
||||||
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
|
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = set_session(NSession, Channel),
|
||||||
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
|
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, Channel#channel{session = NSession}}
|
{ok, set_session(NSession, Channel)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ignore_local(Delivers, Subscriber, Session) ->
|
ignore_local(Delivers, Subscriber, Session) ->
|
||||||
|
@ -898,13 +897,13 @@ return_connack(AckPacket, Channel) ->
|
||||||
case maybe_resume_session(Channel) of
|
case maybe_resume_session(Channel) of
|
||||||
ignore -> {ok, Replies, Channel};
|
ignore -> {ok, Replies, Channel};
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession,
|
NChannel0 = Channel#channel{resuming = false,
|
||||||
resuming = false,
|
|
||||||
pendings = []
|
pendings = []
|
||||||
},
|
},
|
||||||
{Packets, NChannel1} = do_deliver(Publishes, NChannel),
|
NChannel1 = set_session(NSession, NChannel0),
|
||||||
|
{Packets, NChannel2} = do_deliver(Publishes, NChannel1),
|
||||||
Outgoing = [{outgoing, Packets} || length(Packets) > 0],
|
Outgoing = [{outgoing, Packets} || length(Packets) > 0],
|
||||||
{ok, Replies ++ Outgoing, NChannel1}
|
{ok, Replies ++ Outgoing, NChannel2}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1063,9 +1062,9 @@ handle_timeout(_TRef, retry_delivery,
|
||||||
Channel = #channel{session = Session}) ->
|
Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:retry(Session) of
|
case emqx_session:retry(Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(retry_timer, set_session(NSession, Channel))};
|
||||||
{ok, Publishes, Timeout, NSession} ->
|
{ok, Publishes, Timeout, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = set_session(NSession, Channel),
|
||||||
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -1076,9 +1075,9 @@ handle_timeout(_TRef, expire_awaiting_rel,
|
||||||
Channel = #channel{session = Session}) ->
|
Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:expire(awaiting_rel, Session) of
|
case emqx_session:expire(awaiting_rel, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(await_timer, set_session(NSession, Channel))};
|
||||||
{ok, Timeout, NSession} ->
|
{ok, Timeout, NSession} ->
|
||||||
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
|
{ok, reset_timer(await_timer, Timeout, set_session(NSession, Channel))}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(_TRef, expire_session, Channel) ->
|
handle_timeout(_TRef, expire_session, Channel) ->
|
||||||
|
|
Loading…
Reference in New Issue