Merge pull request #12746 from zmstone/0320-add-throttle-log-for-auth-failure
feat: add authentication_failure throttled log
This commit is contained in:
commit
bede5a5b85
|
@ -154,7 +154,7 @@ do_authorize(ClientInfo, Action, Topic) ->
|
|||
case run_hooks('client.authorize', [ClientInfo, Action, Topic], Default) of
|
||||
AuthzResult = #{result := Result} when Result == allow; Result == deny ->
|
||||
From = maps:get(from, AuthzResult, unknown),
|
||||
ok = log_result(ClientInfo, Topic, Action, From, Result),
|
||||
ok = log_result(Topic, Action, From, Result),
|
||||
emqx_hooks:run(
|
||||
'client.check_authz_complete',
|
||||
[ClientInfo, Action, Topic, Result, From]
|
||||
|
@ -173,24 +173,28 @@ do_authorize(ClientInfo, Action, Topic) ->
|
|||
deny
|
||||
end.
|
||||
|
||||
log_result(#{username := Username}, Topic, Action, From, Result) ->
|
||||
log_result(Topic, Action, From, Result) ->
|
||||
LogMeta = fun() ->
|
||||
#{
|
||||
username => Username,
|
||||
topic => Topic,
|
||||
action => format_action(Action),
|
||||
source => format_from(From)
|
||||
}
|
||||
end,
|
||||
case Result of
|
||||
allow ->
|
||||
?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"});
|
||||
deny ->
|
||||
?SLOG_THROTTLE(
|
||||
warning,
|
||||
(LogMeta())#{msg => authorization_permission_denied}
|
||||
)
|
||||
end.
|
||||
do_log_result(Action, Result, LogMeta).
|
||||
|
||||
do_log_result(_Action, allow, LogMeta) ->
|
||||
?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"}, #{tag => "AUTHZ"});
|
||||
do_log_result(?AUTHZ_PUBLISH_MATCH_MAP(_, _), deny, LogMeta) ->
|
||||
%% for publish action, we do not log permission deny at warning level here
|
||||
%% because it will be logged as cannot_publish_to_topic_due_to_not_authorized
|
||||
?SLOG(info, (LogMeta())#{msg => "authorization_permission_denied"}, #{tag => "AUTHZ"});
|
||||
do_log_result(_, deny, LogMeta) ->
|
||||
?SLOG_THROTTLE(
|
||||
warning,
|
||||
(LogMeta())#{msg => authorization_permission_denied},
|
||||
#{tag => "AUTHZ"}
|
||||
).
|
||||
|
||||
%% @private Format authorization rules source.
|
||||
format_from(default) ->
|
||||
|
|
|
@ -633,7 +633,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
|||
msg => cannot_publish_to_topic_due_to_not_authorized,
|
||||
reason => emqx_reason_codes:name(Rc)
|
||||
},
|
||||
#{topic => Topic}
|
||||
#{topic => Topic, tag => "AUTHZ"}
|
||||
),
|
||||
case emqx:get_config([authorization, deny_action], ignore) of
|
||||
ignore ->
|
||||
|
@ -652,7 +652,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
|||
msg => cannot_publish_to_topic_due_to_quota_exceeded,
|
||||
reason => emqx_reason_codes:name(Rc)
|
||||
},
|
||||
#{topic => Topic}
|
||||
#{topic => Topic, tag => "AUTHZ"}
|
||||
),
|
||||
case QoS of
|
||||
?QOS_0 ->
|
||||
|
@ -1612,8 +1612,10 @@ fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := MountPoint}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Set log metadata
|
||||
|
||||
set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||
emqx_logger:set_metadata_clientid(ClientId).
|
||||
set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId} = ClientInfo}) ->
|
||||
Username = maps:get(username, ClientInfo, undefined),
|
||||
emqx_logger:set_metadata_clientid(ClientId),
|
||||
emqx_logger:set_metadata_username(Username).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Check banned
|
||||
|
@ -1680,6 +1682,7 @@ authenticate(
|
|||
Channel
|
||||
);
|
||||
_ ->
|
||||
log_auth_failure("bad_authentication_method"),
|
||||
{error, ?RC_BAD_AUTHENTICATION_METHOD}
|
||||
end.
|
||||
|
||||
|
@ -1706,6 +1709,7 @@ do_authenticate(
|
|||
auth_cache = AuthCache
|
||||
}};
|
||||
{error, Reason} ->
|
||||
log_auth_failure(Reason),
|
||||
{error, emqx_reason_codes:connack_error(Reason)}
|
||||
end;
|
||||
do_authenticate(Credential, #channel{clientinfo = ClientInfo} = Channel) ->
|
||||
|
@ -1713,9 +1717,20 @@ do_authenticate(Credential, #channel{clientinfo = ClientInfo} = Channel) ->
|
|||
{ok, AuthResult} ->
|
||||
{ok, #{}, Channel#channel{clientinfo = merge_auth_result(ClientInfo, AuthResult)}};
|
||||
{error, Reason} ->
|
||||
log_auth_failure(Reason),
|
||||
{error, emqx_reason_codes:connack_error(Reason)}
|
||||
end.
|
||||
|
||||
log_auth_failure(Reason) ->
|
||||
?SLOG_THROTTLE(
|
||||
warning,
|
||||
#{
|
||||
msg => authentication_failure,
|
||||
reason => Reason
|
||||
},
|
||||
#{tag => "AUTHN"}
|
||||
).
|
||||
|
||||
merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) ->
|
||||
IsSuperuser = maps:get(is_superuser, AuthResult, false),
|
||||
maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}).
|
||||
|
|
|
@ -43,6 +43,7 @@
|
|||
-export([
|
||||
set_metadata_peername/1,
|
||||
set_metadata_clientid/1,
|
||||
set_metadata_username/1,
|
||||
set_proc_metadata/1,
|
||||
set_primary_log_level/1,
|
||||
set_log_handler_level/2,
|
||||
|
@ -142,6 +143,12 @@ set_metadata_clientid(<<>>) ->
|
|||
set_metadata_clientid(ClientId) ->
|
||||
set_proc_metadata(#{clientid => ClientId}).
|
||||
|
||||
-spec set_metadata_username(emqx_types:username()) -> ok.
|
||||
set_metadata_username(Username) when Username =:= undefined orelse Username =:= <<>> ->
|
||||
ok;
|
||||
set_metadata_username(Username) ->
|
||||
set_proc_metadata(#{username => Username}).
|
||||
|
||||
-spec set_metadata_peername(peername_str()) -> ok.
|
||||
set_metadata_peername(Peername) ->
|
||||
set_proc_metadata(#{peername => Peername}).
|
||||
|
|
|
@ -69,7 +69,9 @@ enrich_report(ReportRaw, Meta) ->
|
|||
ClientId = maps:get(clientid, Meta, undefined),
|
||||
Peer = maps:get(peername, Meta, undefined),
|
||||
Msg = maps:get(msg, ReportRaw, undefined),
|
||||
Tag = maps:get(tag, ReportRaw, undefined),
|
||||
%% TODO: move all tags to Meta so we can filter traces
|
||||
%% based on tags (currently not supported)
|
||||
Tag = maps:get(tag, ReportRaw, maps:get(tag, Meta, undefined)),
|
||||
%% turn it into a list so that the order of the fields is determined
|
||||
lists:foldl(
|
||||
fun
|
||||
|
|
|
@ -78,6 +78,7 @@
|
|||
-define(DEFAULT_MAX_PORTS, 1024 * 1024).
|
||||
|
||||
-define(LOG_THROTTLING_MSGS, [
|
||||
authentication_failure,
|
||||
authorization_permission_denied,
|
||||
cannot_publish_to_topic_due_to_not_authorized,
|
||||
cannot_publish_to_topic_due_to_quota_exceeded,
|
||||
|
|
|
@ -340,7 +340,7 @@ transform_bridge_v1_config_to_action_config(
|
|||
ActionMap0 = lists:foldl(
|
||||
fun
|
||||
({enable, _Spec}, ToTransformSoFar) ->
|
||||
%% Enable filed is used in both
|
||||
%% Enable field is used in both
|
||||
ToTransformSoFar;
|
||||
({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
||||
ConnectorFieldNameBin = to_bin(ConnectorFieldName),
|
||||
|
|
|
@ -234,7 +234,7 @@ parse_data(
|
|||
<<Year:?BYTE, Month:?BYTE, Day:?BYTE, Hour:?BYTE, Minute:?BYTE, Second:?BYTE, Total:?BYTE,
|
||||
Rest/binary>>
|
||||
) ->
|
||||
%% XXX: need check ACK filed?
|
||||
%% XXX: need check ACK field?
|
||||
#{
|
||||
<<"Time">> => #{
|
||||
<<"Year">> => Year,
|
||||
|
|
|
@ -80,24 +80,29 @@ update_setting(Setting) when is_map(Setting) ->
|
|||
check(_ConnInfo, AckProps) ->
|
||||
case emqx_license_checker:limits() of
|
||||
{ok, #{max_connections := ?ERR_EXPIRED}} ->
|
||||
?SLOG(error, #{msg => "connection_rejected_due_to_license_expired"}),
|
||||
?SLOG(error, #{msg => "connection_rejected_due_to_license_expired"}, #{tag => "LICENSE"}),
|
||||
{stop, {error, ?RC_QUOTA_EXCEEDED}};
|
||||
{ok, #{max_connections := MaxClients}} ->
|
||||
case check_max_clients_exceeded(MaxClients) of
|
||||
true ->
|
||||
?SLOG_THROTTLE(
|
||||
error,
|
||||
#{msg => connection_rejected_due_to_license_limit_reached}
|
||||
#{msg => connection_rejected_due_to_license_limit_reached},
|
||||
#{tag => "LICENSE"}
|
||||
),
|
||||
{stop, {error, ?RC_QUOTA_EXCEEDED}};
|
||||
false ->
|
||||
{ok, AckProps}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "connection_rejected_due_to_license_not_loaded",
|
||||
reason => Reason
|
||||
}),
|
||||
?SLOG(
|
||||
error,
|
||||
#{
|
||||
msg => "connection_rejected_due_to_license_not_loaded",
|
||||
reason => Reason
|
||||
},
|
||||
#{tag => "LICENSE"}
|
||||
),
|
||||
{stop, {error, ?RC_QUOTA_EXCEEDED}}
|
||||
end.
|
||||
|
||||
|
|
|
@ -172,11 +172,15 @@ refresh(State) ->
|
|||
State.
|
||||
|
||||
log_new_license(Old, New) ->
|
||||
?SLOG(info, #{
|
||||
msg => "new_license_loaded",
|
||||
old_license => emqx_license_parser:summary(Old),
|
||||
new_license => emqx_license_parser:summary(New)
|
||||
}).
|
||||
?SLOG(
|
||||
info,
|
||||
#{
|
||||
msg => "new_license_loaded",
|
||||
old_license => emqx_license_parser:summary(Old),
|
||||
new_license => emqx_license_parser:summary(New)
|
||||
},
|
||||
#{tag => "LICENSE"}
|
||||
).
|
||||
|
||||
ensure_check_license_timer(#{check_license_interval := CheckInterval} = State) ->
|
||||
ok = cancel_timer(State, check_timer),
|
||||
|
|
|
@ -129,13 +129,17 @@ error_msg(Code, Msg) ->
|
|||
'/license'(post, #{body := #{<<"key">> := Key}}) ->
|
||||
case emqx_license:update_key(Key) of
|
||||
{error, Error} ->
|
||||
?SLOG(error, #{
|
||||
msg => "bad_license_key",
|
||||
reason => Error
|
||||
}),
|
||||
?SLOG(
|
||||
error,
|
||||
#{
|
||||
msg => "bad_license_key",
|
||||
reason => Error
|
||||
},
|
||||
#{tag => "LICENSE"}
|
||||
),
|
||||
{400, error_msg(?BAD_REQUEST, <<"Bad license key">>)};
|
||||
{ok, _} ->
|
||||
?SLOG(info, #{msg => "updated_license_key"}),
|
||||
?SLOG(info, #{msg => "updated_license_key"}, #{tag => "LICENSE"}),
|
||||
License = maps:from_list(emqx_license_checker:dump()),
|
||||
{200, License}
|
||||
end;
|
||||
|
@ -147,13 +151,17 @@ error_msg(Code, Msg) ->
|
|||
'/license/setting'(put, #{body := Setting}) ->
|
||||
case emqx_license:update_setting(Setting) of
|
||||
{error, Error} ->
|
||||
?SLOG(error, #{
|
||||
msg => "bad_license_setting",
|
||||
reason => Error
|
||||
}),
|
||||
?SLOG(
|
||||
error,
|
||||
#{
|
||||
msg => "bad_license_setting",
|
||||
reason => Error
|
||||
},
|
||||
#{tag => "LICENSE"}
|
||||
),
|
||||
{400, error_msg(?BAD_REQUEST, <<"Bad license setting">>)};
|
||||
{ok, _} ->
|
||||
?SLOG(info, #{msg => "updated_license_setting"}),
|
||||
?SLOG(info, #{msg => "updated_license_setting"}, #{tag => "LICENSE"}),
|
||||
'/license/setting'(get, undefined)
|
||||
end.
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
Implement log throttling. The feature reduces the number of potentially flooding logged events by
|
||||
dropping all but the first event within a configured time window.
|
||||
Throttling is applied to the following log events:
|
||||
- authentication_failure,
|
||||
- authorization_permission_denied,
|
||||
- cannot_publish_to_topic_due_to_not_authorized,
|
||||
- cannot_publish_to_topic_due_to_quota_exceeded,
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
Add `username` log field.
|
||||
|
||||
If MQTT client is connected with a non-empty username the logs and traces will include `username` field.
|
|
@ -70,8 +70,8 @@ msg_topic.label:
|
|||
|
||||
msg_qos.desc:
|
||||
"""Message QoS."""
|
||||
msg_topic.label:
|
||||
"""Message Qos"""
|
||||
msg_qos.label:
|
||||
"""Message QoS"""
|
||||
|
||||
msg_publish_at.desc:
|
||||
"""Message publish time, a millisecond precision Unix epoch timestamp."""
|
||||
|
|
Loading…
Reference in New Issue