refactor(emqx): refine SLOG messages

unified logging for unexpected handle_cast handle_call and handle_info
This commit is contained in:
Zaiming Shi 2021-10-11 01:35:43 +02:00
parent 785793b345
commit 71731c01f1
30 changed files with 210 additions and 345 deletions

View File

@ -239,17 +239,11 @@ handle_call({get_alarms, deactivated}, _From, State) ->
{reply, Alarms, State}; {reply, Alarms, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_call", call => Req}),
msg => "unexpected_call",
call => Req
}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
msg => "unexpected_msg",
payload => Msg
}),
{noreply, State}. {noreply, State}.
handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, handle_info({timeout, _TRef, delete_expired_deactivated_alarm},
@ -259,10 +253,7 @@ handle_info({timeout, _TRef, delete_expired_deactivated_alarm},
{noreply, State#state{timer = ensure_timer(TRef, Period)}}; {noreply, State#state{timer = ensure_timer(TRef, Period)}};
handle_info({update_timer, Period}, #state{timer = TRef} = State) -> handle_info({update_timer, Period}, #state{timer = TRef} = State) ->
?SLOG(warning, #{ ?SLOG(warning, #{msg => "update_the_validity_period_timer", period => Period}),
msg => "update_the_validity_period_timer",
period => Period
}),
{noreply, State#state{timer = ensure_timer(TRef, Period)}}; {noreply, State#state{timer = ensure_timer(TRef, Period)}};
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -311,7 +311,7 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M
catch catch
Class:Reason:Stacktrace -> Class:Reason:Stacktrace ->
?SLOG(warning, #{msg => "unexpected_error_in_authentication", ?SLOG(warning, #{msg => "unexpected_error_in_authentication",
class => Class, exception => Class,
reason => Reason, reason => Reason,
stacktrace => Stacktrace, stacktrace => Stacktrace,
authenticator => ID}), authenticator => ID}),
@ -652,11 +652,11 @@ handle_call({list_users, ChainName, AuthenticatorID}, _From, State) ->
reply(Reply, State); reply(Reply, State);
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Req, State) -> handle_cast(Req, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Req}), ?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
{noreply, State}. {noreply, State}.
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -187,14 +187,11 @@ init([]) ->
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}. {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_msg", cast => Msg}),
msg => "unexpected_msg",
payload => Msg
}),
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->

View File

@ -202,10 +202,8 @@ publish(Msg) when is_record(Msg, message) ->
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
#message{headers = #{allow_publish := false}} -> #message{headers = #{allow_publish := false}} ->
?SLOG(notice, #{ ?SLOG(debug, #{msg => "message_not_published",
msg => "stop_publishing", payload => emqx_message:to_log_map(Msg)}),
payload => emqx_message:format(Msg)
}),
[]; [];
Msg1 = #message{topic = Topic} -> Msg1 = #message{topic = Topic} ->
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
@ -217,11 +215,12 @@ safe_publish(Msg) when is_record(Msg, message) ->
try try
publish(Msg) publish(Msg)
catch catch
_:Error:Stk-> Error : Reason : Stk->
?SLOG(error,#{ ?SLOG(error,#{
msg => "publishing_error", msg => "publishing_error",
error => Error, exception => Error,
payload => Msg, reason => Reason,
payload => emqx_message:to_log_map(Msg),
stacktrace => Stk stacktrace => Stk
}), }),
[] []
@ -465,17 +464,14 @@ handle_call({subscribe, Topic, I}, _From, State) ->
{reply, Ok, State}; {reply, Ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({subscribe, Topic}, State) -> handle_cast({subscribe, Topic}, State) ->
case emqx_router:do_add_route(Topic) of case emqx_router:do_add_route(Topic) of
ok -> ok; ok -> ok;
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "failed_to_add_route", reason => Reason})
msg => "failed_to_add_route",
reason => Reason
})
end, end,
{noreply, State}; {noreply, State};
@ -499,7 +495,7 @@ handle_cast({unsubscribed, Topic, I}, State) ->
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -118,7 +118,7 @@ init([]) ->
{ok, #{pmon => emqx_pmon:new()}}. {ok, #{pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
@ -127,7 +127,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
{noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}}; {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) -> handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->

View File

