diff --git a/Makefile b/Makefile index f83b07330..c0d71c9bb 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ - emqx_vm_mon emqx_alarm_handler emqx_rpc + emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/etc/emqx.conf b/etc/emqx.conf index 05cc35413..610898b53 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -438,6 +438,17 @@ acl_cache_ttl = 1m ## Default: ignore acl_deny_action = ignore +## The cleanning interval for flapping +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +## flapping_clean_interval = 1h + ##-------------------------------------------------------------------- ## MQTT Protocol ##-------------------------------------------------------------------- @@ -650,11 +661,35 @@ zone.external.mqueue_priorities = none ## Value: highest | lowest zone.external.mqueue_default_priority = highest -## Whether to enqueue Qos0 messages. +## Whether to enqueue QoS0 messages. ## ## Value: false | true zone.external.mqueue_store_qos0 = true +## Whether to turn on flapping detect +## +## Value: on | off +zone.external.enable_flapping_detect = off + +## The times of state change per min, specifying the threshold which is used to +## detect if the connection starts flapping +## +## Value: number +zone.external.flapping_threshold = 10, 1m + +## Flapping expiry interval for connections. +## This config entry is used to determine when the connection +## will be unbanned. +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +zone.external.flapping_expiry_interval = 1h + ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -726,6 +761,30 @@ zone.internal.max_mqueue_len = 1000 ## Value: false | true zone.internal.mqueue_store_qos0 = true +## Whether to turn on flapping detect +## +## Value: on | off +zone.internal.enable_flapping_detect = off + +## The times of state change per second, specifying the threshold which is used to +## detect if the connection starts flapping +## +## Value: number +zone.internal.flapping_threshold = 10, 1m + +## Flapping expiry interval for connections. +## This config entry is used to determine when the connection +## will be unbanned. +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +zone.internal.flapping_expiry_interval = 1h + ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -1784,13 +1843,13 @@ listener.wss.external.send_timeout_close = on ## SSL Ciphers used by the bridge. ## ## Value: String -#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 ## Ciphers for TLS PSK. ## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot ## be configured at the same time. ## See 'https://tools.ietf.org/html/rfc4279#section-2'. -#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA +## bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## Ping interval of a down bridge. ## diff --git a/include/types.hrl b/include/types.hrl index 8032bfe7e..85a9aadf0 100644 --- a/include/types.hrl +++ b/include/types.hrl @@ -19,4 +19,3 @@ -type(ok_or_error(Reason) :: ok | {error, Reason}). -type(ok_or_error(Value, Reason) :: {ok, Value} | {error, Reason}). - diff --git a/priv/emqx.schema b/priv/emqx.schema index 2224c9935..459349068 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -270,8 +270,7 @@ end}. X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; _ -> undefined end - end -}. + end}. {validator, "zdbbl_range", "must be between 1KB and 2097151KB", fun(ZDBBL) -> @@ -574,6 +573,11 @@ end}. {datatype, {enum, [ignore, disconnect]}} ]}. +%% @doc time interval to clean flapping records +{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval", [ + {datatype, {duration, ms}} +]}. + {validator, "range:gt_0", "must greater than 0", fun(X) -> X > 0 end }. @@ -814,6 +818,18 @@ end}. {datatype, {enum, [true, false]}} ]}. +{mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [ + {datatype, flag} +]}. + +{mapping, "zone.$name.flapping_threshold", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.flapping_expiry_interval", "emqx.zones", [ + {datatype, {duration, s}} +]}. + %% @doc Force connection/session process GC after this number of %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. @@ -845,6 +861,15 @@ end}. {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; + ("flapping_threshold", Val) -> + [Limit, Duration] = string:tokens(Val, ", "), + FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of + Min when is_integer(Min) -> + {list_to_integer(Limit), Min}; + {error, Reason} -> + error(Reason) + end, + {flapping_threshold, FlappingThreshold}; ("wildcard_subscription", Val) -> {mqtt_wildcard_subscription, Val}; ("shared_subscription", Val) -> @@ -2053,11 +2078,8 @@ end}. ]}. {translation, "emqx.sysmon", fun(Conf) -> - [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, - {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, - {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, - {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, - {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] + Configs = cuttlefish_variable:filter_by_prefix("sysmon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] end}. %%-------------------------------------------------------------------- @@ -2095,12 +2117,8 @@ end}. ]}. {translation, "emqx.os_mon", fun(Conf) -> - [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)}, - {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf)}, - {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf)}, - {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)}, - {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf)}, - {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf)}] + Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] end}. %%-------------------------------------------------------------------- @@ -2122,7 +2140,6 @@ end}. ]}. {translation, "emqx.vm_mon", fun(Conf) -> - [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, - {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)}, - {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}] + Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] end}. diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index ab21bce50..126562401 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -70,13 +70,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) - orelse ets:member(?TAB, {username, Username}) orelse ets:member(?TAB, {ipaddr, IPAddr}). --spec(add(#banned{}) -> ok). +-spec(add(emqx_types:banned()) -> ok). add(Banned) when is_record(Banned, banned) -> mnesia:dirty_write(?TAB, Banned). -spec(delete({client_id, emqx_types:client_id()} - | {username, emqx_types:username()} - | {peername, emqx_types:peername()}) -> ok). + | {username, emqx_types:username()} + | {peername, emqx_types:peername()}) -> ok). delete(Key) -> mnesia:dirty_delete(?TAB, Key). @@ -127,4 +127,3 @@ expire_banned_items(Now) -> mnesia:delete_object(?TAB, B, sticky_write); (_, _Acc) -> ok end, ok, ?TAB). - diff --git a/src/emqx_bridge_mqtt.erl b/src/emqx_bridge_mqtt.erl index 870efe51e..8a66f77a0 100644 --- a/src/emqx_bridge_mqtt.erl +++ b/src/emqx_bridge_mqtt.erl @@ -56,7 +56,9 @@ start(Config = #{address := Address}) -> ClientConfig = Config#{msg_handler => Handlers, owner => AckCollector, host => Host, - port => Port}, + port => Port, + bridge_mode => true + }, case emqx_client:start_link(ClientConfig) of {ok, Pid} -> case emqx_client:connect(Pid) of diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 96dec6716..cd83e61ad 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -88,7 +88,7 @@ ]). %% Default timeout --define(DEFAULT_KEEPALIVE, 60000). +-define(DEFAULT_KEEPALIVE, 60). -define(DEFAULT_ACK_TIMEOUT, 30000). -define(DEFAULT_CONNECT_TIMEOUT, 60000). diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 6b0a8fb15..19940da05 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -30,11 +30,20 @@ init([]) -> shutdown => 1000, type => worker, modules => [emqx_banned]}, + FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000), + Flapping = #{id => flapping, + start => {emqx_flapping, start_link, [FlappingOption]}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [emqx_flapping]}, Manager = #{id => manager, start => {emqx_cm, start_link, []}, restart => permanent, shutdown => 2000, type => worker, modules => [emqx_cm]}, - {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}. - + SupFlags = #{strategy => one_for_one, + intensity => 100, + period => 10}, + {ok, {SupFlags, [Banned, Manager, Flapping]}}. diff --git a/src/emqx_config.erl b/src/emqx_config.erl index 3d37fc001..fd80de0c2 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -19,7 +19,6 @@ %% 1. Store in mnesia database? %% 2. Store in dets? %% 3. Store in data/app.config? -%% -module(emqx_config). @@ -138,4 +137,3 @@ read_(_App) -> error(no_impl). % end, [], Configs), % RequiredCfg ++ OptionalCfg % end. - diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 5beee28bb..89b84dc6b 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -242,10 +242,10 @@ connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) - connected(info, {keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> StatFun = fun() -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - Error -> Error - end + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + Error -> Error + end end, case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of {ok, KeepAlive} -> diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index ed1d3e0c8..7e369a2e3 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -12,70 +12,150 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% @doc TODO: -%% 1. Flapping Detection -%% 2. Conflict Detection? -module(emqx_flapping). -%% Use ets:update_counter??? +-include("emqx.hrl"). +-include("logger.hrl"). +-include("types.hrl"). --behaviour(gen_server). +-behaviour(gen_statem). --export([start_link/0]). +-export([start_link/1]). --export([ is_banned/1 - , banned/1 +%% This module is used to garbage clean the flapping records + +%% gen_statem callbacks +-export([ terminate/3 + , code_change/4 + , init/1 + , initialized/3 + , callback_mode/0 ]). -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-define(FLAPPING_TAB, ?MODULE). --define(SERVER, ?MODULE). +-export([check/3]). --record(state, {}). +-record(flapping, + { client_id :: binary() + , check_count :: integer() + , timestamp :: integer() + }). --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-type(flapping_record() :: #flapping{}). +-type(flapping_state() :: flapping | ok). -is_banned(ClientId) -> - ets:member(banned, ClientId). -banned(ClientId) -> - ets:insert(banned, {ClientId, os:timestamp()}). +%% @doc This function is used to initialize flapping records +%% the expiry time unit is minutes. +-spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()). +init_flapping(ClientId, Interval) -> + #flapping{ client_id = ClientId + , check_count = 1 + , timestamp = emqx_time:now_secs() + Interval + }. + +%% @doc This function is used to initialize flapping records +%% the expiry time unit is minutes. +-spec(check( Action :: atom() + , ClientId :: binary() + , Threshold :: {integer(), integer()}) + -> flapping_state()). +check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) -> + check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)). + +-spec(check( Action :: atom() + , ClientId :: binary() + , Threshold :: {integer(), integer()} + , InitFlapping :: flapping_record()) + -> flapping_state()). +check(Action, ClientId, Threshold, InitFlapping) -> + try ets:update_counter(?FLAPPING_TAB, ClientId, {_Pos = #flapping.check_count, 1}) of + CheckCount -> + case ets:lookup(?FLAPPING_TAB, ClientId) of + [Flapping] -> + check_flapping(Action, CheckCount, Threshold, Flapping); + _Flapping -> + ok + end + catch + error:badarg -> + ets:insert_new(?FLAPPING_TAB, InitFlapping), + ok + end. + +-spec(check_flapping( Action :: atom() + , CheckTimes :: integer() + , Threshold :: {integer(), integer()} + , InitFlapping :: flapping_record()) + -> flapping_state()). +check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval}, + Flapping = #flapping{ client_id = ClientId + , timestamp = Timestamp }) -> + case emqx_time:now_secs() of + NowTimestamp when NowTimestamp =< Timestamp, + CheckTimes > TimesThreshold -> + ets:delete(?FLAPPING_TAB, ClientId), + flapping; + NowTimestamp when NowTimestamp > Timestamp, + Action =:= disconnect -> + ets:delete(?FLAPPING_TAB, ClientId), + ok; + NowTimestamp -> + NewFlapping = Flapping#flapping{timestamp = NowTimestamp + TimeInterval}, + ets:insert(?FLAPPING_TAB, NewFlapping), + ok + end. %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_statem callbacks %%-------------------------------------------------------------------- +-spec(start_link(TimerInterval :: integer()) -> startlink_ret()). +start_link(TimerInterval) -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). -init([]) -> - %% ets:new(banned, [public, ordered_set, named_table]), - {ok, #state{}}. +init([TimerInterval]) -> + TabOpts = [ public + , set + , {keypos, 2} + , {write_concurrency, true} + , {read_concurrency, true}], + ok = emqx_tables:new(?FLAPPING_TAB, TabOpts), + {ok, initialized, #{timer_interval => TimerInterval}}. -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. +callback_mode() -> [state_functions, state_enter]. -handle_cast(_Msg, State) -> - {noreply, State}. +initialized(enter, _OldState, #{timer_interval := Time}) -> + Action = {state_timeout, Time, clean_expired_records}, + {keep_state_and_data, Action}; +initialized(state_timeout, clean_expired_records, #{}) -> + clean_expired_records(), + repeat_state_and_data. -handle_info(_Info, State) -> - {noreply, State}. +code_change(_Vsn, State, Data, _Extra) -> + {ok, State, Data}. -terminate(_Reason, _State) -> +terminate(_Reason, _StateName, _State) -> + emqx_tables:delete(?FLAPPING_TAB), ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +%% @doc clean expired records in ets +clean_expired_records() -> + Records = ets:tab2list(?FLAPPING_TAB), + traverse_records(Records). +traverse_records([]) -> + ok; +traverse_records([#flapping{client_id = ClientId, + timestamp = Timestamp} | LeftRecords]) -> + case emqx_time:now_secs() > Timestamp of + true -> + ets:delete(?FLAPPING_TAB, ClientId); + false -> + true + end, + traverse_records(LeftRecords). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index bdc440215..79c8ed3c8 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -141,16 +141,16 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> {Properties, Rest3} = parse_properties(Rest2, ProtoVer), {ClientId, Rest4} = parse_utf8_string(Rest3), - ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = (BridgeTag =:= 8), - clean_start = bool(CleanStart), - will_flag = bool(WillFlag), - will_qos = WillQoS, - will_retain = bool(WillRetain), - keepalive = KeepAlive, - properties = Properties, - client_id = ClientId}, + ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, + proto_ver = ProtoVer, + is_bridge = (BridgeTag =:= 8), + clean_start = bool(CleanStart), + will_flag = bool(WillFlag), + will_qos = WillQoS, + will_retain = bool(WillRetain), + keepalive = KeepAlive, + properties = Properties, + client_id = ClientId}, {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4), {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)), {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)), diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 54baac636..3de7f977d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -60,6 +60,7 @@ is_bridge, enable_ban, enable_acl, + enable_flapping_detect, acl_deny_action, recv_stats, send_stats, @@ -90,31 +91,32 @@ init(SocketOpts = #{ peername := Peername , peercert := Peercert , sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), - #pstate{zone = Zone, - sendfun = SendFun, - peername = Peername, - peercert = Peercert, - proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>, - client_id = <<>>, - is_assigned = false, - conn_pid = self(), - username = init_username(Peercert, Options), - clean_start = false, - topic_aliases = #{}, - packet_size = emqx_zone:get_env(Zone, max_packet_size), - is_bridge = false, - enable_ban = emqx_zone:get_env(Zone, enable_ban, false), - enable_acl = emqx_zone:get_env(Zone, enable_acl), - acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore), - recv_stats = #{msg => 0, pkt => 0}, - send_stats = #{msg => 0, pkt => 0}, - connected = false, - ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), - topic_alias_maximum = #{to_client => 0, from_client => 0}, - conn_mod = maps:get(conn_mod, SocketOpts, undefined), - credentials = #{}, - ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. + #pstate{zone = Zone, + sendfun = SendFun, + peername = Peername, + peercert = Peercert, + proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + is_assigned = false, + conn_pid = self(), + username = init_username(Peercert, Options), + clean_start = false, + topic_aliases = #{}, + packet_size = emqx_zone:get_env(Zone, max_packet_size), + is_bridge = false, + enable_ban = emqx_zone:get_env(Zone, enable_ban, false), + enable_acl = emqx_zone:get_env(Zone, enable_acl), + enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false), + acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore), + recv_stats = #{msg => 0, pkt => 0}, + send_stats = #{msg => 0, pkt => 0}, + connected = false, + ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), + topic_alias_maximum = #{to_client => 0, from_client => 0}, + conn_mod = maps:get(conn_mod, SocketOpts, undefined), + credentials = #{}, + ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -766,6 +768,7 @@ make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, check_connect(Packet, PState) -> run_check_steps([fun check_proto_ver/2, fun check_client_id/2, + fun check_flapping/2, fun check_banned/2, fun check_will_topic/2], Packet, PState). @@ -798,6 +801,9 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone} false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} end. +check_flapping(#mqtt_packet_connect{}, PState) -> + do_flapping_detect(connect, PState). + check_banned(_ConnPkt, #pstate{enable_ban = false}) -> ok; check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, @@ -896,14 +902,16 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) -> terminate(_Reason, #pstate{client_id = undefined}) -> ok; -terminate(_Reason, #pstate{connected = false}) -> +terminate(_Reason, PState = #pstate{connected = false}) -> + do_flapping_detect(disconnect, PState), ok; -terminate(conflict, _PState) -> - ok; -terminate(discard, _PState) -> +terminate(Reason, PState) when Reason =:= conflict; + Reason =:= discard -> + do_flapping_detect(disconnect, PState), ok; -terminate(Reason, #pstate{credentials = Credentials}) -> +terminate(Reason, PState = #pstate{credentials = Credentials}) -> + do_flapping_detect(disconnect, PState), ?LOG(info, "[Protocol] Shutdown for ~p", [Reason]), ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). @@ -932,6 +940,26 @@ flag(true) -> 1. %%------------------------------------------------------------------------------ %% Execute actions in case acl deny +do_flapping_detect(Action, #pstate{zone = Zone, + client_id = ClientId, + enable_flapping_detect = true}) -> + ExpiryInterval = emqx_zone:get_env(Zone, flapping_expiry_interval, 3600000), + Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20), + Until = erlang:system_time(second) + ExpiryInterval, + case emqx_flapping:check(Action, ClientId, Threshold) of + flapping -> + emqx_banned:add(#banned{who = {client_id, ClientId}, + reason = <<"flapping">>, + by = <<"flapping_checker">>, + until = Until + }), + ok; + _Other -> + ok + end; +do_flapping_detect(_Action, _PState) -> + ok. + do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, acl_deny_action = disconnect}) -> diff --git a/src/emqx_tables.erl b/src/emqx_tables.erl index 2c11b9d88..16812036a 100644 --- a/src/emqx_tables.erl +++ b/src/emqx_tables.erl @@ -14,7 +14,7 @@ -module(emqx_tables). --export([new/2]). +-export([new/2, delete/1]). -export([ lookup_value/2 , lookup_value/3 @@ -30,6 +30,16 @@ new(Tab, Opts) -> Tab -> ok end. +-spec(delete(atom()) -> ok). +delete(Tab) -> + case ets:info(Tab, name) of + undefined -> + ok; + Tab -> + ets:delete(Tab), + ok + end. + %% KV lookup -spec(lookup_value(atom(), term()) -> any()). lookup_value(Tab, Key) -> @@ -42,4 +52,3 @@ lookup_value(Tab, Key, Def) -> catch error:badarg -> Def end. - diff --git a/src/emqx_types.erl b/src/emqx_types.erl index c8c274b70..021609dc8 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -53,6 +53,7 @@ -export_type([ alarm/0 , plugin/0 + , banned/0 , command/0 ]). @@ -91,6 +92,7 @@ -type(topic_table() :: [{topic(), subopts()}]). -type(payload() :: binary() | iodata()). -type(message() :: #message{}). +-type(banned() :: #banned{}). -type(delivery() :: #delivery{}). -type(deliver_results() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]). @@ -98,4 +100,3 @@ -type(alarm() :: #alarm{}). -type(plugin() :: #plugin{}). -type(command() :: #command{}). - diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index 60f709ee5..e58868e78 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -62,6 +62,13 @@ groups() -> [compile_rule, match_rule]}]. +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teadown_steps(). + init_per_group(Group, Config) when Group =:= access_control; Group =:= access_control_cache_mode -> prepare_config(Group), diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index 88240be85..71e1d0d25 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -62,6 +62,7 @@ run_setup_steps(Config) -> NewConfig = generate_config(), lists:foreach(fun set_app_env/1, NewConfig), set_bridge_env(), + {ok, _} = application:ensure_all_started(?APP), set_log_level(Config), Config. @@ -109,32 +110,32 @@ set_bridge_env() -> change_opts(SslType) -> {ok, Listeners} = application:get_env(?APP, listeners), NewListeners = - lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> - case Protocol of - ssl -> - SslOpts = proplists:get_value(ssl_options, Opts), - Keyfile = local_path(["etc/certs", "key.pem"]), - Certfile = local_path(["etc/certs", "cert.pem"]), - TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), - TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), - TupleList3 = - case SslType of - ssl_twoway-> - CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), - MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), - lists:merge(TupleList2, MutSslList); - _ -> - lists:filter(fun ({cacertfile, _}) -> false; - ({verify, _}) -> false; - ({fail_if_no_peer_cert, _}) -> false; - (_) -> true - end, TupleList2) - end, - [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc]; - _ -> - [Listener | Acc] - end - end, [], Listeners), + lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> + case Protocol of + ssl -> + SslOpts = proplists:get_value(ssl_options, Opts), + Keyfile = local_path(["etc/certs", "key.pem"]), + Certfile = local_path(["etc/certs", "cert.pem"]), + TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), + TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), + TupleList3 = + case SslType of + ssl_twoway-> + CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), + MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), + lists:merge(TupleList2, MutSslList); + _ -> + lists:filter(fun ({cacertfile, _}) -> false; + ({verify, _}) -> false; + ({fail_if_no_peer_cert, _}) -> false; + (_) -> true + end, TupleList2) + end, + [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc]; + _ -> + [Listener | Acc] + end + end, [], Listeners), application:set_env(?APP, listeners, NewListeners). client_ssl_twoway() -> diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl new file mode 100644 index 000000000..3317672cf --- /dev/null +++ b/test/emqx_flapping_SUITE.erl @@ -0,0 +1,60 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_flapping_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [t_flapping]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + prepare_for_test(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_flapping(_Config) -> + process_flag(trap_exit, true), + flapping_connect(5), + {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]), + {error, _} = emqx_client:connect(C), + receive + {'EXIT', Client, _Reason} -> + ct:log("receive exit signal, Client: ~p", [Client]) + after 1000 -> + ct:log("timeout") + end. + + +flapping_connect(Times) -> + [flapping_connect() || _ <- lists:seq(1, Times)]. + +flapping_connect() -> + {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]), + {ok, _} = emqx_client:connect(C), + ok = emqx_client:disconnect(C). + +prepare_for_test() -> + emqx_zone:set_env(external, enable_flapping_detect, true), + emqx_zone:set_env(external, flapping_threshold, {10, 60}), + emqx_zone:set_env(external, flapping_expiry_interval, 3600). diff --git a/test/emqx_tables_SUITE.erl b/test/emqx_tables_SUITE.erl index c282e93af..c028d3681 100644 --- a/test/emqx_tables_SUITE.erl +++ b/test/emqx_tables_SUITE.erl @@ -23,4 +23,6 @@ t_new(_) -> ok = emqx_tables:new(test_table, [{read_concurrency, true}]), ets:insert(test_table, {key, 100}), ok = emqx_tables:new(test_table, [{read_concurrency, true}]), - 100 = ets:lookup_element(test_table, key, 2). + 100 = ets:lookup_element(test_table, key, 2), + ok = emqx_tables:delete(test_table), + ok = emqx_tables:delete(test_table). diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index 23ef3c67d..7f17d5258 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -35,4 +35,3 @@ t_set_get_env(_) -> emqx_zone:force_reload(), ?assertEqual(val, emqx_zone:get_env(zone1, key)), emqx_zone:stop(). -