From 8db9f3e81d905509552ce0d508ac69af9c912932 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Wed, 12 Sep 2018 20:01:31 +0200 Subject: [PATCH 01/13] Update & clarify module doc for emqx_mqueue.erl --- src/emqx_mqueue.erl | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index a93fd8838..d9270dd5f 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -18,7 +18,7 @@ %% %% This module implements a simple in-memory queue for MQTT persistent session. %% -%% If the broker restarted or crashed, all the messages queued will be gone. +%% If the broker restarts or crashes, all queued messages will be lost. %% %% Concept of Message Queue and Inflight Window: %% @@ -29,12 +29,15 @@ %% |<--- Win Size --->| %% %% -%% 1. Inflight Window to store the messages delivered and awaiting for puback. +%% 1. Inflight Window is to store the messages +%% that are delivered but still awaiting for puback. %% -%% 2. Enqueue messages when the inflight window is full. +%% 2. Messages are enqueued to tail when the inflight window is full. %% -%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true, -%% otherwise dropped the oldest one. +%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true` +%% in init options +%% +%% 4. If the queue is full drop the oldest one unless `max_len' is set to `0'. %% %% @end From fde6a2a4c34fee30d92bf00e9625c03e5546be39 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Thu, 13 Sep 2018 20:22:13 +0800 Subject: [PATCH 02/13] Fixed issue #1811 Add tests case for issue #1811 --- src/emqx_connection.erl | 2 +- test/emqx_SUITE.erl | 31 ++++++++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index adda71450..a7f71d9aa 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -303,7 +303,7 @@ handle_packet(Data, State = #state{proto_state = ProtoState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - {noreply, State#state{parser_state = NewParserState}, IdleTimeout}; + {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index a08305a30..166d64a30 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -49,8 +49,18 @@ -define(PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, <<"publish">>)). +-define(PAYLOAD, [{type,"dsmSimulationData"}, + {id, 9999}, + {status, "running"}, + {soc, 1536702170}, + {fracsec, 451000}, + {data, lists:seq(1, 20480)}]). + +-define(BIG_PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, emqx_json:encode(?PAYLOAD))). + all() -> - [{group, connect}]. + [{group, connect}, + {group, publish}]. groups() -> [{connect, [non_parallel_tests], @@ -60,6 +70,10 @@ groups() -> mqtt_connect_with_ssl_oneway, mqtt_connect_with_ssl_twoway, mqtt_connect_with_ws + ]}, + {publish, [non_parallel_tests], + [ + packet_size ]}]. init_per_suite(Config) -> @@ -157,6 +171,21 @@ mqtt_connect_with_ws(_Config) -> {close, _} = rfc6455_client:close(WS), ok. +%%issue 1811 +packet_size(_Config) -> + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = raw_send_serialise(?CLIENT), + emqx_client_sock:send(Sock, Packet), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), + + %% Pub Packet QoS 1 + PubPacket = raw_send_serialise(?BIG_PUBPACKET), + emqx_client_sock:send(Sock, PubPacket), + {ok, Data1} = gen_tcp:recv(Sock, 0), + {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1), + emqx_client_sock:close(Sock). + raw_send_serialise(Packet) -> emqx_frame:serialize(Packet). From 35d209f36401445c0dd1c5f83f24235d39a348c5 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Thu, 13 Sep 2018 18:23:19 +0200 Subject: [PATCH 03/13] Fix travis build --- .travis.yml | 2 ++ Makefile | 11 +++++++---- rebar.config | 10 +++------- rebar.config.script | 1 - 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/.travis.yml b/.travis.yml index e4088022d..adef0f3cd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,10 @@ before_install: script: - make dep-vsn-check + - make rebar-compile - make rebar-eunit - make rebar-ct + - make rebar-cover - make coveralls sudo: false diff --git a/Makefile b/Makefile index 2417ad040..73438ca01 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1 NO_AUTOPATCH = cuttlefish -ERLC_OPTS += +debug_info +ERLC_OPTS += +debug_info -DAPPLICATION=emqx ERLC_OPTS += +'{parse_transform, lager_transform}' BUILD_DEPS = cuttlefish @@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30 #TEST_DEPS = emqx_ct_helplers #dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers -TEST_ERLC_OPTS += +debug_info +TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose @@ -60,7 +60,7 @@ gen-clean: @rm -f etc/gen.emqx.conf bbmustache: - $(verbose) git clone https://github.com/soranoba/bbmustache.git && pushd bbmustache && ./rebar3 compile && popd + $(verbose) git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd .. # This hack is to generate a conf file for testing # relx overlay is used for release @@ -78,6 +78,9 @@ app.config: etc/gen.emqx.conf ct: cuttlefish app.config +rebar-cover: + @rebar3 cover + coveralls: @rebar3 coveralls send @@ -91,7 +94,7 @@ rebar-cuttlefish: rebar-deps rebar-deps: @rebar3 get-deps -rebar-eunit: +rebar-eunit: rebar-cuttlefish @rebar3 eunit rebar-compile: diff --git a/rebar.config b/rebar.config index d7d14c883..aa77f3a02 100644 --- a/rebar.config +++ b/rebar.config @@ -20,7 +20,8 @@ warn_unused_import, warn_obsolete_guard, debug_info, - {parse_transform, lager_transform}]}. + {parse_transform, lager_transform}, + {d, 'APPLICATION', emqx}]}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, warnings_as_errors, deprecated_functions]}. @@ -29,10 +30,5 @@ {cover_export_enabled, true}. %% rebar3_neotoma_plugin is needed to compile the .peg file for cuttlefish -{plugins, [rebar3_neotoma_plugin]}. +{plugins, [coveralls, rebar3_neotoma_plugin]}. -%% Do not include cuttlefish's dependencies as mine -%% its dependencies are only fetched to compile itself -%% they are however not needed by emqx -{overrides, [{override, cuttlefish, [{deps, []}]} - ]}. diff --git a/rebar.config.script b/rebar.config.script index 7c247ac48..0b18592f1 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -3,7 +3,6 @@ CONFIG1 = case os:getenv("TRAVIS") of "true" -> JobId = os:getenv("TRAVIS_JOB_ID"), [{coveralls_service_job_id, JobId}, - {plugins, [coveralls]}, {coveralls_coverdata, "_build/test/cover/*.coverdata"}, {coveralls_service_name , "travis-ci"} | CONFIG]; _ -> From 45b2686e1c2ea748870644ef4aae83f60aed8923 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 14 Sep 2018 09:23:27 +0800 Subject: [PATCH 04/13] Delete unnecessary code Prior to this change, there are multiple deprecated functions. --- src/emqx_message.erl | 18 +----------------- test/emqx_message_SUITE.erl | 7 ++----- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 311bd58dd..91e5d4d59 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -22,7 +22,7 @@ -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). -export([set_headers/2]). -export([get_header/2, get_header/3, set_header/3]). --export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]). +-export([is_expired/1, update_expiry/1]). -export([format/1]). -type(flag() :: atom()). @@ -100,21 +100,6 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam is_expired(_Msg) -> false. --spec(check_expiry(emqx_types:message()) -> {ok, pos_integer()} | expired | false). -check_expiry(Msg = #message{timestamp = CreatedAt}) -> - check_expiry(Msg, CreatedAt); -check_expiry(_Msg) -> - false. - --spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false). -check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) -> - case Interval - (elapsed(Since) div 1000) of - Timeout when Timeout > 0 -> {ok, Timeout}; - _ -> expired - end; -check_expiry(_Msg, _Since) -> - false. - update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> @@ -138,4 +123,3 @@ format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> io_lib:format("~p", [Headers]). - diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index c2ca0f0aa..37207e40c 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -29,7 +29,7 @@ all() -> message_flag, message_header, message_format, - message_expired + message_expired ]. message_make(_) -> @@ -53,7 +53,7 @@ message_flag(_) -> ?assert(emqx_message:get_flag(dup, Msg6)), ?assert(emqx_message:get_flag(retain, Msg6)). -message_header(_) -> +message_header(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), Msg2 = emqx_message:set_header(c, 3, Msg1), @@ -68,11 +68,8 @@ message_expired(_) -> Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg), timer:sleep(500), ?assertNot(emqx_message:is_expired(Msg1)), - {ok, 1} = emqx_message:check_expiry(Msg1), timer:sleep(600), ?assert(emqx_message:is_expired(Msg1)), - expired = emqx_message:check_expiry(Msg1), timer:sleep(1000), Msg2 = emqx_message:update_expiry(Msg1), ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). - From 88b3460715a25a52545382adb079ce0a6ddcc88e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 12 Sep 2018 15:17:18 +0800 Subject: [PATCH 05/13] Add feature for issue#1809 --- etc/emqx.conf | 10 ++++++++++ priv/emqx.schema | 13 ++++++++++++- src/emqx_listeners.erl | 10 ++++++++-- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index deb702211..5d54389dc 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1207,6 +1207,11 @@ listener.ssl.external.reuseaddr = true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 listener.ws.external = 8083 +## Whether the client must include "mqtt" in the list of WebSocket Sub Protocols it offers +## +## Value: true | false +listener.ws.external.standard_mqtt = true + ## The acceptor pool for external MQTT/WebSocket listener. ## ## Value: Number @@ -1346,6 +1351,11 @@ listener.ws.external.nodelay = true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 listener.wss.external = 8084 +## Whether the client must include "mqtt" in the list of WebSocket Sub Protocols it offers +## +## Value: true | false +listener.wss.external.standard_mqtt = true + ## The acceptor pool for external MQTT/WebSocket/SSL listener. ## ## Value: Number diff --git a/priv/emqx.schema b/priv/emqx.schema index e9f4932c4..8e8979908 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1094,6 +1094,11 @@ end}. {datatype, [integer, ip]} ]}. +{mapping, "listener.ws.$name.standard_mqtt", "emqx.listeners", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + {mapping, "listener.ws.$name.acceptors", "emqx.listeners", [ {default, 8}, {datatype, integer} @@ -1195,6 +1200,11 @@ end}. {datatype, [integer, ip]} ]}. +{mapping, "listener.wss.$name.standard_mqtt", "emqx.listeners", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + {mapping, "listener.wss.$name.acceptors", "emqx.listeners", [ {default, 8}, {datatype, integer} @@ -1365,7 +1375,8 @@ end}. end, LisOpts = fun(Prefix) -> - Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + Filter([{standard_mqtt, cuttlefish:conf_get(Prefix ++ ".standard_mqtt", Conf, undefined)}, + {acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 421304f3a..127c62b39 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -51,12 +51,12 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> %% Start MQTT/WS listener start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{subprotocol_name(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); %% Start MQTT/WSS listener start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{subprotocol_name(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). start_mqtt_listener(Name, ListenOn, Options) -> @@ -67,6 +67,12 @@ start_mqtt_listener(Name, ListenOn, Options) -> start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). +subprotocol_name(Options) -> + case proplists:get_value(standard_mqtt, Options, true) of + true -> "/mqtt"; + false -> "/" + end. + ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_connections, Options, 1024), From 0c6a268539718ef5764ce4e29a136736f31b7a1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 13 Sep 2018 10:31:36 +0800 Subject: [PATCH 06/13] Set default value of message expiry interval for not mqtt 5.0 message --- src/emqx_protocol.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8301cf014..054ec2a57 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -249,6 +249,13 @@ preprocess_properties(Packet = #mqtt_packet{ PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; +preprocess_properties(Packet = #mqtt_packet{variable = #mqtt_packet_publish{properties = #{'Message-Expiry-Interval' := _Interval}}}, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> + {Packet, PState}; +preprocess_properties(Packet = #mqtt_packet{variable = Publish = #mqtt_packet_publish{properties = Properties}}, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> + {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{properties = maps:put('Message-Expiry-Interval', 0, Properties)}}, PState}; + preprocess_properties(Packet, PState) -> {Packet, PState}. From 6f536eaac42af64b8f2beecb69843a3b241be2c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 14 Sep 2018 10:10:32 +0800 Subject: [PATCH 07/13] Add customized mqtt path for websocket --- etc/emqx.conf | 12 ++++++------ priv/emqx.schema | 14 +++++++------- src/emqx_listeners.erl | 14 ++++++++------ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 5d54389dc..db4eedffa 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1207,10 +1207,10 @@ listener.ssl.external.reuseaddr = true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 listener.ws.external = 8083 -## Whether the client must include "mqtt" in the list of WebSocket Sub Protocols it offers +## Define the path you want to add to the end of the URL ## -## Value: true | false -listener.ws.external.standard_mqtt = true +## Value: / | / +listener.ws.external.mqtt_path = /mqtt ## The acceptor pool for external MQTT/WebSocket listener. ## @@ -1351,10 +1351,10 @@ listener.ws.external.nodelay = true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 listener.wss.external = 8084 -## Whether the client must include "mqtt" in the list of WebSocket Sub Protocols it offers +## Define the path you want to add to the end of the URL ## -## Value: true | false -listener.wss.external.standard_mqtt = true +## Value: / | / +listener.wss.external.mqtt_path = /mqtt ## The acceptor pool for external MQTT/WebSocket/SSL listener. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 8e8979908..cfd8c83a6 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1094,9 +1094,9 @@ end}. {datatype, [integer, ip]} ]}. -{mapping, "listener.ws.$name.standard_mqtt", "emqx.listeners", [ - {default, true}, - {datatype, {enum, [true, false]}} +{mapping, "listener.ws.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} ]}. {mapping, "listener.ws.$name.acceptors", "emqx.listeners", [ @@ -1200,9 +1200,9 @@ end}. {datatype, [integer, ip]} ]}. -{mapping, "listener.wss.$name.standard_mqtt", "emqx.listeners", [ - {default, true}, - {datatype, {enum, [true, false]}} +{mapping, "listener.wss.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} ]}. {mapping, "listener.wss.$name.acceptors", "emqx.listeners", [ @@ -1375,7 +1375,7 @@ end}. end, LisOpts = fun(Prefix) -> - Filter([{standard_mqtt, cuttlefish:conf_get(Prefix ++ ".standard_mqtt", Conf, undefined)}, + Filter([{mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, {acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 127c62b39..4cd20a091 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -51,12 +51,12 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> %% Start MQTT/WS listener start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - Dispatch = cowboy_router:compile([{'_', [{subprotocol_name(Options), emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); %% Start MQTT/WSS listener start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - Dispatch = cowboy_router:compile([{'_', [{subprotocol_name(Options), emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). start_mqtt_listener(Name, ListenOn, Options) -> @@ -67,10 +67,12 @@ start_mqtt_listener(Name, ListenOn, Options) -> start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). -subprotocol_name(Options) -> - case proplists:get_value(standard_mqtt, Options, true) of - true -> "/mqtt"; - false -> "/" +mqtt_path(Options) -> + MQTTPath = proplists:get_value(mqtt_path, Options, "/mqtt"), + case erlang:list_to_bitstring(MQTTPath) of + <<"/">> -> MQTTPath; + <<"/", _/binary>> -> MQTTPath; + _ -> "/mqtt" end. ranch_opts(Options) -> From 5465b015be1bb635da7cc0230cdc6d8336aaa271 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 14 Sep 2018 10:40:16 +0800 Subject: [PATCH 08/13] Add test case for last change --- src/emqx_listeners.erl | 6 ++++++ test/emqx_listeners_SUITE.erl | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 4cd20a091..2d9bb4bba 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -22,6 +22,12 @@ -export([restart_listener/1, restart_listener/3]). -export([stop_listener/1, stop_listener/3]). +-ifdef(TEST). + +-export([mqtt_path/1]). + +-endif. + -type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). %% @doc Start all listeners. diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index 17181aa31..8d97a8158 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -26,7 +26,8 @@ all() -> [start_stop_listeners, - restart_listeners]. + restart_listeners, + t_mqtt_path]. init_per_suite(Config) -> NewConfig = generate_config(), @@ -49,6 +50,11 @@ restart_listeners(_) -> ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). +t_mqtt_path(_) -> + ?assertEqual("/test", emqx_listeners:mqtt_path([{mqtt_path, "/test"}])), + ?assertEqual("/", emqx_listeners:mqtt_path([{mqtt_path, "/"}])), + ?assertEqual("/mqtt", emqx_listeners:mqtt_path([{mqtt_path, "test"}])). + generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), From 5eb92e37cc0a5863233b7852572c984334c3c6f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 14 Sep 2018 11:29:38 +0800 Subject: [PATCH 09/13] Remove check for MQTT path, and normalize code --- etc/emqx.conf | 4 ++-- priv/emqx.schema | 4 ++-- src/emqx_listeners.erl | 13 +------------ src/emqx_protocol.erl | 7 ------- test/emqx_listeners_SUITE.erl | 8 +------- 5 files changed, 6 insertions(+), 30 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index db4eedffa..4a7c195d2 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1207,7 +1207,7 @@ listener.ssl.external.reuseaddr = true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 listener.ws.external = 8083 -## Define the path you want to add to the end of the URL +## The path of WebSocket MQTT endpoint ## ## Value: / | / listener.ws.external.mqtt_path = /mqtt @@ -1351,7 +1351,7 @@ listener.ws.external.nodelay = true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 listener.wss.external = 8084 -## Define the path you want to add to the end of the URL +## The path of WebSocket MQTT endpoint ## ## Value: / | / listener.wss.external.mqtt_path = /mqtt diff --git a/priv/emqx.schema b/priv/emqx.schema index cfd8c83a6..a8ed316c2 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1375,8 +1375,8 @@ end}. end, LisOpts = fun(Prefix) -> - Filter([{mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, - {acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 2d9bb4bba..d3ef43069 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -22,12 +22,6 @@ -export([restart_listener/1, restart_listener/3]). -export([stop_listener/1, stop_listener/3]). --ifdef(TEST). - --export([mqtt_path/1]). - --endif. - -type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). %% @doc Start all listeners. @@ -74,12 +68,7 @@ start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). mqtt_path(Options) -> - MQTTPath = proplists:get_value(mqtt_path, Options, "/mqtt"), - case erlang:list_to_bitstring(MQTTPath) of - <<"/">> -> MQTTPath; - <<"/", _/binary>> -> MQTTPath; - _ -> "/mqtt" - end. + proplists:get_value(mqtt_path, Options, "/mqtt"). ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 054ec2a57..8301cf014 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -249,13 +249,6 @@ preprocess_properties(Packet = #mqtt_packet{ PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; -preprocess_properties(Packet = #mqtt_packet{variable = #mqtt_packet_publish{properties = #{'Message-Expiry-Interval' := _Interval}}}, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> - {Packet, PState}; -preprocess_properties(Packet = #mqtt_packet{variable = Publish = #mqtt_packet_publish{properties = Properties}}, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> - {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{properties = maps:put('Message-Expiry-Interval', 0, Properties)}}, PState}; - preprocess_properties(Packet, PState) -> {Packet, PState}. diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index 8d97a8158..17181aa31 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -26,8 +26,7 @@ all() -> [start_stop_listeners, - restart_listeners, - t_mqtt_path]. + restart_listeners]. init_per_suite(Config) -> NewConfig = generate_config(), @@ -50,11 +49,6 @@ restart_listeners(_) -> ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). -t_mqtt_path(_) -> - ?assertEqual("/test", emqx_listeners:mqtt_path([{mqtt_path, "/test"}])), - ?assertEqual("/", emqx_listeners:mqtt_path([{mqtt_path, "/"}])), - ?assertEqual("/mqtt", emqx_listeners:mqtt_path([{mqtt_path, "test"}])). - generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), From f5ed6ddb0588748fb13dd0d6787e086d98db2f99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 14 Sep 2018 11:40:57 +0800 Subject: [PATCH 10/13] Change comments --- etc/emqx.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 4a7c195d2..2bba8d024 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1209,7 +1209,7 @@ listener.ws.external = 8083 ## The path of WebSocket MQTT endpoint ## -## Value: / | / +## Value: URL Path listener.ws.external.mqtt_path = /mqtt ## The acceptor pool for external MQTT/WebSocket listener. @@ -1353,7 +1353,7 @@ listener.wss.external = 8084 ## The path of WebSocket MQTT endpoint ## -## Value: / | / +## Value: URL Path listener.wss.external.mqtt_path = /mqtt ## The acceptor pool for external MQTT/WebSocket/SSL listener. From 737dcff44eb7fca9ec25d5f315d237d590e6215c Mon Sep 17 00:00:00 2001 From: Michal-Drobniak Date: Fri, 14 Sep 2018 11:46:52 +0200 Subject: [PATCH 11/13] Add space before plugins.etc_dir in emqx.conf --- etc/emqx.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 2bba8d024..3604f2efd 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1839,7 +1839,7 @@ module.rewrite = off ## The etc dir for plugins' config. ## ## Value: Folder -plugins.etc_dir ={{ platform_etc_dir }}/plugins/ +plugins.etc_dir = {{ platform_etc_dir }}/plugins/ ## The file to store loaded plugin names. ## From 69e5869fa0b7f3aa56f2a0317961c5918733b12b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 16 Sep 2018 20:49:47 +0800 Subject: [PATCH 12/13] Add submit/2, async_submit/2 functions for emqx_pool module. --- src/emqx_pool.erl | 50 +++++++++++++++++++++++++++++++++------------- src/emqx_topic.erl | 4 ++-- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 276352797..52df7fec3 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -17,7 +17,8 @@ -behaviour(gen_server). -export([start_link/0, start_link/2]). --export([submit/1, async_submit/1]). +-export([submit/1, submit/2]). +-export([async_submit/1, async_submit/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -25,6 +26,8 @@ -define(POOL, ?MODULE). +-type(task() :: fun() | mfa() | {fun(), Args :: list(any())}). + %% @doc Start pooler supervisor. start_link() -> emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}). @@ -34,16 +37,33 @@ start_link() -> start_link(Pool, Id) -> gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). -%% @doc Submit work to the pool --spec(submit(fun()) -> any()). -submit(Fun) -> - gen_server:call(worker(), {submit, Fun}, infinity). +%% @doc Submit work to the pool. +-spec(submit(task()) -> any()). +submit(Task) -> + call({submit, Task}). -%% @doc Submit work to the pool asynchronously --spec(async_submit(fun()) -> ok). -async_submit(Fun) -> - gen_server:cast(worker(), {async_submit, Fun}). +-spec(submit(fun(), list(any())) -> any()). +submit(Fun, Args) -> + call({submit, {Fun, Args}}). +%% @private +call(Req) -> + gen_server:call(worker(), Req, infinity). + +%% @doc Submit work to the pool asynchronously. +-spec(async_submit(task()) -> ok). +async_submit(Task) -> + cast({async_submit, Task}). + +-spec(async_submit(fun(), list(any())) -> ok). +async_submit(Fun, Args) -> + cast({async_submit, {Fun, Args}}). + +%% @private +cast(Msg) -> + gen_server:cast(worker(), Msg). + +%% @private worker() -> gproc_pool:pick_worker(pool). @@ -55,15 +75,15 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({submit, Fun}, _From, State) -> - {reply, catch run(Fun), State}; +handle_call({submit, Task}, _From, State) -> + {reply, catch run(Task), State}; handle_call(Req, _From, State) -> emqx_logger:error("[Pool] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({async_submit, Fun}, State) -> - try run(Fun) +handle_cast({async_submit, Task}, State) -> + try run(Task) catch _:Error:Stacktrace -> emqx_logger:error("[Pool] error: ~p, ~p", [Error, Stacktrace]) end, @@ -78,7 +98,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> - true = gproc_pool:disconnect_worker(Pool, {Pool, Id}). + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -89,6 +109,8 @@ code_change(_OldVsn, State, _Extra) -> run({M, F, A}) -> erlang:apply(M, F, A); +run({F, A}) when is_function(F), is_list(A) -> + erlang:apply(F, A); run(Fun) when is_function(Fun) -> Fun(). diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index b3b417717..6540bbef2 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -188,8 +188,8 @@ parse(Topic = <<"$share/", Topic1/binary>>, Options) -> case binary:split(Topic1, <<"/">>) of [<<>>] -> error({invalid_topic, Topic}); [_] -> error({invalid_topic, Topic}); - [Group, Topic2] -> - case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of + [Group, Topic2] -> + case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of nomatch -> {Topic2, maps:put(share, Group, Options)}; _ -> error({invalid_topic, Topic}) end From 3822ff987bdcdcfd0a7d3b6d9e5149610e6ad9ae Mon Sep 17 00:00:00 2001 From: HuangDan Date: Tue, 18 Sep 2018 00:01:15 +0800 Subject: [PATCH 13/13] Fix function args Add test cases for emqx_pool module --- Makefile | 2 +- src/emqx_pool.erl | 2 +- test/emqx_pool_SUITE.erl | 65 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 test/emqx_pool_SUITE.erl diff --git a/Makefile b/Makefile index 73438ca01..54b13727a 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ - emqx_mountpoint emqx_listeners emqx_protocol + emqx_mountpoint emqx_listeners emqx_protocol emqx_pool CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 52df7fec3..762f5dc6d 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -65,7 +65,7 @@ cast(Msg) -> %% @private worker() -> - gproc_pool:pick_worker(pool). + gproc_pool:pick_worker(?POOL). %%------------------------------------------------------------------------------ %% gen_server callbacks diff --git a/test/emqx_pool_SUITE.erl b/test/emqx_pool_SUITE.erl new file mode 100644 index 000000000..3d7d0f7e5 --- /dev/null +++ b/test/emqx_pool_SUITE.erl @@ -0,0 +1,65 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_pool_SUITE). + +-compile(export_all). + +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> [ + {group, submit_case}, + {group, async_submit_case} + ]. + +groups() -> + [ + {submit_case, [sequence], [submit_mfa, submit_fa]}, + {async_submit_case, [sequence], [async_submit_mfa]} + ]. + +init_per_suite(Config) -> + application:ensure_all_started(gproc), + Config. + +end_per_suite(_Config) -> + ok. + +submit_mfa(_Config) -> + erlang:process_flag(trap_exit, true), + {ok, Pid} = emqx_pool:start_link(), + Result = emqx_pool:submit({?MODULE, test_mfa, []}), + ?assertEqual(15, Result), + gen_server:stop(Pid, normal, 3000), + ok. + +submit_fa(_Config) -> + {ok, Pid} = emqx_pool:start_link(), + Fun = fun(X) -> case X rem 2 of 0 -> {true, X div 2}; _ -> false end end, + Result = emqx_pool:submit(Fun, [2]), + ?assertEqual({true, 1}, Result), + exit(Pid, normal). + +test_mfa() -> + lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]). + +async_submit_mfa(_Config) -> + {ok, Pid} = emqx_pool:start_link(), + emqx_pool:async_submit({?MODULE, test_mfa, []}), + exit(Pid, normal). +