From 71731c01f1b69b58ca5423c00d70845363b7b2a1 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 11 Oct 2021 01:35:43 +0200 Subject: [PATCH] refactor(emqx): refine SLOG messages unified logging for unexpected handle_cast handle_call and handle_info --- apps/emqx/src/emqx_alarm.erl | 15 +-- apps/emqx/src/emqx_authentication.erl | 6 +- apps/emqx/src/emqx_banned.erl | 7 +- apps/emqx/src/emqx_broker.erl | 22 ++-- apps/emqx/src/emqx_broker_helper.erl | 4 +- apps/emqx/src/emqx_channel.erl | 37 ++----- apps/emqx/src/emqx_cm.erl | 19 ++-- apps/emqx/src/emqx_cm_registry.erl | 4 +- apps/emqx/src/emqx_config_handler.erl | 4 +- apps/emqx/src/emqx_connection.erl | 22 ++-- apps/emqx/src/emqx_ctl.erl | 6 +- apps/emqx/src/emqx_flapping.erl | 15 ++- apps/emqx/src/emqx_hooks.erl | 9 +- apps/emqx/src/emqx_message.erl | 21 +--- apps/emqx/src/emqx_os_mon.erl | 10 +- apps/emqx/src/emqx_plugins.erl | 105 +++++++------------- apps/emqx/src/emqx_pool.erl | 16 +-- apps/emqx/src/emqx_router.erl | 4 +- apps/emqx/src/emqx_router_helper.erl | 9 +- apps/emqx/src/emqx_session.erl | 12 +-- apps/emqx/src/emqx_shared_sub.erl | 7 +- apps/emqx/src/emqx_stats.erl | 25 +++-- apps/emqx/src/emqx_sys.erl | 4 +- apps/emqx/src/emqx_sys_mon.erl | 67 ++++++------- apps/emqx/src/emqx_tracer.erl | 15 +-- apps/emqx/src/emqx_vm_mon.erl | 15 +-- apps/emqx/src/emqx_ws_connection.erl | 29 +++--- apps/emqx/test/emqx_message_SUITE.erl | 7 -- apps/emqx/test/emqx_sys_mon_SUITE.erl | 37 ++++--- apps/emqx/test/emqx_ws_connection_SUITE.erl | 2 +- 30 files changed, 210 insertions(+), 345 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 296b30db1..1f59594c4 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -239,17 +239,11 @@ handle_call({get_alarms, deactivated}, _From, State) -> {reply, Alarms, State}; handle_call(Req, _From, State) -> - ?SLOG(error, #{ - msg => "unexpected_call", - call => Req - }), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{ - msg => "unexpected_msg", - payload => Msg - }), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, @@ -259,10 +253,7 @@ handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, {noreply, State#state{timer = ensure_timer(TRef, Period)}}; handle_info({update_timer, Period}, #state{timer = TRef} = State) -> - ?SLOG(warning, #{ - msg => "update_the_validity_period_timer", - period => Period - }), + ?SLOG(warning, #{msg => "update_the_validity_period_timer", period => Period}), {noreply, State#state{timer = ensure_timer(TRef, Period)}}; handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_authentication.erl b/apps/emqx/src/emqx_authentication.erl index 46387c240..ea077e171 100644 --- a/apps/emqx/src/emqx_authentication.erl +++ b/apps/emqx/src/emqx_authentication.erl @@ -311,7 +311,7 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M catch Class:Reason:Stacktrace -> ?SLOG(warning, #{msg => "unexpected_error_in_authentication", - class => Class, + exception => Class, reason => Reason, stacktrace => Stacktrace, authenticator => ID}), @@ -652,11 +652,11 @@ handle_call({list_users, ChainName, AuthenticatorID}, _From, State) -> reply(Reply, State); handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(Req, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Req}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), {noreply, State}. handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 2bab96843..759c9f955 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -187,14 +187,11 @@ init([]) -> {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{ - msg => "unexpected_msg", - payload => Msg - }), + ?SLOG(error, #{msg => "unexpected_msg", cast => Msg}), {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 2c0380f62..e556361c7 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -202,10 +202,8 @@ publish(Msg) when is_record(Msg, message) -> emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}} -> - ?SLOG(notice, #{ - msg => "stop_publishing", - payload => emqx_message:format(Msg) - }), + ?SLOG(debug, #{msg => "message_not_published", + payload => emqx_message:to_log_map(Msg)}), []; Msg1 = #message{topic = Topic} -> route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) @@ -217,11 +215,12 @@ safe_publish(Msg) when is_record(Msg, message) -> try publish(Msg) catch - _:Error:Stk-> + Error : Reason : Stk-> ?SLOG(error,#{ msg => "publishing_error", - error => Error, - payload => Msg, + exception => Error, + reason => Reason, + payload => emqx_message:to_log_map(Msg), stacktrace => Stk }), [] @@ -465,17 +464,14 @@ handle_call({subscribe, Topic, I}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast({subscribe, Topic}, State) -> case emqx_router:do_add_route(Topic) of ok -> ok; {error, Reason} -> - ?SLOG(error, #{ - msg => "failed_to_add_route", - reason => Reason - }) + ?SLOG(error, #{msg => "failed_to_add_route", reason => Reason}) end, {noreply, State}; @@ -499,7 +495,7 @@ handle_cast({unsubscribed, Topic, I}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index 804bbaebd..f31f5b164 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -118,7 +118,7 @@ init([]) -> {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> @@ -127,7 +127,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}}; handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) -> diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 93ee21416..61ccdae16 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -373,17 +373,11 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel ok = after_message_acked(ClientInfo, Msg, Properties), handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> - ?SLOG(warning, #{ - msg => "puback_packetId_inuse", - packetId => PacketId - }), + ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}), ok = emqx_metrics:inc('packets.puback.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?SLOG(warning, #{ - msg => "puback_packetId_not_found", - packetId => PacketId - }), + ?SLOG(warning, #{msg => "puback_packetId_not_found", packetId => PacketId}), ok = emqx_metrics:inc('packets.puback.missed'), {ok, Channel} end; @@ -507,17 +501,11 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?SLOG(error, #{ - msg => "malformed_mqtt_message", - reason => Reason - }), + ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}), {ok, Channel}; handle_in(Packet, Channel) -> - ?SLOG(error, #{ - msg => "disconnecting_due_to_unexpected_message", - packet => Packet - }), + ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}), handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel). %%-------------------------------------------------------------------- @@ -541,10 +529,7 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo, {error, client_id_unavailable} -> handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel); {error, Reason} -> - ?SLOG(error, #{ - msg => "failed_to_open_session", - reason => Reason - }), + ?SLOG(error, #{msg => "failed_to_open_session", reason => Reason}), handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel) end. @@ -995,7 +980,7 @@ handle_call({quota, Policy}, Channel) -> reply(ok, Channel#channel{quota = Quota}); handle_call(Req, Channel) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). %%-------------------------------------------------------------------- @@ -1035,10 +1020,7 @@ handle_info({sock_closed, Reason}, Channel = end; handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?SLOG(error, #{ - msg => "unexpected_sock_closed", - reason => Reason - }), + ?SLOG(error, #{msg => "unexpected_sock_close", reason => Reason}), {ok, Channel}; handle_info(clean_authz_cache, Channel) -> @@ -1109,10 +1091,7 @@ handle_timeout(_TRef, expire_quota_limit, Channel) -> {ok, clean_timer(quota_timer, Channel)}; handle_timeout(_TRef, Msg, Channel) -> - ?SLOG(error, #{ - msg => "unexpected_timeout", - payload => Msg - }), + ?SLOG(error, #{msg => "unexpected_timeout", timeout_message => Msg}), {ok, Channel}. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index eaef5b8a7..98622de4c 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -266,9 +266,8 @@ get_mqtt_conf(Zone, Key) -> emqx_config:get_zone_conf(Zone, [mqtt, Key]). %% @doc Try to takeover a session. --spec(takeover_session(emqx_types:clientid()) - -> {error, term()} - | {ok, atom(), pid(), emqx_session:session()}). +-spec(takeover_session(emqx_types:clientid()) -> + {error, term()} | {ok, atom(), pid(), emqx_session:session()}). takeover_session(ClientId) -> case lookup_channels(ClientId) of [] -> {error, not_found}; @@ -276,10 +275,7 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?SLOG(error, #{ - msg => "more_than_one_channel_found", - chan_pids => ChanPids - }), + ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -344,10 +340,7 @@ kick_session(ClientId) -> kick_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?SLOG(error, #{ - msg => "more_than_one_channel_found", - chan_pids => ChanPids - }), + ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -422,7 +415,7 @@ init([]) -> {ok, #{chan_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> @@ -430,7 +423,7 @@ handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> {noreply, State#{chan_pmon := PMon1}}; handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 37171f347..ef7ad6131 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -114,11 +114,11 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State) -> diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 5db6b28ba..83db8e480 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -119,9 +119,9 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From, catch Error:Reason:ST -> ?SLOG(error, #{ msg => "change_config_failed", - error => Error, + exception => Error, reason => Reason, - st => ST + stacktrace => ST }), {error, Reason} end, diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index b3c4fd0c5..cb6e2ce8f 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -417,20 +417,14 @@ handle_msg({'$gen_cast', Req}, State) -> {ok, NewState}; handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> - ?SLOG(debug, #{ - msg => "RECV_data", - data => Data - }), + ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => Inet}), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), parse_incoming(Data, State); handle_msg({quic, Data, _Sock, _, _, _}, State) -> - ?SLOG(debug, #{ - msg => "RECV_data", - data => Data - }), + ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => quic}), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), @@ -495,7 +489,7 @@ handle_msg({connack, ConnAck}, State) -> handle_outgoing(ConnAck, State); handle_msg({close, Reason}, State) -> - ?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}), + ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); handle_msg({event, connected}, State = #state{channel = Channel}) -> @@ -661,7 +655,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> ?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState) , input_bytes => Data , parsed_packets => Packets - , exception => Reason + , reason => Reason , stacktrace => Stacktrace }), {[{frame_error, Reason} | Packets], State} @@ -678,10 +672,7 @@ next_incoming_msgs(Packets) -> handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) -> ok = inc_incoming_stats(Packet), - ?SLOG(debug, #{ - msg => "RECV_packet", - packet => Packet - }), + ?SLOG(debug, #{msg => "RECV_packet", packet => Packet}), with_channel(handle_in, [Packet], State); handle_incoming(FrameError, State) -> @@ -718,7 +709,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> try emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?SLOG(warning, #{ - msg => "packet_is_discarded_because_the frame_is_too_large", + msg => "packet_is_discarded", + reason => "frame_is_too_large", packet => emqx_packet:format(Packet) }), ok = emqx_metrics:inc('delivery.dropped.too_large'), diff --git a/apps/emqx/src/emqx_ctl.erl b/apps/emqx/src/emqx_ctl.erl index 09f913df6..52930e714 100644 --- a/apps/emqx/src/emqx_ctl.erl +++ b/apps/emqx/src/emqx_ctl.erl @@ -185,13 +185,13 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq}) case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of [] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}); [[OriginSeq] | _] -> - ?SLOG(warning, #{msg => "CMD is overidden", cmd => Cmd, mf => MF}), + ?SLOG(warning, #{msg => "CMD_overidden", cmd => Cmd, mf => MF}), true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}) end, {reply, ok, next_seq(State)}; handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast({unregister_command, Cmd}, State) -> @@ -199,7 +199,7 @@ handle_cast({unregister_command, Cmd}, State) -> noreply(State); handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), noreply(State). handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index cb20a67fd..0b4611c4c 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -106,7 +106,7 @@ init([]) -> {ok, #{}, hibernate}. handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast({detected, #flapping{clientid = ClientId, @@ -116,10 +116,10 @@ handle_cast({detected, #flapping{clientid = ClientId, #{window_time := WindTime, ban_time := Interval}}, State) -> case now_diff(StartedAt) < WindTime of true -> %% Flapping happened:( - ?SLOG(error, #{ + ?SLOG(warning, #{ msg => "flapping_detected", client_id => ClientId, - peer_host => inet:ntoa(PeerHost), + peer_host => fmt_host(PeerHost), detect_cnt => DetectCnt, wind_time_in_ms => WindTime }), @@ -134,7 +134,7 @@ handle_cast({detected, #flapping{clientid = ClientId, ?SLOG(warning, #{ msg => "client_disconnected", client_id => ClientId, - peer_host => inet:ntoa(PeerHost), + peer_host => fmt_host(PeerHost), detect_cnt => DetectCnt, interval => Interval }) @@ -142,7 +142,7 @@ handle_cast({detected, #flapping{clientid = ClientId, {noreply, State}; handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) -> @@ -171,3 +171,8 @@ start_timers() -> lists:foreach(fun({Zone, _ZoneConf}) -> start_timer(Zone) end, maps:to_list(emqx:get_config([zones], #{}))). + +fmt_host(PeerHost) -> + try inet:ntoa(PeerHost) + catch _:_ -> PeerHost + end. diff --git a/apps/emqx/src/emqx_hooks.erl b/apps/emqx/src/emqx_hooks.erl index a914a37c8..7817a9b2d 100644 --- a/apps/emqx/src/emqx_hooks.erl +++ b/apps/emqx/src/emqx_hooks.erl @@ -208,10 +208,11 @@ safe_execute({M, F, A}, Args) -> Error:Reason:Stacktrace -> ?SLOG(error, #{ msg => "failed_to_execute", - module_function_arity => {M, F, A}, - error_reason_stacktrace => {Error, Reason, Stacktrace} - }), - ok + exception => Error, + reason => Reason, + stacktrace => Stacktrace, + failed_call => {M, F, A} + }) end. %% @doc execute a function. diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index 9ab8e574a..76d68f430 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -66,6 +66,7 @@ -export([ to_packet/2 , to_map/1 + , to_log_map/1 , to_list/1 , from_map/1 ]). @@ -82,8 +83,6 @@ timestamp := integer()} ). --export([format/1]). - -elvis([{elvis_style, god_modules, disable}]). -spec(make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message()). @@ -306,6 +305,9 @@ to_map(#message{ extra => Extra }. +%% @doc To map for logging, with payload dropped. +to_log_map(Msg) -> maps:without([payload], to_map(Msg)). + %% @doc Message to tuple list -spec(to_list(emqx_types:message()) -> list()). to_list(Msg) -> @@ -336,18 +338,3 @@ from_map(#{id := Id, %% MilliSeconds elapsed(Since) -> max(0, erlang:system_time(millisecond) - Since). - -format(#message{id = Id, - qos = QoS, - topic = Topic, - from = From, - flags = Flags, - headers = Headers}) -> - io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)", - [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). - -format(flags, Flags) -> - io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); -format(headers, Headers) -> - io_lib:format("~p", [Headers]). - diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index 5e8e2687c..24795c7ba 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -87,10 +87,7 @@ handle_call(Req, _From, State) -> {reply, {error, {unexpected_call, Req}}, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{ - msg => "unexpected_cast_discarded", - payload => Msg - }), + ?SLOG(error, #{msg => "unexpected_cast", cast=> Msg}), {noreply, State}. handle_info({timeout, _Timer, check}, State) -> @@ -112,10 +109,7 @@ handle_info({timeout, _Timer, check}, State) -> {noreply, State}; handle_info(Info, State) -> - ?SLOG(info, #{ - msg => "unexpected_info_discarded", - info => Info - }), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_plugins.erl b/apps/emqx/src/emqx_plugins.erl index acd91bfea..e334bdb4a 100644 --- a/apps/emqx/src/emqx_plugins.erl +++ b/apps/emqx/src/emqx_plugins.erl @@ -29,8 +29,6 @@ , find_plugin/1 ]). --export([funlog/2]). - -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -50,16 +48,14 @@ load() -> load(PluginName) when is_atom(PluginName) -> case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of {false, _} -> - ?SLOG(alert, #{ - msg => "plugin_not_found_cannot_load", - plugin_name => PluginName - }), + ?SLOG(alert, #{msg => "failed_to_load_plugin", + plugin_name => PluginName, + reason => not_found}), {error, not_found}; {_, true} -> - ?SLOG(notice, #{ - msg => "plugin_already_started", - plugin_name => PluginName - }), + ?SLOG(notice, #{msg => "plugin_already_loaded", + plugin_name => PluginName, + reason => already_loaded}), {error, already_started}; {_, false} -> load_plugin(PluginName) @@ -75,16 +71,14 @@ unload() -> unload(PluginName) when is_atom(PluginName) -> case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of {false, _} -> - ?SLOG(error, #{ - msg => "plugin_not_found_cannot_load", - plugin_name => PluginName - }), + ?SLOG(error, #{msg => "fialed_to_unload_plugin", + plugin_name => PluginName, + reason => not_found}), {error, not_found}; {_, false} -> - ?SLOG(error, #{ - msg => "plugin_not_started", - plugin_name => PluginName - }), + ?SLOG(error, #{msg => "failed_to_unload_plugin", + plugin_name => PluginName, + reason => not_loaded}), {error, not_started}; {_, _} -> unload_plugin(PluginName) @@ -93,10 +87,9 @@ unload(PluginName) when is_atom(PluginName) -> reload(PluginName) when is_atom(PluginName)-> case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of {false, _} -> - ?SLOG(error, #{ - msg => "plugin_not_found_cannot_load", - plugin_name => PluginName - }), + ?SLOG(error, #{msg => "failed_to_reload_plugin", + plugin_name => PluginName, + reason => not_found}), {error, not_found}; {_, false} -> load(PluginName); @@ -142,20 +135,14 @@ load_ext_plugins(Dir) -> end, filelib:wildcard("*", Dir)). load_ext_plugin(PluginDir) -> - ?SLOG(debug, #{ - msg => "loading_extra_plugin", - plugin_dir => PluginDir - }), + ?SLOG(debug, #{msg => "loading_extra_plugin", plugin_dir => PluginDir}), Ebin = filename:join([PluginDir, "ebin"]), AppFile = filename:join([Ebin, "*.app"]), AppName = case filelib:wildcard(AppFile) of [App] -> list_to_atom(filename:basename(App, ".app")); [] -> - ?SLOG(alert, #{ - msg => "plugin_app_file_not_found", - app_file => AppFile - }), + ?SLOG(alert, #{msg => "plugin_app_file_not_found", app_file => AppFile}), error({plugin_app_file_not_found, AppFile}) end, ok = load_plugin_app(AppName, Ebin). @@ -205,12 +192,13 @@ load_plugin(Name) -> {error, Error0} -> {error, Error0} end - catch _ : Error : Stacktrace -> + catch Error : Reason : Stacktrace -> ?SLOG(alert, #{ msg => "plugin_load_failed", name => Name, - error => Error, - stk => Stacktrace + exception => Error, + reason => Reason, + stacktrace => Stacktrace }), {error, parse_config_file_failed} end. @@ -228,23 +216,19 @@ load_app(App) -> start_app(App) -> case application:ensure_all_started(App) of {ok, Started} -> - ?SLOG(info, #{ - msg => "all_started_plugins", - started => Started - }), - ?SLOG(info, #{ - msg => "load_plugin_app_successfully", - app => App - }), + case Started =/= [] of + true -> ?SLOG(info, #{msg => "started_plugin_dependency_apps", apps => Started}); + false -> ok + end, + ?SLOG(info, #{msg => "started_plugin_app", app => App}), ok; {error, {ErrApp, Reason}} -> - ?SLOG(error, #{ - msg => "load_plugin_failed_cannot_started", - app => App, - err_app => ErrApp, - reason => Reason - }), - {error, {ErrApp, Reason}} + ?SLOG(error, #{msg => failed_to_start_plugin_app, + app => App, + err_app => ErrApp, + reason => Reason + }), + {error, failed_to_start_plugin_app} end. unload_plugin(App) -> @@ -258,23 +242,16 @@ unload_plugin(App) -> stop_app(App) -> case application:stop(App) of ok -> - ?SLOG(info, #{ - msg => "stop_plugin_successfully", - app => App - }), + ?SLOG(info, #{msg => "stop_plugin_successfully", app => App}), ok; {error, {not_started, App}} -> - ?SLOG(error, #{ - msg => "plugin_not_started", - app => App - }), + ?SLOG(info, #{msg => "plugin_not_started", app => App}), ok; {error, Reason} -> - ?SLOG(error, #{ - msg => "stop_plugin", - app => App, - error => Reason - }), + ?SLOG(error, #{msg => "failed_to_stop_plugin_app", + app => App, + error => Reason + }), {error, Reason} end. @@ -286,9 +263,3 @@ names(started_app) -> names(Plugins) -> [Name || #plugin{name = Name} <- Plugins]. - -funlog(Key, Value) -> - ?SLOG(info, #{ - key => string:join(Key, "."), - value => Value - }). diff --git a/apps/emqx/src/emqx_pool.erl b/apps/emqx/src/emqx_pool.erl index a4720fc5b..8b9508768 100644 --- a/apps/emqx/src/emqx_pool.erl +++ b/apps/emqx/src/emqx_pool.erl @@ -100,22 +100,22 @@ handle_call({submit, Task}, _From, State) -> {reply, catch run(Task), State}; handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast({async_submit, Task}, State) -> try run(Task) - catch _:Error:Stacktrace -> - ?SLOG(error, #{ - msg => "error", - error => Error, - stk => Stacktrace - }) + catch Error:Reason:Stacktrace -> + ?SLOG(error, #{msg => "async_submit_error", + exception => Error, + reason => Reason, + stacktrace => Stacktrace + }) end, {noreply, State}; handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 0ef6e699e..afc1c3f87 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -203,11 +203,11 @@ handle_call({delete_route, Topic, Dest}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info(Info, State) -> diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 1b6c7c042..a88e82d8d 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -109,11 +109,11 @@ init([]) -> {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, @@ -130,10 +130,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) -> {noreply, State}; handle_info({mnesia_table_event, Event}, State) -> - ?SLOG(error,#{ - msg => "unexpected_mnesia_table_event", - event => Event - }), + ?SLOG(error,#{msg => "unexpected_mnesia_table_event", event => Event}), {noreply, State}; handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index eac267fef..408435006 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -479,16 +479,12 @@ log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of true -> ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), - ?SLOG(warning, #{ - msg => "dropped_qos0_msg", - payload => emqx_message:format(Msg) - }); + ?SLOG(warning, #{msg => "dropped_qos0_msg", + payload => emqx_message:to_log_map(Msg)}); false -> ok = emqx_metrics:inc('delivery.dropped.queue_full'), - ?SLOG(warning, #{ - msg => "dropped_msg_due_to_mqueue_is_full", - payload => emqx_message:format(Msg) - }) + ?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full", + payload => emqx_message:to_log_map(Msg)}) end. enrich_fun(Session = #session{subscriptions = Subs}) -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 3109cf118..ef8e3d288 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -347,11 +347,8 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P handle_info({mnesia_table_event, _Event}, State) -> {noreply, State}; -handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> - ?SLOG(info, #{ - msg => "shared_subscriber_down", - sub_pid => SubPid - }), +handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) -> + ?SLOG(info, #{msg => "shared_subscriber_down", sub_pid => SubPid, reason => Reason}), cleanup_down(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 0c84a754f..0d2b1a1fd 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -202,7 +202,7 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}; handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast({setstat, Stat, MaxStat, Val}, State) -> @@ -221,10 +221,9 @@ handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) -> NState = case lists:keyfind(Name, #update.name, Updates) of #update{} -> - ?SLOG(warning, #{ - msg => "duplicated_update", - name => Name - }), + ?SLOG(warning, #{msg => "duplicated_update", + name => Name + }), State; false -> State#state{updates = [Update|Updates]} end, @@ -235,7 +234,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) -> {noreply, State#state{updates = Updates1}}; handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) -> @@ -244,12 +243,13 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update func = UpFun}, Acc) when C =< 0 -> try UpFun() catch - _:Error -> - ?SLOG(error, #{ - msg => "update_name_failed", - name => Name, - error => Error - }) + Error : Reason : Stacktrace -> + ?SLOG(error, #{msg => "update_name_failed", + name => Name, + exception => Error, + reason => Reason, + stacktrace => Stacktrace + }) end, [Update#update{countdown = I} | Acc]; (Update = #update{countdown = C}, Acc) -> @@ -284,4 +284,3 @@ safe_update_element(Key, Val) -> val => Val }) end. - diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 84c3fa2e0..692d2bd0a 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -134,11 +134,11 @@ handle_call(uptime, _From, State) -> {reply, uptime(State), State}; handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> diff --git a/apps/emqx/src/emqx_sys_mon.erl b/apps/emqx/src/emqx_sys_mon.erl index dce3e39b1..3d47038c6 100644 --- a/apps/emqx/src/emqx_sys_mon.erl +++ b/apps/emqx/src/emqx_sys_mon.erl @@ -93,44 +93,41 @@ handle_cast(Msg, State) -> handle_info({monitor, Pid, long_gc, Info}, State) -> suppress({long_gc, Pid}, fun() -> - WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]), - ?SLOG(warning, #{ - warn_msg => WarnMsg, - pid_info => procinfo(Pid) - }), + WarnMsg = io_lib:format("long_gc warning: pid = ~p", [Pid]), + ?SLOG(warning, #{msg => long_gc, + info => Info, + porcinfo => procinfo(Pid) + }), safe_publish(long_gc, WarnMsg) end, State); handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> suppress({long_schedule, Pid}, fun() -> - WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), - ?SLOG(warning, #{ - warn_msg => WarnMsg, - pid_info => procinfo(Pid) - }), + WarnMsg = io_lib:format("long_schedule warning: pid = ~p", [Pid]), + ?SLOG(warning, #{msg => long_schedule, + info => Info, + procinfo => procinfo(Pid)}), safe_publish(long_schedule, WarnMsg) end, State); handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> suppress({long_schedule, Port}, fun() -> - WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), - ?SLOG(warning, #{ - warn_msg => WarnMsg, - port_info => erlang:port_info(Port) - }), + WarnMsg = io_lib:format("long_schedule warning: port = ~p", [Port]), + ?SLOG(warning, #{msg => long_schedule, + info => Info, + portinfo => portinfo(Port)}), safe_publish(long_schedule, WarnMsg) end, State); handle_info({monitor, Pid, large_heap, Info}, State) -> suppress({large_heap, Pid}, fun() -> - WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), - ?SLOG(warning, #{ - warn_msg => WarnMsg, - pid_info => procinfo(Pid) - }), + WarnMsg = io_lib:format("large_heap warning: pid = ~p", [Pid]), + ?SLOG(warning, #{msg => large_heap, + info => Info, + procinfo => procinfo(Pid)}), safe_publish(large_heap, WarnMsg) end, State); @@ -138,11 +135,10 @@ handle_info({monitor, SusPid, busy_port, Port}, State) -> suppress({busy_port, Port}, fun() -> WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?SLOG(warning, #{ - warn_msg => WarnMsg, - pid_info => procinfo(SusPid), - port_info => erlang:port_info(Port) - }), + ?SLOG(warning, #{msg => busy_port, + portinfo => portinfo(Port), + procinfo => procinfo(SusPid) + }), safe_publish(busy_port, WarnMsg) end, State); @@ -150,11 +146,9 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> suppress({busy_dist_port, Port}, fun() -> WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?SLOG(warning, #{ - warn_msg => WarnMsg, - pid_info => procinfo(SusPid), - port_info => erlang:port_info(Port) - }), + ?SLOG(warning, #{msg => busy_dist_port, + portinfo => portinfo(Port), + procinfo => procinfo(SusPid)}), safe_publish(busy_dist_port, WarnMsg) end, State); @@ -190,11 +184,14 @@ suppress(Key, SuccFun, State = #{events := Events}) -> end. procinfo(Pid) -> - case {emqx_vm:get_process_info(Pid), emqx_vm:get_process_gc_info(Pid)} of - {undefined, _} -> undefined; - {_, undefined} -> undefined; - {Info, GcInfo} -> Info ++ GcInfo - end. + [{pid, Pid} | procinfo_l(emqx_vm:get_process_gc_info(Pid))] ++ + procinfo_l(emqx_vm:get_process_info(Pid)). + +procinfo_l(undefined) -> []; +procinfo_l(List) -> List. + +portinfo(Port) -> + [{port, Port} | erlang:port_info(Port)]. safe_publish(Event, WarnMsg) -> Topic = emqx_topic:systop(lists:concat(['sysmon/', Event])), diff --git a/apps/emqx/src/emqx_tracer.erl b/apps/emqx/src/emqx_tracer.erl index 0ee9de324..ab354ae21 100644 --- a/apps/emqx/src/emqx_tracer.erl +++ b/apps/emqx/src/emqx_tracer.erl @@ -115,25 +115,18 @@ install_trace_handler(Who, Level, LogFile) -> {fun filter_by_meta_key/2, Who}}]}) of ok -> - ?SLOG(info, #{msg => "start_trace_for", who => Who}); + ?SLOG(info, #{msg => "start_trace", who => Who}); {error, Reason} -> - ?SLOG(error, #{msg => "start_trace_for_who_failed", who => Who, reason => Reason}), + ?SLOG(error, #{msg => "failed_to_trace", who => Who, reason => Reason}), {error, Reason} end. uninstall_trance_handler(Who) -> case logger:remove_handler(handler_id(Who)) of ok -> - ?SLOG(info, #{ - msg => "stop_trace_for", - who => Who - }); + ?SLOG(info, #{msg => "stop_trace", who => Who}); {error, Reason} -> - ?SLOG(error, #{ - msg => "stop_trace_for", - who => Who, - reason => Reason - }), + ?SLOG(error, #{msg => "failed_to_stop_trace", who => Who, reason => Reason}), {error, Reason} end. diff --git a/apps/emqx/src/emqx_vm_mon.erl b/apps/emqx/src/emqx_vm_mon.erl index 5acb16d32..703aca52f 100644 --- a/apps/emqx/src/emqx_vm_mon.erl +++ b/apps/emqx/src/emqx_vm_mon.erl @@ -49,17 +49,11 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?SLOG(error, #{ - msg => "[VM_MON]_unexpected_call", - req => Req - }), + ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{ - msg => "[VM_MON]_unexpected_cast", - cast => Msg - }), + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({timeout, _Timer, check}, State) -> @@ -81,10 +75,7 @@ handle_info({timeout, _Timer, check}, State) -> {noreply, State}; handle_info(Info, State) -> - ?SLOG(error, #{ - msg => "[VM_MON]_unexpected_info", - info => Info - }), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index c6a427942..9ac8a03d0 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -181,13 +181,11 @@ init(Req, #{listener := {Type, Listener}} = Opts) -> idle_timeout => get_ws_opts(Type, Listener, idle_timeout) }, case check_origin_header(Req, Opts) of - {error, Message} -> - ?SLOG(error, #{ - msg => "invalid_origin_header", - payload => Message - }), + {error, Reason} -> + ?SLOG(error, #{msg => "invalid_origin_header", reason => Reason}), {ok, cowboy_req:reply(403, Req), WsOpts}; - ok -> parse_sec_websocket_protocol(Req, Opts, WsOpts) + ok -> + parse_sec_websocket_protocol(Req, Opts, WsOpts) end. parse_sec_websocket_protocol(Req, #{listener := {Type, Listener}} = Opts, WsOpts) -> @@ -234,7 +232,7 @@ parse_header_fun_origin(Req, #{listener := {Type, Listener}}) -> Value -> case lists:member(Value, get_ws_opts(Type, Listener, check_origins)) of true -> ok; - false -> {origin_not_allowed, Value} + false -> {error, #{bad_origin => Value}} end end. @@ -266,12 +264,12 @@ websocket_init([Req, #{zone := Zone, listener := {Type, Listener}} = Opts]) -> WsCookie = try cowboy_req:parse_cookies(Req) catch error:badarg -> - ?SLOG(error, #{msg => "illegal_cookie"}), + ?SLOG(error, #{msg => "bad_cookie"}), undefined; Error:Reason -> ?SLOG(error, #{msg => "failed_to_parse_cookie", - error => Error, - reason => Reason}), + exception => Error, + reason => Reason}), undefined end, ConnInfo = #{socktype => ws, @@ -328,7 +326,7 @@ websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); websocket_handle({binary, Data}, State) -> - ?SLOG(debug, #{msg => "recv_data", data => Data}), + ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => websocket}), ok = inc_recv_stats(1, iolist_size(Data)), NState = ensure_stats_timer(State), return(parse_incoming(Data, NState)); @@ -450,7 +448,7 @@ handle_info({connack, ConnAck}, State) -> return(enqueue(ConnAck, State)); handle_info({close, Reason}, State) -> - ?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}), + ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}), return(enqueue({close, Reason}, State)); handle_info({event, connected}, State = #state{channel = Channel}) -> @@ -632,10 +630,9 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback, serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> try emqx_frame:serialize_pkt(Packet, Serialize) of - <<>> -> ?SLOG(warning, #{ - msg => "packet_is_discarded_due_to_the_frame_is_too_large", - packet => emqx_packet:format(Packet) - }), + <<>> -> ?SLOG(warning, #{msg => "packet_discarded", + reason => "frame_too_large", + packet => emqx_packet:format(Packet)}), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; diff --git a/apps/emqx/test/emqx_message_SUITE.erl b/apps/emqx/test/emqx_message_SUITE.erl index 7119e8c9c..7c8435a8d 100644 --- a/apps/emqx/test/emqx_message_SUITE.erl +++ b/apps/emqx/test/emqx_message_SUITE.erl @@ -141,13 +141,6 @@ t_undefined_headers(_) -> Msg2 = emqx_message:set_header(c, 3, Msg), ?assertEqual(3, emqx_message:get_header(c, Msg2)). -t_format(_) -> - Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - io:format("~s~n", [emqx_message:format(Msg)]), - Msg1 = emqx_message:set_header(properties, #{'Subscription-Identifier' => 1}, - emqx_message:set_flag(dup, Msg)), - io:format("~s~n", [emqx_message:format(Msg1)]). - t_is_expired(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), ?assertNot(emqx_message:is_expired(Msg)), diff --git a/apps/emqx/test/emqx_sys_mon_SUITE.erl b/apps/emqx/test/emqx_sys_mon_SUITE.erl index 70f518ad5..53770f7e2 100644 --- a/apps/emqx/test/emqx_sys_mon_SUITE.erl +++ b/apps/emqx/test/emqx_sys_mon_SUITE.erl @@ -24,21 +24,22 @@ -define(SYSMON, emqx_sys_mon). +-define(FAKE_PORT, hd(erlang:ports())). +-define(FAKE_INFO, [{timeout, 100}, {in, foo}, {out, {?MODULE, bar, 1}}]). -define(INPUTINFO, [{self(), long_gc, - concat_str("long_gc warning: pid = ~p, info: ~p", self(), "hello"), "hello"}, + fmt("long_gc warning: pid = ~p", [self()]), ?FAKE_INFO}, {self(), long_schedule, - concat_str("long_schedule warning: pid = ~p, info: ~p", self(), "hello"), "hello"}, + fmt("long_schedule warning: pid = ~p", [self()]), ?FAKE_INFO}, {self(), large_heap, - concat_str("large_heap warning: pid = ~p, info: ~p", self(), "hello"), "hello"}, + fmt("large_heap warning: pid = ~p", [self()]), ?FAKE_INFO}, {self(), busy_port, - concat_str("busy_port warning: suspid = ~p, port = ~p", - self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")}, + fmt("busy_port warning: suspid = ~p, port = ~p", + [self(), ?FAKE_PORT]), ?FAKE_PORT}, {self(), busy_dist_port, - concat_str("busy_dist_port warning: suspid = ~p, port = ~p", - self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")}, - {list_to_port("#Port<0.4>"), long_schedule, - concat_str("long_schedule warning: port = ~p, info: ~p", - list_to_port("#Port<0.4>"), "hello"), "hello"} + fmt("busy_dist_port warning: suspid = ~p, port = ~p", + [self(), ?FAKE_PORT]), ?FAKE_PORT}, + {?FAKE_PORT, long_schedule, + fmt("long_schedule warning: port = ~p", [?FAKE_PORT]), ?FAKE_INFO} ]). all() -> emqx_ct:all(?MODULE). @@ -82,16 +83,16 @@ t_procinfo(_) -> ok = meck:new(emqx_vm, [passthrough, no_history]), ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end), ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> [] end), - ?assertEqual([], emqx_sys_mon:procinfo([])), - ok = meck:expect(emqx_vm, get_process_info, fun(_) -> ok end), + ?assertEqual([{pid, undefined}], emqx_sys_mon:procinfo(undefined)), + ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end), ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> undefined end), - ?assertEqual(undefined, emqx_sys_mon:procinfo([])), + ?assertEqual([{pid, self()}], emqx_sys_mon:procinfo(self())), ok = meck:unload(emqx_vm). t_sys_mon(_Config) -> lists:foreach( - fun({PidOrPort, SysMonName,ValidateInfo, InfoOrPort}) -> - validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) + fun({PidOrPort, SysMonName, ValidateInfo, InfoOrPort}) -> + validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort) end, ?INPUTINFO). t_sys_mon2(_Config) -> @@ -101,7 +102,7 @@ t_sys_mon2(_Config) -> ?assertEqual(ok, gen_server:cast(?SYSMON, ignored)), gen_server:stop(?SYSMON). -validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) -> +validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort) -> {ok, C} = emqtt:start_link([{host, "localhost"}]), {ok, _} = emqtt:connect(C), emqtt:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1), @@ -117,6 +118,4 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) -> end, emqtt:stop(C). -concat_str(ValidateInfo, InfoOrPort, Info) -> - WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]), - lists:flatten(WarnInfo). +fmt(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)). diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 767a7994e..b7484ba90 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -229,7 +229,7 @@ t_ws_check_origin(_) -> ?assertMatch({gun_upgrade, _}, start_ws_client(#{protocols => [<<"mqtt">>], headers => [{<<"origin">>, <<"http://localhost:18083">>}]})), - ?assertMatch({gun_response, {_, 500, _}}, + ?assertMatch({gun_response, {_, 403, _}}, start_ws_client(#{protocols => [<<"mqtt">>], headers => [{<<"origin">>, <<"http://localhost:18080">>}]})).