@ -373,17 +373,11 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
ok = after_message_acked(ClientInfo, Msg, Properties), ok = after_message_acked(ClientInfo, Msg, Properties),
handle_out(publish, Publishes, Channel#channel{session = NSession}); handle_out(publish, Publishes, Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?SLOG(warning, #{ ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
msg => "puback_packetId_inuse",
packetId => PacketId
}),
ok = emqx_metrics:inc('packets.puback.inuse'), ok = emqx_metrics:inc('packets.puback.inuse'),
{ok, Channel}; {ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?SLOG(warning, #{ ?SLOG(warning, #{msg => "puback_packetId_not_found", packetId => PacketId}),
msg => "puback_packetId_not_found",
packetId => PacketId
}),
ok = emqx_metrics:inc('packets.puback.missed'), ok = emqx_metrics:inc('packets.puback.missed'),
{ok, Channel} {ok, Channel}
end; end;
@ -507,17 +501,11 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState})
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) -> handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
msg => "malformed_mqtt_message",
reason => Reason
}),
{ok, Channel}; {ok, Channel};
handle_in(Packet, Channel) -> handle_in(Packet, Channel) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
msg => "disconnecting_due_to_unexpected_message",
packet => Packet
}),
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel). handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -541,10 +529,7 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo,
{error, client_id_unavailable} -> {error, client_id_unavailable} ->
handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel); handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "failed_to_open_session", reason => Reason}),
msg => "failed_to_open_session",
reason => Reason
}),
handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel) handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
end. end.
@ -995,7 +980,7 @@ handle_call({quota, Policy}, Channel) ->
reply(ok, Channel#channel{quota = Quota}); reply(ok, Channel#channel{quota = Quota});
handle_call(Req, Channel) -> handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel). reply(ignored, Channel).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1035,10 +1020,7 @@ handle_info({sock_closed, Reason}, Channel =
end; end;
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_sock_close", reason => Reason}),
msg => "unexpected_sock_closed",
reason => Reason
}),
{ok, Channel}; {ok, Channel};
handle_info(clean_authz_cache, Channel) -> handle_info(clean_authz_cache, Channel) ->
@ -1109,10 +1091,7 @@ handle_timeout(_TRef, expire_quota_limit, Channel) ->
{ok, clean_timer(quota_timer, Channel)}; {ok, clean_timer(quota_timer, Channel)};
handle_timeout(_TRef, Msg, Channel) -> handle_timeout(_TRef, Msg, Channel) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_timeout", timeout_message => Msg}),
msg => "unexpected_timeout",
payload => Msg
}),
{ok, Channel}. {ok, Channel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -266,9 +266,8 @@ get_mqtt_conf(Zone, Key) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key]). emqx_config:get_zone_conf(Zone, [mqtt, Key]).
%% @doc Try to takeover a session. %% @doc Try to takeover a session.
-spec(takeover_session(emqx_types:clientid()) -spec(takeover_session(emqx_types:clientid()) ->
-> {error, term()} {error, term()} | {ok, atom(), pid(), emqx_session:session()}).
| {ok, atom(), pid(), emqx_session:session()}).
takeover_session(ClientId) -> takeover_session(ClientId) ->
case lookup_channels(ClientId) of case lookup_channels(ClientId) of
[] -> {error, not_found}; [] -> {error, not_found};
@ -276,10 +275,7 @@ takeover_session(ClientId) ->
takeover_session(ClientId, ChanPid); takeover_session(ClientId, ChanPid);
ChanPids -> ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids), [ChanPid|StalePids] = lists:reverse(ChanPids),
?SLOG(error, #{ ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
msg => "more_than_one_channel_found",
chan_pids => ChanPids
}),
lists:foreach(fun(StalePid) -> lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid) catch discard_session(ClientId, StalePid)
end, StalePids), end, StalePids),
@ -344,10 +340,7 @@ kick_session(ClientId) ->
kick_session(ClientId, ChanPid); kick_session(ClientId, ChanPid);
ChanPids -> ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids), [ChanPid|StalePids] = lists:reverse(ChanPids),
?SLOG(error, #{ ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
msg => "more_than_one_channel_found",
chan_pids => ChanPids
}),
lists:foreach(fun(StalePid) -> lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid) catch discard_session(ClientId, StalePid)
end, StalePids), end, StalePids),
@ -422,7 +415,7 @@ init([]) ->
{ok, #{chan_pmon => emqx_pmon:new()}}. {ok, #{chan_pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
@ -430,7 +423,7 @@ handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
{noreply, State#{chan_pmon := PMon1}}; {noreply, State#{chan_pmon := PMon1}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->

View File

@ -114,11 +114,11 @@ init([]) ->
{ok, #{}}. {ok, #{}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) -> handle_info({membership, {mnesia, down, Node}}, State) ->

View File

@ -119,9 +119,9 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
catch Error:Reason:ST -> catch Error:Reason:ST ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "change_config_failed", msg => "change_config_failed",
error => Error, exception => Error,
reason => Reason, reason => Reason,
st => ST stacktrace => ST
}), }),
{error, Reason} {error, Reason}
end, end,

View File

@ -417,20 +417,14 @@ handle_msg({'$gen_cast', Req}, State) ->
{ok, NewState}; {ok, NewState};
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
?SLOG(debug, #{ ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => Inet}),
msg => "RECV_data",
data => Data
}),
Oct = iolist_size(Data), Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct), inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct), ok = emqx_metrics:inc('bytes.received', Oct),
parse_incoming(Data, State); parse_incoming(Data, State);
handle_msg({quic, Data, _Sock, _, _, _}, State) -> handle_msg({quic, Data, _Sock, _, _, _}, State) ->
?SLOG(debug, #{ ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => quic}),
msg => "RECV_data",
data => Data
}),
Oct = iolist_size(Data), Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct), inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct), ok = emqx_metrics:inc('bytes.received', Oct),
@ -495,7 +489,7 @@ handle_msg({connack, ConnAck}, State) ->
handle_outgoing(ConnAck, State); handle_outgoing(ConnAck, State);
handle_msg({close, Reason}, State) -> handle_msg({close, Reason}, State) ->
?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}), ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) -> handle_msg({event, connected}, State = #state{channel = Channel}) ->
@ -661,7 +655,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState) ?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState)
, input_bytes => Data , input_bytes => Data
, parsed_packets => Packets , parsed_packets => Packets
, exception => Reason , reason => Reason
, stacktrace => Stacktrace , stacktrace => Stacktrace
}), }),
{[{frame_error, Reason} | Packets], State} {[{frame_error, Reason} | Packets], State}
@ -678,10 +672,7 @@ next_incoming_msgs(Packets) ->
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) -> handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
ok = inc_incoming_stats(Packet), ok = inc_incoming_stats(Packet),
?SLOG(debug, #{ ?SLOG(debug, #{msg => "RECV_packet", packet => Packet}),
msg => "RECV_packet",
packet => Packet
}),
with_channel(handle_in, [Packet], State); with_channel(handle_in, [Packet], State);
handle_incoming(FrameError, State) -> handle_incoming(FrameError, State) ->
@ -718,7 +709,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) -> fun(Packet) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> -> ?SLOG(warning, #{ <<>> -> ?SLOG(warning, #{
msg => "packet_is_discarded_because_the frame_is_too_large", msg => "packet_is_discarded",
reason => "frame_is_too_large",
packet => emqx_packet:format(Packet) packet => emqx_packet:format(Packet)
}), }),
ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped.too_large'),

