Merge pull request #6708 from zhongwencool/improve-error-log
Improve authn/authz error log when query failed.
This commit is contained in:
commit
03a135c592
|
@ -76,14 +76,16 @@
|
|||
-define(TRACE_FILTER, emqx_trace_filter).
|
||||
|
||||
%% Only evaluate when necessary
|
||||
-define(TRACE(Event, Msg, Meta),
|
||||
%% Always debug the trace events.
|
||||
-define(TRACE(Tag, Msg, Meta),
|
||||
begin
|
||||
case persistent_term:get(?TRACE_FILTER, undefined) of
|
||||
undefined -> ok;
|
||||
[] -> ok;
|
||||
List ->
|
||||
emqx_trace:log(List, Event, Msg, Meta)
|
||||
end
|
||||
List -> emqx_trace:log(List, Msg, Meta#{trace_tag => Tag})
|
||||
end,
|
||||
?SLOG(debug, (emqx_trace_formatter:format_meta(Meta))#{msg => Msg, tag => Tag},
|
||||
#{is_trace => false})
|
||||
end).
|
||||
|
||||
%% print to 'user' group leader
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
-export([ publish/1
|
||||
, subscribe/3
|
||||
, unsubscribe/2
|
||||
, log/4
|
||||
, log/3
|
||||
]).
|
||||
|
||||
-export([ start_link/0
|
||||
|
@ -94,13 +94,13 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
|
|||
unsubscribe(Topic, SubOpts) ->
|
||||
?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
|
||||
|
||||
log(List, Event, Msg, Meta0) ->
|
||||
log(List, Msg, Meta0) ->
|
||||
Meta =
|
||||
case logger:get_process_metadata() of
|
||||
undefined -> Meta0;
|
||||
ProcMeta -> maps:merge(ProcMeta, Meta0)
|
||||
end,
|
||||
Log = #{level => trace, event => Event, meta => Meta, msg => Msg},
|
||||
Log = #{level => debug, meta => Meta, msg => Msg},
|
||||
log_filter(List, Log).
|
||||
|
||||
log_filter([], _Log) -> ok;
|
||||
|
|
|
@ -16,39 +16,54 @@
|
|||
-module(emqx_trace_formatter).
|
||||
|
||||
-export([format/2]).
|
||||
-export([format_meta/1]).
|
||||
|
||||
%%%-----------------------------------------------------------------
|
||||
%%% API
|
||||
-spec format(LogEvent, Config) -> unicode:chardata() when
|
||||
LogEvent :: logger:log_event(),
|
||||
Config :: logger:config().
|
||||
format(#{level := trace, event := Event, meta := Meta, msg := Msg},
|
||||
format(#{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg},
|
||||
#{payload_encode := PEncode}) ->
|
||||
Time = calendar:system_time_to_rfc3339(erlang:system_time(second)),
|
||||
ClientId = to_iolist(maps:get(clientid, Meta, "")),
|
||||
Peername = maps:get(peername, Meta, ""),
|
||||
MetaBin = format_meta(Meta, PEncode),
|
||||
[Time, " [", Event, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"];
|
||||
[Time, " [", Tag, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"];
|
||||
|
||||
format(Event, Config) ->
|
||||
emqx_logger_textfmt:format(Event, Config).
|
||||
|
||||
format_meta(Meta) ->
|
||||
Encode = emqx_trace_handler:payload_encode(),
|
||||
do_format_meta(Meta, Encode).
|
||||
|
||||
format_meta(Meta0, Encode) ->
|
||||
Packet = format_packet(maps:get(packet, Meta0, undefined), Encode),
|
||||
Payload = format_payload(maps:get(payload, Meta0, undefined), Encode),
|
||||
Meta1 = maps:without([msg, clientid, peername, packet, payload], Meta0),
|
||||
case Meta1 =:= #{} of
|
||||
Meta1 = #{packet := Packet0, payload := Payload0} = do_format_meta(Meta0, Encode),
|
||||
Packet = enrich(", packet: ", Packet0),
|
||||
Payload = enrich(", payload: ", Payload0),
|
||||
Meta2 = maps:without([msg, clientid, peername, packet, payload, trace_tag], Meta1),
|
||||
case Meta2 =:= #{} of
|
||||
true -> [Packet, Payload];
|
||||
false -> [Packet, ", ", map_to_iolist(Meta1), Payload]
|
||||
false -> [Packet, ", ", map_to_iolist(Meta2), Payload]
|
||||
end.
|
||||
|
||||
enrich(_, "") -> "";
|
||||
enrich(Key, IoData) -> [Key, IoData].
|
||||
|
||||
do_format_meta(Meta, Encode) ->
|
||||
Meta#{
|
||||
packet => format_packet(maps:get(packet, Meta, undefined), Encode),
|
||||
payload => format_payload(maps:get(payload, Meta, undefined), Encode)
|
||||
}.
|
||||
|
||||
format_packet(undefined, _) -> "";
|
||||
format_packet(Packet, Encode) -> [", packet: ", emqx_packet:format(Packet, Encode)].
|
||||
format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode).
|
||||
|
||||
format_payload(undefined, _) -> "";
|
||||
format_payload(Payload, text) -> [", payload: ", io_lib:format("~ts", [Payload])];
|
||||
format_payload(Payload, hex) -> [", payload(hex): ", emqx_packet:encode_hex(Payload)];
|
||||
format_payload(_, hidden) -> ", payload=******".
|
||||
format_payload(Payload, text) -> io_lib:format("~ts", [Payload]);
|
||||
format_payload(Payload, hex) -> emqx_packet:encode_hex(Payload);
|
||||
format_payload(_, hidden) -> "******".
|
||||
|
||||
to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||
to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);
|
||||
|
|
|
@ -120,25 +120,28 @@ running() ->
|
|||
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
|
||||
|
||||
-spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
|
||||
filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log;
|
||||
filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) ->
|
||||
filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log);
|
||||
filter_clientid(_Log, _ExpectId) -> stop.
|
||||
|
||||
-spec filter_topic(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
|
||||
filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
|
||||
case emqx_topic:match(Topic, TopicFilter) of
|
||||
true -> Log;
|
||||
false -> stop
|
||||
end;
|
||||
filter_topic(#{meta := Meta = #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
|
||||
filter_ret(is_trace(Meta) andalso emqx_topic:match(Topic, TopicFilter), Log);
|
||||
filter_topic(_Log, _ExpectId) -> stop.
|
||||
|
||||
-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop.
|
||||
filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) ->
|
||||
case lists:prefix(IP, Peername) of
|
||||
true -> Log;
|
||||
false -> stop
|
||||
end;
|
||||
filter_ip_address(#{meta := Meta = #{peername := Peername}} = Log, {IP, _Name}) ->
|
||||
filter_ret(is_trace(Meta) andalso lists:prefix(IP, Peername), Log);
|
||||
filter_ip_address(_Log, _ExpectId) -> stop.
|
||||
|
||||
-compile({inline, [is_trace/1, filter_ret/2]}).
|
||||
%% TRUE when is_trace is missing.
|
||||
is_trace(#{is_trace := false}) -> false;
|
||||
is_trace(_) -> true.
|
||||
|
||||
filter_ret(true, Log) -> Log;
|
||||
filter_ret(false, _Log) -> stop.
|
||||
|
||||
filters(#{type := clientid, filter := Filter, name := Name}) ->
|
||||
[{clientid, {fun ?MODULE:filter_clientid/2, {Filter, Name}}}];
|
||||
filters(#{type := topic, filter := Filter, name := Name}) ->
|
||||
|
|
|
@ -101,7 +101,7 @@ handle_info({http, {RequestID, Result}},
|
|||
endpoint => Endpoint,
|
||||
reason => Reason}),
|
||||
State1;
|
||||
{_StatusLine, _Headers, Body} ->
|
||||
{StatusLine, Headers, Body} ->
|
||||
try
|
||||
JWKS = jose_jwk:from(emqx_json:decode(Body, [return_maps])),
|
||||
{_, JWKs} = JWKS#jose_jwk.keys,
|
||||
|
@ -109,6 +109,8 @@ handle_info({http, {RequestID, Result}},
|
|||
catch _:_ ->
|
||||
?SLOG(warning, #{msg => "invalid_jwks_returned",
|
||||
endpoint => Endpoint,
|
||||
status => StatusLine,
|
||||
headers => Headers,
|
||||
body => Body}),
|
||||
State1
|
||||
end
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
-include("emqx_authn.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-behaviour(hocon_schema).
|
||||
-behaviour(emqx_authentication).
|
||||
|
@ -271,8 +272,8 @@ verify(JWS, [JWK | More], VerifyClaims) ->
|
|||
{false, _, _} ->
|
||||
verify(JWS, More, VerifyClaims)
|
||||
catch
|
||||
_:_Reason:_Stacktrace ->
|
||||
%% TODO: Add log
|
||||
_:_Reason ->
|
||||
?TRACE("JWT", "authn_jwt_invalid_signature", #{jwk => JWK, jws => JWS}),
|
||||
{error, invalid_signature}
|
||||
end.
|
||||
|
||||
|
|
|
@ -143,6 +143,8 @@ authenticate(#{password := Password} = Credential,
|
|||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "mongodb_query_failed",
|
||||
resource => ResourceId,
|
||||
collection => Collection,
|
||||
selector => Selector2,
|
||||
reason => Reason}),
|
||||
ignore;
|
||||
Doc ->
|
||||
|
@ -152,6 +154,8 @@ authenticate(#{password := Password} = Credential,
|
|||
{error, {cannot_find_password_hash_field, PasswordHashField}} ->
|
||||
?SLOG(error, #{msg => "cannot_find_password_hash_field",
|
||||
resource => ResourceId,
|
||||
collection => Collection,
|
||||
selector => Selector2,
|
||||
password_hash_field => PasswordHashField}),
|
||||
ignore;
|
||||
{error, Reason} ->
|
||||
|
|
|
@ -123,6 +123,9 @@ authenticate(#{password := Password} = Credential,
|
|||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "mysql_query_failed",
|
||||
resource => ResourceId,
|
||||
query => Query,
|
||||
params => Params,
|
||||
timeout => Timeout,
|
||||
reason => Reason}),
|
||||
ignore
|
||||
end.
|
||||
|
|
|
@ -119,6 +119,7 @@ authenticate(#{password := Password} = Credential,
|
|||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "postgresql_query_failed",
|
||||
resource => ResourceId,
|
||||
params => Params,
|
||||
reason => Reason}),
|
||||
ignore
|
||||
end.
|
||||
|
|
|
@ -125,6 +125,7 @@ authenticate(#{password := Password} = Credential,
|
|||
password_hash_algorithm := Algorithm}) ->
|
||||
NKey = binary_to_list(iolist_to_binary(replace_placeholders(Key, Credential))),
|
||||
case emqx_resource:query(ResourceId, {cmd, [Command, NKey | Fields]}) of
|
||||
{ok, []} -> ignore;
|
||||
{ok, Values} ->
|
||||
case merge(Fields, Values) of
|
||||
#{<<"password_hash">> := _} = Selected ->
|
||||
|
@ -137,12 +138,18 @@ authenticate(#{password := Password} = Credential,
|
|||
end;
|
||||
_ ->
|
||||
?SLOG(error, #{msg => "cannot_find_password_hash_field",
|
||||
cmd => Command,
|
||||
keys => NKey,
|
||||
fields => Fields,
|
||||
resource => ResourceId}),
|
||||
ignore
|
||||
end;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "redis_query_failed",
|
||||
resource => ResourceId,
|
||||
cmd => Command,
|
||||
keys => NKey,
|
||||
fields => Fields,
|
||||
reason => Reason}),
|
||||
ignore
|
||||
end.
|
||||
|
|
|
@ -67,6 +67,8 @@ authorize(Client, PubSub, Topic,
|
|||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "query_mongo_error",
|
||||
reason => Reason,
|
||||
collection => Collection,
|
||||
selector => RenderedSelector,
|
||||
resource_id => ResourceID}),
|
||||
nomatch;
|
||||
[] -> nomatch;
|
||||
|
|
|
@ -58,13 +58,16 @@ authorize(Client, PubSub, Topic,
|
|||
query := {Query, Params}
|
||||
}
|
||||
}) ->
|
||||
case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of
|
||||
RenderParams = replvar(Params, Client),
|
||||
case emqx_resource:query(ResourceID, {sql, Query, RenderParams}) of
|
||||
{ok, _Columns, []} -> nomatch;
|
||||
{ok, Columns, Rows} ->
|
||||
do_authorize(Client, PubSub, Topic, Columns, Rows);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{ msg => "query_mysql_error"
|
||||
, reason => Reason
|
||||
, query => Query
|
||||
, params => RenderParams
|
||||
, resource_id => ResourceID}),
|
||||
nomatch
|
||||
end.
|
||||
|
|
|
@ -62,8 +62,8 @@ dry_run(Source) ->
|
|||
|
||||
parse_query(Sql) ->
|
||||
case re:run(Sql, ?RE_PLACEHOLDER, [global, {capture, all, list}]) of
|
||||
{match, Capured} ->
|
||||
PlaceHolders = [PlaceHolder || [PlaceHolder] <- Capured],
|
||||
{match, Captured} ->
|
||||
PlaceHolders = [PlaceHolder || [PlaceHolder] <- Captured],
|
||||
Replacements = ["$" ++ integer_to_list(I) || I <- lists:seq(1, length(PlaceHolders))],
|
||||
NSql = lists:foldl(
|
||||
fun({PlaceHolder, Replacement}, S) ->
|
||||
|
@ -80,13 +80,15 @@ authorize(Client, PubSub, Topic,
|
|||
placeholders := Placeholders
|
||||
}
|
||||
}) ->
|
||||
case emqx_resource:query(ResourceID, {prepared_query, ResourceID, replvar(Placeholders, Client)}) of
|
||||
RenderedParams = replvar(Placeholders, Client),
|
||||
case emqx_resource:query(ResourceID, {prepared_query, ResourceID, RenderedParams}) of
|
||||
{ok, _Columns, []} -> nomatch;
|
||||
{ok, Columns, Rows} ->
|
||||
do_authorize(Client, PubSub, Topic, Columns, Rows);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{ msg => "query_postgresql_error"
|
||||
, reason => Reason
|
||||
, params => RenderedParams
|
||||
, resource_id => ResourceID}),
|
||||
nomatch
|
||||
end.
|
||||
|
|
|
@ -63,6 +63,7 @@ authorize(Client, PubSub, Topic,
|
|||
{error, Reason} ->
|
||||
?SLOG(error, #{ msg => "query_redis_error"
|
||||
, reason => Reason
|
||||
, cmd => NCMD
|
||||
, resource_id => ResourceID}),
|
||||
nomatch
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue