Merge pull request #7300 from HJianBo/mqtt-sn-resume-subs

feat(mqttsn): introduce subs_resume option
This commit is contained in:
JianBo He 2022-03-17 17:51:41 +08:00 committed by GitHub
commit 72e37dd144
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 544 additions and 123 deletions

View File

@ -27,6 +27,7 @@ File format:
* Add UTF-8 string validity check in `strict_mode` for MQTT packet.
When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261]
* Changed systemd service restart delay from 10 seconds to 60 seconds.
* MQTT-SN gateway supports initiative to synchronize registered topics after session resumed. [#7300]
* Add load control app for future development.
### Bug fixes

View File

@ -51,3 +51,10 @@ mqtt.sn.username = mqtt_sn_user
##
## Value: String
mqtt.sn.password = abc
## Whether to initiate all subscribed topic registration messages to the
## client after the Session has been taken over by a new channel.
##
## Value: Boolean
## Default: false
#mqtt.sn.subs_resume = false

View File

@ -56,6 +56,11 @@ end}.
{datatype, string}
]}.
{mapping, "mqtt.sn.subs_resume", "emqx_sn.subs_resume", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx_sn.username", fun(Conf) ->
Username = cuttlefish:conf_get("mqtt.sn.username", Conf),
list_to_binary(Username)

View File

@ -1,26 +1,52 @@
%% -*- mode: erlang -*-
{VSN,
[
{<<"4\\.3\\.[4-5]">>,[
{"4.3.5",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.5"]}}
]},
{<<"4.3.[2-3]">>,[
{"4.3.4",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.4"]}}
]},
{"4.3.3",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.3"]}}
]},
{"4.3.2",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.2"]}}
]},
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
],
[
{<<"4\\.3\\.[4-5]">>,[
{"4.3.5",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.5"]}}
]},
{<<"4.3.[2-3]">>,[
{"4.3.4",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.4"]}}
]},
{"4.3.3",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.3"]}}
]},
{"4.3.2",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.2"]}}
]},
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
]}.

View File

@ -122,12 +122,14 @@ listeners_confs() ->
EnableQos3 = application:get_env(emqx_sn, enable_qos3, false),
EnableStats = application:get_env(emqx_sn, enable_stats, false),
IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000),
SubsResume = application:get_env(emqx_sn, subs_resume, false),
[{udp, ListenOn, [{gateway_id, GwId},
{username, Username},
{password, Password},
{enable_qos3, EnableQos3},
{enable_stats, EnableStats},
{idle_timeout, IdleTimeout},
{subs_resume, SubsResume},
{max_connections, 1024000},
{max_conn_rate, 1000},
{udp_options, []}]}].

View File

