diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index db44fa148..0b0e7a217 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -21,6 +21,9 @@ -include("emqx_sn.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[MQTT-SN]"). %% API. -export([start_link/3]). @@ -97,8 +100,6 @@ -define(STAT_TIMEOUT, 10000). -define(IDLE_TIMEOUT, 30000). -define(DEFAULT_CHAN_OPTIONS, [{max_packet_size, 256}, {zone, external}]). --define(LOG(Level, Format, Args, State), - emqx_logger:Level("MQTT-SN(~s): " ++ Format, [esockd:format(State#state.peername) | Args])). -define(NEG_QOS_CLIENT_ID, <<"NegQoS-Client">>). @@ -113,6 +114,12 @@ conn_mod => ?MODULE }). +-define(is_non_error_reason(Reason), + Reason =:= normal; + Reason =:= idle_timeout; + Reason =:= asleep_timeout; + Reason =:= keepalive_timeout). + %%-------------------------------------------------------------------- %% Exported APIs %%-------------------------------------------------------------------- @@ -159,6 +166,7 @@ init([{_, SockPid, Sock}, Peername, Options]) -> enable_qos3 = EnableQos3, idle_timeout = IdleTimeout }, + emqx_logger:set_metadata_peername(esockd:format(Peername)), {ok, idle, State, [IdleTimeout]}; {error, Reason} when Reason =:= enotconn; Reason =:= einval; @@ -186,7 +194,7 @@ idle(cast, {incoming, ?SN_DISCONNECT_MSG(_Duration)}, State) -> {keep_state, State, State#state.idle_timeout}; idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) -> - ?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!", [], State), + ?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!"), {keep_state, State#state.idle_timeout}; idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, @@ -204,7 +212,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, false -> ok end, - ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State), + ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId]), {keep_state, State#state.idle_timeout}; idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) -> @@ -253,8 +261,8 @@ wait_for_will_topic(cast, {connack, ConnAck}, State) -> ok = handle_outgoing(ConnAck, State), {next_state, connected, State}; -wait_for_will_topic(cast, Event, State) -> - ?LOG(error, "wait_for_will_topic UNEXPECTED Event: ~p", [Event], State), +wait_for_will_topic(cast, Event, _State) -> + ?LOG(error, "wait_for_will_topic UNEXPECTED Event: ~p", [Event]), keep_state_and_data; wait_for_will_topic(EventType, EventContent, State) -> @@ -288,13 +296,13 @@ connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, State = #state{clientid = ClientId, registry = Registry}) -> case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of TopicId when is_integer(TopicId) -> - ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId], State), + ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); {error, too_large} -> - ?LOG(error, "TopicId is full! ClientId=~p, TopicName=~p", [ClientId, TopicName], State), + ?LOG(error, "TopicId is full! ClientId=~p, TopicName=~p", [ClientId, TopicName]), send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State); {error, wildcard_topic} -> - ?LOG(error, "wildcard topic can not be registered! ClientId=~p, TopicName=~p", [ClientId, TopicName], State), + ?LOG(error, "wildcard topic can not be registered! ClientId=~p, TopicName=~p", [ClientId, TopicName]), send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State) end, {keep_state, State}; @@ -305,7 +313,7 @@ connected(cast, {incoming, ?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)}, Skip = (EnableQoS3 =:= false) andalso (QoS =:= ?QOS_NEG1), case Skip of true -> - ?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in connected mode!", [], State), + ?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in connected mode!"), {keep_state, State}; false -> do_publish(TopicIdType, TopicId, Data, Flags, MsgId, State) @@ -333,7 +341,7 @@ connected(cast, {incoming, ?SN_REGACK_MSG(_TopicId, _MsgId, ?SN_RC_ACCEPTED)}, S {keep_state, State}; connected(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> ?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p", - [TopicId, MsgId, ReturnCode], State), + [TopicId, MsgId, ReturnCode]), {keep_state, State}; connected(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> @@ -380,7 +388,7 @@ connected(cast, {shutdown, Reason}, State) -> stop(Reason, State); connected(cast, {close, Reason}, State) -> - ?LOG(debug, "Force to close the socket due to ~p", [Reason], State), + ?LOG(debug, "Force to close the socket due to ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); connected(EventType, EventContent, State) -> @@ -450,7 +458,7 @@ awake(cast, {incoming, ?SN_REGACK_MSG(_TopicId, _MsgId, ?SN_RC_ACCEPTED)}, State awake(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> ?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p", - [TopicId, MsgId, ReturnCode], State), + [TopicId, MsgId, ReturnCode]), {keep_state, State}; awake(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) -> @@ -494,26 +502,25 @@ handle_event({call, From}, Req, _StateName, State) -> handle_event(info, {datagram, SockPid, Data}, StateName, State = #state{sockpid = SockPid, channel = _Channel}) -> - ?LOG(debug, "RECV ~p", [Data], State), + ?LOG(debug, "RECV ~p", [Data]), Oct = iolist_size(Data), inc_counter(recv_oct, Oct), try emqx_sn_frame:parse(Data) of {ok, Msg} -> inc_counter(recv_cnt, 1), - ?LOG(info, "RECV ~s at state ~s", - [emqx_sn_frame:format(Msg), StateName], State), + ?LOG(info, "RECV ~s at state ~s", [emqx_sn_frame:format(Msg), StateName]), {keep_state, State, next_event({incoming, Msg})} catch error:Error:Stacktrace -> ?LOG(info, "Parse frame error: ~p at state ~s, Stacktrace: ~p", - [Error, StateName, Stacktrace], State), + [Error, StateName, Stacktrace]), stop(frame_error, State) end; handle_event(info, {deliver, _Topic, Msg}, asleep, State = #state{channel = Channel}) -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg], State), + ?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg]), Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -541,11 +548,11 @@ handle_event(info, {timeout, TRef, TMsg}, _StateName, State) -> handle_timeout(TRef, TMsg, State); handle_event(info, asleep_timeout, asleep, State) -> - ?LOG(debug, "asleep timer timeout, shutdown now", [], State), + ?LOG(debug, "asleep timer timeout, shutdown now"), stop(asleep_timeout, State); handle_event(info, asleep_timeout, StateName, State) -> - ?LOG(debug, "asleep timer timeout on StateName=~p, ignore it", [StateName], State), + ?LOG(debug, "asleep timer timeout on StateName=~p, ignore it", [StateName]), {keep_state, State}; handle_event(cast, {close, Reason}, _StateName, State) -> @@ -570,7 +577,7 @@ handle_event(cast, {event, _Other}, _StateName, State = #state{channel = Channel handle_event(EventType, EventContent, StateName, State) -> ?LOG(error, "StateName: ~s, Unexpected Event: ~p", - [StateName, {EventType, EventContent}], State), + [StateName, {EventType, EventContent}]), {keep_state, State}. terminate(Reason, _StateName, #state{clientid = ClientId, @@ -753,8 +760,8 @@ send_connack(State) -> send_message(?SN_CONNACK_MSG(?SN_RC_ACCEPTED), State). send_message(Msg = #mqtt_sn_message{type = Type}, - State = #state{sockpid = SockPid, peername = Peername}) -> - ?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)], State), + #state{sockpid = SockPid, peername = Peername}) -> + ?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]), inc_outgoing_stats(Type), Data = emqx_sn_frame:serialize(Msg), ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)), @@ -764,7 +771,7 @@ send_message(Msg = #mqtt_sn_message{type = Type}, goto_asleep_state(State) -> goto_asleep_state(undefined, State). goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) -> - ?LOG(debug, "goto_asleep_state Duration=~p", [Duration], State), + ?LOG(debug, "goto_asleep_state Duration=~p", [Duration]), NewTimer = emqx_sn_asleep_timer:ensure(Duration, AsleepTimer), {next_state, asleep, State#state{asleep_timer = NewTimer}, hibernate}. @@ -774,7 +781,7 @@ goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) -> stop({shutdown, Reason}, State) -> stop(Reason, State); stop(Reason, State) -> - ?LOG(error, "stop due to ~p", [Reason], State), + ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]), case Reason of %% FIXME: The Will-Msg should publish when a Session terminated! asleep_timeout -> do_publish_will(State); @@ -786,9 +793,14 @@ stop(Reason, State) -> stop({shutdown, Reason}, Reply, State) -> stop(Reason, Reply, State); stop(Reason, Reply, State) -> - ?LOG(error, "stop due to ~p", [Reason], State), + ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]), {stop, {shutdown, Reason}, Reply, State}. +stop_log_level(Reason) when ?is_non_error_reason(Reason) -> + debug; +stop_log_level(_) -> + error. + mqttsn_to_mqtt(?SN_PUBACK, MsgId) -> ?PUBACK_PACKET(MsgId); mqttsn_to_mqtt(?SN_PUBREC, MsgId) -> @@ -799,6 +811,7 @@ mqttsn_to_mqtt(?SN_PUBCOMP, MsgId) -> ?PUBCOMP_PACKET(MsgId). do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> + emqx_logger:set_metadata_clientid(ClientId), %% 6.6 Client’s Publish Procedure %% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message %% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange @@ -831,6 +844,7 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, clientid = OldClientId, registry = Registry, channel = Channel}) -> + emqx_logger:set_metadata_clientid(ClientId), #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags, NChannel = case CleanStart of true -> @@ -920,7 +934,7 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State=#state{clientid = ClientId, registry = Registry}) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, - NewQoS = get_corrected_qos(QoS, State), + NewQoS = get_corrected_qos(QoS), case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> (NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), State), @@ -930,7 +944,7 @@ do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, end; do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, - NewQoS = get_corrected_qos(QoS, State), + NewQoS = get_corrected_qos(QoS), <> = STopicName , case emqx_topic:wildcard(STopicName) of true -> @@ -973,7 +987,7 @@ do_puback(TopicId, MsgId, ReturnCode, StateName, {keep_state, State} end; _ -> - ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode], State), + ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]), {keep_state, State} end. @@ -982,13 +996,13 @@ do_pubrec(PubRec, MsgId, State) -> proto_subscribe(TopicName, QoS, MsgId, TopicId, State) -> ?LOG(debug, "subscribe Topic=~p, MsgId=~p, TopicId=~p", - [TopicName, MsgId, TopicId], State), + [TopicName, MsgId, TopicId]), enqueue_msgid(suback, MsgId, TopicId), SubOpts = maps:put(qos, QoS, ?DEFAULT_SUBOPTS), handle_incoming(?SUBSCRIBE_PACKET(MsgId, [{TopicName, SubOpts}]), State). proto_unsubscribe(TopicName, MsgId, State) -> - ?LOG(debug, "unsubscribe Topic=~p, MsgId=~p", [TopicName, MsgId], State), + ?LOG(debug, "unsubscribe Topic=~p, MsgId=~p", [TopicName, MsgId]), handle_incoming(?UNSUBSCRIBE_PACKET(MsgId, [TopicName]), State). proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) -> @@ -996,7 +1010,7 @@ proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) -> Publish = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{topic_name = TopicName, packet_id = MsgId}, payload = Data}, - ?LOG(debug, "[publish] Msg: ~p~n", [Publish], State), + ?LOG(debug, "[publish] Msg: ~p~n", [Publish]), handle_incoming(Publish, State). update_will_topic(undefined, #mqtt_sn_flags{qos = QoS, retain = Retain}, Topic) -> @@ -1019,11 +1033,10 @@ dequeue_msgid(suback, MsgId) -> dequeue_msgid(puback, MsgId) -> erase({puback, MsgId}). -get_corrected_qos(?QOS_NEG1, State) -> - ?LOG(debug, "Receive a publish with QoS=-1", [], State), +get_corrected_qos(?QOS_NEG1) -> + ?LOG(debug, "Receive a publish with QoS=-1"), ?QOS_0; - -get_corrected_qos(QoS, _State) -> +get_corrected_qos(QoS) -> QoS. get_topic_id(Type, MsgId) -> @@ -1042,10 +1055,10 @@ handle_incoming(Packet, _StName, State) -> Result = channel_handle_in(Packet, State), handle_return(Result, State). -channel_handle_in(Packet = ?PACKET(Type), State = #state{channel = Channel}) -> +channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) -> _ = inc_incoming_stats(Type), ok = emqx_metrics:inc_recv(Packet), - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)], State), + ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), emqx_channel:handle_in(Packet, Channel). handle_outgoing(Packets, State) when is_list(Packets) -> @@ -1055,7 +1068,7 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload), State = #state{clientid = ClientId, registry = Registry}) -> #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, MsgId = message_id(PacketId), - ?LOG(debug, "Handle outgoing: ~p", [PubPkt], State), + ?LOG(debug, "Handle outgoing: ~p", [PubPkt]), (emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined) andalso (byte_size(TopicName) =/= 2) @@ -1070,8 +1083,8 @@ handle_outgoing(Packet, State) -> register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, ClientId, State = #state{registry = Registry}) -> TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName), - ?LOG(debug, "register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, Retain=~p, MsgId=~p", - [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId], State), + ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " + "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), send_register(TopicName, TopicId, MsgId, State). message_id(undefined) ->