From fdd9377a65e0eef8cbb8b5f7c4ee90d3127561e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Sat, 8 Sep 2018 13:35:23 +0800 Subject: [PATCH 01/17] Retain flag in retained message must set to 1 --- src/emqx_session.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 05142297e..8e6e150c5 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -726,9 +726,10 @@ run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State); +run_dispatch_steps([{rap, _Rap}|Steps], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) -> + run_dispatch_steps(Steps, Msg#message{flags = maps:put(retain, true, Flags)}, State); run_dispatch_steps([{rap, 0}|Steps], Msg = #message{flags = Flags}, State = #state{}) -> - Flags1 = maps:put(retain, false, Flags), - run_dispatch_steps(Steps, Msg#message{flags = Flags1}, State); + run_dispatch_steps(Steps, Msg#message{flags = maps:put(retain, false, Flags)}, State); run_dispatch_steps([{rap, _}|Steps], Msg, State) -> run_dispatch_steps(Steps, Msg, State); run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> From 758d18e21f3321950b9d276ce96d05fb4b6f7d9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Sat, 8 Sep 2018 18:05:04 +0800 Subject: [PATCH 02/17] no message --- src/emqx_reason_codes.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index fdc1377ec..75118b563 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -139,4 +139,4 @@ compat(connack, 16#9D) -> ?CONNACK_SERVER; compat(connack, 16#9F) -> ?CONNACK_SERVER; compat(suback, Code) when Code =< ?QOS2 -> Code; -compat(suback, Code) when Code > 16#80 -> 16#80. +compat(suback, Code) when Code >= 16#80 -> 16#80. From 5b47df163188e0a03bdc2f9a1ef8aa424a664217 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Sat, 8 Sep 2018 18:59:57 +0800 Subject: [PATCH 03/17] Add run hook when duplicated subscription --- src/emqx_session.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 8e6e150c5..118ab6e21 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -444,6 +444,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> {[QoS|RcAcc], case maps:find(Topic, SubMap) of {ok, SubOpts} -> + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), SubMap; {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), From d29069a50d590e78b5778a73e000626b0eb6e742 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 12 Sep 2018 15:17:18 +0800 Subject: [PATCH 04/17] Add feature for issue#1809 --- etc/emqx.conf | 10 ++++++++++ priv/emqx.schema | 13 ++++++++++++- src/emqx_listeners.erl | 10 ++++++++-- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index deb702211..5d54389dc 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1207,6 +1207,11 @@ listener.ssl.external.reuseaddr = true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 listener.ws.external = 8083 +## Whether the client must include "mqtt" in the list of WebSocket Sub Protocols it offers +## +## Value: true | false +listener.ws.external.standard_mqtt = true + ## The acceptor pool for external MQTT/WebSocket listener. ## ## Value: Number @@ -1346,6 +1351,11 @@ listener.ws.external.nodelay = true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 listener.wss.external = 8084 +## Whether the client must include "mqtt" in the list of WebSocket Sub Protocols it offers +## +## Value: true | false +listener.wss.external.standard_mqtt = true + ## The acceptor pool for external MQTT/WebSocket/SSL listener. ## ## Value: Number diff --git a/priv/emqx.schema b/priv/emqx.schema index e9f4932c4..8e8979908 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1094,6 +1094,11 @@ end}. {datatype, [integer, ip]} ]}. +{mapping, "listener.ws.$name.standard_mqtt", "emqx.listeners", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + {mapping, "listener.ws.$name.acceptors", "emqx.listeners", [ {default, 8}, {datatype, integer} @@ -1195,6 +1200,11 @@ end}. {datatype, [integer, ip]} ]}. +{mapping, "listener.wss.$name.standard_mqtt", "emqx.listeners", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + {mapping, "listener.wss.$name.acceptors", "emqx.listeners", [ {default, 8}, {datatype, integer} @@ -1365,7 +1375,8 @@ end}. end, LisOpts = fun(Prefix) -> - Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + Filter([{standard_mqtt, cuttlefish:conf_get(Prefix ++ ".standard_mqtt", Conf, undefined)}, + {acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 421304f3a..127c62b39 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -51,12 +51,12 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> %% Start MQTT/WS listener start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{subprotocol_name(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); %% Start MQTT/WSS listener start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{subprotocol_name(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). start_mqtt_listener(Name, ListenOn, Options) -> @@ -67,6 +67,12 @@ start_mqtt_listener(Name, ListenOn, Options) -> start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). +subprotocol_name(Options) -> + case proplists:get_value(standard_mqtt, Options, true) of + true -> "/mqtt"; + false -> "/" + end. + ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_connections, Options, 1024), From 713c43b833a8f812a1daa161fc08fab98e982eaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 13 Sep 2018 10:31:36 +0800 Subject: [PATCH 05/17] Set default value of message expiry interval for not mqtt 5.0 message --- src/emqx_protocol.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8301cf014..054ec2a57 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -249,6 +249,13 @@ preprocess_properties(Packet = #mqtt_packet{ PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; +preprocess_properties(Packet = #mqtt_packet{variable = #mqtt_packet_publish{properties = #{'Message-Expiry-Interval' := _Interval}}}, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> + {Packet, PState}; +preprocess_properties(Packet = #mqtt_packet{variable = Publish = #mqtt_packet_publish{properties = Properties}}, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> + {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{properties = maps:put('Message-Expiry-Interval', 0, Properties)}}, PState}; + preprocess_properties(Packet, PState) -> {Packet, PState}. From e33414aca1375458bff8e8f2d8edd5d877cc29bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 14 Sep 2018 10:10:32 +0800 Subject: [PATCH 06/17] Add customized mqtt path for websocket --- etc/emqx.conf | 12 ++++++------ priv/emqx.schema | 14 +++++++------- src/emqx_listeners.erl | 14 ++++++++------ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 5d54389dc..db4eedffa 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1207,10 +1207,10 @@ listener.ssl.external.reuseaddr = true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 listener.ws.external = 8083 -## Whether the client must include "mqtt" in the list of WebSocket Sub Protocols it offers +## Define the path you want to add to the end of the URL ## -## Value: true | false -listener.ws.external.standard_mqtt = true +## Value: / | / +listener.ws.external.mqtt_path = /mqtt ## The acceptor pool for external MQTT/WebSocket listener. ## @@ -1351,10 +1351,10 @@ listener.ws.external.nodelay = true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 listener.wss.external = 8084 -## Whether the client must include "mqtt" in the list of WebSocket Sub Protocols it offers +## Define the path you want to add to the end of the URL ## -## Value: true | false -listener.wss.external.standard_mqtt = true +## Value: / | / +listener.wss.external.mqtt_path = /mqtt ## The acceptor pool for external MQTT/WebSocket/SSL listener. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 8e8979908..cfd8c83a6 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1094,9 +1094,9 @@ end}. {datatype, [integer, ip]} ]}. -{mapping, "listener.ws.$name.standard_mqtt", "emqx.listeners", [ - {default, true}, - {datatype, {enum, [true, false]}} +{mapping, "listener.ws.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} ]}. {mapping, "listener.ws.$name.acceptors", "emqx.listeners", [ @@ -1200,9 +1200,9 @@ end}. {datatype, [integer, ip]} ]}. -{mapping, "listener.wss.$name.standard_mqtt", "emqx.listeners", [ - {default, true}, - {datatype, {enum, [true, false]}} +{mapping, "listener.wss.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} ]}. {mapping, "listener.wss.$name.acceptors", "emqx.listeners", [ @@ -1375,7 +1375,7 @@ end}. end, LisOpts = fun(Prefix) -> - Filter([{standard_mqtt, cuttlefish:conf_get(Prefix ++ ".standard_mqtt", Conf, undefined)}, + Filter([{mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, {acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 127c62b39..4cd20a091 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -51,12 +51,12 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> %% Start MQTT/WS listener start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - Dispatch = cowboy_router:compile([{'_', [{subprotocol_name(Options), emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); %% Start MQTT/WSS listener start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - Dispatch = cowboy_router:compile([{'_', [{subprotocol_name(Options), emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). start_mqtt_listener(Name, ListenOn, Options) -> @@ -67,10 +67,12 @@ start_mqtt_listener(Name, ListenOn, Options) -> start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). -subprotocol_name(Options) -> - case proplists:get_value(standard_mqtt, Options, true) of - true -> "/mqtt"; - false -> "/" +mqtt_path(Options) -> + MQTTPath = proplists:get_value(mqtt_path, Options, "/mqtt"), + case erlang:list_to_bitstring(MQTTPath) of + <<"/">> -> MQTTPath; + <<"/", _/binary>> -> MQTTPath; + _ -> "/mqtt" end. ranch_opts(Options) -> From a38d3578477b1c6a57102fa5c7170aac3cb53fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 14 Sep 2018 10:40:16 +0800 Subject: [PATCH 07/17] Add test case for last change --- src/emqx_listeners.erl | 6 ++++++ test/emqx_listeners_SUITE.erl | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 4cd20a091..2d9bb4bba 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -22,6 +22,12 @@ -export([restart_listener/1, restart_listener/3]). -export([stop_listener/1, stop_listener/3]). +-ifdef(TEST). + +-export([mqtt_path/1]). + +-endif. + -type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). %% @doc Start all listeners. diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index 17181aa31..8d97a8158 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -26,7 +26,8 @@ all() -> [start_stop_listeners, - restart_listeners]. + restart_listeners, + t_mqtt_path]. init_per_suite(Config) -> NewConfig = generate_config(), @@ -49,6 +50,11 @@ restart_listeners(_) -> ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). +t_mqtt_path(_) -> + ?assertEqual("/test", emqx_listeners:mqtt_path([{mqtt_path, "/test"}])), + ?assertEqual("/", emqx_listeners:mqtt_path([{mqtt_path, "/"}])), + ?assertEqual("/mqtt", emqx_listeners:mqtt_path([{mqtt_path, "test"}])). + generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), From c7928235c3cdd064b969ea556f5815470cb77e81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 14 Sep 2018 11:29:38 +0800 Subject: [PATCH 08/17] Remove check for MQTT path, and normalize code --- etc/emqx.conf | 4 ++-- priv/emqx.schema | 4 ++-- src/emqx_listeners.erl | 13 +------------ src/emqx_protocol.erl | 7 ------- test/emqx_listeners_SUITE.erl | 8 +------- 5 files changed, 6 insertions(+), 30 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index db4eedffa..4a7c195d2 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1207,7 +1207,7 @@ listener.ssl.external.reuseaddr = true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 listener.ws.external = 8083 -## Define the path you want to add to the end of the URL +## The path of WebSocket MQTT endpoint ## ## Value: / | / listener.ws.external.mqtt_path = /mqtt @@ -1351,7 +1351,7 @@ listener.ws.external.nodelay = true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 listener.wss.external = 8084 -## Define the path you want to add to the end of the URL +## The path of WebSocket MQTT endpoint ## ## Value: / | / listener.wss.external.mqtt_path = /mqtt diff --git a/priv/emqx.schema b/priv/emqx.schema index cfd8c83a6..a8ed316c2 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1375,8 +1375,8 @@ end}. end, LisOpts = fun(Prefix) -> - Filter([{mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, - {acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 2d9bb4bba..d3ef43069 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -22,12 +22,6 @@ -export([restart_listener/1, restart_listener/3]). -export([stop_listener/1, stop_listener/3]). --ifdef(TEST). - --export([mqtt_path/1]). - --endif. - -type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). %% @doc Start all listeners. @@ -74,12 +68,7 @@ start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). mqtt_path(Options) -> - MQTTPath = proplists:get_value(mqtt_path, Options, "/mqtt"), - case erlang:list_to_bitstring(MQTTPath) of - <<"/">> -> MQTTPath; - <<"/", _/binary>> -> MQTTPath; - _ -> "/mqtt" - end. + proplists:get_value(mqtt_path, Options, "/mqtt"). ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 054ec2a57..8301cf014 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -249,13 +249,6 @@ preprocess_properties(Packet = #mqtt_packet{ PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; -preprocess_properties(Packet = #mqtt_packet{variable = #mqtt_packet_publish{properties = #{'Message-Expiry-Interval' := _Interval}}}, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> - {Packet, PState}; -preprocess_properties(Packet = #mqtt_packet{variable = Publish = #mqtt_packet_publish{properties = Properties}}, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> - {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{properties = maps:put('Message-Expiry-Interval', 0, Properties)}}, PState}; - preprocess_properties(Packet, PState) -> {Packet, PState}. diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index 8d97a8158..17181aa31 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -26,8 +26,7 @@ all() -> [start_stop_listeners, - restart_listeners, - t_mqtt_path]. + restart_listeners]. init_per_suite(Config) -> NewConfig = generate_config(), @@ -50,11 +49,6 @@ restart_listeners(_) -> ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). -t_mqtt_path(_) -> - ?assertEqual("/test", emqx_listeners:mqtt_path([{mqtt_path, "/test"}])), - ?assertEqual("/", emqx_listeners:mqtt_path([{mqtt_path, "/"}])), - ?assertEqual("/mqtt", emqx_listeners:mqtt_path([{mqtt_path, "test"}])). - generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), From 49ed6f800c68639aba8e9bff5707769ec0e3cf95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 14 Sep 2018 11:40:57 +0800 Subject: [PATCH 09/17] Change comments --- etc/emqx.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 4a7c195d2..2bba8d024 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1209,7 +1209,7 @@ listener.ws.external = 8083 ## The path of WebSocket MQTT endpoint ## -## Value: / | / +## Value: URL Path listener.ws.external.mqtt_path = /mqtt ## The acceptor pool for external MQTT/WebSocket listener. @@ -1353,7 +1353,7 @@ listener.wss.external = 8084 ## The path of WebSocket MQTT endpoint ## -## Value: / | / +## Value: URL Path listener.wss.external.mqtt_path = /mqtt ## The acceptor pool for external MQTT/WebSocket/SSL listener. From 073bf481c9648b0e54d5ee56257f8705b36532c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 20 Sep 2018 15:51:28 +0800 Subject: [PATCH 10/17] Calculate the 1.5 keep alive time exactly --- etc/emqx.conf | 4 +-- priv/emqx.schema | 2 +- src/emqx_connection.erl | 21 ++++++-------- src/emqx_keepalive.erl | 56 ++++++++++++++------------------------ src/emqx_protocol.erl | 2 +- src/emqx_ws_connection.erl | 18 ++++++------ 6 files changed, 41 insertions(+), 62 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 3604f2efd..ea90b8d28 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -591,10 +591,10 @@ zone.external.enable_stats = on ## zone.external.server_keepalive = 0 ## The backoff for MQTT keepalive timeout. The broker will kick a connection out -## until 'Keepalive * backoff * 2' timeout. +## until 'Keepalive * backoff' timeout. ## ## Value: Float > 0.5 -zone.external.keepalive_backoff = 0.75 +zone.external.keepalive_backoff = 1.5 ## Maximum number of subscriptions allowed, 0 means no limit. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index a8ed316c2..77167b440 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -748,7 +748,7 @@ end}. %% @doc Keepalive backoff {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ - {default, 0.75}, + {default, 1.5}, {datatype, float} ]}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index a7f71d9aa..139011891 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -45,7 +45,8 @@ rate_limit, publish_limit, limit_timer, - idle_timeout + idle_timeout, + last_packet_ts = 0 }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -243,23 +244,17 @@ handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> +handle_info({keepalive, start, Interval}, State) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), - StatFun = fun() -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - Error -> Error - end - end, - case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of + case emqx_keepalive:start(Interval, {keepalive, check}) of {ok, KeepAlive} -> {noreply, State#state{keepalive = KeepAlive}}; {error, Error} -> shutdown(Error, State) end; -handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive) of +handle_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> + case emqx_keepalive:check(KeepAlive, LastPacketTs) of {ok, KeepAlive1} -> {noreply, State#state{keepalive = KeepAlive1}}; {error, timeout} -> @@ -296,14 +291,14 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and parse data handle_packet(<<>>, State) -> - {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; + {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State#state{last_packet_ts = erlang:system_time(millisecond)})))}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout}; + {noreply, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}, IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of diff --git a/src/emqx_keepalive.erl b/src/emqx_keepalive.erl index 25740b099..59dbe73b9 100644 --- a/src/emqx_keepalive.erl +++ b/src/emqx_keepalive.erl @@ -14,51 +14,37 @@ -module(emqx_keepalive). --export([start/3, check/1, cancel/1]). +-export([start/2, check/2, cancel/1]). --record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}). +-record(keepalive, {tmsec, tmsg, tref}). -type(keepalive() :: #keepalive{}). -export_type([keepalive/0]). +-define(SWEET_SPOT, 50). % 50ms + %% @doc Start a keepalive --spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). -start(_, 0, _) -> +-spec(start(integer(), any()) -> {ok, keepalive()}). +start(0, _) -> {ok, #keepalive{}}; -start(StatFun, TimeoutSec, TimeoutMsg) -> - case catch StatFun() of - {ok, StatVal} -> - {ok, #keepalive{statfun = StatFun, statval = StatVal, - tsec = TimeoutSec, tmsg = TimeoutMsg, - tref = timer(TimeoutSec, TimeoutMsg)}}; - {error, Error} -> - {error, Error}; - {'EXIT', Reason} -> - {error, Reason} - end. +start(TimeoutSec, TimeoutMsg) -> + {ok, #keepalive{tmsec = TimeoutSec * 1000, tmsg = TimeoutMsg, tref = timer(TimeoutSec * 1000 + ?SWEET_SPOT, TimeoutMsg)}}. %% @doc Check keepalive, called when timeout... --spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}). -check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> - case catch StatFun() of - {ok, NewVal} -> - if NewVal =/= LastVal -> - {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; - Repeat < 1 -> - {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; - true -> - {error, timeout} - end; - {error, Error} -> - {error, Error}; - {'EXIT', Reason} -> - {error, Reason} +-spec(check(keepalive(), integer()) -> {ok, keepalive()} | {error, term()}). +check(KeepAlive = #keepalive{tmsec = TimeoutMs}, LastPacketTs) -> + TimeDiff = erlang:system_time(millisecond) - LastPacketTs, + case TimeDiff >= TimeoutMs of + true -> + {error, timeout}; + false -> + {ok, resume(KeepAlive, TimeoutMs + ?SWEET_SPOT - TimeDiff)} end. --spec(resume(keepalive()) -> keepalive()). -resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> - KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. +-spec(resume(keepalive(), integer()) -> keepalive()). +resume(KeepAlive = #keepalive{tmsg = TimeoutMsg}, TimeoutMs) -> + KeepAlive#keepalive{tref = timer(TimeoutMs, TimeoutMsg)}. %% @doc Cancel Keepalive -spec(cancel(keepalive()) -> ok). @@ -67,6 +53,6 @@ cancel(#keepalive{tref = TRef}) when is_reference(TRef) -> cancel(_) -> ok. -timer(Secs, Msg) -> - erlang:send_after(timer:seconds(Secs), self(), Msg). +timer(Millisecond, Msg) -> + erlang:send_after(Millisecond, self(), Msg). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8301cf014..9d548f066 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -607,7 +607,7 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of true -> ok; - false -> {error, ?RC_PROTOCOL_ERROR} + false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} end. %% MQTT3.1 does not allow null clientId diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index fa08fa1bb..3a2d4fdae 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -40,7 +40,8 @@ keepalive, enable_stats, stats_timer, - shutdown + shutdown, + last_packet_ts = 0 }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). @@ -152,13 +153,10 @@ send_fun(WsPid) -> WsPid ! {binary, iolist_to_binary(Data)} end. -stat_fun() -> - fun() -> {ok, get(recv_oct)} end. - websocket_handle({binary, <<>>}, State) -> - {ok, ensure_stats_timer(State)}; + {ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; websocket_handle({binary, [<<>>]}, State) -> - {ok, ensure_stats_timer(State)}; + {ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; websocket_handle({binary, Data}, State = #state{parser_state = ParserState, proto_state = ProtoState}) -> BinSize = iolist_size(Data), @@ -167,7 +165,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, emqx_metrics:inc('bytes/received', BinSize), case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of {more, NewParserState} -> - {ok, State#state{parser_state = NewParserState}}; + {ok, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}}; {ok, Packet, Rest} -> emqx_metrics:received(Packet), put(recv_cnt, get(recv_cnt) + 1), @@ -225,7 +223,7 @@ websocket_info({timeout, Timer, emit_stats}, websocket_info({keepalive, start, Interval}, State) -> ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), - case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of + case emqx_keepalive:start(Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> @@ -233,8 +231,8 @@ websocket_info({keepalive, start, Interval}, State) -> shutdown(Error, State) end; -websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive) of +websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> + case emqx_keepalive:check(KeepAlive, LastPacketTs) of {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> From 768d1786c7807960470e1c288504a3f5f2390d26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 20 Sep 2018 15:55:36 +0800 Subject: [PATCH 11/17] Fix bug --- src/emqx_connection.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 139011891..14412ab95 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -298,7 +298,7 @@ handle_packet(Data, State = #state{proto_state = ProtoState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - {noreply, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}, IdleTimeout}; + {noreply, run_socket(State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}), IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of From 2c350bf5fb747d70b413305944c308622217cfb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 20 Sep 2018 17:19:21 +0800 Subject: [PATCH 12/17] Match test case for last change --- test/emqx_keepalive_SUITE.erl | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/test/emqx_keepalive_SUITE.erl b/test/emqx_keepalive_SUITE.erl index c4dbd80f2..333d66f45 100644 --- a/test/emqx_keepalive_SUITE.erl +++ b/test/emqx_keepalive_SUITE.erl @@ -26,17 +26,18 @@ groups() -> [{keepalive, [], [t_keepalive]}]. %%-------------------------------------------------------------------- t_keepalive(_) -> - {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), - [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])). + {ok, KA} = emqx_keepalive:start(1, {keepalive, timeout}), + resumed = keepalive_recv(KA, 100), + timeout = keepalive_recv(KA, 2000). -keepalive_recv(KA, Acc) -> +keepalive_recv(KA, MockInterval) -> receive {keepalive, timeout} -> - case emqx_keepalive:check(KA) of - {ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]); - {error, timeout} -> [timeout | Acc] + case emqx_keepalive:check(KA, erlang:system_time(millisecond) - MockInterval) of + {ok, _} -> resumed; + {error, timeout} -> timeout end after 4000 -> - Acc + error end. From b1d4ec750a53234f0d085474aa976451324a0f23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 20 Sep 2018 17:58:54 +0800 Subject: [PATCH 13/17] Remove the same test cases as emqx_keepalive --- Makefile | 2 +- test/emqx_net_SUITE.erl | 43 ----------------------------------------- 2 files changed, 1 insertion(+), 44 deletions(-) delete mode 100644 test/emqx_net_SUITE.erl diff --git a/Makefile b/Makefile index 54b13727a..302aad42a 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ EUNIT_OPTS = verbose CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ - emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ + emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ emqx_mountpoint emqx_listeners emqx_protocol emqx_pool diff --git a/test/emqx_net_SUITE.erl b/test/emqx_net_SUITE.erl deleted file mode 100644 index 50a830d10..000000000 --- a/test/emqx_net_SUITE.erl +++ /dev/null @@ -1,43 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_net_SUITE). - -%% CT --compile(export_all). --compile(nowarn_export_all). - -all() -> [{group, keepalive}]. - -groups() -> [{keepalive, [], [t_keepalive]}]. - -%%-------------------------------------------------------------------- -%% Keepalive -%%-------------------------------------------------------------------- - -t_keepalive(_) -> - {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), - [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])). - -keepalive_recv(KA, Acc) -> - receive - {keepalive, timeout} -> - case emqx_keepalive:check(KA) of - {ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]); - {error, timeout} -> [timeout | Acc] - end - after 4000 -> - Acc - end. - From aade94711c52f95648c0d615f61b7a523b5409bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 21 Sep 2018 18:50:26 +0800 Subject: [PATCH 14/17] Use process dictionaries to record last packet timestamp --- src/emqx_connection.erl | 13 +++++++------ src/emqx_ws_connection.erl | 16 +++++++++------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 14412ab95..6c1ab3e66 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -45,8 +45,7 @@ rate_limit, publish_limit, limit_timer, - idle_timeout, - last_packet_ts = 0 + idle_timeout }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -253,8 +252,8 @@ handle_info({keepalive, start, Interval}, State) -> shutdown(Error, State) end; -handle_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> - case emqx_keepalive:check(KeepAlive, LastPacketTs) of +handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> + case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of {ok, KeepAlive1} -> {noreply, State#state{keepalive = KeepAlive1}}; {error, timeout} -> @@ -291,14 +290,16 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and parse data handle_packet(<<>>, State) -> - {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State#state{last_packet_ts = erlang:system_time(millisecond)})))}; + put(last_packet_ts, erlang:system_time(millisecond)), + {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - {noreply, run_socket(State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}), IdleTimeout}; + put(last_packet_ts, erlang:system_time(millisecond)), + {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 3a2d4fdae..2526392ee 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -40,8 +40,7 @@ keepalive, enable_stats, stats_timer, - shutdown, - last_packet_ts = 0 + shutdown }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). @@ -154,9 +153,11 @@ send_fun(WsPid) -> end. websocket_handle({binary, <<>>}, State) -> - {ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; + put(last_packet_ts, erlang:system_time(millisecond)), + {ok, ensure_stats_timer(State)}; websocket_handle({binary, [<<>>]}, State) -> - {ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; + put(last_packet_ts, erlang:system_time(millisecond)), + {ok, ensure_stats_timer(State)}; websocket_handle({binary, Data}, State = #state{parser_state = ParserState, proto_state = ProtoState}) -> BinSize = iolist_size(Data), @@ -165,7 +166,8 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, emqx_metrics:inc('bytes/received', BinSize), case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of {more, NewParserState} -> - {ok, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}}; + put(last_packet_ts, erlang:system_time(millisecond)), + {ok, State#state{parser_state = NewParserState}}; {ok, Packet, Rest} -> emqx_metrics:received(Packet), put(recv_cnt, get(recv_cnt) + 1), @@ -231,8 +233,8 @@ websocket_info({keepalive, start, Interval}, State) -> shutdown(Error, State) end; -websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> - case emqx_keepalive:check(KeepAlive, LastPacketTs) of +websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> + case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> From ab2697671a8ceffa92069f6c8a25c7b7e2b90a76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 21 Sep 2018 19:59:32 +0800 Subject: [PATCH 15/17] Change the location of the recording last packet timestamp --- src/emqx_connection.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 6c1ab3e66..d4f160714 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -232,6 +232,7 @@ handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = iolist_size(Data), emqx_metrics:inc('bytes/received', Size), Incoming = #{bytes => Size, packets => 0}, + put(last_packet_ts, erlang:system_time(millisecond)), handle_packet(Data, State#state{await_recv = false, incoming = Incoming}); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> @@ -290,7 +291,6 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and parse data handle_packet(<<>>, State) -> - put(last_packet_ts, erlang:system_time(millisecond)), {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; handle_packet(Data, State = #state{proto_state = ProtoState, @@ -298,7 +298,6 @@ handle_packet(Data, State = #state{proto_state = ProtoState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - put(last_packet_ts, erlang:system_time(millisecond)), {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), From 389b3c80bc36bfecf2d5d098df7f1fde54dc432b Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 21 Sep 2018 18:18:18 +0800 Subject: [PATCH 16/17] Fix the message delivery to remote --- src/emqx_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 118ab6e21..5f078daeb 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -775,7 +775,7 @@ redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) -> deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = local}) -> ConnPid ! {deliver, {publish, PacketId, Msg}}; deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) -> - emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, PacketId, Msg}]). + emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, {publish, PacketId, Msg}}]). %%------------------------------------------------------------------------------ %% Awaiting ACK for QoS1/QoS2 Messages From 721f237bc40e3df22dbb456e7226140e94cc0645 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Sun, 9 Sep 2018 16:30:40 +0200 Subject: [PATCH 17/17] Rewrite emqx_gc.erl The implementation prior to this commit supports only one gc enforcement policy which is message count threshold. The new implementation introduces 1 more: volume threshold based --- .gitignore | 2 + priv/emqx.schema | 14 ++++++ src/emqx_connection.erl | 34 ++++++++++----- src/emqx_gc.erl | 97 +++++++++++++++++++++++++++++++---------- src/emqx_session.erl | 35 ++++++++++----- test/emqx_gc_tests.erl | 53 ++++++++++++++++++++++ 6 files changed, 190 insertions(+), 45 deletions(-) create mode 100644 test/emqx_gc_tests.erl diff --git a/.gitignore b/.gitignore index 0927f0bbc..80e3bf42d 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ bbmustache/ etc/gen.emqx.conf compile_commands.json cuttlefish +rebar.lock +xrefr diff --git a/priv/emqx.schema b/priv/emqx.schema index 77167b440..261c1981a 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -824,6 +824,14 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Force connection/session process GC after this number of +%% messages | bytes passed through. +%% Numbers delimited by `|'. Zero or negative is to disable. +{mapping, "zone.$name.force_gc_policy", "emqx.zones", [ + {default, "0|0"}, + {datatype, string} + ]}. + {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; @@ -831,6 +839,10 @@ end}. {mqtt_wildcard_subscription, Val}; ("shared_subscription", Val) -> {mqtt_shared_subscription, Val}; + ("force_gc_policy", Val) -> + [Count, Bytes] = string:tokens(Val, "| "), + {force_gc_policy, #{count => list_to_integer(Count), + bytes => list_to_integer(Bytes)}}; (Opt, Val) -> {list_to_atom(Opt), Val} end, @@ -1750,3 +1762,5 @@ end}. {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. + + diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index d4f160714..e2c8fdeed 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -148,7 +148,10 @@ init([Transport, RawSocket, Options]) -> proto_state = ProtoState, parser_state = ParserState, enable_stats = EnableStats, - idle_timeout = IdleTimout}), + idle_timeout = IdleTimout + }), + GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), + ok = emqx_gc:init(GcPolicy), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> @@ -200,14 +203,18 @@ handle_cast(Msg, State) -> handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> case emqx_protocol:deliver(PubOrAck, ProtoState) of {ok, ProtoState1} -> - {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))}; + State1 = ensure_stats_timer(State#state{proto_state = ProtoState1}), + ok = maybe_gc(State1, PubOrAck), + {noreply, State1}; {error, Reason} -> shutdown(Reason, State) end; - handle_info({timeout, Timer, emit_stats}, - State = #state{stats_timer = Timer, proto_state = ProtoState}) -> + State = #state{stats_timer = Timer, + proto_state = ProtoState + }) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), + ok = emqx_gc:reset(), {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info(timeout, State) -> @@ -290,9 +297,10 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Receive and parse data -handle_packet(<<>>, State) -> - {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; - +handle_packet(<<>>, State0) -> + State = ensure_stats_timer(ensure_rate_limit(State0)), + ok = maybe_gc(State, incoming), + {noreply, State}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> @@ -376,7 +384,13 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. -maybe_gc(State) -> - %% TODO: gc and shutdown policy - State. +%% For incoming messages, bump gc-stats with packet count and totoal volume +%% For outgoing messages, only 'publish' type is taken into account. +maybe_gc(#state{incoming = #{bytes := Oct, packets := Cnt}}, incoming) -> + ok = emqx_gc:inc(Cnt, Oct); +maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> + Oct = iolist_size(Payload), + ok = emqx_gc:inc(1, Oct); +maybe_gc(_, _) -> + ok. diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 5a32b43c5..65bbccad1 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -12,38 +12,87 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% GC Utility functions. +%% @doc This module manages an opaque collection of statistics data used to +%% force garbage collection on `self()' process when hitting thresholds. +%% Namely: +%% (1) Total number of messages passed through +%% (2) Total data volume passed through +%% @end -module(emqx_gc). -%% Memory: (10, 100, 1000) -%% +-author("Feng Lee "). --export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2, - maybe_force_gc/3]). +-export([init/1, inc/2, reset/0]). --spec(conn_max_gc_count() -> integer()). -conn_max_gc_count() -> - case emqx_config:get_env(conn_force_gc_count) of - I when is_integer(I), I > 0 -> I + rand:uniform(I); - I when is_integer(I), I =< 0 -> undefined; - undefined -> undefined +-type st() :: #{ cnt => {integer(), integer()} + , oct => {integer(), integer()} + }. + +-define(disabled, disabled). +-define(ENABLED(X), (is_integer(X) andalso X > 0)). + +%% @doc Initialize force GC parameters. +-spec init(false | map()) -> ok. +init(#{count := Count, bytes := Bytes}) -> + Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)], + Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)], + erlang:put(?MODULE, maps:from_list(Cnt ++ Oct)), + ok; +init(_) -> erlang:put(?MODULE, #{}), ok. + +%% @doc Increase count and bytes stats in one call, +%% ensure gc is triggered at most once, even if both thresholds are hit. +-spec inc(pos_integer(), pos_integer()) -> ok. +inc(Cnt, Oct) -> + mutate_pd_with(fun(St) -> inc(St, Cnt, Oct) end). + +%% @doc Reset counters to zero. +-spec reset() -> ok. +reset() -> + mutate_pd_with(fun(St) -> reset(St) end). + +%% ======== Internals ======== + +%% mutate gc stats numbers in process dict with the given function +mutate_pd_with(F) -> + St = F(erlang:get(?MODULE)), + erlang:put(?MODULE, St), + ok. + +%% Increase count and bytes stats in one call, +%% ensure gc is triggered at most once, even if both thresholds are hit. +-spec inc(st(), pos_integer(), pos_integer()) -> st(). +inc(St0, Cnt, Oct) -> + case do_inc(St0, cnt, Cnt) of + {true, St} -> + St; + {false, St1} -> + {_, St} = do_inc(St1, oct, Oct), + St end. --spec(reset_conn_gc_count(pos_integer(), tuple()) -> tuple()). -reset_conn_gc_count(Pos, State) -> - case element(Pos, State) of - undefined -> State; - _I -> setelement(Pos, State, conn_max_gc_count()) +%% Reset counters to zero. +reset(St) -> reset(cnt, reset(oct, St)). + +-spec do_inc(st(), cnt | oct, pos_integer()) -> {boolean(), st()}. +do_inc(St, Key, Num) -> + case maps:get(Key, St, ?disabled) of + ?disabled -> + {false, St}; + {Init, Remain} when Remain > Num -> + {false, maps:put(Key, {Init, Remain - Num}, St)}; + _ -> + {true, do_gc(St)} end. -maybe_force_gc(Pos, State) -> - maybe_force_gc(Pos, State, fun() -> ok end). -maybe_force_gc(Pos, State, Cb) -> - case element(Pos, State) of - undefined -> State; - I when I =< 0 -> Cb(), garbage_collect(), - reset_conn_gc_count(Pos, State); - I -> setelement(Pos, State, I - 1) +do_gc(St) -> + erlang:garbage_collect(), + reset(St). + +reset(Key, St) -> + case maps:get(Key, St, ?disabled) of + ?disabled -> St; + {Init, _} -> maps:put(Key, {Init, Init}, St) end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 5f078daeb..eab657eb6 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -350,10 +350,13 @@ init([Parent, #{zone := Zone, enable_stats = get_env(Zone, enable_stats, true), deliver_stats = 0, enqueue_stats = 0, - created_at = os:timestamp()}, + created_at = os:timestamp() + }, emqx_sm:register_session(ClientId, attrs(State)), emqx_sm:set_session_stats(ClientId, stats(State)), emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), + GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), + ok = emqx_gc:init(GcPolicy), ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). @@ -567,8 +570,11 @@ handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) -> noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined})); -handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> +handle_info({timeout, Timer, emit_stats}, + State = #state{client_id = ClientId, + stats_timer = Timer}) -> _ = emqx_sm:set_session_stats(ClientId, stats(State)), + ok = emqx_gc:reset(), %% going to hibernate, reset gc stats {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> @@ -744,21 +750,22 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> end; %% Deliver qos0 message directly to client -dispatch(Msg = #message{qos = ?QOS0}, State) -> +dispatch(Msg = #message{qos = ?QOS0} = Msg, State) -> deliver(undefined, Msg, State), - inc_stats(deliver, State); + inc_stats(deliver, Msg, State); -dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight}) +dispatch(Msg = #message{qos = QoS} = Msg, + State = #state{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); false -> deliver(PacketId, Msg, State), - await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State))) + await(PacketId, Msg, inc_stats(deliver, Msg, next_pkt_id(State))) end. enqueue_msg(Msg, State = #state{mqueue = Q}) -> - inc_stats(enqueue, State#state{mqueue = emqx_mqueue:in(Msg, Q)}). + inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}). %%------------------------------------------------------------------------------ %% Deliver @@ -882,11 +889,19 @@ next_pkt_id(State = #state{next_pkt_id = Id}) -> %%------------------------------------------------------------------------------ %% Inc stats -inc_stats(deliver, State = #state{deliver_stats = I}) -> +inc_stats(deliver, Msg, State = #state{deliver_stats = I}) -> + MsgSize = msg_size(Msg), + ok = emqx_gc:inc(1, MsgSize), State#state{deliver_stats = I + 1}; -inc_stats(enqueue, State = #state{enqueue_stats = I}) -> +inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) -> State#state{enqueue_stats = I + 1}. +%% Take only the payload size into account, add other fields if necessary +msg_size(#message{payload = Payload}) -> payload_size(Payload). + +%% Payload should be binary(), but not 100% sure. Need dialyzer! +payload_size(Payload) -> erlang:iolist_size(Payload). + %%------------------------------------------------------------------------------ %% Helper functions @@ -902,5 +917,3 @@ noreply(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. -%% TODO: GC Policy and Shutdown Policy -%% maybe_gc(State) -> State. diff --git a/test/emqx_gc_tests.erl b/test/emqx_gc_tests.erl new file mode 100644 index 000000000..ffcac91d1 --- /dev/null +++ b/test/emqx_gc_tests.erl @@ -0,0 +1,53 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_gc_tests). + +-include_lib("eunit/include/eunit.hrl"). + +trigger_by_cnt_test() -> + Args = #{count => 2, bytes => 0}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1000), + St1 = inspect(), + ?assertMatch({_, Remain} when Remain > 0, maps:get(cnt, St1)), + ok = emqx_gc:inc(2, 2), + St2 = inspect(), + ok = emqx_gc:inc(0, 2000), + St3 = inspect(), + ?assertEqual(St2, St3), + ?assertMatch({N, N}, maps:get(cnt, St2)), + ?assertNot(maps:is_key(oct, St2)), + ok. + +trigger_by_oct_test() -> + Args = #{count => 2, bytes => 2}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1), + St1 = inspect(), + ?assertMatch({_, Remain} when Remain > 0, maps:get(oct, St1)), + ok = emqx_gc:inc(2, 2), + St2 = inspect(), + ?assertMatch({N, N}, maps:get(oct, St2)), + ?assertMatch({M, M}, maps:get(cnt, St2)), + ok. + +disabled_test() -> + Args = #{count => -1, bytes => false}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1), + ?assertEqual(#{}, inspect()), + ok. + +inspect() -> erlang:get(emqx_gc).