View File

@ -185,13 +185,13 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq})
case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of
[] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}); [] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts});
[[OriginSeq] | _] -> [[OriginSeq] | _] ->
?SLOG(warning, #{msg => "CMD is overidden", cmd => Cmd, mf => MF}), ?SLOG(warning, #{msg => "CMD_overidden", cmd => Cmd, mf => MF}),
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}) true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts})
end, end,
{reply, ok, next_seq(State)}; {reply, ok, next_seq(State)};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({unregister_command, Cmd}, State) -> handle_cast({unregister_command, Cmd}, State) ->
@ -199,7 +199,7 @@ handle_cast({unregister_command, Cmd}, State) ->
noreply(State); noreply(State);
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
noreply(State). noreply(State).
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -106,7 +106,7 @@ init([]) ->
{ok, #{}, hibernate}. {ok, #{}, hibernate}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({detected, #flapping{clientid = ClientId, handle_cast({detected, #flapping{clientid = ClientId,
@ -116,10 +116,10 @@ handle_cast({detected, #flapping{clientid = ClientId,
#{window_time := WindTime, ban_time := Interval}}, State) -> #{window_time := WindTime, ban_time := Interval}}, State) ->
case now_diff(StartedAt) < WindTime of case now_diff(StartedAt) < WindTime of
true -> %% Flapping happened:( true -> %% Flapping happened:(
?SLOG(error, #{ ?SLOG(warning, #{
msg => "flapping_detected", msg => "flapping_detected",
client_id => ClientId, client_id => ClientId,
peer_host => inet:ntoa(PeerHost), peer_host => fmt_host(PeerHost),
detect_cnt => DetectCnt, detect_cnt => DetectCnt,
wind_time_in_ms => WindTime wind_time_in_ms => WindTime
}), }),
@ -134,7 +134,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "client_disconnected", msg => "client_disconnected",
client_id => ClientId, client_id => ClientId,
peer_host => inet:ntoa(PeerHost), peer_host => fmt_host(PeerHost),
detect_cnt => DetectCnt, detect_cnt => DetectCnt,
interval => Interval interval => Interval
}) })
@ -142,7 +142,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) -> handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
@ -171,3 +171,8 @@ start_timers() ->
lists:foreach(fun({Zone, _ZoneConf}) -> lists:foreach(fun({Zone, _ZoneConf}) ->
start_timer(Zone) start_timer(Zone)
end, maps:to_list(emqx:get_config([zones], #{}))). end, maps:to_list(emqx:get_config([zones], #{}))).
fmt_host(PeerHost) ->
try inet:ntoa(PeerHost)
catch _:_ -> PeerHost
end.

View File

@ -208,10 +208,11 @@ safe_execute({M, F, A}, Args) ->
Error:Reason:Stacktrace -> Error:Reason:Stacktrace ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_execute", msg => "failed_to_execute",
module_function_arity => {M, F, A}, exception => Error,
error_reason_stacktrace => {Error, Reason, Stacktrace} reason => Reason,
}), stacktrace => Stacktrace,
ok failed_call => {M, F, A}
})
end. end.
%% @doc execute a function. %% @doc execute a function.

View File

@ -66,6 +66,7 @@
-export([ to_packet/2 -export([ to_packet/2
, to_map/1 , to_map/1
, to_log_map/1
, to_list/1 , to_list/1
, from_map/1 , from_map/1
]). ]).
@ -82,8 +83,6 @@
timestamp := integer()} timestamp := integer()}
). ).
-export([format/1]).
-elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, god_modules, disable}]).
-spec(make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message()). -spec(make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message()).
@ -306,6 +305,9 @@ to_map(#message{
extra => Extra extra => Extra
}. }.
%% @doc To map for logging, with payload dropped.
to_log_map(Msg) -> maps:without([payload], to_map(Msg)).
%% @doc Message to tuple list %% @doc Message to tuple list
-spec(to_list(emqx_types:message()) -> list()). -spec(to_list(emqx_types:message()) -> list()).
to_list(Msg) -> to_list(Msg) ->
@ -336,18 +338,3 @@ from_map(#{id := Id,
%% MilliSeconds %% MilliSeconds
elapsed(Since) -> elapsed(Since) ->
max(0, erlang:system_time(millisecond) - Since). max(0, erlang:system_time(millisecond) - Since).
format(#message{id = Id,
qos = QoS,
topic = Topic,
from = From,
flags = Flags,
headers = Headers}) ->
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
[Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
format(flags, Flags) ->
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
format(headers, Headers) ->
io_lib:format("~p", [Headers]).

View File

@ -87,10 +87,7 @@ handle_call(Req, _From, State) ->
{reply, {error, {unexpected_call, Req}}, State}. {reply, {error, {unexpected_call, Req}}, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_cast", cast=> Msg}),
msg => "unexpected_cast_discarded",
payload => Msg
}),
{noreply, State}. {noreply, State}.
handle_info({timeout, _Timer, check}, State) -> handle_info({timeout, _Timer, check}, State) ->
@ -112,10 +109,7 @@ handle_info({timeout, _Timer, check}, State) ->
{noreply, State}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(info, #{ ?SLOG(error, #{msg => "unexpected_info", info => Info}),
msg => "unexpected_info_discarded",
info => Info
}),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->

