Merge remote-tracking branch 'origin/release-56' into sync-5.5.1

This commit is contained in:
zmstone 2024-03-06 17:27:54 +01:00
commit e99546e009
7 changed files with 54 additions and 23 deletions

View File

@ -186,6 +186,8 @@
-define(LIMITER_BYTES_IN, bytes). -define(LIMITER_BYTES_IN, bytes).
-define(LIMITER_MESSAGE_IN, messages). -define(LIMITER_MESSAGE_IN, messages).
-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).
-dialyzer({no_match, [info/2]}). -dialyzer({no_match, [info/2]}).
-dialyzer( -dialyzer(
{nowarn_function, [ {nowarn_function, [
@ -282,7 +284,7 @@ async_set_keepalive(OS, Pid, Idle, Interval, Probes) ->
{ok, Options} -> {ok, Options} ->
async_set_socket_options(Pid, Options); async_set_socket_options(Pid, Options);
{error, {unsupported_os, OS}} -> {error, {unsupported_os, OS}} ->
?SLOG(warning, #{ ?LOG(warning, #{
msg => "Unsupported operation: set TCP keepalive", msg => "Unsupported operation: set TCP keepalive",
os => OS os => OS
}), }),
@ -774,7 +776,7 @@ handle_timeout(TRef, Msg, State) ->
%% Parse incoming data %% Parse incoming data
-compile({inline, [when_bytes_in/3]}). -compile({inline, [when_bytes_in/3]}).
when_bytes_in(Oct, Data, State) -> when_bytes_in(Oct, Data, State) ->
?SLOG(debug, #{ ?LOG(debug, #{
msg => "raw_bin_received", msg => "raw_bin_received",
size => Oct, size => Oct,
bin => binary_to_list(binary:encode_hex(Data)), bin => binary_to_list(binary:encode_hex(Data)),
@ -810,7 +812,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
parse_incoming(Rest, [Packet | Packets], NState) parse_incoming(Rest, [Packet | Packets], NState)
catch catch
throw:{?FRAME_PARSE_ERROR, Reason} -> throw:{?FRAME_PARSE_ERROR, Reason} ->
?SLOG(info, #{ ?LOG(info, #{
reason => Reason, reason => Reason,
at_state => emqx_frame:describe_state(ParseState), at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data, input_bytes => Data,
@ -818,7 +820,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
}), }),
{[{frame_error, Reason} | Packets], State}; {[{frame_error, Reason} | Packets], State};
error:Reason:Stacktrace -> error:Reason:Stacktrace ->
?SLOG(error, #{ ?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState), at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data, input_bytes => Data,
parsed_packets => Packets, parsed_packets => Packets,
@ -873,7 +875,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) -> fun(Packet) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> -> <<>> ->
?SLOG(warning, #{ ?LOG(warning, #{
msg => "packet_is_discarded", msg => "packet_is_discarded",
reason => "frame_is_too_large", reason => "frame_is_too_large",
packet => emqx_packet:format(Packet, hidden) packet => emqx_packet:format(Packet, hidden)
@ -889,13 +891,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
catch catch
%% Maybe Never happen. %% Maybe Never happen.
throw:{?FRAME_SERIALIZE_ERROR, Reason} -> throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
?SLOG(info, #{ ?LOG(info, #{
reason => Reason, reason => Reason,
input_packet => Packet input_packet => Packet
}), }),
erlang:error({?FRAME_SERIALIZE_ERROR, Reason}); erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
error:Reason:Stacktrace -> error:Reason:Stacktrace ->
?SLOG(error, #{ ?LOG(error, #{
input_packet => Packet, input_packet => Packet,
exception => Reason, exception => Reason,
stacktrace => Stacktrace stacktrace => Stacktrace
@ -1018,7 +1020,7 @@ check_limiter(
WhenOk(Data, Msgs, State#state{limiter = Limiter2}); WhenOk(Data, Msgs, State#state{limiter = Limiter2});
{pause, Time, Limiter2} -> {pause, Time, Limiter2} ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "pause_time_dueto_rate_limit", msg => "pause_time_due_to_rate_limit",
needs => Needs, needs => Needs,
time_in_ms => Time time_in_ms => Time
}), }),
@ -1070,7 +1072,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
); );
{pause, Time, Limiter2} -> {pause, Time, Limiter2} ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "pause_time_dueto_rate_limit", msg => "pause_time_due_to_rate_limit",
types => Types, types => Types,
time_in_ms => Time time_in_ms => Time
}), }),

View File

