From 034f39e81101130a5e7f219af23988690017e65f Mon Sep 17 00:00:00 2001 From: huangdan Date: Mon, 5 Jun 2017 10:38:48 +0800 Subject: [PATCH 01/15] =?UTF-8?q?Add=20=E2=80=98Compiled=20with=20option?= =?UTF-8?q?=20debug=5Finfo=E2=80=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index abd416178..5841ef611 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master +ERLC_OPTS += +debug_info ERLC_OPTS += +'{parse_transform, lager_transform}' NO_AUTOPATCH = cuttlefish From 1a8cc2e146d8aa15791a848b0817cb823243e6e7 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 9 Jun 2017 14:54:03 +0800 Subject: [PATCH 02/15] Add http to listen on port 8080 for the http REST API --- etc/emq.conf | 24 ++++++++++++ priv/emq.schema | 90 ++++++++++++++++++++++++++++++++++++++++++++- src/emqttd_app.erl | 7 +++- src/emqttd_http.erl | 34 ----------------- src/emqttd_ws.erl | 57 ++++++++++++++++++++++------ 5 files changed, 163 insertions(+), 49 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 1b78c9ae4..ea7e3e2ba 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -446,6 +446,30 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## listener.wss.external.fail_if_no_peer_cert = true +##-------------------------------------------------------------------- +## External MQTT/REST API Listener + +listener.api.external = 8080 + +listener.api.external.acceptors = 4 + +listener.api.external.max_clients = 64 + +listener.api.external.access.1 = allow all + +## TCP Options +listener.api.external.backlog = 1024 + +listener.api.external.recbuf = 4KB + +listener.api.external.sndbuf = 4KB + +listener.api.external.buffer = 4KB + +listener.api.external.nodelay = true + + + ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index 5d9617e64..b812c52b6 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1027,9 +1027,97 @@ end}. ++ [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) end}. +%%-------------------------------------------------------------------- +%% MQTT REST API Listeners + +{mapping, "listener.api.$name", "emqttd.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}} +]}. + +{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.verify", "emqttd.listeners", [ + {datatype, atom} +]}. + +{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 992e22da6..272ccffc9 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -165,11 +165,14 @@ start_listener({ssl, ListenOn, Opts}) -> %% Start http listener start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws -> - mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_http, handle_request, []}); + mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_ws, handle_request, []}); %% Start https listener start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> - mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_http, handle_request, []}). + mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []}); + +start_listener({Proto, ListenOn, Opts}) when Proto == api -> + mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}). start_listener(Proto, ListenOn, Opts) -> Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])), diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 9b50cf34a..cf705ff00 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -52,25 +52,6 @@ handle_request('POST', "/mqtt/publish", Req) -> false -> Req:respond({401, [], <<"Unauthorized">>}) end; -%%-------------------------------------------------------------------- -%% MQTT Over WebSocket -%%-------------------------------------------------------------------- - -handle_request('GET', "/mqtt", Req) -> - lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), - Upgrade = Req:get_header_value("Upgrade"), - Proto = check_protocol_header(Req), - case {is_websocket(Upgrade), Proto} of - {true, "mqtt" ++ _Vsn} -> - emqttd_ws:handle_request(Req); - {false, _} -> - lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), - Req:respond({400, [], <<"Bad Request">>}); - {_, Proto} -> - lager:error("WebSocket with error Protocol: ~s", [Proto]), - Req:respond({400, [], <<"Bad WebSocket Protocol">>}) - end; - %%-------------------------------------------------------------------- %% Get static files %%-------------------------------------------------------------------- @@ -83,18 +64,6 @@ handle_request(Method, Path, Req) -> lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), Req:not_found(). -check_protocol_header(Req) -> - case emqttd:env(websocket_protocol_header, false) of - true -> get_protocol_header(Req); - false -> "mqtt-v3.1.1" - end. - -get_protocol_header(Req) -> - case Req:get_header_value("EMQ-WebSocket-Protocol") of - undefined -> Req:get_header_value("Sec-WebSocket-Protocol"); - Proto -> Proto - end. - %%-------------------------------------------------------------------- %% HTTP Publish %%-------------------------------------------------------------------- @@ -174,9 +143,6 @@ bool("1") -> true; bool(<<"0">>) -> false; bool(<<"1">>) -> true. -is_websocket(Upgrade) -> - Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". - docroot() -> {file, Here} = code:is_loaded(?MODULE), Dir = filename:dirname(filename:dirname(Here)), diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index c6a68150d..dbf0ea08c 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -31,20 +31,53 @@ lager:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#wsocket_state.peername) | Args])). -%%-------------------------------------------------------------------- -%% Handle WebSocket Request -%%-------------------------------------------------------------------- -%% @doc Handle WebSocket Request. handle_request(Req) -> - {ok, ProtoEnv} = emqttd:env(protocol), - PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), - Parser = emqttd_parser:initial_state(PacketSize), - %% Upgrade WebSocket. - {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), - {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), - ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser, - max_packet_size = PacketSize, client_pid = ClientPid}). + handle_request(Req:get(method), Req:get(path), Req). + +%%-------------------------------------------------------------------- +%% MQTT Over WebSocket +%%-------------------------------------------------------------------- +handle_request('GET', "/mqtt", Req) -> + lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), + Upgrade = Req:get_header_value("Upgrade"), + Proto = check_protocol_header(Req), + case {is_websocket(Upgrade), Proto} of + {true, "mqtt" ++ _Vsn} -> + {ok, ProtoEnv} = emqttd:env(protocol), + PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), + Parser = emqttd_parser:initial_state(PacketSize), + %% Upgrade WebSocket. + {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), + {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), + ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser, + max_packet_size = PacketSize, client_pid = ClientPid}); + {false, _} -> + lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), + Req:respond({400, [], <<"Bad Request">>}); + {_, Proto} -> + lager:error("WebSocket with error Protocol: ~s", [Proto]), + Req:respond({400, [], <<"Bad WebSocket Protocol">>}) + end; + +handle_request(Method, Path, Req) -> + lager:error("Unexpected WS Request: ~s ~s", [Method, Path]), + Req:not_found(). + +is_websocket(Upgrade) -> + Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". + +check_protocol_header(Req) -> + case emqttd:env(websocket_protocol_header, false) of + true -> get_protocol_header(Req); + false -> "mqtt-v3.1.1" + end. + +get_protocol_header(Req) -> + case Req:get_header_value("EMQ-WebSocket-Protocol") of + undefined -> Req:get_header_value("Sec-WebSocket-Protocol"); + Proto -> Proto + end. %%-------------------------------------------------------------------- %% Receive Loop From 0d1205962ef1b814cd30200026827db0e2b95397 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 9 Jun 2017 15:04:53 +0800 Subject: [PATCH 03/15] Update listener.api configuration --- etc/emq.conf | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index e0a0508d3..a063025dc 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -449,7 +449,7 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ##-------------------------------------------------------------------- ## External MQTT/REST API Listener -listener.api.external = 8080 +listener.api.external = 127.0.0.1:8080 listener.api.external.acceptors = 4 @@ -457,19 +457,6 @@ listener.api.external.max_clients = 64 listener.api.external.access.1 = allow all -## TCP Options -listener.api.external.backlog = 1024 - -listener.api.external.recbuf = 4KB - -listener.api.external.sndbuf = 4KB - -listener.api.external.buffer = 4KB - -listener.api.external.nodelay = true - - - ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- From 4888702712de028d84f528acd20260d80a517555 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 12 Jun 2017 15:13:54 +0800 Subject: [PATCH 04/15] Remove bridge command qos option --- src/emqttd_bridge.erl | 6 +----- src/emqttd_cli.erl | 3 --- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 815ee38d9..49b5a95d0 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -37,7 +37,6 @@ -record(state, {pool, id, node, subtopic, - qos = ?QOS_0, topic_suffix = <<>>, topic_prefix = <<>>, mqueue :: emqttd_mqueue:mqueue(), @@ -45,8 +44,7 @@ ping_down_interval = ?PING_DOWN_INTERVAL, status = up}). --type(option() :: {qos, mqtt_qos()} | - {topic_suffix, binary()} | +-type(option() :: {topic_suffix, binary()} | {topic_prefix, binary()} | {max_queue_len, pos_integer()} | {ping_down_interval, pos_integer()}). @@ -87,8 +85,6 @@ init([Pool, Id, Node, Topic, Options]) -> parse_opts([], State) -> State; -parse_opts([{qos, Qos} | Opts], State) -> - parse_opts(Opts, State#state{qos = Qos}); parse_opts([{topic_suffix, Suffix} | Opts], State) -> parse_opts(Opts, State#state{topic_suffix= Suffix}); parse_opts([{topic_prefix, Prefix} | Opts], State) -> diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 8123fe669..6918c68e8 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -326,7 +326,6 @@ bridges(["list"]) -> bridges(["options"]) -> ?PRINT_MSG("Options:~n"), - ?PRINT_MSG(" qos = 0 | 1 | 2~n"), ?PRINT_MSG(" prefix = string~n"), ?PRINT_MSG(" suffix = string~n"), ?PRINT_MSG(" queue = integer~n"), @@ -363,8 +362,6 @@ parse_opts(Cmd, OptStr) -> Tokens = string:tokens(OptStr, ","), [parse_opt(Cmd, list_to_atom(Opt), Val) || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. -parse_opt(bridge, qos, Qos) -> - {qos, list_to_integer(Qos)}; parse_opt(bridge, suffix, Suffix) -> {topic_suffix, bin(Suffix)}; parse_opt(bridge, prefix, Prefix) -> From 6918a69a2b704d6f1cc409f5338dedd571592e88 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 12 Jun 2017 16:09:18 +0800 Subject: [PATCH 05/15] Update listener.api configuration --- etc/emq.conf | 10 +++++----- priv/emq.schema | 8 -------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index a063025dc..ca694cf70 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -447,15 +447,15 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## listener.wss.external.fail_if_no_peer_cert = true ##-------------------------------------------------------------------- -## External MQTT/REST API Listener +## HTTP Management API Listener -listener.api.external = 127.0.0.1:8080 +listener.api.mgmt = 127.0.0.1:8080 -listener.api.external.acceptors = 4 +listener.api.mgmt.acceptors = 4 -listener.api.external.max_clients = 64 +listener.api.mgmt.max_clients = 64 -listener.api.external.access.1 = allow all +listener.api.mgmt.access.1 = allow all ##------------------------------------------------------------------- ## System Monitor diff --git a/priv/emq.schema b/priv/emq.schema index b812c52b6..8922e1e9e 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1048,14 +1048,6 @@ end}. {datatype, integer} ]}. -{mapping, "listener.api.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.mountpoint", "emqttd.listeners", [ - {datatype, string} -]}. - {mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [ {datatype, string} ]}. From 53f5188cbba5edce4734ccce62ef6ee14e8d6698 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 12 Jun 2017 19:13:58 +0800 Subject: [PATCH 06/15] Fix the race condition issue caused by unregister_session/1 --- src/emqttd_sm.erl | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index bbba50d6a..d53ab8c3c 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -33,7 +33,8 @@ %% API Function Exports -export([start_link/2]). --export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]). +-export([start_session/2, lookup_session/1, register_session/3, + unregister_session/1, unregister_session/2]). -export([dispatch/3]). @@ -99,9 +100,17 @@ register_session(ClientId, CleanSess, Properties) -> ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). %% @doc Unregister a session. --spec(unregister_session(binary()) -> true). +-spec(unregister_session(binary()) -> boolean()). unregister_session(ClientId) -> - ets:delete(mqtt_local_session, ClientId). + unregister_session(ClientId, self()). + +unregister_session(ClientId, Pid) -> + case ets:lookup(mqtt_local_session, ClientId) of + [LocalSess = {_, Pid, _, _}] -> + ets:delete_object(mqtt_local_session, LocalSess); + _ -> + false + end. dispatch(ClientId, Topic, Msg) -> try ets:lookup_element(mqtt_local_session, ClientId, 2) of From 70507e893cc83d734a4d31b7c000c73d9a0b5aed Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 12 Jun 2017 21:54:51 +0800 Subject: [PATCH 07/15] Update listener.api schema --- priv/emq.schema | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/priv/emq.schema b/priv/emq.schema index 8922e1e9e..a2860322c 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1021,14 +1021,32 @@ end}. end end, + ApiListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + SslOpts1 = case SslOpts(Prefix) of + [] -> []; + SslOpts0 -> [{sslopts, SslOpts0}] + end, + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, + {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}] + end + end, + + lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] ++ [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] + ++ + [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) end}. %%-------------------------------------------------------------------- From de2e1f79e8687fd4aa6065ebf95cbfb9867f6c78 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Mon, 12 Jun 2017 22:45:39 +0800 Subject: [PATCH 08/15] Update test case for http api --- test/emqttd_SUITE.erl | 8 +- test/emqttd_SUITE_data/emqttd.conf | 38 +++++++--- test/emqttd_SUITE_data/emqttd.schema | 105 ++++++++++++++++++++++++++- 3 files changed, 134 insertions(+), 17 deletions(-) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index b5402830d..f59c8968b 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -433,7 +433,7 @@ request_status(_) -> end, Status = iolist_to_binary(io_lib:format("Node ~s is ~s~nemqttd is ~s", [node(), InternalStatus, AppStatus])), - Url = "http://127.0.0.1:8083/status", + Url = "http://127.0.0.1:8080/status", {ok, {{"HTTP/1.1", 200, "OK"}, _, Return}} = httpc:request(get, {Url, []}, [], []), ?assertEqual(binary_to_list(Status), Return). @@ -446,7 +446,7 @@ request_publish(_) -> emqttd:unsubscribe(<<"a/b/c">>). connect_emqttd_publish_(Method, Api, Params, Auth) -> - Url = "http://127.0.0.1:8083/" ++ Api, + Url = "http://127.0.0.1:8080/" ++ Api, case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of {error, socket_closed_remotely} -> false; @@ -647,8 +647,8 @@ conflict_listeners(_) -> {current_clients, esockd:get_current_clients(Pid)}, {shutdown_count, esockd:get_shutdown_count(Pid)}]} end, esockd:listeners()), - ?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:1883", Listeners))), - ?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:1883", Listeners))), + ?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners))), + ?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners))), emqttc:disconnect(C2). cli_vm(_) -> diff --git a/test/emqttd_SUITE_data/emqttd.conf b/test/emqttd_SUITE_data/emqttd.conf index 46306484a..c3694c4b9 100644 --- a/test/emqttd_SUITE_data/emqttd.conf +++ b/test/emqttd_SUITE_data/emqttd.conf @@ -54,7 +54,7 @@ node.max_ets_tables = 256000 node.fullsweep_after = 1000 ## Crash dump -node.crash_dump = log/crash.dump +node.crash_dump = {{ platform_log_dir }}/crash.dump ## Distributed node ticktime node.dist_net_ticktime = 60 @@ -68,7 +68,7 @@ node.dist_listen_max = 6369 ##-------------------------------------------------------------------- ## Set the log dir -log.dir = log +log.dir = {{ platform_log_dir }} ## Console log. Enum: off, file, console, both log.console = console @@ -83,15 +83,15 @@ log.syslog = on log.syslog.level = error ## Console log file -## log.console.file = log/console.log +## log.console.file = {{ platform_log_dir }}/console.log ## Error log file -log.error.file = log/error.log +log.error.file = {{ platform_log_dir }}/error.log ## Enable the crash log. Enum: on, off log.crash = on -log.crash.file = log/crash.log +log.crash.file = {{ platform_log_dir }}/crash.log ##-------------------------------------------------------------------- ## Allow Anonymous and Default ACL @@ -104,7 +104,7 @@ mqtt.allow_anonymous = true mqtt.acl_nomatch = allow ## Default ACL File -mqtt.acl_file = etc/acl.conf +mqtt.acl_file = {{ platform_etc_dir }}/acl.conf ## Cache ACL for PUBLISH mqtt.cache_acl = true @@ -119,6 +119,9 @@ mqtt.max_clientid_len = 1024 ## Max Packet Size Allowed, 64K by default. mqtt.max_packet_size = 64KB +## Check Websocket Protocol Header. Enum: on, off +mqtt.websocket_protocol_header = on + ##-------------------------------------------------------------------- ## MQTT Connection ##-------------------------------------------------------------------- @@ -188,7 +191,7 @@ mqtt.mqueue.type = simple ## Max queue length. Enqueued messages when persistent client disconnected, ## or inflight window is full. 0 means no limit. -mqtt.mqueue.max_length = 0 +mqtt.mqueue.max_length = 1000 ## Low-water mark of queued messages mqtt.mqueue.low_watermark = 20% @@ -229,10 +232,10 @@ mqtt.bridge.ping_down_interval = 1 ##------------------------------------------------------------------- ## Dir of plugins' config -mqtt.plugins.etc_dir =etc/plugins/ +mqtt.plugins.etc_dir ={{ platform_etc_dir }}/plugins/ ## File to store loaded plugin names. -mqtt.plugins.loaded_file = data/loaded_plugins +mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins ##-------------------------------------------------------------------- ## MQTT Listeners @@ -354,9 +357,9 @@ listener.ssl.external.keyfile = certs/key.pem listener.ssl.external.certfile = certs/cert.pem -## listener.ssl.external.cacertfile = certs/cacert.pem +## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem -## listener.ssl.external.dhfile = certs/dh-params.pem +## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem ## listener.ssl.external.verify = verify_peer @@ -437,12 +440,23 @@ listener.wss.external.keyfile = certs/key.pem listener.wss.external.certfile = certs/cert.pem -## listener.wss.external.cacertfile = certs/cacert.pem +## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem ## listener.wss.external.verify = verify_peer ## listener.wss.external.fail_if_no_peer_cert = true +##-------------------------------------------------------------------- +## HTTP Management API Listener + +listener.api.mgmt = 127.0.0.1:8080 + +listener.api.mgmt.acceptors = 4 + +listener.api.mgmt.max_clients = 64 + +listener.api.mgmt.access.1 = allow all + ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- diff --git a/test/emqttd_SUITE_data/emqttd.schema b/test/emqttd_SUITE_data/emqttd.schema index 9b20ea4c2..a2860322c 100644 --- a/test/emqttd_SUITE_data/emqttd.schema +++ b/test/emqttd_SUITE_data/emqttd.schema @@ -346,6 +346,11 @@ end}. {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] end}. +{mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [ + {default, on}, + {datatype, flag} +]}. + %%-------------------------------------------------------------------- %% MQTT Connection %%-------------------------------------------------------------------- @@ -1016,15 +1021,113 @@ end}. end end, + ApiListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + SslOpts1 = case SslOpts(Prefix) of + [] -> []; + SslOpts0 -> [{sslopts, SslOpts0}] + end, + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, + {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}] + end + end, + + lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] ++ [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] + ++ + [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) end}. +%%-------------------------------------------------------------------- +%% MQTT REST API Listeners + +{mapping, "listener.api.$name", "emqttd.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}} +]}. + +{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.verify", "emqttd.listeners", [ + {datatype, atom} +]}. + +{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- From c0bb20c2d921f23b24cb953bca5dc00101d92b5e Mon Sep 17 00:00:00 2001 From: HuangDan Date: Mon, 12 Jun 2017 22:52:23 +0800 Subject: [PATCH 09/15] Remove bridge command qos option --- src/emqttd_cli.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 6918c68e8..6ff94893c 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -330,7 +330,7 @@ bridges(["options"]) -> ?PRINT_MSG(" suffix = string~n"), ?PRINT_MSG(" queue = integer~n"), ?PRINT_MSG("Example:~n"), - ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); + ?PRINT_MSG(" prefix=abc/,suffix=/yxz,queue=1000~n"); bridges(["start", SNode, Topic]) -> case emqttd_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of From a5cac277c140bc2dde5a1edb5127fa9c8d6979b1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 13 Jun 2017 10:33:56 +0800 Subject: [PATCH 10/15] Add cluster_nodes/1 function --- src/emqttd_mnesia.erl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 31fa88d3a..d8e0f88af 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -27,7 +27,7 @@ %% Cluster mnesia -export([join_cluster/1, cluster_status/0, leave_cluster/0, - remove_from_cluster/1, running_nodes/0]). + remove_from_cluster/1, cluster_nodes/1, running_nodes/0]). %% Schema and tables -export([copy_schema/1, delete_schema/0, del_schema_copy/1, @@ -213,10 +213,18 @@ connect(Node) -> Error -> Error end. -%% @doc Running nodes +%% @doc Running nodes. -spec(running_nodes() -> list(node())). -running_nodes() -> - mnesia:system_info(running_db_nodes). +running_nodes() -> cluster_nodes(running). + +%% @doc Cluster nodes. +-spec(cluster_nodes(all | running | stopped) -> [node()]). +cluster_nodes(all) -> + mnesia:system_info(db_nodes); +cluster_nodes(running) -> + mnesia:system_info(running_db_nodes); +cluster_nodes(stopped) -> + cluster_nodes(all) -- cluster_nodes(running). %% @private ensure_ok(ok) -> ok; From 79c2fa631e6e7cd4e3ff8cdcb3f9c9b3b1dbcbcf Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 13 Jun 2017 10:35:22 +0800 Subject: [PATCH 11/15] Fix the issue that we cannot remove a down node from the cluster --- src/emqttd_cluster.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_cluster.erl b/src/emqttd_cluster.erl index ab40875aa..28b2d1723 100644 --- a/src/emqttd_cluster.erl +++ b/src/emqttd_cluster.erl @@ -50,7 +50,7 @@ prepare() -> %% @doc Is node in cluster? -spec(is_clustered(node()) -> boolean()). is_clustered(Node) -> - lists:member(Node, emqttd_mnesia:running_nodes()). + lists:member(Node, emqttd_mnesia:cluster_nodes(all)). %% @doc Reboot after join or leave cluster. -spec(reboot() -> ok). From f18051c34029a3987a227532884ac8aa8ad4198e Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 13 Jun 2017 11:06:35 +0800 Subject: [PATCH 12/15] Add stop api listener function --- src/emqttd_app.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 272ccffc9..76506ee7c 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -200,7 +200,9 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> mochiweb:stop_http('mqtt:ws', ListenOn); stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> mochiweb:stop_http('mqtt:wss', ListenOn); -stop_listener({Proto, ListenOn, _Opts}) -> +sstop_listener({Proto, ListenOn, _Opts}) when Proto == api -> + mochiweb:stop_http('mqtt:api', ListenOn); +top_listener({Proto, ListenOn, _Opts}) -> esockd:close(Proto, ListenOn). -ifdef(TEST). From bbcc082a0d184bdc8b548df8a3920f7a103f56ba Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 13 Jun 2017 11:15:26 +0800 Subject: [PATCH 13/15] Add stop api listener function --- src/emqttd_app.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 76506ee7c..7709d134f 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -200,9 +200,9 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> mochiweb:stop_http('mqtt:ws', ListenOn); stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> mochiweb:stop_http('mqtt:wss', ListenOn); -sstop_listener({Proto, ListenOn, _Opts}) when Proto == api -> +stop_listener({Proto, ListenOn, _Opts}) when Proto == api -> mochiweb:stop_http('mqtt:api', ListenOn); -top_listener({Proto, ListenOn, _Opts}) -> +stop_listener({Proto, ListenOn, _Opts}) -> esockd:close(Proto, ListenOn). -ifdef(TEST). From 88d466eb1a299294f4aa35db692be77231c07094 Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 13 Jun 2017 21:02:38 +0800 Subject: [PATCH 14/15] remove from cluster, delete the node in extra_db_nodes --- src/emqttd_mnesia.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index d8e0f88af..9c713cdd6 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -187,9 +187,11 @@ remove_from_cluster(Node) when Node =/= node() -> {true, true} -> ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])), ensure_ok(del_schema_copy(Node)), - ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); + ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])), + mnesia_lib:del(extra_db_nodes, Node); {true, false} -> - ensure_ok(del_schema_copy(Node)); + ensure_ok(del_schema_copy(Node)), + mnesia_lib:del(extra_db_nodes, Node); %ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); {false, _} -> {error, node_not_in_cluster} From 37b625aa575a2b377e5e032a096c1a67cb14ef59 Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 13 Jun 2017 21:13:57 +0800 Subject: [PATCH 15/15] Format code --- src/emqttd_mnesia.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 9c713cdd6..20c054ab2 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -186,12 +186,12 @@ remove_from_cluster(Node) when Node =/= node() -> case {is_node_in_cluster(Node), is_running_db_node(Node)} of {true, true} -> ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])), + mnesia_lib:del(extra_db_nodes, Node), ensure_ok(del_schema_copy(Node)), - ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])), - mnesia_lib:del(extra_db_nodes, Node); + ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); {true, false} -> - ensure_ok(del_schema_copy(Node)), - mnesia_lib:del(extra_db_nodes, Node); + mnesia_lib:del(extra_db_nodes, Node), + ensure_ok(del_schema_copy(Node)); %ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); {false, _} -> {error, node_not_in_cluster}