Merge pull request #12520 from SergeTupchiy/EMQX-11530-log-throttling
feat: implement log throttling
This commit is contained in:
commit
f72558737c
|
@ -40,6 +40,21 @@
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% NOTE: do not forget to use atom for msg and add every used msg to
|
||||||
|
%% the default value of `log.thorttling.msgs` list.
|
||||||
|
-define(SLOG_THROTTLE(Level, Data),
|
||||||
|
?SLOG_THROTTLE(Level, Data, #{})
|
||||||
|
).
|
||||||
|
|
||||||
|
-define(SLOG_THROTTLE(Level, Data, Meta),
|
||||||
|
case emqx_log_throttler:allow(Level, maps:get(msg, Data)) of
|
||||||
|
true ->
|
||||||
|
?SLOG(Level, Data, Meta);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
-define(AUDIT_HANDLER, emqx_audit).
|
-define(AUDIT_HANDLER, emqx_audit).
|
||||||
-define(TRACE_FILTER, emqx_trace_filter).
|
-define(TRACE_FILTER, emqx_trace_filter).
|
||||||
-define(OWN_KEYS, [level, filters, filter_default, handlers]).
|
-define(OWN_KEYS, [level, filters, filter_default, handlers]).
|
||||||
|
|
|
@ -183,8 +183,13 @@ log_result(#{username := Username}, Topic, Action, From, Result) ->
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
case Result of
|
case Result of
|
||||||
allow -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"});
|
allow ->
|
||||||
deny -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_denied"})
|
?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"});
|
||||||
|
deny ->
|
||||||
|
?SLOG_THROTTLE(
|
||||||
|
warning,
|
||||||
|
(LogMeta())#{msg => authorization_permission_denied}
|
||||||
|
)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private Format authorization rules source.
|
%% @private Format authorization rules source.
|
||||||
|
|
|
@ -616,10 +616,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
||||||
Msg = packet_to_message(NPacket, NChannel),
|
Msg = packet_to_message(NPacket, NChannel),
|
||||||
do_publish(PacketId, Msg, NChannel);
|
do_publish(PacketId, Msg, NChannel);
|
||||||
{error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
|
{error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
|
||||||
?SLOG(
|
?SLOG_THROTTLE(
|
||||||
info,
|
warning,
|
||||||
#{
|
#{
|
||||||
msg => "cannot_publish_to_topic",
|
msg => cannot_publish_to_topic_due_to_not_authorized,
|
||||||
reason => emqx_reason_codes:name(Rc)
|
reason => emqx_reason_codes:name(Rc)
|
||||||
},
|
},
|
||||||
#{topic => Topic}
|
#{topic => Topic}
|
||||||
|
@ -635,10 +635,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
||||||
handle_out(disconnect, Rc, NChannel)
|
handle_out(disconnect, Rc, NChannel)
|
||||||
end;
|
end;
|
||||||
{error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
|
{error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
|
||||||
?SLOG(
|
?SLOG_THROTTLE(
|
||||||
info,
|
warning,
|
||||||
#{
|
#{
|
||||||
msg => "cannot_publish_to_topic",
|
msg => cannot_publish_to_topic_due_to_quota_exceeded,
|
||||||
reason => emqx_reason_codes:name(Rc)
|
reason => emqx_reason_codes:name(Rc)
|
||||||
},
|
},
|
||||||
#{topic => Topic}
|
#{topic => Topic}
|
||||||
|
|
|
@ -40,7 +40,8 @@ init([]) ->
|
||||||
child_spec(emqx_authn_authz_metrics_sup, supervisor),
|
child_spec(emqx_authn_authz_metrics_sup, supervisor),
|
||||||
child_spec(emqx_ocsp_cache, worker),
|
child_spec(emqx_ocsp_cache, worker),
|
||||||
child_spec(emqx_crl_cache, worker),
|
child_spec(emqx_crl_cache, worker),
|
||||||
child_spec(emqx_tls_lib_sup, supervisor)
|
child_spec(emqx_tls_lib_sup, supervisor),
|
||||||
|
child_spec(emqx_log_throttler, worker)
|
||||||
]
|
]
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,151 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_log_throttler).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include("logger.hrl").
|
||||||
|
-include("types.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
%% throttler API
|
||||||
|
-export([allow/2]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([
|
||||||
|
init/1,
|
||||||
|
handle_call/3,
|
||||||
|
handle_cast/2,
|
||||||
|
handle_info/2,
|
||||||
|
terminate/2,
|
||||||
|
code_change/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(SEQ_ID(Msg), {?MODULE, Msg}).
|
||||||
|
-define(NEW_SEQ, atomics:new(1, [{signed, false}])).
|
||||||
|
-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
|
||||||
|
-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
|
||||||
|
-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
|
||||||
|
-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
|
||||||
|
-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
|
||||||
|
|
||||||
|
-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
|
||||||
|
|
||||||
|
-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
|
||||||
|
-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
|
||||||
|
|
||||||
|
-spec allow(logger:level(), atom()) -> boolean().
|
||||||
|
allow(debug, _Msg) ->
|
||||||
|
true;
|
||||||
|
allow(_Level, Msg) when is_atom(Msg) ->
|
||||||
|
Seq = persistent_term:get(?SEQ_ID(Msg), undefined),
|
||||||
|
case Seq of
|
||||||
|
undefined ->
|
||||||
|
%% This is either a race condition (emqx_log_throttler is not started yet)
|
||||||
|
%% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
|
||||||
|
%% not added to the default value of `log.throttling.msgs`.
|
||||||
|
?SLOG(info, #{
|
||||||
|
msg => "missing_log_throttle_sequence",
|
||||||
|
throttled_msg => Msg
|
||||||
|
}),
|
||||||
|
true;
|
||||||
|
SeqRef ->
|
||||||
|
?IS_ALLOWED(SeqRef)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec start_link() -> startlink_ret().
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% gen_server callbacks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
|
||||||
|
CurrentPeriodMs = ?TIME_WINDOW_MS,
|
||||||
|
TimerRef = schedule_refresh(CurrentPeriodMs),
|
||||||
|
{ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
|
||||||
|
|
||||||
|
handle_call(Req, _From, State) ->
|
||||||
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
|
||||||
|
Msgs = ?MSGS_LIST,
|
||||||
|
DroppedStats = lists:foldl(
|
||||||
|
fun(Msg, Acc) ->
|
||||||
|
case ?GET_SEQ(Msg) of
|
||||||
|
%% Should not happen, unless the static ids list is updated at run-time.
|
||||||
|
undefined ->
|
||||||
|
?NEW_THROTTLE(Msg, ?NEW_SEQ),
|
||||||
|
?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
|
||||||
|
Acc;
|
||||||
|
SeqRef ->
|
||||||
|
Dropped = ?GET_DROPPED(SeqRef),
|
||||||
|
ok = ?RESET_SEQ(SeqRef),
|
||||||
|
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
|
||||||
|
maybe_add_dropped(Msg, Dropped, Acc)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
Msgs
|
||||||
|
),
|
||||||
|
maybe_log_dropped(DroppedStats, PeriodMs),
|
||||||
|
NewPeriodMs = ?TIME_WINDOW_MS,
|
||||||
|
State1 = State#{
|
||||||
|
timer_ref => schedule_refresh(NewPeriodMs),
|
||||||
|
current_period_ms => NewPeriodMs
|
||||||
|
},
|
||||||
|
{noreply, State1};
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?SLOG(error, #{msg => "unxpected_info", info => Info}),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
maybe_add_dropped(Msg, Dropped, DroppedAcc) when Dropped > 0 ->
|
||||||
|
DroppedAcc#{Msg => Dropped};
|
||||||
|
maybe_add_dropped(_Msg, _Dropped, DroppedAcc) ->
|
||||||
|
DroppedAcc.
|
||||||
|
|
||||||
|
maybe_log_dropped(DroppedStats, PeriodMs) when map_size(DroppedStats) > 0 ->
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "log_events_throttled_during_last_period",
|
||||||
|
dropped => DroppedStats,
|
||||||
|
period => emqx_utils_calendar:human_readable_duration_string(PeriodMs)
|
||||||
|
});
|
||||||
|
maybe_log_dropped(_DroppedStats, _PeriodMs) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
schedule_refresh(PeriodMs) ->
|
||||||
|
?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
|
||||||
|
erlang:send_after(PeriodMs, ?MODULE, refresh).
|
|
@ -62,10 +62,10 @@ handle_event(ClientInfo, {dropped, Msg, #{reason := queue_full, logctx := Ctx}})
|
||||||
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
||||||
ok = inc_pd('send_msg.dropped', 1),
|
ok = inc_pd('send_msg.dropped', 1),
|
||||||
ok = inc_pd('send_msg.dropped.queue_full', 1),
|
ok = inc_pd('send_msg.dropped.queue_full', 1),
|
||||||
?SLOG(
|
?SLOG_THROTTLE(
|
||||||
info,
|
warning,
|
||||||
Ctx#{
|
Ctx#{
|
||||||
msg => "dropped_msg_due_to_mqueue_is_full",
|
msg => dropped_msg_due_to_mqueue_is_full,
|
||||||
payload => Msg#message.payload
|
payload => Msg#message.payload
|
||||||
},
|
},
|
||||||
#{topic => Msg#message.topic}
|
#{topic => Msg#message.topic}
|
||||||
|
|
|
@ -0,0 +1,170 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_log_throttler_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
%% Have to use real msgs, as the schema is guarded by enum.
|
||||||
|
-define(THROTTLE_MSG, authorization_permission_denied).
|
||||||
|
-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
|
||||||
|
-define(TIME_WINDOW, <<"1s">>).
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
%% This test suite can't be run in standalone tests (without emqx_conf)
|
||||||
|
case module_exists(emqx_conf) of
|
||||||
|
true ->
|
||||||
|
Apps = emqx_cth_suite:start(
|
||||||
|
[
|
||||||
|
{emqx_conf, #{
|
||||||
|
config =>
|
||||||
|
#{
|
||||||
|
log => #{
|
||||||
|
throttling => #{
|
||||||
|
time_window => ?TIME_WINDOW, msgs => [?THROTTLE_MSG]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}},
|
||||||
|
emqx
|
||||||
|
],
|
||||||
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||||
|
),
|
||||||
|
[{suite_apps, Apps} | Config];
|
||||||
|
false ->
|
||||||
|
{skip, standalone_not_supported}
|
||||||
|
end.
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
emqx_cth_suite:stop(?config(suite_apps, Config)),
|
||||||
|
emqx_config:delete_override_conf_files().
|
||||||
|
|
||||||
|
init_per_testcase(t_throttle_add_new_msg, Config) ->
|
||||||
|
ok = snabbkaffe:start_trace(),
|
||||||
|
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
|
||||||
|
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG1 | Conf], #{}),
|
||||||
|
Config;
|
||||||
|
init_per_testcase(_TC, Config) ->
|
||||||
|
ok = snabbkaffe:start_trace(),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(t_throttle_add_new_msg, _Config) ->
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
|
||||||
|
ok;
|
||||||
|
end_per_testcase(t_update_time_window, _Config) ->
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
{ok, _} = emqx_conf:update([log, throttling, time_window], ?TIME_WINDOW, #{}),
|
||||||
|
ok;
|
||||||
|
end_per_testcase(_TC, _Config) ->
|
||||||
|
ok = snabbkaffe:stop().
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_throttle(_Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
%% Warm-up and block to increase the probability that next events
|
||||||
|
%% will be in the same throttling time window.
|
||||||
|
lists:foreach(
|
||||||
|
fun(_) -> emqx_log_throttler:allow(warning, ?THROTTLE_MSG) end,
|
||||||
|
lists:seq(1, 100)
|
||||||
|
),
|
||||||
|
{ok, _} = ?block_until(
|
||||||
|
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000
|
||||||
|
),
|
||||||
|
|
||||||
|
?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)),
|
||||||
|
?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)),
|
||||||
|
%% Debug is always allowed
|
||||||
|
?assert(emqx_log_throttler:allow(debug, ?THROTTLE_MSG)),
|
||||||
|
{ok, _} = ?block_until(
|
||||||
|
#{
|
||||||
|
?snk_kind := log_throttler_dropped,
|
||||||
|
throttled_msg := ?THROTTLE_MSG,
|
||||||
|
dropped_count := 1
|
||||||
|
},
|
||||||
|
3000
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
).
|
||||||
|
|
||||||
|
t_throttle_add_new_msg(_Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?block_until(
|
||||||
|
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
|
||||||
|
),
|
||||||
|
?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)),
|
||||||
|
?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)),
|
||||||
|
{ok, _} = ?block_until(
|
||||||
|
#{
|
||||||
|
?snk_kind := log_throttler_dropped,
|
||||||
|
throttled_msg := ?THROTTLE_MSG1,
|
||||||
|
dropped_count := 1
|
||||||
|
},
|
||||||
|
3000
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
).
|
||||||
|
|
||||||
|
t_throttle_no_msg(_Config) ->
|
||||||
|
%% Must simply pass with no crashes
|
||||||
|
?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)),
|
||||||
|
?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)),
|
||||||
|
timer:sleep(10),
|
||||||
|
?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
|
||||||
|
|
||||||
|
t_update_time_window(_Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?wait_async_action(
|
||||||
|
emqx_conf:update([log, throttling, time_window], <<"2s">>, #{}),
|
||||||
|
#{?snk_kind := log_throttler_sched_refresh, new_period_ms := 2000},
|
||||||
|
5000
|
||||||
|
),
|
||||||
|
timer:sleep(10),
|
||||||
|
?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler)))
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
module_exists(Mod) ->
|
||||||
|
case erlang:module_loaded(Mod) of
|
||||||
|
true ->
|
||||||
|
true;
|
||||||
|
false ->
|
||||||
|
case code:ensure_loaded(Mod) of
|
||||||
|
ok -> true;
|
||||||
|
{module, Mod} -> true;
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
end.
|
|
@ -75,6 +75,14 @@
|
||||||
%% 1 million default ports counter
|
%% 1 million default ports counter
|
||||||
-define(DEFAULT_MAX_PORTS, 1024 * 1024).
|
-define(DEFAULT_MAX_PORTS, 1024 * 1024).
|
||||||
|
|
||||||
|
-define(LOG_THROTTLING_MSGS, [
|
||||||
|
authorization_permission_denied,
|
||||||
|
cannot_publish_to_topic_due_to_not_authorized,
|
||||||
|
cannot_publish_to_topic_due_to_quota_exceeded,
|
||||||
|
connection_rejected_due_to_license_limit_reached,
|
||||||
|
dropped_msg_due_to_mqueue_is_full
|
||||||
|
]).
|
||||||
|
|
||||||
%% Callback to upgrade config after loaded from config file but before validation.
|
%% Callback to upgrade config after loaded from config file but before validation.
|
||||||
upgrade_raw_conf(Raw0) ->
|
upgrade_raw_conf(Raw0) ->
|
||||||
Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0),
|
Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0),
|
||||||
|
@ -909,7 +917,12 @@ fields("log") ->
|
||||||
aliases => [file_handlers],
|
aliases => [file_handlers],
|
||||||
importance => ?IMPORTANCE_HIGH
|
importance => ?IMPORTANCE_HIGH
|
||||||
}
|
}
|
||||||
)}
|
)},
|
||||||
|
{throttling,
|
||||||
|
sc(?R_REF("log_throttling"), #{
|
||||||
|
desc => ?DESC("log_throttling"),
|
||||||
|
importance => ?IMPORTANCE_MEDIUM
|
||||||
|
})}
|
||||||
];
|
];
|
||||||
fields("console_handler") ->
|
fields("console_handler") ->
|
||||||
log_handler_common_confs(console, #{});
|
log_handler_common_confs(console, #{});
|
||||||
|
@ -1012,6 +1025,28 @@ fields("log_burst_limit") ->
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
fields("log_throttling") ->
|
||||||
|
[
|
||||||
|
{time_window,
|
||||||
|
sc(
|
||||||
|
emqx_schema:duration_s(),
|
||||||
|
#{
|
||||||
|
default => <<"1m">>,
|
||||||
|
desc => ?DESC("log_throttling_time_window"),
|
||||||
|
importance => ?IMPORTANCE_MEDIUM
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
%% A static list of msgs used in ?SLOG_THROTTLE/2,3 macro.
|
||||||
|
%% For internal (developer) use only.
|
||||||
|
{msgs,
|
||||||
|
sc(
|
||||||
|
hoconsc:array(hoconsc:enum(?LOG_THROTTLING_MSGS)),
|
||||||
|
#{
|
||||||
|
default => ?LOG_THROTTLING_MSGS,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
];
|
||||||
fields("authorization") ->
|
fields("authorization") ->
|
||||||
emqx_schema:authz_fields() ++
|
emqx_schema:authz_fields() ++
|
||||||
emqx_authz_schema:authz_fields().
|
emqx_authz_schema:authz_fields().
|
||||||
|
@ -1046,6 +1081,8 @@ desc("log_burst_limit") ->
|
||||||
?DESC("desc_log_burst_limit");
|
?DESC("desc_log_burst_limit");
|
||||||
desc("authorization") ->
|
desc("authorization") ->
|
||||||
?DESC("desc_authorization");
|
?DESC("desc_authorization");
|
||||||
|
desc("log_throttling") ->
|
||||||
|
?DESC("desc_log_throttling");
|
||||||
desc(_) ->
|
desc(_) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,10 @@
|
||||||
level = info
|
level = info
|
||||||
path = \"log/emqx.log\"
|
path = \"log/emqx.log\"
|
||||||
}
|
}
|
||||||
|
throttling {
|
||||||
|
msgs = []
|
||||||
|
time_window = 1m
|
||||||
|
}
|
||||||
}
|
}
|
||||||
").
|
").
|
||||||
|
|
||||||
|
@ -84,7 +88,9 @@ t_log_conf(_Conf) ->
|
||||||
<<"time_offset">> => <<"system">>
|
<<"time_offset">> => <<"system">>
|
||||||
},
|
},
|
||||||
<<"file">> =>
|
<<"file">> =>
|
||||||
#{<<"default">> => FileExpect}
|
#{<<"default">> => FileExpect},
|
||||||
|
<<"throttling">> =>
|
||||||
|
#{<<"time_window">> => <<"1m">>, <<"msgs">> => []}
|
||||||
},
|
},
|
||||||
?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])),
|
?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])),
|
||||||
UpdateLog0 = emqx_utils_maps:deep_remove([<<"file">>, <<"default">>], ExpectLog1),
|
UpdateLog0 = emqx_utils_maps:deep_remove([<<"file">>, <<"default">>], ExpectLog1),
|
||||||
|
|
|
@ -102,5 +102,8 @@ t_audit_log_conf(_Config) ->
|
||||||
<<"time_offset">> => <<"system">>
|
<<"time_offset">> => <<"system">>
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])),
|
%% The default value of throttling.msgs can be frequently updated,
|
||||||
|
%% remove it here, otherwise this test needs to be updated each time
|
||||||
|
%% a new throttle event is added.
|
||||||
|
?assertEqual(ExpectLog1, maps:remove(<<"throttling">>, emqx_conf:get_raw([<<"log">>]))),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -85,7 +85,10 @@ check(_ConnInfo, AckProps) ->
|
||||||
{ok, #{max_connections := MaxClients}} ->
|
{ok, #{max_connections := MaxClients}} ->
|
||||||
case check_max_clients_exceeded(MaxClients) of
|
case check_max_clients_exceeded(MaxClients) of
|
||||||
true ->
|
true ->
|
||||||
?SLOG(info, #{msg => "connection_rejected_due_to_license_limit_reached"}),
|
?SLOG_THROTTLE(
|
||||||
|
error,
|
||||||
|
#{msg => connection_rejected_due_to_license_limit_reached}
|
||||||
|
),
|
||||||
{stop, {error, ?RC_QUOTA_EXCEEDED}};
|
{stop, {error, ?RC_QUOTA_EXCEEDED}};
|
||||||
false ->
|
false ->
|
||||||
{ok, AckProps}
|
{ok, AckProps}
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Implement log throttling. The feature reduces the number of potentially flooding logged events by
|
||||||
|
dropping all but the first event within a configured time window.
|
|
@ -475,6 +475,19 @@ log_burst_limit_window_time.desc:
|
||||||
log_burst_limit_window_time.label:
|
log_burst_limit_window_time.label:
|
||||||
"""Window Time"""
|
"""Window Time"""
|
||||||
|
|
||||||
|
desc_log_throttling.label:
|
||||||
|
"""Log Throttling"""
|
||||||
|
|
||||||
|
desc_log_throttling.desc:
|
||||||
|
"""Log throttling feature reduces the number of potentially flooding logged events by
|
||||||
|
dropping all but the first event within a configured time window."""
|
||||||
|
|
||||||
|
log_throttling_time_window.desc:
|
||||||
|
"""For throttled messages, only log 1 in each time window."""
|
||||||
|
|
||||||
|
log_throttling_time_window.label:
|
||||||
|
"""Log Throttling Time Window"""
|
||||||
|
|
||||||
cluster_dns_record_type.desc:
|
cluster_dns_record_type.desc:
|
||||||
"""DNS record type."""
|
"""DNS record type."""
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue