diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 6bd2d5d49..296b30db1 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -239,11 +239,17 @@ handle_call({get_alarms, deactivated}, _From, State) -> {reply, Alarms, State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{ + msg => "unexpected_call", + call => Req + }), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected msg: ~p", [Msg]), + ?SLOG(error, #{ + msg => "unexpected_msg", + payload => Msg + }), {noreply, State}. handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, @@ -253,11 +259,14 @@ handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, {noreply, State#state{timer = ensure_timer(TRef, Period)}}; handle_info({update_timer, Period}, #state{timer = TRef} = State) -> - ?LOG(warning, "update the 'validity_period' timer to ~p", [Period]), + ?SLOG(warning, #{ + msg => "update_the_validity_period_timer", + period => Period + }), {noreply, State#state{timer = ensure_timer(TRef, Period)}}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> @@ -323,8 +332,11 @@ deactivate_all_alarms() -> clear_table(TableName) -> case ekka_mnesia:clear_table(TableName) of {aborted, Reason} -> - ?LOG(warning, "Faile to clear table ~p reason: ~p", - [TableName, Reason]); + ?SLOG(warning, #{ + msg => "fail_to_clear_table", + table_name => TableName, + reason => Reason + }); {atomic, ok} -> ok end. @@ -354,10 +366,17 @@ delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> do_actions(_, _, []) -> ok; do_actions(activate, Alarm = #activated_alarm{name = Name, message = Message}, [log | More]) -> - ?LOG(warning, "Alarm ~s is activated, ~s", [Name, Message]), + ?SLOG(warning, #{ + msg => "alarm_is_activated", + name => Name, + message => Message + }), do_actions(activate, Alarm, More); do_actions(deactivate, Alarm = #deactivated_alarm{name = Name}, [log | More]) -> - ?LOG(warning, "Alarm ~s is deactivated", [Name]), + ?SLOG(warning, #{ + msg => "alarm_is_deactivated", + name => Name + }), do_actions(deactivate, Alarm, More); do_actions(Operation, Alarm, [publish | More]) -> Topic = topic(Operation), diff --git a/apps/emqx/src/emqx_authentication.erl b/apps/emqx/src/emqx_authentication.erl index 3b408723c..46387c240 100644 --- a/apps/emqx/src/emqx_authentication.erl +++ b/apps/emqx/src/emqx_authentication.erl @@ -349,9 +349,11 @@ initialize_authentication(ChainName, AuthenticatorsConfig) -> {ok, _} -> ok; {error, Reason} -> - ?SLOG(error, #{msg => "failed to create authenticator", - reason => Reason, - authenticator => generate_id(AuthenticatorConfig)}) + ?SLOG(error, #{ + msg => "failed_to_create_authenticator", + authenticator => generate_id(AuthenticatorConfig), + reason => Reason + }) end end, CheckedConfig). @@ -650,15 +652,15 @@ handle_call({list_users, ChainName, AuthenticatorID}, _From, State) -> reply(Reply, State); handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected call", req => Req}), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Req, State) -> - ?SLOG(error, #{msg => "unexpected cast", req => Req}), + ?SLOG(error, #{msg => "unexpected_cast", req => Req}), {noreply, State}. handle_info(Info, State) -> - ?SLOG(error, #{msg => "unexpected info", info => Info}), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 608734363..2bab96843 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -187,11 +187,14 @@ init([]) -> {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - ?LOG(error, "unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "unexpected msg: ~p", [Msg]), + ?SLOG(error, #{ + msg => "unexpected_msg", + payload => Msg + }), {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> @@ -199,7 +202,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> - ?LOG(error, "unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, #{expiry_timer := TRef}) -> diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 56ac348da..2c0380f62 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -202,7 +202,10 @@ publish(Msg) when is_record(Msg, message) -> emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}} -> - ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]), + ?SLOG(notice, #{ + msg => "stop_publishing", + payload => emqx_message:format(Msg) + }), []; Msg1 = #message{topic = Topic} -> route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) @@ -215,8 +218,12 @@ safe_publish(Msg) when is_record(Msg, message) -> publish(Msg) catch _:Error:Stk-> - ?LOG(error, "Publish error: ~0p~n~s~n~0p", - [Error, emqx_message:format(Msg), Stk]), + ?SLOG(error,#{ + msg => "publishing_error", + error => Error, + payload => Msg, + stacktrace => Stk + }), [] end. @@ -266,14 +273,22 @@ forward(Node, To, Delivery, async) -> case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of true -> emqx_metrics:inc('messages.forward'); {badrpc, Reason} -> - ?LOG(error, "Ansync forward msg to ~s failed due to ~p", [Node, Reason]), + ?SLOG(error, #{ + msg => "async_forward_msg_to_node_failed", + node => Node, + reason => Reason + }), {error, badrpc} end; forward(Node, To, Delivery, sync) -> case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> - ?LOG(error, "Sync forward msg to ~s failed due to ~p", [Node, Reason]), + ?SLOG(error, #{ + msg => "sync_forward_msg_to_node_failed", + node => Node, + reason => Reason + }), {error, badrpc}; Result -> emqx_metrics:inc('messages.forward'), Result @@ -450,14 +465,17 @@ handle_call({subscribe, Topic, I}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast({subscribe, Topic}, State) -> case emqx_router:do_add_route(Topic) of ok -> ok; {error, Reason} -> - ?LOG(error, "Failed to add route: ~p", [Reason]) + ?SLOG(error, #{ + msg => "failed_to_add_route", + reason => Reason + }) end, {noreply, State}; @@ -481,11 +499,11 @@ handle_cast({unsubscribed, Topic, I}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index fdd1a55d9..804bbaebd 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -118,7 +118,7 @@ init([]) -> {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> @@ -127,7 +127,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}}; handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) -> @@ -138,7 +138,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) {noreply, State#{pmon := PMon1}}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 0b1ff7e25..93ee21416 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -373,11 +373,17 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel ok = after_message_acked(ClientInfo, Msg, Properties), handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> - ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), + ?SLOG(warning, #{ + msg => "puback_packetId_inuse", + packetId => PacketId + }), ok = emqx_metrics:inc('packets.puback.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBACK PacketId ~w is not found.", [PacketId]), + ?SLOG(warning, #{ + msg => "puback_packetId_not_found", + packetId => PacketId + }), ok = emqx_metrics:inc('packets.puback.missed'), {ok, Channel} end; @@ -390,11 +396,11 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel NChannel = Channel#channel{session = NSession}, handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> - ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]), + ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}), ok = emqx_metrics:inc('packets.pubrec.inuse'), handle_out(pubrel, {PacketId, RC}, Channel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), + ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}), ok = emqx_metrics:inc('packets.pubrec.missed'), handle_out(pubrel, {PacketId, RC}, Channel) end; @@ -405,7 +411,7 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]), + ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}), ok = emqx_metrics:inc('packets.pubrel.missed'), handle_out(pubcomp, {PacketId, RC}, Channel) end; @@ -420,7 +426,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S ok = emqx_metrics:inc('packets.pubcomp.inuse'), {ok, Channel}; {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), + ?SLOG(warning, #{msg => "pubcomp_packetId_not_found", packetId => PacketId}), ok = emqx_metrics:inc('packets.pubcomp.missed'), {ok, Channel} end; @@ -501,11 +507,17 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?LOG(error, "Unexpected frame error: ~p", [Reason]), + ?SLOG(error, #{ + msg => "malformed_mqtt_message", + reason => Reason + }), {ok, Channel}; handle_in(Packet, Channel) -> - ?LOG(error, "Unexpected incoming: ~p", [Packet]), + ?SLOG(error, #{ + msg => "disconnecting_due_to_unexpected_message", + packet => Packet + }), handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel). %%-------------------------------------------------------------------- @@ -529,7 +541,10 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo, {error, client_id_unavailable} -> handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel); {error, Reason} -> - ?LOG(error, "Failed to open session due to ~p", [Reason]), + ?SLOG(error, #{ + msg => "failed_to_open_session", + reason => Reason + }), handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel) end. @@ -548,8 +563,11 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> Msg = packet_to_message(NPacket, NChannel), do_publish(PacketId, Msg, NChannel); {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> - ?LOG(warning, "Cannot publish message to ~s due to ~s.", - [Topic, emqx_reason_codes:text(Rc)]), + ?SLOG(warning, #{ + msg => "cannot_publish_to_topic", + topic => Topic, + reason => emqx_reason_codes:name(Rc) + }), case emqx:get_config([authorization, deny_action], ignore) of ignore -> case QoS of @@ -563,8 +581,11 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> handle_out(disconnect, Rc, NChannel) end; {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} -> - ?LOG(warning, "Cannot publish messages to ~s due to ~s.", - [Topic, emqx_reason_codes:text(Rc)]), + ?SLOG(warning, #{ + msg => "cannot_publish_to_topic", + topic => Topic, + reason => emqx_reason_codes:name(Rc) + }), case QoS of ?QOS_0 -> ok = emqx_metrics:inc('packets.publish.dropped'), @@ -575,8 +596,11 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> handle_out(pubrec, {PacketId, Rc}, NChannel) end; {error, Rc, NChannel} -> - ?LOG(warning, "Cannot publish message to ~s due to ~s.", - [Topic, emqx_reason_codes:text(Rc)]), + ?SLOG(warning, #{ + msg => "cannot_publish_to_topic", + topic => Topic, + reason => emqx_reason_codes:name(Rc) + }), handle_out(disconnect, Rc, NChannel) end. @@ -621,8 +645,11 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, ok = emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> - ?LOG(warning, "Dropped the qos2 packet ~w " - "due to awaiting_rel is full.", [PacketId]), + ?SLOG(warning, #{ + msg => "dropped_qos2_packet", + reason => emqx_reason_codes:name(RC), + packetId => PacketId + }), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(pubrec, {PacketId, RC}, Channel) end. @@ -671,8 +698,10 @@ process_subscribe([Topic = {TopicFilter, SubOpts}|More], SubProps, Channel, Acc) Channel), process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]); {error, ReasonCode} -> - ?LOG(warning, "Cannot subscribe ~s due to ~s.", - [TopicFilter, emqx_reason_codes:text(ReasonCode)]), + ?SLOG(warning, #{ + msg => "cannot_subscribe_topic_filter", + reason => emqx_reason_codes:name(ReasonCode) + }), process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc]) end. @@ -685,8 +714,10 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = {ok, NSession} -> {QoS, Channel#channel{session = NSession}}; {error, RC} -> - ?LOG(warning, "Cannot subscribe ~s due to ~s.", - [TopicFilter, emqx_reason_codes:text(RC)]), + ?SLOG(warning, #{ + msg => "cannot_subscribe_topic_filter", + reason => emqx_reason_codes:text(RC) + }), {RC, Channel} end. @@ -869,7 +900,7 @@ handle_out(auth, {ReasonCode, Properties}, Channel) -> {ok, ?AUTH_PACKET(ReasonCode, Properties), Channel}; handle_out(Type, Data, Channel) -> - ?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]), + ?SLOG(error, #{msg => "unexpected_outgoing", type => Type, data => Data}), {ok, Channel}. %%-------------------------------------------------------------------- @@ -964,7 +995,7 @@ handle_call({quota, Policy}, Channel) -> reply(ok, Channel#channel{quota = Quota}); handle_call(Req, Channel) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), reply(ignored, Channel). %%-------------------------------------------------------------------- @@ -1004,7 +1035,10 @@ handle_info({sock_closed, Reason}, Channel = end; handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?LOG(error, "Unexpected sock_closed: ~p", [Reason]), + ?SLOG(error, #{ + msg => "unexpected_sock_closed", + reason => Reason + }), {ok, Channel}; handle_info(clean_authz_cache, Channel) -> @@ -1012,7 +1046,7 @@ handle_info(clean_authz_cache, Channel) -> {ok, Channel}; handle_info(Info, Channel) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. %%-------------------------------------------------------------------- @@ -1075,7 +1109,10 @@ handle_timeout(_TRef, expire_quota_limit, Channel) -> {ok, clean_timer(quota_timer, Channel)}; handle_timeout(_TRef, Msg, Channel) -> - ?LOG(error, "Unexpected timeout: ~p~n", [Msg]), + ?SLOG(error, #{ + msg => "unexpected_timeout", + payload => Msg + }), {ok, Channel}. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index f4f5f3981..eaef5b8a7 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -276,7 +276,10 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), + ?SLOG(error, #{ + msg => "more_than_one_channel_found", + chan_pids => ChanPids + }), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -341,7 +344,10 @@ kick_session(ClientId) -> kick_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), + ?SLOG(error, #{ + msg => "more_than_one_channel_found", + chan_pids => ChanPids + }), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -416,7 +422,7 @@ init([]) -> {ok, #{chan_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> @@ -424,7 +430,7 @@ handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> {noreply, State#{chan_pmon := PMon1}}; handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> @@ -434,7 +440,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), + {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 6fc34dee8..37171f347 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -114,11 +114,11 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State) -> @@ -132,7 +132,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index db2376784..5db6b28ba 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -117,7 +117,12 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From, {error, Result} end catch Error:Reason:ST -> - ?LOG(error, "change_config failed: ~p", [{Error, Reason, ST}]), + ?SLOG(error, #{ + msg => "change_config_failed", + error => Error, + reason => Reason, + st => ST + }), {error, Reason} end, {reply, Reply, State}; diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 8d0e74313..b3c4fd0c5 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -417,14 +417,20 @@ handle_msg({'$gen_cast', Req}, State) -> {ok, NewState}; handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> - ?LOG(debug, "RECV ~0p", [Data]), + ?SLOG(debug, #{ + msg => "RECV_data", + data => Data + }), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), parse_incoming(Data, State); handle_msg({quic, Data, _Sock, _, _, _}, State) -> - ?LOG(debug, "RECV ~0p", [Data]), + ?SLOG(debug, #{ + msg => "RECV_data", + data => Data + }), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), @@ -489,7 +495,7 @@ handle_msg({connack, ConnAck}, State) -> handle_outgoing(ConnAck, State); handle_msg({close, Reason}, State) -> - ?LOG(debug, "Force to close the socket due to ~p", [Reason]), + ?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); handle_msg({event, connected}, State = #state{channel = Channel}) -> @@ -672,7 +678,10 @@ next_incoming_msgs(Packets) -> handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) -> ok = inc_incoming_stats(Packet), - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), + ?SLOG(debug, #{ + msg => "RECV_packet", + packet => Packet + }), with_channel(handle_in, [Packet], State); handle_incoming(FrameError, State) -> @@ -708,12 +717,17 @@ handle_outgoing(Packet, State) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> try emqx_frame:serialize_pkt(Packet, Serialize) of - <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", - [emqx_packet:format(Packet)]), + <<>> -> ?SLOG(warning, #{ + msg => "packet_is_discarded_because_the frame_is_too_large", + packet => emqx_packet:format(Packet) + }), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; - Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + Data -> ?SLOG(debug, #{ + msg => "SEND_packet", + packet => emqx_packet:format(Packet) + }), ok = inc_outgoing_stats(Packet), Data catch @@ -763,7 +777,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) -> handle_info({sock_error, Reason}, State) -> case Reason =/= closed andalso Reason =/= einval of - true -> ?LOG(warning, "socket_error: ~p", [Reason]); + true -> ?SLOG(warning, #{msg => "socket_error", reason => Reason}); false -> ok end, handle_info({sock_closed, Reason}, close_socket(State)); @@ -805,7 +819,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> {ok, Limiter1} -> State#state{limiter = Limiter1}; {pause, Time, Limiter1} -> - ?LOG(warning, "Pause ~pms due to rate limit", [Time]), + ?SLOG(warning, #{msg => "pause_time_due_to_rate_limit", time_in_ms => Time}), TRef = start_timer(Time, limit_timeout), State#state{sockstate = blocked, limiter = Limiter1, diff --git a/apps/emqx/src/emqx_ctl.erl b/apps/emqx/src/emqx_ctl.erl index a71398095..09f913df6 100644 --- a/apps/emqx/src/emqx_ctl.erl +++ b/apps/emqx/src/emqx_ctl.erl @@ -185,13 +185,13 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq}) case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of [] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}); [[OriginSeq] | _] -> - ?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]), + ?SLOG(warning, #{msg => "CMD is overidden", cmd => Cmd, mf => MF}), true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}) end, {reply, ok, next_seq(State)}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast({unregister_command, Cmd}, State) -> @@ -199,11 +199,11 @@ handle_cast({unregister_command, Cmd}, State) -> noreply(State); handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), noreply(State). handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), noreply(State). terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 1908430be..cb20a67fd 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -106,7 +106,7 @@ init([]) -> {ok, #{}, hibernate}. handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast({detected, #flapping{clientid = ClientId, @@ -116,8 +116,13 @@ handle_cast({detected, #flapping{clientid = ClientId, #{window_time := WindTime, ban_time := Interval}}, State) -> case now_diff(StartedAt) < WindTime of true -> %% Flapping happened:( - ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms", - [ClientId, inet:ntoa(PeerHost), DetectCnt, WindTime]), + ?SLOG(error, #{ + msg => "flapping_detected", + client_id => ClientId, + peer_host => inet:ntoa(PeerHost), + detect_cnt => DetectCnt, + wind_time_in_ms => WindTime + }), Now = erlang:system_time(second), Banned = #banned{who = {clientid, ClientId}, by = <<"flapping detector">>, @@ -126,13 +131,18 @@ handle_cast({detected, #flapping{clientid = ClientId, until = Now + (Interval div 1000)}, emqx_banned:create(Banned); false -> - ?LOG(warning, "~s(~s) disconnected ~w times in ~wms", - [ClientId, inet:ntoa(PeerHost), DetectCnt, Interval]) + ?SLOG(warning, #{ + msg => "client_disconnected", + client_id => ClientId, + peer_host => inet:ntoa(PeerHost), + detect_cnt => DetectCnt, + interval => Interval + }) end, {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) -> @@ -144,7 +154,7 @@ handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) -> {noreply, State, hibernate}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_hooks.erl b/apps/emqx/src/emqx_hooks.erl index f056b754e..a914a37c8 100644 --- a/apps/emqx/src/emqx_hooks.erl +++ b/apps/emqx/src/emqx_hooks.erl @@ -206,7 +206,11 @@ safe_execute({M, F, A}, Args) -> Result -> Result catch Error:Reason:Stacktrace -> - ?LOG(error, "Failed to execute ~0p: ~0p", [{M, F, A}, {Error, Reason, Stacktrace}]), + ?SLOG(error, #{ + msg => "failed_to_execute", + module_function_arity => {M, F, A}, + error_reason_stacktrace => {Error, Reason, Stacktrace} + }), ok end. @@ -246,7 +250,7 @@ handle_call({put, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, S {reply, Reply, State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast({del, HookPoint, Action}, State) -> @@ -259,11 +263,11 @@ handle_cast({del, HookPoint, Action}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "Unexpected msg: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 282b8b5f3..740c29290 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -442,13 +442,17 @@ init([]) -> {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> - ?LOG(error, "Failed to create ~s:~s for index exceeded.", [Type, Name]), + ?SLOG(error, #{ + msg => "failed_to_create_type_name_for_index_exceeded", + type => Type, + name => Name + }), {reply, {error, metric_index_exceeded}, State}; handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) -> case ets:lookup(?TAB, Name) of [#metric{idx = Idx}] -> - ?LOG(info, "~s already exists.", [Name]), + ?SLOG(info, #{msg => "name_already_exists", name => Name}), {reply, {ok, Idx}, State}; [] -> Metric = #metric{name = Name, type = Type, idx = NextIdx}, @@ -464,15 +468,15 @@ handle_call({set_type_to_counter, Keys}, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index 85e448f41..5e8e2687c 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -87,7 +87,10 @@ handle_call(Req, _From, State) -> {reply, {error, {unexpected_call, Req}}, State}. handle_cast(Msg, State) -> - ?LOG(error, "unexpected_cast_discarded: ~p", [Msg]), + ?SLOG(error, #{ + msg => "unexpected_cast_discarded", + payload => Msg + }), {noreply, State}. handle_info({timeout, _Timer, check}, State) -> @@ -109,7 +112,10 @@ handle_info({timeout, _Timer, check}, State) -> {noreply, State}; handle_info(Info, State) -> - ?LOG(info, "unexpected_info_discarded: ~p", [Info]), + ?SLOG(info, #{ + msg => "unexpected_info_discarded", + info => Info + }), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_plugins.erl b/apps/emqx/src/emqx_plugins.erl index 7bb9c084b..acd91bfea 100644 --- a/apps/emqx/src/emqx_plugins.erl +++ b/apps/emqx/src/emqx_plugins.erl @@ -50,10 +50,16 @@ load() -> load(PluginName) when is_atom(PluginName) -> case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of {false, _} -> - ?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]), + ?SLOG(alert, #{ + msg => "plugin_not_found_cannot_load", + plugin_name => PluginName + }), {error, not_found}; {_, true} -> - ?LOG(notice, "Plugin ~s is already started", [PluginName]), + ?SLOG(notice, #{ + msg => "plugin_already_started", + plugin_name => PluginName + }), {error, already_started}; {_, false} -> load_plugin(PluginName) @@ -69,10 +75,16 @@ unload() -> unload(PluginName) when is_atom(PluginName) -> case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of {false, _} -> - ?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]), + ?SLOG(error, #{ + msg => "plugin_not_found_cannot_load", + plugin_name => PluginName + }), {error, not_found}; {_, false} -> - ?LOG(error, "Plugin ~s is not started", [PluginName]), + ?SLOG(error, #{ + msg => "plugin_not_started", + plugin_name => PluginName + }), {error, not_started}; {_, _} -> unload_plugin(PluginName) @@ -81,7 +93,10 @@ unload(PluginName) when is_atom(PluginName) -> reload(PluginName) when is_atom(PluginName)-> case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of {false, _} -> - ?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]), + ?SLOG(error, #{ + msg => "plugin_not_found_cannot_load", + plugin_name => PluginName + }), {error, not_found}; {_, false} -> load(PluginName); @@ -127,14 +142,20 @@ load_ext_plugins(Dir) -> end, filelib:wildcard("*", Dir)). load_ext_plugin(PluginDir) -> - ?LOG(debug, "loading_extra_plugin: ~s", [PluginDir]), + ?SLOG(debug, #{ + msg => "loading_extra_plugin", + plugin_dir => PluginDir + }), Ebin = filename:join([PluginDir, "ebin"]), AppFile = filename:join([Ebin, "*.app"]), AppName = case filelib:wildcard(AppFile) of [App] -> list_to_atom(filename:basename(App, ".app")); [] -> - ?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]), + ?SLOG(alert, #{ + msg => "plugin_app_file_not_found", + app_file => AppFile + }), error({plugin_app_file_not_found, AppFile}) end, ok = load_plugin_app(AppName, Ebin). @@ -185,7 +206,12 @@ load_plugin(Name) -> {error, Error0} end catch _ : Error : Stacktrace -> - ?LOG(alert, "Plugin ~s load failed with ~p", [Name, {Error, Stacktrace}]), + ?SLOG(alert, #{ + msg => "plugin_load_failed", + name => Name, + error => Error, + stk => Stacktrace + }), {error, parse_config_file_failed} end. @@ -202,11 +228,22 @@ load_app(App) -> start_app(App) -> case application:ensure_all_started(App) of {ok, Started} -> - ?LOG(info, "Started plugins: ~p", [Started]), - ?LOG(info, "Load plugin ~s successfully", [App]), + ?SLOG(info, #{ + msg => "all_started_plugins", + started => Started + }), + ?SLOG(info, #{ + msg => "load_plugin_app_successfully", + app => App + }), ok; {error, {ErrApp, Reason}} -> - ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p", [App, ErrApp, Reason]), + ?SLOG(error, #{ + msg => "load_plugin_failed_cannot_started", + app => App, + err_app => ErrApp, + reason => Reason + }), {error, {ErrApp, Reason}} end. @@ -221,11 +258,24 @@ unload_plugin(App) -> stop_app(App) -> case application:stop(App) of ok -> - ?LOG(info, "Stop plugin ~s successfully", [App]), ok; + ?SLOG(info, #{ + msg => "stop_plugin_successfully", + app => App + }), + ok; {error, {not_started, App}} -> - ?LOG(error, "Plugin ~s is not started", [App]), ok; + ?SLOG(error, #{ + msg => "plugin_not_started", + app => App + }), + ok; {error, Reason} -> - ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason} + ?SLOG(error, #{ + msg => "stop_plugin", + app => App, + error => Reason + }), + {error, Reason} end. names(plugin) -> @@ -238,4 +288,7 @@ names(Plugins) -> [Name || #plugin{name = Name} <- Plugins]. funlog(Key, Value) -> - ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]). + ?SLOG(info, #{ + key => string:join(Key, "."), + value => Value + }). diff --git a/apps/emqx/src/emqx_pool.erl b/apps/emqx/src/emqx_pool.erl index 8fa950fe3..a4720fc5b 100644 --- a/apps/emqx/src/emqx_pool.erl +++ b/apps/emqx/src/emqx_pool.erl @@ -100,22 +100,26 @@ handle_call({submit, Task}, _From, State) -> {reply, catch run(Task), State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast({async_submit, Task}, State) -> try run(Task) catch _:Error:Stacktrace -> - ?LOG(error, "Error: ~0p, ~0p", [Error, Stacktrace]) + ?SLOG(error, #{ + msg => "error", + error => Error, + stk => Stacktrace + }) end, {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index d25a8bec6..0ef6e699e 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -203,15 +203,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 78d763cac..1b6c7c042 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -109,11 +109,11 @@ init([]) -> {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, @@ -130,7 +130,10 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) -> {noreply, State}; handle_info({mnesia_table_event, Event}, State) -> - ?LOG(error, "Unexpected mnesia_table_event: ~p", [Event]), + ?SLOG(error,#{ + msg => "unexpected_mnesia_table_event", + event => Event + }), {noreply, State}; handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> @@ -148,7 +151,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index f915155cb..eac267fef 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -479,11 +479,16 @@ log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of true -> ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), - ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]); + ?SLOG(warning, #{ + msg => "dropped_qos0_msg", + payload => emqx_message:format(Msg) + }); false -> ok = emqx_metrics:inc('delivery.dropped.queue_full'), - ?LOG(warning, "Dropped msg due to mqueue is full: ~s", - [emqx_message:format(Msg)]) + ?SLOG(warning, #{ + msg => "dropped_msg_due_to_mqueue_is_full", + payload => emqx_message:format(Msg) + }) end. enrich_fun(Session = #session{subscriptions = Subs}) -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 59e364f58..3109cf118 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -325,11 +325,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> @@ -348,7 +348,10 @@ handle_info({mnesia_table_event, _Event}, State) -> {noreply, State}; handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> - ?LOG(info, "Shared subscriber down: ~p", [SubPid]), + ?SLOG(info, #{ + msg => "shared_subscriber_down", + sub_pid => SubPid + }), cleanup_down(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 51ba72155..0c84a754f 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -202,7 +202,7 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast({setstat, Stat, MaxStat, Val}, State) -> @@ -221,7 +221,10 @@ handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) -> NState = case lists:keyfind(Name, #update.name, Updates) of #update{} -> - ?LOG(warning, "Duplicated update: ~s", [Name]), + ?SLOG(warning, #{ + msg => "duplicated_update", + name => Name + }), State; false -> State#state{updates = [Update|Updates]} end, @@ -232,7 +235,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) -> {noreply, State#state{updates = Updates1}}; handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) -> @@ -242,7 +245,11 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update try UpFun() catch _:Error -> - ?LOG(error, "Update ~s failed: ~0p", [Name, Error]) + ?SLOG(error, #{ + msg => "update_name_failed", + name => Name, + error => Error + }) end, [Update#update{countdown = I} | Acc]; (Update = #update{countdown = C}, Acc) -> @@ -251,7 +258,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update {noreply, start_timer(State#state{updates = Updates1}), hibernate}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> @@ -271,6 +278,10 @@ safe_update_element(Key, Val) -> true -> true catch error:badarg -> - ?LOG(warning, "Failed to update ~0p to ~0p", [Key, Val]) + ?SLOG(warning, #{ + msg => "failed_to_update", + key => Key, + val => Val + }) end. diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 70043e2bb..84c3fa2e0 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -134,11 +134,11 @@ handle_call(uptime, _From, State) -> {reply, uptime(State), State}; handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> @@ -156,7 +156,7 @@ handle_info({timeout, TRef, tick}, {noreply, tick(State), hibernate}; handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> diff --git a/apps/emqx/src/emqx_sys_mon.erl b/apps/emqx/src/emqx_sys_mon.erl index 80f5e49ec..dce3e39b1 100644 --- a/apps/emqx/src/emqx_sys_mon.erl +++ b/apps/emqx/src/emqx_sys_mon.erl @@ -83,18 +83,21 @@ sysm_opts([_Opt|Opts], Acc) -> sysm_opts(Opts, Acc). handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", req => Msg}), {noreply, State}. handle_info({monitor, Pid, long_gc, Info}, State) -> suppress({long_gc, Pid}, fun() -> WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), + ?SLOG(warning, #{ + warn_msg => WarnMsg, + pid_info => procinfo(Pid) + }), safe_publish(long_gc, WarnMsg) end, State); @@ -102,7 +105,10 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> suppress({long_schedule, Pid}, fun() -> WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), + ?SLOG(warning, #{ + warn_msg => WarnMsg, + pid_info => procinfo(Pid) + }), safe_publish(long_schedule, WarnMsg) end, State); @@ -110,7 +116,10 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> suppress({long_schedule, Port}, fun() -> WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), - ?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]), + ?SLOG(warning, #{ + warn_msg => WarnMsg, + port_info => erlang:port_info(Port) + }), safe_publish(long_schedule, WarnMsg) end, State); @@ -118,7 +127,10 @@ handle_info({monitor, Pid, large_heap, Info}, State) -> suppress({large_heap, Pid}, fun() -> WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), + ?SLOG(warning, #{ + warn_msg => WarnMsg, + pid_info => procinfo(Pid) + }), safe_publish(large_heap, WarnMsg) end, State); @@ -126,7 +138,11 @@ handle_info({monitor, SusPid, busy_port, Port}, State) -> suppress({busy_port, Port}, fun() -> WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?SLOG(warning, #{ + warn_msg => WarnMsg, + pid_info => procinfo(SusPid), + port_info => erlang:port_info(Port) + }), safe_publish(busy_port, WarnMsg) end, State); @@ -134,7 +150,11 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> suppress({busy_dist_port, Port}, fun() -> WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?SLOG(warning, #{ + warn_msg => WarnMsg, + pid_info => procinfo(SusPid), + port_info => erlang:port_info(Port) + }), safe_publish(busy_dist_port, WarnMsg) end, State); @@ -142,7 +162,7 @@ handle_info({timeout, _Ref, reset}, State) -> {noreply, State#{events := []}, hibernate}; handle_info(Info, State) -> - ?LOG(error, "Unexpected Info: ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. terminate(_Reason, #{timer := TRef}) -> diff --git a/apps/emqx/src/emqx_tracer.erl b/apps/emqx/src/emqx_tracer.erl index d05840433..0ee9de324 100644 --- a/apps/emqx/src/emqx_tracer.erl +++ b/apps/emqx/src/emqx_tracer.erl @@ -115,18 +115,25 @@ install_trace_handler(Who, Level, LogFile) -> {fun filter_by_meta_key/2, Who}}]}) of ok -> - ?LOG(info, "Start trace for ~p", [Who]); + ?SLOG(info, #{msg => "start_trace_for", who => Who}); {error, Reason} -> - ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]), + ?SLOG(error, #{msg => "start_trace_for_who_failed", who => Who, reason => Reason}), {error, Reason} end. uninstall_trance_handler(Who) -> case logger:remove_handler(handler_id(Who)) of ok -> - ?LOG(info, "Stop trace for ~p", [Who]); + ?SLOG(info, #{ + msg => "stop_trace_for", + who => Who + }); {error, Reason} -> - ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]), + ?SLOG(error, #{ + msg => "stop_trace_for", + who => Who, + reason => Reason + }), {error, Reason} end. diff --git a/apps/emqx/src/emqx_vm_mon.erl b/apps/emqx/src/emqx_vm_mon.erl index 51710b5b5..5acb16d32 100644 --- a/apps/emqx/src/emqx_vm_mon.erl +++ b/apps/emqx/src/emqx_vm_mon.erl @@ -49,11 +49,17 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?LOG(error, "[VM_MON] Unexpected call: ~p", [Req]), + ?SLOG(error, #{ + msg => "[VM_MON]_unexpected_call", + req => Req + }), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[VM_MON] Unexpected cast: ~p", [Msg]), + ?SLOG(error, #{ + msg => "[VM_MON]_unexpected_cast", + cast => Msg + }), {noreply, State}. handle_info({timeout, _Timer, check}, State) -> @@ -75,7 +81,10 @@ handle_info({timeout, _Timer, check}, State) -> {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "[VM_MON] Unexpected info: ~p", [Info]), + ?SLOG(error, #{ + msg => "[VM_MON]_unexpected_info", + info => Info + }), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index bbd1fb62d..c6a427942 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -182,7 +182,10 @@ init(Req, #{listener := {Type, Listener}} = Opts) -> }, case check_origin_header(Req, Opts) of {error, Message} -> - ?LOG(error, "Invalid Origin Header ~p~n", [Message]), + ?SLOG(error, #{ + msg => "invalid_origin_header", + payload => Message + }), {ok, cowboy_req:reply(403, Req), WsOpts}; ok -> parse_sec_websocket_protocol(Req, Opts, WsOpts) end. @@ -263,11 +266,12 @@ websocket_init([Req, #{zone := Zone, listener := {Type, Listener}} = Opts]) -> WsCookie = try cowboy_req:parse_cookies(Req) catch error:badarg -> - ?LOG(error, "Illegal cookie"), + ?SLOG(error, #{msg => "illegal_cookie"}), undefined; Error:Reason -> - ?LOG(error, "Failed to parse cookie, Error: ~0p, Reason ~0p", - [Error, Reason]), + ?SLOG(error, #{msg => "failed_to_parse_cookie", + error => Error, + reason => Reason}), undefined end, ConnInfo = #{socktype => ws, @@ -324,7 +328,7 @@ websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); websocket_handle({binary, Data}, State) -> - ?LOG(debug, "RECV ~0p", [Data]), + ?SLOG(debug, #{msg => "recv_data", data => Data}), ok = inc_recv_stats(1, iolist_size(Data)), NState = ensure_stats_timer(State), return(parse_incoming(Data, NState)); @@ -339,7 +343,7 @@ websocket_handle({Frame, _}, State) when Frame =:= ping; Frame =:= pong -> websocket_handle({Frame, _}, State) -> %% TODO: should not close the ws connection - ?LOG(error, "Unexpected frame - ~p", [Frame]), + ?SLOG(error, #{msg => "unexpected_frame", frame => Frame}), shutdown(unexpected_ws_frame, State). websocket_info({call, From, Req}, State) -> @@ -397,11 +401,11 @@ websocket_info(Info, State) -> websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) -> websocket_close(ReasonCode, State); websocket_close(Reason, State) -> - ?LOG(debug, "Websocket closed due to ~p~n", [Reason]), + ?SLOG(debug, #{msg => "websocket_closed", reason => Reason}), handle_info({sock_closed, Reason}, State). terminate(Reason, _Req, #state{channel = Channel}) -> - ?LOG(debug, "Terminated due to ~p", [Reason]), + ?SLOG(debug, #{msg => "terminated", reason => Reason}), emqx_channel:terminate(Reason, Channel); terminate(_Reason, _Req, _UnExpectedState) -> @@ -446,7 +450,7 @@ handle_info({connack, ConnAck}, State) -> return(enqueue(ConnAck, State)); handle_info({close, Reason}, State) -> - ?LOG(debug, "Force to close the socket due to ~p", [Reason]), + ?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}), return(enqueue({close, Reason}, State)); handle_info({event, connected}, State = #state{channel = Channel}) -> @@ -499,7 +503,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> {ok, Limiter1} -> State#state{limiter = Limiter1}; {pause, Time, Limiter1} -> - ?LOG(warning, "Pause ~pms due to rate limit", [Time]), + ?SLOG(warning, #{msg => "pause_due_to_rate_limit", time => Time}), TRef = start_timer(Time, limit_timeout), NState = State#state{sockstate = blocked, limiter = Limiter1, @@ -570,7 +574,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when is_record(Packet, mqtt_packet) -> - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), + ?SLOG(debug, #{msg => "RECV", packet => emqx_packet:format(Packet)}), ok = inc_incoming_stats(Packet), NState = case emqx_pd:get_counter(incoming_pubs) > get_active_n(Type, Listener) of @@ -628,12 +632,14 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback, serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> try emqx_frame:serialize_pkt(Packet, Serialize) of - <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", - [emqx_packet:format(Packet)]), + <<>> -> ?SLOG(warning, #{ + msg => "packet_is_discarded_due_to_the_frame_is_too_large", + packet => emqx_packet:format(Packet) + }), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; - Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + Data -> ?SLOG(debug, #{msg => "SEND", packet => Packet}), ok = inc_outgoing_stats(Packet), Data catch