diff --git a/priv/emqx.schema b/priv/emqx.schema index 8026a6400..f003c37f3 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -437,7 +437,9 @@ end}. [{peername, [client_id,"@",peername," "], [client_id, " "]}], - []}, + [{peername, + [peername," "], + []}]}, msg,"\n"]}}, FileConf = fun(Filename) -> #{type => wrap, diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 6c3d4c7e6..9118933ef 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -50,9 +50,12 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). --define(LOG(Level, Format, Args, State), - emqx_logger:Level("MQTT(~s): " ++ Format, - [esockd_net:format(State#state.peername) | Args])). +-define(LOG(Level, Format, Args), + emqx_logger:Level(#{header => "[TCP] ", format => Format, args => Args}, + #{report_cb => + fun(#{header := Hdr0, format := Fmt0, args := Args0}) -> + {Hdr0 ++ Fmt0, Args0} + end})). start_link(Transport, Socket, Options) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Socket, Options]])}. @@ -153,6 +156,8 @@ init([Transport, RawSocket, Options]) -> GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), ok = emqx_misc:init_proc_mng_policy(Zone), + + emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> @@ -169,7 +174,6 @@ send_fun(Transport, Socket, Peername) -> Data = emqx_frame:serialize(Packet, Options), try Transport:async_send(Socket, Data) of ok -> - ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), emqx_metrics:inc('bytes/sent', iolist_size(Data)), ok; Error -> Error @@ -195,11 +199,11 @@ handle_call(session, _From, State = #state{proto_state = ProtoState}) -> {reply, emqx_protocol:session(ProtoState), State}; handle_call(Req, _From, State) -> - ?LOG(error, "unexpected call: ~p", [Req], State), + ?LOG(error, "unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "unexpected cast: ~p", [Msg], State), + ?LOG(error, "unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> @@ -225,7 +229,7 @@ handle_info({timeout, Timer, emit_stats}, ok = emqx_gc:reset(), {noreply, NewState, hibernate}; {shutdown, Reason} -> - ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + ?LOG(warning, "shutdown due to ~p", [Reason]), shutdown(Reason, NewState) end; handle_info(timeout, State) -> @@ -235,18 +239,18 @@ handle_info({shutdown, Reason}, State) -> shutdown(Reason, State); handle_info({shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State), + ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), + ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); handle_info(activate_sock, State) -> {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})}; handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> - ?LOG(debug, "RECV ~p", [Data], State), + ?LOG(debug, "RECV ~p", [Data]), Size = iolist_size(Data), emqx_metrics:inc('bytes/received', Size), Incoming = #{bytes => Size, packets => 0}, @@ -262,7 +266,7 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> - ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), + ?LOG(debug, "Keepalive at the interval of ~p", [Interval]), StatFun = fun() -> case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; @@ -287,14 +291,14 @@ handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> end; handle_info(Info, State) -> - ?LOG(error, "unexpected info: ~p", [Info], State), + ?LOG(error, "unexpected info: ~p", [Info]), {noreply, State}. -terminate(Reason, State = #state{transport = Transport, - socket = Socket, - keepalive = KeepAlive, - proto_state = ProtoState}) -> - ?LOG(debug, "Terminated for ~p", [Reason], State), +terminate(Reason, #state{transport = Transport, + socket = Socket, + keepalive = KeepAlive, + proto_state = ProtoState}) -> + ?LOG(debug, "Terminated for ~p", [Reason]), Transport:fast_close(Socket), emqx_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -330,7 +334,7 @@ handle_packet(Data, State = #state{proto_state = ProtoState, NewState = State#state{proto_state = ProtoState1}, handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState))); {error, Reason} -> - ?LOG(error, "Process packet error - ~p", [Reason], State), + ?LOG(error, "Process packet error - ~p", [Reason]), shutdown(Reason, State); {error, Reason, ProtoState1} -> shutdown(Reason, State#state{proto_state = ProtoState1}); @@ -338,10 +342,10 @@ handle_packet(Data, State = #state{proto_state = ProtoState, stop(Error, State#state{proto_state = ProtoState1}) end; {error, Error} -> - ?LOG(error, "Framing error - ~p", [Error], State), + ?LOG(error, "Framing error - ~p", [Error]), shutdown(Error, State); {'EXIT', Reason} -> - ?LOG(error, "Parse failed for ~p~nError data:~p", [Reason, Data], State), + ?LOG(error, "Parse failed for ~p~nError data:~p", [Reason, Data]), shutdown(parse_error, State) end. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9289dd77c..904b158ab 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -72,8 +72,12 @@ -define(NO_PROPS, undefined). --define(LOG(Level, Format, Args, _PState), - emqx_logger:Level("[MQTT] " ++ Format, Args)). +-define(LOG(Level, Format, Args), + emqx_logger:Level(#{header => "[MQTT] ", format => Format, args => Args}, + #{report_cb => + fun(#{header := Hdr0, format := Fmt0, args := Args0}) -> + {Hdr0 ++ Fmt0, Args0} + end})). %%------------------------------------------------------------------------------ %% Init @@ -81,7 +85,6 @@ -spec(init(map(), list()) -> state()). init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> - emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}), Zone = proplists:get_value(zone, Options), #pstate{zone = Zone, sendfun = SendFun, @@ -211,7 +214,7 @@ received(?PACKET(?CONNECT), PState = #pstate{connected = true}) -> received(Packet = ?PACKET(Type), PState) -> PState1 = set_protover(Packet, PState), - trace(recv, Packet, PState1), + trace(recv, Packet), try emqx_packet:validate(Packet) of true -> {Packet1, PState2} = preprocess_properties(Packet, PState1), @@ -319,11 +322,11 @@ process_packet(?CONNECT_PACKET( %% Success {?RC_SUCCESS, SP, PState4}; {error, Error} -> - ?LOG(error, "Failed to open session: ~p", [Error], PState1), + ?LOG(error, "Failed to open session: ~p", [Error]), {?RC_UNSPECIFIED_ERROR, PState1} end; {error, Reason} -> - ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState2), + ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason]), {?RC_NOT_AUTHORIZED, PState1} end; {error, ReasonCode} -> @@ -335,26 +338,31 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt {ok, PState1} -> do_publish(Packet, PState1); {error, ?RC_TOPIC_ALIAS_INVALID} -> - ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID], PState), + ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID]), {error, ?RC_TOPIC_ALIAS_INVALID, PState}; {error, ReasonCode} -> - ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, ReasonCode], PState), + ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", + [Topic, emqx_reason_codes:text(ReasonCode)]), {error, ReasonCode, PState} end; -process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> +process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) -> case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); {error, ReasonCode} -> + ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", + [Topic, emqx_reason_codes:text(ReasonCode)]), deliver({puback, PacketId, ReasonCode}, PState) end; -process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) -> +process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) -> case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); {error, ReasonCode} -> + ?LOG(warning, "Cannot publish qos2 message to ~s for ~s", + [Topic, emqx_reason_codes:text(ReasonCode)]), deliver({pubrec, PacketId, ReasonCode}, PState) end; @@ -404,11 +412,14 @@ process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), deliver({suback, PacketId, ReasonCodes}, PState) end; {error, TopicFilters} -> - ReasonCodes = lists:map(fun({_, #{rc := ?RC_SUCCESS}}) -> - ?RC_IMPLEMENTATION_SPECIFIC_ERROR; - ({_, #{rc := ReasonCode}}) -> - ReasonCode - end, TopicFilters), + {SubTopics, ReasonCodes} = + lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) -> + {[Topic|Topics], [?RC_IMPLEMENTATION_SPECIFIC_ERROR | Codes]}; + ({Topic, #{rc := Code}}, {Topics, Codes}) -> + {[Topic|Topics], [Code|Codes]} + end, {[], []}, TopicFilters), + ?LOG(warning, "Cannot subscribe ~p for ~p", + [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]), deliver({suback, PacketId, ReasonCodes}, PState) end; @@ -585,7 +596,7 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> - trace(send, Packet, PState), + trace(send, Packet), case SendFun(Packet, #{version => Ver}) of ok -> emqx_metrics:sent(Packet), @@ -759,7 +770,8 @@ check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl}) check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) -> case emqx_access_control:check_acl(credentials(PState), publish, Topic) of allow -> ok; - deny -> {error, ?RC_NOT_AUTHORIZED} + deny -> + {error, ?RC_NOT_AUTHORIZED} end. run_check_steps([], _Packet, PState) -> @@ -793,17 +805,22 @@ check_sub_acl(TopicFilters, PState) -> case emqx_access_control:check_acl(Credentials, subscribe, Topic) of allow -> {Ok, [{Topic, SubOpts}|Acc]}; deny -> - emqx_logger:warning([{client, PState#pstate.client_id}], - "ACL(~s) Cannot SUBSCRIBE ~p for ACL Deny", - [PState#pstate.client_id, Topic]), {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} end end, {ok, []}, TopicFilters). -trace(recv, Packet, PState) -> - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)], PState); -trace(send, Packet, PState) -> - ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)], PState). +trace(recv, Packet) -> + emqx_logger:debug(#{header => "[MQTT] RECV ~s", pck => Packet}, + #{report_cb => + fun(#{header := Fmt, pck := Pckt}) -> + {Fmt, [emqx_packet:format(Pckt)]} + end}); +trace(send, Packet) -> + emqx_logger:debug(#{header => "[MQTT] SEND ~s", pck => Packet}, + #{report_cb => + fun(#{header := Fmt, pck := Pckt}) -> + {Fmt, [emqx_packet:format(Pckt)]} + end}). inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) -> PState#pstate{recv_stats = inc_stats(Type, Stats)}; @@ -826,7 +843,7 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; emqx_cm:unregister_connection(ClientId); shutdown(Reason, PState = #pstate{connected = true, client_id = ClientId}) -> - ?LOG(info, "Shutdown for ~p", [Reason], PState), + ?LOG(info, "Shutdown for ~p", [Reason]), emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_cm:unregister_connection(ClientId). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 6a8b71094..fd421dde5 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -45,9 +45,12 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). --define(WSLOG(Level, Format, Args, _State), - emqx_logger:Level("[MQTT/WS] " ++ Format, Args)). - +-define(WSLOG(Level, Format, Args), + emqx_logger:Level(#{header => "[WS] ", format => Format, args => Args}, + #{report_cb => + fun(#{header := Hdr0, format := Fmt0, args := Args0}) -> + {Hdr0 ++ Fmt0, Args0} + end})). %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -135,6 +138,8 @@ websocket_init(#state{request = Req, options = Options}) -> EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS), + + emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}), {ok, #state{peername = Peername, sockname = Sockname, parser_state = ParserState, @@ -164,7 +169,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, proto_state = ProtoState}) -> BinSize = iolist_size(Data), put(recv_oct, get(recv_oct) + BinSize), - ?WSLOG(debug, "RECV ~p", [Data], State), + ?WSLOG(debug, "RECV ~p", [Data]), emqx_metrics:inc('bytes/received', BinSize), case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of {more, NewParserState} -> @@ -176,7 +181,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, {ok, ProtoState1} -> websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); {error, Error} -> - ?WSLOG(error, "Protocol error - ~p", [Error], State), + ?WSLOG(error, "Protocol error - ~p", [Error]), stop(Error, State); {error, Reason, ProtoState1} -> shutdown(Reason, State#state{proto_state = ProtoState1}); @@ -184,10 +189,10 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, stop(Error, State#state{proto_state = ProtoState1}) end; {error, Error} -> - ?WSLOG(error, "Frame error: ~p", [Error], State), + ?WSLOG(error, "Frame error: ~p", [Error]), stop(Error, State); {'EXIT', Reason} -> - ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State), + ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]), shutdown(parse_error, State) end. @@ -225,12 +230,12 @@ websocket_info({timeout, Timer, emit_stats}, {ok, State#state{stats_timer = undefined}, hibernate}; websocket_info({keepalive, start, Interval}, State) -> - ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), + ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval]), case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> - ?WSLOG(warning, "Keepalive error - ~p", [Error], State), + ?WSLOG(warning, "Keepalive error - ~p", [Error]), shutdown(Error, State) end; @@ -239,19 +244,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> - ?WSLOG(debug, "Keepalive Timeout!", [], State), + ?WSLOG(debug, "Keepalive Timeout!", []), shutdown(keepalive_timeout, State); {error, Error} -> - ?WSLOG(warning, "Keepalive error - ~p", [Error], State), + ?WSLOG(warning, "Keepalive error - ~p", [Error]), shutdown(keepalive_error, State) end; websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> - ?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State), + ?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> - ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), + ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); websocket_info({binary, Data}, State) -> @@ -261,14 +266,14 @@ websocket_info({shutdown, Reason}, State) -> shutdown(Reason, State); websocket_info(Info, State) -> - ?WSLOG(error, "unexpected info: ~p", [Info], State), + ?WSLOG(error, "unexpected info: ~p", [Info]), {ok, State}. -terminate(SockError, _Req, State = #state{keepalive = Keepalive, - proto_state = ProtoState, - shutdown = Shutdown}) -> +terminate(SockError, _Req, #state{keepalive = Keepalive, + proto_state = ProtoState, + shutdown = Shutdown}) -> ?WSLOG(debug, "Terminated for ~p, sockerror: ~p", - [Shutdown, SockError], State), + [Shutdown, SockError]), emqx_keepalive:cancel(Keepalive), case {ProtoState, Shutdown} of {undefined, _} -> ok;