From 3201d1121266020a12a3508b5e0ef290b1b19548 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 17:44:42 +0800 Subject: [PATCH 1/7] feat(mqttsn): introduce subs_resume option As the mqtt-sn v1.2 spec metioned, the gateway will be able to sync the subscriptions topic-name registry to client when the client resume it's session port from: https://github.com/emqx/emqx-sn/pull/195 --- apps/emqx_sn/etc/emqx_sn.conf | 7 ++ apps/emqx_sn/priv/emqx_sn.schema | 5 + apps/emqx_sn/src/emqx_sn_frame.erl | 4 +- apps/emqx_sn/src/emqx_sn_gateway.erl | 144 +++++++++++++++++++++++++-- 4 files changed, 151 insertions(+), 9 deletions(-) diff --git a/apps/emqx_sn/etc/emqx_sn.conf b/apps/emqx_sn/etc/emqx_sn.conf index 6572812c1..655ef4028 100644 --- a/apps/emqx_sn/etc/emqx_sn.conf +++ b/apps/emqx_sn/etc/emqx_sn.conf @@ -51,3 +51,10 @@ mqtt.sn.username = mqtt_sn_user ## ## Value: String mqtt.sn.password = abc + +## Whether to initiate all subscribed topic registration messages to the +## client after the Session has been taken over by a new channel. +## +## Value: Boolean +## Default: false +#mqtt.sn.subs_resume = false diff --git a/apps/emqx_sn/priv/emqx_sn.schema b/apps/emqx_sn/priv/emqx_sn.schema index a585c1037..bd0995c77 100644 --- a/apps/emqx_sn/priv/emqx_sn.schema +++ b/apps/emqx_sn/priv/emqx_sn.schema @@ -56,6 +56,11 @@ end}. {datatype, string} ]}. +{mapping, "mqtt.sn.subs_resume", "emqx_sn.subs_resume", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx_sn.username", fun(Conf) -> Username = cuttlefish:conf_get("mqtt.sn.username", Conf), list_to_binary(Username) diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_sn/src/emqx_sn_frame.erl index 28a20956e..7ec18dd4a 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_sn/src/emqx_sn_frame.erl @@ -339,9 +339,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> format(?SN_PINGREQ_MSG(ClientId)) -> io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); format(?SN_PINGRESP_MSG()) -> - "SN_PINGREQ()"; + "SN_PINGRESP()"; format(?SN_DISCONNECT_MSG(Duration)) -> - io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); + io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]); format(#mqtt_sn_message{type = Type, variable = Var}) -> io_lib:format("mqtt_sn_message(type=~s, Var=~w)", diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 265607229..d5c574f8c 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -48,6 +48,7 @@ , wait_for_will_topic/3 , wait_for_will_msg/3 , connected/3 + , registering/3 , asleep/3 , awake/3 ]). @@ -96,7 +97,9 @@ has_pending_pingresp = false :: boolean(), %% Store all qos0 messages for waiting REGACK %% Note: QoS1/QoS2 messages will kept inflight queue - pending_topic_ids = #{} :: pending_msgs() + pending_topic_ids = #{} :: pending_msgs(), + waiting_sync_topics = [], + previous_outgoings_and_state = undefined }). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). @@ -126,6 +129,9 @@ Reason =:= asleep_timeout; Reason =:= keepalive_timeout). +-define(RETRY_TIMEOUT, 5000). +-define(MAX_RETRY_TIMES, 3). + %%-------------------------------------------------------------------- %% Exported APIs %%-------------------------------------------------------------------- @@ -379,6 +385,13 @@ connected(cast, {outgoing, Packet}, State) -> connected(cast, {connack, ConnAck}, State) -> {keep_state, handle_outgoing(ConnAck, State)}; +connected(cast, {register, TopicNames, BlockedOutgoins}, State) -> + NState = State#state{ + waiting_sync_topics = TopicNames, + previous_outgoings_and_state = {BlockedOutgoins, ?FUNCTION_NAME} + }, + {next_state, registering, NState, [next_event(shooting)]}; + connected(cast, {shutdown, Reason, Packet}, State) -> stop(Reason, handle_outgoing(Packet, State)); @@ -392,6 +405,80 @@ connected(cast, {close, Reason}, State) -> connected(EventType, EventContent, State) -> handle_event(EventType, EventContent, connected, State). +registering(cast, shooting, + State = #state{ + channel = Channel, + waiting_sync_topics = [], + previous_outgoings_and_state = {Outgoings, StateName}}) -> + Session = emqx_channel:get_session(Channel), + ClientInfo = emqx_channel:info(clientinfo, Channel), + {Outgoings2, NChannel} = + case emqx_session:dequeue(ClientInfo, Session) of + {ok, NSession} -> + {[], emqx_channel:set_session(NSession, Channel)}; + {ok, Pubs, NSession} -> + emqx_channel:do_deliver( + Pubs, + emqx_channel:set_session(NSession, Channel) + ) + end, + NState = State#state{ + channel = NChannel, + previous_outgoings_and_state = undefined}, + {next_state, StateName, NState, outgoing_events(Outgoings ++ Outgoings2)}; + +registering(cast, shooting, + State = #state{ + clientid = ClientId, + waiting_sync_topics = [TopicName | Remainings]}) -> + TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid ? + State#state{waiting_sync_topics = [{TopicId, TopicName, 0} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(debug, "Register topic name ~s with id ~w successfully!", [TopicName, TopicId]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(error, "client does not accept register TopicName=~s, TopicId=~p, MsgId=~p, ReturnCode=~p", + [TopicName, TopicId, MsgId, ReturnCode]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, Packet}, + State = #state{previous_outgoings_and_state = {_, StateName}}) + when is_record(Packet, mqtt_sn_message) -> + apply(?MODULE, StateName, [cast, {incoming, Packet}, State]); + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, Times} | Remainings]}) + when Times < ?MAX_RETRY_TIMES -> + ?LOG(warning, "Waiting REGACK timeout for TopicName=~s, TopicId=~w, try it again(~w)", + [TopicName, TopicId, Times+1]), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid? + State#state{waiting_sync_topics = [{TopicId, TopicName, Times + 1} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, ?MAX_RETRY_TIMES} | _]}) -> + ?LOG(error, "Retry register TopicName=~s, TopicId=~w reached the max retry times", + [TopicId, TopicName]), + NState = send_message(?SN_DISCONNECT_MSG(undefined), State), + stop(reached_max_retry_times, NState); + +registering(EventType, EventContent, State) -> + handle_event(EventType, EventContent, ?FUNCTION_NAME, State). + asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> State0 = send_message(?SN_DISCONNECT_MSG(undefined), State), case Duration of @@ -519,10 +606,13 @@ handle_event(info, {datagram, SockPid, Data}, StateName, stop(frame_error, State) end; -handle_event(info, {deliver, _Topic, Msg}, asleep, - State = #state{channel = Channel}) -> +handle_event(info, {deliver, _Topic, Msg}, StateName, + State = #state{channel = Channel}) + when StateName == alseep; + StateName == registering -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), + ?LOG(debug, "enqueue downlink message in ~s state, msg: ~0p", + [StateName, Msg]), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -643,6 +733,9 @@ outgoing_event(Packet) when is_record(Packet, mqtt_packet); outgoing_event(Action) -> next_event(Action). +next_event(Content) -> + {next_event, cast, Content}. + close_socket(State = #state{sockstate = closed}) -> State; close_socket(State = #state{socket = _Socket}) -> %ok = gen_udp:close(Socket), @@ -1058,6 +1151,38 @@ handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake, Result = channel_handle_in(Packet, State), handle_return(Result, State, [try_goto_asleep]); +handle_incoming( + #mqtt_packet{ + variable = #mqtt_packet_connect{ + clean_start = false} + } = Packet, + _, + State) -> + Result = channel_handle_in(Packet, State), + case {subs_resume(), Result} of + {true, {ok, Replies, NChannel}} -> + case maps:get( + subscriptions, + emqx_channel:info(session, NChannel) + ) of + Subs when map_size(Subs) == 0 -> + handle_return(Result, State); + Subs -> + TopicNames = lists:filter( + fun(T) -> not emqx_topic:wildcard(T) + end, maps:keys(Subs)), + {ConnackEvents, Outgoings} = split_connack_replies( + Replies), + Events = outgoing_events( + ConnackEvents ++ + [{register, TopicNames, Outgoings}] + ), + {keep_state, State#state{channel = NChannel}, Events} + end; + _ -> + handle_return(Result, State) + end; + handle_incoming(Packet, _StName, State) -> Result = channel_handle_in(Packet, State), handle_return(Result, State). @@ -1167,9 +1292,6 @@ inc_outgoing_stats(Type) -> false -> ok end. -next_event(Content) -> - {next_event, cast, Content}. - inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. @@ -1183,3 +1305,11 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) -> State; maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). + +subs_resume() -> + application:get_env(emqx_sn, subs_resume, false). + +%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}] +split_connack_replies([A = {event, connected}, + B = {connack, _ConnAck} | Outgoings]) -> + {[A, B], Outgoings}. From d4c1b3acc634fcd796149620a749c8d02c5cc99b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 17:45:46 +0800 Subject: [PATCH 2/7] test(mqttsn): more tests for topic register and subs_resume --- apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl | 427 ++++++++++++++----- 1 file changed, 324 insertions(+), 103 deletions(-) diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 9ff519fb5..9fd731f28 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -862,108 +862,6 @@ t_delivery_qos1_register_invalid_topic_id(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). -t_delivery_takeover_and_re_register(_) -> - MsgId = 1, - {ok, Socket} = gen_udp:open(0, [binary]), - send_connect_msg(Socket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), - - send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), - <<_, ?SN_SUBACK, 2#00100000, - TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), - <<_, ?SN_SUBACK, 2#01000000, - TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - _ = emqx:publish( - emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), - _ = emqx:publish( - emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), - - send_disconnect_msg(Socket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), - gen_udp:close(Socket), - - %% offline messages will be queued into the MQTT-SN session - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), - - {ok, NSocket} = gen_udp:open(0, [binary]), - send_connect_msg(NSocket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(NSocket)), - - %% qos1 - - %% received the resume messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdA, RegMsgIdA), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), - - %% qos2 - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdB, RegMsgIdB), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), - send_pubrec_msg(NSocket, MsgIdB1), - <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), - send_pubcomp_msg(NSocket, MsgIdB1), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), - - %% no more messages - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - - send_disconnect_msg(NSocket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), - gen_udp:close(NSocket). - t_will_case01(_) -> QoS = 1, Duration = 1, @@ -1725,6 +1623,326 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_register_subs_resume_on(_) -> + application:set_env(emqx_sn, subs_resume, true), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdA2), + <<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdA2), + + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdB:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB2), + <<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB2), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + application:set_env(emqx_sn, subs_resume, false), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + +t_register_subs_resume_off(_) -> + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#00100000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% qos1 + + %% received the resume messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), + + %% qos2 + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB1), + <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB1), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + +t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> + application:set_env(emqx_sn, subs_resume, true), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID), + + %% the gateway try to shutdown this client if it reached max-retry-times + %% + %% times-0 + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-1 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-2 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% just a ping + send_pingreq_msg(NSocket, <<"test">>), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)), + %% times-3 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% shutdown due to reached max retry times + timer:sleep(5000), %% RETYRY_TIMEOUT + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), + application:set_env(emqx_sn, subs_resume, false), + gen_udp:close(NSocket). + +t_register_enqueue_delivering_messages(_) -> + application:set_env(emqx_sn, subs_resume, true), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + emqx_logger:set_log_level(debug), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + application:set_env(emqx_sn, subs_resume, false), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- @@ -1816,9 +2034,12 @@ send_register_msg(Socket, TopicName, MsgId) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket). send_regack_msg(Socket, TopicId, MsgId) -> + send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED). + +send_regack_msg(Socket, TopicId, MsgId, Rc) -> Length = 7, MsgType = ?SN_REGACK, - Packet = <>, + Packet = <>, ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) -> From fcf1178f3b0e80c34fee70e707eba55ca26937f7 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 17:48:17 +0800 Subject: [PATCH 3/7] chore: update CHANGES-4.3.md --- CHANGES-4.3.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 04447e220..a8f0ac8ba 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -27,6 +27,7 @@ File format: * Add UTF-8 string validity check in `strict_mode` for MQTT packet. When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261] * Changed systemd service restart delay from 10 seconds to 60 seconds. +* MQTT-SN gateway supports initiative to synchronize registered topics after session resumed. [#7300] ### Bug fixes From f8b7b9415d16d13ba594355e24f59eff7e8942c1 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 18:36:23 +0800 Subject: [PATCH 4/7] chore(mqttsn): update appup.src --- apps/emqx_sn/src/emqx_sn.appup.src | 34 +++++++++++++++++++++------- apps/emqx_sn/src/emqx_sn_gateway.erl | 25 ++++++++++++++++++-- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 6a4eb66d1..7b0bc91c3 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,26 +1,44 @@ %% -*- mode: erlang -*- {VSN, [ - {<<"4\\.3\\.[4-5]">>,[ + {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {update,emqx_sn_gateway,{advance,["4.3.5"]}} ]}, - {<<"4.3.[2-3]">>,[ + {"4.3.4",[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advance,["4.3.4"]}} + ]}, + {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {update,emqx_sn_gateway,{advance, ["4.3.3"]}} + ]}, + {"4.3.2",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advance, ["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ - {<<"4\\.3\\.[4-5]">>,[ + {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {update,emqx_sn_gateway,{advance,["4.3.5"]}} ]}, - {<<"4.3.[2-3]">>,[ + {"4.3.4",[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advance,["4.3.4"]}} + ]}, + {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {update,emqx_sn_gateway,{advance, ["4.3.3"]}} + ]}, + {"4.3.2",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advance, ["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ]}. diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index d5c574f8c..3e8c18dc9 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -683,8 +683,29 @@ terminate(Reason, _StateName, #state{channel = Channel}) -> emqx_channel:terminate(Reason, Channel), ok. -code_change(_Vsn, StateName, State, _Extra) -> - {ok, StateName, State}. +%% in the emqx_sn:v4.3.6, we have added two new fields in the state last: +%% - waiting_sync_topics +%% - previous_outgoings_and_state +code_change({down, _Vsn}, StateName, State, [ToVsn]) -> + case re:run(ToVsn, "4\\.3\\.[2-5]") of + {match, _} -> + NState0 = lists:droplast(lists:droplast(tuple_to_list(State))), + NState = list_to_tuple(lists:reverse(NState0)), + {ok, StateName, NState}; + _ -> + {ok, StateName, State} + end; + +code_change(_Vsn, StateName, State, [FromVsn]) -> + case re:run(FromVsn, "4\\.3\\.[2-5]") of + {match, _} -> + NState = list_to_tuple( + tuple_to_list(State) ++ [[], undefined] + ), + {ok, StateName, NState}; + _ -> + {ok, StateName, State} + end. %%-------------------------------------------------------------------- %% Handle Call/Info From 3823ab8693420892f0b32be78cc29d007a70d5d6 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 19:10:05 +0800 Subject: [PATCH 5/7] fix: typos in emqx_sn.appup.src --- apps/emqx_sn/src/emqx_sn.appup.src | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 7b0bc91c3..7a8679db1 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -3,42 +3,42 @@ [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance,["4.3.5"]}} + {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance,["4.3.4"]}} + {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance, ["4.3.3"]}} + {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance, ["4.3.2"]}} + {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance,["4.3.5"]}} + {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance,["4.3.4"]}} + {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance, ["4.3.3"]}} + {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance, ["4.3.2"]}} + {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ]}. From af65310ce77f680e6573c0379bdc63a2ff42b5a7 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 10:40:37 +0800 Subject: [PATCH 6/7] chore(script): update_appup.escript support the update cmd We often use the advanced directive `update` when hot upgrading gen_server, gen_statem, and other such processes, and it will be parsed as: ``` {suspend,[Mod]}, {load,{Mod,brutal_purge,brutal_purge}}, {code_change,up,[{Mod,[Extra]}]}, {resume,[Mod]}, ``` So, we should treat the update instruction as having completed the upgrade of this module. --- scripts/update_appup.escript | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 9083b62f1..840f63509 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -324,6 +324,8 @@ process_old_action({add_module, Module}) -> [Module]; process_old_action({delete_module, Module}) -> [Module]; +process_old_action({update, Module, _Change}) -> + [Module]; process_old_action(LoadModule) when is_tuple(LoadModule) andalso element(1, LoadModule) =:= load_module -> element(2, LoadModule); From a3d8981635b0179b421fe968ff530b93c30a5c1a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 11:28:40 +0800 Subject: [PATCH 7/7] refactor(mqttsn): assign subs_resume to mqtt-sn client process state --- apps/emqx_sn/src/emqx_sn.appup.src | 8 ++++++ apps/emqx_sn/src/emqx_sn_app.erl | 2 ++ apps/emqx_sn/src/emqx_sn_gateway.erl | 20 ++++++++------- apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl | 26 +++++++++++--------- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 7a8679db1..269605afa 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -3,20 +3,24 @@ [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} @@ -24,20 +28,24 @@ [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_sn/src/emqx_sn_app.erl index 9575523f8..e7c86d5b7 100644 --- a/apps/emqx_sn/src/emqx_sn_app.erl +++ b/apps/emqx_sn/src/emqx_sn_app.erl @@ -122,12 +122,14 @@ listeners_confs() -> EnableQos3 = application:get_env(emqx_sn, enable_qos3, false), EnableStats = application:get_env(emqx_sn, enable_stats, false), IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000), + SubsResume = application:get_env(emqx_sn, subs_resume, false), [{udp, ListenOn, [{gateway_id, GwId}, {username, Username}, {password, Password}, {enable_qos3, EnableQos3}, {enable_stats, EnableStats}, {idle_timeout, IdleTimeout}, + {subs_resume, SubsResume}, {max_connections, 1024000}, {max_conn_rate, 1000}, {udp_options, []}]}]. diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 3e8c18dc9..80a1339c0 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -98,6 +98,7 @@ %% Store all qos0 messages for waiting REGACK %% Note: QoS1/QoS2 messages will kept inflight queue pending_topic_ids = #{} :: pending_msgs(), + subs_resume = false, waiting_sync_topics = [], previous_outgoings_and_state = undefined }). @@ -158,6 +159,7 @@ init([{_, SockPid, Sock}, Peername, Options]) -> Password = proplists:get_value(password, Options, undefined), EnableQos3 = proplists:get_value(enable_qos3, Options, false), IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), + SubsResume = proplists:get_value(subs_resume, Options, false), EnableStats = proplists:get_value(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> @@ -174,7 +176,8 @@ init([{_, SockPid, Sock}, Peername, Options]) -> asleep_timer = emqx_sn_asleep_timer:init(), enable_stats = EnableStats, enable_qos3 = EnableQos3, - idle_timeout = IdleTimeout + idle_timeout = IdleTimeout, + subs_resume = SubsResume }, emqx_logger:set_metadata_peername(esockd:format(Peername)), {ok, idle, State, [IdleTimeout]}; @@ -689,8 +692,10 @@ terminate(Reason, _StateName, #state{channel = Channel}) -> code_change({down, _Vsn}, StateName, State, [ToVsn]) -> case re:run(ToVsn, "4\\.3\\.[2-5]") of {match, _} -> - NState0 = lists:droplast(lists:droplast(tuple_to_list(State))), - NState = list_to_tuple(lists:reverse(NState0)), + NState0 = lists:droplast( + lists:droplast( + lists:droplast(tuple_to_list(State)))), + NState = list_to_tuple(NState0), {ok, StateName, NState}; _ -> {ok, StateName, State} @@ -700,7 +705,7 @@ code_change(_Vsn, StateName, State, [FromVsn]) -> case re:run(FromVsn, "4\\.3\\.[2-5]") of {match, _} -> NState = list_to_tuple( - tuple_to_list(State) ++ [[], undefined] + tuple_to_list(State) ++ [false, [], undefined] ), {ok, StateName, NState}; _ -> @@ -1178,9 +1183,9 @@ handle_incoming( clean_start = false} } = Packet, _, - State) -> + State = #state{subs_resume = SubsResume}) -> Result = channel_handle_in(Packet, State), - case {subs_resume(), Result} of + case {SubsResume, Result} of {true, {ok, Replies, NChannel}} -> case maps:get( subscriptions, @@ -1327,9 +1332,6 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) -> maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). -subs_resume() -> - application:get_env(emqx_sn, subs_resume, false). - %% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}] split_connack_replies([A = {event, connected}, B = {connack, _ConnAck} | Outgoings]) -> diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 9fd731f28..cdecc06bb 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -79,6 +79,12 @@ set_special_confs(emqx_sn) -> set_special_confs(_App) -> ok. +restart_emqx_sn(#{subs_resume := Bool}) -> + application:set_env(emqx_sn, subs_resume, Bool), + _ = application:stop(emqx_sn), + _ = application:ensure_all_started(emqx_sn), + ok. + %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- @@ -1624,7 +1630,7 @@ t_broadcast_test1(_) -> gen_udp:close(Socket). t_register_subs_resume_on(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1710,8 +1716,6 @@ t_register_subs_resume_on(_) -> %% no more messages ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket), {ok, NSocket1} = gen_udp:open(0, [binary]), send_connect_msg(NSocket1, <<"test">>), @@ -1719,7 +1723,8 @@ t_register_subs_resume_on(_) -> receive_response(NSocket1)), send_disconnect_msg(NSocket1, undefined), ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), - gen_udp:close(NSocket1). + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). t_register_subs_resume_off(_) -> MsgId = 1, @@ -1829,7 +1834,7 @@ t_register_subs_resume_off(_) -> gen_udp:close(NSocket1). t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1883,11 +1888,11 @@ t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> %% shutdown due to reached max retry times timer:sleep(5000), %% RETYRY_TIMEOUT ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket). + gen_udp:close(NSocket), + restart_emqx_sn(#{subs_resume => false}). t_register_enqueue_delivering_messages(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1932,8 +1937,6 @@ t_register_enqueue_delivering_messages(_) -> %% no more messages ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket), {ok, NSocket1} = gen_udp:open(0, [binary]), send_connect_msg(NSocket1, <<"test">>), @@ -1941,7 +1944,8 @@ t_register_enqueue_delivering_messages(_) -> receive_response(NSocket1)), send_disconnect_msg(NSocket1, undefined), ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), - gen_udp:close(NSocket1). + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). %%-------------------------------------------------------------------- %% Helper funcs