diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 64ea1a741..016386011 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -154,7 +154,7 @@ do_authorize(ClientInfo, Action, Topic) -> case run_hooks('client.authorize', [ClientInfo, Action, Topic], Default) of AuthzResult = #{result := Result} when Result == allow; Result == deny -> From = maps:get(from, AuthzResult, unknown), - ok = log_result(ClientInfo, Topic, Action, From, Result), + ok = log_result(Topic, Action, From, Result), emqx_hooks:run( 'client.check_authz_complete', [ClientInfo, Action, Topic, Result, From] @@ -173,24 +173,28 @@ do_authorize(ClientInfo, Action, Topic) -> deny end. -log_result(#{username := Username}, Topic, Action, From, Result) -> +log_result(Topic, Action, From, Result) -> LogMeta = fun() -> #{ - username => Username, topic => Topic, action => format_action(Action), source => format_from(From) } end, - case Result of - allow -> - ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"}); - deny -> - ?SLOG_THROTTLE( - warning, - (LogMeta())#{msg => authorization_permission_denied} - ) - end. + do_log_result(Action, Result, LogMeta). + +do_log_result(_Action, allow, LogMeta) -> + ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"}, #{tag => "AUTHZ"}); +do_log_result(?AUTHZ_PUBLISH_MATCH_MAP(_, _), deny, LogMeta) -> + %% for publish action, we do not log permission deny at warning level here + %% because it will be logged as cannot_publish_to_topic_due_to_not_authorized + ?SLOG(info, (LogMeta())#{msg => "authorization_permission_denied"}, #{tag => "AUTHZ"}); +do_log_result(_, deny, LogMeta) -> + ?SLOG_THROTTLE( + warning, + (LogMeta())#{msg => authorization_permission_denied}, + #{tag => "AUTHZ"} + ). %% @private Format authorization rules source. format_from(default) -> diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index bc0bf18ea..c0971c29a 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -633,7 +633,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> msg => cannot_publish_to_topic_due_to_not_authorized, reason => emqx_reason_codes:name(Rc) }, - #{topic => Topic} + #{topic => Topic, tag => "AUTHZ"} ), case emqx:get_config([authorization, deny_action], ignore) of ignore -> @@ -652,7 +652,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> msg => cannot_publish_to_topic_due_to_quota_exceeded, reason => emqx_reason_codes:name(Rc) }, - #{topic => Topic} + #{topic => Topic, tag => "AUTHZ"} ), case QoS of ?QOS_0 -> @@ -1612,8 +1612,10 @@ fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := MountPoint}) -> %%-------------------------------------------------------------------- %% Set log metadata -set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) -> - emqx_logger:set_metadata_clientid(ClientId). +set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId} = ClientInfo}) -> + Username = maps:get(username, ClientInfo, undefined), + emqx_logger:set_metadata_clientid(ClientId), + emqx_logger:set_metadata_username(Username). %%-------------------------------------------------------------------- %% Check banned @@ -1680,6 +1682,7 @@ authenticate( Channel ); _ -> + log_auth_failure("bad_authentication_method"), {error, ?RC_BAD_AUTHENTICATION_METHOD} end. @@ -1706,6 +1709,7 @@ do_authenticate( auth_cache = AuthCache }}; {error, Reason} -> + log_auth_failure(Reason), {error, emqx_reason_codes:connack_error(Reason)} end; do_authenticate(Credential, #channel{clientinfo = ClientInfo} = Channel) -> @@ -1713,9 +1717,20 @@ do_authenticate(Credential, #channel{clientinfo = ClientInfo} = Channel) -> {ok, AuthResult} -> {ok, #{}, Channel#channel{clientinfo = merge_auth_result(ClientInfo, AuthResult)}}; {error, Reason} -> + log_auth_failure(Reason), {error, emqx_reason_codes:connack_error(Reason)} end. +log_auth_failure(Reason) -> + ?SLOG_THROTTLE( + warning, + #{ + msg => authentication_failure, + reason => Reason + }, + #{tag => "AUTHN"} + ). + merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) -> IsSuperuser = maps:get(is_superuser, AuthResult, false), maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}). diff --git a/apps/emqx/src/emqx_logger.erl b/apps/emqx/src/emqx_logger.erl index 36e030934..247695485 100644 --- a/apps/emqx/src/emqx_logger.erl +++ b/apps/emqx/src/emqx_logger.erl @@ -43,6 +43,7 @@ -export([ set_metadata_peername/1, set_metadata_clientid/1, + set_metadata_username/1, set_proc_metadata/1, set_primary_log_level/1, set_log_handler_level/2, @@ -142,6 +143,12 @@ set_metadata_clientid(<<>>) -> set_metadata_clientid(ClientId) -> set_proc_metadata(#{clientid => ClientId}). +-spec set_metadata_username(emqx_types:username()) -> ok. +set_metadata_username(Username) when Username =:= undefined orelse Username =:= <<>> -> + ok; +set_metadata_username(Username) -> + set_proc_metadata(#{username => Username}). + -spec set_metadata_peername(peername_str()) -> ok. set_metadata_peername(Peername) -> set_proc_metadata(#{peername => Peername}). diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index b68c4b366..1b2ea8758 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -69,7 +69,9 @@ enrich_report(ReportRaw, Meta) -> ClientId = maps:get(clientid, Meta, undefined), Peer = maps:get(peername, Meta, undefined), Msg = maps:get(msg, ReportRaw, undefined), - Tag = maps:get(tag, ReportRaw, undefined), + %% TODO: move all tags to Meta so we can filter traces + %% based on tags (currently not supported) + Tag = maps:get(tag, ReportRaw, maps:get(tag, Meta, undefined)), %% turn it into a list so that the order of the fields is determined lists:foldl( fun diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index a40716b4d..c72f6a87b 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -78,6 +78,7 @@ -define(DEFAULT_MAX_PORTS, 1024 * 1024). -define(LOG_THROTTLING_MSGS, [ + authentication_failure, authorization_permission_denied, cannot_publish_to_topic_due_to_not_authorized, cannot_publish_to_topic_due_to_quota_exceeded, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index d68514c41..033220bd3 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -340,7 +340,7 @@ transform_bridge_v1_config_to_action_config( ActionMap0 = lists:foldl( fun ({enable, _Spec}, ToTransformSoFar) -> - %% Enable filed is used in both + %% Enable field is used in both ToTransformSoFar; ({ConnectorFieldName, _Spec}, ToTransformSoFar) -> ConnectorFieldNameBin = to_bin(ConnectorFieldName), diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl index 467d3e9f2..6fbfa8ef8 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl @@ -234,7 +234,7 @@ parse_data( <> ) -> - %% XXX: need check ACK filed? + %% XXX: need check ACK field? #{ <<"Time">> => #{ <<"Year">> => Year, diff --git a/apps/emqx_license/src/emqx_license.erl b/apps/emqx_license/src/emqx_license.erl index 61a648154..c95ad0e7f 100644 --- a/apps/emqx_license/src/emqx_license.erl +++ b/apps/emqx_license/src/emqx_license.erl @@ -80,24 +80,29 @@ update_setting(Setting) when is_map(Setting) -> check(_ConnInfo, AckProps) -> case emqx_license_checker:limits() of {ok, #{max_connections := ?ERR_EXPIRED}} -> - ?SLOG(error, #{msg => "connection_rejected_due_to_license_expired"}), + ?SLOG(error, #{msg => "connection_rejected_due_to_license_expired"}, #{tag => "LICENSE"}), {stop, {error, ?RC_QUOTA_EXCEEDED}}; {ok, #{max_connections := MaxClients}} -> case check_max_clients_exceeded(MaxClients) of true -> ?SLOG_THROTTLE( error, - #{msg => connection_rejected_due_to_license_limit_reached} + #{msg => connection_rejected_due_to_license_limit_reached}, + #{tag => "LICENSE"} ), {stop, {error, ?RC_QUOTA_EXCEEDED}}; false -> {ok, AckProps} end; {error, Reason} -> - ?SLOG(error, #{ - msg => "connection_rejected_due_to_license_not_loaded", - reason => Reason - }), + ?SLOG( + error, + #{ + msg => "connection_rejected_due_to_license_not_loaded", + reason => Reason + }, + #{tag => "LICENSE"} + ), {stop, {error, ?RC_QUOTA_EXCEEDED}} end. diff --git a/apps/emqx_license/src/emqx_license_checker.erl b/apps/emqx_license/src/emqx_license_checker.erl index e02dc276a..8270e03d2 100644 --- a/apps/emqx_license/src/emqx_license_checker.erl +++ b/apps/emqx_license/src/emqx_license_checker.erl @@ -172,11 +172,15 @@ refresh(State) -> State. log_new_license(Old, New) -> - ?SLOG(info, #{ - msg => "new_license_loaded", - old_license => emqx_license_parser:summary(Old), - new_license => emqx_license_parser:summary(New) - }). + ?SLOG( + info, + #{ + msg => "new_license_loaded", + old_license => emqx_license_parser:summary(Old), + new_license => emqx_license_parser:summary(New) + }, + #{tag => "LICENSE"} + ). ensure_check_license_timer(#{check_license_interval := CheckInterval} = State) -> ok = cancel_timer(State, check_timer), diff --git a/apps/emqx_license/src/emqx_license_http_api.erl b/apps/emqx_license/src/emqx_license_http_api.erl index 9b2cfb1c6..dcf7afc7e 100644 --- a/apps/emqx_license/src/emqx_license_http_api.erl +++ b/apps/emqx_license/src/emqx_license_http_api.erl @@ -129,13 +129,17 @@ error_msg(Code, Msg) -> '/license'(post, #{body := #{<<"key">> := Key}}) -> case emqx_license:update_key(Key) of {error, Error} -> - ?SLOG(error, #{ - msg => "bad_license_key", - reason => Error - }), + ?SLOG( + error, + #{ + msg => "bad_license_key", + reason => Error + }, + #{tag => "LICENSE"} + ), {400, error_msg(?BAD_REQUEST, <<"Bad license key">>)}; {ok, _} -> - ?SLOG(info, #{msg => "updated_license_key"}), + ?SLOG(info, #{msg => "updated_license_key"}, #{tag => "LICENSE"}), License = maps:from_list(emqx_license_checker:dump()), {200, License} end; @@ -147,13 +151,17 @@ error_msg(Code, Msg) -> '/license/setting'(put, #{body := Setting}) -> case emqx_license:update_setting(Setting) of {error, Error} -> - ?SLOG(error, #{ - msg => "bad_license_setting", - reason => Error - }), + ?SLOG( + error, + #{ + msg => "bad_license_setting", + reason => Error + }, + #{tag => "LICENSE"} + ), {400, error_msg(?BAD_REQUEST, <<"Bad license setting">>)}; {ok, _} -> - ?SLOG(info, #{msg => "updated_license_setting"}), + ?SLOG(info, #{msg => "updated_license_setting"}, #{tag => "LICENSE"}), '/license/setting'(get, undefined) end. diff --git a/changes/ce/feat-12520.en.md b/changes/ce/feat-12520.en.md index 99d29fb2e..5c2717688 100644 --- a/changes/ce/feat-12520.en.md +++ b/changes/ce/feat-12520.en.md @@ -1,6 +1,7 @@ Implement log throttling. The feature reduces the number of potentially flooding logged events by dropping all but the first event within a configured time window. Throttling is applied to the following log events: + - authentication_failure, - authorization_permission_denied, - cannot_publish_to_topic_due_to_not_authorized, - cannot_publish_to_topic_due_to_quota_exceeded, diff --git a/changes/ce/feat-12746.en.md b/changes/ce/feat-12746.en.md new file mode 100644 index 000000000..a897607ad --- /dev/null +++ b/changes/ce/feat-12746.en.md @@ -0,0 +1,3 @@ +Add `username` log field. + +If MQTT client is connected with a non-empty username the logs and traces will include `username` field. diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index d37f52097..58feadb4e 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -70,8 +70,8 @@ msg_topic.label: msg_qos.desc: """Message QoS.""" -msg_topic.label: -"""Message Qos""" +msg_qos.label: +"""Message QoS""" msg_publish_at.desc: """Message publish time, a millisecond precision Unix epoch timestamp."""