View File

@ -29,8 +29,6 @@
, find_plugin/1 , find_plugin/1
]). ]).
-export([funlog/2]).
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -50,16 +48,14 @@ load() ->
load(PluginName) when is_atom(PluginName) -> load(PluginName) when is_atom(PluginName) ->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} -> {false, _} ->
?SLOG(alert, #{ ?SLOG(alert, #{msg => "failed_to_load_plugin",
msg => "plugin_not_found_cannot_load", plugin_name => PluginName,
plugin_name => PluginName reason => not_found}),
}),
{error, not_found}; {error, not_found};
{_, true} -> {_, true} ->
?SLOG(notice, #{ ?SLOG(notice, #{msg => "plugin_already_loaded",
msg => "plugin_already_started", plugin_name => PluginName,
plugin_name => PluginName reason => already_loaded}),
}),
{error, already_started}; {error, already_started};
{_, false} -> {_, false} ->
load_plugin(PluginName) load_plugin(PluginName)
@ -75,16 +71,14 @@ unload() ->
unload(PluginName) when is_atom(PluginName) -> unload(PluginName) when is_atom(PluginName) ->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} -> {false, _} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "fialed_to_unload_plugin",
msg => "plugin_not_found_cannot_load", plugin_name => PluginName,
plugin_name => PluginName reason => not_found}),
}),
{error, not_found}; {error, not_found};
{_, false} -> {_, false} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "failed_to_unload_plugin",
msg => "plugin_not_started", plugin_name => PluginName,
plugin_name => PluginName reason => not_loaded}),
}),
{error, not_started}; {error, not_started};
{_, _} -> {_, _} ->
unload_plugin(PluginName) unload_plugin(PluginName)
@ -93,10 +87,9 @@ unload(PluginName) when is_atom(PluginName) ->
reload(PluginName) when is_atom(PluginName)-> reload(PluginName) when is_atom(PluginName)->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} -> {false, _} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "failed_to_reload_plugin",
msg => "plugin_not_found_cannot_load", plugin_name => PluginName,
plugin_name => PluginName reason => not_found}),
}),
{error, not_found}; {error, not_found};
{_, false} -> {_, false} ->
load(PluginName); load(PluginName);
@ -142,20 +135,14 @@ load_ext_plugins(Dir) ->
end, filelib:wildcard("*", Dir)). end, filelib:wildcard("*", Dir)).
load_ext_plugin(PluginDir) -> load_ext_plugin(PluginDir) ->
?SLOG(debug, #{ ?SLOG(debug, #{msg => "loading_extra_plugin", plugin_dir => PluginDir}),
msg => "loading_extra_plugin",
plugin_dir => PluginDir
}),
Ebin = filename:join([PluginDir, "ebin"]), Ebin = filename:join([PluginDir, "ebin"]),
AppFile = filename:join([Ebin, "*.app"]), AppFile = filename:join([Ebin, "*.app"]),
AppName = case filelib:wildcard(AppFile) of AppName = case filelib:wildcard(AppFile) of
[App] -> [App] ->
list_to_atom(filename:basename(App, ".app")); list_to_atom(filename:basename(App, ".app"));
[] -> [] ->
?SLOG(alert, #{ ?SLOG(alert, #{msg => "plugin_app_file_not_found", app_file => AppFile}),
msg => "plugin_app_file_not_found",
app_file => AppFile
}),
error({plugin_app_file_not_found, AppFile}) error({plugin_app_file_not_found, AppFile})
end, end,
ok = load_plugin_app(AppName, Ebin). ok = load_plugin_app(AppName, Ebin).
@ -205,12 +192,13 @@ load_plugin(Name) ->
{error, Error0} -> {error, Error0} ->
{error, Error0} {error, Error0}
end end
catch _ : Error : Stacktrace -> catch Error : Reason : Stacktrace ->
?SLOG(alert, #{ ?SLOG(alert, #{
msg => "plugin_load_failed", msg => "plugin_load_failed",
name => Name, name => Name,
error => Error, exception => Error,
stk => Stacktrace reason => Reason,
stacktrace => Stacktrace
}), }),
{error, parse_config_file_failed} {error, parse_config_file_failed}
end. end.
@ -228,23 +216,19 @@ load_app(App) ->
start_app(App) -> start_app(App) ->
case application:ensure_all_started(App) of case application:ensure_all_started(App) of
{ok, Started} -> {ok, Started} ->
?SLOG(info, #{ case Started =/= [] of
msg => "all_started_plugins", true -> ?SLOG(info, #{msg => "started_plugin_dependency_apps", apps => Started});
started => Started false -> ok
}), end,
?SLOG(info, #{ ?SLOG(info, #{msg => "started_plugin_app", app => App}),
msg => "load_plugin_app_successfully",
app => App
}),
ok; ok;
{error, {ErrApp, Reason}} -> {error, {ErrApp, Reason}} ->
?SLOG(error, #{ ?SLOG(error, #{msg => failed_to_start_plugin_app,
msg => "load_plugin_failed_cannot_started", app => App,
app => App, err_app => ErrApp,
err_app => ErrApp, reason => Reason
reason => Reason }),
}), {error, failed_to_start_plugin_app}
{error, {ErrApp, Reason}}
end. end.
unload_plugin(App) -> unload_plugin(App) ->
@ -258,23 +242,16 @@ unload_plugin(App) ->
stop_app(App) -> stop_app(App) ->
case application:stop(App) of case application:stop(App) of
ok -> ok ->
?SLOG(info, #{ ?SLOG(info, #{msg => "stop_plugin_successfully", app => App}),
msg => "stop_plugin_successfully",
app => App
}),
ok; ok;
{error, {not_started, App}} -> {error, {not_started, App}} ->
?SLOG(error, #{ ?SLOG(info, #{msg => "plugin_not_started", app => App}),
msg => "plugin_not_started",
app => App
}),
ok; ok;
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "failed_to_stop_plugin_app",
msg => "stop_plugin", app => App,
app => App, error => Reason
error => Reason }),
}),
{error, Reason} {error, Reason}
end. end.
@ -286,9 +263,3 @@ names(started_app) ->
names(Plugins) -> names(Plugins) ->
[Name || #plugin{name = Name} <- Plugins]. [Name || #plugin{name = Name} <- Plugins].
funlog(Key, Value) ->
?SLOG(info, #{
key => string:join(Key, "."),
value => Value
}).