@ -22,7 +22,11 @@
check_config(X) -> logger_formatter:check_config(X). check_config(X) -> logger_formatter:check_config(X).
%% Principle here is to delegate the formatting to logger_formatter:format/2
%% as much as possible, and only enrich the report with clientid, peername, topic, username
format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(ReportMap) -> format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(ReportMap) ->
%% The most common case, when entering from SLOG macro
%% i.e. logger:log(Level, #{msg => "my_msg", foo => bar})
ReportList = enrich_report(ReportMap, Meta), ReportList = enrich_report(ReportMap, Meta),
Report = Report =
case is_list_report_acceptable(Meta) of case is_list_report_acceptable(Meta) of
@ -33,13 +37,17 @@ format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(
end, end,
logger_formatter:format(Event#{msg := {report, Report}}, Config); logger_formatter:format(Event#{msg := {report, Report}}, Config);
format(#{msg := {string, String}} = Event, Config) -> format(#{msg := {string, String}} = Event, Config) ->
%% copied from logger_formatter:format/2
%% unsure how this case is triggered
format(Event#{msg => {"~ts ", [String]}}, Config); format(Event#{msg => {"~ts ", [String]}}, Config);
%% trace
format(#{msg := Msg0, meta := Meta} = Event, Config) -> format(#{msg := Msg0, meta := Meta} = Event, Config) ->
%% For format strings like logger:log(Level, "~p", [Var])
%% and logger:log(Level, "message", #{key => value})
Msg1 = enrich_client_info(Msg0, Meta), Msg1 = enrich_client_info(Msg0, Meta),
Msg2 = enrich_topic(Msg1, Meta), Msg2 = enrich_topic(Msg1, Meta),
logger_formatter:format(Event#{msg := Msg2}, Config). logger_formatter:format(Event#{msg := Msg2}, Config).
%% Other report callbacks may only accept map() reports such as gen_server formatter
is_list_report_acceptable(#{report_cb := Cb}) -> is_list_report_acceptable(#{report_cb := Cb}) ->
Cb =:= fun logger:format_otp_report/1 orelse Cb =:= fun logger:format_report/1; Cb =:= fun logger:format_otp_report/1 orelse Cb =:= fun logger:format_report/1;
is_list_report_acceptable(_) -> is_list_report_acceptable(_) ->
@ -61,19 +69,21 @@ enrich_report(ReportRaw, Meta) ->
ClientId = maps:get(clientid, Meta, undefined), ClientId = maps:get(clientid, Meta, undefined),
Peer = maps:get(peername, Meta, undefined), Peer = maps:get(peername, Meta, undefined),
Msg = maps:get(msg, ReportRaw, undefined), Msg = maps:get(msg, ReportRaw, undefined),
Tag = maps:get(tag, ReportRaw, undefined),
%% turn it into a list so that the order of the fields is determined %% turn it into a list so that the order of the fields is determined
lists:foldl( lists:foldl(
fun fun
({_, undefined}, Acc) -> Acc; ({_, undefined}, Acc) -> Acc;
(Item, Acc) -> [Item | Acc] (Item, Acc) -> [Item | Acc]
end, end,
maps:to_list(maps:without([topic, msg, clientid, username], ReportRaw)), maps:to_list(maps:without([topic, msg, clientid, username, tag], ReportRaw)),
[ [
{username, try_format_unicode(Username)},
{topic, try_format_unicode(Topic)}, {topic, try_format_unicode(Topic)},
{clientid, try_format_unicode(ClientId)}, {username, try_format_unicode(Username)},
{peername, Peer}, {peername, Peer},
{msg, Msg} {msg, Msg},
{clientid, try_format_unicode(ClientId)},
{tag, Tag}
] ]
). ).

View File

@ -128,6 +128,8 @@
-dialyzer({no_match, [info/2]}). -dialyzer({no_match, [info/2]}).
-dialyzer({nowarn_function, [websocket_init/1]}). -dialyzer({nowarn_function, [websocket_init/1]}).
-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Info, Stats %% Info, Stats
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -401,7 +403,7 @@ get_peer_info(Type, Listener, Req, Opts) ->
websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, Data}, State) when is_list(Data) ->
websocket_handle({binary, iolist_to_binary(Data)}, State); websocket_handle({binary, iolist_to_binary(Data)}, State);
websocket_handle({binary, Data}, State) -> websocket_handle({binary, Data}, State) ->
?SLOG(debug, #{ ?LOG(debug, #{
msg => "raw_bin_received", msg => "raw_bin_received",
size => iolist_size(Data), size => iolist_size(Data),
bin => binary_to_list(binary:encode_hex(Data)), bin => binary_to_list(binary:encode_hex(Data)),
@ -428,7 +430,7 @@ websocket_handle({Frame, _}, State) when Frame =:= ping; Frame =:= pong ->
return(State); return(State);
websocket_handle({Frame, _}, State) -> websocket_handle({Frame, _}, State) ->
%% TODO: should not close the ws connection %% TODO: should not close the ws connection
?SLOG(error, #{msg => "unexpected_frame", frame => Frame}), ?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
shutdown(unexpected_ws_frame, State). shutdown(unexpected_ws_frame, State).
websocket_info({call, From, Req}, State) -> websocket_info({call, From, Req}, State) ->
handle_call(From, Req, State); handle_call(From, Req, State);
@ -714,7 +716,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
parse_incoming(Rest, [{incoming, Packet} | Packets], NState) parse_incoming(Rest, [{incoming, Packet} | Packets], NState)
catch catch
throw:{?FRAME_PARSE_ERROR, Reason} -> throw:{?FRAME_PARSE_ERROR, Reason} ->
?SLOG(info, #{ ?LOG(info, #{
reason => Reason, reason => Reason,
at_state => emqx_frame:describe_state(ParseState), at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data input_bytes => Data
@ -722,7 +724,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
FrameError = {frame_error, Reason}, FrameError = {frame_error, Reason},
{[{incoming, FrameError} | Packets], State}; {[{incoming, FrameError} | Packets], State};
error:Reason:Stacktrace -> error:Reason:Stacktrace ->
?SLOG(error, #{ ?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState), at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data, input_bytes => Data,
exception => Reason, exception => Reason,
@ -812,7 +814,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) -> fun(Packet) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> -> <<>> ->
?SLOG(warning, #{ ?LOG(warning, #{
msg => "packet_discarded", msg => "packet_discarded",
reason => "frame_too_large", reason => "frame_too_large",
packet => emqx_packet:format(Packet) packet => emqx_packet:format(Packet)
@ -828,13 +830,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
catch catch
%% Maybe Never happen. %% Maybe Never happen.
throw:{?FRAME_SERIALIZE_ERROR, Reason} -> throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
?SLOG(info, #{ ?LOG(info, #{
reason => Reason, reason => Reason,
input_packet => Packet input_packet => Packet
}), }),
erlang:error({?FRAME_SERIALIZE_ERROR, Reason}); erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
error:Reason:Stacktrace -> error:Reason:Stacktrace ->
?SLOG(error, #{ ?LOG(error, #{
input_packet => Packet, input_packet => Packet,
exception => Reason, exception => Reason,
stacktrace => Stacktrace stacktrace => Stacktrace

View File

@ -1192,7 +1192,18 @@ t_parse_date_errors(_) ->
?assertEqual( ?assertEqual(
UnixTsLeap2, UnixTsLeap2,
emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-03-04 06:56:27">>) emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-03-04 06:56:27">>)
). ),
%% None zero zone shift with millisecond level precision
Tz1 = calendar:rfc3339_to_system_time("2024-02-23T15:00:00.123+08:00", [{unit, second}]),
?assertEqual(
Tz1,
emqx_rule_funcs:date_to_unix_ts(
second, <<"%Y-%m-%d %H:%M:%S.%3N%:z">>, <<"2024-02-23 15:00:00.123+08:00">>
)
),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Utility functions %% Utility functions

View File

@ -507,7 +507,7 @@ do_parse(DateStr, Unit, Formatter) ->
(nanosecond, V, Res) -> (nanosecond, V, Res) ->
Res + V; Res + V;
(parsed_offset, V, Res) -> (parsed_offset, V, Res) ->
Res - V Res - V * Precise
end, end,
Count = maps:fold(Counter, 0, DateInfo) - (?SECONDS_PER_DAY * Precise), Count = maps:fold(Counter, 0, DateInfo) - (?SECONDS_PER_DAY * Precise),
erlang:convert_time_unit(Count, PrecisionUnit, Unit). erlang:convert_time_unit(Count, PrecisionUnit, Unit).

View File

@ -0,0 +1,3 @@
Improve text log formatter fields order.
`tag` > `clientid` > `msg` > `peername` > `username` > `topic` > [other fields]

View File

@ -0,0 +1,3 @@
Fix rule engine date time string parser.
Prior to this fix, time zone shift can only work when date time string is at second level precision.