@ -339,9 +339,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
format(?SN_PINGREQ_MSG(ClientId)) ->
io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]);
format(?SN_PINGRESP_MSG()) ->
"SN_PINGREQ()";
"SN_PINGRESP()";
format(?SN_DISCONNECT_MSG(Duration)) ->
io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]);
io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]);
format(#mqtt_sn_message{type = Type, variable = Var}) ->
io_lib:format("mqtt_sn_message(type=~s, Var=~w)",

View File

@ -48,6 +48,7 @@
, wait_for_will_topic/3
, wait_for_will_msg/3
, connected/3
, registering/3
, asleep/3
, awake/3
]).
@ -96,7 +97,10 @@
has_pending_pingresp = false :: boolean(),
%% Store all qos0 messages for waiting REGACK
%% Note: QoS1/QoS2 messages will kept inflight queue
pending_topic_ids = #{} :: pending_msgs()
pending_topic_ids = #{} :: pending_msgs(),
subs_resume = false,
waiting_sync_topics = [],
previous_outgoings_and_state = undefined
}).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]).
@ -126,6 +130,9 @@
Reason =:= asleep_timeout;
Reason =:= keepalive_timeout).
-define(RETRY_TIMEOUT, 5000).
-define(MAX_RETRY_TIMES, 3).
%%--------------------------------------------------------------------
%% Exported APIs
%%--------------------------------------------------------------------
@ -152,6 +159,7 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
Password = proplists:get_value(password, Options, undefined),
EnableQos3 = proplists:get_value(enable_qos3, Options, false),
IdleTimeout = proplists:get_value(idle_timeout, Options, 30000),
SubsResume = proplists:get_value(subs_resume, Options, false),
EnableStats = proplists:get_value(enable_stats, Options, false),
case inet:sockname(Sock) of
{ok, Sockname} ->
@ -168,7 +176,8 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
asleep_timer = emqx_sn_asleep_timer:init(),
enable_stats = EnableStats,
enable_qos3 = EnableQos3,
idle_timeout = IdleTimeout
idle_timeout = IdleTimeout,
subs_resume = SubsResume
},
emqx_logger:set_metadata_peername(esockd:format(Peername)),
{ok, idle, State, [IdleTimeout]};
@ -379,6 +388,13 @@ connected(cast, {outgoing, Packet}, State) ->
connected(cast, {connack, ConnAck}, State) ->
{keep_state, handle_outgoing(ConnAck, State)};
connected(cast, {register, TopicNames, BlockedOutgoins}, State) ->
NState = State#state{
waiting_sync_topics = TopicNames,
previous_outgoings_and_state = {BlockedOutgoins, ?FUNCTION_NAME}
},
{next_state, registering, NState, [next_event(shooting)]};
connected(cast, {shutdown, Reason, Packet}, State) ->
stop(Reason, handle_outgoing(Packet, State));
@ -392,6 +408,80 @@ connected(cast, {close, Reason}, State) ->
connected(EventType, EventContent, State) ->
handle_event(EventType, EventContent, connected, State).
registering(cast, shooting,
State = #state{
channel = Channel,
waiting_sync_topics = [],
previous_outgoings_and_state = {Outgoings, StateName}}) ->
Session = emqx_channel:get_session(Channel),
ClientInfo = emqx_channel:info(clientinfo, Channel),
{Outgoings2, NChannel} =
case emqx_session:dequeue(ClientInfo, Session) of
{ok, NSession} ->
{[], emqx_channel:set_session(NSession, Channel)};
{ok, Pubs, NSession} ->
emqx_channel:do_deliver(
Pubs,
emqx_channel:set_session(NSession, Channel)
)
end,
NState = State#state{
channel = NChannel,
previous_outgoings_and_state = undefined},
{next_state, StateName, NState, outgoing_events(Outgoings ++ Outgoings2)};
registering(cast, shooting,
State = #state{
clientid = ClientId,
waiting_sync_topics = [TopicName | Remainings]}) ->
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
NState = send_register(
TopicName,
TopicId,
16#FFFF, %% FIXME: msgid ?
State#state{waiting_sync_topics = [{TopicId, TopicName, 0} | Remainings]}
),
{keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}};
registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)},
State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) ->
?LOG(debug, "Register topic name ~s with id ~w successfully!", [TopicName, TopicId]),
{keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]};
registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)},
State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) ->
?LOG(error, "client does not accept register TopicName=~s, TopicId=~p, MsgId=~p, ReturnCode=~p",
[TopicName, TopicId, MsgId, ReturnCode]),
{keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]};
registering(cast, {incoming, Packet},
State = #state{previous_outgoings_and_state = {_, StateName}})
when is_record(Packet, mqtt_sn_message) ->
apply(?MODULE, StateName, [cast, {incoming, Packet}, State]);
registering({timeout, wait_regack}, _,
State = #state{waiting_sync_topics = [{TopicId, TopicName, Times} | Remainings]})
when Times < ?MAX_RETRY_TIMES ->
?LOG(warning, "Waiting REGACK timeout for TopicName=~s, TopicId=~w, try it again(~w)",
[TopicName, TopicId, Times+1]),
NState = send_register(
TopicName,
TopicId,
16#FFFF, %% FIXME: msgid?
State#state{waiting_sync_topics = [{TopicId, TopicName, Times + 1} | Remainings]}
),
{keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}};
registering({timeout, wait_regack}, _,
State = #state{waiting_sync_topics = [{TopicId, TopicName, ?MAX_RETRY_TIMES} | _]}) ->
?LOG(error, "Retry register TopicName=~s, TopicId=~w reached the max retry times",
[TopicId, TopicName]),
NState = send_message(?SN_DISCONNECT_MSG(undefined), State),
stop(reached_max_retry_times, NState);
registering(EventType, EventContent, State) ->
handle_event(EventType, EventContent, ?FUNCTION_NAME, State).
asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) ->
State0 = send_message(?SN_DISCONNECT_MSG(undefined), State),
case Duration of
@ -519,10 +609,13 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
stop(frame_error, State)
end;
handle_event(info, {deliver, _Topic, Msg}, asleep,
State = #state{channel = Channel}) ->
handle_event(info, {deliver, _Topic, Msg}, StateName,
State = #state{channel = Channel})
when StateName == alseep;
StateName == registering ->
% section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]),
?LOG(debug, "enqueue downlink message in ~s state, msg: ~0p",
[StateName, Msg]),
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -593,8 +686,31 @@ terminate(Reason, _StateName, #state{channel = Channel}) ->
emqx_channel:terminate(Reason, Channel),
ok.
code_change(_Vsn, StateName, State, _Extra) ->
{ok, StateName, State}.
%% in the emqx_sn:v4.3.6, we have added two new fields in the state last:
%% - waiting_sync_topics
%% - previous_outgoings_and_state
code_change({down, _Vsn}, StateName, State, [ToVsn]) ->
case re:run(ToVsn, "4\\.3\\.[2-5]") of
{match, _} ->
NState0 = lists:droplast(
lists:droplast(
lists:droplast(tuple_to_list(State)))),
NState = list_to_tuple(NState0),
{ok, StateName, NState};
_ ->
{ok, StateName, State}
end;
code_change(_Vsn, StateName, State, [FromVsn]) ->
case re:run(FromVsn, "4\\.3\\.[2-5]") of
{match, _} ->
NState = list_to_tuple(
tuple_to_list(State) ++ [false, [], undefined]
),
{ok, StateName, NState};
_ ->
{ok, StateName, State}
end.
%%--------------------------------------------------------------------
%% Handle Call/Info
@ -643,6 +759,9 @@ outgoing_event(Packet) when is_record(Packet, mqtt_packet);
outgoing_event(Action) ->
next_event(Action).
next_event(Content) ->
{next_event, cast, Content}.
close_socket(State = #state{sockstate = closed}) -> State;
close_socket(State = #state{socket = _Socket}) ->
%ok = gen_udp:close(Socket),
@ -1058,6 +1177,38 @@ handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake,
Result = channel_handle_in(Packet, State),
handle_return(Result, State, [try_goto_asleep]);
handle_incoming(
#mqtt_packet{
variable = #mqtt_packet_connect{
clean_start = false}
} = Packet,
_,
State = #state{subs_resume = SubsResume}) ->
Result = channel_handle_in(Packet, State),
case {SubsResume, Result} of
{true, {ok, Replies, NChannel}} ->
case maps:get(
subscriptions,
emqx_channel:info(session, NChannel)
) of
Subs when map_size(Subs) == 0 ->
handle_return(Result, State);
Subs ->
TopicNames = lists:filter(
fun(T) -> not emqx_topic:wildcard(T)
end, maps:keys(Subs)),
{ConnackEvents, Outgoings} = split_connack_replies(
Replies),
Events = outgoing_events(
ConnackEvents ++
[{register, TopicNames, Outgoings}]
),
{keep_state, State#state{channel = NChannel}, Events}
end;
_ ->
handle_return(Result, State)
end;
handle_incoming(Packet, _StName, State) ->
Result = channel_handle_in(Packet, State),
handle_return(Result, State).
@ -1167,9 +1318,6 @@ inc_outgoing_stats(Type) ->
false -> ok
end.
next_event(Content) ->
{next_event, cast, Content}.
inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.
@ -1183,3 +1331,8 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) ->
State;
maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) ->
send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State).
%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}]
split_connack_replies([A = {event, connected},
B = {connack, _ConnAck} | Outgoings]) ->
{[A, B], Outgoings}.