View File

@ -100,22 +100,22 @@ handle_call({submit, Task}, _From, State) ->
{reply, catch run(Task), State}; {reply, catch run(Task), State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({async_submit, Task}, State) -> handle_cast({async_submit, Task}, State) ->
try run(Task) try run(Task)
catch _:Error:Stacktrace -> catch Error:Reason:Stacktrace ->
?SLOG(error, #{ ?SLOG(error, #{msg => "async_submit_error",
msg => "error", exception => Error,
error => Error, reason => Reason,
stk => Stacktrace stacktrace => Stacktrace
}) })
end, end,
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -203,11 +203,11 @@ handle_call({delete_route, Topic, Dest}, _From, State) ->
{reply, Ok, State}; {reply, Ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -109,11 +109,11 @@ init([]) ->
{ok, #{nodes => Nodes}, hibernate}. {ok, #{nodes => Nodes}, hibernate}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}},
@ -130,10 +130,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
{noreply, State}; {noreply, State};
handle_info({mnesia_table_event, Event}, State) -> handle_info({mnesia_table_event, Event}, State) ->
?SLOG(error,#{ ?SLOG(error,#{msg => "unexpected_mnesia_table_event", event => Event}),
msg => "unexpected_mnesia_table_event",
event => Event
}),
{noreply, State}; {noreply, State};
handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->

View File

@ -479,16 +479,12 @@ log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
true -> true ->
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
?SLOG(warning, #{ ?SLOG(warning, #{msg => "dropped_qos0_msg",
msg => "dropped_qos0_msg", payload => emqx_message:to_log_map(Msg)});
payload => emqx_message:format(Msg)
});
false -> false ->
ok = emqx_metrics:inc('delivery.dropped.queue_full'), ok = emqx_metrics:inc('delivery.dropped.queue_full'),
?SLOG(warning, #{ ?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full",
msg => "dropped_msg_due_to_mqueue_is_full", payload => emqx_message:to_log_map(Msg)})
payload => emqx_message:format(Msg)
})
end. end.
enrich_fun(Session = #session{subscriptions = Subs}) -> enrich_fun(Session = #session{subscriptions = Subs}) ->

View File

@ -347,11 +347,8 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
handle_info({mnesia_table_event, _Event}, State) -> handle_info({mnesia_table_event, _Event}, State) ->
{noreply, State}; {noreply, State};
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) ->
?SLOG(info, #{ ?SLOG(info, #{msg => "shared_subscriber_down", sub_pid => SubPid, reason => Reason}),
msg => "shared_subscriber_down",
sub_pid => SubPid
}),
cleanup_down(SubPid), cleanup_down(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};

View File

@ -202,7 +202,7 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State}; {stop, normal, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({setstat, Stat, MaxStat, Val}, State) -> handle_cast({setstat, Stat, MaxStat, Val}, State) ->
@ -221,10 +221,9 @@ handle_cast({update_interval, Update = #update{name = Name}},
State = #state{updates = Updates}) -> State = #state{updates = Updates}) ->
NState = case lists:keyfind(Name, #update.name, Updates) of NState = case lists:keyfind(Name, #update.name, Updates) of
#update{} -> #update{} ->
?SLOG(warning, #{ ?SLOG(warning, #{msg => "duplicated_update",
msg => "duplicated_update", name => Name
name => Name }),
}),
State; State;
false -> State#state{updates = [Update|Updates]} false -> State#state{updates = [Update|Updates]}
end, end,
@ -235,7 +234,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) ->
{noreply, State#state{updates = Updates1}}; {noreply, State#state{updates = Updates1}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) -> handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
@ -244,12 +243,13 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
func = UpFun}, Acc) when C =< 0 -> func = UpFun}, Acc) when C =< 0 ->
try UpFun() try UpFun()
catch catch
_:Error -> Error : Reason : Stacktrace ->
?SLOG(error, #{ ?SLOG(error, #{msg => "update_name_failed",
msg => "update_name_failed", name => Name,
name => Name, exception => Error,
error => Error reason => Reason,
}) stacktrace => Stacktrace
})
end, end,
[Update#update{countdown = I} | Acc]; [Update#update{countdown = I} | Acc];
(Update = #update{countdown = C}, Acc) -> (Update = #update{countdown = C}, Acc) ->
@ -284,4 +284,3 @@ safe_update_element(Key, Val) ->
val => Val val => Val
}) })
end. end.

