feat(mqttsn): introduce subs_resume option

As the mqtt-sn v1.2 spec metioned, the gateway will be able to sync the
subscriptions topic-name registry to client when the client resume it's
session

port from: https://github.com/emqx/emqx-sn/pull/195
This commit is contained in:
JianBo He 2022-03-14 17:44:42 +08:00
parent eb5d9fa501
commit 3201d11212
4 changed files with 151 additions and 9 deletions

View File

@ -51,3 +51,10 @@ mqtt.sn.username = mqtt_sn_user
## ##
## Value: String ## Value: String
mqtt.sn.password = abc mqtt.sn.password = abc
## Whether to initiate all subscribed topic registration messages to the
## client after the Session has been taken over by a new channel.
##
## Value: Boolean
## Default: false
#mqtt.sn.subs_resume = false

View File

@ -56,6 +56,11 @@ end}.
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "mqtt.sn.subs_resume", "emqx_sn.subs_resume", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx_sn.username", fun(Conf) -> {translation, "emqx_sn.username", fun(Conf) ->
Username = cuttlefish:conf_get("mqtt.sn.username", Conf), Username = cuttlefish:conf_get("mqtt.sn.username", Conf),
list_to_binary(Username) list_to_binary(Username)

View File

@ -339,9 +339,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
format(?SN_PINGREQ_MSG(ClientId)) -> format(?SN_PINGREQ_MSG(ClientId)) ->
io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]);
format(?SN_PINGRESP_MSG()) -> format(?SN_PINGRESP_MSG()) ->
"SN_PINGREQ()"; "SN_PINGRESP()";
format(?SN_DISCONNECT_MSG(Duration)) -> format(?SN_DISCONNECT_MSG(Duration)) ->
io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]);
format(#mqtt_sn_message{type = Type, variable = Var}) -> format(#mqtt_sn_message{type = Type, variable = Var}) ->
io_lib:format("mqtt_sn_message(type=~s, Var=~w)", io_lib:format("mqtt_sn_message(type=~s, Var=~w)",

View File

@ -48,6 +48,7 @@
, wait_for_will_topic/3 , wait_for_will_topic/3
, wait_for_will_msg/3 , wait_for_will_msg/3
, connected/3 , connected/3
, registering/3
, asleep/3 , asleep/3
, awake/3 , awake/3
]). ]).
@ -96,7 +97,9 @@
has_pending_pingresp = false :: boolean(), has_pending_pingresp = false :: boolean(),
%% Store all qos0 messages for waiting REGACK %% Store all qos0 messages for waiting REGACK
%% Note: QoS1/QoS2 messages will kept inflight queue %% Note: QoS1/QoS2 messages will kept inflight queue
pending_topic_ids = #{} :: pending_msgs() pending_topic_ids = #{} :: pending_msgs(),
waiting_sync_topics = [],
previous_outgoings_and_state = undefined
}). }).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]).
@ -126,6 +129,9 @@
Reason =:= asleep_timeout; Reason =:= asleep_timeout;
Reason =:= keepalive_timeout). Reason =:= keepalive_timeout).
-define(RETRY_TIMEOUT, 5000).
-define(MAX_RETRY_TIMES, 3).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Exported APIs %% Exported APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -379,6 +385,13 @@ connected(cast, {outgoing, Packet}, State) ->
connected(cast, {connack, ConnAck}, State) -> connected(cast, {connack, ConnAck}, State) ->
{keep_state, handle_outgoing(ConnAck, State)}; {keep_state, handle_outgoing(ConnAck, State)};
connected(cast, {register, TopicNames, BlockedOutgoins}, State) ->
NState = State#state{
waiting_sync_topics = TopicNames,
previous_outgoings_and_state = {BlockedOutgoins, ?FUNCTION_NAME}
},
{next_state, registering, NState, [next_event(shooting)]};
connected(cast, {shutdown, Reason, Packet}, State) -> connected(cast, {shutdown, Reason, Packet}, State) ->
stop(Reason, handle_outgoing(Packet, State)); stop(Reason, handle_outgoing(Packet, State));
@ -392,6 +405,80 @@ connected(cast, {close, Reason}, State) ->
connected(EventType, EventContent, State) -> connected(EventType, EventContent, State) ->
handle_event(EventType, EventContent, connected, State). handle_event(EventType, EventContent, connected, State).
registering(cast, shooting,
State = #state{
channel = Channel,
waiting_sync_topics = [],
previous_outgoings_and_state = {Outgoings, StateName}}) ->
Session = emqx_channel:get_session(Channel),
ClientInfo = emqx_channel:info(clientinfo, Channel),
{Outgoings2, NChannel} =
case emqx_session:dequeue(ClientInfo, Session) of
{ok, NSession} ->
{[], emqx_channel:set_session(NSession, Channel)};
{ok, Pubs, NSession} ->
emqx_channel:do_deliver(
Pubs,
emqx_channel:set_session(NSession, Channel)
)
end,
NState = State#state{
channel = NChannel,
previous_outgoings_and_state = undefined},
{next_state, StateName, NState, outgoing_events(Outgoings ++ Outgoings2)};
registering(cast, shooting,
State = #state{
clientid = ClientId,
waiting_sync_topics = [TopicName | Remainings]}) ->
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
NState = send_register(
TopicName,
TopicId,
16#FFFF, %% FIXME: msgid ?
State#state{waiting_sync_topics = [{TopicId, TopicName, 0} | Remainings]}
),
{keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}};
registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)},
State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) ->
?LOG(debug, "Register topic name ~s with id ~w successfully!", [TopicName, TopicId]),
{keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]};
registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)},
State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) ->
?LOG(error, "client does not accept register TopicName=~s, TopicId=~p, MsgId=~p, ReturnCode=~p",
[TopicName, TopicId, MsgId, ReturnCode]),
{keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]};
registering(cast, {incoming, Packet},
State = #state{previous_outgoings_and_state = {_, StateName}})
when is_record(Packet, mqtt_sn_message) ->
apply(?MODULE, StateName, [cast, {incoming, Packet}, State]);
registering({timeout, wait_regack}, _,
State = #state{waiting_sync_topics = [{TopicId, TopicName, Times} | Remainings]})
when Times < ?MAX_RETRY_TIMES ->
?LOG(warning, "Waiting REGACK timeout for TopicName=~s, TopicId=~w, try it again(~w)",
[TopicName, TopicId, Times+1]),
NState = send_register(
TopicName,
TopicId,
16#FFFF, %% FIXME: msgid?
State#state{waiting_sync_topics = [{TopicId, TopicName, Times + 1} | Remainings]}
),
{keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}};
registering({timeout, wait_regack}, _,
State = #state{waiting_sync_topics = [{TopicId, TopicName, ?MAX_RETRY_TIMES} | _]}) ->
?LOG(error, "Retry register TopicName=~s, TopicId=~w reached the max retry times",
[TopicId, TopicName]),
NState = send_message(?SN_DISCONNECT_MSG(undefined), State),
stop(reached_max_retry_times, NState);
registering(EventType, EventContent, State) ->
handle_event(EventType, EventContent, ?FUNCTION_NAME, State).
asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) ->
State0 = send_message(?SN_DISCONNECT_MSG(undefined), State), State0 = send_message(?SN_DISCONNECT_MSG(undefined), State),
case Duration of case Duration of
@ -519,10 +606,13 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
stop(frame_error, State) stop(frame_error, State)
end; end;
handle_event(info, {deliver, _Topic, Msg}, asleep, handle_event(info, {deliver, _Topic, Msg}, StateName,
State = #state{channel = Channel}) -> State = #state{channel = Channel})
when StateName == alseep;
StateName == registering ->
% section 6.14, Support of sleeping clients % section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), ?LOG(debug, "enqueue downlink message in ~s state, msg: ~0p",
[StateName, Msg]),
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
Msg, emqx_channel:get_session(Channel)), Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -643,6 +733,9 @@ outgoing_event(Packet) when is_record(Packet, mqtt_packet);
outgoing_event(Action) -> outgoing_event(Action) ->
next_event(Action). next_event(Action).
next_event(Content) ->
{next_event, cast, Content}.
close_socket(State = #state{sockstate = closed}) -> State; close_socket(State = #state{sockstate = closed}) -> State;
close_socket(State = #state{socket = _Socket}) -> close_socket(State = #state{socket = _Socket}) ->
%ok = gen_udp:close(Socket), %ok = gen_udp:close(Socket),
@ -1058,6 +1151,38 @@ handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake,
Result = channel_handle_in(Packet, State), Result = channel_handle_in(Packet, State),
handle_return(Result, State, [try_goto_asleep]); handle_return(Result, State, [try_goto_asleep]);
handle_incoming(
#mqtt_packet{
variable = #mqtt_packet_connect{
clean_start = false}
} = Packet,
_,
State) ->
Result = channel_handle_in(Packet, State),
case {subs_resume(), Result} of
{true, {ok, Replies, NChannel}} ->
case maps:get(
subscriptions,
emqx_channel:info(session, NChannel)
) of
Subs when map_size(Subs) == 0 ->
handle_return(Result, State);
Subs ->
TopicNames = lists:filter(
fun(T) -> not emqx_topic:wildcard(T)
end, maps:keys(Subs)),
{ConnackEvents, Outgoings} = split_connack_replies(
Replies),
Events = outgoing_events(
ConnackEvents ++
[{register, TopicNames, Outgoings}]
),
{keep_state, State#state{channel = NChannel}, Events}
end;
_ ->
handle_return(Result, State)
end;
handle_incoming(Packet, _StName, State) -> handle_incoming(Packet, _StName, State) ->
Result = channel_handle_in(Packet, State), Result = channel_handle_in(Packet, State),
handle_return(Result, State). handle_return(Result, State).
@ -1167,9 +1292,6 @@ inc_outgoing_stats(Type) ->
false -> ok false -> ok
end. end.
next_event(Content) ->
{next_event, cast, Content}.
inc_counter(Key, Inc) -> inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc), _ = emqx_pd:inc_counter(Key, Inc),
ok. ok.
@ -1183,3 +1305,11 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) ->
State; State;
maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) ->
send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State).
subs_resume() ->
application:get_env(emqx_sn, subs_resume, false).
%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}]
split_connack_replies([A = {event, connected},
B = {connack, _ConnAck} | Outgoings]) ->
{[A, B], Outgoings}.