diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index c59f37e04..d13af6f67 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -753,6 +753,7 @@ serialize_and_inc_stats_fun(#state{ send(IoData, State = #state{socket = Socket, chann_mod = ChannMod, channel = Channel}) -> + ?SLOG(debug, #{msg => "SEND_data", data => IoData}), Ctx = ChannMod:info(ctx, Channel), Oct = iolist_size(IoData), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct), diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 24ec4415f..0e33793f8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -103,6 +103,8 @@ -define(T_TAKEOVER, 15000). -define(DEFAULT_BATCH_SIZE, 10000). +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -164,21 +166,20 @@ insert_channel_info(GwName, ClientId, Info, Stats) -> %% @doc Get info of a channel. -spec get_chan_info(gateway_name(), emqx_types:clientid()) - -> emqx_types:infos() | undefined. + -> emqx_types:infos() | undefined. get_chan_info(GwName, ClientId) -> with_channel(GwName, ClientId, fun(ChanPid) -> get_chan_info(GwName, ClientId, ChanPid) end). --spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> - [pid()]. +-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()]. do_lookup_by_clientid(GwName, ClientId) -> ChanTab = emqx_gateway_cm:tabname(chan, GwName), [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)]. -spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid()) - -> emqx_types:infos() | undefined. + -> emqx_types:infos() | undefined. do_get_chan_info(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try @@ -189,15 +190,17 @@ do_get_chan_info(GwName, ClientId, ChanPid) -> end. -spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) - -> emqx_types:infos() | undefined. + -> emqx_types:infos() | undefined. get_chan_info(GwName, ClientId, ChanPid) -> - wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)). + wrap_rpc( + emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid) + ). --spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> - [pid()]. +-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()]. lookup_by_clientid(GwName, ClientId) -> Nodes = mria_mnesia:running_nodes(), - case emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId) of + case emqx_gateway_cm_proto_v1:lookup_by_clientid( + Nodes, GwName, ClientId) of {Pids, []} -> lists:append(Pids); {_, _BadNodes} -> @@ -390,7 +393,7 @@ takeover_session(GwName, ClientId) -> [ChanPid] -> do_takeover_session(GwName, ClientId, ChanPid); ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), + [ChanPid | StalePids] = lists:reverse(ChanPids), ?SLOG(warning, #{ msg => "more_than_one_channel_found" , chan_pids => ChanPids }), @@ -565,41 +568,54 @@ do_get_chann_conn_mod(GwName, ClientId, ChanPid) -> get_chann_conn_mod(GwName, ClientId, ChanPid) -> wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)). --spec call(gateway_name(), emqx_types:clientid(), term()) -> term(). +-spec call(gateway_name(), emqx_types:clientid(), term()) + -> undefined | term(). call(GwName, ClientId, Req) -> - with_channel(GwName, ClientId, fun(ChanPid) -> - wrap_rpc(emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req)) - end). + with_channel( + GwName, ClientId, + fun(ChanPid) -> + wrap_rpc( + emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req) + ) + end). --spec call(gateway_name(), emqx_types:clientid(), term(), timeout()) -> term(). +-spec call(gateway_name(), emqx_types:clientid(), term(), timeout()) + -> undefined | term(). call(GwName, ClientId, Req, Timeout) -> - with_channel(GwName, ClientId, fun(ChanPid) -> - wrap_rpc( - emqx_gateway_cm_proto_v1:call( - GwName, ClientId, ChanPid, Req, Timeout)) - end). + with_channel( + GwName, ClientId, + fun(ChanPid) -> + wrap_rpc( + emqx_gateway_cm_proto_v1:call( + GwName, ClientId, ChanPid, Req, Timeout) + ) + end). do_call(GwName, ClientId, ChanPid, Req) -> case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of - undefined -> error(noproc); + undefined -> throw(noproc); ConnMod -> ConnMod:call(ChanPid, Req) end. do_call(GwName, ClientId, ChanPid, Req, Timeout) -> case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of - undefined -> error(noproc); + undefined -> throw(noproc); ConnMod -> ConnMod:call(ChanPid, Req, Timeout) end. --spec cast(gateway_name(), emqx_types:clientid(), term()) -> term(). +-spec cast(gateway_name(), emqx_types:clientid(), term()) -> ok. cast(GwName, ClientId, Req) -> - with_channel(GwName, ClientId, fun(ChanPid) -> - wrap_rpc(emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req)) - end). + with_channel( + GwName, ClientId, + fun(ChanPid) -> + wrap_rpc( + emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req)) + end), + ok. do_cast(GwName, ClientId, ChanPid, Req) -> case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of - undefined -> error(noproc); + undefined -> throw(noproc); ConnMod -> ConnMod:cast(ChanPid, Req) end. @@ -625,7 +641,7 @@ locker_unlock(Locker, ClientId) -> %% @private wrap_rpc(Ret) -> case Ret of - {badrpc, Reason} -> error(Reason); + {badrpc, Reason} -> throw({badrpc, Reason}); Res -> Res end. @@ -642,7 +658,7 @@ init(Options) -> TabOpts = [public, {write_concurrency, true}], {ChanTab, ConnTab, InfoTab} = cmtabs(GwName), - ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true}|TabOpts]), + ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true} | TabOpts]), ok = emqx_tables:new(ConnTab, [bag | TabOpts]), ok = emqx_tables:new(InfoTab, [set, compressed | TabOpts]), diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index bb4bfa7f9..e0795c950 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -74,8 +74,10 @@ , listeners => [] }. --elvis([{elvis_style, god_modules, disable}]). --elvis([{elvis_style, no_nested_try_catch, disable}]). +-elvis([ {elvis_style, god_modules, disable} + , {elvis_style, no_nested_try_catch, disable} + , {elvis_style, invalid_dynamic_call, disable} + ]). -define(DEFAULT_CALL_TIMEOUT, 15000). @@ -255,48 +257,39 @@ kickout_client(GwName, ClientId) -> -> {error, any()} | {ok, list()}. list_client_subscriptions(GwName, ClientId) -> - with_channel(GwName, ClientId, - fun(Pid) -> - case emqx_gateway_conn:call( - Pid, - subscriptions, ?DEFAULT_CALL_TIMEOUT) of - {ok, Subs} -> - {ok, lists:map(fun({Topic, SubOpts}) -> - SubOpts#{topic => Topic} - end, Subs)}; - {error, Reason} -> - {error, Reason} - end - end). + case client_call(GwName, ClientId, subscriptions) of + {error, Reason} -> {error, Reason}; + {ok, Subs} -> + {ok, lists:map(fun({Topic, SubOpts}) -> + SubOpts#{topic => Topic} + end, Subs)} + end. -spec client_subscribe(gateway_name(), emqx_types:clientid(), emqx_types:topic(), emqx_types:subopts()) -> {error, any()} | {ok, {emqx_types:topic(), emqx_types:subopts()}}. client_subscribe(GwName, ClientId, Topic, SubOpts) -> - with_channel(GwName, ClientId, - fun(Pid) -> - emqx_gateway_conn:call( - Pid, {subscribe, Topic, SubOpts}, - ?DEFAULT_CALL_TIMEOUT - ) - end). + client_call(GwName, ClientId, {subscribe, Topic, SubOpts}). -spec client_unsubscribe(gateway_name(), emqx_types:clientid(), emqx_types:topic()) -> {error, any()} | ok. client_unsubscribe(GwName, ClientId, Topic) -> - with_channel(GwName, ClientId, - fun(Pid) -> - emqx_gateway_conn:call( - Pid, {unsubscribe, Topic}, ?DEFAULT_CALL_TIMEOUT) - end). + client_call(GwName, ClientId, {unsubscribe, Topic}). -with_channel(GwName, ClientId, Fun) -> - case emqx_gateway_cm:with_channel(GwName, ClientId, Fun) of - undefined -> {error, not_found}; +client_call(GwName, ClientId, Req) -> + try emqx_gateway_cm:call( + GwName, ClientId, + Req, ?DEFAULT_CALL_TIMEOUT) of + undefined -> + {error, not_found}; Res -> Res + catch throw : noproc -> + {error, not_found}; + throw : {badrpc, Reason} -> + {error, {badrpc, Reason}} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index ff52c2295..fac55e24d 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -127,10 +127,14 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) -> services => #{ 'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr} }, - SvrOptions = case maps:to_list(maps:get(ssl, Options, #{})) of - [] -> []; - SslOpts -> - [{ssl_options, SslOpts}] + SvrOptions = case emqx_map_lib:deep_get([ssl, enable], Options, false) of + false -> []; + true -> + [{ssl_options, + maps:to_list( + maps:without([enable], maps:get(ssl, Options, #{})) + ) + }] end, case grpc:start_server(GwName, ListenOn, Services, SvrOptions) of {ok, _SvrPid} -> diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index f01225431..05dff665a 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -71,6 +71,8 @@ register_inflight :: maybe(term()), %% Topics list for awaiting to register to client register_awaiting_queue :: list(), + %% Duration for asleep + asleep_timer_duration :: integer() | undefined, %% Timer timers :: #{atom() => disable | undefined | reference()}, %%% Takeover @@ -81,16 +83,17 @@ pendings :: list() }). --type(channel() :: #channel{}). +-type channel() :: #channel{}. --type(conn_state() :: idle | connecting | connected | asleep | disconnected). +-type conn_state() :: idle | connecting | connected | asleep | awake + | disconnected. --type(reply() :: {outgoing, mqtt_sn_message()} +-type reply() :: {outgoing, mqtt_sn_message()} | {outgoing, [mqtt_sn_message()]} | {event, conn_state()|updated} - | {close, Reason :: atom()}). + | {close, Reason :: atom()}. --type(replies() :: reply() | [reply()]). +-type replies() :: reply() | [reply()]. -define(TIMER_TABLE, #{ alive_timer => keepalive, @@ -471,8 +474,25 @@ handle_in(?SN_WILLMSG_MSG(Payload), handle_out(connack, ReasonCode, Channel) end; +%% TODO: takeover ??? +handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, ClientId), + Channel = #channel{ + clientinfo = #{clientid := ClientId}, + conn_state = ConnState}) + when ConnState == asleep; + ConnState == awake -> + %% From the asleep or awake state a client can return either to the + %% active state by sending a CONNECT message [6.14] + ?SLOG(info, #{ msg => "goto_connected_state" + , previous_state => ConnState + , clientid => ClientId + }), + handle_out(connack, ?SN_RC_ACCEPTED, + Channel#channel{conn_state = connected}); + +%% new connection handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId), - Channel) -> + Channel = #channel{conn_state = idle}) -> case emqx_misc:pipeline( [ fun enrich_conninfo/2 , fun run_conn_hooks/2 @@ -589,7 +609,10 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), case emqx_session:puback(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), - {ok, Channel#channel{session = NSession}}; + {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( + Channel#channel{session = NSession} + ), + {ok, Replies, NChannel}; {ok, Msg, Publishes, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), handle_out(publish, @@ -672,7 +695,10 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> case emqx_session:pubcomp(ClientInfo, MsgId, Session) of {ok, NSession} -> - {ok, Channel#channel{session = NSession}}; + {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( + Channel#channel{session = NSession} + ), + {ok, Replies, NChannel}; {ok, Publishes, NSession} -> handle_out(publish, Publishes, Channel#channel{session = NSession}); @@ -732,32 +758,47 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName), {ok, {outgoing, UnsubAck}, NChannel} end; -handle_in(?SN_PINGREQ_MSG(_ClientId), - Channel = #channel{conn_state = asleep}) -> - {ok, Outgoing, NChannel} = awake(Channel), - NOutgoings = Outgoing ++ [{outgoing, ?SN_PINGRESP_MSG()}], - {ok, NOutgoings, NChannel}; - -handle_in(?SN_PINGREQ_MSG(_ClientId), Channel) -> +handle_in(?SN_PINGREQ_MSG(ClientId), Channel) + when ClientId == undefined; + ClientId == <<>> -> {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel}; -handle_in(?SN_PINGRESP_MSG(), Channel) -> +handle_in(?SN_PINGREQ_MSG(ReqClientId), + Channel = #channel{clientinfo = #{clientid := ClientId}}) + when ReqClientId =/= ClientId -> + ?SLOG(warning, #{ msg => "awake_pingreq_clientid_not_match" + , clientid => ClientId + , request_clientid => ReqClientId + }), + %% FIXME: takeover_and_awake.. {ok, Channel}; -handle_in(?SN_DISCONNECT_MSG(Duration), Channel) -> - case Duration of - undefined -> - handle_out(disconnect, normal, Channel); - _ -> - %% A DISCONNECT message with a Duration field is sent by a client - %% when it wants to go to the “asleep” state. The receipt of this - %% message is also acknowledged by the gateway by means of a - %% DISCONNECT message (without a duration field) [5.4.21] - %% - %% TODO: asleep mechanism - AckPkt = ?SN_DISCONNECT_MSG(undefined), - {ok, {outgoing, AckPkt}, asleep(Duration, Channel)} - end; +handle_in(?SN_PINGREQ_MSG(ClientId), + Channel = #channel{conn_state = ConnState}) + when ConnState == idle; ConnState == asleep; ConnState == awake -> + awake(ClientId, Channel); + +handle_in(?SN_PINGREQ_MSG(ClientId), + Channel = #channel{conn_state = ConnState}) -> + ?SLOG(error, #{ msg => "awake_pingreq_in_bad_conn_state" + , conn_state => ConnState + , clientid => ClientId + }), + handle_out(disconnect, protocol_error, Channel); + +handle_in(?SN_DISCONNECT_MSG(_Duration = undefined), Channel) -> + handle_out(disconnect, normal, Channel); + +handle_in(?SN_DISCONNECT_MSG(Duration), + Channel = #channel{conn_state = ConnState}) + when ConnState == connected; ConnState == asleep -> + %% A DISCONNECT message with a Duration field is sent by a client + %% when it wants to go to the “asleep” state. The receipt of this + %% message is also acknowledged by the gateway by means of a + %% DISCONNECT message (without a duration field) [5.4.21] + %% + AckPkt = ?SN_DISCONNECT_MSG(undefined), + {ok, [{outgoing, AckPkt}, {event, asleep}], asleep(Duration, Channel)}; handle_in(?SN_WILLTOPICUPD_MSG(Flags, Topic), Channel = #channel{will_msg = WillMsg, @@ -1100,7 +1141,24 @@ do_unsubscribe(TopicFilters, %%-------------------------------------------------------------------- %% Awake & Asleep -awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) -> +awake(ClientId, Channel = #channel{conn_state = idle}) -> + ?SLOG(warning, #{ msg => "awake_pingreq_in_idle_state" + , clientid => ClientId + }), + %% TODO: takeover and awake? + %% 1. Query emqx_cm_registry to get the session state? + %% 2. Takeover it and goto awake state + {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel}; + +awake(ClientId, Channel = #channel{ + conn_state = ConnState, + session = Session, + clientinfo = ClientInfo = #{clientid := ClientId}}) + when ConnState == asleep; ConnState == awake -> + ?SLOG(info, #{ msg => "goto_awake_state" + , clientid => ClientId + , previous_state => ConnState + }), {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), {NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of {ok, Session2} -> @@ -1108,24 +1166,57 @@ awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) -> {ok, More, Session2} -> {lists:append(Publishes, More), Session2} end, - {Replies, NChannel} = outgoing_deliver_and_register( - do_deliver(NPublishes, - Channel#channel{session = NSession}) - ), - {ok, Replies, NChannel}. + Channel1 = cancel_timer(asleep_timer, Channel), + {Replies0, NChannel0} = outgoing_deliver_and_register( + do_deliver( + NPublishes, + Channel1#channel{ + conn_state = awake, session = NSession} + ) + ), + Replies1 = [{event, awake} | Replies0], + + {Replies2, NChannel} = goto_asleep_if_buffered_msgs_sent(NChannel0), + {ok, Replies1 ++ Replies2, NChannel}. + +goto_asleep_if_buffered_msgs_sent( + Channel = #channel{ + conn_state = awake, + session = Session, + asleep_timer_duration = Duration}) -> + case emqx_mqueue:is_empty(emqx_session:info(mqueue, Session)) andalso + emqx_inflight:is_empty(emqx_session:info(inflight, Session)) of + true -> + ?SLOG(info, #{ msg => "goto_asleep_state" + , reason => buffered_messages_sent + , duration => Duration + }), + Replies = [ {outgoing, ?SN_PINGRESP_MSG()} + , {event, asleep} + ], + {Replies, ensure_asleep_timer(Channel#channel{conn_state = asleep})}; + false -> + {[], Channel} + end; +goto_asleep_if_buffered_msgs_sent(Channel) -> + {[], Channel}. asleep(Duration, Channel = #channel{conn_state = asleep}) -> %% 6.14: The client can also modify its sleep duration %% by sending a DISCONNECT message with a new value of %% the sleep duration - ensure_timer(asleep_timer, Duration, - cancel_timer(asleep_timer, Channel) - ); + %% + %% XXX: Do we need to limit the maximum of Duration? + ?SLOG(debug, #{ msg => "update_asleep_timer" + , new_duration => Duration + }), + ensure_asleep_timer(Duration, cancel_timer(asleep_timer, Channel)); asleep(Duration, Channel = #channel{conn_state = connected}) -> - ensure_timer(asleep_timer, Duration, - Channel#channel{conn_state = asleep} - ). + ?SLOG(info, #{ msg => "goto_asleep_state" + , duration => Duration + }), + ensure_asleep_timer(Duration, Channel#channel{conn_state = asleep}). %%-------------------------------------------------------------------- %% Handle outgoing packet @@ -1154,10 +1245,11 @@ handle_out(connack, ReasonCode, shutdown(Reason, AckPacket, Channel); handle_out(publish, Publishes, Channel) -> - {Replies, NChannel} = outgoing_deliver_and_register( - do_deliver(Publishes, Channel) - ), - {ok, Replies, NChannel}; + {Replies1, NChannel} = outgoing_deliver_and_register( + do_deliver(Publishes, Channel) + ), + {Replies2, NChannel2} = goto_asleep_if_buffered_msgs_sent(NChannel), + {ok, Replies1 ++ Replies2, NChannel2}; handle_out(puback, {TopicId, MsgId, Rc}, Channel) -> {ok, {outgoing, ?SN_PUBACK_MSG(TopicId, MsgId, Rc)}, Channel}; @@ -1688,6 +1780,14 @@ update_will_msg(Will, Payload) -> %%-------------------------------------------------------------------- %% Timer +ensure_asleep_timer(Channel = #channel{asleep_timer_duration = Duration}) + when is_integer(Duration) -> + ensure_asleep_timer(Duration, Channel). + +ensure_asleep_timer(Durtion, Channel) -> + ensure_timer(asleep_timer, timer:seconds(Durtion), + Channel#channel{asleep_timer_duration = Durtion}). + cancel_timer(Name, Channel = #channel{timers = Timers}) -> case maps:get(Name, Timers, undefined) of undefined -> diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl index e6d136494..15f9c5288 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl @@ -362,9 +362,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> format(?SN_PINGREQ_MSG(ClientId)) -> io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); format(?SN_PINGRESP_MSG()) -> - "SN_PINGREQ()"; + "SN_PINGRESP()"; format(?SN_DISCONNECT_MSG(Duration)) -> - io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); + io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]); format(#mqtt_sn_message{type = Type, variable = Var}) -> io_lib:format("mqtt_sn_message(type=~s, Var=~w)", diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 6a046b854..07f55a7d8 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -1108,6 +1108,9 @@ t_delivery_takeover_and_re_register(_) -> send_disconnect_msg(NSocket, undefined), ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), + + _ = emqx_gateway_cm:kick_session(mqttsn, <<"test">>), + gen_udp:close(NSocket). t_will_case01(_) -> @@ -1283,626 +1286,636 @@ t_will_case06(_) -> gen_udp:close(Socket). -%t_asleep_test01_timeout(_) -> -% QoS = 1, -% Duration = 1, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% send_disconnect_msg(Socket, 1), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% %% asleep timer should get timeout, and device is lost -% timer:sleep(3000), -% -% gen_udp:close(Socket). -% -%t_asleep_test02_to_awake_and_back(_) -> -% QoS = 1, -% Keepalive_Duration = 1, -% SleepDuration = 5, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % goto asleep state -% send_disconnect_msg(Socket, SleepDuration), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(4500), -% -% % goto awake state and back -% send_pingreq_msg(Socket, ClientId), -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% -% timer:sleep(4500), -% -% % goto awake state and back -% send_pingreq_msg(Socket, ClientId), -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% -% %% during above procedure, mqtt keepalive timer should not terminate mqtt-sn process -% -% %% asleep timer should get timeout, and device should get lost -% timer:sleep(8000), -% -% gen_udp:close(Socket). -% -%t_asleep_test03_to_awake_qos1_dl_msg(_) -> -% QoS = 1, -% Duration = 5, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% MsgId = 1000, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % subscribe -% TopicName1 = <<"abc">>, -% MsgId1 = 25, -% TopicId1 = ?MAX_PRED_TOPIC_ID + 1, -% WillBit = 0, -% Dup = 0, -% Retain = 0, -% CleanSession = 0, -% ReturnCode = 0, -% Payload1 = <<55, 66, 77, 88, 99>>, -% MsgId2 = 87, -% -% send_register_msg(Socket, TopicName1, MsgId1), -% ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId1:16, 0:8>>, receive_response(Socket)), -% send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId), -% ?assertEqual( -% <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, -% WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, -% TopicId1:16, MsgId:16, ReturnCode>>, -% receive_response(Socket) -% ), -% -% % goto asleep state -% send_disconnect_msg(Socket, 1), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(300), -% -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% %% send downlink data in asleep state. This message should be send to device once it wake up -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% send_publish_msg_predefined_topic(Socket, QoS, MsgId2, TopicId1, Payload1), -% -% {ok, C} = emqtt:start_link(), -% {ok, _} = emqtt:connect(C), -% {ok, _} = emqtt:publish(C, TopicName1, Payload1, QoS), -% timer:sleep(100), -% ok = emqtt:disconnect(C), -% -% timer:sleep(50), -% -% % goto awake state, receive downlink messages, and go back to asleep -% send_pingreq_msg(Socket, ClientId), -% -% %% the broker should sent dl msgs to the awake client before sending the pingresp -% UdpData = receive_response(Socket), -% MsgId_udp = check_publish_msg_on_udp( -% {Dup, QoS, Retain, WillBit, CleanSession, -% ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData), -% send_puback_msg(Socket, TopicId1, MsgId_udp), -% -% %% check the pingresp is received at last -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% -% gen_udp:close(Socket). -% -%t_asleep_test04_to_awake_qos1_dl_msg(_) -> -% QoS = 1, -% Duration = 5, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % subscribe -% TopicName1 = <<"a/+/c">>, -% MsgId1 = 25, -% TopicId0 = 0, -% WillBit = 0, -% Dup = 0, -% Retain = 0, -% CleanSession = 0, -% ReturnCode = 0, -% send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), -% ?assertEqual( -% <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, -% WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, -% TopicId0:16, MsgId1:16, ReturnCode>>, -% receive_response(Socket) -% ), -% -% % goto asleep state -% send_disconnect_msg(Socket, 1), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(300), -% -% %% send downlink data in asleep state. This message should be send to device once it wake up -% Payload1 = <<55, 66, 77, 88, 99>>, -% Payload2 = <<55, 66, 77, 88, 100>>, -% -% {ok, C} = emqtt:start_link(), -% {ok, _} = emqtt:connect(C), -% {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload1, QoS), -% {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload2, QoS), -% timer:sleep(100), -% ok = emqtt:disconnect(C), -% -% timer:sleep(300), -% -% % goto awake state, receive downlink messages, and go back to asleep -% send_pingreq_msg(Socket, ClientId), -% -% %% 1. get REGISTER first, since this topic has never been registered -% UdpData1 = receive_response(Socket), -% {TopicIdNew, MsgId3} = check_register_msg_on_udp(<<"a/b/c">>, UdpData1), -% -% %% 2. but before we reply the REGACK, the sn-gateway should not send any PUBLISH -% ?assertError(_, receive_publish(Socket)), -% -% send_regack_msg(Socket, TopicIdNew, MsgId3), -% -% UdpData2 = receive_response(Socket), -% MsgId_udp2 = check_publish_msg_on_udp( -% {Dup, QoS, Retain, WillBit, CleanSession, -% ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2), -% send_puback_msg(Socket, TopicIdNew, MsgId_udp2), -% -% UdpData3 = receive_response(Socket), -% MsgId_udp3 = check_publish_msg_on_udp( -% {Dup, QoS, Retain, WillBit, CleanSession, -% ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3), -% send_puback_msg(Socket, TopicIdNew, MsgId_udp3), -% -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% -% gen_udp:close(Socket). +t_asleep_test01_timeout(_) -> + QoS = 1, + Duration = 1, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + send_disconnect_msg(Socket, 1), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + %% asleep timer should get timeout, and device is lost + timer:sleep(3000), + + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). + +t_asleep_test02_to_awake_and_back(_) -> + QoS = 1, + Keepalive_Duration = 1, + SleepDuration = 5, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % goto asleep state + send_disconnect_msg(Socket, SleepDuration), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(4500), + + % goto awake state and back + send_pingreq_msg(Socket, ClientId), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + timer:sleep(4500), + + % goto awake state and back + send_pingreq_msg(Socket, ClientId), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + %% during above procedure, mqtt keepalive timer should not terminate mqtt-sn process + + %% asleep timer should get timeout, and device should get lost + timer:sleep(8000), + + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). + +t_asleep_test03_to_awake_qos1_dl_msg(_) -> + QoS = 1, + Duration = 5, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + MsgId = 1000, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % subscribe + TopicName1 = <<"abc">>, + MsgId1 = 25, + TopicId1 = ?MAX_PRED_TOPIC_ID + 1, + WillBit = 0, + Dup = 0, + Retain = 0, + CleanSession = 0, + ReturnCode = 0, + Payload1 = <<55, 66, 77, 88, 99>>, + + send_register_msg(Socket, TopicName1, MsgId1), + ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId1:16, 0:8>>, receive_response(Socket)), + send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId), + ?assertEqual( + <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), + + % goto asleep state + send_disconnect_msg(Socket, 1), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(300), + + emqx_broker:publish( + emqx_message:make(<<"ct">>, QoS, TopicName1, Payload1)), + + timer:sleep(50), + + % goto awake state, receive downlink messages, and go back to asleep + send_pingreq_msg(Socket, ClientId), + + %% the broker should sent dl msgs to the awake client before sending the pingresp + UdpData = receive_response(Socket), + MsgId_udp = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, CleanSession, + ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData), + send_puback_msg(Socket, TopicId1, MsgId_udp), + + %% check the pingresp is received at last + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + timer:sleep(5000), + + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). + +t_asleep_test04_to_awake_qos1_dl_msg(_) -> + QoS = 1, + Duration = 5, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % subscribe + TopicName1 = <<"a/+/c">>, + MsgId1 = 25, + TopicId0 = 0, + WillBit = 0, + Dup = 0, + Retain = 0, + CleanSession = 0, + ReturnCode = 0, + send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), + ?assertEqual( + <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), + + % goto asleep state + send_disconnect_msg(Socket, 1), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(300), + + %% send downlink data in asleep state. This message should be send to device once it wake up + Payload1 = <<55, 66, 77, 88, 99>>, + Payload2 = <<55, 66, 77, 88, 100>>, + + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload1, QoS), + {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload2, QoS), + timer:sleep(100), + ok = emqtt:disconnect(C), + + timer:sleep(300), + + % goto awake state, receive downlink messages, and go back to asleep + send_pingreq_msg(Socket, ClientId), + + %% 1. get REGISTER first, since this topic has never been registered + UdpData1 = receive_response(Socket), + {TopicIdNew, MsgId3} = check_register_msg_on_udp(<<"a/b/c">>, UdpData1), + + %% 2. but before we reply the REGACK, the sn-gateway should not send any PUBLISH + ?assertError(_, receive_publish(Socket)), + + send_regack_msg(Socket, TopicIdNew, MsgId3), + + UdpData2 = receive_response(Socket), + MsgId_udp2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, CleanSession, + ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2), + send_puback_msg(Socket, TopicIdNew, MsgId_udp2), + + UdpData3 = receive_response(Socket), + MsgId_udp3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, CleanSession, + ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3), + send_puback_msg(Socket, TopicIdNew, MsgId_udp3), + + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + timer:sleep(8000), + + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). receive_publish(Socket) -> UdpData3 = receive_response(Socket, 1000), <> = UdpData3, <<_:8, ?SN_PUBLISH, _/binary>> = HeaderUdp. -%t_asleep_test05_to_awake_qos1_dl_msg(_) -> -% QoS = 1, -% Duration = 5, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % subscribe -% TopicName1 = <<"u/+/w">>, -% MsgId1 = 25, -% TopicId0 = 0, -% WillBit = 0, -% Dup = 0, -% Retain = 0, -% CleanSession = 0, -% ReturnCode = 0, -% send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), -% ?assertEqual( -% <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, -% WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, -% TopicId0:16, MsgId1:16, ReturnCode>>, -% receive_response(Socket) -% ), -% -% % goto asleep state -% SleepDuration = 30, -% send_disconnect_msg(Socket, SleepDuration), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(300), -% -% %% send downlink data in asleep state. This message should be send to device once it wake up -% Payload2 = <<55, 66, 77, 88, 99>>, -% Payload3 = <<61, 71, 81>>, -% Payload4 = <<100, 101, 102, 103, 104, 105, 106, 107>>, -% TopicName_test5 = <<"u/v/w">>, -% {ok, C} = emqtt:start_link(), -% {ok, _} = emqtt:connect(C), -% {ok, _} = emqtt:publish(C, TopicName_test5, Payload2, QoS), -% timer:sleep(100), -% {ok, _} = emqtt:publish(C, TopicName_test5, Payload3, QoS), -% timer:sleep(100), -% {ok, _} = emqtt:publish(C, TopicName_test5, Payload4, QoS), -% timer:sleep(200), -% ok = emqtt:disconnect(C), -% timer:sleep(50), -% -% % goto awake state, receive downlink messages, and go back to asleep -% send_pingreq_msg(Socket, ClientId), -% -% UdpData_reg = receive_response(Socket), -% {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg), -% send_regack_msg(Socket, TopicIdNew, MsgId_reg), -% -% UdpData2 = receive_response(Socket), -% MsgId2 = check_publish_msg_on_udp( -% {Dup, QoS, Retain, WillBit, CleanSession, -% ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), -% send_puback_msg(Socket, TopicIdNew, MsgId2), -% timer:sleep(50), -% -% UdpData3 = wrap_receive_response(Socket), -% MsgId3 = check_publish_msg_on_udp( -% {Dup, QoS, Retain, WillBit, CleanSession, -% ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), -% send_puback_msg(Socket, TopicIdNew, MsgId3), -% timer:sleep(50), -% -% case receive_response(Socket) of -% <<2,23>> -> ok; -% UdpData4 -> -% MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, -% CleanSession, ?SN_NORMAL_TOPIC, -% TopicIdNew, Payload4}, UdpData4), -% send_puback_msg(Socket, TopicIdNew, MsgId4) -% end, -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% gen_udp:close(Socket). -% -%t_asleep_test06_to_awake_qos2_dl_msg(_) -> -% QoS = 2, -% Duration = 1, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % subscribe -% TopicName_tom = <<"tom">>, -% MsgId1 = 25, -% WillBit = 0, -% Dup = 0, -% Retain = 0, -% CleanSession = 0, -% ReturnCode = 0, -% send_register_msg(Socket, TopicName_tom, MsgId1), -% timer:sleep(50), -% TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), -% send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), -% ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, -% ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>, -% receive_response(Socket)), -% -% % goto asleep state -% SleepDuration = 11, -% send_disconnect_msg(Socket, SleepDuration), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(100), -% -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% %% send downlink data in asleep state. This message should be send to device once it wake up -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% Payload1 = <<55, 66, 77, 88, 99>>, -% {ok, C} = emqtt:start_link(), -% {ok, _} = emqtt:connect(C), -% {ok, _} = emqtt:publish(C, TopicName_tom, Payload1, QoS), -% timer:sleep(100), -% ok = emqtt:disconnect(C), -% timer:sleep(300), -% -% % goto awake state, receive downlink messages, and go back to asleep -% send_pingreq_msg(Socket, ClientId), -% -% UdpData = wrap_receive_response(Socket), -% MsgId_udp = check_publish_msg_on_udp( -% {Dup, QoS, Retain, WillBit, CleanSession, -% ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), -% send_pubrec_msg(Socket, MsgId_udp), -% ?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)), -% send_pubcomp_msg(Socket, MsgId_udp), -% -% %% verify the pingresp is received after receiving all the buffered qos2 msgs -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% gen_udp:close(Socket). -% -%t_asleep_test07_to_connected(_) -> -% QoS = 1, -% Keepalive_Duration = 10, -% SleepDuration = 1, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % subscribe -% TopicName_tom = <<"tom">>, -% MsgId1 = 25, -% WillBit = 0, -% Dup = 0, -% Retain = 0, -% CleanSession = 0, -% ReturnCode = 0, -% send_register_msg(Socket, TopicName_tom, MsgId1), -% TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), -% send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), -% ?assertEqual( -% <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, -% WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, -% TopicId_tom:16, MsgId1:16, ReturnCode>>, -% receive_response(Socket) -% ), -% -% % goto asleep state -% send_disconnect_msg(Socket, SleepDuration), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(100), -% -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% %% send connect message, and goto connected state -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% send_connect_msg(Socket, ClientId), -% ?assertEqual(<<3, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, receive_response(Socket)), -% -% timer:sleep(1500), -% % asleep timer should get timeout, without any effect -% -% timer:sleep(4000), -% % keepalive timer should get timeout -% -% gen_udp:close(Socket). -% -%t_asleep_test08_to_disconnected(_) -> -% QoS = 1, -% Keepalive_Duration = 3, -% SleepDuration = 1, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % goto asleep state -% send_disconnect_msg(Socket, SleepDuration), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(100), -% -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% %% send disconnect message, and goto disconnected state -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% send_disconnect_msg(Socket, undefined), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(100), -% % it is a normal termination, without will message -% -% gen_udp:close(Socket). -% -%t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> -% QoS = 1, -% Duration = 5, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % subscribe -% TopicName1 = <<"u/+/k">>, -% MsgId1 = 25, -% TopicId0 = 0, -% WillBit = 0, -% Dup = 0, -% Retain = 0, -% CleanSession = 0, -% ReturnCode = 0, -% send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), -% ?assertEqual( -% <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, -% WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, -% TopicId0:16, MsgId1:16, ReturnCode>>, -% receive_response(Socket) -% ), -% % goto asleep state -% SleepDuration = 30, -% send_disconnect_msg(Socket, SleepDuration), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(1000), -% -% %% send downlink data in asleep state. This message should be send to device once it wake up -% Payload2 = <<55, 66, 77, 88, 99>>, -% Payload3 = <<61, 71, 81>>, -% Payload4 = <<100, 101, 102, 103, 104, 105, 106, 107>>, -% TopicName_test9 = <<"u/v/k">>, -% {ok, C} = emqtt:start_link(), -% {ok, _} = emqtt:connect(C), -% {ok, _} = emqtt:publish(C, TopicName_test9, Payload2, QoS), -% timer:sleep(100), -% {ok, _} = emqtt:publish(C, TopicName_test9, Payload3, QoS), -% timer:sleep(100), -% {ok, _} = emqtt:publish(C, TopicName_test9, Payload4, QoS), -% timer:sleep(1000), -% ok = emqtt:disconnect(C), -% -% % goto awake state, receive downlink messages, and go back to asleep -% send_pingreq_msg(Socket, ClientId), -% -% UdpData_reg = receive_response(Socket), -% {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg), -% send_regack_msg(Socket, TopicIdNew, MsgId_reg), -% -% case wrap_receive_response(Socket) of -% udp_receive_timeout -> -% ok; -% UdpData2 -> -% MsgId2 = check_publish_msg_on_udp( -% {Dup, QoS, Retain, WillBit, CleanSession, -% ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), -% send_puback_msg(Socket, TopicIdNew, MsgId2) -% end, -% timer:sleep(100), -% -% case wrap_receive_response(Socket) of -% udp_receive_timeout -> -% ok; -% UdpData3 -> -% MsgId3 = check_publish_msg_on_udp( -% {Dup, QoS, Retain, WillBit, CleanSession, -% ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), -% send_puback_msg(Socket, TopicIdNew, MsgId3) -% end, -% timer:sleep(100), -% -% case wrap_receive_response(Socket) of -% udp_receive_timeout -> -% ok; -% UdpData4 -> -% MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, -% CleanSession, ?SN_NORMAL_TOPIC, -% TopicIdNew, Payload4}, UdpData4), -% send_puback_msg(Socket, TopicIdNew, MsgId4) -% end, -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% -% %% send PINGREQ again to enter awake state -% send_pingreq_msg(Socket, ClientId), -% %% will not receive any buffered PUBLISH messages buffered before last -% %% awake, only receive PINGRESP here -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% -% gen_udp:close(Socket). -% -%t_awake_test01_to_connected(_) -> -% QoS = 1, -% Keepalive_Duration = 3, -% SleepDuration = 1, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% % goto asleep state -% send_disconnect_msg(Socket, SleepDuration), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(100), -% -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% %% send connect message, and goto connected state -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% send_connect_msg(Socket, ClientId), -% ?assertEqual(<<3, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, receive_response(Socket)), -% -% timer:sleep(1500), -% % asleep timer should get timeout -% -% timer:sleep(4000), -% % keepalive timer should get timeout -% gen_udp:close(Socket). -% -%t_awake_test02_to_disconnected(_) -> -% QoS = 1, -% Keepalive_Duration = 3, -% SleepDuration = 1, -% WillTopic = <<"dead">>, -% WillPayload = <<10, 11, 12, 13, 14>>, -% {ok, Socket} = gen_udp:open(0, [binary]), -% ClientId = ?CLIENTID, -% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), -% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), -% send_willtopic_msg(Socket, WillTopic, QoS), -% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), -% send_willmsg_msg(Socket, WillPayload), -% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), -% -% -% % goto asleep state -% send_disconnect_msg(Socket, SleepDuration), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(100), -% -% % goto awake state -% send_pingreq_msg(Socket, ClientId), -% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), -% -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% %% send disconnect message, and goto disconnected state -% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -% send_disconnect_msg(Socket, undefined), -% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), -% -% timer:sleep(100), -% % it is a normal termination, no will message will be send -% -% gen_udp:close(Socket). +t_asleep_test05_to_awake_qos1_dl_msg(_) -> + QoS = 1, + Duration = 5, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % subscribe + TopicName1 = <<"u/+/w">>, + MsgId1 = 25, + TopicId0 = 0, + WillBit = 0, + Dup = 0, + Retain = 0, + CleanSession = 0, + ReturnCode = 0, + send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), + ?assertEqual( + <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), + + % goto asleep state + SleepDuration = 5, + send_disconnect_msg(Socket, SleepDuration), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(300), + + %% send downlink data in asleep state. This message should be send to device once it wake up + Payload2 = <<55, 66, 77, 88, 99>>, + Payload3 = <<61, 71, 81>>, + Payload4 = <<100, 101, 102, 103, 104, 105, 106, 107>>, + TopicName_test5 = <<"u/v/w">>, + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + {ok, _} = emqtt:publish(C, TopicName_test5, Payload2, QoS), + timer:sleep(100), + {ok, _} = emqtt:publish(C, TopicName_test5, Payload3, QoS), + timer:sleep(100), + {ok, _} = emqtt:publish(C, TopicName_test5, Payload4, QoS), + timer:sleep(200), + ok = emqtt:disconnect(C), + timer:sleep(50), + + % goto awake state, receive downlink messages, and go back to asleep + send_pingreq_msg(Socket, ClientId), + + UdpData_reg = receive_response(Socket), + {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg), + send_regack_msg(Socket, TopicIdNew, MsgId_reg), + + UdpData2 = receive_response(Socket), + MsgId2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, CleanSession, + ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), + send_puback_msg(Socket, TopicIdNew, MsgId2), + timer:sleep(50), + + UdpData3 = wrap_receive_response(Socket), + MsgId3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, CleanSession, + ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), + send_puback_msg(Socket, TopicIdNew, MsgId3), + timer:sleep(50), + + case receive_response(Socket) of + <<2,23>> -> ok; + UdpData4 -> + MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload4}, UdpData4), + send_puback_msg(Socket, TopicIdNew, MsgId4) + end, + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + timer:sleep(8000), + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). + +t_asleep_test06_to_awake_qos2_dl_msg(_) -> + QoS = 2, + Duration = 1, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % subscribe + TopicName_tom = <<"tom">>, + MsgId1 = 25, + WillBit = 0, + Dup = 0, + Retain = 0, + CleanSession = 0, + ReturnCode = 0, + send_register_msg(Socket, TopicName_tom, MsgId1), + timer:sleep(50), + TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), + send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, + ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>, + receive_response(Socket)), + + % goto asleep state + SleepDuration = 5, + send_disconnect_msg(Socket, SleepDuration), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(100), + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% send downlink data in asleep state. This message should be send to device once it wake up + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + Payload1 = <<55, 66, 77, 88, 99>>, + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + {ok, _} = emqtt:publish(C, TopicName_tom, Payload1, QoS), + timer:sleep(100), + ok = emqtt:disconnect(C), + timer:sleep(300), + + % goto awake state, receive downlink messages, and go back to asleep + send_pingreq_msg(Socket, ClientId), + + UdpData = wrap_receive_response(Socket), + MsgId_udp = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, CleanSession, + ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), + send_pubrec_msg(Socket, MsgId_udp), + ?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)), + send_pubcomp_msg(Socket, MsgId_udp), + + %% verify the pingresp is received after receiving all the buffered qos2 msgs + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + timer:sleep(8000), + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). + +t_asleep_test07_to_connected(_) -> + QoS = 1, + Keepalive_Duration = 10, + SleepDuration = 1, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % subscribe + TopicName_tom = <<"tom">>, + MsgId1 = 25, + WillBit = 0, + Dup = 0, + Retain = 0, + CleanSession = 0, + ReturnCode = 0, + send_register_msg(Socket, TopicName_tom, MsgId1), + TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), + send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), + ?assertEqual( + <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId_tom:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), + + % goto asleep state + send_disconnect_msg(Socket, SleepDuration), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(100), + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% send connect message, and goto connected state + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, receive_response(Socket)), + + timer:sleep(1500), + % asleep timer should get timeout, without any effect + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + gen_udp:close(Socket). + +t_asleep_test08_to_disconnected(_) -> + QoS = 1, + Keepalive_Duration = 3, + SleepDuration = 1, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % goto asleep state + send_disconnect_msg(Socket, SleepDuration), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(100), + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% send disconnect message, and goto disconnected state + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + send_disconnect_msg(Socket, undefined), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(8000), + % it is a normal termination, without will message + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). + +t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> + QoS = 1, + Duration = 5, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % subscribe + TopicName1 = <<"u/+/k">>, + MsgId1 = 25, + TopicId0 = 0, + WillBit = 0, + Dup = 0, + Retain = 0, + CleanSession = 0, + ReturnCode = 0, + send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), + ?assertEqual( + <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), + % goto asleep state + SleepDuration = 5, + send_disconnect_msg(Socket, SleepDuration), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(200), + + %% send downlink data in asleep state. This message should be send to device once it wake up + Payload2 = <<55, 66, 77, 88, 99>>, + Payload3 = <<61, 71, 81>>, + Payload4 = <<100, 101, 102, 103, 104, 105, 106, 107>>, + TopicName_test9 = <<"u/v/k">>, + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + {ok, _} = emqtt:publish(C, TopicName_test9, Payload2, QoS), + timer:sleep(100), + {ok, _} = emqtt:publish(C, TopicName_test9, Payload3, QoS), + timer:sleep(100), + {ok, _} = emqtt:publish(C, TopicName_test9, Payload4, QoS), + timer:sleep(200), + ok = emqtt:disconnect(C), + + % goto awake state, receive downlink messages, and go back to asleep + send_pingreq_msg(Socket, ClientId), + + UdpData_reg = receive_response(Socket), + {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg), + send_regack_msg(Socket, TopicIdNew, MsgId_reg), + + case wrap_receive_response(Socket) of + udp_receive_timeout -> + ok; + UdpData2 -> + MsgId2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, CleanSession, + ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), + send_puback_msg(Socket, TopicIdNew, MsgId2) + end, + timer:sleep(100), + + case wrap_receive_response(Socket) of + udp_receive_timeout -> + ok; + UdpData3 -> + MsgId3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, CleanSession, + ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), + send_puback_msg(Socket, TopicIdNew, MsgId3) + end, + timer:sleep(100), + + case wrap_receive_response(Socket) of + udp_receive_timeout -> + ok; + UdpData4 -> + MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload4}, UdpData4), + send_puback_msg(Socket, TopicIdNew, MsgId4) + end, + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + %% send PINGREQ again to enter awake state + send_pingreq_msg(Socket, ClientId), + %% will not receive any buffered PUBLISH messages buffered before last + %% awake, only receive PINGRESP here + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + timer:sleep(10000), + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). + +t_awake_test01_to_connected(_) -> + QoS = 1, + Keepalive_Duration = 3, + SleepDuration = 1, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % goto asleep state + send_disconnect_msg(Socket, SleepDuration), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(100), + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% send connect message, and goto connected state + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, receive_response(Socket)), + + timer:sleep(1500), + % asleep timer should get timeout + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + gen_udp:close(Socket). + +t_awake_test02_to_disconnected(_) -> + QoS = 1, + Keepalive_Duration = 3, + SleepDuration = 1, + WillTopic = <<"dead">>, + WillPayload = <<10, 11, 12, 13, 14>>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + send_willmsg_msg(Socket, WillPayload), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + % goto asleep state + send_disconnect_msg(Socket, SleepDuration), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + timer:sleep(100), + + % goto awake state + send_pingreq_msg(Socket, ClientId), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% send disconnect message, and goto disconnected state + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + send_disconnect_msg(Socket, undefined), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + + %% asleep timeout + timer:sleep(8000), + ?assertEqual([], emqx_gateway_cm:lookup_by_clientid(mqttsn, ClientId)), + + gen_udp:close(Socket). t_broadcast_test1(_) -> {ok, Socket} = gen_udp:open( 0, [binary]), @@ -2289,17 +2302,6 @@ receive_response(Socket, Timeout) -> udp_receive_timeout end. -receive_emqttc_response() -> - receive - {mqttc, _From, Data2} -> - Data2; - {publish, Topic, Payload} -> - {publish, Topic, Payload}; - Other -> {unexpected_emqttc_data, Other} - after 2000 -> - emqttc_receive_timeout - end. - check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket) -> PubMsg = receive_response(Socket), Length = 7 + byte_size(Payload),