View File

@ -134,11 +134,11 @@ handle_call(uptime, _From, State) ->
{reply, uptime(State), State}; {reply, uptime(State), State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", req => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", req => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->

View File

@ -93,44 +93,41 @@ handle_cast(Msg, State) ->
handle_info({monitor, Pid, long_gc, Info}, State) -> handle_info({monitor, Pid, long_gc, Info}, State) ->
suppress({long_gc, Pid}, suppress({long_gc, Pid},
fun() -> fun() ->
WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]), WarnMsg = io_lib:format("long_gc warning: pid = ~p", [Pid]),
?SLOG(warning, #{ ?SLOG(warning, #{msg => long_gc,
warn_msg => WarnMsg, info => Info,
pid_info => procinfo(Pid) porcinfo => procinfo(Pid)
}), }),
safe_publish(long_gc, WarnMsg) safe_publish(long_gc, WarnMsg)
end, State); end, State);
handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
suppress({long_schedule, Pid}, suppress({long_schedule, Pid},
fun() -> fun() ->
WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), WarnMsg = io_lib:format("long_schedule warning: pid = ~p", [Pid]),
?SLOG(warning, #{ ?SLOG(warning, #{msg => long_schedule,
warn_msg => WarnMsg, info => Info,
pid_info => procinfo(Pid) procinfo => procinfo(Pid)}),
}),
safe_publish(long_schedule, WarnMsg) safe_publish(long_schedule, WarnMsg)
end, State); end, State);
handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
suppress({long_schedule, Port}, suppress({long_schedule, Port},
fun() -> fun() ->
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), WarnMsg = io_lib:format("long_schedule warning: port = ~p", [Port]),
?SLOG(warning, #{ ?SLOG(warning, #{msg => long_schedule,
warn_msg => WarnMsg, info => Info,
port_info => erlang:port_info(Port) portinfo => portinfo(Port)}),
}),
safe_publish(long_schedule, WarnMsg) safe_publish(long_schedule, WarnMsg)
end, State); end, State);
handle_info({monitor, Pid, large_heap, Info}, State) -> handle_info({monitor, Pid, large_heap, Info}, State) ->
suppress({large_heap, Pid}, suppress({large_heap, Pid},
fun() -> fun() ->
WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), WarnMsg = io_lib:format("large_heap warning: pid = ~p", [Pid]),
?SLOG(warning, #{ ?SLOG(warning, #{msg => large_heap,
warn_msg => WarnMsg, info => Info,
pid_info => procinfo(Pid) procinfo => procinfo(Pid)}),
}),
safe_publish(large_heap, WarnMsg) safe_publish(large_heap, WarnMsg)
end, State); end, State);
@ -138,11 +135,10 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
suppress({busy_port, Port}, suppress({busy_port, Port},
fun() -> fun() ->
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?SLOG(warning, #{ ?SLOG(warning, #{msg => busy_port,
warn_msg => WarnMsg, portinfo => portinfo(Port),
pid_info => procinfo(SusPid), procinfo => procinfo(SusPid)
port_info => erlang:port_info(Port) }),
}),
safe_publish(busy_port, WarnMsg) safe_publish(busy_port, WarnMsg)
end, State); end, State);
@ -150,11 +146,9 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
suppress({busy_dist_port, Port}, suppress({busy_dist_port, Port},
fun() -> fun() ->
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?SLOG(warning, #{ ?SLOG(warning, #{msg => busy_dist_port,
warn_msg => WarnMsg, portinfo => portinfo(Port),
pid_info => procinfo(SusPid), procinfo => procinfo(SusPid)}),
port_info => erlang:port_info(Port)
}),
safe_publish(busy_dist_port, WarnMsg) safe_publish(busy_dist_port, WarnMsg)
end, State); end, State);
@ -190,11 +184,14 @@ suppress(Key, SuccFun, State = #{events := Events}) ->
end. end.
procinfo(Pid) -> procinfo(Pid) ->
case {emqx_vm:get_process_info(Pid), emqx_vm:get_process_gc_info(Pid)} of [{pid, Pid} | procinfo_l(emqx_vm:get_process_gc_info(Pid))] ++
{undefined, _} -> undefined; procinfo_l(emqx_vm:get_process_info(Pid)).
{_, undefined} -> undefined;
{Info, GcInfo} -> Info ++ GcInfo procinfo_l(undefined) -> [];
end. procinfo_l(List) -> List.
portinfo(Port) ->
[{port, Port} | erlang:port_info(Port)].
safe_publish(Event, WarnMsg) -> safe_publish(Event, WarnMsg) ->
Topic = emqx_topic:systop(lists:concat(['sysmon/', Event])), Topic = emqx_topic:systop(lists:concat(['sysmon/', Event])),

View File

@ -115,25 +115,18 @@ install_trace_handler(Who, Level, LogFile) ->
{fun filter_by_meta_key/2, Who}}]}) {fun filter_by_meta_key/2, Who}}]})
of of
ok -> ok ->
?SLOG(info, #{msg => "start_trace_for", who => Who}); ?SLOG(info, #{msg => "start_trace", who => Who});
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "start_trace_for_who_failed", who => Who, reason => Reason}), ?SLOG(error, #{msg => "failed_to_trace", who => Who, reason => Reason}),
{error, Reason} {error, Reason}
end. end.
uninstall_trance_handler(Who) -> uninstall_trance_handler(Who) ->
case logger:remove_handler(handler_id(Who)) of case logger:remove_handler(handler_id(Who)) of
ok -> ok ->
?SLOG(info, #{ ?SLOG(info, #{msg => "stop_trace", who => Who});
msg => "stop_trace_for",
who => Who
});
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "failed_to_stop_trace", who => Who, reason => Reason}),
msg => "stop_trace_for",
who => Who,
reason => Reason
}),
{error, Reason} {error, Reason}
end. end.