View File

@ -79,6 +79,12 @@ set_special_confs(emqx_sn) ->
set_special_confs(_App) ->
ok.
restart_emqx_sn(#{subs_resume := Bool}) ->
application:set_env(emqx_sn, subs_resume, Bool),
_ = application:stop(emqx_sn),
_ = application:ensure_all_started(emqx_sn),
ok.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
@ -862,108 +868,6 @@ t_delivery_qos1_register_invalid_topic_id(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_delivery_takeover_and_re_register(_) ->
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#00100000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% qos1
%% received the resume messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
%% qos2
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_disconnect_msg(NSocket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
gen_udp:close(NSocket).
t_will_case01(_) ->
QoS = 1,
Duration = 1,
@ -1725,6 +1629,324 @@ t_broadcast_test1(_) ->
timer:sleep(600),
gen_udp:close(Socket).
t_register_subs_resume_on(_) ->
restart_emqx_sn(#{subs_resume => true}),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% receive the queued messages
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdA:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdA2),
<<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdA2),
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdB:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB2),
<<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB2),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1),
restart_emqx_sn(#{subs_resume => false}).
t_register_subs_resume_off(_) ->
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#00100000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% qos1
%% received the resume messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
%% qos2
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1).
t_register_skip_failure_topic_name_and_reach_max_retry_times(_) ->
restart_emqx_sn(#{subs_resume => true}),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
%% registered failured topic-name will be skipped
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID),
%% the gateway try to shutdown this client if it reached max-retry-times
%%
%% times-0
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% times-1
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% times-2
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% just a ping
send_pingreq_msg(NSocket, <<"test">>),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)),
%% times-3
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% shutdown due to reached max retry times
timer:sleep(5000), %% RETYRY_TIMEOUT
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
gen_udp:close(NSocket),
restart_emqx_sn(#{subs_resume => false}).
t_register_enqueue_delivering_messages(_) ->
restart_emqx_sn(#{subs_resume => true}),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
emqx_logger:set_log_level(debug),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
%% registered failured topic-name will be skipped
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED),
%% receive the queued messages
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdA:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1),
restart_emqx_sn(#{subs_resume => false}).
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------
@ -1816,9 +2038,12 @@ send_register_msg(Socket, TopicName, MsgId) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket).
send_regack_msg(Socket, TopicId, MsgId) ->
send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED).
send_regack_msg(Socket, TopicId, MsgId, Rc) ->
Length = 7,
MsgType = ?SN_REGACK,
Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, Rc>>,
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) ->

View File

@ -324,6 +324,8 @@ process_old_action({add_module, Module}) ->
[Module];
process_old_action({delete_module, Module}) ->
[Module];
process_old_action({update, Module, _Change}) ->
[Module];
process_old_action(LoadModule) when is_tuple(LoadModule) andalso
element(1, LoadModule) =:= load_module ->
element(2, LoadModule);