From fa1adf5cfb65005a4f9d6f5a2d54eb7ec1754006 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 16 Aug 2019 18:14:48 +0800 Subject: [PATCH 1/9] Fix Message-Expiry-Interval not working --- src/emqx_session.erl | 8 ++- test/emqx_msg_expiry_interval_SUITE.erl | 91 +++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 test/emqx_msg_expiry_interval_SUITE.erl diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2530f3a42..855cd030e 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -465,7 +465,13 @@ dequeue(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> {Msgs, Q}; {{value, Msg}, Q1} -> - dequeue(Cnt-1, [Msg|Msgs], Q1) + case emqx_message:is_expired(Msg) of + true -> + ok = emqx_metrics:inc('messages.expired'), + dequeue(Cnt-1, Msgs, Q1); + false -> + dequeue(Cnt-1, [Msg|Msgs], Q1) + end end. batch_n(Inflight) -> diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl new file mode 100644 index 000000000..555da9dd7 --- /dev/null +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -0,0 +1,91 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_msg_expiry_interval_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_message_expiry_interval_1(_) -> + ClientA = message_expiry_interval_init(), + [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]]. + +t_message_expiry_interval_2(_) -> + ClientA = message_expiry_interval_init(), + [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]]. + +message_expiry_interval_init() -> + {ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqx_client:connect(ClientA), + {ok, _} = emqx_client:connect(ClientB), + %% subscribe and disconnect client-b + emqx_client:subscribe(ClientB, <<"t/a">>, 1), + emqx_client:stop(ClientB), + ClientA. + +message_expiry_interval_exipred(ClientA, QoS) -> + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a and waiting for the message expired + emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), + ct:sleep(1000), + + %% resume the session for client-b + {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqx_client:connect(ClientB1), + + %% verify client-b could not receive the publish message + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> + ct:fail(should_have_expired) + after 300 -> + ok + end, + emqx_client:stop(ClientB1). + +message_expiry_interval_not_exipred(ClientA, QoS) -> + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a + emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + + %% wait for 1s and then resume the session for client-b, the message should not expires + %% as Message-Expiry-Interval = 20s + ct:sleep(1000), + {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqx_client:connect(ClientB1), + + %% verify client-b could receive the publish message and the Message-Expiry-Interval is set + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>, + properties := #{'Message-Expiry-Interval' := MsgExpItvl}}} + when MsgExpItvl < 20 -> ok; + {publish, _} = Msg -> + ct:fail({incorrect_publish, Msg}) + after 300 -> + ct:fail(no_publish_received) + end, + emqx_client:stop(ClientB1). From 1667cbd359293c84cb928e117c56a17cf9077147 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 16 Sep 2019 16:05:06 +0800 Subject: [PATCH 2/9] Fix test cases --- src/emqx_channel.erl | 3 --- src/emqx_packet.erl | 4 ++++ test/emqx_msg_expiry_interval_SUITE.erl | 28 ++++++++++++------------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 1e780eb08..224965fde 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -542,9 +542,6 @@ handle_out({deliver, Delivers}, Channel = #channel{session = Session}) -> {ok, Channel#channel{session = NSession}} end; -handle_out({publish, [Publish]}, Channel) -> - handle_out(Publish, Channel); - handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> Packets = lists:foldl( fun(Publish, Acc) -> diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index b06f526af..1113c15d7 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -110,6 +110,10 @@ check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscr check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) -> check(UnsubPkt); +check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= _TopicAlias}}) -> + ok; +check(#mqtt_packet_publish{topic_name = <<>>, properties = #{}}) -> + {error, ?RC_PROTOCOL_ERROR}; check(#mqtt_packet_publish{topic_name = TopicName, properties = Props}) -> try emqx_topic:validate(name, TopicName) of true -> check_pub_props(Props) diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl index 555da9dd7..f3170b718 100644 --- a/test/emqx_msg_expiry_interval_SUITE.erl +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -39,24 +39,24 @@ t_message_expiry_interval_2(_) -> [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]]. message_expiry_interval_init() -> - {ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqx_client:connect(ClientA), - {ok, _} = emqx_client:connect(ClientB), + {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientA), + {ok, _} = emqtt:connect(ClientB), %% subscribe and disconnect client-b - emqx_client:subscribe(ClientB, <<"t/a">>, 1), - emqx_client:stop(ClientB), + emqtt:subscribe(ClientB, <<"t/a">>, 1), + emqtt:stop(ClientB), ClientA. message_expiry_interval_exipred(ClientA, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a and waiting for the message expired - emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), ct:sleep(1000), %% resume the session for client-b - {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqx_client:connect(ClientB1), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), %% verify client-b could not receive the publish message receive @@ -65,18 +65,18 @@ message_expiry_interval_exipred(ClientA, QoS) -> after 300 -> ok end, - emqx_client:stop(ClientB1). + emqtt:stop(ClientB1). message_expiry_interval_not_exipred(ClientA, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a - emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), %% wait for 1s and then resume the session for client-b, the message should not expires %% as Message-Expiry-Interval = 20s ct:sleep(1000), - {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqx_client:connect(ClientB1), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), %% verify client-b could receive the publish message and the Message-Expiry-Interval is set receive @@ -88,4 +88,4 @@ message_expiry_interval_not_exipred(ClientA, QoS) -> after 300 -> ct:fail(no_publish_received) end, - emqx_client:stop(ClientB1). + emqtt:stop(ClientB1). From 600cd11f1fe988af0d7219822dcb1326f1130b6d Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 16 Sep 2019 17:22:50 +0800 Subject: [PATCH 3/9] Rename connection to conninfo --- src/emqx_connection.erl | 2 +- src/emqx_ws_connection.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 302a1b4c5..f4a11bcbc 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -135,7 +135,7 @@ attrs(CPid) when is_pid(CPid) -> attrs(Conn = #connection{chan_state = ChanState}) -> ConnAttrs = info(?ATTR_KEYS, Conn), ChanAttrs = emqx_channel:attrs(ChanState), - maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}). + maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}). %% @doc Get stats of the channel. -spec(stats(pid()|connection()) -> emqx_types:stats()). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index f9527634c..6ed7cca63 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -76,7 +76,7 @@ info(WsPid) when is_pid(WsPid) -> info(WsConn = #ws_connection{chan_state = ChanState}) -> ConnInfo = info(?INFO_KEYS, WsConn), ChanInfo = emqx_channel:info(ChanState), - maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}). + maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}). info(Keys, WsConn) when is_list(Keys) -> [{Key, info(Key, WsConn)} || Key <- Keys]; From e35eaa97b365645352647682d814f36e6329667d Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 16 Sep 2019 17:36:22 +0800 Subject: [PATCH 4/9] Fix test case --- test/emqx_client_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 859406dd8..19557143b 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -97,6 +97,7 @@ t_cm(_) -> ClientId = <<"myclient">>, {ok, C} = emqtt:start_link([{client_id, ClientId}]), {ok, _} = emqtt:connect(C), + ct:sleep(50), #{client := #{client_id := ClientId}} = emqx_cm:get_chan_attrs(ClientId), emqtt:subscribe(C, <<"mytopic">>, 0), ct:sleep(1200), From 0c7c72001865db4cc9910eb413e9faff82c8e3f7 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 16 Sep 2019 17:51:36 +0800 Subject: [PATCH 5/9] Add delay in test case --- test/emqx_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 96c76b3e6..563683bf1 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -50,6 +50,7 @@ t_emqx_pubsub_api(_) -> Payload = <<"Hello World">>, Topic1 = <<"mytopic1">>, emqx:subscribe(Topic, ClientId), + ct:sleep(100), ?assertEqual([Topic], emqx:topics()), ?assertEqual([self()], emqx:subscribers(Topic)), ?assertEqual([{Topic,#{qos => 0,subid => ClientId}}], emqx:subscriptions(self())), From decdce2ae26009a9237a03ac03bffd60b3c19a23 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 16 Sep 2019 18:54:34 +0800 Subject: [PATCH 6/9] Fix unused variable --- src/emqx_sys_mon.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index f63dd95f6..7d6e5dd6c 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -166,7 +166,7 @@ code_change(_OldVsn, State, _Extra) -> handle_partition_event({partition, {occurred, Node}}) -> alarm_handler:set_alarm({partitioned, Node}); -handle_partition_event({partition, {healed, Node}}) -> +handle_partition_event({partition, {healed, _Node}}) -> alarm_handler:clear_alarm(partitioned). suppress(Key, SuccFun, State = #{events := Events}) -> From 46659854da26368a06f4345494ca978d83b60a58 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 16 Sep 2019 20:25:19 +0800 Subject: [PATCH 7/9] Fix test cases --- test/emqx_msg_expiry_interval_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl index f3170b718..0da3efde1 100644 --- a/test/emqx_msg_expiry_interval_SUITE.erl +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -24,6 +24,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. From a9dd94b2b5402cbd65d8940f221e0e9c20736882 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Wed, 18 Sep 2019 17:38:51 +0800 Subject: [PATCH 8/9] Improve mechanism of waiting for session to expire --- src/emqx_channel.erl | 68 ++++++++++++++++++----------------- src/emqx_connection.erl | 71 +++++++++++++++++++------------------ src/emqx_ws_connection.erl | 70 +++++++++++++++++++++--------------- test/emqx_channel_SUITE.erl | 2 +- 4 files changed, 114 insertions(+), 97 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 224965fde..2faadce62 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -328,32 +328,25 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(?PACKET(?PINGREQ), Channel) -> {ok, ?PACKET(?PINGRESP), Channel}; -handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Session, protocol = Protocol}) -> +handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{session = Session, protocol = Protocol}) -> OldInterval = emqx_session:info(expiry_interval, Session), Interval = get_property('Session-Expiry-Interval', Properties, OldInterval), case OldInterval =:= 0 andalso Interval =/= OldInterval of true -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); false -> - Channel1 = case RC of - ?RC_SUCCESS -> Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)}; - _ -> Channel - end, - Channel2 = Channel1#channel{session = emqx_session:update_expiry_interval(Interval, Session)}, - case Interval of - ?UINT_MAX -> - {ok, ensure_timer(will_timer, Channel2)}; - Int when Int > 0 -> - {ok, ensure_timer([will_timer, expire_timer], Channel2)}; - _Other -> - Reason = case RC of - ?RC_SUCCESS -> normal; - _ -> - Ver = emqx_protocol:info(proto_ver, Protocol), - emqx_reason_codes:name(RC, Ver) - end, - {stop, {shutdown, Reason}, Channel2} - end + Reason = case ReasonCode of + ?RC_SUCCESS -> normal; + _ -> + ProtoVer = emqx_protocol:info(proto_ver, Protocol), + emqx_reason_codes:name(ReasonCode, ProtoVer) + end, + {wait_session_expire, {shutdown, Reason}, + Channel#channel{session = emqx_session:update_expiry_interval(Interval, Session), + protocol = case ReasonCode of + ?RC_SUCCESS -> emqx_protocol:clear_will_msg(Protocol); + _ -> Protocol + end}} end; handle_in(?AUTH_PACKET(), Channel) -> @@ -362,7 +355,7 @@ handle_in(?AUTH_PACKET(), Channel) -> handle_in(Packet, Channel) -> ?LOG(error, "Unexpected incoming: ~p", [Packet]), - {stop, {shutdown, unexpected_incoming_packet}, Channel}. + handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel). %%-------------------------------------------------------------------- %% Process Connect @@ -599,10 +592,10 @@ handle_out({disconnect, ReasonCode}, Channel = #channel{protocol = Protocol}) -> ?MQTT_PROTO_V5 -> Reason = emqx_reason_codes:name(ReasonCode), Packet = ?DISCONNECT_PACKET(ReasonCode), - {stop, {shutdown, Reason}, Packet, Channel}; + {wait_session_expire, {shutdown, Reason}, Packet, Channel}; ProtoVer -> Reason = emqx_reason_codes:name(ReasonCode, ProtoVer), - {stop, {shutdown, Reason}, Channel} + {wait_session_expire, {shutdown, Reason}, Channel} end; handle_out({Type, Data}, Channel) -> @@ -674,18 +667,26 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) -> handle_info(disconnected, Channel = #channel{connected = undefined}) -> shutdown(closed, Channel); +handle_info(disconnected, Channel = #channel{connected = false}) -> + {ok, Channel}; + handle_info(disconnected, Channel = #channel{protocol = Protocol, session = Session}) -> - %% TODO: Why handle will_msg here? - publish_will_msg(emqx_protocol:info(will_msg, Protocol)), - NChannel = Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)}, - Interval = emqx_session:info(expiry_interval, Session), - case Interval of + Channel1 = ensure_disconnected(Channel), + Channel2 = case timer:seconds(emqx_protocol:info(will_delay_interval, Protocol)) of + 0 -> + publish_will_msg(emqx_protocol:info(will_msg, Protocol)), + Channel1#channel{protocol = emqx_protocol:clear_will_msg(Protocol)}; + _ -> + ensure_timer(will_timer, Channel1) + end, + case emqx_session:info(expiry_interval, Session) of ?UINT_MAX -> - {ok, ensure_disconnected(NChannel)}; + {ok, Channel2}; Int when Int > 0 -> - {ok, ensure_timer(expire_timer, ensure_disconnected(NChannel))}; - _Other -> shutdown(closed, NChannel) + {ok, ensure_timer(expire_timer, Channel2)}; + _Other -> + shutdown(closed, Channel2) end; handle_info(Info, Channel) -> @@ -715,7 +716,7 @@ timeout(TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive, NChannel = Channel#channel{keepalive = NKeepalive}, {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> - {stop, {shutdown, keepalive_timeout}, Channel} + {wait_session_expire, {shutdown, keepalive_timeout}, Channel} end; timeout(TRef, retry_delivery, Channel = #channel{session = Session, @@ -804,6 +805,9 @@ interval(will_timer, #channel{protocol = Protocol}) -> terminate(normal, #channel{client = Client}) -> ok = emqx_hooks:run('client.disconnected', [Client, normal]); +terminate({shutdown, Reason}, #channel{client = Client}) + when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered -> + ok = emqx_hooks:run('client.disconnected', [Client, Reason]); terminate(Reason, #channel{client = Client, protocol = Protocol }) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index f4a11bcbc..338d3e85d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -224,10 +224,13 @@ idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end, handle_incoming(Packet, SuccFun, NState); -idle(cast, {incoming, Packet}, State) -> +idle(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> ?LOG(warning, "Unexpected incoming: ~p", [Packet]), shutdown(unexpected_incoming_packet, State); +idle(cast, {incoming, {error, Reason}}, State) -> + shutdown(Reason, State); + idle(EventType, Content, State) -> ?HANDLE(EventType, Content, State). @@ -241,6 +244,17 @@ connected(enter, _PrevSt, State) -> connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> handle_incoming(Packet, fun keep_state/1, State); +connected(cast, {incoming, {error, Reason}}, State = #connection{chan_state = ChanState}) -> + case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of + {wait_session_expire, _, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + {next_state, disconnected, State#connection{chan_state= NChanState}}; + {wait_session_expire, _, OutPackets, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + NState = State#connection{chan_state= NChanState}, + {next_state, disconnected, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)} + end; + connected(info, Deliver = {deliver, _Topic, _Msg}, State) -> handle_deliver(emqx_misc:drain_deliver([Deliver]), State); @@ -408,8 +422,7 @@ process_incoming(Data, State) -> process_incoming(<<>>, Packets, State) -> {keep_state, State, next_incoming_events(Packets)}; -process_incoming(Data, Packets, State = #connection{parse_state = ParseState, - chan_state = ChanState}) -> +process_incoming(Data, Packets, State = #connection{parse_state = ParseState}) -> try emqx_frame:parse(Data, ParseState) of {more, NParseState} -> NState = State#connection{parse_state = NParseState}, @@ -418,32 +431,16 @@ process_incoming(Data, Packets, State = #connection{parse_state = ParseState, NState = State#connection{parse_state = NParseState}, process_incoming(Rest, [Packet|Packets], NState); {error, Reason} -> - shutdown(Reason, State) + {keep_state, State, next_incoming_events({error, Reason})} catch error:Reason:Stk -> - ?LOG(error, "Parse failed for ~p~nStacktrace:~p~nError data:~p", [Reason, Stk, Data]), - Result = - case emqx_channel:info(connected, ChanState) of - undefined -> - emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); - true -> - emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); - _ -> - ignore - end, - case Result of - {stop, Reason0, OutPackets, NChanState} -> - Shutdown = fun(NewSt) -> stop(Reason0, NewSt) end, - NState = State#connection{chan_state = NChanState}, - handle_outgoing(OutPackets, Shutdown, NState); - {stop, Reason0, NChanState} -> - stop(Reason0, State#connection{chan_state = NChanState}); - ignore -> - keep_state(State) - end + ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nError data:~p", [Reason, Stk, Data]), + {keep_state, State, next_incoming_events({error, Reason})} end. -compile({inline, [next_incoming_events/1]}). +next_incoming_events({error, Reason}) -> + [next_event(cast, {incoming, {error, Reason}})]; next_incoming_events(Packets) -> [next_event(cast, {incoming, Packet}) || Packet <- Packets]. @@ -459,14 +456,19 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun, {ok, NChanState} -> SuccFun(State#connection{chan_state= NChanState}); {ok, OutPackets, NChanState} -> - handle_outgoing(OutPackets, SuccFun, - State#connection{chan_state = NChanState}); + handle_outgoing(OutPackets, SuccFun, State#connection{chan_state = NChanState}); + {wait_session_expire, Reason, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + {next_state, disconnected, State#connection{chan_state = NChanState}}; + {wait_session_expire, Reason, OutPackets, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + NState = State#connection{chan_state= NChanState}, + {next_state, disconnected, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)}; {stop, Reason, NChanState} -> stop(Reason, State#connection{chan_state = NChanState}); {stop, Reason, OutPackets, NChanState} -> - Shutdown = fun(NewSt) -> stop(Reason, NewSt) end, - NState = State#connection{chan_state = NChanState}, - handle_outgoing(OutPackets, Shutdown, NState) + NState = State#connection{chan_state= NChanState}, + stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)) end. %%------------------------------------------------------------------- @@ -477,10 +479,7 @@ handle_deliver(Delivers, State = #connection{chan_state = ChanState}) -> {ok, NChanState} -> keep_state(State#connection{chan_state = NChanState}); {ok, Packets, NChanState} -> - NState = State#connection{chan_state = NChanState}, - handle_outgoing(Packets, fun keep_state/1, NState); - {stop, Reason, NChanState} -> - stop(Reason, State#connection{chan_state = NChanState}) + handle_outgoing(Packets, fun keep_state/1, State#connection{chan_state = NChanState}) end. %%-------------------------------------------------------------------- @@ -534,8 +533,10 @@ handle_timeout(TRef, Msg, State = #connection{chan_state = ChanState}) -> {ok, NChanState} -> keep_state(State#connection{chan_state = NChanState}); {ok, Packets, NChanState} -> - handle_outgoing(Packets, fun keep_state/1, - State#connection{chan_state = NChanState}); + handle_outgoing(Packets, fun keep_state/1, State#connection{chan_state = NChanState}); + {wait_session_expire, Reason, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + {next_state, disconnected, State#connection{chan_state = NChanState}}; {stop, Reason, NChanState} -> stop(Reason, State#connection{chan_state = NChanState}) end. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 6ed7cca63..c99829356 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -254,6 +254,22 @@ websocket_info({cast, Msg}, State = #ws_connection{chan_state = ChanState}) -> stop(Reason, State#ws_connection{chan_state = NChanState}) end; +websocket_info({incoming, {error, Reason}}, State = #ws_connection{fsm_state = idle}) -> + stop({shutdown, Reason}, State); + +websocket_info({incoming, {error, Reason}}, State = #ws_connection{fsm_state = connected, chan_state = ChanState}) -> + case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of + {wait_session_expire, _, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + disconnected(State#ws_connection{chan_state= NChanState}); + {wait_session_expire, _, OutPackets, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + disconnected(enqueue(OutPackets, State#ws_connection{chan_state = NChanState})) + end; + +websocket_info({incoming, {error, _Reason}}, State = #ws_connection{fsm_state = disconnected}) -> + reply(State); + websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #ws_connection{fsm_state = idle}) -> #mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt, @@ -276,9 +292,7 @@ websocket_info(Deliver = {deliver, _Topic, _Msg}, {ok, NChanState} -> reply(State#ws_connection{chan_state = NChanState}); {ok, Packets, NChanState} -> - reply(enqueue(Packets, State#ws_connection{chan_state = NChanState})); - {stop, Reason, NChanState} -> - stop(Reason, State#ws_connection{chan_state = NChanState}) + reply(enqueue(Packets, State#ws_connection{chan_state = NChanState})) end; websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) -> @@ -307,8 +321,7 @@ websocket_info(Info, State = #ws_connection{chan_state = ChanState}) -> terminate(SockError, _Req, #ws_connection{chan_state = ChanState, stop_reason = Reason}) -> - ?LOG(debug, "Terminated for ~p, sockerror: ~p", - [Reason, SockError]), + ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Reason, SockError]), emqx_channel:terminate(Reason, ChanState). %%-------------------------------------------------------------------- @@ -318,6 +331,12 @@ connected(State = #ws_connection{chan_state = ChanState}) -> ok = emqx_channel:handle_cast({register, attrs(State), stats(State)}, ChanState), reply(State#ws_connection{fsm_state = connected}). +%%-------------------------------------------------------------------- +%% Disconnected callback + +disconnected(State) -> + reply(State#ws_connection{fsm_state = disconnected}). + %%-------------------------------------------------------------------- %% Handle timeout @@ -328,6 +347,9 @@ handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) -> {ok, Packets, NChanState} -> NState = State#ws_connection{chan_state = NChanState}, reply(enqueue(Packets, NState)); + {wait_session_expire, Reason, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + disconnected(State#ws_connection{chan_state = NChanState}); {stop, Reason, NChanState} -> stop(Reason, State#ws_connection{chan_state = NChanState}) end. @@ -347,29 +369,13 @@ process_incoming(Data, State = #ws_connection{parse_state = ParseState, self() ! {incoming, Packet}, process_incoming(Rest, State#ws_connection{parse_state = NParseState}); {error, Reason} -> - ?LOG(error, "Frame error: ~p", [Reason]), - stop(Reason, State) + self() ! {incoming, {error, Reason}}, + {ok, State} catch error:Reason:Stk -> - ?LOG(error, "Parse failed for ~p~nStacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), - Result = - case emqx_channel:info(connected, ChanState) of - undefined -> - emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); - true -> - emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); - _ -> - ignore - end, - case Result of - {stop, Reason0, OutPackets, NChanState} -> - NState = State#ws_connection{chan_state = NChanState}, - stop(Reason0, enqueue(OutPackets, NState)); - {stop, Reason0, NChanState} -> - stop(Reason0, State#ws_connection{chan_state = NChanState}); - ignore -> - {ok, State} - end + ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data: ~p", [Reason, Stk, Data]), + self() ! {incoming, {error, Reason}}, + {ok, State} end. %%-------------------------------------------------------------------- @@ -386,11 +392,17 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun, {ok, OutPackets, NChanState} -> NState = State#ws_connection{chan_state= NChanState}, SuccFun(enqueue(OutPackets, NState)); + {wait_session_expire, Reason, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + disconnected(State#ws_connection{chan_state = NChanState}); + {wait_session_expire, Reason, OutPackets, NChanState} -> + ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]), + disconnected(enqueue(OutPackets, State#ws_connection{chan_state = NChanState})); {stop, Reason, NChanState} -> - stop(Reason, State#ws_connection{chan_state= NChanState}); - {stop, Reason, OutPacket, NChanState} -> + stop(Reason, State#ws_connection{chan_state = NChanState}); + {stop, Reason, OutPackets, NChanState} -> NState = State#ws_connection{chan_state= NChanState}, - stop(Reason, enqueue(OutPacket, NState)) + stop(Reason, enqueue(OutPackets, NState)) end. %%-------------------------------------------------------------------- diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 82f52c11b..977cd397e 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -144,7 +144,7 @@ t_handle_pingreq(_) -> t_handle_disconnect(_) -> with_channel( fun(Channel) -> - {stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel), + {wait_session_expire, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel), ?assertMatch(#{will_msg := undefined}, emqx_channel:info(protocol, Channel1)) end). From 24bfaa768d2c1082e814daa63fc1c0b97772824b Mon Sep 17 00:00:00 2001 From: zhouzb Date: Thu, 19 Sep 2019 13:11:50 +0800 Subject: [PATCH 9/9] Call emqx_flapping:detect and generate alarm when flapping is detected --- src/emqx_channel.erl | 4 +++- src/emqx_flapping.erl | 20 ++++++++++++++------ test/emqx_flapping_SUITE.erl | 10 ++++++---- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2faadce62..f317dbd8d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -671,7 +671,9 @@ handle_info(disconnected, Channel = #channel{connected = false}) -> {ok, Channel}; handle_info(disconnected, Channel = #channel{protocol = Protocol, - session = Session}) -> + session = Session, + client = Client = #{zone := Zone}}) -> + emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(Client), Channel1 = ensure_disconnected(Channel), Channel2 = case timer:seconds(emqx_protocol:info(will_delay_interval, Protocol)) of 0 -> diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 6e5f98c14..ee898acbd 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -141,11 +141,11 @@ handle_cast({detected, Flapping = #flapping{client_id = ClientId, %% Log first ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms", [ClientId, esockd_net:format(Peername), DetectCnt, Duration]), - %% TODO: Send Alarm %% Banned. BannedFlapping = Flapping#flapping{client_id = {banned, ClientId}, banned_at = emqx_time:now_ms() }, + alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}), ets:insert(?FLAPPING_TAB, BannedFlapping); false -> ?LOG(warning, "~s(~s) disconnected ~w times in ~wms", @@ -189,9 +189,17 @@ with_flapping_tab(Fun, Args) -> end. expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) -> - ets:select_delete(?FLAPPING_TAB, - [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'}, - [{'<', '$1', NowTime-Duration}], [true]}, - {#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'}, - [{'<', '$1', NowTime-Interval}], [true]}]). + case ets:select(?FLAPPING_TAB, + [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'}, + [{'<', '$1', NowTime-Duration}], ['$_']}, + {#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'}, + [{'<', '$1', NowTime-Interval}], ['$_']}]) of + [] -> ok; + Flappings -> + lists:foreach(fun(Flapping = #flapping{client_id = {banned, ClientId}}) -> + ets:delete_object(?FLAPPING_TAB, Flapping), + alarm_handler:clear_alarm({flapping_detected, ClientId}); + (_) -> ok + end, Flappings) + end. diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl index 493a57b6e..cb5d0321c 100644 --- a/test/emqx_flapping_SUITE.erl +++ b/test/emqx_flapping_SUITE.erl @@ -22,22 +22,24 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - prepare_env(), + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([], fun set_special_configs/1), Config. -prepare_env() -> +set_special_configs(emqx) -> emqx_zone:set_env(external, enable_flapping_detect, true), application:set_env(emqx, flapping_detect_policy, #{threshold => 3, duration => 100, banned_interval => 200 - }). + }); +set_special_configs(_App) -> ok. end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]), ok. t_detect_check(_) -> - {ok, _Pid} = emqx_flapping:start_link(), Client = #{zone => external, client_id => <<"clientid">>, peername => {{127,0,0,1}, 5000}