fix(mqtt_sn): refactor the log using logger.hrl

This commit is contained in:
Shawn 2021-04-09 12:23:56 +08:00
parent 6c57da31cb
commit 2b8c1efd1d
1 changed files with 54 additions and 41 deletions

View File

@ -21,6 +21,9 @@
-include("emqx_sn.hrl"). -include("emqx_sn.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[MQTT-SN]").
%% API. %% API.
-export([start_link/3]). -export([start_link/3]).
@ -97,8 +100,6 @@
-define(STAT_TIMEOUT, 10000). -define(STAT_TIMEOUT, 10000).
-define(IDLE_TIMEOUT, 30000). -define(IDLE_TIMEOUT, 30000).
-define(DEFAULT_CHAN_OPTIONS, [{max_packet_size, 256}, {zone, external}]). -define(DEFAULT_CHAN_OPTIONS, [{max_packet_size, 256}, {zone, external}]).
-define(LOG(Level, Format, Args, State),
emqx_logger:Level("MQTT-SN(~s): " ++ Format, [esockd:format(State#state.peername) | Args])).
-define(NEG_QOS_CLIENT_ID, <<"NegQoS-Client">>). -define(NEG_QOS_CLIENT_ID, <<"NegQoS-Client">>).
@ -113,6 +114,12 @@
conn_mod => ?MODULE conn_mod => ?MODULE
}). }).
-define(is_non_error_reason(Reason),
Reason =:= normal;
Reason =:= idle_timeout;
Reason =:= asleep_timeout;
Reason =:= keepalive_timeout).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Exported APIs %% Exported APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -159,6 +166,7 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
enable_qos3 = EnableQos3, enable_qos3 = EnableQos3,
idle_timeout = IdleTimeout idle_timeout = IdleTimeout
}, },
emqx_logger:set_metadata_peername(esockd:format(Peername)),
{ok, idle, State, [IdleTimeout]}; {ok, idle, State, [IdleTimeout]};
{error, Reason} when Reason =:= enotconn; {error, Reason} when Reason =:= enotconn;
Reason =:= einval; Reason =:= einval;
@ -186,7 +194,7 @@ idle(cast, {incoming, ?SN_DISCONNECT_MSG(_Duration)}, State) ->
{keep_state, State, State#state.idle_timeout}; {keep_state, State, State#state.idle_timeout};
idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) -> idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) ->
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!", [], State), ?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!"),
{keep_state, State#state.idle_timeout}; {keep_state, State#state.idle_timeout};
idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
@ -204,7 +212,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
false -> false ->
ok ok
end, end,
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State), ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId]),
{keep_state, State#state.idle_timeout}; {keep_state, State#state.idle_timeout};
idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) -> idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
@ -253,8 +261,8 @@ wait_for_will_topic(cast, {connack, ConnAck}, State) ->
ok = handle_outgoing(ConnAck, State), ok = handle_outgoing(ConnAck, State),
{next_state, connected, State}; {next_state, connected, State};
wait_for_will_topic(cast, Event, State) -> wait_for_will_topic(cast, Event, _State) ->
?LOG(error, "wait_for_will_topic UNEXPECTED Event: ~p", [Event], State), ?LOG(error, "wait_for_will_topic UNEXPECTED Event: ~p", [Event]),
keep_state_and_data; keep_state_and_data;
wait_for_will_topic(EventType, EventContent, State) -> wait_for_will_topic(EventType, EventContent, State) ->
@ -288,13 +296,13 @@ connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
State = #state{clientid = ClientId, registry = Registry}) -> State = #state{clientid = ClientId, registry = Registry}) ->
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
TopicId when is_integer(TopicId) -> TopicId when is_integer(TopicId) ->
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId], State), ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State);
{error, too_large} -> {error, too_large} ->
?LOG(error, "TopicId is full! ClientId=~p, TopicName=~p", [ClientId, TopicName], State), ?LOG(error, "TopicId is full! ClientId=~p, TopicName=~p", [ClientId, TopicName]),
send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State); send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State);
{error, wildcard_topic} -> {error, wildcard_topic} ->
?LOG(error, "wildcard topic can not be registered! ClientId=~p, TopicName=~p", [ClientId, TopicName], State), ?LOG(error, "wildcard topic can not be registered! ClientId=~p, TopicName=~p", [ClientId, TopicName]),
send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State) send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State)
end, end,
{keep_state, State}; {keep_state, State};
@ -305,7 +313,7 @@ connected(cast, {incoming, ?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)},
Skip = (EnableQoS3 =:= false) andalso (QoS =:= ?QOS_NEG1), Skip = (EnableQoS3 =:= false) andalso (QoS =:= ?QOS_NEG1),
case Skip of case Skip of
true -> true ->
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in connected mode!", [], State), ?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in connected mode!"),
{keep_state, State}; {keep_state, State};
false -> false ->
do_publish(TopicIdType, TopicId, Data, Flags, MsgId, State) do_publish(TopicIdType, TopicId, Data, Flags, MsgId, State)
@ -333,7 +341,7 @@ connected(cast, {incoming, ?SN_REGACK_MSG(_TopicId, _MsgId, ?SN_RC_ACCEPTED)}, S
{keep_state, State}; {keep_state, State};
connected(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> connected(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p", ?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p",
[TopicId, MsgId, ReturnCode], State), [TopicId, MsgId, ReturnCode]),
{keep_state, State}; {keep_state, State};
connected(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> connected(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) ->
@ -380,7 +388,7 @@ connected(cast, {shutdown, Reason}, State) ->
stop(Reason, State); stop(Reason, State);
connected(cast, {close, Reason}, State) -> connected(cast, {close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason], State), ?LOG(debug, "Force to close the socket due to ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
connected(EventType, EventContent, State) -> connected(EventType, EventContent, State) ->
@ -450,7 +458,7 @@ awake(cast, {incoming, ?SN_REGACK_MSG(_TopicId, _MsgId, ?SN_RC_ACCEPTED)}, State
awake(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> awake(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p", ?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p",
[TopicId, MsgId, ReturnCode], State), [TopicId, MsgId, ReturnCode]),
{keep_state, State}; {keep_state, State};
awake(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) -> awake(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
@ -494,26 +502,25 @@ handle_event({call, From}, Req, _StateName, State) ->
handle_event(info, {datagram, SockPid, Data}, StateName, handle_event(info, {datagram, SockPid, Data}, StateName,
State = #state{sockpid = SockPid, channel = _Channel}) -> State = #state{sockpid = SockPid, channel = _Channel}) ->
?LOG(debug, "RECV ~p", [Data], State), ?LOG(debug, "RECV ~p", [Data]),
Oct = iolist_size(Data), Oct = iolist_size(Data),
inc_counter(recv_oct, Oct), inc_counter(recv_oct, Oct),
try emqx_sn_frame:parse(Data) of try emqx_sn_frame:parse(Data) of
{ok, Msg} -> {ok, Msg} ->
inc_counter(recv_cnt, 1), inc_counter(recv_cnt, 1),
?LOG(info, "RECV ~s at state ~s", ?LOG(info, "RECV ~s at state ~s", [emqx_sn_frame:format(Msg), StateName]),
[emqx_sn_frame:format(Msg), StateName], State),
{keep_state, State, next_event({incoming, Msg})} {keep_state, State, next_event({incoming, Msg})}
catch catch
error:Error:Stacktrace -> error:Error:Stacktrace ->
?LOG(info, "Parse frame error: ~p at state ~s, Stacktrace: ~p", ?LOG(info, "Parse frame error: ~p at state ~s, Stacktrace: ~p",
[Error, StateName, Stacktrace], State), [Error, StateName, Stacktrace]),
stop(frame_error, State) stop(frame_error, State)
end; end;
handle_event(info, {deliver, _Topic, Msg}, asleep, handle_event(info, {deliver, _Topic, Msg}, asleep,
State = #state{channel = Channel}) -> State = #state{channel = Channel}) ->
% section 6.14, Support of sleeping clients % section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg], State), ?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg]),
Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)), Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -541,11 +548,11 @@ handle_event(info, {timeout, TRef, TMsg}, _StateName, State) ->
handle_timeout(TRef, TMsg, State); handle_timeout(TRef, TMsg, State);
handle_event(info, asleep_timeout, asleep, State) -> handle_event(info, asleep_timeout, asleep, State) ->
?LOG(debug, "asleep timer timeout, shutdown now", [], State), ?LOG(debug, "asleep timer timeout, shutdown now"),
stop(asleep_timeout, State); stop(asleep_timeout, State);
handle_event(info, asleep_timeout, StateName, State) -> handle_event(info, asleep_timeout, StateName, State) ->
?LOG(debug, "asleep timer timeout on StateName=~p, ignore it", [StateName], State), ?LOG(debug, "asleep timer timeout on StateName=~p, ignore it", [StateName]),
{keep_state, State}; {keep_state, State};
handle_event(cast, {close, Reason}, _StateName, State) -> handle_event(cast, {close, Reason}, _StateName, State) ->
@ -570,7 +577,7 @@ handle_event(cast, {event, _Other}, _StateName, State = #state{channel = Channel
handle_event(EventType, EventContent, StateName, State) -> handle_event(EventType, EventContent, StateName, State) ->
?LOG(error, "StateName: ~s, Unexpected Event: ~p", ?LOG(error, "StateName: ~s, Unexpected Event: ~p",
[StateName, {EventType, EventContent}], State), [StateName, {EventType, EventContent}]),
{keep_state, State}. {keep_state, State}.
terminate(Reason, _StateName, #state{clientid = ClientId, terminate(Reason, _StateName, #state{clientid = ClientId,
@ -753,8 +760,8 @@ send_connack(State) ->
send_message(?SN_CONNACK_MSG(?SN_RC_ACCEPTED), State). send_message(?SN_CONNACK_MSG(?SN_RC_ACCEPTED), State).
send_message(Msg = #mqtt_sn_message{type = Type}, send_message(Msg = #mqtt_sn_message{type = Type},
State = #state{sockpid = SockPid, peername = Peername}) -> #state{sockpid = SockPid, peername = Peername}) ->
?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)], State), ?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
inc_outgoing_stats(Type), inc_outgoing_stats(Type),
Data = emqx_sn_frame:serialize(Msg), Data = emqx_sn_frame:serialize(Msg),
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)), ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
@ -764,7 +771,7 @@ send_message(Msg = #mqtt_sn_message{type = Type},
goto_asleep_state(State) -> goto_asleep_state(State) ->
goto_asleep_state(undefined, State). goto_asleep_state(undefined, State).
goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) -> goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) ->
?LOG(debug, "goto_asleep_state Duration=~p", [Duration], State), ?LOG(debug, "goto_asleep_state Duration=~p", [Duration]),
NewTimer = emqx_sn_asleep_timer:ensure(Duration, AsleepTimer), NewTimer = emqx_sn_asleep_timer:ensure(Duration, AsleepTimer),
{next_state, asleep, State#state{asleep_timer = NewTimer}, hibernate}. {next_state, asleep, State#state{asleep_timer = NewTimer}, hibernate}.
@ -774,7 +781,7 @@ goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) ->
stop({shutdown, Reason}, State) -> stop({shutdown, Reason}, State) ->
stop(Reason, State); stop(Reason, State);
stop(Reason, State) -> stop(Reason, State) ->
?LOG(error, "stop due to ~p", [Reason], State), ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
case Reason of case Reason of
%% FIXME: The Will-Msg should publish when a Session terminated! %% FIXME: The Will-Msg should publish when a Session terminated!
asleep_timeout -> do_publish_will(State); asleep_timeout -> do_publish_will(State);
@ -786,9 +793,14 @@ stop(Reason, State) ->
stop({shutdown, Reason}, Reply, State) -> stop({shutdown, Reason}, Reply, State) ->
stop(Reason, Reply, State); stop(Reason, Reply, State);
stop(Reason, Reply, State) -> stop(Reason, Reply, State) ->
?LOG(error, "stop due to ~p", [Reason], State), ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
{stop, {shutdown, Reason}, Reply, State}. {stop, {shutdown, Reason}, Reply, State}.
stop_log_level(Reason) when ?is_non_error_reason(Reason) ->
debug;
stop_log_level(_) ->
error.
mqttsn_to_mqtt(?SN_PUBACK, MsgId) -> mqttsn_to_mqtt(?SN_PUBACK, MsgId) ->
?PUBACK_PACKET(MsgId); ?PUBACK_PACKET(MsgId);
mqttsn_to_mqtt(?SN_PUBREC, MsgId) -> mqttsn_to_mqtt(?SN_PUBREC, MsgId) ->
@ -799,6 +811,7 @@ mqttsn_to_mqtt(?SN_PUBCOMP, MsgId) ->
?PUBCOMP_PACKET(MsgId). ?PUBCOMP_PACKET(MsgId).
do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
emqx_logger:set_metadata_clientid(ClientId),
%% 6.6 Clients Publish Procedure %% 6.6 Clients Publish Procedure
%% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message %% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
%% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange %% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
@ -831,6 +844,7 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
clientid = OldClientId, clientid = OldClientId,
registry = Registry, registry = Registry,
channel = Channel}) -> channel = Channel}) ->
emqx_logger:set_metadata_clientid(ClientId),
#mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags, #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
NChannel = case CleanStart of NChannel = case CleanStart of
true -> true ->
@ -920,7 +934,7 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
State=#state{clientid = ClientId, registry = Registry}) -> State=#state{clientid = ClientId, registry = Registry}) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS, State), NewQoS = get_corrected_qos(QoS),
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined -> undefined ->
(NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), State), (NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), State),
@ -930,7 +944,7 @@ do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
end; end;
do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) -> do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS, State), NewQoS = get_corrected_qos(QoS),
<<TopicId:16>> = STopicName , <<TopicId:16>> = STopicName ,
case emqx_topic:wildcard(STopicName) of case emqx_topic:wildcard(STopicName) of
true -> true ->
@ -973,7 +987,7 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
{keep_state, State} {keep_state, State}
end; end;
_ -> _ ->
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode], State), ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
{keep_state, State} {keep_state, State}
end. end.
@ -982,13 +996,13 @@ do_pubrec(PubRec, MsgId, State) ->
proto_subscribe(TopicName, QoS, MsgId, TopicId, State) -> proto_subscribe(TopicName, QoS, MsgId, TopicId, State) ->
?LOG(debug, "subscribe Topic=~p, MsgId=~p, TopicId=~p", ?LOG(debug, "subscribe Topic=~p, MsgId=~p, TopicId=~p",
[TopicName, MsgId, TopicId], State), [TopicName, MsgId, TopicId]),
enqueue_msgid(suback, MsgId, TopicId), enqueue_msgid(suback, MsgId, TopicId),
SubOpts = maps:put(qos, QoS, ?DEFAULT_SUBOPTS), SubOpts = maps:put(qos, QoS, ?DEFAULT_SUBOPTS),
handle_incoming(?SUBSCRIBE_PACKET(MsgId, [{TopicName, SubOpts}]), State). handle_incoming(?SUBSCRIBE_PACKET(MsgId, [{TopicName, SubOpts}]), State).
proto_unsubscribe(TopicName, MsgId, State) -> proto_unsubscribe(TopicName, MsgId, State) ->
?LOG(debug, "unsubscribe Topic=~p, MsgId=~p", [TopicName, MsgId], State), ?LOG(debug, "unsubscribe Topic=~p, MsgId=~p", [TopicName, MsgId]),
handle_incoming(?UNSUBSCRIBE_PACKET(MsgId, [TopicName]), State). handle_incoming(?UNSUBSCRIBE_PACKET(MsgId, [TopicName]), State).
proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) -> proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) ->
@ -996,7 +1010,7 @@ proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) ->
Publish = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, retain = Retain}, Publish = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, retain = Retain},
variable = #mqtt_packet_publish{topic_name = TopicName, packet_id = MsgId}, variable = #mqtt_packet_publish{topic_name = TopicName, packet_id = MsgId},
payload = Data}, payload = Data},
?LOG(debug, "[publish] Msg: ~p~n", [Publish], State), ?LOG(debug, "[publish] Msg: ~p~n", [Publish]),
handle_incoming(Publish, State). handle_incoming(Publish, State).
update_will_topic(undefined, #mqtt_sn_flags{qos = QoS, retain = Retain}, Topic) -> update_will_topic(undefined, #mqtt_sn_flags{qos = QoS, retain = Retain}, Topic) ->
@ -1019,11 +1033,10 @@ dequeue_msgid(suback, MsgId) ->
dequeue_msgid(puback, MsgId) -> dequeue_msgid(puback, MsgId) ->
erase({puback, MsgId}). erase({puback, MsgId}).
get_corrected_qos(?QOS_NEG1, State) -> get_corrected_qos(?QOS_NEG1) ->
?LOG(debug, "Receive a publish with QoS=-1", [], State), ?LOG(debug, "Receive a publish with QoS=-1"),
?QOS_0; ?QOS_0;
get_corrected_qos(QoS) ->
get_corrected_qos(QoS, _State) ->
QoS. QoS.
get_topic_id(Type, MsgId) -> get_topic_id(Type, MsgId) ->
@ -1042,10 +1055,10 @@ handle_incoming(Packet, _StName, State) ->
Result = channel_handle_in(Packet, State), Result = channel_handle_in(Packet, State),
handle_return(Result, State). handle_return(Result, State).
channel_handle_in(Packet = ?PACKET(Type), State = #state{channel = Channel}) -> channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
_ = inc_incoming_stats(Type), _ = inc_incoming_stats(Type),
ok = emqx_metrics:inc_recv(Packet), ok = emqx_metrics:inc_recv(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)], State), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
emqx_channel:handle_in(Packet, Channel). emqx_channel:handle_in(Packet, Channel).
handle_outgoing(Packets, State) when is_list(Packets) -> handle_outgoing(Packets, State) when is_list(Packets) ->
@ -1055,7 +1068,7 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload),
State = #state{clientid = ClientId, registry = Registry}) -> State = #state{clientid = ClientId, registry = Registry}) ->
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
MsgId = message_id(PacketId), MsgId = message_id(PacketId),
?LOG(debug, "Handle outgoing: ~p", [PubPkt], State), ?LOG(debug, "Handle outgoing: ~p", [PubPkt]),
(emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined) (emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined)
andalso (byte_size(TopicName) =/= 2) andalso (byte_size(TopicName) =/= 2)
@ -1070,8 +1083,8 @@ handle_outgoing(Packet, State) ->
register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, ClientId, register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, ClientId,
State = #state{registry = Registry}) -> State = #state{registry = Registry}) ->
TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName), TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName),
?LOG(debug, "register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, Retain=~p, MsgId=~p", ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
[TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId], State), "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
send_register(TopicName, TopicId, MsgId, State). send_register(TopicName, TopicId, MsgId, State).
message_id(undefined) -> message_id(undefined) ->