chore: add tag for logs from MQTT connection modules
This commit is contained in:
parent
6c9eb16a95
commit
cb0066d639
|
@ -186,6 +186,8 @@
|
|||
-define(LIMITER_BYTES_IN, bytes).
|
||||
-define(LIMITER_MESSAGE_IN, messages).
|
||||
|
||||
-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).
|
||||
|
||||
-dialyzer({no_match, [info/2]}).
|
||||
-dialyzer(
|
||||
{nowarn_function, [
|
||||
|
@ -282,7 +284,7 @@ async_set_keepalive(OS, Pid, Idle, Interval, Probes) ->
|
|||
{ok, Options} ->
|
||||
async_set_socket_options(Pid, Options);
|
||||
{error, {unsupported_os, OS}} ->
|
||||
?SLOG(warning, #{
|
||||
?LOG(warning, #{
|
||||
msg => "Unsupported operation: set TCP keepalive",
|
||||
os => OS
|
||||
}),
|
||||
|
@ -774,7 +776,7 @@ handle_timeout(TRef, Msg, State) ->
|
|||
%% Parse incoming data
|
||||
-compile({inline, [when_bytes_in/3]}).
|
||||
when_bytes_in(Oct, Data, State) ->
|
||||
?SLOG(debug, #{
|
||||
?LOG(debug, #{
|
||||
msg => "raw_bin_received",
|
||||
size => Oct,
|
||||
bin => binary_to_list(binary:encode_hex(Data)),
|
||||
|
@ -810,7 +812,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|||
parse_incoming(Rest, [Packet | Packets], NState)
|
||||
catch
|
||||
throw:{?FRAME_PARSE_ERROR, Reason} ->
|
||||
?SLOG(info, #{
|
||||
?LOG(info, #{
|
||||
reason => Reason,
|
||||
at_state => emqx_frame:describe_state(ParseState),
|
||||
input_bytes => Data,
|
||||
|
@ -818,7 +820,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|||
}),
|
||||
{[{frame_error, Reason} | Packets], State};
|
||||
error:Reason:Stacktrace ->
|
||||
?SLOG(error, #{
|
||||
?LOG(error, #{
|
||||
at_state => emqx_frame:describe_state(ParseState),
|
||||
input_bytes => Data,
|
||||
parsed_packets => Packets,
|
||||
|
@ -873,7 +875,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
fun(Packet) ->
|
||||
try emqx_frame:serialize_pkt(Packet, Serialize) of
|
||||
<<>> ->
|
||||
?SLOG(warning, #{
|
||||
?LOG(warning, #{
|
||||
msg => "packet_is_discarded",
|
||||
reason => "frame_is_too_large",
|
||||
packet => emqx_packet:format(Packet, hidden)
|
||||
|
@ -889,13 +891,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
catch
|
||||
%% Maybe Never happen.
|
||||
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
|
||||
?SLOG(info, #{
|
||||
?LOG(info, #{
|
||||
reason => Reason,
|
||||
input_packet => Packet
|
||||
}),
|
||||
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
|
||||
error:Reason:Stacktrace ->
|
||||
?SLOG(error, #{
|
||||
?LOG(error, #{
|
||||
input_packet => Packet,
|
||||
exception => Reason,
|
||||
stacktrace => Stacktrace
|
||||
|
@ -1018,7 +1020,7 @@ check_limiter(
|
|||
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
|
||||
{pause, Time, Limiter2} ->
|
||||
?SLOG(debug, #{
|
||||
msg => "pause_time_dueto_rate_limit",
|
||||
msg => "pause_time_due_to_rate_limit",
|
||||
needs => Needs,
|
||||
time_in_ms => Time
|
||||
}),
|
||||
|
@ -1070,7 +1072,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
|
|||
);
|
||||
{pause, Time, Limiter2} ->
|
||||
?SLOG(debug, #{
|
||||
msg => "pause_time_dueto_rate_limit",
|
||||
msg => "pause_time_due_to_rate_limit",
|
||||
types => Types,
|
||||
time_in_ms => Time
|
||||
}),
|
||||
|
|
|
@ -128,6 +128,8 @@
|
|||
-dialyzer({no_match, [info/2]}).
|
||||
-dialyzer({nowarn_function, [websocket_init/1]}).
|
||||
|
||||
-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Info, Stats
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -401,7 +403,7 @@ get_peer_info(Type, Listener, Req, Opts) ->
|
|||
websocket_handle({binary, Data}, State) when is_list(Data) ->
|
||||
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
||||
websocket_handle({binary, Data}, State) ->
|
||||
?SLOG(debug, #{
|
||||
?LOG(debug, #{
|
||||
msg => "raw_bin_received",
|
||||
size => iolist_size(Data),
|
||||
bin => binary_to_list(binary:encode_hex(Data)),
|
||||
|
@ -428,7 +430,7 @@ websocket_handle({Frame, _}, State) when Frame =:= ping; Frame =:= pong ->
|
|||
return(State);
|
||||
websocket_handle({Frame, _}, State) ->
|
||||
%% TODO: should not close the ws connection
|
||||
?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
|
||||
?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
|
||||
shutdown(unexpected_ws_frame, State).
|
||||
websocket_info({call, From, Req}, State) ->
|
||||
handle_call(From, Req, State);
|
||||
|
@ -714,7 +716,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|||
parse_incoming(Rest, [{incoming, Packet} | Packets], NState)
|
||||
catch
|
||||
throw:{?FRAME_PARSE_ERROR, Reason} ->
|
||||
?SLOG(info, #{
|
||||
?LOG(info, #{
|
||||
reason => Reason,
|
||||
at_state => emqx_frame:describe_state(ParseState),
|
||||
input_bytes => Data
|
||||
|
@ -722,7 +724,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|||
FrameError = {frame_error, Reason},
|
||||
{[{incoming, FrameError} | Packets], State};
|
||||
error:Reason:Stacktrace ->
|
||||
?SLOG(error, #{
|
||||
?LOG(error, #{
|
||||
at_state => emqx_frame:describe_state(ParseState),
|
||||
input_bytes => Data,
|
||||
exception => Reason,
|
||||
|
@ -812,7 +814,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
fun(Packet) ->
|
||||
try emqx_frame:serialize_pkt(Packet, Serialize) of
|
||||
<<>> ->
|
||||
?SLOG(warning, #{
|
||||
?LOG(warning, #{
|
||||
msg => "packet_discarded",
|
||||
reason => "frame_too_large",
|
||||
packet => emqx_packet:format(Packet)
|
||||
|
@ -828,13 +830,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
catch
|
||||
%% Maybe Never happen.
|
||||
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
|
||||
?SLOG(info, #{
|
||||
?LOG(info, #{
|
||||
reason => Reason,
|
||||
input_packet => Packet
|
||||
}),
|
||||
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
|
||||
error:Reason:Stacktrace ->
|
||||
?SLOG(error, #{
|
||||
?LOG(error, #{
|
||||
input_packet => Packet,
|
||||
exception => Reason,
|
||||
stacktrace => Stacktrace
|
||||
|
|
Loading…
Reference in New Issue