View File

@ -49,17 +49,11 @@ init([]) ->
{ok, #{}}. {ok, #{}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_call", call => Req}),
msg => "[VM_MON]_unexpected_call",
req => Req
}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
msg => "[VM_MON]_unexpected_cast",
cast => Msg
}),
{noreply, State}. {noreply, State}.
handle_info({timeout, _Timer, check}, State) -> handle_info({timeout, _Timer, check}, State) ->
@ -81,10 +75,7 @@ handle_info({timeout, _Timer, check}, State) ->
{noreply, State}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{ ?SLOG(error, #{msg => "unexpected_info", info => Info}),
msg => "[VM_MON]_unexpected_info",
info => Info
}),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->

View File

@ -181,13 +181,11 @@ init(Req, #{listener := {Type, Listener}} = Opts) ->
idle_timeout => get_ws_opts(Type, Listener, idle_timeout) idle_timeout => get_ws_opts(Type, Listener, idle_timeout)
}, },
case check_origin_header(Req, Opts) of case check_origin_header(Req, Opts) of
{error, Message} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "invalid_origin_header", reason => Reason}),
msg => "invalid_origin_header",
payload => Message
}),
{ok, cowboy_req:reply(403, Req), WsOpts}; {ok, cowboy_req:reply(403, Req), WsOpts};
ok -> parse_sec_websocket_protocol(Req, Opts, WsOpts) ok ->
parse_sec_websocket_protocol(Req, Opts, WsOpts)
end. end.
parse_sec_websocket_protocol(Req, #{listener := {Type, Listener}} = Opts, WsOpts) -> parse_sec_websocket_protocol(Req, #{listener := {Type, Listener}} = Opts, WsOpts) ->
@ -234,7 +232,7 @@ parse_header_fun_origin(Req, #{listener := {Type, Listener}}) ->
Value -> Value ->
case lists:member(Value, get_ws_opts(Type, Listener, check_origins)) of case lists:member(Value, get_ws_opts(Type, Listener, check_origins)) of
true -> ok; true -> ok;
false -> {origin_not_allowed, Value} false -> {error, #{bad_origin => Value}}
end end
end. end.
@ -266,12 +264,12 @@ websocket_init([Req, #{zone := Zone, listener := {Type, Listener}} = Opts]) ->
WsCookie = try cowboy_req:parse_cookies(Req) WsCookie = try cowboy_req:parse_cookies(Req)
catch catch
error:badarg -> error:badarg ->
?SLOG(error, #{msg => "illegal_cookie"}), ?SLOG(error, #{msg => "bad_cookie"}),
undefined; undefined;
Error:Reason -> Error:Reason ->
?SLOG(error, #{msg => "failed_to_parse_cookie", ?SLOG(error, #{msg => "failed_to_parse_cookie",
error => Error, exception => Error,
reason => Reason}), reason => Reason}),
undefined undefined
end, end,
ConnInfo = #{socktype => ws, ConnInfo = #{socktype => ws,
@ -328,7 +326,7 @@ websocket_handle({binary, Data}, State) when is_list(Data) ->
websocket_handle({binary, iolist_to_binary(Data)}, State); websocket_handle({binary, iolist_to_binary(Data)}, State);
websocket_handle({binary, Data}, State) -> websocket_handle({binary, Data}, State) ->
?SLOG(debug, #{msg => "recv_data", data => Data}), ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => websocket}),
ok = inc_recv_stats(1, iolist_size(Data)), ok = inc_recv_stats(1, iolist_size(Data)),
NState = ensure_stats_timer(State), NState = ensure_stats_timer(State),
return(parse_incoming(Data, NState)); return(parse_incoming(Data, NState));
@ -450,7 +448,7 @@ handle_info({connack, ConnAck}, State) ->
return(enqueue(ConnAck, State)); return(enqueue(ConnAck, State));
handle_info({close, Reason}, State) -> handle_info({close, Reason}, State) ->
?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}), ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
return(enqueue({close, Reason}, State)); return(enqueue({close, Reason}, State));
handle_info({event, connected}, State = #state{channel = Channel}) -> handle_info({event, connected}, State = #state{channel = Channel}) ->
@ -632,10 +630,9 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) -> fun(Packet) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> -> ?SLOG(warning, #{ <<>> -> ?SLOG(warning, #{msg => "packet_discarded",
msg => "packet_is_discarded_due_to_the_frame_is_too_large", reason => "frame_too_large",
packet => emqx_packet:format(Packet) packet => emqx_packet:format(Packet)}),
}),
ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped'),
<<>>; <<>>;

View File

