diff --git a/apps/emqx_sn/etc/emqx_sn.conf b/apps/emqx_sn/etc/emqx_sn.conf index 6572812c1..655ef4028 100644 --- a/apps/emqx_sn/etc/emqx_sn.conf +++ b/apps/emqx_sn/etc/emqx_sn.conf @@ -51,3 +51,10 @@ mqtt.sn.username = mqtt_sn_user ## ## Value: String mqtt.sn.password = abc + +## Whether to initiate all subscribed topic registration messages to the +## client after the Session has been taken over by a new channel. +## +## Value: Boolean +## Default: false +#mqtt.sn.subs_resume = false diff --git a/apps/emqx_sn/priv/emqx_sn.schema b/apps/emqx_sn/priv/emqx_sn.schema index a585c1037..bd0995c77 100644 --- a/apps/emqx_sn/priv/emqx_sn.schema +++ b/apps/emqx_sn/priv/emqx_sn.schema @@ -56,6 +56,11 @@ end}. {datatype, string} ]}. +{mapping, "mqtt.sn.subs_resume", "emqx_sn.subs_resume", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx_sn.username", fun(Conf) -> Username = cuttlefish:conf_get("mqtt.sn.username", Conf), list_to_binary(Username) diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_sn/src/emqx_sn_frame.erl index 28a20956e..7ec18dd4a 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_sn/src/emqx_sn_frame.erl @@ -339,9 +339,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> format(?SN_PINGREQ_MSG(ClientId)) -> io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); format(?SN_PINGRESP_MSG()) -> - "SN_PINGREQ()"; + "SN_PINGRESP()"; format(?SN_DISCONNECT_MSG(Duration)) -> - io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); + io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]); format(#mqtt_sn_message{type = Type, variable = Var}) -> io_lib:format("mqtt_sn_message(type=~s, Var=~w)", diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 265607229..d5c574f8c 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -48,6 +48,7 @@ , wait_for_will_topic/3 , wait_for_will_msg/3 , connected/3 + , registering/3 , asleep/3 , awake/3 ]). @@ -96,7 +97,9 @@ has_pending_pingresp = false :: boolean(), %% Store all qos0 messages for waiting REGACK %% Note: QoS1/QoS2 messages will kept inflight queue - pending_topic_ids = #{} :: pending_msgs() + pending_topic_ids = #{} :: pending_msgs(), + waiting_sync_topics = [], + previous_outgoings_and_state = undefined }). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). @@ -126,6 +129,9 @@ Reason =:= asleep_timeout; Reason =:= keepalive_timeout). +-define(RETRY_TIMEOUT, 5000). +-define(MAX_RETRY_TIMES, 3). + %%-------------------------------------------------------------------- %% Exported APIs %%-------------------------------------------------------------------- @@ -379,6 +385,13 @@ connected(cast, {outgoing, Packet}, State) -> connected(cast, {connack, ConnAck}, State) -> {keep_state, handle_outgoing(ConnAck, State)}; +connected(cast, {register, TopicNames, BlockedOutgoins}, State) -> + NState = State#state{ + waiting_sync_topics = TopicNames, + previous_outgoings_and_state = {BlockedOutgoins, ?FUNCTION_NAME} + }, + {next_state, registering, NState, [next_event(shooting)]}; + connected(cast, {shutdown, Reason, Packet}, State) -> stop(Reason, handle_outgoing(Packet, State)); @@ -392,6 +405,80 @@ connected(cast, {close, Reason}, State) -> connected(EventType, EventContent, State) -> handle_event(EventType, EventContent, connected, State). +registering(cast, shooting, + State = #state{ + channel = Channel, + waiting_sync_topics = [], + previous_outgoings_and_state = {Outgoings, StateName}}) -> + Session = emqx_channel:get_session(Channel), + ClientInfo = emqx_channel:info(clientinfo, Channel), + {Outgoings2, NChannel} = + case emqx_session:dequeue(ClientInfo, Session) of + {ok, NSession} -> + {[], emqx_channel:set_session(NSession, Channel)}; + {ok, Pubs, NSession} -> + emqx_channel:do_deliver( + Pubs, + emqx_channel:set_session(NSession, Channel) + ) + end, + NState = State#state{ + channel = NChannel, + previous_outgoings_and_state = undefined}, + {next_state, StateName, NState, outgoing_events(Outgoings ++ Outgoings2)}; + +registering(cast, shooting, + State = #state{ + clientid = ClientId, + waiting_sync_topics = [TopicName | Remainings]}) -> + TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid ? + State#state{waiting_sync_topics = [{TopicId, TopicName, 0} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(debug, "Register topic name ~s with id ~w successfully!", [TopicName, TopicId]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(error, "client does not accept register TopicName=~s, TopicId=~p, MsgId=~p, ReturnCode=~p", + [TopicName, TopicId, MsgId, ReturnCode]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, Packet}, + State = #state{previous_outgoings_and_state = {_, StateName}}) + when is_record(Packet, mqtt_sn_message) -> + apply(?MODULE, StateName, [cast, {incoming, Packet}, State]); + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, Times} | Remainings]}) + when Times < ?MAX_RETRY_TIMES -> + ?LOG(warning, "Waiting REGACK timeout for TopicName=~s, TopicId=~w, try it again(~w)", + [TopicName, TopicId, Times+1]), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid? + State#state{waiting_sync_topics = [{TopicId, TopicName, Times + 1} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, ?MAX_RETRY_TIMES} | _]}) -> + ?LOG(error, "Retry register TopicName=~s, TopicId=~w reached the max retry times", + [TopicId, TopicName]), + NState = send_message(?SN_DISCONNECT_MSG(undefined), State), + stop(reached_max_retry_times, NState); + +registering(EventType, EventContent, State) -> + handle_event(EventType, EventContent, ?FUNCTION_NAME, State). + asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> State0 = send_message(?SN_DISCONNECT_MSG(undefined), State), case Duration of @@ -519,10 +606,13 @@ handle_event(info, {datagram, SockPid, Data}, StateName, stop(frame_error, State) end; -handle_event(info, {deliver, _Topic, Msg}, asleep, - State = #state{channel = Channel}) -> +handle_event(info, {deliver, _Topic, Msg}, StateName, + State = #state{channel = Channel}) + when StateName == alseep; + StateName == registering -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), + ?LOG(debug, "enqueue downlink message in ~s state, msg: ~0p", + [StateName, Msg]), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -643,6 +733,9 @@ outgoing_event(Packet) when is_record(Packet, mqtt_packet); outgoing_event(Action) -> next_event(Action). +next_event(Content) -> + {next_event, cast, Content}. + close_socket(State = #state{sockstate = closed}) -> State; close_socket(State = #state{socket = _Socket}) -> %ok = gen_udp:close(Socket), @@ -1058,6 +1151,38 @@ handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake, Result = channel_handle_in(Packet, State), handle_return(Result, State, [try_goto_asleep]); +handle_incoming( + #mqtt_packet{ + variable = #mqtt_packet_connect{ + clean_start = false} + } = Packet, + _, + State) -> + Result = channel_handle_in(Packet, State), + case {subs_resume(), Result} of + {true, {ok, Replies, NChannel}} -> + case maps:get( + subscriptions, + emqx_channel:info(session, NChannel) + ) of + Subs when map_size(Subs) == 0 -> + handle_return(Result, State); + Subs -> + TopicNames = lists:filter( + fun(T) -> not emqx_topic:wildcard(T) + end, maps:keys(Subs)), + {ConnackEvents, Outgoings} = split_connack_replies( + Replies), + Events = outgoing_events( + ConnackEvents ++ + [{register, TopicNames, Outgoings}] + ), + {keep_state, State#state{channel = NChannel}, Events} + end; + _ -> + handle_return(Result, State) + end; + handle_incoming(Packet, _StName, State) -> Result = channel_handle_in(Packet, State), handle_return(Result, State). @@ -1167,9 +1292,6 @@ inc_outgoing_stats(Type) -> false -> ok end. -next_event(Content) -> - {next_event, cast, Content}. - inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. @@ -1183,3 +1305,11 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) -> State; maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). + +subs_resume() -> + application:get_env(emqx_sn, subs_resume, false). + +%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}] +split_connack_replies([A = {event, connected}, + B = {connack, _ConnAck} | Outgoings]) -> + {[A, B], Outgoings}.