Cache publishes before receiving the REGACK (#4695)

* refactor(emqx_sn): return new state from send_message

* fix(emqx_sn): send publish only after regack received
This commit is contained in:
Shawn 2021-04-30 15:31:34 +08:00 committed by GitHub
parent 3c103ae546
commit cb31d66bf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 127 additions and 104 deletions

View File

@ -67,6 +67,8 @@
-type(maybe(T) :: T | undefined). -type(maybe(T) :: T | undefined).
-type(pending_msgs() :: #{integer() => [#mqtt_sn_message{}]}).
-record(will_msg, {retain = false :: boolean(), -record(will_msg, {retain = false :: boolean(),
qos = ?QOS_0 :: emqx_mqtt_types:qos(), qos = ?QOS_0 :: emqx_mqtt_types:qos(),
topic :: maybe(binary()), topic :: maybe(binary()),
@ -92,7 +94,8 @@
stats_timer :: maybe(reference()), stats_timer :: maybe(reference()),
idle_timeout :: integer(), idle_timeout :: integer(),
enable_qos3 = false :: boolean(), enable_qos3 = false :: boolean(),
has_pending_pingresp = false :: boolean() has_pending_pingresp = false :: boolean(),
pending_topic_ids = #{} :: pending_msgs()
}). }).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]).
@ -180,8 +183,8 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
callback_mode() -> state_functions. callback_mode() -> state_functions.
idle(cast, {incoming, ?SN_SEARCHGW_MSG(_Radius)}, State = #state{gwid = GwId}) -> idle(cast, {incoming, ?SN_SEARCHGW_MSG(_Radius)}, State = #state{gwid = GwId}) ->
send_message(?SN_GWINFO_MSG(GwId, <<>>), State), State0 = send_message(?SN_GWINFO_MSG(GwId, <<>>), State),
{keep_state, State, State#state.idle_timeout}; {keep_state, State0, State0#state.idle_timeout};
idle(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) -> idle(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, State) ->
#mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags, #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
@ -221,12 +224,10 @@ idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
handle_ping(PingReq, State); handle_ping(PingReq, State);
idle(cast, {outgoing, Packet}, State) -> idle(cast, {outgoing, Packet}, State) ->
ok = handle_outgoing(Packet, State), {keep_state, handle_outgoing(Packet, State)};
{keep_state, State};
idle(cast, {connack, ConnAck}, State) -> idle(cast, {connack, ConnAck}, State) ->
ok = handle_outgoing(ConnAck, State), {next_state, connected, handle_outgoing(ConnAck, State)};
{next_state, connected, State};
idle(timeout, _Timeout, State) -> idle(timeout, _Timeout, State) ->
stop(idle_timeout, State); stop(idle_timeout, State);
@ -245,8 +246,8 @@ wait_for_will_topic(cast, {incoming, ?SN_WILLTOPIC_EMPTY_MSG}, State = #state{co
wait_for_will_topic(cast, {incoming, ?SN_WILLTOPIC_MSG(Flags, Topic)}, State) -> wait_for_will_topic(cast, {incoming, ?SN_WILLTOPIC_MSG(Flags, Topic)}, State) ->
#mqtt_sn_flags{qos = QoS, retain = Retain} = Flags, #mqtt_sn_flags{qos = QoS, retain = Retain} = Flags,
WillMsg = #will_msg{retain = Retain, qos = QoS, topic = Topic}, WillMsg = #will_msg{retain = Retain, qos = QoS, topic = Topic},
send_message(?SN_WILLMSGREQ_MSG(), State), State0 = send_message(?SN_WILLMSGREQ_MSG(), State),
{next_state, wait_for_will_msg, State#state{will_msg = WillMsg}}; {next_state, wait_for_will_msg, State0#state{will_msg = WillMsg}};
wait_for_will_topic(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State) -> wait_for_will_topic(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, _State) ->
% ignore % ignore
@ -256,12 +257,10 @@ wait_for_will_topic(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration,
do_2nd_connect(Flags, Duration, ClientId, State); do_2nd_connect(Flags, Duration, ClientId, State);
wait_for_will_topic(cast, {outgoing, Packet}, State) -> wait_for_will_topic(cast, {outgoing, Packet}, State) ->
ok = handle_outgoing(Packet, State), {keep_state, handle_outgoing(Packet, State)};
{keep_state, State};
wait_for_will_topic(cast, {connack, ConnAck}, State) -> wait_for_will_topic(cast, {connack, ConnAck}, State) ->
ok = handle_outgoing(ConnAck, State), {next_state, connected, handle_outgoing(ConnAck, State)};
{next_state, connected, State};
wait_for_will_topic(cast, Event, _State) -> wait_for_will_topic(cast, Event, _State) ->
?LOG(error, "wait_for_will_topic UNEXPECTED Event: ~p", [Event]), ?LOG(error, "wait_for_will_topic UNEXPECTED Event: ~p", [Event]),
@ -284,18 +283,17 @@ wait_for_will_msg(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, Cl
do_2nd_connect(Flags, Duration, ClientId, State); do_2nd_connect(Flags, Duration, ClientId, State);
wait_for_will_msg(cast, {outgoing, Packet}, State) -> wait_for_will_msg(cast, {outgoing, Packet}, State) ->
ok = handle_outgoing(Packet, State), {keep_state, handle_outgoing(Packet, State)};
{keep_state, State};
wait_for_will_msg(cast, {connack, ConnAck}, State) -> wait_for_will_msg(cast, {connack, ConnAck}, State) ->
ok = handle_outgoing(ConnAck, State), {next_state, connected, handle_outgoing(ConnAck, State)};
{next_state, connected, State};
wait_for_will_msg(EventType, EventContent, State) -> wait_for_will_msg(EventType, EventContent, State) ->
handle_event(EventType, EventContent, wait_for_will_msg, State). handle_event(EventType, EventContent, wait_for_will_msg, State).
connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
State = #state{clientid = ClientId, registry = Registry}) -> State = #state{clientid = ClientId, registry = Registry}) ->
State0 =
case emqx_sn_registry:register_topic(Registry, self(), TopicName) of case emqx_sn_registry:register_topic(Registry, self(), TopicName) of
TopicId when is_integer(TopicId) -> TopicId when is_integer(TopicId) ->
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
@ -307,7 +305,7 @@ connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
?LOG(error, "wildcard topic can not be registered! ClientId=~p, TopicName=~p", [ClientId, TopicName]), ?LOG(error, "wildcard topic can not be registered! ClientId=~p, TopicName=~p", [ClientId, TopicName]),
send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State) send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State)
end, end,
{keep_state, State}; {keep_state, State0};
connected(cast, {incoming, ?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)}, connected(cast, {incoming, ?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)},
State = #state{enable_qos3 = EnableQoS3}) -> State = #state{enable_qos3 = EnableQoS3}) ->
@ -339,19 +337,19 @@ connected(cast, {incoming, ?SN_UNSUBSCRIBE_MSG(Flags, MsgId, TopicId)}, State) -
connected(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) -> connected(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
handle_ping(PingReq, State); handle_ping(PingReq, State);
connected(cast, {incoming, ?SN_REGACK_MSG(_TopicId, _MsgId, ?SN_RC_ACCEPTED)}, State) -> connected(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)}, State) ->
{keep_state, State}; {keep_state, replay_no_reg_pending_publishes(TopicId, State)};
connected(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> connected(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p", ?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p",
[TopicId, MsgId, ReturnCode]), [TopicId, MsgId, ReturnCode]),
{keep_state, State}; {keep_state, State};
connected(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> connected(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) ->
ok = send_message(?SN_DISCONNECT_MSG(undefined), State), State0 = send_message(?SN_DISCONNECT_MSG(undefined), State),
case Duration of case Duration of
undefined -> undefined ->
handle_incoming(?DISCONNECT_PACKET(), State); handle_incoming(?DISCONNECT_PACKET(), State0);
_Other -> goto_asleep_state(Duration, State) _Other -> goto_asleep_state(Duration, State0)
end; end;
connected(cast, {incoming, ?SN_WILLTOPICUPD_MSG(Flags, Topic)}, State = #state{will_msg = WillMsg}) -> connected(cast, {incoming, ?SN_WILLTOPICUPD_MSG(Flags, Topic)}, State = #state{will_msg = WillMsg}) ->
@ -359,12 +357,12 @@ connected(cast, {incoming, ?SN_WILLTOPICUPD_MSG(Flags, Topic)}, State = #state{w
undefined -> undefined; undefined -> undefined;
_ -> update_will_topic(WillMsg, Flags, Topic) _ -> update_will_topic(WillMsg, Flags, Topic)
end, end,
send_message(?SN_WILLTOPICRESP_MSG(0), State), State0 = send_message(?SN_WILLTOPICRESP_MSG(0), State),
{keep_state, State#state{will_msg = WillMsg1}}; {keep_state, State0#state{will_msg = WillMsg1}};
connected(cast, {incoming, ?SN_WILLMSGUPD_MSG(Payload)}, State = #state{will_msg = WillMsg}) -> connected(cast, {incoming, ?SN_WILLMSGUPD_MSG(Payload)}, State = #state{will_msg = WillMsg}) ->
ok = send_message(?SN_WILLMSGRESP_MSG(0), State), State0 = send_message(?SN_WILLMSGRESP_MSG(0), State),
{keep_state, State#state{will_msg = update_will_msg(WillMsg, Payload)}}; {keep_state, State0#state{will_msg = update_will_msg(WillMsg, Payload)}};
connected(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, State) -> connected(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, State) ->
% ignore % ignore
@ -374,17 +372,14 @@ connected(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}
do_2nd_connect(Flags, Duration, ClientId, State); do_2nd_connect(Flags, Duration, ClientId, State);
connected(cast, {outgoing, Packet}, State) -> connected(cast, {outgoing, Packet}, State) ->
ok = handle_outgoing(Packet, State), {keep_state, handle_outgoing(Packet, State)};
{keep_state, State};
%% XXX: It's so strange behavoir!!! %% XXX: It's so strange behavoir!!!
connected(cast, {connack, ConnAck}, State) -> connected(cast, {connack, ConnAck}, State) ->
ok = handle_outgoing(ConnAck, State), {keep_state, handle_outgoing(ConnAck, State)};
{keep_state, State};
connected(cast, {shutdown, Reason, Packet}, State) -> connected(cast, {shutdown, Reason, Packet}, State) ->
ok = handle_outgoing(Packet, State), stop(Reason, handle_outgoing(Packet, State));
stop(Reason, State);
connected(cast, {shutdown, Reason}, State) -> connected(cast, {shutdown, Reason}, State) ->
stop(Reason, State); stop(Reason, State);
@ -397,12 +392,12 @@ connected(EventType, EventContent, State) ->
handle_event(EventType, EventContent, connected, State). handle_event(EventType, EventContent, connected, State).
asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) ->
ok = send_message(?SN_DISCONNECT_MSG(undefined), State), State0 = send_message(?SN_DISCONNECT_MSG(undefined), State),
case Duration of case Duration of
undefined -> undefined ->
handle_incoming(?DISCONNECT_PACKET(), State); handle_incoming(?DISCONNECT_PACKET(), State0);
_Other -> _Other ->
goto_asleep_state(Duration, State) goto_asleep_state(Duration, State0)
end; end;
asleep(cast, {incoming, ?SN_PINGREQ_MSG(undefined)}, State) -> asleep(cast, {incoming, ?SN_PINGREQ_MSG(undefined)}, State) ->
@ -411,13 +406,13 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(undefined)}, State) ->
asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)}, asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)},
State = #state{clientid = ClientId, channel = Channel}) -> State = #state{clientid = ClientId, channel = Channel}) ->
inc_ping_counter(),
case ClientIdPing of case ClientIdPing of
ClientId -> ClientId ->
inc_ping_counter(),
case emqx_session:replay(emqx_channel:get_session(Channel)) of case emqx_session:replay(emqx_channel:get_session(Channel)) of
{ok, [], Session0} -> {ok, [], Session0} ->
send_message(?SN_PINGRESP_MSG(), State), State0 = send_message(?SN_PINGRESP_MSG(), State),
{keep_state, State#state{ {keep_state, State0#state{
channel = emqx_channel:set_session(Session0, Channel)}}; channel = emqx_channel:set_session(Session0, Channel)}};
{ok, Publishes, Session0} -> {ok, Publishes, Session0} ->
{Packets, Channel1} = emqx_channel:do_deliver(Publishes, {Packets, Channel1} = emqx_channel:do_deliver(Publishes,
@ -449,14 +444,13 @@ asleep(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}
% keepalive timer may timeout in asleep state and delete itself, need to restart keepalive % keepalive timer may timeout in asleep state and delete itself, need to restart keepalive
% TODO: Fixme later. % TODO: Fixme later.
%% self() ! {keepalive, start, Interval}, %% self() ! {keepalive, start, Interval},
send_connack(State), {next_state, connected, send_connack(State)};
{next_state, connected, State};
asleep(EventType, EventContent, State) -> asleep(EventType, EventContent, State) ->
handle_event(EventType, EventContent, asleep, State). handle_event(EventType, EventContent, asleep, State).
awake(cast, {incoming, ?SN_REGACK_MSG(_TopicId, _MsgId, ?SN_RC_ACCEPTED)}, State) -> awake(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)}, State) ->
{keep_state, State}; {keep_state, replay_no_reg_pending_publishes(TopicId, State)};
awake(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> awake(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p", ?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p",
@ -467,8 +461,7 @@ awake(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
handle_ping(PingReq, State); handle_ping(PingReq, State);
awake(cast, {outgoing, Packet}, State) -> awake(cast, {outgoing, Packet}, State) ->
ok = handle_outgoing(Packet, State), {keep_state, handle_outgoing(Packet, State)};
{keep_state, State};
awake(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> awake(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
do_puback(TopicId, MsgId, ReturnCode, awake, State); do_puback(TopicId, MsgId, ReturnCode, awake, State);
@ -482,8 +475,8 @@ awake(cast, try_goto_asleep, State=#state{channel = Channel,
Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)), Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)),
case emqx_inflight:size(Inflight) of case emqx_inflight:size(Inflight) of
0 when PingPending =:= true -> 0 when PingPending =:= true ->
send_message(?SN_PINGRESP_MSG(), State), State0 = send_message(?SN_PINGRESP_MSG(), State),
goto_asleep_state(State#state{has_pending_pingresp = false}); goto_asleep_state(State0#state{has_pending_pingresp = false});
0 when PingPending =:= false -> 0 when PingPending =:= false ->
goto_asleep_state(State); goto_asleep_state(State);
_Size -> _Size ->
@ -499,13 +492,13 @@ handle_event({call, From}, Req, _StateName, State) ->
gen_server:reply(From, Reply), gen_server:reply(From, Reply),
{keep_state, NState}; {keep_state, NState};
{stop, Reason, Reply, NState} -> {stop, Reason, Reply, NState} ->
case NState#state.sockstate of State0 = case NState#state.sockstate of
running -> running ->
send_message(?SN_DISCONNECT_MSG(undefined), NState); send_message(?SN_DISCONNECT_MSG(undefined), NState);
_ -> ok _ -> NState
end, end,
gen_server:reply(From, Reply), gen_server:reply(From, Reply),
stop(Reason, NState) stop(Reason, State0)
end; end;
handle_event(info, {datagram, SockPid, Data}, StateName, handle_event(info, {datagram, SockPid, Data}, StateName,
@ -526,9 +519,10 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
end; end;
handle_event(info, {deliver, _Topic, Msg}, asleep, handle_event(info, {deliver, _Topic, Msg}, asleep,
State = #state{channel = Channel}) -> State = #state{channel = Channel, pending_topic_ids = Pendings}) ->
% section 6.14, Support of sleeping clients % section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state Msg=~0p", [Msg]), ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p",
[Msg, Pendings]),
Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)), Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -537,8 +531,7 @@ handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName,
handle_return(emqx_channel:handle_deliver([Deliver], Channel), State); handle_return(emqx_channel:handle_deliver([Deliver], Channel), State);
handle_event(info, {redeliver, {?PUBREL, MsgId}}, _StateName, State) -> handle_event(info, {redeliver, {?PUBREL, MsgId}}, _StateName, State) ->
send_message(?SN_PUBREC_MSG(?SN_PUBREL, MsgId), State), {keep_state, send_message(?SN_PUBREC_MSG(?SN_PUBREL, MsgId), State)};
{keep_state, State};
%% FIXME: Is not unused in v4.x %% FIXME: Is not unused in v4.x
handle_event(info, {timeout, TRef, emit_stats}, _StateName, handle_event(info, {timeout, TRef, emit_stats}, _StateName,
@ -634,8 +627,7 @@ handle_return({shutdown, Reason, NChannel}, State, _AddEvents) ->
stop(Reason, State#state{channel = NChannel}); stop(Reason, State#state{channel = NChannel});
handle_return({shutdown, Reason, OutPacket, NChannel}, State, _AddEvents) -> handle_return({shutdown, Reason, OutPacket, NChannel}, State, _AddEvents) ->
NState = State#state{channel = NChannel}, NState = State#state{channel = NChannel},
ok = handle_outgoing(OutPacket, NState), stop(Reason, handle_outgoing(OutPacket, NState)).
stop(Reason, NState).
outgoing_events(Actions) -> outgoing_events(Actions) ->
lists:map(fun outgoing_event/1, Actions). lists:map(fun outgoing_event/1, Actions).
@ -702,9 +694,9 @@ call(Pid, Req, Timeout) ->
%% Internal Functions %% Internal Functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_ping(_PingReq, State) -> handle_ping(_PingReq, State) ->
ok = send_message(?SN_PINGRESP_MSG(), State), State0 = send_message(?SN_PINGRESP_MSG(), State),
inc_ping_counter(), inc_ping_counter(),
{keep_state, State}. {keep_state, State0}.
inc_ping_counter() -> inc_ping_counter() ->
inc_counter(recv_msg, 1). inc_counter(recv_msg, 1).
@ -768,13 +760,13 @@ send_connack(State) ->
send_message(?SN_CONNACK_MSG(?SN_RC_ACCEPTED), State). send_message(?SN_CONNACK_MSG(?SN_RC_ACCEPTED), State).
send_message(Msg = #mqtt_sn_message{type = Type}, send_message(Msg = #mqtt_sn_message{type = Type},
#state{sockpid = SockPid, peername = Peername}) -> State = #state{sockpid = SockPid, peername = Peername}) ->
?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]), ?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
inc_outgoing_stats(Type), inc_outgoing_stats(Type),
Data = emqx_sn_frame:serialize(Msg), Data = emqx_sn_frame:serialize(Msg),
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)), ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
SockPid ! {datagram, Peername, Data}, SockPid ! {datagram, Peername, Data},
ok. State.
goto_asleep_state(State) -> goto_asleep_state(State) ->
goto_asleep_state(undefined, State). goto_asleep_state(undefined, State).
@ -834,8 +826,8 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
properties = OnlyOneInflight properties = OnlyOneInflight
}, },
case WillFlag of case WillFlag of
true -> send_message(?SN_WILLTOPICREQ_MSG(), State), true -> State0 = send_message(?SN_WILLTOPICREQ_MSG(), State),
NState = State#state{connpkt = ConnPkt, NState = State0#state{connpkt = ConnPkt,
clientid = ClientId, clientid = ClientId,
keepalive_interval = Duration keepalive_interval = Duration
}, },
@ -872,11 +864,11 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
State=#state{registry = Registry}) -> State=#state{registry = Registry}) ->
case emqx_sn_registry:register_topic(Registry, self(), TopicName) of case emqx_sn_registry:register_topic(Registry, self(), TopicName) of
{error, too_large} -> {error, too_large} ->
ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
?SN_INVALID_TOPIC_ID, ?SN_INVALID_TOPIC_ID,
MsgId, MsgId,
?SN_RC_INVALID_TOPIC_ID), State), ?SN_RC_INVALID_TOPIC_ID), State),
{keep_state, State}; {keep_state, State0};
{error, wildcard_topic} -> {error, wildcard_topic} ->
proto_subscribe(TopicName, QoS, MsgId, ?SN_INVALID_TOPIC_ID, State); proto_subscribe(TopicName, QoS, MsgId, ?SN_INVALID_TOPIC_ID, State);
NewTopicId when is_integer(NewTopicId) -> NewTopicId when is_integer(NewTopicId) ->
@ -887,11 +879,11 @@ handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId,
State = #state{registry = Registry}) -> State = #state{registry = Registry}) ->
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
undefined -> undefined ->
ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
TopicId, TopicId,
MsgId, MsgId,
?SN_RC_INVALID_TOPIC_ID), State), ?SN_RC_INVALID_TOPIC_ID), State),
{next_state, connected, State}; {next_state, connected, State0};
PredefinedTopic -> PredefinedTopic ->
proto_subscribe(PredefinedTopic, QoS, MsgId, TopicId, State) proto_subscribe(PredefinedTopic, QoS, MsgId, TopicId, State)
end; end;
@ -904,11 +896,11 @@ handle_subscribe(?SN_SHORT_TOPIC, TopicId, QoS, MsgId, State) ->
proto_subscribe(TopicName, QoS, MsgId, ?SN_INVALID_TOPIC_ID, State); proto_subscribe(TopicName, QoS, MsgId, ?SN_INVALID_TOPIC_ID, State);
handle_subscribe(_, _TopicId, QoS, MsgId, State) -> handle_subscribe(_, _TopicId, QoS, MsgId, State) ->
ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
?SN_INVALID_TOPIC_ID, ?SN_INVALID_TOPIC_ID,
MsgId, MsgId,
?SN_RC_INVALID_TOPIC_ID), State), ?SN_RC_INVALID_TOPIC_ID), State),
{keep_state, State}. {keep_state, State0}.
handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) -> handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) ->
proto_unsubscribe(TopicId, MsgId, State); proto_unsubscribe(TopicId, MsgId, State);
@ -917,8 +909,7 @@ handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId,
State = #state{registry = Registry}) -> State = #state{registry = Registry}) ->
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
undefined -> undefined ->
ok = send_message(?SN_UNSUBACK_MSG(MsgId), State), {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)};
{keep_state, State};
PredefinedTopic -> PredefinedTopic ->
proto_unsubscribe(PredefinedTopic, MsgId, State) proto_unsubscribe(PredefinedTopic, MsgId, State)
end; end;
@ -931,8 +922,7 @@ handle_unsubscribe(?SN_SHORT_TOPIC, TopicId, MsgId, State) ->
proto_unsubscribe(TopicName, MsgId, State); proto_unsubscribe(TopicName, MsgId, State);
handle_unsubscribe(_, _TopicId, MsgId, State) -> handle_unsubscribe(_, _TopicId, MsgId, State) ->
send_message(?SN_UNSUBACK_MSG(MsgId), State), {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}.
{keep_state, State}.
do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
%% XXX: Handle normal topic id as predefined topic id, to be compatible with paho mqtt-sn library %% XXX: Handle normal topic id as predefined topic id, to be compatible with paho mqtt-sn library
@ -944,25 +934,26 @@ do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
NewQoS = get_corrected_qos(QoS), NewQoS = get_corrected_qos(QoS),
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
undefined -> undefined ->
(NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), State), {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID,
{keep_state, State}; State)};
TopicName -> TopicName ->
proto_publish(TopicName, Data, Dup, NewQoS, Retain, MsgId, TopicId, State) proto_publish(TopicName, Data, Dup, NewQoS, Retain, MsgId, TopicId, State)
end; end;
do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) -> do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS), NewQoS = get_corrected_qos(QoS),
<<TopicId:16>> = STopicName , <<TopicId:16>> = STopicName ,
case emqx_topic:wildcard(STopicName) of case emqx_topic:wildcard(STopicName) of
true -> true ->
(NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_NOT_SUPPORTED), State), {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_NOT_SUPPORTED,
{keep_state, State}; State)};
false -> false ->
proto_publish(STopicName, Data, Dup, NewQoS, Retain, MsgId, TopicId, State) proto_publish(STopicName, Data, Dup, NewQoS, Retain, MsgId, TopicId, State)
end; end;
do_publish(_, TopicId, _Data, #mqtt_sn_flags{qos = QoS}, MsgId, State) -> do_publish(_, TopicId, _Data, #mqtt_sn_flags{qos = QoS}, MsgId, State) ->
(QoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_NOT_SUPPORTED), State), {keep_state, maybe_send_puback(QoS, TopicId, MsgId, ?SN_RC_NOT_SUPPORTED,
{keep_state, State}. State)}.
do_publish_will(#state{will_msg = undefined}) -> do_publish_will(#state{will_msg = undefined}) ->
ok; ok;
@ -986,12 +977,11 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); handle_incoming(?PUBACK_PACKET(MsgId), StateName, State);
?SN_RC_INVALID_TOPIC_ID -> ?SN_RC_INVALID_TOPIC_ID ->
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
undefined -> ok; undefined -> {keep_state, State};
TopicName -> TopicName ->
%%notice that this TopicName maybe normal or predefined, %%notice that this TopicName maybe normal or predefined,
%% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels %% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels
send_register(TopicName, TopicId, MsgId, State), {keep_state, send_register(TopicName, TopicId, MsgId, State)}
{keep_state, State}
end; end;
_ -> _ ->
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]), ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
@ -1070,30 +1060,45 @@ channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
emqx_channel:handle_in(Packet, Channel). emqx_channel:handle_in(Packet, Channel).
handle_outgoing(Packets, State) when is_list(Packets) -> handle_outgoing(Packets, State) when is_list(Packets) ->
lists:foreach(fun(Packet) -> handle_outgoing(Packet, State) end, Packets); lists:foldl(fun(Packet, State0) ->
handle_outgoing(Packet, State0)
end, State, Packets);
handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload), handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
State = #state{registry = Registry}) -> State = #state{registry = Registry}) ->
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, ?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]),
MsgId = message_id(PacketId), TopicId = emqx_sn_registry:lookup_topic_id(Registry, self(), TopicName),
?LOG(debug, "Handle outgoing: ~0p", [PubPkt]), case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
true -> register_and_notify_client(PubPkt, State);
(emqx_sn_registry:lookup_topic_id(Registry, self(), TopicName) == undefined) false -> send_message(mqtt2sn(PubPkt, State), State)
andalso (byte_size(TopicName) =/= 2) end;
andalso register_and_notify_client(TopicName, Payload, Dup, QoS,
Retain, MsgId, State),
send_message(mqtt2sn(PubPkt, State), State);
handle_outgoing(Packet, State) -> handle_outgoing(Packet, State) ->
send_message(mqtt2sn(Packet, State), State). send_message(mqtt2sn(Packet, State), State).
register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) ->
State = #state{registry = Registry}) -> ?LOG(debug, "cache non-registered publish message for topic-id: ~p, msg: ~0p, pendings: ~0p",
[TopicId, PubPkt, Pendings]),
Msgs = maps:get(pending_topic_ids, Pendings, []),
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.
replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State0) ->
?LOG(debug, "replay non-registered publish message for topic-id: ~p, pendings: ~0p",
[TopicId, Pendings]),
State = lists:foldl(fun(Msg, State1) ->
send_message(Msg, State1)
end, State0, maps:get(TopicId, Pendings, [])),
State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
State = #state{registry = Registry, pending_topic_ids = Pendings}) ->
MsgId = message_id(PacketId),
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
TopicId = emqx_sn_registry:register_topic(Registry, self(), TopicName), TopicId = emqx_sn_registry:register_topic(Registry, self(), TopicName),
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
send_register(TopicName, TopicId, MsgId, State). NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State),
send_register(TopicName, TopicId, MsgId, State#state{pending_topic_ids = NewPendings}).
message_id(undefined) -> message_id(undefined) ->
rand:uniform(16#FFFF); rand:uniform(16#FFFF);
@ -1126,3 +1131,8 @@ append(Replies, AddEvents) when is_list(Replies) ->
Replies ++ AddEvents; Replies ++ AddEvents;
append(Replies, AddEvents) -> append(Replies, AddEvents) ->
[Replies] ++ AddEvents. [Replies] ++ AddEvents.
maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) ->
State;
maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) ->
send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State).

View File

@ -1102,10 +1102,12 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
%% send downlink data in asleep state. This message should be send to device once it wake up %% send downlink data in asleep state. This message should be send to device once it wake up
Payload1 = <<55, 66, 77, 88, 99>>, Payload1 = <<55, 66, 77, 88, 99>>,
Payload2 = <<55, 66, 77, 88, 100>>,
{ok, C} = emqtt:start_link(), {ok, C} = emqtt:start_link(),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
{ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload1, QoS), {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload1, QoS),
{ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload2, QoS),
timer:sleep(100), timer:sleep(100),
ok = emqtt:disconnect(C), ok = emqtt:disconnect(C),
@ -1114,21 +1116,32 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
% goto awake state, receive downlink messages, and go back to asleep % goto awake state, receive downlink messages, and go back to asleep
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, <<"test">>),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 1. get REGISTER first, since this topic has never been registered
%% get REGISTER first, since this topic has never been registered UdpData1 = receive_response(Socket),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% {TopicIdNew, MsgId3} = check_register_msg_on_udp(<<"a/b/c">>, UdpData1),
UdpData2 = receive_response(Socket),
{TopicIdNew, MsgId3} = check_register_msg_on_udp(<<"a/b/c">>, UdpData2), %% 2. but before we reply the REGACK, the sn-gateway should not send any PUBLISH
?assertError(_, receive_publish(Socket)),
send_regack_msg(Socket, TopicIdNew, MsgId3), send_regack_msg(Socket, TopicIdNew, MsgId3),
UdpData = receive_response(Socket), UdpData2 = receive_response(Socket),
MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData), MsgId_udp2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2),
send_puback_msg(Socket, TopicIdNew, MsgId_udp), send_puback_msg(Socket, TopicIdNew, MsgId_udp2),
UdpData3 = receive_response(Socket),
MsgId_udp3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3),
send_puback_msg(Socket, TopicIdNew, MsgId_udp3),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
gen_udp:close(Socket). gen_udp:close(Socket).
receive_publish(Socket) ->
UdpData3 = receive_response(Socket, 1000),
<<HeaderUdp:5/binary, _:16, _/binary>> = UdpData3,
<<_:8, ?SN_PUBLISH, _/binary>> = HeaderUdp.
t_asleep_test05_to_awake_qos1_dl_msg(_) -> t_asleep_test05_to_awake_qos1_dl_msg(_) ->
QoS = 1, QoS = 1,
Duration = 5, Duration = 5,