diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 526188c0c..4440aaba8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -142,6 +142,13 @@ which do not support any other features except this one.
There is no connection setup nor tear down, no registration nor subscription.
The client just sends its 'PUBLISH' messages to a GW" })} + , {subs_resume, + sc(boolean(), + #{ default => false + , desc => +"Whether to initiate all subscribed topic name registration messages to the +client after the Session has been taken over by a new channel." + })} , {predefined, sc(hoconsc:array(ref(mqttsn_predefined)), #{ default => [] diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 05dff665a..cfbf246c3 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -114,7 +114,12 @@ -define(NEG_QOS_CLIENT_ID, <<"NegQoS-Client">>). --define(REGISTER_TIMEOUT, 10000). % 10s +-define(REGISTER_INFLIGHT(TopicId, TopicName), + #channel{register_inflight = {TopicId, _, TopicName}}). + +-define(MAX_RETRY_TIMES, 3). + +-define(REGISTER_TIMEOUT, 5000). % 5s -define(DEFAULT_SESSION_EXPIRY, 7200000). %% 2h %%-------------------------------------------------------------------- @@ -546,28 +551,19 @@ handle_in(?SN_REGISTER_MSG(_TopicId, MsgId, TopicName), {ok, {outgoing, AckPacket}, Channel} end; -handle_in(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), - Channel = #channel{register_inflight = Inflight}) -> - case Inflight of - {TopicId, _, TopicName} -> - ?SLOG(debug, #{ msg => "register_topic_name_to_client_succesfully" - , topic_id => TopicId - , topic_name => TopicName - }), - NChannel = cancel_timer( - register_timer, - Channel#channel{register_inflight = undefined}), - send_next_register_or_replay_publish(TopicName, NChannel); - _ -> - ?SLOG(error, #{ msg => "unexpected_regack_msg" - , msg_id => MsgId - , topic_id => TopicId - , current_inflight => Inflight - }), - {ok, Channel} - end; +handle_in(?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED), + Channel = ?REGISTER_INFLIGHT(TopicId, TopicName)) -> + ?SLOG(debug, #{ msg => "register_topic_name_to_client_succesfully" + , topic_id => TopicId + , topic_name => TopicName + }), + NChannel = cancel_timer( + register_timer, + Channel#channel{register_inflight = undefined}), + send_next_register_or_replay_publish(TopicName, NChannel); -handle_in(?SN_REGACK_MSG(_TopicId, _MsgId, Reason), Channel) -> +handle_in(?SN_REGACK_MSG(TopicId, _MsgId, Reason), + Channel = ?REGISTER_INFLIGHT(TopicId, TopicName)) -> case Reason of ?SN_RC_CONGESTION -> %% TODO: a or b? @@ -575,11 +571,28 @@ handle_in(?SN_REGACK_MSG(_TopicId, _MsgId, Reason), Channel) -> %% b. re-new the re-transmit timer {ok, Channel}; _ -> - %% disconnect this client, if the reason is + %% skipp this topic-name register, if the reason is %% ?SN_RC_NOT_SUPPORTED, ?SN_RC_INVALID_TOPIC_ID, etc. - handle_out(disconnect, ?SN_RC_NOT_SUPPORTED, Channel) + ?SLOG(warning, #{ msg => "skipp_register_topic_name_to_client" + , topic_id => TopicId + , topic_name => TopicName + }), + NChannel = cancel_timer( + register_timer, + Channel#channel{register_inflight = undefined}), + send_next_register_or_replay_publish(TopicName, NChannel) end; +handle_in(?SN_REGACK_MSG(TopicId, MsgId, Reason), + Channel = #channel{register_inflight = Inflight}) -> + ?SLOG(error, #{ msg => "unexpected_regack_msg" + , acked_msg_id => MsgId + , acked_topic_id => TopicId + , acked_reason => Reason + , current_inflight => Inflight + }), + {ok, Channel}; + handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) -> TopicId = case is_integer(TopicId0) of true -> TopicId0; @@ -642,7 +655,7 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), %% involving the predefined topic name in register to %% enhance the gateway's robustness even inconsistent %% with MQTT-SN channels - handle_out(register, {TopicId, MsgId, TopicName}, Channel) + handle_out(register, {TopicId, TopicName}, Channel) end; _ -> ?SLOG(error, #{ msg => "cannt_handle_PUBACK" @@ -779,12 +792,10 @@ handle_in(?SN_PINGREQ_MSG(ClientId), awake(ClientId, Channel); handle_in(?SN_PINGREQ_MSG(ClientId), - Channel = #channel{conn_state = ConnState}) -> - ?SLOG(error, #{ msg => "awake_pingreq_in_bad_conn_state" - , conn_state => ConnState - , clientid => ClientId - }), - handle_out(disconnect, protocol_error, Channel); + Channel = #channel{ + conn_state = connected, + clientinfo = #{clientid := ClientId}}) -> + {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel}; handle_in(?SN_DISCONNECT_MSG(_Duration = undefined), Channel) -> handle_out(disconnect, normal, Channel); @@ -833,30 +844,18 @@ after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) -> outgoing_and_update(Pkt) -> [{outgoing, Pkt}, {event, update}]. -send_next_register_or_replay_publish(TopicName, - Channel = #channel{ - session = Session, - register_awaiting_queue = []}) -> - case emqx_inflight:to_list(emqx_session:info(inflight, Session)) of - [] -> {ok, Channel}; - [{PktId, {inflight_data, _, Msg, _}}] -> - case TopicName =:= emqx_message:topic(Msg) of - false -> - ?SLOG(warning, #{ msg => "replay_inflight_message_failed" - , acked_topic_name => TopicName - , inflight_message => Msg - }), - {ok, Channel}; - true -> - NMsg = emqx_message:set_flag(dup, true, Msg), - handle_out(publish, {PktId, NMsg}, Channel) - end - end; -send_next_register_or_replay_publish(_TopicName, - Channel = #channel{ - register_awaiting_queue = RAQueue}) -> +send_next_register_or_replay_publish( + _TopicName, + Channel = #channel{ register_awaiting_queue = []}) -> + {Outgoing, NChannel} = resume_or_replay_messages(Channel), + {ok, Outgoing, NChannel}; + +send_next_register_or_replay_publish( + _TopicName, + Channel = #channel{register_awaiting_queue = RAQueue}) -> [RegisterReq | NRAQueue] = RAQueue, - handle_out(register, RegisterReq, Channel#channel{register_awaiting_queue = NRAQueue}). + handle_out(register, RegisterReq, + Channel#channel{register_awaiting_queue = NRAQueue}). %%-------------------------------------------------------------------- %% Handle Publish @@ -1263,36 +1262,32 @@ handle_out(pubrel, MsgId, Channel) -> handle_out(pubcomp, MsgId, Channel) -> {ok, {outgoing, ?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)}, Channel}; -handle_out(register, {TopicId, MsgId, TopicName}, - Channel = #channel{register_inflight = undefined}) -> +handle_out(register, {TopicId, TopicName}, + Channel = #channel{session = Session, + register_inflight = undefined}) -> + {MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session), Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)}, - NChannel = Channel#channel{register_inflight = {TopicId, MsgId, TopicName}}, - {ok, Outgoing, ensure_timer(register_timer, ?REGISTER_TIMEOUT, NChannel)}; + NChannel = Channel#channel{ + session = NSession, + register_inflight = {TopicId, MsgId, TopicName}}, + {ok, Outgoing, ensure_register_timer(NChannel)}; -handle_out(register, {TopicId, MsgId, TopicName}, +handle_out(register, {TopicId, TopicName}, Channel = #channel{register_inflight = Inflight, register_awaiting_queue = RAQueue}) -> - case Inflight of - {_, _, TopicName} -> - ?SLOG(debug, #{ msg => "ingore_handle_out_register" - , requested_register_msg => + case enqueue_register_request({TopicId, TopicName}, Inflight, RAQueue) of + ignore -> + ?SLOG(debug, #{ msg => "ingore_register_request_to_client" + , register_request => #{ topic_id => TopicId - , msg_id => MsgId , topic_name => TopicName } }), {ok, Channel}; - {InflightTopicId, InflightMsgId, InflightTopicName} -> - NRAQueue = RAQueue ++ [{TopicId, MsgId, TopicName}], + NRAQueue -> ?SLOG(debug, #{ msg => "put_register_msg_into_awaiting_queue" - , inflight_register_msg => - #{ topic_id => InflightTopicId - , msg_id => InflightMsgId - , topic_name => InflightTopicName - } - , queued_register_msg => + , register_request => #{ topic_id => TopicId - , msg_id => MsgId , topic_name => TopicName } , register_awaiting_queue_size => length(NRAQueue) @@ -1302,7 +1297,20 @@ handle_out(register, {TopicId, MsgId, TopicName}, handle_out(disconnect, RC, Channel) -> DisPkt = ?SN_DISCONNECT_MSG(undefined), - {ok, [{outgoing, DisPkt}, {close, RC}], Channel}. + Reason = case is_atom(RC) of + true -> RC; + false -> returncode_name(RC) + end, + {ok, [{outgoing, DisPkt}, {close, Reason}], Channel}. + +enqueue_register_request({_, TopicName}, {_, _, TopicName}, _RAQueue) -> + ignore; +enqueue_register_request({TopicId, TopicName}, _, RAQueue) -> + HasQueued = lists:any(fun({_, T}) -> T == TopicName end, RAQueue), + case HasQueued of + true -> ignore; + false -> RAQueue ++ [{TopicId, TopicName}] + end. %%-------------------------------------------------------------------- %% Return ConnAck @@ -1310,35 +1318,53 @@ handle_out(disconnect, RC, Channel) -> return_connack(AckPacket, Channel) -> Replies1 = [{event, connected}, {outgoing, AckPacket}], - case maybe_resume_session(Channel) of - ignore -> {ok, Replies1, Channel}; - {ok, Publishes, NSession} -> - NChannel = Channel#channel{session = NSession, - resuming = false, - pendings = [] - }, - {Replies2, NChannel1} = outgoing_deliver_and_register( - do_deliver(Publishes, NChannel) - ), - {ok, Replies1 ++ Replies2, NChannel1} - end. + {Replies2, NChannel} = maybe_resume_session(Channel), + {ok, Replies1 ++ Replies2, NChannel}. %%-------------------------------------------------------------------- %% Maybe Resume Session -maybe_resume_session(#channel{resuming = false}) -> - ignore; -maybe_resume_session(#channel{session = Session, - resuming = true, - pendings = Pendings, clientinfo = ClientInfo}) -> - {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), - case emqx_session:deliver(ClientInfo, Pendings, Session1) of - {ok, Session2} -> - {ok, Publishes, Session2}; - {ok, More, Session2} -> - {ok, lists:append(Publishes, More), Session2} +maybe_resume_session(Channel = #channel{resuming = false}) -> + {[], Channel}; +maybe_resume_session(Channel = #channel{session = Session, + resuming = true}) -> + Subs = emqx_session:info(subscriptions, Session), + case subs_resume() andalso map_size(Subs) =/= 0 of + true -> + TopicNames = lists:filter(fun(T) -> not emqx_topic:wildcard(T) + end, maps:keys(Subs)), + Registers = lists:map(fun(T) -> {register, T} end, TopicNames), + {Registers, Channel}; + false -> + resume_or_replay_messages(Channel) end. +resume_or_replay_messages(Channel = #channel{ + resuming = Resuming, + pendings = Pendings, + session = Session, + clientinfo = ClientInfo}) -> + {NPendings, NChannel} = + case Resuming of + true -> + {Pendings, Channel#channel{resuming = false, pendings = []}}; + false -> + {[], Channel} + end, + {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + {NPublishes, NSession} = + case emqx_session:deliver(ClientInfo, NPendings, Session1) of + {ok, Session2} -> + {Publishes, Session2}; + {ok, More, Session2} -> + {lists:append(Publishes, More), Session2} + end, + outgoing_deliver_and_register( + do_deliver(NPublishes, NChannel#channel{session = NSession})). + +subs_resume() -> + emqx:get_config([gateway, mqttsn, subs_resume]). + %%-------------------------------------------------------------------- %% Deliver publish: broker -> client %%-------------------------------------------------------------------- @@ -1539,32 +1565,16 @@ handle_info(clean_authz_cache, Channel) -> handle_info({subscribe, _}, Channel) -> {ok, Channel}; -handle_info({register, TopicName}, - Channel = #channel{ - registry = Registry, - session = Session}) -> - ClientId = clientid(Channel), - case emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) of - undefined -> - case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of - {error, Reason} -> - ?SLOG(error, #{ msg => "register_topic_failed" - , topic_name => TopicName - , reason => Reason - }), - {ok, Channel}; - TopicId -> - {MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session), - handle_out( - register, - {TopicId, MsgId, TopicName}, - Channel#channel{session = NSession}) - end; - Registered -> - ?SLOG(debug, #{ msg => "ignore_register_request" - , registered_as => Registered +handle_info({register, TopicName}, Channel) -> + case ensure_registered_topic_name(TopicName, Channel) of + {error, Reason} -> + ?SLOG(error, #{ msg => "register_topic_failed" + , topic_name => TopicName + , reason => Reason }), - {ok, Channel} + {ok, Channel}; + {ok, TopicId} -> + handle_out(register, {TopicId, TopicName}, Channel) end; handle_info(Info, Channel) -> @@ -1581,6 +1591,19 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> _ -> shutdown(Reason, Channel) end. +ensure_registered_topic_name(TopicName, + Channel = #channel{registry = Registry}) -> + ClientId = clientid(Channel), + case emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) of + undefined -> + case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of + {error, Reason} -> {error, Reason}; + TopicId -> {ok, TopicId} + end; + TopicId -> + {ok, TopicId} + end. + %%-------------------------------------------------------------------- %% Ensure disconnected @@ -1623,12 +1646,17 @@ handle_deliver(Delivers, Channel = #channel{ ), {ok, Channel#channel{session = NSession}}; +%% There are two secensar need to cache delivering messages: +%% 1. it is being takeover by other channel +%% 2. it is being resume registered topic-names handle_deliver(Delivers, Channel = #channel{ ctx = Ctx, - takeover = true, + takeover = Takeover, pendings = Pendings, session = Session, - clientinfo = #{clientid := ClientId}}) -> + resuming = Resuming, + clientinfo = #{clientid := ClientId}}) + when Takeover == true; Resuming == true -> NPendings = lists:append( Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx) @@ -1680,7 +1708,6 @@ not_nacked({deliver, _Topic, Msg}) -> -> {ok, channel()} | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()}. - handle_timeout(_TRef, {keepalive, _StatVal}, Channel = #channel{keepalive = undefined}) -> {ok, Channel}; @@ -1731,6 +1758,23 @@ handle_timeout(_TRef, expire_awaiting_rel, {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})} end; +handle_timeout(_TRef, {retry_register, RetryTimes}, + Channel = #channel{register_inflight = {TopicId, MsgId, TopicName}}) -> + case RetryTimes < ?MAX_RETRY_TIMES of + true -> + Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)}, + {ok, Outgoing, ensure_register_timer(RetryTimes + 1, Channel)}; + false -> + ?SLOG(error, #{ msg => "register_topic_reached_max_retry_times" + , register_request => + #{ topic_id => TopicId + , msg_id => MsgId + , topic_name => TopicName + } + }), + handle_out(disconnect, ?SN_RC2_REACHED_MAX_RETRY, Channel) + end; + handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); @@ -1788,6 +1832,14 @@ ensure_asleep_timer(Durtion, Channel) -> ensure_timer(asleep_timer, timer:seconds(Durtion), Channel#channel{asleep_timer_duration = Durtion}). +ensure_register_timer(Channel) -> + ensure_register_timer(0, Channel). + +ensure_register_timer(RetryTimes, Channel = #channel{timers = Timers}) -> + Msg = maps:get(register_timer, ?TIMER_TABLE), + TRef = emqx_misc:start_timer(?REGISTER_TIMEOUT, {Msg, RetryTimes}), + Channel#channel{timers = Timers#{register_timer => TRef}}. + cancel_timer(Name, Channel = #channel{timers = Timers}) -> case maps:get(Name, Timers, undefined) of undefined -> @@ -1858,4 +1910,5 @@ returncode_name(?SN_RC2_NOT_AUTHORIZE) -> rejected_not_authorize; returncode_name(?SN_RC2_FAILED_SESSION) -> rejected_failed_open_session; returncode_name(?SN_RC2_KEEPALIVE_TIMEOUT) -> rejected_keepalive_timeout; returncode_name(?SN_RC2_EXCEED_LIMITATION) -> rejected_exceed_limitation; +returncode_name(?SN_RC2_REACHED_MAX_RETRY) -> reached_max_retry_times; returncode_name(_) -> accepted. diff --git a/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl b/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl index 1b9d2ec9e..016fc1231 100644 --- a/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl +++ b/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl @@ -58,6 +58,7 @@ -define(SN_RC2_FAILED_SESSION, 16#81). -define(SN_RC2_KEEPALIVE_TIMEOUT, 16#82). -define(SN_RC2_EXCEED_LIMITATION, 16#83). +-define(SN_RC2_REACHED_MAX_RETRY, 16#84). -define(QOS_NEG1, 3). diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 07f55a7d8..449ad7306 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -89,12 +89,22 @@ all() -> init_per_suite(Config) -> ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_mgmt_api_test_util:init_suite([emqx_gateway]), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_gateway]), Config. end_per_suite(_) -> {ok, _} = emqx:remove_config([gateway, mqttsn]), - emqx_mgmt_api_test_util:end_suite([emqx_gateway]). + emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_conf]). + +restart_mqttsn_with_subs_resume_on() -> + emqx_gateway_conf:update_gateway( + mqttsn, + #{<<"subs_resume">> => <<"true">>}). + +restart_mqttsn_with_subs_resume_off() -> + emqx_gateway_conf:update_gateway( + mqttsn, + #{<<"subs_resume">> => <<"false">>}). %%-------------------------------------------------------------------- %% Test cases @@ -992,8 +1002,8 @@ t_delivery_qos1_register_invalid_topic_id(_) -> %% acked with ?SN_RC_INVALID_TOPIC_ID to send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), - ?assertEqual( - {TopicId, MsgId}, + ?assertMatch( + {TopicId, _}, check_register_msg_on_udp(<<"ab">>, receive_response(Socket))), send_regack_msg(Socket, TopicId, MsgId + 1), @@ -1008,111 +1018,6 @@ t_delivery_qos1_register_invalid_topic_id(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). -t_delivery_takeover_and_re_register(_) -> - MsgId = 1, - {ok, Socket} = gen_udp:open(0, [binary]), - send_connect_msg(Socket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), - - send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), - <<_, ?SN_SUBACK, 2#00100000, - TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), - <<_, ?SN_SUBACK, 2#01000000, - TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - _ = emqx:publish( - emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), - _ = emqx:publish( - emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), - - send_disconnect_msg(Socket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), - gen_udp:close(Socket), - - %% offline messages will be queued into the MQTT-SN session - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), - - {ok, NSocket} = gen_udp:open(0, [binary]), - send_connect_msg(NSocket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(NSocket)), - - %% qos1 - - %% received the resume messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdA, RegMsgIdA), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), - - %% qos2 - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdB, RegMsgIdB), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), - send_pubrec_msg(NSocket, MsgIdB1), - <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), - send_pubcomp_msg(NSocket, MsgIdB1), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), - - %% no more messages - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - - send_disconnect_msg(NSocket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), - - _ = emqx_gateway_cm:kick_session(mqttsn, <<"test">>), - - gen_udp:close(NSocket). - t_will_case01(_) -> QoS = 1, Duration = 1, @@ -1924,6 +1829,326 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_register_subs_resume_on(_) -> + restart_mqttsn_with_subs_resume_on(), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + emqx_logger:set_log_level(debug), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdA2), + <<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdA2), + + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdB:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB2), + <<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB2), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1), + restart_mqttsn_with_subs_resume_off(). + +t_register_subs_resume_off(_) -> + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#00100000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% qos1 + + %% received the resume messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), + + %% qos2 + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB1), + <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB1), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + +t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> + restart_mqttsn_with_subs_resume_on(), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + emqx_logger:set_log_level(debug), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID), + + %% the gateway try to shutdown this client if it reached max-retry-times + %% + %% times-0 + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-1 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-2 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% just a ping + send_pingreq_msg(NSocket, <<"test">>), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)), + %% times-3 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% shutdown due to reached max retry times + timer:sleep(5000), %% RETYRY_TIMEOUT + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), + gen_udp:close(NSocket), + restart_mqttsn_with_subs_resume_off(). + +t_register_enqueue_delivering_messages(_) -> + restart_mqttsn_with_subs_resume_on(), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1), + restart_mqttsn_with_subs_resume_off(). + t_socket_passvice(_) -> %% TODO: test this gateway enter the passvie event ok. @@ -2082,9 +2307,12 @@ send_register_msg(Socket, TopicName, MsgId) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket). send_regack_msg(Socket, TopicId, MsgId) -> + send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED). + +send_regack_msg(Socket, TopicId, MsgId, Rc) -> Length = 7, MsgType = ?SN_REGACK, - Packet = <>, + Packet = <>, ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) ->