diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index f01225431..846f40b33 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -71,6 +71,8 @@ register_inflight :: maybe(term()), %% Topics list for awaiting to register to client register_awaiting_queue :: list(), + %% Duration for asleep + asleep_timer_duration :: integer() | undefined, %% Timer timers :: #{atom() => disable | undefined | reference()}, %%% Takeover @@ -81,16 +83,17 @@ pendings :: list() }). --type(channel() :: #channel{}). +-type channel() :: #channel{}. --type(conn_state() :: idle | connecting | connected | asleep | disconnected). +-type conn_state() :: idle | connecting | connected | asleep | awake + | disconnected. --type(reply() :: {outgoing, mqtt_sn_message()} +-type reply() :: {outgoing, mqtt_sn_message()} | {outgoing, [mqtt_sn_message()]} | {event, conn_state()|updated} - | {close, Reason :: atom()}). + | {close, Reason :: atom()}. --type(replies() :: reply() | [reply()]). +-type replies() :: reply() | [reply()]. -define(TIMER_TABLE, #{ alive_timer => keepalive, @@ -471,8 +474,25 @@ handle_in(?SN_WILLMSG_MSG(Payload), handle_out(connack, ReasonCode, Channel) end; +%% TODO: takeover ??? +handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, ClientId), + Channel = #channel{ + clientinfo = #{clientid := ClientId}, + conn_state = ConnState}) + when ConnState == asleep; + ConnState == awake -> + %% From the asleep or awake state a client can return either to the + %% active state by sending a CONNECT message [6.14] + ?SLOG(info, #{ msg => "goto_connected_state" + , previous_state => ConnState + , clientid => ClientId + }), + handle_out(connack, ?SN_RC_ACCEPTED, + Channel#channel{conn_state = connected}); + +%% new connection handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId), - Channel) -> + Channel = #channel{conn_state = idle}) -> case emqx_misc:pipeline( [ fun enrich_conninfo/2 , fun run_conn_hooks/2 @@ -589,7 +609,10 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), case emqx_session:puback(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), - {ok, Channel#channel{session = NSession}}; + {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( + Channel#channel{session = NSession} + ), + {ok, Replies, NChannel}; {ok, Msg, Publishes, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), handle_out(publish, @@ -672,7 +695,10 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> case emqx_session:pubcomp(ClientInfo, MsgId, Session) of {ok, NSession} -> - {ok, Channel#channel{session = NSession}}; + {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( + Channel#channel{session = NSession} + ), + {ok, Replies, NChannel}; {ok, Publishes, NSession} -> handle_out(publish, Publishes, Channel#channel{session = NSession}); @@ -732,32 +758,47 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName), {ok, {outgoing, UnsubAck}, NChannel} end; -handle_in(?SN_PINGREQ_MSG(_ClientId), - Channel = #channel{conn_state = asleep}) -> - {ok, Outgoing, NChannel} = awake(Channel), - NOutgoings = Outgoing ++ [{outgoing, ?SN_PINGRESP_MSG()}], - {ok, NOutgoings, NChannel}; - -handle_in(?SN_PINGREQ_MSG(_ClientId), Channel) -> +handle_in(?SN_PINGREQ_MSG(ClientId), Channel) + when ClientId == undefined; + ClientId == <<>> -> {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel}; -handle_in(?SN_PINGRESP_MSG(), Channel) -> +handle_in(?SN_PINGREQ_MSG(ReqClientId), + Channel = #channel{clientinfo = #{clientid := ClientId}}) + when ReqClientId =/= ClientId -> + ?SLOG(warning, #{ msg => "awake_pingreq_clientid_not_match" + , clientid => ClientId + , request_clientid => ReqClientId + }), + %% FIXME: takeover_and_awake.. {ok, Channel}; -handle_in(?SN_DISCONNECT_MSG(Duration), Channel) -> - case Duration of - undefined -> - handle_out(disconnect, normal, Channel); - _ -> - %% A DISCONNECT message with a Duration field is sent by a client - %% when it wants to go to the “asleep” state. The receipt of this - %% message is also acknowledged by the gateway by means of a - %% DISCONNECT message (without a duration field) [5.4.21] - %% - %% TODO: asleep mechanism - AckPkt = ?SN_DISCONNECT_MSG(undefined), - {ok, {outgoing, AckPkt}, asleep(Duration, Channel)} - end; +handle_in(?SN_PINGREQ_MSG(ClientId), + Channel = #channel{conn_state = ConnState}) + when ConnState == idle; ConnState == asleep; ConnState == awake -> + awake(ClientId, Channel); + +handle_in(?SN_PINGREQ_MSG(ClientId), + Channel = #channel{conn_state = ConnState}) -> + ?SLOG(error, #{ msg => "awake_pingreq_in_bad_conn_state" + , conn_state => ConnState + , clientid => ClientId + }), + handle_out(disconnect, protocol_error, Channel); + +handle_in(?SN_DISCONNECT_MSG(_Duration = undefined), Channel) -> + handle_out(disconnect, normal, Channel); + +handle_in(?SN_DISCONNECT_MSG(Duration), + Channel = #channel{conn_state = ConnState}) + when ConnState == connected; ConnState == asleep -> + %% A DISCONNECT message with a Duration field is sent by a client + %% when it wants to go to the “asleep” state. The receipt of this + %% message is also acknowledged by the gateway by means of a + %% DISCONNECT message (without a duration field) [5.4.21] + %% + AckPkt = ?SN_DISCONNECT_MSG(undefined), + {ok, [{outgoing, AckPkt}, {event, asleep}], asleep(Duration, Channel)}; handle_in(?SN_WILLTOPICUPD_MSG(Flags, Topic), Channel = #channel{will_msg = WillMsg, @@ -1100,7 +1141,24 @@ do_unsubscribe(TopicFilters, %%-------------------------------------------------------------------- %% Awake & Asleep -awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) -> +awake(ClientId, Channel = #channel{conn_state = idle}) -> + ?SLOG(warning, #{ msg => "awake_pingreq_in_idle_state" + , clientid => ClientId + }), + %% TODO: takeover and awake? + %% 1. Query emqx_cm_registry to get the session state? + %% 2. Takeover it and goto awake state + {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel}; + +awake(ClientId, Channel = #channel{ + conn_state = ConnState, + session = Session, + clientinfo = ClientInfo = #{clientid := ClientId}}) + when ConnState == asleep; ConnState == awake -> + ?SLOG(info, #{ msg => "goto_awake_state" + , clientid => ClientId + , previous_state => ConnState + }), {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), {NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of {ok, Session2} -> @@ -1108,24 +1166,57 @@ awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) -> {ok, More, Session2} -> {lists:append(Publishes, More), Session2} end, - {Replies, NChannel} = outgoing_deliver_and_register( - do_deliver(NPublishes, - Channel#channel{session = NSession}) - ), - {ok, Replies, NChannel}. + Channel1 = cancel_timer(asleep_timer, Channel), + {Replies0, NChannel0} = outgoing_deliver_and_register( + do_deliver( + NPublishes, + Channel1#channel{ + conn_state = awake, session = NSession} + ) + ), + Replies1 = [{event, awake} | Replies0], + + {Replies2, NChannel} = goto_asleep_if_buffered_msgs_sent(NChannel0), + {ok, Replies1 ++ Replies2, NChannel}. + +goto_asleep_if_buffered_msgs_sent( + Channel = #channel{ + conn_state = awake, + session = Session, + asleep_timer_duration = Duration}) -> + case emqx_mqueue:is_empty(emqx_session:info(mqueue, Session)) andalso + emqx_inflight:is_empty(emqx_session:info(inflight, Session)) of + true -> + ?SLOG(info, #{ msg => "goto_asleep_state" + , reason => buffered_messages_sent + , duration => Duration + }), + Replies = [ {outgoing, ?SN_PINGRESP_MSG()} + , {event, asleep} + ], + {Replies, ensure_asleep_timer(Channel#channel{conn_state = asleep})}; + false -> + {[], Channel} + end; +goto_asleep_if_buffered_msgs_sent(Channel) -> + {[], Channel}. asleep(Duration, Channel = #channel{conn_state = asleep}) -> %% 6.14: The client can also modify its sleep duration %% by sending a DISCONNECT message with a new value of %% the sleep duration - ensure_timer(asleep_timer, Duration, - cancel_timer(asleep_timer, Channel) - ); + %% + %% XXX: Do we need to limit the maximum of Duration? + ?SLOG(debug, #{ msg => "update_asleep_timer" + , new_duration => Duration + }), + ensure_asleep_timer(Duration, cancel_timer(asleep_timer, Channel)); asleep(Duration, Channel = #channel{conn_state = connected}) -> - ensure_timer(asleep_timer, Duration, - Channel#channel{conn_state = asleep} - ). + ?SLOG(info, #{ msg => "goto_asleep_state" + , duration => Duration + }), + ensure_asleep_timer(Duration, Channel#channel{conn_state = asleep}). %%-------------------------------------------------------------------- %% Handle outgoing packet @@ -1154,10 +1245,11 @@ handle_out(connack, ReasonCode, shutdown(Reason, AckPacket, Channel); handle_out(publish, Publishes, Channel) -> - {Replies, NChannel} = outgoing_deliver_and_register( - do_deliver(Publishes, Channel) - ), - {ok, Replies, NChannel}; + {Replies1, NChannel} = outgoing_deliver_and_register( + do_deliver(Publishes, Channel) + ), + {Replies2, NChannel2} = goto_asleep_if_buffered_msgs_sent(NChannel), + {ok, Replies1 ++ Replies2, NChannel2}; handle_out(puback, {TopicId, MsgId, Rc}, Channel) -> {ok, {outgoing, ?SN_PUBACK_MSG(TopicId, MsgId, Rc)}, Channel}; @@ -1688,6 +1780,14 @@ update_will_msg(Will, Payload) -> %%-------------------------------------------------------------------- %% Timer +ensure_asleep_timer(Channel = #channel{asleep_timer_duration = Duration}) + when is_integer(Duration) -> + ensure_asleep_timer(Duration, Channel). + +ensure_asleep_timer(Durtion, Channel) -> + ensure_timer(asleep_timer, timer:seconds(Durtion), + Channel#channel{asleep_timer_duration = Durtion}). + cancel_timer(Name, Channel = #channel{timers = Timers}) -> case maps:get(Name, Timers, undefined) of undefined ->