@ -141,13 +141,6 @@ t_undefined_headers(_) ->
Msg2 = emqx_message:set_header(c, 3, Msg), Msg2 = emqx_message:set_header(c, 3, Msg),
?assertEqual(3, emqx_message:get_header(c, Msg2)). ?assertEqual(3, emqx_message:get_header(c, Msg2)).
t_format(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
io:format("~s~n", [emqx_message:format(Msg)]),
Msg1 = emqx_message:set_header(properties, #{'Subscription-Identifier' => 1},
emqx_message:set_flag(dup, Msg)),
io:format("~s~n", [emqx_message:format(Msg1)]).
t_is_expired(_) -> t_is_expired(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
?assertNot(emqx_message:is_expired(Msg)), ?assertNot(emqx_message:is_expired(Msg)),

View File

@ -24,21 +24,22 @@
-define(SYSMON, emqx_sys_mon). -define(SYSMON, emqx_sys_mon).
-define(FAKE_PORT, hd(erlang:ports())).
-define(FAKE_INFO, [{timeout, 100}, {in, foo}, {out, {?MODULE, bar, 1}}]).
-define(INPUTINFO, [{self(), long_gc, -define(INPUTINFO, [{self(), long_gc,
concat_str("long_gc warning: pid = ~p, info: ~p", self(), "hello"), "hello"}, fmt("long_gc warning: pid = ~p", [self()]), ?FAKE_INFO},
{self(), long_schedule, {self(), long_schedule,
concat_str("long_schedule warning: pid = ~p, info: ~p", self(), "hello"), "hello"}, fmt("long_schedule warning: pid = ~p", [self()]), ?FAKE_INFO},
{self(), large_heap, {self(), large_heap,
concat_str("large_heap warning: pid = ~p, info: ~p", self(), "hello"), "hello"}, fmt("large_heap warning: pid = ~p", [self()]), ?FAKE_INFO},
{self(), busy_port, {self(), busy_port,
concat_str("busy_port warning: suspid = ~p, port = ~p", fmt("busy_port warning: suspid = ~p, port = ~p",
self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")}, [self(), ?FAKE_PORT]), ?FAKE_PORT},
{self(), busy_dist_port, {self(), busy_dist_port,
concat_str("busy_dist_port warning: suspid = ~p, port = ~p", fmt("busy_dist_port warning: suspid = ~p, port = ~p",
self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")}, [self(), ?FAKE_PORT]), ?FAKE_PORT},
{list_to_port("#Port<0.4>"), long_schedule, {?FAKE_PORT, long_schedule,
concat_str("long_schedule warning: port = ~p, info: ~p", fmt("long_schedule warning: port = ~p", [?FAKE_PORT]), ?FAKE_INFO}
list_to_port("#Port<0.4>"), "hello"), "hello"}
]). ]).
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
@ -82,16 +83,16 @@ t_procinfo(_) ->
ok = meck:new(emqx_vm, [passthrough, no_history]), ok = meck:new(emqx_vm, [passthrough, no_history]),
ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end), ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end),
ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> [] end), ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> [] end),
?assertEqual([], emqx_sys_mon:procinfo([])), ?assertEqual([{pid, undefined}], emqx_sys_mon:procinfo(undefined)),
ok = meck:expect(emqx_vm, get_process_info, fun(_) -> ok end), ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end),
ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> undefined end), ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> undefined end),
?assertEqual(undefined, emqx_sys_mon:procinfo([])), ?assertEqual([{pid, self()}], emqx_sys_mon:procinfo(self())),
ok = meck:unload(emqx_vm). ok = meck:unload(emqx_vm).
t_sys_mon(_Config) -> t_sys_mon(_Config) ->
lists:foreach( lists:foreach(
fun({PidOrPort, SysMonName,ValidateInfo, InfoOrPort}) -> fun({PidOrPort, SysMonName, ValidateInfo, InfoOrPort}) ->
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort)
end, ?INPUTINFO). end, ?INPUTINFO).
t_sys_mon2(_Config) -> t_sys_mon2(_Config) ->
@ -101,7 +102,7 @@ t_sys_mon2(_Config) ->
?assertEqual(ok, gen_server:cast(?SYSMON, ignored)), ?assertEqual(ok, gen_server:cast(?SYSMON, ignored)),
gen_server:stop(?SYSMON). gen_server:stop(?SYSMON).
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) -> validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort) ->
{ok, C} = emqtt:start_link([{host, "localhost"}]), {ok, C} = emqtt:start_link([{host, "localhost"}]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
emqtt:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1), emqtt:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1),
@ -117,6 +118,4 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
end, end,
emqtt:stop(C). emqtt:stop(C).
concat_str(ValidateInfo, InfoOrPort, Info) -> fmt(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]),
lists:flatten(WarnInfo).

View File

@ -229,7 +229,7 @@ t_ws_check_origin(_) ->
?assertMatch({gun_upgrade, _}, ?assertMatch({gun_upgrade, _},
start_ws_client(#{protocols => [<<"mqtt">>], start_ws_client(#{protocols => [<<"mqtt">>],
headers => [{<<"origin">>, <<"http://localhost:18083">>}]})), headers => [{<<"origin">>, <<"http://localhost:18083">>}]})),
?assertMatch({gun_response, {_, 500, _}}, ?assertMatch({gun_response, {_, 403, _}},
start_ws_client(#{protocols => [<<"mqtt">>], start_ws_client(#{protocols => [<<"mqtt">>],
headers => [{<<"origin">>, <<"http://localhost:18080">>}]})). headers => [{<<"origin">>, <<"http://localhost:18080">>}]})).