From 8cfd080a97d848b8d808aef435d6f87dbd045f42 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Sun, 7 Apr 2019 22:31:42 +0800 Subject: [PATCH 01/14] Fix gen_statem:reply/2 bug in emqx_client module Prior to this change, the arguments passed to gen_statem:reply is bad args, first arg should be a tuple, but it is a pid. So it would trigger crash. This change fix this bug. --- src/emqx_client.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 75b384fcb..371b020f1 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -1049,7 +1049,7 @@ timeout_calls(Timeout, Calls) -> timeout_calls(Now, Timeout, Calls) -> lists:foldl(fun(C = #call{from = From, ts = Ts}, Acc) -> case (timer:now_diff(Now, Ts) div 1000) >= Timeout of - true -> gen_statem:reply(From, {error, ack_timeout}), + true -> From ! {error, ack_timeout}, Acc; false -> [C | Acc] end @@ -1231,4 +1231,3 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) -> -spec next_packet_id(packet_id()) -> packet_id(). next_packet_id(?MAX_PACKET_ID) -> 1; next_packet_id(Id) -> Id + 1. - From 627ea0afe8cfc8e144572ac1516f7984a0b5c55e Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Tue, 9 Apr 2019 11:32:37 +0800 Subject: [PATCH 02/14] Add `is_bridge_exist` api in emqx_bridge_sup --- src/emqx_bridge_sup.erl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index fc0be3995..b00bb9012 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -20,11 +20,12 @@ %% APIs -export([ start_link/0 , start_link/1 - , bridges/0 ]). -export([ create_bridge/2 , drop_bridge/1 + , bridges/0 + , is_bridge_exist/1 ]). %% supervisor callbacks @@ -58,6 +59,13 @@ bridge_spec({Name, Config}) -> bridges() -> [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. +-spec(is_bridge_exist(atom() | pid()) -> boolean()). +is_bridge_exist(Id) -> + case supervisor:get_childspec(?SUP, Id) of + {ok, _ChildSpec} -> true; + {error, _Error} -> false + end. + create_bridge(Id, Config) -> supervisor:start_child(?SUP, bridge_spec({Id, Config})). @@ -69,4 +77,3 @@ drop_bridge(Id) -> ?LOG(error, "[Bridge] Delete bridge failed", [Error]), Error end. - From eb7b1797c258beeef5be99a7586fcb9bf6d84884 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Tue, 9 Apr 2019 18:38:10 +0800 Subject: [PATCH 03/14] Provide bridge handler to extense emqx_bridge --- src/emqx_bridge.erl | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index a15851f0f..6af56d11b 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -119,6 +119,7 @@ -define(DEFAULT_SEND_AHEAD, 8). -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). +-define(NO_BRIDGE_HANDLER, undefined). -define(maybe_send, {next_event, internal, maybe_send}). %% @doc Start a bridge worker. Supported configs: @@ -277,7 +278,8 @@ init(Config) -> subscriptions => Subs, replayq => Queue, inflight => [], - connection => undefined + connection => undefined, + bridge_handler => Get(bridge_handler, ?NO_BRIDGE_HANDLER) }}. code_change(_Vsn, State, Data, _Extra) -> @@ -321,7 +323,10 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout, {ok, ConnRef, Conn} -> ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), Action = {state_timeout, 0, connected}, - {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; + {keep_state, + eval_bridge_handler(State#{ conn_ref => ConnRef + , connection => Conn}, connected), + Action}; error -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action} @@ -416,6 +421,12 @@ common(StateName, Type, Content, State) -> [name(), Type, StateName, Content]), {keep_state, State}. +eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) -> + State; +eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) -> + _ = Handler(Msg), + State. + ensure_present(Key, Topic, State) -> Topics = maps:get(Key, State), case is_topic_present(Topic, Topics) of @@ -553,9 +564,11 @@ disconnect(#{connection := Conn, connect_module := Module } = State) when Conn =/= undefined -> ok = Module:stop(ConnRef, Conn), - State#{conn_ref => undefined, - connection => undefined}; -disconnect(State) -> State. + eval_bridge_handler(State#{conn_ref => undefined, + connection => undefined}, + disconnected); +disconnect(State) -> + eval_bridge_handler(State, disconnected). %% Called only when replayq needs to dump it to disk. msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin); From a53320069be69ad1f80ebc315caf520ae33f6f94 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 10 Apr 2019 09:14:53 +0800 Subject: [PATCH 04/14] Fix trace logger level not work (#2408) * Fix trace logger level not work #2385 --- src/emqx_tracer.erl | 29 +++++++++++++++++++++++++---- test/emqx_tracer_SUITE.erl | 4 ++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 464a28ad2..c85aa5007 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -76,11 +76,21 @@ trace(publish, #message{from = From, topic = Topic, payload = Payload}) %% @doc Start to trace client_id or topic. -spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}). start_trace({client_id, ClientId}, Level, LogFile) -> - start_trace({start_trace, {client_id, ClientId}, Level, LogFile}); + do_start_trace({client_id, ClientId}, Level, LogFile); start_trace({topic, Topic}, Level, LogFile) -> - start_trace({start_trace, {topic, Topic}, Level, LogFile}). + do_start_trace({topic, Topic}, Level, LogFile). -start_trace(Req) -> gen_server:call(?MODULE, Req, infinity). +do_start_trace(Who, Level, LogFile) -> + #{level := PrimaryLevel} = logger:get_primary_config(), + try logger:compare_levels(log_level(Level), PrimaryLevel) of + lt -> + {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])}; + _GtOrEq -> + gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000) + catch + _:Error -> + {error, Error} + end. %% @doc Stop tracing client_id or topic. -spec(stop_trace(trace_who()) -> ok | {error, term()}). @@ -109,7 +119,7 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T config => #{type => halt, file => LogFile}, filter_default => stop, filters => [{meta_key_filter, - {fun filter_by_meta_key/2, Who} }]}) of + {fun filter_by_meta_key/2, Who} }]}) of ok -> ?LOG(info, "[Tracer] Start trace for ~p", [Who]), {reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}}; @@ -168,3 +178,14 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) -> end; _ -> ignore end. + +log_level(emergency) -> emergency; +log_level(alert) -> alert; +log_level(critical) -> critical; +log_level(error) -> error; +log_level(warning) -> warning; +log_level(notice) -> notice; +log_level(info) -> info; +log_level(debug) -> debug; +log_level(all) -> debug; +log_level(_) -> throw(invalid_log_level). diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index 323700823..327930fc3 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -38,8 +38,12 @@ start_traces(_Config) -> emqx_client:connect(T), %% Start tracing + emqx_logger:set_log_level(error), + {error, _} = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"), + emqx_logger:set_log_level(debug), ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"), ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"), + {error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"), ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"), ct:sleep(100), From 6a4c318acbc27d6e8379ceb9f50e26d26f2973ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 10 Apr 2019 14:22:50 +0800 Subject: [PATCH 05/14] Update metrics when session process terminates --- src/emqx_session.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2ef64c9aa..f1aa6d3bd 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -676,6 +676,7 @@ terminate(Reason, #state{will_msg = WillMsg, username = Username, conn_pid = ConnPid, old_conn_pid = OldConnPid}) -> + emqx_metrics:commit(), send_willmsg(WillMsg), [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). From 4fc81cef855d09829ba655f343903ad7bc57f264 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Thu, 11 Apr 2019 12:20:32 +0800 Subject: [PATCH 06/14] Change log level unexpected info in client (#2422) Change log level for unexpected info in client and rejust code format in emqx_bridge --- src/emqx_bridge.erl | 14 ++++++-------- src/emqx_client.erl | 5 +++++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 6af56d11b..03e7a478e 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -323,10 +323,9 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout, {ok, ConnRef, Conn} -> ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), Action = {state_timeout, 0, connected}, - {keep_state, - eval_bridge_handler(State#{ conn_ref => ConnRef - , connection => Conn}, connected), - Action}; + State0 = State#{conn_ref => ConnRef, connection => Conn}, + State1 = eval_bridge_handler(State0, connected), + {keep_state, State1, Action}; error -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action} @@ -424,7 +423,7 @@ common(StateName, Type, Content, State) -> eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) -> State; eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) -> - _ = Handler(Msg), + Handler(Msg), State. ensure_present(Key, Topic, State) -> @@ -564,9 +563,8 @@ disconnect(#{connection := Conn, connect_module := Module } = State) when Conn =/= undefined -> ok = Module:stop(ConnRef, Conn), - eval_bridge_handler(State#{conn_ref => undefined, - connection => undefined}, - disconnected); + State0 = State#{conn_ref => undefined, connection => undefined}, + eval_bridge_handler(State0, disconnected); disconnect(State) -> eval_bridge_handler(State, disconnected). diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 371b020f1..737c699ac 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -928,6 +928,11 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> ?LOG(error, "[Client] Got tcp error: ~p", [Reason]), {stop, {shutdown, Reason}, State}; +handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> + ?LOG(error, "[Client] State: ~s, Unexpected Event: (info, ~p)", + [StateName, EventContent]), + keep_state_and_data; + handle_event(EventType, EventContent, StateName, _StateData) -> ?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)", [StateName, EventType, EventContent]), From 4d2bc48822bba8d45f4aa256141db63312ba431d Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 11 Apr 2019 16:40:02 +0800 Subject: [PATCH 07/14] Redesign ensure_start and ensure_stop api of bridge --- src/emqx_bridge.erl | 52 +++++++++++++++++++++++-------------- src/emqx_bridge_connect.erl | 3 +-- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 03e7a478e..3261a02bc 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -120,6 +120,7 @@ -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). -define(NO_BRIDGE_HANDLER, undefined). +-define(NO_FROM, undefined). -define(maybe_send, {next_event, internal, maybe_send}). %% @doc Start a bridge worker. Supported configs: @@ -297,8 +298,7 @@ standing_by(enter, _, #{start_type := auto}) -> standing_by(enter, _, #{start_type := manual}) -> keep_state_and_data; standing_by({call, From}, ensure_started, State) -> - {next_state, connecting, State, - [{reply, From, ok}]}; + do_connect({call, From}, standing_by, State); standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; standing_by(info, Info, State) -> @@ -313,23 +313,8 @@ standing_by(Type, Content, State) -> connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action}; -connecting(enter, _, #{reconnect_delay_ms := Timeout, - connect_fun := ConnectFun, - subscriptions := Subs, - forwards := Forwards - } = State) -> - ok = subscribe_local_topics(Forwards), - case ConnectFun(Subs) of - {ok, ConnRef, Conn} -> - ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), - Action = {state_timeout, 0, connected}, - State0 = State#{conn_ref => ConnRef, connection => Conn}, - State1 = eval_bridge_handler(State0, connected), - {keep_state, State1, Action}; - error -> - Action = {state_timeout, Timeout, reconnect}, - {keep_state_and_data, Action} - end; +connecting(enter, _, State) -> + do_connect(enter, connecting, State); connecting(state_timeout, connected, State) -> {next_state, connected, State}; connecting(state_timeout, reconnect, _State) -> @@ -455,6 +440,35 @@ is_topic_present({Topic, _QoS}, Topics) -> is_topic_present(Topic, Topics) -> lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). +do_connect(Type, StateName, #{ forwards := Forwards + , subscriptions := Subs + , connect_fun := ConnectFun + , reconnect_delay_ms := Timeout + } = State) -> + ok = subscribe_local_topics(Forwards), + From = case StateName of + standing_by -> {call, Pid} = Type, Pid; + connecting -> ?NO_FROM + end, + DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) -> + StandingbyAction; + (connecting, _StandingbyAction, ConnectingAction) -> + ConnectingAction + end, + case ConnectFun(Subs) of + {ok, ConnRef, Conn} -> + ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), + State0 = State#{conn_ref => ConnRef, connection => Conn}, + State1 = eval_bridge_handler(State0, connected), + StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]}, + ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}}, + DoEvent(StateName, StandingbyAction, ConnectingAction); + {error, Reason} -> + StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]}, + ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}}, + DoEvent(StateName, StandingbyAction, ConnectingAction) + end. + do_ensure_present(forwards, Topic, _) -> ok = subscribe_local_topic(Topic); do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule, diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl index 37231ca88..8685451ae 100644 --- a/src/emqx_bridge_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -56,7 +56,7 @@ start(Module, Config) -> Config1 = obfuscate(Config), ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n" "config=~p\nreason:~p", [Module, Config1, Reason]), - error + {error, Reason} end. obfuscate(Map) -> @@ -69,4 +69,3 @@ obfuscate(Map) -> is_sensitive(password) -> true; is_sensitive(_) -> false. - From 81ef5b9b8d9bec72233bd8c0c83acc86e25a06d6 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Wed, 10 Apr 2019 18:50:28 +0800 Subject: [PATCH 08/14] Support cookie based auth --- src/emqx_protocol.erl | 14 ++++++++++---- src/emqx_ws_connection.erl | 12 ++++++++++++ test/rfc6455_client.erl | 2 +- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index a8bc0b288..06b0e7886 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -68,7 +68,8 @@ ignore_loop, topic_alias_maximum, conn_mod, - credentials + credentials, + ws_cookie }). -opaque(state() :: #pstate{}). @@ -85,7 +86,9 @@ %%------------------------------------------------------------------------------ -spec(init(map(), list()) -> state()). -init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> +init(SocketOpts = #{ peername := Peername + , peercert := Peercert + , sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), #pstate{zone = Zone, sendfun = SendFun, @@ -110,7 +113,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), topic_alias_maximum = #{to_client => 0, from_client => 0}, conn_mod = maps:get(conn_mod, SocketOpts, undefined), - credentials = #{}}. + credentials = #{}, + ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -202,11 +206,13 @@ credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, peername = Peername, - peercert = Peercert}) -> + peercert = Peercert, + ws_cookie = WsCookie}) -> with_cert(#{zone => Zone, client_id => ClientId, username => Username, peername => Peername, + ws_cookie => WsCookie, mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert). with_cert(Credentials, undefined) -> Credentials; diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 92ae6a7dd..1d6f3e244 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -138,10 +138,22 @@ websocket_init(#state{request = Req, options = Options}) -> Peername = cowboy_req:peer(Req), Sockname = cowboy_req:sock(Req), Peercert = cowboy_req:cert(Req), + WsCookie = try cowboy_req:parse_cookies(Req) + catch + error:badarg -> + ?LOG(error, "[WS Connection] Illegal cookie"), + undefined; + Error:Reason -> + ?LOG(error, + "[WS Connection] Cookie is parsed failed, Error: ~p, Reason ~p", + [Error, Reason]), + undefined + end, ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, sendfun => send_fun(self()), + ws_cookie => WsCookie, conn_mod => ?MODULE}, Options), ParserState = emqx_protocol:parser(ProtoState), Zone = proplists:get_value(zone, Options), diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl index f5d8f1ef4..987b72407 100644 --- a/test/rfc6455_client.erl +++ b/test/rfc6455_client.erl @@ -36,7 +36,7 @@ new(WsUrl, PPid) -> addr = Addr, path = "/" ++ Path, ppid = PPid}, - spawn(fun () -> + spawn(fun() -> start_conn(State) end). From f1616c33d97b43133072490c62db68ce0bbe3e98 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Sat, 13 Apr 2019 10:35:59 +0800 Subject: [PATCH 09/14] Convert value of attribute table to map --- src/emqx_bridge_sup.erl | 2 +- src/emqx_client.erl | 2 +- src/emqx_connection.erl | 22 ++++++++--------- src/emqx_mod_presence.erl | 16 ++++++------ src/emqx_protocol.erl | 41 ++++++++++++++++--------------- src/emqx_ws_connection.erl | 16 ++++++------ test/emqx_connection_SUITE.erl | 16 ++++++------ test/emqx_ws_connection_SUITE.erl | 18 +++++++------- 8 files changed, 68 insertions(+), 65 deletions(-) diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index b00bb9012..a40e7b2e3 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -74,6 +74,6 @@ drop_bridge(Id) -> ok -> supervisor:delete_child(?SUP, Id); Error -> - ?LOG(error, "[Bridge] Delete bridge failed", [Error]), + ?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]), Error end. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 737c699ac..1c9b26a0f 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -929,7 +929,7 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> {stop, {shutdown, Reason}, State}; handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> - ?LOG(error, "[Client] State: ~s, Unexpected Event: (info, ~p)", + ?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)", [StateName, EventContent]), keep_state_and_data; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e9cfd6ae4..5beee28bb 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -87,15 +87,15 @@ info(#state{transport = Transport, rate_limit = RateLimit, pub_limit = PubLimit, proto_state = ProtoState}) -> - ConnInfo = [{socktype, Transport:type(Socket)}, - {peername, Peername}, - {sockname, Sockname}, - {conn_state, ConnState}, - {active_n, ActiveN}, - {rate_limit, rate_limit_info(RateLimit)}, - {pub_limit, rate_limit_info(PubLimit)}], + ConnInfo = #{socktype => Transport:type(Socket), + peername => Peername, + sockname => Sockname, + conn_state => ConnState, + active_n => ActiveN, + rate_limit => rate_limit_info(RateLimit), + pub_limit => rate_limit_info(PubLimit)}, ProtoInfo = emqx_protocol:info(ProtoState), - lists:usort(lists:append(ConnInfo, ProtoInfo)). + maps:merge(ConnInfo, ProtoInfo). rate_limit_info(undefined) -> #{}; @@ -109,10 +109,10 @@ attrs(CPid) when is_pid(CPid) -> attrs(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], + SockAttrs = #{peername => Peername, + sockname => Sockname}, ProtoAttrs = emqx_protocol:attrs(ProtoState), - lists:usort(lists:append(SockAttrs, ProtoAttrs)). + maps:merge(SockAttrs, ProtoAttrs). %% Conn stats stats(CPid) when is_pid(CPid) -> diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index d2ce98abc..9789474d7 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -42,12 +42,15 @@ load(Env) -> on_client_connected(#{client_id := ClientId, username := Username, peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) -> - Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs), - case emqx_json:safe_encode([{clientid, ClientId}, - {username, Username}, - {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, - {connack, ConnAck}, - {ts, os:system_time(second)} | Attrs]) of + Attrs = maps:filter(fun(K, _) -> + lists:member(K, ?ATTR_KEYS) + end, ConnAttrs), + case emqx_json:safe_encode(Attrs#{clientid => ClientId, + username => Username, + ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), + connack => ConnAck, + ts => os:system_time(second) + }) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> @@ -84,4 +87,3 @@ qos(Env) -> proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 06b0e7886..54baac636 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -139,12 +139,13 @@ info(PState = #pstate{conn_props = ConnProps, topic_aliases = Aliases, will_msg = WillMsg, enable_acl = EnableAcl}) -> - attrs(PState) ++ [{conn_props, ConnProps}, - {ack_props, AckProps}, - {session, Session}, - {topic_aliases, Aliases}, - {will_msg, WillMsg}, - {enable_acl, EnableAcl}]. + maps:merge(attrs(PState), #{conn_props => ConnProps, + ack_props => AckProps, + session => Session, + topic_aliases => Aliases, + will_msg => WillMsg, + enable_acl => EnableAcl + }). attrs(#pstate{zone = Zone, client_id = ClientId, @@ -159,20 +160,20 @@ attrs(#pstate{zone = Zone, connected_at = ConnectedAt, conn_mod = ConnMod, credentials = Credentials}) -> - [{zone, Zone}, - {client_id, ClientId}, - {username, Username}, - {peername, Peername}, - {peercert, Peercert}, - {proto_ver, ProtoVer}, - {proto_name, ProtoName}, - {clean_start, CleanStart}, - {keepalive, Keepalive}, - {is_bridge, IsBridge}, - {connected_at, ConnectedAt}, - {conn_mod, ConnMod}, - {credentials, Credentials} - ]. + #{ zone => Zone + , client_id => ClientId + , username => Username + , peername => Peername + , peercert => Peercert + , proto_ver => ProtoVer + , proto_name => ProtoName + , clean_start => CleanStart + , keepalive => Keepalive + , is_bridge => IsBridge + , connected_at => ConnectedAt + , conn_mod => ConnMod + , credentials => Credentials + }. attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Receive-Maximum', ConnProps, 65535); diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 1d6f3e244..44e1eb681 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -61,11 +61,11 @@ info(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> ProtoInfo = emqx_protocol:info(ProtoState), - ConnInfo = [{socktype, websocket}, - {conn_state, running}, - {peername, Peername}, - {sockname, Sockname}], - lists:append([ConnInfo, ProtoInfo]). + ConnInfo = #{socktype => websocket, + conn_state => running, + peername => Peername, + sockname => Sockname}, + maps:merge(ProtoInfo, ConnInfo). %% for dashboard attrs(WSPid) when is_pid(WSPid) -> @@ -74,10 +74,10 @@ attrs(WSPid) when is_pid(WSPid) -> attrs(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], + SockAttrs = #{peername => Peername, + sockname => Sockname}, ProtoAttrs = emqx_protocol:attrs(ProtoState), - lists:usort(lists:append(SockAttrs, ProtoAttrs)). + maps:merge(SockAttrs, ProtoAttrs). stats(WSPid) when is_pid(WSPid) -> call(WSPid, stats); diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 9c9c3ab55..bd2293c97 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -51,16 +51,16 @@ t_connect_api(_Config) -> emqx_client:disconnect(T1). t_info(ConnInfo) -> - ?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)), - ?assertEqual(running, proplists:get_value(conn_state, ConnInfo)), - ?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)), - ?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)), - ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)). + ?assertEqual(tcp, maps:get(socktype, ConnInfo)), + ?assertEqual(running, maps:get(conn_state, ConnInfo)), + ?assertEqual(<<"client1">>, maps:get(client_id, ConnInfo)), + ?assertEqual(<<"testuser1">>, maps:get(username, ConnInfo)), + ?assertEqual(<<"MQTT">>, maps:get(proto_name, ConnInfo)). t_attrs(AttrsData) -> - ?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)), - ?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)), - ?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)). + ?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)), + ?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)). t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index c45344bae..289608428 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -73,16 +73,16 @@ raw_recv_pase(P) -> version => ?MQTT_PROTO_V4} }). t_info(InfoData) -> - ?assertEqual(websocket, proplists:get_value(socktype, InfoData)), - ?assertEqual(running, proplists:get_value(conn_state, InfoData)), - ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)), - ?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)), - ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)). + ?assertEqual(websocket, maps:get(socktype, InfoData)), + ?assertEqual(running, maps:get(conn_state, InfoData)), + ?assertEqual(<<"mqtt_client">>, maps:get(client_id, InfoData)), + ?assertEqual(<<"admin">>, maps:get(username, InfoData)), + ?assertEqual(<<"MQTT">>, maps:get(proto_name, InfoData)). t_attrs(AttrsData) -> - ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)), - ?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)), - ?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)). + ?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)), + ?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(<<"admin">>, maps:get(username, AttrsData)). t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), @@ -91,4 +91,4 @@ t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(reductions, StatsData) >=0), ?assertEqual(true, proplists:get_value(recv_pkt, StatsData) =:=1), ?assertEqual(true, proplists:get_value(recv_msg, StatsData) >=0), - ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). \ No newline at end of file + ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). From 8821cfcfc74a246ab638896537cb81d173abc2a0 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Tue, 16 Apr 2019 22:38:56 +0200 Subject: [PATCH 10/14] Fix typo round_robbin -> round_robin in emqx.schema --- etc/emqx.conf | 2 +- priv/emqx.schema | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 15ad24dd1..05cc35413 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2122,7 +2122,7 @@ broker.session_locking_strategy = quorum ## ## Value: Enum ## - random -## - round_robbin +## - round_robin ## - sticky ## - hash broker.shared_subscription_strategy = random diff --git a/priv/emqx.schema b/priv/emqx.schema index e6944ba79..2224c9935 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1996,11 +1996,11 @@ end}. %% @doc Shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ - {default, round_robbin}, + {default, round_robin}, {datatype, {enum, [random, %% randomly pick a subscriber - round_robbin, %% round robin alive subscribers one message after another + round_robin, %% round robin alive subscribers one message after another sticky, %% pick a random subscriber and stick to it hash %% hash client ID to a group member ]}} @@ -2125,4 +2125,4 @@ end}. [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)}, {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}] -end}. \ No newline at end of file +end}. From 94aa1738587e81c97ffaae2f5306d49b8af89030 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Wed, 17 Apr 2019 10:43:20 +0800 Subject: [PATCH 11/14] Fix bugs of keepalive value init of emqx_client (#2443) Fix bugs of keepalive value init of emqx_client --- src/emqx_client.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 1c9b26a0f..96dec6716 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -503,7 +503,7 @@ init([{username, Username} | Opts], State) -> init([{password, Password} | Opts], State) -> init(Opts, State#state{password = iolist_to_binary(Password)}); init([{keepalive, Secs} | Opts], State) -> - init(Opts, State#state{keepalive = timer:seconds(Secs)}); + init(Opts, State#state{keepalive = Secs}); init([{proto_ver, v3} | Opts], State) -> init(Opts, State#state{proto_ver = ?MQTT_PROTO_V3, proto_name = <<"MQIsdp">>}); @@ -1026,11 +1026,11 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), end. ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> - ensure_keepalive_timer(timer:seconds(Secs), State); + ensure_keepalive_timer(timer:seconds(Secs), State#state{keepalive = Secs}); ensure_keepalive_timer(State = #state{keepalive = 0}) -> State; ensure_keepalive_timer(State = #state{keepalive = I}) -> - ensure_keepalive_timer(I, State). + ensure_keepalive_timer(timer:seconds(I), State). ensure_keepalive_timer(I, State) when is_integer(I) -> State#state{keepalive_timer = erlang:start_timer(I, self(), keepalive)}. From 5680b191eed2b949a4406b1d1f1155cfce0c18a4 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 18 Apr 2019 14:05:40 +0800 Subject: [PATCH 12/14] Extend status api --- src/emqx_bridge.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 3261a02bc..a01837659 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -186,8 +186,10 @@ ensure_stopped(Id, Timeout) -> stop(Pid) -> gen_statem:stop(Pid). -status(Pid) -> - gen_statem:call(Pid, status). +status(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, status); +status(Id) -> + status(name(Id)). %% @doc This function is to be evaluated on message/batch receiver side. -spec import_batch(batch(), fun(() -> ok)) -> ok. From bcbb4b68e96abfd969a4b678ebdfd86aa2ce7627 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 19 Apr 2019 16:34:33 +0800 Subject: [PATCH 13/14] Add flapping detect feature --- Makefile | 2 +- etc/emqx.conf | 65 ++++++++++++- include/types.hrl | 1 - priv/emqx.schema | 49 ++++++---- src/emqx_banned.erl | 7 +- src/emqx_bridge_mqtt.erl | 4 +- src/emqx_client.erl | 2 +- src/emqx_cm_sup.erl | 13 ++- src/emqx_config.erl | 2 - src/emqx_connection.erl | 8 +- src/emqx_flapping.erl | 160 ++++++++++++++++++++++++-------- src/emqx_frame.erl | 20 ++-- src/emqx_protocol.erl | 88 ++++++++++++------ src/emqx_tables.erl | 13 ++- src/emqx_types.erl | 3 +- test/emqx_access_SUITE.erl | 7 ++ test/emqx_ct_broker_helpers.erl | 53 +++++------ test/emqx_flapping_SUITE.erl | 60 ++++++++++++ test/emqx_tables_SUITE.erl | 4 +- test/emqx_zone_SUITE.erl | 1 - 20 files changed, 416 insertions(+), 146 deletions(-) create mode 100644 test/emqx_flapping_SUITE.erl diff --git a/Makefile b/Makefile index f83b07330..c0d71c9bb 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ - emqx_vm_mon emqx_alarm_handler emqx_rpc + emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/etc/emqx.conf b/etc/emqx.conf index 05cc35413..610898b53 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -438,6 +438,17 @@ acl_cache_ttl = 1m ## Default: ignore acl_deny_action = ignore +## The cleanning interval for flapping +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +## flapping_clean_interval = 1h + ##-------------------------------------------------------------------- ## MQTT Protocol ##-------------------------------------------------------------------- @@ -650,11 +661,35 @@ zone.external.mqueue_priorities = none ## Value: highest | lowest zone.external.mqueue_default_priority = highest -## Whether to enqueue Qos0 messages. +## Whether to enqueue QoS0 messages. ## ## Value: false | true zone.external.mqueue_store_qos0 = true +## Whether to turn on flapping detect +## +## Value: on | off +zone.external.enable_flapping_detect = off + +## The times of state change per min, specifying the threshold which is used to +## detect if the connection starts flapping +## +## Value: number +zone.external.flapping_threshold = 10, 1m + +## Flapping expiry interval for connections. +## This config entry is used to determine when the connection +## will be unbanned. +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +zone.external.flapping_expiry_interval = 1h + ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -726,6 +761,30 @@ zone.internal.max_mqueue_len = 1000 ## Value: false | true zone.internal.mqueue_store_qos0 = true +## Whether to turn on flapping detect +## +## Value: on | off +zone.internal.enable_flapping_detect = off + +## The times of state change per second, specifying the threshold which is used to +## detect if the connection starts flapping +## +## Value: number +zone.internal.flapping_threshold = 10, 1m + +## Flapping expiry interval for connections. +## This config entry is used to determine when the connection +## will be unbanned. +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 1h, 1 hour +zone.internal.flapping_expiry_interval = 1h + ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -1784,13 +1843,13 @@ listener.wss.external.send_timeout_close = on ## SSL Ciphers used by the bridge. ## ## Value: String -#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 ## Ciphers for TLS PSK. ## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot ## be configured at the same time. ## See 'https://tools.ietf.org/html/rfc4279#section-2'. -#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA +## bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## Ping interval of a down bridge. ## diff --git a/include/types.hrl b/include/types.hrl index 8032bfe7e..85a9aadf0 100644 --- a/include/types.hrl +++ b/include/types.hrl @@ -19,4 +19,3 @@ -type(ok_or_error(Reason) :: ok | {error, Reason}). -type(ok_or_error(Value, Reason) :: {ok, Value} | {error, Reason}). - diff --git a/priv/emqx.schema b/priv/emqx.schema index 2224c9935..459349068 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -270,8 +270,7 @@ end}. X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; _ -> undefined end - end -}. + end}. {validator, "zdbbl_range", "must be between 1KB and 2097151KB", fun(ZDBBL) -> @@ -574,6 +573,11 @@ end}. {datatype, {enum, [ignore, disconnect]}} ]}. +%% @doc time interval to clean flapping records +{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval", [ + {datatype, {duration, ms}} +]}. + {validator, "range:gt_0", "must greater than 0", fun(X) -> X > 0 end }. @@ -814,6 +818,18 @@ end}. {datatype, {enum, [true, false]}} ]}. +{mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [ + {datatype, flag} +]}. + +{mapping, "zone.$name.flapping_threshold", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.flapping_expiry_interval", "emqx.zones", [ + {datatype, {duration, s}} +]}. + %% @doc Force connection/session process GC after this number of %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. @@ -845,6 +861,15 @@ end}. {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; + ("flapping_threshold", Val) -> + [Limit, Duration] = string:tokens(Val, ", "), + FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of + Min when is_integer(Min) -> + {list_to_integer(Limit), Min}; + {error, Reason} -> + error(Reason) + end, + {flapping_threshold, FlappingThreshold}; ("wildcard_subscription", Val) -> {mqtt_wildcard_subscription, Val}; ("shared_subscription", Val) -> @@ -2053,11 +2078,8 @@ end}. ]}. {translation, "emqx.sysmon", fun(Conf) -> - [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, - {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, - {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, - {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, - {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] + Configs = cuttlefish_variable:filter_by_prefix("sysmon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] end}. %%-------------------------------------------------------------------- @@ -2095,12 +2117,8 @@ end}. ]}. {translation, "emqx.os_mon", fun(Conf) -> - [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)}, - {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf)}, - {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf)}, - {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)}, - {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf)}, - {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf)}] + Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] end}. %%-------------------------------------------------------------------- @@ -2122,7 +2140,6 @@ end}. ]}. {translation, "emqx.vm_mon", fun(Conf) -> - [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, - {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)}, - {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}] + Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] end}. diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index ab21bce50..126562401 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -70,13 +70,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) - orelse ets:member(?TAB, {username, Username}) orelse ets:member(?TAB, {ipaddr, IPAddr}). --spec(add(#banned{}) -> ok). +-spec(add(emqx_types:banned()) -> ok). add(Banned) when is_record(Banned, banned) -> mnesia:dirty_write(?TAB, Banned). -spec(delete({client_id, emqx_types:client_id()} - | {username, emqx_types:username()} - | {peername, emqx_types:peername()}) -> ok). + | {username, emqx_types:username()} + | {peername, emqx_types:peername()}) -> ok). delete(Key) -> mnesia:dirty_delete(?TAB, Key). @@ -127,4 +127,3 @@ expire_banned_items(Now) -> mnesia:delete_object(?TAB, B, sticky_write); (_, _Acc) -> ok end, ok, ?TAB). - diff --git a/src/emqx_bridge_mqtt.erl b/src/emqx_bridge_mqtt.erl index 870efe51e..8a66f77a0 100644 --- a/src/emqx_bridge_mqtt.erl +++ b/src/emqx_bridge_mqtt.erl @@ -56,7 +56,9 @@ start(Config = #{address := Address}) -> ClientConfig = Config#{msg_handler => Handlers, owner => AckCollector, host => Host, - port => Port}, + port => Port, + bridge_mode => true + }, case emqx_client:start_link(ClientConfig) of {ok, Pid} -> case emqx_client:connect(Pid) of diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 96dec6716..cd83e61ad 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -88,7 +88,7 @@ ]). %% Default timeout --define(DEFAULT_KEEPALIVE, 60000). +-define(DEFAULT_KEEPALIVE, 60). -define(DEFAULT_ACK_TIMEOUT, 30000). -define(DEFAULT_CONNECT_TIMEOUT, 60000). diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 6b0a8fb15..19940da05 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -30,11 +30,20 @@ init([]) -> shutdown => 1000, type => worker, modules => [emqx_banned]}, + FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000), + Flapping = #{id => flapping, + start => {emqx_flapping, start_link, [FlappingOption]}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [emqx_flapping]}, Manager = #{id => manager, start => {emqx_cm, start_link, []}, restart => permanent, shutdown => 2000, type => worker, modules => [emqx_cm]}, - {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}. - + SupFlags = #{strategy => one_for_one, + intensity => 100, + period => 10}, + {ok, {SupFlags, [Banned, Manager, Flapping]}}. diff --git a/src/emqx_config.erl b/src/emqx_config.erl index 3d37fc001..fd80de0c2 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -19,7 +19,6 @@ %% 1. Store in mnesia database? %% 2. Store in dets? %% 3. Store in data/app.config? -%% -module(emqx_config). @@ -138,4 +137,3 @@ read_(_App) -> error(no_impl). % end, [], Configs), % RequiredCfg ++ OptionalCfg % end. - diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 5beee28bb..89b84dc6b 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -242,10 +242,10 @@ connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) - connected(info, {keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> StatFun = fun() -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - Error -> Error - end + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + Error -> Error + end end, case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of {ok, KeepAlive} -> diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index ed1d3e0c8..7e369a2e3 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -12,70 +12,150 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% @doc TODO: -%% 1. Flapping Detection -%% 2. Conflict Detection? -module(emqx_flapping). -%% Use ets:update_counter??? +-include("emqx.hrl"). +-include("logger.hrl"). +-include("types.hrl"). --behaviour(gen_server). +-behaviour(gen_statem). --export([start_link/0]). +-export([start_link/1]). --export([ is_banned/1 - , banned/1 +%% This module is used to garbage clean the flapping records + +%% gen_statem callbacks +-export([ terminate/3 + , code_change/4 + , init/1 + , initialized/3 + , callback_mode/0 ]). -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-define(FLAPPING_TAB, ?MODULE). --define(SERVER, ?MODULE). +-export([check/3]). --record(state, {}). +-record(flapping, + { client_id :: binary() + , check_count :: integer() + , timestamp :: integer() + }). --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-type(flapping_record() :: #flapping{}). +-type(flapping_state() :: flapping | ok). -is_banned(ClientId) -> - ets:member(banned, ClientId). -banned(ClientId) -> - ets:insert(banned, {ClientId, os:timestamp()}). +%% @doc This function is used to initialize flapping records +%% the expiry time unit is minutes. +-spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()). +init_flapping(ClientId, Interval) -> + #flapping{ client_id = ClientId + , check_count = 1 + , timestamp = emqx_time:now_secs() + Interval + }. + +%% @doc This function is used to initialize flapping records +%% the expiry time unit is minutes. +-spec(check( Action :: atom() + , ClientId :: binary() + , Threshold :: {integer(), integer()}) + -> flapping_state()). +check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) -> + check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)). + +-spec(check( Action :: atom() + , ClientId :: binary() + , Threshold :: {integer(), integer()} + , InitFlapping :: flapping_record()) + -> flapping_state()). +check(Action, ClientId, Threshold, InitFlapping) -> + try ets:update_counter(?FLAPPING_TAB, ClientId, {_Pos = #flapping.check_count, 1}) of + CheckCount -> + case ets:lookup(?FLAPPING_TAB, ClientId) of + [Flapping] -> + check_flapping(Action, CheckCount, Threshold, Flapping); + _Flapping -> + ok + end + catch + error:badarg -> + ets:insert_new(?FLAPPING_TAB, InitFlapping), + ok + end. + +-spec(check_flapping( Action :: atom() + , CheckTimes :: integer() + , Threshold :: {integer(), integer()} + , InitFlapping :: flapping_record()) + -> flapping_state()). +check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval}, + Flapping = #flapping{ client_id = ClientId + , timestamp = Timestamp }) -> + case emqx_time:now_secs() of + NowTimestamp when NowTimestamp =< Timestamp, + CheckTimes > TimesThreshold -> + ets:delete(?FLAPPING_TAB, ClientId), + flapping; + NowTimestamp when NowTimestamp > Timestamp, + Action =:= disconnect -> + ets:delete(?FLAPPING_TAB, ClientId), + ok; + NowTimestamp -> + NewFlapping = Flapping#flapping{timestamp = NowTimestamp + TimeInterval}, + ets:insert(?FLAPPING_TAB, NewFlapping), + ok + end. %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_statem callbacks %%-------------------------------------------------------------------- +-spec(start_link(TimerInterval :: integer()) -> startlink_ret()). +start_link(TimerInterval) -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). -init([]) -> - %% ets:new(banned, [public, ordered_set, named_table]), - {ok, #state{}}. +init([TimerInterval]) -> + TabOpts = [ public + , set + , {keypos, 2} + , {write_concurrency, true} + , {read_concurrency, true}], + ok = emqx_tables:new(?FLAPPING_TAB, TabOpts), + {ok, initialized, #{timer_interval => TimerInterval}}. -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. +callback_mode() -> [state_functions, state_enter]. -handle_cast(_Msg, State) -> - {noreply, State}. +initialized(enter, _OldState, #{timer_interval := Time}) -> + Action = {state_timeout, Time, clean_expired_records}, + {keep_state_and_data, Action}; +initialized(state_timeout, clean_expired_records, #{}) -> + clean_expired_records(), + repeat_state_and_data. -handle_info(_Info, State) -> - {noreply, State}. +code_change(_Vsn, State, Data, _Extra) -> + {ok, State, Data}. -terminate(_Reason, _State) -> +terminate(_Reason, _StateName, _State) -> + emqx_tables:delete(?FLAPPING_TAB), ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +%% @doc clean expired records in ets +clean_expired_records() -> + Records = ets:tab2list(?FLAPPING_TAB), + traverse_records(Records). +traverse_records([]) -> + ok; +traverse_records([#flapping{client_id = ClientId, + timestamp = Timestamp} | LeftRecords]) -> + case emqx_time:now_secs() > Timestamp of + true -> + ets:delete(?FLAPPING_TAB, ClientId); + false -> + true + end, + traverse_records(LeftRecords). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index bdc440215..79c8ed3c8 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -141,16 +141,16 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> {Properties, Rest3} = parse_properties(Rest2, ProtoVer), {ClientId, Rest4} = parse_utf8_string(Rest3), - ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = (BridgeTag =:= 8), - clean_start = bool(CleanStart), - will_flag = bool(WillFlag), - will_qos = WillQoS, - will_retain = bool(WillRetain), - keepalive = KeepAlive, - properties = Properties, - client_id = ClientId}, + ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, + proto_ver = ProtoVer, + is_bridge = (BridgeTag =:= 8), + clean_start = bool(CleanStart), + will_flag = bool(WillFlag), + will_qos = WillQoS, + will_retain = bool(WillRetain), + keepalive = KeepAlive, + properties = Properties, + client_id = ClientId}, {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4), {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)), {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)), diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 54baac636..3de7f977d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -60,6 +60,7 @@ is_bridge, enable_ban, enable_acl, + enable_flapping_detect, acl_deny_action, recv_stats, send_stats, @@ -90,31 +91,32 @@ init(SocketOpts = #{ peername := Peername , peercert := Peercert , sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), - #pstate{zone = Zone, - sendfun = SendFun, - peername = Peername, - peercert = Peercert, - proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>, - client_id = <<>>, - is_assigned = false, - conn_pid = self(), - username = init_username(Peercert, Options), - clean_start = false, - topic_aliases = #{}, - packet_size = emqx_zone:get_env(Zone, max_packet_size), - is_bridge = false, - enable_ban = emqx_zone:get_env(Zone, enable_ban, false), - enable_acl = emqx_zone:get_env(Zone, enable_acl), - acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore), - recv_stats = #{msg => 0, pkt => 0}, - send_stats = #{msg => 0, pkt => 0}, - connected = false, - ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), - topic_alias_maximum = #{to_client => 0, from_client => 0}, - conn_mod = maps:get(conn_mod, SocketOpts, undefined), - credentials = #{}, - ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. + #pstate{zone = Zone, + sendfun = SendFun, + peername = Peername, + peercert = Peercert, + proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + is_assigned = false, + conn_pid = self(), + username = init_username(Peercert, Options), + clean_start = false, + topic_aliases = #{}, + packet_size = emqx_zone:get_env(Zone, max_packet_size), + is_bridge = false, + enable_ban = emqx_zone:get_env(Zone, enable_ban, false), + enable_acl = emqx_zone:get_env(Zone, enable_acl), + enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false), + acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore), + recv_stats = #{msg => 0, pkt => 0}, + send_stats = #{msg => 0, pkt => 0}, + connected = false, + ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), + topic_alias_maximum = #{to_client => 0, from_client => 0}, + conn_mod = maps:get(conn_mod, SocketOpts, undefined), + credentials = #{}, + ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -766,6 +768,7 @@ make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, check_connect(Packet, PState) -> run_check_steps([fun check_proto_ver/2, fun check_client_id/2, + fun check_flapping/2, fun check_banned/2, fun check_will_topic/2], Packet, PState). @@ -798,6 +801,9 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone} false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} end. +check_flapping(#mqtt_packet_connect{}, PState) -> + do_flapping_detect(connect, PState). + check_banned(_ConnPkt, #pstate{enable_ban = false}) -> ok; check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, @@ -896,14 +902,16 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) -> terminate(_Reason, #pstate{client_id = undefined}) -> ok; -terminate(_Reason, #pstate{connected = false}) -> +terminate(_Reason, PState = #pstate{connected = false}) -> + do_flapping_detect(disconnect, PState), ok; -terminate(conflict, _PState) -> - ok; -terminate(discard, _PState) -> +terminate(Reason, PState) when Reason =:= conflict; + Reason =:= discard -> + do_flapping_detect(disconnect, PState), ok; -terminate(Reason, #pstate{credentials = Credentials}) -> +terminate(Reason, PState = #pstate{credentials = Credentials}) -> + do_flapping_detect(disconnect, PState), ?LOG(info, "[Protocol] Shutdown for ~p", [Reason]), ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). @@ -932,6 +940,26 @@ flag(true) -> 1. %%------------------------------------------------------------------------------ %% Execute actions in case acl deny +do_flapping_detect(Action, #pstate{zone = Zone, + client_id = ClientId, + enable_flapping_detect = true}) -> + ExpiryInterval = emqx_zone:get_env(Zone, flapping_expiry_interval, 3600000), + Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20), + Until = erlang:system_time(second) + ExpiryInterval, + case emqx_flapping:check(Action, ClientId, Threshold) of + flapping -> + emqx_banned:add(#banned{who = {client_id, ClientId}, + reason = <<"flapping">>, + by = <<"flapping_checker">>, + until = Until + }), + ok; + _Other -> + ok + end; +do_flapping_detect(_Action, _PState) -> + ok. + do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, acl_deny_action = disconnect}) -> diff --git a/src/emqx_tables.erl b/src/emqx_tables.erl index 2c11b9d88..16812036a 100644 --- a/src/emqx_tables.erl +++ b/src/emqx_tables.erl @@ -14,7 +14,7 @@ -module(emqx_tables). --export([new/2]). +-export([new/2, delete/1]). -export([ lookup_value/2 , lookup_value/3 @@ -30,6 +30,16 @@ new(Tab, Opts) -> Tab -> ok end. +-spec(delete(atom()) -> ok). +delete(Tab) -> + case ets:info(Tab, name) of + undefined -> + ok; + Tab -> + ets:delete(Tab), + ok + end. + %% KV lookup -spec(lookup_value(atom(), term()) -> any()). lookup_value(Tab, Key) -> @@ -42,4 +52,3 @@ lookup_value(Tab, Key, Def) -> catch error:badarg -> Def end. - diff --git a/src/emqx_types.erl b/src/emqx_types.erl index c8c274b70..021609dc8 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -53,6 +53,7 @@ -export_type([ alarm/0 , plugin/0 + , banned/0 , command/0 ]). @@ -91,6 +92,7 @@ -type(topic_table() :: [{topic(), subopts()}]). -type(payload() :: binary() | iodata()). -type(message() :: #message{}). +-type(banned() :: #banned{}). -type(delivery() :: #delivery{}). -type(deliver_results() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]). @@ -98,4 +100,3 @@ -type(alarm() :: #alarm{}). -type(plugin() :: #plugin{}). -type(command() :: #command{}). - diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index 60f709ee5..e58868e78 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -62,6 +62,13 @@ groups() -> [compile_rule, match_rule]}]. +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teadown_steps(). + init_per_group(Group, Config) when Group =:= access_control; Group =:= access_control_cache_mode -> prepare_config(Group), diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index 88240be85..71e1d0d25 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -62,6 +62,7 @@ run_setup_steps(Config) -> NewConfig = generate_config(), lists:foreach(fun set_app_env/1, NewConfig), set_bridge_env(), + {ok, _} = application:ensure_all_started(?APP), set_log_level(Config), Config. @@ -109,32 +110,32 @@ set_bridge_env() -> change_opts(SslType) -> {ok, Listeners} = application:get_env(?APP, listeners), NewListeners = - lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> - case Protocol of - ssl -> - SslOpts = proplists:get_value(ssl_options, Opts), - Keyfile = local_path(["etc/certs", "key.pem"]), - Certfile = local_path(["etc/certs", "cert.pem"]), - TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), - TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), - TupleList3 = - case SslType of - ssl_twoway-> - CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), - MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), - lists:merge(TupleList2, MutSslList); - _ -> - lists:filter(fun ({cacertfile, _}) -> false; - ({verify, _}) -> false; - ({fail_if_no_peer_cert, _}) -> false; - (_) -> true - end, TupleList2) - end, - [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc]; - _ -> - [Listener | Acc] - end - end, [], Listeners), + lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> + case Protocol of + ssl -> + SslOpts = proplists:get_value(ssl_options, Opts), + Keyfile = local_path(["etc/certs", "key.pem"]), + Certfile = local_path(["etc/certs", "cert.pem"]), + TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), + TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), + TupleList3 = + case SslType of + ssl_twoway-> + CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), + MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), + lists:merge(TupleList2, MutSslList); + _ -> + lists:filter(fun ({cacertfile, _}) -> false; + ({verify, _}) -> false; + ({fail_if_no_peer_cert, _}) -> false; + (_) -> true + end, TupleList2) + end, + [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc]; + _ -> + [Listener | Acc] + end + end, [], Listeners), application:set_env(?APP, listeners, NewListeners). client_ssl_twoway() -> diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl new file mode 100644 index 000000000..3317672cf --- /dev/null +++ b/test/emqx_flapping_SUITE.erl @@ -0,0 +1,60 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_flapping_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [t_flapping]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + prepare_for_test(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_flapping(_Config) -> + process_flag(trap_exit, true), + flapping_connect(5), + {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]), + {error, _} = emqx_client:connect(C), + receive + {'EXIT', Client, _Reason} -> + ct:log("receive exit signal, Client: ~p", [Client]) + after 1000 -> + ct:log("timeout") + end. + + +flapping_connect(Times) -> + [flapping_connect() || _ <- lists:seq(1, Times)]. + +flapping_connect() -> + {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]), + {ok, _} = emqx_client:connect(C), + ok = emqx_client:disconnect(C). + +prepare_for_test() -> + emqx_zone:set_env(external, enable_flapping_detect, true), + emqx_zone:set_env(external, flapping_threshold, {10, 60}), + emqx_zone:set_env(external, flapping_expiry_interval, 3600). diff --git a/test/emqx_tables_SUITE.erl b/test/emqx_tables_SUITE.erl index c282e93af..c028d3681 100644 --- a/test/emqx_tables_SUITE.erl +++ b/test/emqx_tables_SUITE.erl @@ -23,4 +23,6 @@ t_new(_) -> ok = emqx_tables:new(test_table, [{read_concurrency, true}]), ets:insert(test_table, {key, 100}), ok = emqx_tables:new(test_table, [{read_concurrency, true}]), - 100 = ets:lookup_element(test_table, key, 2). + 100 = ets:lookup_element(test_table, key, 2), + ok = emqx_tables:delete(test_table), + ok = emqx_tables:delete(test_table). diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index 23ef3c67d..7f17d5258 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -35,4 +35,3 @@ t_set_get_env(_) -> emqx_zone:force_reload(), ?assertEqual(val, emqx_zone:get_env(zone1, key)), emqx_zone:stop(). - From a8d2497480f6fdc1ae7ee79a052a4ee32a9b8458 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 19 Apr 2019 16:33:48 +0800 Subject: [PATCH 14/14] Truncate logs that is too long --- etc/emqx.conf | 6 ++++++ priv/emqx.schema | 12 +++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 610898b53..5da71b1d2 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -368,6 +368,12 @@ log.dir = {{ platform_log_dir }} ## Default: emqx.log log.file = emqx.log +## Limits the total number of characters printed for each log event. +## +## Value: Integer +## Default: 1024 +log.chars_limit = 1024 + ## Maximum size of each log file. ## ## Value: Number diff --git a/priv/emqx.schema b/priv/emqx.schema index 459349068..f28913565 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -420,6 +420,11 @@ end}. {datatype, file} ]}. +{mapping, "log.chars_limit", "log.chars_limit", [ + {default, 1024}, + {datatype, integer} +]}. + {mapping, "log.rotation.size", "kernel.logger", [ {default, "10MB"}, {datatype, bytesize} @@ -464,6 +469,10 @@ end}. {translation, "kernel.logger", fun(Conf) -> LogTo = cuttlefish:conf_get("log.to", Conf), LogLevel = cuttlefish:conf_get("log.level", Conf), + CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of + -1 -> unlimited; + V -> V + end, Formatter = {emqx_logger_formatter, #{template => [time," [",level,"] ", @@ -474,7 +483,8 @@ end}. [{peername, [peername," "], []}]}, - msg,"\n"]}}, + msg,"\n"], + chars_limit => CharsLimit}}, FileConf = fun(Filename) -> #{type => wrap, file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename),