diff --git a/Makefile b/Makefile index 1470bd9ab..667a0cbc7 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master +ERLC_OPTS += +debug_info ERLC_OPTS += +'{parse_transform, lager_transform}' NO_AUTOPATCH = cuttlefish diff --git a/etc/emq.conf b/etc/emq.conf index 5fb004b12..d90d4d248 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -457,6 +457,17 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## listener.wss.external.fail_if_no_peer_cert = true +##-------------------------------------------------------------------- +## HTTP Management API Listener + +listener.api.mgmt = 127.0.0.1:8080 + +listener.api.mgmt.acceptors = 4 + +listener.api.mgmt.max_clients = 64 + +listener.api.mgmt.access.1 = allow all + ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index b5e03a359..2ae853dec 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1020,15 +1020,113 @@ end}. end end, + ApiListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + SslOpts1 = case SslOpts(Prefix) of + [] -> []; + SslOpts0 -> [{sslopts, SslOpts0}] + end, + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, + {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}] + end + end, + + lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] ++ [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] + ++ + [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) end}. +%%-------------------------------------------------------------------- +%% MQTT REST API Listeners + +{mapping, "listener.api.$name", "emqttd.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}} +]}. + +{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.verify", "emqttd.listeners", [ + {datatype, atom} +]}. + +{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 992e22da6..7709d134f 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -165,11 +165,14 @@ start_listener({ssl, ListenOn, Opts}) -> %% Start http listener start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws -> - mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_http, handle_request, []}); + mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_ws, handle_request, []}); %% Start https listener start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> - mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_http, handle_request, []}). + mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []}); + +start_listener({Proto, ListenOn, Opts}) when Proto == api -> + mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}). start_listener(Proto, ListenOn, Opts) -> Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])), @@ -197,6 +200,8 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> mochiweb:stop_http('mqtt:ws', ListenOn); stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> mochiweb:stop_http('mqtt:wss', ListenOn); +stop_listener({Proto, ListenOn, _Opts}) when Proto == api -> + mochiweb:stop_http('mqtt:api', ListenOn); stop_listener({Proto, ListenOn, _Opts}) -> esockd:close(Proto, ListenOn). diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 815ee38d9..49b5a95d0 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -37,7 +37,6 @@ -record(state, {pool, id, node, subtopic, - qos = ?QOS_0, topic_suffix = <<>>, topic_prefix = <<>>, mqueue :: emqttd_mqueue:mqueue(), @@ -45,8 +44,7 @@ ping_down_interval = ?PING_DOWN_INTERVAL, status = up}). --type(option() :: {qos, mqtt_qos()} | - {topic_suffix, binary()} | +-type(option() :: {topic_suffix, binary()} | {topic_prefix, binary()} | {max_queue_len, pos_integer()} | {ping_down_interval, pos_integer()}). @@ -87,8 +85,6 @@ init([Pool, Id, Node, Topic, Options]) -> parse_opts([], State) -> State; -parse_opts([{qos, Qos} | Opts], State) -> - parse_opts(Opts, State#state{qos = Qos}); parse_opts([{topic_suffix, Suffix} | Opts], State) -> parse_opts(Opts, State#state{topic_suffix= Suffix}); parse_opts([{topic_prefix, Prefix} | Opts], State) -> diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 8123fe669..6ff94893c 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -326,12 +326,11 @@ bridges(["list"]) -> bridges(["options"]) -> ?PRINT_MSG("Options:~n"), - ?PRINT_MSG(" qos = 0 | 1 | 2~n"), ?PRINT_MSG(" prefix = string~n"), ?PRINT_MSG(" suffix = string~n"), ?PRINT_MSG(" queue = integer~n"), ?PRINT_MSG("Example:~n"), - ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); + ?PRINT_MSG(" prefix=abc/,suffix=/yxz,queue=1000~n"); bridges(["start", SNode, Topic]) -> case emqttd_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of @@ -363,8 +362,6 @@ parse_opts(Cmd, OptStr) -> Tokens = string:tokens(OptStr, ","), [parse_opt(Cmd, list_to_atom(Opt), Val) || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. -parse_opt(bridge, qos, Qos) -> - {qos, list_to_integer(Qos)}; parse_opt(bridge, suffix, Suffix) -> {topic_suffix, bin(Suffix)}; parse_opt(bridge, prefix, Prefix) -> diff --git a/src/emqttd_cluster.erl b/src/emqttd_cluster.erl index ab40875aa..28b2d1723 100644 --- a/src/emqttd_cluster.erl +++ b/src/emqttd_cluster.erl @@ -50,7 +50,7 @@ prepare() -> %% @doc Is node in cluster? -spec(is_clustered(node()) -> boolean()). is_clustered(Node) -> - lists:member(Node, emqttd_mnesia:running_nodes()). + lists:member(Node, emqttd_mnesia:cluster_nodes(all)). %% @doc Reboot after join or leave cluster. -spec(reboot() -> ok). diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 9b50cf34a..cf705ff00 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -52,25 +52,6 @@ handle_request('POST', "/mqtt/publish", Req) -> false -> Req:respond({401, [], <<"Unauthorized">>}) end; -%%-------------------------------------------------------------------- -%% MQTT Over WebSocket -%%-------------------------------------------------------------------- - -handle_request('GET', "/mqtt", Req) -> - lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), - Upgrade = Req:get_header_value("Upgrade"), - Proto = check_protocol_header(Req), - case {is_websocket(Upgrade), Proto} of - {true, "mqtt" ++ _Vsn} -> - emqttd_ws:handle_request(Req); - {false, _} -> - lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), - Req:respond({400, [], <<"Bad Request">>}); - {_, Proto} -> - lager:error("WebSocket with error Protocol: ~s", [Proto]), - Req:respond({400, [], <<"Bad WebSocket Protocol">>}) - end; - %%-------------------------------------------------------------------- %% Get static files %%-------------------------------------------------------------------- @@ -83,18 +64,6 @@ handle_request(Method, Path, Req) -> lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), Req:not_found(). -check_protocol_header(Req) -> - case emqttd:env(websocket_protocol_header, false) of - true -> get_protocol_header(Req); - false -> "mqtt-v3.1.1" - end. - -get_protocol_header(Req) -> - case Req:get_header_value("EMQ-WebSocket-Protocol") of - undefined -> Req:get_header_value("Sec-WebSocket-Protocol"); - Proto -> Proto - end. - %%-------------------------------------------------------------------- %% HTTP Publish %%-------------------------------------------------------------------- @@ -174,9 +143,6 @@ bool("1") -> true; bool(<<"0">>) -> false; bool(<<"1">>) -> true. -is_websocket(Upgrade) -> - Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". - docroot() -> {file, Here} = code:is_loaded(?MODULE), Dir = filename:dirname(filename:dirname(Here)), diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 31fa88d3a..20c054ab2 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -27,7 +27,7 @@ %% Cluster mnesia -export([join_cluster/1, cluster_status/0, leave_cluster/0, - remove_from_cluster/1, running_nodes/0]). + remove_from_cluster/1, cluster_nodes/1, running_nodes/0]). %% Schema and tables -export([copy_schema/1, delete_schema/0, del_schema_copy/1, @@ -186,9 +186,11 @@ remove_from_cluster(Node) when Node =/= node() -> case {is_node_in_cluster(Node), is_running_db_node(Node)} of {true, true} -> ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])), + mnesia_lib:del(extra_db_nodes, Node), ensure_ok(del_schema_copy(Node)), ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); {true, false} -> + mnesia_lib:del(extra_db_nodes, Node), ensure_ok(del_schema_copy(Node)); %ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); {false, _} -> @@ -213,10 +215,18 @@ connect(Node) -> Error -> Error end. -%% @doc Running nodes +%% @doc Running nodes. -spec(running_nodes() -> list(node())). -running_nodes() -> - mnesia:system_info(running_db_nodes). +running_nodes() -> cluster_nodes(running). + +%% @doc Cluster nodes. +-spec(cluster_nodes(all | running | stopped) -> [node()]). +cluster_nodes(all) -> + mnesia:system_info(db_nodes); +cluster_nodes(running) -> + mnesia:system_info(running_db_nodes); +cluster_nodes(stopped) -> + cluster_nodes(all) -- cluster_nodes(running). %% @private ensure_ok(ok) -> ok; diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index bbba50d6a..d53ab8c3c 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -33,7 +33,8 @@ %% API Function Exports -export([start_link/2]). --export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]). +-export([start_session/2, lookup_session/1, register_session/3, + unregister_session/1, unregister_session/2]). -export([dispatch/3]). @@ -99,9 +100,17 @@ register_session(ClientId, CleanSess, Properties) -> ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). %% @doc Unregister a session. --spec(unregister_session(binary()) -> true). +-spec(unregister_session(binary()) -> boolean()). unregister_session(ClientId) -> - ets:delete(mqtt_local_session, ClientId). + unregister_session(ClientId, self()). + +unregister_session(ClientId, Pid) -> + case ets:lookup(mqtt_local_session, ClientId) of + [LocalSess = {_, Pid, _, _}] -> + ets:delete_object(mqtt_local_session, LocalSess); + _ -> + false + end. dispatch(ClientId, Topic, Msg) -> try ets:lookup_element(mqtt_local_session, ClientId, 2) of diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index c6a68150d..dbf0ea08c 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -31,20 +31,53 @@ lager:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#wsocket_state.peername) | Args])). -%%-------------------------------------------------------------------- -%% Handle WebSocket Request -%%-------------------------------------------------------------------- -%% @doc Handle WebSocket Request. handle_request(Req) -> - {ok, ProtoEnv} = emqttd:env(protocol), - PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), - Parser = emqttd_parser:initial_state(PacketSize), - %% Upgrade WebSocket. - {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), - {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), - ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser, - max_packet_size = PacketSize, client_pid = ClientPid}). + handle_request(Req:get(method), Req:get(path), Req). + +%%-------------------------------------------------------------------- +%% MQTT Over WebSocket +%%-------------------------------------------------------------------- +handle_request('GET', "/mqtt", Req) -> + lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), + Upgrade = Req:get_header_value("Upgrade"), + Proto = check_protocol_header(Req), + case {is_websocket(Upgrade), Proto} of + {true, "mqtt" ++ _Vsn} -> + {ok, ProtoEnv} = emqttd:env(protocol), + PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), + Parser = emqttd_parser:initial_state(PacketSize), + %% Upgrade WebSocket. + {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), + {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), + ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser, + max_packet_size = PacketSize, client_pid = ClientPid}); + {false, _} -> + lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), + Req:respond({400, [], <<"Bad Request">>}); + {_, Proto} -> + lager:error("WebSocket with error Protocol: ~s", [Proto]), + Req:respond({400, [], <<"Bad WebSocket Protocol">>}) + end; + +handle_request(Method, Path, Req) -> + lager:error("Unexpected WS Request: ~s ~s", [Method, Path]), + Req:not_found(). + +is_websocket(Upgrade) -> + Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". + +check_protocol_header(Req) -> + case emqttd:env(websocket_protocol_header, false) of + true -> get_protocol_header(Req); + false -> "mqtt-v3.1.1" + end. + +get_protocol_header(Req) -> + case Req:get_header_value("EMQ-WebSocket-Protocol") of + undefined -> Req:get_header_value("Sec-WebSocket-Protocol"); + Proto -> Proto + end. %%-------------------------------------------------------------------- %% Receive Loop diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index b5402830d..f59c8968b 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -433,7 +433,7 @@ request_status(_) -> end, Status = iolist_to_binary(io_lib:format("Node ~s is ~s~nemqttd is ~s", [node(), InternalStatus, AppStatus])), - Url = "http://127.0.0.1:8083/status", + Url = "http://127.0.0.1:8080/status", {ok, {{"HTTP/1.1", 200, "OK"}, _, Return}} = httpc:request(get, {Url, []}, [], []), ?assertEqual(binary_to_list(Status), Return). @@ -446,7 +446,7 @@ request_publish(_) -> emqttd:unsubscribe(<<"a/b/c">>). connect_emqttd_publish_(Method, Api, Params, Auth) -> - Url = "http://127.0.0.1:8083/" ++ Api, + Url = "http://127.0.0.1:8080/" ++ Api, case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of {error, socket_closed_remotely} -> false; @@ -647,8 +647,8 @@ conflict_listeners(_) -> {current_clients, esockd:get_current_clients(Pid)}, {shutdown_count, esockd:get_shutdown_count(Pid)}]} end, esockd:listeners()), - ?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:1883", Listeners))), - ?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:1883", Listeners))), + ?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners))), + ?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners))), emqttc:disconnect(C2). cli_vm(_) -> diff --git a/test/emqttd_SUITE_data/emqttd.conf b/test/emqttd_SUITE_data/emqttd.conf index 46306484a..c3694c4b9 100644 --- a/test/emqttd_SUITE_data/emqttd.conf +++ b/test/emqttd_SUITE_data/emqttd.conf @@ -54,7 +54,7 @@ node.max_ets_tables = 256000 node.fullsweep_after = 1000 ## Crash dump -node.crash_dump = log/crash.dump +node.crash_dump = {{ platform_log_dir }}/crash.dump ## Distributed node ticktime node.dist_net_ticktime = 60 @@ -68,7 +68,7 @@ node.dist_listen_max = 6369 ##-------------------------------------------------------------------- ## Set the log dir -log.dir = log +log.dir = {{ platform_log_dir }} ## Console log. Enum: off, file, console, both log.console = console @@ -83,15 +83,15 @@ log.syslog = on log.syslog.level = error ## Console log file -## log.console.file = log/console.log +## log.console.file = {{ platform_log_dir }}/console.log ## Error log file -log.error.file = log/error.log +log.error.file = {{ platform_log_dir }}/error.log ## Enable the crash log. Enum: on, off log.crash = on -log.crash.file = log/crash.log +log.crash.file = {{ platform_log_dir }}/crash.log ##-------------------------------------------------------------------- ## Allow Anonymous and Default ACL @@ -104,7 +104,7 @@ mqtt.allow_anonymous = true mqtt.acl_nomatch = allow ## Default ACL File -mqtt.acl_file = etc/acl.conf +mqtt.acl_file = {{ platform_etc_dir }}/acl.conf ## Cache ACL for PUBLISH mqtt.cache_acl = true @@ -119,6 +119,9 @@ mqtt.max_clientid_len = 1024 ## Max Packet Size Allowed, 64K by default. mqtt.max_packet_size = 64KB +## Check Websocket Protocol Header. Enum: on, off +mqtt.websocket_protocol_header = on + ##-------------------------------------------------------------------- ## MQTT Connection ##-------------------------------------------------------------------- @@ -188,7 +191,7 @@ mqtt.mqueue.type = simple ## Max queue length. Enqueued messages when persistent client disconnected, ## or inflight window is full. 0 means no limit. -mqtt.mqueue.max_length = 0 +mqtt.mqueue.max_length = 1000 ## Low-water mark of queued messages mqtt.mqueue.low_watermark = 20% @@ -229,10 +232,10 @@ mqtt.bridge.ping_down_interval = 1 ##------------------------------------------------------------------- ## Dir of plugins' config -mqtt.plugins.etc_dir =etc/plugins/ +mqtt.plugins.etc_dir ={{ platform_etc_dir }}/plugins/ ## File to store loaded plugin names. -mqtt.plugins.loaded_file = data/loaded_plugins +mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins ##-------------------------------------------------------------------- ## MQTT Listeners @@ -354,9 +357,9 @@ listener.ssl.external.keyfile = certs/key.pem listener.ssl.external.certfile = certs/cert.pem -## listener.ssl.external.cacertfile = certs/cacert.pem +## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem -## listener.ssl.external.dhfile = certs/dh-params.pem +## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem ## listener.ssl.external.verify = verify_peer @@ -437,12 +440,23 @@ listener.wss.external.keyfile = certs/key.pem listener.wss.external.certfile = certs/cert.pem -## listener.wss.external.cacertfile = certs/cacert.pem +## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem ## listener.wss.external.verify = verify_peer ## listener.wss.external.fail_if_no_peer_cert = true +##-------------------------------------------------------------------- +## HTTP Management API Listener + +listener.api.mgmt = 127.0.0.1:8080 + +listener.api.mgmt.acceptors = 4 + +listener.api.mgmt.max_clients = 64 + +listener.api.mgmt.access.1 = allow all + ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- diff --git a/test/emqttd_SUITE_data/emqttd.schema b/test/emqttd_SUITE_data/emqttd.schema index 9b20ea4c2..a2860322c 100644 --- a/test/emqttd_SUITE_data/emqttd.schema +++ b/test/emqttd_SUITE_data/emqttd.schema @@ -346,6 +346,11 @@ end}. {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] end}. +{mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [ + {default, on}, + {datatype, flag} +]}. + %%-------------------------------------------------------------------- %% MQTT Connection %%-------------------------------------------------------------------- @@ -1016,15 +1021,113 @@ end}. end end, + ApiListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + SslOpts1 = case SslOpts(Prefix) of + [] -> []; + SslOpts0 -> [{sslopts, SslOpts0}] + end, + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, + {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}] + end + end, + + lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] ++ [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] + ++ + [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) end}. +%%-------------------------------------------------------------------- +%% MQTT REST API Listeners + +{mapping, "listener.api.$name", "emqttd.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}} +]}. + +{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.verify", "emqttd.listeners", [ + {datatype, atom} +]}. + +{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%--------------------------------------------------------------------