feat: add converter to flapping_detect
This commit is contained in:
parent
1d33b7dbd8
commit
5659cc2b47
|
@ -711,7 +711,7 @@ do_put(Type, Putter, [RootName | KeyPath], DeepValue) ->
|
||||||
NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
|
NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
|
||||||
Key = ?PERSIS_KEY(Type, RootName),
|
Key = ?PERSIS_KEY(Type, RootName),
|
||||||
persistent_term:put(Key, NewValue),
|
persistent_term:put(Key, NewValue),
|
||||||
post_save_config_hook(Key, NewValue),
|
put_config_post_change_actions(Key, NewValue),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_deep_get(?CONF, AtomKeyPath, Map, Default) ->
|
do_deep_get(?CONF, AtomKeyPath, Map, Default) ->
|
||||||
|
@ -918,8 +918,8 @@ rawconf_to_conf(SchemaModule, RawPath, RawValue) ->
|
||||||
%% When the global zone change, the zones is updated with the new global zone.
|
%% When the global zone change, the zones is updated with the new global zone.
|
||||||
%% The global zone's keys is too many,
|
%% The global zone's keys is too many,
|
||||||
%% so we don't choose to write a global zone change emqx_config_handler callback to hook
|
%% so we don't choose to write a global zone change emqx_config_handler callback to hook
|
||||||
post_save_config_hook(?PERSIS_KEY(?CONF, zones), _Zones) ->
|
put_config_post_change_actions(?PERSIS_KEY(?CONF, zones), _Zones) ->
|
||||||
emqx_flapping:update_config(),
|
emqx_flapping:update_config(),
|
||||||
ok;
|
ok;
|
||||||
post_save_config_hook(_Key, _NewValue) ->
|
put_config_post_change_actions(_Key, _NewValue) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -97,9 +97,17 @@ detect(ClientId, PeerHost, #{enable := true, max_count := Threshold} = Policy) -
|
||||||
detect(_ClientId, _PeerHost, #{enable := false}) ->
|
detect(_ClientId, _PeerHost, #{enable := false}) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
%% with default, if we delete Zone at running time. we should not crash.
|
|
||||||
get_policy(Zone) ->
|
get_policy(Zone) ->
|
||||||
emqx_config:get_zone_conf(Zone, [flapping_detect], ?DEFAULT_POLICY).
|
Flapping = [flapping_detect],
|
||||||
|
case emqx_config:get_zone_conf(Zone, Flapping, undefined) of
|
||||||
|
undefined ->
|
||||||
|
%% If zone has be deleted at running time,
|
||||||
|
%% we don't crash the connection and disable flapping detect.
|
||||||
|
Policy = emqx_config:get(Flapping),
|
||||||
|
Policy#{enable => false};
|
||||||
|
Policy ->
|
||||||
|
Policy
|
||||||
|
end.
|
||||||
|
|
||||||
now_diff(TS) -> erlang:system_time(millisecond) - TS.
|
now_diff(TS) -> erlang:system_time(millisecond) - TS.
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
-define(MAX_INT_TIMEOUT_MS, 4294967295).
|
-define(MAX_INT_TIMEOUT_MS, 4294967295).
|
||||||
%% floor(?MAX_INT_TIMEOUT_MS / 1000).
|
%% floor(?MAX_INT_TIMEOUT_MS / 1000).
|
||||||
-define(MAX_INT_TIMEOUT_S, 4294967).
|
-define(MAX_INT_TIMEOUT_S, 4294967).
|
||||||
|
-define(DEFAULT_WINDOW_TIME, "1m").
|
||||||
|
|
||||||
-type duration() :: integer().
|
-type duration() :: integer().
|
||||||
-type duration_s() :: integer().
|
-type duration_s() :: integer().
|
||||||
|
@ -275,7 +276,10 @@ roots(low) ->
|
||||||
{"flapping_detect",
|
{"flapping_detect",
|
||||||
sc(
|
sc(
|
||||||
ref("flapping_detect"),
|
ref("flapping_detect"),
|
||||||
#{importance => ?DEFAULT_IMPORTANCE}
|
#{
|
||||||
|
importance => ?DEFAULT_IMPORTANCE,
|
||||||
|
converter => fun flapping_detect_converter/2
|
||||||
|
}
|
||||||
)},
|
)},
|
||||||
{"persistent_session_store",
|
{"persistent_session_store",
|
||||||
sc(
|
sc(
|
||||||
|
@ -692,7 +696,7 @@ fields("flapping_detect") ->
|
||||||
sc(
|
sc(
|
||||||
duration(),
|
duration(),
|
||||||
#{
|
#{
|
||||||
default => "1m",
|
default => ?DEFAULT_WINDOW_TIME,
|
||||||
importance => ?IMPORTANCE_HIGH,
|
importance => ?IMPORTANCE_HIGH,
|
||||||
desc => ?DESC(flapping_detect_window_time)
|
desc => ?DESC(flapping_detect_window_time)
|
||||||
}
|
}
|
||||||
|
@ -3495,3 +3499,9 @@ mqtt_converter(#{<<"keepalive_backoff">> := Backoff} = Mqtt, _Opts) ->
|
||||||
Mqtt#{<<"keepalive_multiplier">> => Backoff * 2};
|
Mqtt#{<<"keepalive_multiplier">> => Backoff * 2};
|
||||||
mqtt_converter(Mqtt, _Opts) ->
|
mqtt_converter(Mqtt, _Opts) ->
|
||||||
Mqtt.
|
Mqtt.
|
||||||
|
|
||||||
|
%% For backward compatibility with window_time is disable
|
||||||
|
flapping_detect_converter(Conf = #{<<"window_time">> := <<"disable">>}, _Opts) ->
|
||||||
|
Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false};
|
||||||
|
flapping_detect_converter(Conf, _Opts) ->
|
||||||
|
Conf.
|
||||||
|
|
|
@ -161,5 +161,15 @@ validate_timer(Names) ->
|
||||||
?assertEqual(maps:keys(Zones), maps:keys(Timers)),
|
?assertEqual(maps:keys(Zones), maps:keys(Timers)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_window_compatibility_check(_Conf) ->
|
||||||
|
Flapping = emqx:get_config([flapping_detect]),
|
||||||
|
ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>),
|
||||||
|
?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])),
|
||||||
|
%% reset
|
||||||
|
FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]),
|
||||||
|
ok = emqx_config:init_load(emqx_schema, FlappingBin),
|
||||||
|
?assertEqual(Flapping, emqx:get_config([flapping_detect])),
|
||||||
|
ok.
|
||||||
|
|
||||||
get_policy(Zone) ->
|
get_policy(Zone) ->
|
||||||
emqx_config:get_zone_conf(Zone, [flapping_detect]).
|
emqx_config:get_zone_conf(Zone, [flapping_detect]).
|
||||||
|
|
Loading…
Reference in New Issue