diff --git a/Makefile b/Makefile index c8ea41300..9270a562e 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,15 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker -PROJECT_VERSION = 2.2 +PROJECT_VERSION = 2.3 -DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt +DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master dep_esockd = git https://github.com/emqtt/esockd master +dep_ekka = git https://github.com/emqtt/ekka develop dep_mochiweb = git https://github.com/emqtt/mochiweb master dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog diff --git a/etc/emq.conf b/etc/emq.conf index d90d4d248..82209dde7 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -1,24 +1,76 @@ - ##=================================================================== -## EMQ Configuration R2.2 +## EMQ Configuration R2.3 ##=================================================================== ##-------------------------------------------------------------------- ## Cluster ##-------------------------------------------------------------------- -## The cluster Id -cluster.id = emq +## Cluster name +cluster.name = emqcl -## The multicast address and port. -cluster.multicast = 239.192.0.1:44369 +## Cluster discovery strategy: manual | static | mcast | dns | etcd | k8s +cluster.discovery = manual + +## Cluster Autoheal: on | off +cluster.autoheal = on + +## Clean down node of the cluster +cluster.autoclean = 5m + +##-------------------------------------------------------------------- +## Cluster with static node list + +## cluster.static.seeds = emq1@127.0.0.1,emq2@127.0.0.1 + +##-------------------------------------------------------------------- +## Cluster with multicast + +## cluster.mcast.addr = 239.192.0.1 + +## cluster.mcast.ports = 4369,4370 + +## cluster.mcast.iface = 0.0.0.0 + +## cluster.mcast.ttl = 255 + +## cluster.mcast.loop = on + +##-------------------------------------------------------------------- +## Cluster with DNS + +## cluster.dns.name = localhost + +## cluster.dns.app = emq + +##-------------------------------------------------------------------- +## Cluster with Etcd + +## cluster.etcd.server = http://127.0.0.1:2379 + +## cluster.etcd.prefix = emqcl + +## cluster.etcd.node_ttl = 1m + +##-------------------------------------------------------------------- +## Cluster with k8s + +## cluster.k8s.apiserver = http://10.110.111.204:8080 + +## cluster.k8s.service_name = emq + +## Address Type: ip | dns +## cluster.k8s.address_type = ip + +## The Erlang application name +## cluster.k8s.app_name = emq ##-------------------------------------------------------------------- ## Node Args ##-------------------------------------------------------------------- ## Node name -node.name = emqttd@127.0.0.1 +node.name = emq@127.0.0.1 ## Cookie for distributed node node.cookie = emqsecretcookie @@ -61,7 +113,7 @@ node.dist_net_ticktime = 60 ## Distributed node port range node.dist_listen_min = 6369 -node.dist_listen_max = 6369 +node.dist_listen_max = 6379 ##-------------------------------------------------------------------- ## Log @@ -85,6 +137,9 @@ log.syslog.level = error ## Console log file ## log.console.file = {{ platform_log_dir }}/console.log +## Info log file +## log.info.file = {{ platform_log_dir }}/info.log + ## Error log file log.error.file = {{ platform_log_dir }}/error.log diff --git a/priv/emq.schema b/priv/emq.schema index 2ae853dec..f4c369b9c 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1,28 +1,162 @@ %%-*- mode: erlang -*- -%% EMQ config mapping +%% EMQ R2.3 config mapping %%-------------------------------------------------------------------- %% Cluster %%-------------------------------------------------------------------- -%% Cluster ID -{mapping, "cluster.id", "emqttd.cluster", [ - {default, "emq"}, +%% @doc Cluster name +{mapping, "cluster.name", "ekka.cluster_name", [ + {default, emqcl}, + {datatype, atom} +]}. + +%% @doc Cluster discovery +{mapping, "cluster.discovery", "ekka.cluster_discovery", [ + {default, manual}, + {datatype, atom} +]}. + +%% @doc Clean down node from the cluster +{mapping, "cluster.autoclean", "ekka.cluster_autoclean", [ + {datatype, {duration, ms}} +]}. + +%% @doc Cluster autoheal +{mapping, "cluster.autoheal", "ekka.cluster_autoheal", [ + {datatype, flag}, + {default, off} +]}. + +%%-------------------------------------------------------------------- +%% Cluster by static node list + +{mapping, "cluster.static.seeds", "ekka.cluster_discovery", [ {datatype, string} ]}. -%% Cluster Multicast Addr -{mapping, "cluster.multicast", "emqttd.cluster", [ - {default, "239.192.0.1:44369"}, +%%-------------------------------------------------------------------- +%% Cluster by UDP Multicast + +{mapping, "cluster.mcast.addr", "ekka.cluster_discovery", [ + {default, "239.192.0.1"}, {datatype, string} ]}. -{translation, "emqttd.cluster", fun(Conf) -> - Multicast = cuttlefish:conf_get("cluster.multicast", Conf), - [Addr, Port] = string:tokens(Multicast, ":"), - {ok, Ip} = inet_parse:address(Addr), - [{id, cuttlefish:conf_get("cluster.id", Conf)}, - {multicast, {Ip, list_to_integer(Port)}}] +{mapping, "cluster.mcast.ports", "ekka.cluster_discovery", [ + {default, "4369"}, + {datatype, string} +]}. + +{mapping, "cluster.mcast.iface", "ekka.cluster_discovery", [ + {datatype, string}, + {default, "0.0.0.0"} +]}. + +{mapping, "cluster.mcast.ttl", "ekka.cluster_discovery", [ + {datatype, integer}, + {default, 255} +]}. + +{mapping, "cluster.mcast.loop", "ekka.cluster_discovery", [ + {datatype, flag}, + {default, on} +]}. + +{mapping, "cluster.mcast.sndbuf", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "16KB"} +]}. + +{mapping, "cluster.mcast.recbuf", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "16KB"} +]}. + +{mapping, "cluster.mcast.buffer", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "32KB"} +]}. + +%%-------------------------------------------------------------------- +%% Cluster by DNS A Record + +{mapping, "cluster.dns.name", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.dns.app", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% Cluster using etcd + +{mapping, "cluster.etcd.server", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.prefix", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.node_ttl", "ekka.cluster_discovery", [ + {datatype, {duration, ms}}, + {default, "1m"} +]}. + +%%-------------------------------------------------------------------- +%% Cluster on K8s + +{mapping, "cluster.k8s.apiserver", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.k8s.service_name", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.k8s.address_type", "ekka.cluster_discovery", [ + {datatype, {enum, [ip, dns]}} +]}. + +{mapping, "cluster.k8s.app_name", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{translation, "ekka.cluster_discovery", fun(Conf) -> + Strategy = cuttlefish:conf_get("cluster.discovery", Conf), + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + IpPort = fun(S) -> + [Addr, Port] = string:tokens(S, ":"), + {ok, Ip} = inet:parse_address(Addr), + {Ip, Port} + end, + Options = fun(static) -> + [{seeds, [list_to_atom(S) || S <- string:tokens(cuttlefish:conf_get("cluster.static.seeds", Conf, ""), ",")]}]; + (mcast) -> + {ok, Addr} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.addr", Conf)), + {ok, Iface} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.iface", Conf)), + Ports = [list_to_integer(S) || S <- string:tokens(cuttlefish:conf_get("cluster.mcast.ports", Conf), ",")], + [{addr, Addr}, {ports, Ports}, {iface, Iface}, + {ttl, cuttlefish:conf_get("cluster.mcast.ttl", Conf, 1)}, + {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}]; + (dns) -> + [{name, cuttlefish:conf_get("cluster.dns.name", Conf)}, + {app, cuttlefish:conf_get("cluster.dns.app", Conf)}]; + (etcd) -> + [{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")}, + {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")}, + {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}]; + (k8s) -> + [{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)}, + {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, + {address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)}, + {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)}]; + (manual) -> + [ ] + end, + {Strategy, Filter(Options(Strategy))} end}. %%-------------------------------------------------------------------- diff --git a/rebar.config b/rebar.config index 28b47bf58..834382f05 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq22"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","emq22"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} +{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{ekka,".*",{git,"https://github.com/emqtt/ekka","develop"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","master"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} ]}. -{erl_opts, [{parse_transform,lager_transform}]}. +{erl_opts, [debug_info,{parse_transform,lager_transform}]}. diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 3a7ed3482..0b85fe0f0 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,6 +1,6 @@ {application,emqttd, [{description,"Erlang MQTT Broker"}, - {vsn,"2.2"}, + {vsn,"2.3"}, {modules,[]}, {registered,[emqttd_sup]}, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, diff --git a/src/emqttd.erl b/src/emqttd.erl index c60f8d590..d4cdd8437 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -40,6 +40,9 @@ %% Debug API -export([dump/0]). +%% Shutdown and reboot +-export([shutdown/0, shutdown/1, reboot/0]). + -type(subscriber() :: pid() | binary()). -type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). @@ -161,6 +164,21 @@ run_hooks(Hook, Args) -> run_hooks(Hook, Args, Acc) -> emqttd_hooks:run(Hook, Args, Acc). +%%-------------------------------------------------------------------- +%% Shutdown and reboot +%%-------------------------------------------------------------------- + +shutdown() -> + shutdown(normal). + +shutdown(Reason) -> + lager:error("EMQ shutdown for ~s", [Reason]), + emqttd_plugins:unload(), + lists:foreach(fun application:stop/1, [emqttd, ekka, mochiweb, esockd, gproc]). + +reboot() -> + lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, ekka, emqttd]). + %%-------------------------------------------------------------------- %% Debug %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index ad52416df..b85834722 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -34,19 +34,17 @@ -define(APP, emqttd). %%-------------------------------------------------------------------- -%% Application callbacks +%% Application Callbacks %%-------------------------------------------------------------------- start(_Type, _Args) -> print_banner(), - emqttd_mnesia:start(), + ekka:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd_cli:load(), register_acl_mod(), - emqttd_plugins:init(), - emqttd_plugins:load(), - start_listeners(), + start_autocluster(), register(emqttd, self()), print_vsn(), {ok, Sup}. @@ -146,6 +144,20 @@ register_acl_mod() -> undefined -> ok end. +%%-------------------------------------------------------------------- +%% Autocluster +%%-------------------------------------------------------------------- + +start_autocluster() -> + ekka:callback(prepare, fun emqttd:shutdown/1), + ekka:callback(reboot, fun emqttd:reboot/0), + ekka:autocluster(fun after_autocluster/0). + +after_autocluster() -> + emqttd_plugins:init(), + emqttd_plugins:load(), + start_listeners(). + %%-------------------------------------------------------------------- %% Start Listeners %%-------------------------------------------------------------------- diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index e729ec9cb..c35f44ae3 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -40,7 +40,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {started_at, sys_interval, heartbeat, tick_tref, version, sysdescr}). +-record(state, {started_at, sys_interval, heartbeat, ticker, version, sysdescr}). -define(APP, emqttd). @@ -122,9 +122,9 @@ init([]) -> % Tick {ok, #state{started_at = os:timestamp(), heartbeat = start_tick(1000, heartbeat), - version = list_to_binary(version()), - sysdescr = list_to_binary(sysdescr()), - tick_tref = start_tick(tick)}, hibernate}. + version = list_to_binary(version()), + sysdescr = list_to_binary(sysdescr()), + ticker = start_tick(tick)}, hibernate}. handle_call(uptime, _From, State) -> {reply, uptime(State), State}; @@ -149,7 +149,7 @@ handle_info(tick, State = #state{version = Version, sysdescr = Descr}) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> +terminate(_Reason, #state{heartbeat = Hb, ticker = TRef}) -> stop_tick(Hb), stop_tick(TRef), ok. @@ -163,7 +163,7 @@ code_change(_OldVsn, State, _Extra) -> retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || - N <- emqttd_mnesia:running_nodes()], ",")), + N <- ekka_mnesia:running_nodes()], ",")), Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), emqttd:publish(emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg))). diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 369053909..d6301d484 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -111,7 +111,7 @@ broker(_) -> %% @doc Cluster with other nodes cluster(["join", SNode]) -> - case emqttd_cluster:join(emqttd_node:parse_name(SNode)) of + case ekka:join(ekka_node:parse_name(SNode)) of ok -> ?PRINT_MSG("Join the cluster successfully.~n"), cluster(["status"]); @@ -120,7 +120,7 @@ cluster(["join", SNode]) -> end; cluster(["leave"]) -> - case emqttd_cluster:leave() of + case ekka:leave() of ok -> ?PRINT_MSG("Leave the cluster successfully.~n"), cluster(["status"]); @@ -128,8 +128,8 @@ cluster(["leave"]) -> ?PRINT("Failed to leave the cluster: ~p~n", [Error]) end; -cluster(["remove", SNode]) -> - case emqttd_cluster:remove(emqttd_node:parse_name(SNode)) of +cluster(["force-leave", SNode]) -> + case ekka:force_leave(ekka_node:parse_name(SNode)) of ok -> ?PRINT_MSG("Remove the node from cluster successfully.~n"), cluster(["status"]); @@ -138,13 +138,13 @@ cluster(["remove", SNode]) -> end; cluster(["status"]) -> - ?PRINT("Cluster status: ~p~n", [emqttd_cluster:status()]); + ?PRINT("Cluster status: ~p~n", [ekka_cluster:status()]); cluster(_) -> - ?USAGE([{"cluster join ", "Join the cluster"}, - {"cluster leave", "Leave the cluster"}, - {"cluster remove ","Remove the node from cluster"}, - {"cluster status", "Cluster status"}]). + ?USAGE([{"cluster join ", "Join the cluster"}, + {"cluster leave", "Leave the cluster"}, + {"cluster force-leave ","Force the node leave from cluster"}, + {"cluster status", "Cluster status"}]). %%-------------------------------------------------------------------- %% @doc Users usage diff --git a/src/emqttd_cluster.erl b/src/emqttd_cluster.erl deleted file mode 100644 index 28b2d1723..000000000 --- a/src/emqttd_cluster.erl +++ /dev/null @@ -1,92 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_cluster). - --author("Feng Lee "). - --include("emqttd.hrl"). - -%% Cluster API --export([join/1, leave/0, status/0, remove/1]). - -%% RPC Call --export([prepare/0, reboot/0]). - -%% @doc Join cluster --spec(join(node()) -> ok | {error, any()}). -join(Node) when Node =:= node() -> - {error, {cannot_join_with_self, Node}}; - -join(Node) when is_atom(Node) -> - case {is_clustered(Node), emqttd:is_running(Node)} of - {false, true} -> - prepare(), ok = emqttd_mnesia:join_cluster(Node), reboot(); - {false, false} -> - {error, {node_not_running, Node}}; - {true, _} -> - {error, {already_clustered, Node}} - end. - -%% @doc Prepare to join or leave cluster. --spec(prepare() -> ok). -prepare() -> - emqttd_plugins:unload(), - lists:foreach(fun application:stop/1, [emqttd, mochiweb, esockd, gproc]). - -%% @doc Is node in cluster? --spec(is_clustered(node()) -> boolean()). -is_clustered(Node) -> - lists:member(Node, emqttd_mnesia:cluster_nodes(all)). - -%% @doc Reboot after join or leave cluster. --spec(reboot() -> ok). -reboot() -> - lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, emqttd]). - -%% @doc Leave from Cluster. --spec(leave() -> ok | {error, any()}). -leave() -> - case emqttd_mnesia:running_nodes() -- [node()] of - [_|_] -> - prepare(), ok = emqttd_mnesia:leave_cluster(), reboot(); - [] -> - {error, node_not_in_cluster} - end. - -%% @doc Remove a node from cluster. --spec(remove(node()) -> ok | {error, any()}). -remove(Node) when Node =:= node() -> - {error, {cannot_remove_self, Node}}; - -remove(Node) -> - case is_clustered(Node) andalso rpc:call(Node, ?MODULE, prepare, []) of - ok -> - case emqttd_mnesia:remove_from_cluster(Node) of - ok -> rpc:call(Node, ?MODULE, reboot, []); - Error -> Error - end; - false -> - {error, node_not_in_cluster}; - {badrpc, nodedown} -> - emqttd_mnesia:remove_from_cluster(Node); - {badrpc, Reason} -> - {error, Reason} - end. - -%% @doc Cluster status -status() -> emqttd_mnesia:cluster_status(). - diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index cf705ff00..e09154245 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -70,7 +70,7 @@ handle_request(Method, Path, Req) -> http_publish(Req) -> Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- mochiweb_request:parse_post(Req)], - lager:info("HTTP Publish: ~p", [Params]), + lager:debug("HTTP Publish: ~p", [Params]), Topics = topics(Params), ClientId = get_value(<<"client">>, Params, http), Qos = int(get_value(<<"qos">>, Params, "0")), diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl deleted file mode 100644 index 20c054ab2..000000000 --- a/src/emqttd_mnesia.erl +++ /dev/null @@ -1,268 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_mnesia). - --author("Feng Lee "). - --include("emqttd.hrl"). - --include("emqttd_internal.hrl"). - -%% Start and stop mnesia --export([start/0, ensure_started/0, ensure_stopped/0, connect/1]). - -%% Cluster mnesia --export([join_cluster/1, cluster_status/0, leave_cluster/0, - remove_from_cluster/1, cluster_nodes/1, running_nodes/0]). - -%% Schema and tables --export([copy_schema/1, delete_schema/0, del_schema_copy/1, - create_table/2, copy_table/1, copy_table/2]). - -%%-------------------------------------------------------------------- -%% Start and init mnesia -%%-------------------------------------------------------------------- - -%% @doc Start mnesia database. --spec(start() -> ok). -start() -> - ensure_ok(ensure_data_dir()), - ensure_ok(init_schema()), - ok = mnesia:start(), - init_tables(), - wait_for(tables). - -%% @private -ensure_data_dir() -> - Dir = mnesia:system_info(directory) ++ "/", - case filelib:ensure_dir(Dir) of - ok -> ok; - {error, Reason} -> {error, {mnesia_dir_error, Dir, Reason}} - end. - -%% @doc ensure mnesia started. --spec(ensure_started() -> ok | {error, any()}). -ensure_started() -> - ok = mnesia:start(), wait_for(start). - -%% @doc ensure mnesia stopped. --spec(ensure_stopped() -> ok | {error, any()}). -ensure_stopped() -> - stopped = mnesia:stop(), wait_for(stop). - -%% @private -%% @doc Init mnesia schema or tables. -init_schema() -> - case mnesia:system_info(extra_db_nodes) of - [] -> mnesia:create_schema([node()]); - [_|_] -> ok - end. - -%% @private -%% @doc Init mnesia tables. -init_tables() -> - case mnesia:system_info(extra_db_nodes) of - [] -> create_tables(); - [_|_] -> copy_tables() - end. - -%% @doc Create mnesia tables. -create_tables() -> - emqttd_boot:apply_module_attributes(boot_mnesia). - -%% @doc Copy mnesia tables. -copy_tables() -> - emqttd_boot:apply_module_attributes(copy_mnesia). - -%% @doc Create mnesia table. --spec(create_table(Name:: atom(), TabDef :: list()) -> ok | {error, any()}). -create_table(Name, TabDef) -> - ensure_tab(mnesia:create_table(Name, TabDef)). - -%% @doc Copy mnesia table. --spec(copy_table(Name :: atom()) -> ok). -copy_table(Name) -> - copy_table(Name, ram_copies). - --spec(copy_table(Name:: atom(), ram_copies | disc_copies) -> ok). -copy_table(Name, RamOrDisc) -> - ensure_tab(mnesia:add_table_copy(Name, node(), RamOrDisc)). - -%% @doc Copy schema. -copy_schema(Node) -> - case mnesia:change_table_copy_type(schema, Node, disc_copies) of - {atomic, ok} -> - ok; - {aborted, {already_exists, schema, Node, disc_copies}} -> - ok; - {aborted, Error} -> - {error, Error} - end. - -%% @doc Force to delete schema. -delete_schema() -> - mnesia:delete_schema([node()]). - -%% @doc Delete schema copy -del_schema_copy(Node) -> - case mnesia:del_table_copy(schema, Node) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} - end. - -%%-------------------------------------------------------------------- -%% Cluster mnesia -%%-------------------------------------------------------------------- - -%% @doc Join the mnesia cluster --spec(join_cluster(node()) -> ok). -join_cluster(Node) when Node =/= node() -> - %% Stop mnesia and delete schema first - ensure_ok(ensure_stopped()), - ensure_ok(delete_schema()), - %% Start mnesia and cluster to node - ensure_ok(ensure_started()), - ensure_ok(connect(Node)), - ensure_ok(copy_schema(node())), - %% Copy tables - copy_tables(), - ensure_ok(wait_for(tables)). - -%% @doc Cluster status --spec(cluster_status() -> list()). -cluster_status() -> - Running = mnesia:system_info(running_db_nodes), - Stopped = mnesia:system_info(db_nodes) -- Running, - ?IF(Stopped =:= [], [{running_nodes, Running}], - [{running_nodes, Running}, {stopped_nodes, Stopped}]). - -%% @doc This node try leave the cluster --spec(leave_cluster() -> ok | {error, any()}). -leave_cluster() -> - case running_nodes() -- [node()] of - [] -> - {error, node_not_in_cluster}; - Nodes -> - case lists:any(fun(Node) -> - case leave_cluster(Node) of - ok -> true; - {error, _Reason} -> false - end - end, Nodes) of - true -> ok; - false -> {error, {failed_to_leave, Nodes}} - end - end. - --spec(leave_cluster(node()) -> ok | {error, any()}). -leave_cluster(Node) when Node =/= node() -> - case is_running_db_node(Node) of - true -> - ensure_ok(ensure_stopped()), - ensure_ok(rpc:call(Node, ?MODULE, del_schema_copy, [node()])), - ensure_ok(delete_schema()); - %%ensure_ok(start()); %% restart? - false -> - {error, {node_not_running, Node}} - end. - -%% @doc Remove node from mnesia cluster. --spec(remove_from_cluster(node()) -> ok | {error, any()}). -remove_from_cluster(Node) when Node =/= node() -> - case {is_node_in_cluster(Node), is_running_db_node(Node)} of - {true, true} -> - ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])), - mnesia_lib:del(extra_db_nodes, Node), - ensure_ok(del_schema_copy(Node)), - ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); - {true, false} -> - mnesia_lib:del(extra_db_nodes, Node), - ensure_ok(del_schema_copy(Node)); - %ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); - {false, _} -> - {error, node_not_in_cluster} - end. - -%% @doc Is node in mnesia cluster. -is_node_in_cluster(Node) -> - lists:member(Node, mnesia:system_info(db_nodes)). - -%% @private -%% @doc Is running db node. -is_running_db_node(Node) -> - lists:member(Node, running_nodes()). - -%% @doc Cluster with node. --spec(connect(node()) -> ok | {error, any()}). -connect(Node) -> - case mnesia:change_config(extra_db_nodes, [Node]) of - {ok, [Node]} -> ok; - {ok, []} -> {error, {failed_to_connect_node, Node}}; - Error -> Error - end. - -%% @doc Running nodes. --spec(running_nodes() -> list(node())). -running_nodes() -> cluster_nodes(running). - -%% @doc Cluster nodes. --spec(cluster_nodes(all | running | stopped) -> [node()]). -cluster_nodes(all) -> - mnesia:system_info(db_nodes); -cluster_nodes(running) -> - mnesia:system_info(running_db_nodes); -cluster_nodes(stopped) -> - cluster_nodes(all) -- cluster_nodes(running). - -%% @private -ensure_ok(ok) -> ok; -ensure_ok({error, {_Node, {already_exists, _Node}}}) -> ok; -ensure_ok({badrpc, Reason}) -> throw({error, {badrpc, Reason}}); -ensure_ok({error, Reason}) -> throw({error, Reason}). - -%% @private -ensure_tab({atomic, ok}) -> ok; -ensure_tab({aborted, {already_exists, _Name}}) -> ok; -ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok; -ensure_tab({aborted, Error}) -> Error. - -%% @doc Wait for mnesia to start, stop or tables ready. --spec(wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}). -wait_for(start) -> - case mnesia:system_info(is_running) of - yes -> ok; - no -> {error, mnesia_unexpectedly_stopped}; - stopping -> {error, mnesia_unexpectedly_stopping}; - starting -> timer:sleep(1000), wait_for(start) - end; - -wait_for(stop) -> - case mnesia:system_info(is_running) of - no -> ok; - yes -> {error, mnesia_unexpectedly_running}; - starting -> {error, mnesia_unexpectedly_starting}; - stopping -> timer:sleep(1000), wait_for(stop) - end; - -wait_for(tables) -> - Tables = mnesia:system_info(local_tables), - case mnesia:wait_for_tables(Tables, 600000) of - ok -> ok; - {error, Reason} -> {error, Reason}; - {timeout, BadTables} -> {error, {timetout, BadTables}} - end. - diff --git a/src/emqttd_node.erl b/src/emqttd_node.erl deleted file mode 100644 index ecf44503f..000000000 --- a/src/emqttd_node.erl +++ /dev/null @@ -1,44 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_node). - --author("Feng Lee "). - --import(lists, [concat/1]). - --export([is_aliving/1, parse_name/1]). - -%% @doc Is node aliving --spec(is_aliving(node()) -> boolean()). -is_aliving(Node) -> - case net_adm:ping(Node) of - pong -> true; - pang -> false - end. - -%% @doc Parse node name --spec(parse_name(string()) -> atom()). -parse_name(Name) when is_list(Name) -> - case string:tokens(Name, "@") of - [_Node, _Host] -> list_to_atom(Name); - _ -> with_host(Name) - end. - -with_host(Name) -> - [_, Host] = string:tokens(atom_to_list(node()), "@"), - list_to_atom(concat([Name, "@", Host])). - diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index f126f631c..73f184a3a 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -156,8 +156,8 @@ load_app(App) -> start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> - lager:info("started Apps: ~p", [Started]), - lager:info("load plugin ~p successfully", [App]), + lager:info("Started Apps: ~p", [Started]), + lager:info("Load plugin ~p successfully", [App]), SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> @@ -196,11 +196,11 @@ unload_plugin(App, Persistent) -> stop_app(App) -> case application:stop(App) of ok -> - lager:info("stop plugin ~p successfully~n", [App]), ok; + lager:info("Stop plugin ~p successfully~n", [App]), ok; {error, {not_started, App}} -> - lager:error("plugin ~p is not started~n", [App]), ok; + lager:error("Plugin ~p is not started~n", [App]), ok; {error, Reason} -> - lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} + lager:error("Stop plugin ~p error: ~p", [App]), {error, Reason} end. %%-------------------------------------------------------------------- diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 433825253..b5be7c066 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -344,10 +344,10 @@ send(Packet = ?PACKET(Type), {ok, State#proto_state{stats_data = Stats1}}. trace(recv, Packet, ProtoState) -> - ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); + ?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); trace(send, Packet, ProtoState) -> - ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState). + ?LOG(debug, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState). inc_stats(_Direct, _Type, Stats = #proto_stats{enable_stats = false}) -> Stats; @@ -382,7 +382,7 @@ shutdown(conflict, #proto_state{client_id = _ClientId}) -> ignore; shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> - ?LOG(info, "Shutdown for ~p", [Error], State), + ?LOG(debug, "Shutdown for ~p", [Error], State), Client = client(State), send_willmsg(Client, WillMsg), emqttd_hooks:run('client.disconnected', [Error], Client), diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 4d7762e5d..b3dd8b4ad 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -48,24 +48,26 @@ -define(ROUTER, ?MODULE). +-define(LOCK, {?ROUTER, clean_routes}). + %%-------------------------------------------------------------------- %% Mnesia Bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> - ok = emqttd_mnesia:create_table(mqtt_topic, [ + ok = ekka_mnesia:create_table(mqtt_topic, [ {ram_copies, [node()]}, {record_name, mqtt_topic}, {attributes, record_info(fields, mqtt_topic)}]), - ok = emqttd_mnesia:create_table(mqtt_route, [ + ok = ekka_mnesia:create_table(mqtt_route, [ {type, bag}, {ram_copies, [node()]}, {record_name, mqtt_route}, {attributes, record_info(fields, mqtt_route)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(mqtt_topic), - ok = emqttd_mnesia:copy_table(mqtt_route, ram_copies). + ok = ekka_mnesia:copy_table(mqtt_topic), + ok = ekka_mnesia:copy_table(mqtt_route, ram_copies). %%-------------------------------------------------------------------- %% Start the Router @@ -216,9 +218,9 @@ stop() -> gen_server:call(?ROUTER, stop). %%-------------------------------------------------------------------- init([]) -> - mnesia:subscribe(system), + ekka:monitor(membership), ets:new(mqtt_local_route, [set, named_table, protected]), - {ok, TRef} = timer:send_interval(timer:seconds(1), stats), + {ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, #state{stats_timer = TRef}}. handle_call(stop, _From, State) -> @@ -231,7 +233,7 @@ handle_cast({add_local_route, Topic}, State) -> %% why node()...? ets:insert(mqtt_local_route, {Topic, node()}), {noreply, State}; - + handle_cast({del_local_route, Topic}, State) -> ets:delete(mqtt_local_route, Topic), {noreply, State}; @@ -239,27 +241,16 @@ handle_cast({del_local_route, Topic}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({mnesia_system_event, {mnesia_up, Node}}, State) -> - lager:error("Mnesia up: ~p~n", [Node]), - {noreply, State}; - -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - lager:error("Mnesia down: ~p~n", [Node]), - clean_routes_(Node), - update_stats_(), +handle_info({membership, {mnesia, down, Node}}, State) -> + global:trans({?LOCK, self()}, + fun() -> + clean_routes_(Node), + update_stats_() + end), {noreply, State, hibernate}; -handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) -> - %% 1. Backup and restart - %% 2. Set master nodes - lager:critical("Mnesia inconsistent_database event: ~p, ~p~n", [Context, Node]), - {noreply, State}; - -handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) -> - lager:critical("Mnesia overload: ~p~n", [Details]), - {noreply, State}; - -handle_info({mnesia_system_event, _Event}, State) -> +handle_info({membership, _Event}, State) -> + %% ignore {noreply, State}; handle_info(stats, State) -> @@ -271,7 +262,7 @@ handle_info(_Info, State) -> terminate(_Reason, #state{stats_timer = TRef}) -> timer:cancel(TRef), - mnesia:unsubscribe(system). + ekka:unmonitor(membership). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index ee79ac4fb..69d18e1e4 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -102,11 +102,11 @@ trace(publish, From, _Msg) when is_atom(From) -> %% Dont' trace '$SYS' publish ignore; trace(publish, {ClientId, Username}, #mqtt_message{topic = Topic, payload = Payload}) -> - lager:info([{client, ClientId}, {topic, Topic}], - "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); -trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) when is_binary(From); is_list(From) -> - lager:info([{client, From}, {topic, Topic}], - "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + lager:debug([{client, ClientId}, {topic, Topic}], + "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); +trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> + lager:debug([{client, From}, {topic, Topic}], + "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). %% @doc Unsubscribe -spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 864905b05..31934759f 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -379,7 +379,7 @@ handle_cast({subscribe, _From, TopicTable, AckFun}, State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> - ?LOG(info, "Subscribe ~p", [TopicTable], State), + ?LOG(debug, "Subscribe ~p", [TopicTable], State), {GrantedQos, Subscriptions1} = lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) -> NewQos = proplists:get_value(qos, Opts), @@ -407,7 +407,7 @@ handle_cast({unsubscribe, _From, TopicTable}, State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> - ?LOG(info, "Unsubscribe ~p", [TopicTable], State), + ?LOG(debug, "Unsubscribe ~p", [TopicTable], State), Subscriptions1 = lists:foldl(fun({Topic, Opts}, SubMap) -> case maps:find(Topic, SubMap) of @@ -482,7 +482,7 @@ handle_cast({resume, ClientId, ClientPid}, await_rel_timer = AwaitTimer, expiry_timer = ExpireTimer}) -> - ?LOG(info, "Resumed by ~p", [ClientPid], State), + ?LOG(debug, "Resumed by ~p", [ClientPid], State), %% Cancel Timers lists:foreach(fun emqttd_misc:cancel_timer/1, @@ -552,7 +552,7 @@ handle_info({timeout, _Timer, check_awaiting_rel}, State) -> hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); handle_info({timeout, _Timer, expired}, State) -> - ?LOG(info, "Expired, shutdown now.", [], State), + ?LOG(debug, "Expired, shutdown now.", [], State), shutdown(expired, State); handle_info({'EXIT', ClientPid, _Reason}, @@ -563,7 +563,7 @@ handle_info({'EXIT', ClientPid, Reason}, State = #state{clean_sess = false, client_pid = ClientPid, expiry_interval = Interval}) -> - ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), + ?LOG(debug, "Client ~p EXIT for ~p", [ClientPid, Reason], State), ExpireTimer = start_timer(Interval, expired), State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, hibernate(emit_stats(State1)); diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index d53ab8c3c..dacb0b6a1 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -62,14 +62,14 @@ mnesia(boot) -> %% Global Session Table - ok = emqttd_mnesia:create_table(mqtt_session, [ + ok = ekka_mnesia:create_table(mqtt_session, [ {type, set}, {ram_copies, [node()]}, {record_name, mqtt_session}, {attributes, record_info(fields, mqtt_session)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(mqtt_session). + ok = ekka_mnesia:copy_table(mqtt_session). %%-------------------------------------------------------------------- %% API diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 0f7cb7ef1..2e8e9a749 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -34,7 +34,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {stats_fun, tick_tref}). +-record(state, {stats_fun, ticker}). + +-define(LOCK, {?MODULE, clean_sessions}). %% @doc Start a session helper -spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}). @@ -42,9 +44,9 @@ start_link(StatsFun) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). init([StatsFun]) -> - mnesia:subscribe(system), + ekka:monitor(membership), {ok, TRef} = timer:send_interval(timer:seconds(1), tick), - {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. + {ok, #state{stats_fun = StatsFun, ticker = TRef}}. handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). @@ -52,18 +54,17 @@ handle_call(Req, _From, State) -> handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - lager:error("!!!Mnesia node down: ~s", [Node]), +handle_info({membership, {mnesia, down, Node}}, State) -> Fun = fun() -> ClientIds = mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'}, [{'==', {node, '$2'}, Node}], ['$1']}]), lists:foreach(fun(ClientId) -> mnesia:delete({mqtt_session, ClientId}) end, ClientIds) end, - mnesia:async_dirty(Fun), - {noreply, State}; + global:trans({?LOCK, self()}, fun() -> mnesia:async_dirty(Fun) end), + {noreply, State, hibernate}; -handle_info({mnesia_system_event, {mnesia_up, _Node}}, State) -> +handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(tick, State) -> @@ -72,9 +73,9 @@ handle_info(tick, State) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, _State = #state{tick_tref = TRef}) -> +terminate(_Reason, _State = #state{ticker = TRef}) -> timer:cancel(TRef), - mnesia:unsubscribe(system). + ekka:unmonitor(membership). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_trie.erl b/src/emqttd_trie.erl index 70b1c003f..5b36e6e04 100644 --- a/src/emqttd_trie.erl +++ b/src/emqttd_trie.erl @@ -41,21 +41,21 @@ -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> %% Trie Table - ok = emqttd_mnesia:create_table(mqtt_trie, [ + ok = ekka_mnesia:create_table(mqtt_trie, [ {ram_copies, [node()]}, {record_name, trie}, {attributes, record_info(fields, trie)}]), %% Trie Node Table - ok = emqttd_mnesia:create_table(mqtt_trie_node, [ + ok = ekka_mnesia:create_table(mqtt_trie_node, [ {ram_copies, [node()]}, {record_name, trie_node}, {attributes, record_info(fields, trie_node)}]); mnesia(copy) -> %% Copy Trie Table - ok = emqttd_mnesia:copy_table(mqtt_trie), + ok = ekka_mnesia:copy_table(mqtt_trie), %% Copy Trie Node Table - ok = emqttd_mnesia:copy_table(mqtt_trie_node). + ok = ekka_mnesia:copy_table(mqtt_trie_node). %%-------------------------------------------------------------------- %% Trie API diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index dbf0ea08c..c7d0b2119 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -39,7 +39,7 @@ handle_request(Req) -> %% MQTT Over WebSocket %%-------------------------------------------------------------------- handle_request('GET', "/mqtt", Req) -> - lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), + lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), Proto = check_protocol_header(Req), case {is_websocket(Upgrade), Proto} of diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index f59c8968b..160e023f2 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -24,6 +24,8 @@ -include_lib("common_test/include/ct.hrl"). +-define(APP, emqttd). + -define(CONTENT_TYPE, "application/x-www-form-urlencoded"). -define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"}, @@ -45,7 +47,6 @@ all() -> {group, stats}, {group, hook}, {group, http}, - {group, cluster}, {group, alarms}, {group, cli}, {group, cleanSession}]. @@ -53,8 +54,9 @@ all() -> groups() -> [{protocol, [sequence], [mqtt_connect, - mqtt_ssl_oneway, - mqtt_ssl_twoway]}, + mqtt_ssl_twoway, + mqtt_ssl_oneway + ]}, {pubsub, [sequence], [subscribe_unsubscribe, publish, pubsub, @@ -81,14 +83,6 @@ groups() -> request_publish % websocket_test ]}, - {cluster, [sequence], - [cluster_test, - cluster_join, - cluster_leave, - cluster_remove, - cluster_remove2, - cluster_node_down - ]}, {alarms, [sequence], [set_alarms] }, @@ -109,24 +103,17 @@ groups() -> ]}, cli_vm]}, {cleanSession, [sequence], - [cleanSession_validate, - cleanSession_validate1 + [cleanSession_validate ]}]. init_per_suite(Config) -> - application:start(lager), - DataDir = proplists:get_value(data_dir, Config), - NewConfig = emqttd_config(DataDir), - Vals = change_opts(ssl_oneway, DataDir, proplists:get_value(emqttd, NewConfig)), - [application:set_env(emqttd, Par, Value) || {Par, Value} <- Vals], - application:ensure_all_started(emqttd), - [{config, NewConfig} | Config]. + NewConfig = generate_config(), + lists:foreach(fun set_app_env/1, NewConfig), + application:ensure_all_started(?APP), + Config. end_per_suite(_Config) -> - application:stop(emqttd), - application:stop(esockd), - application:stop(gproc), - emqttd_mnesia:ensure_stopped(). + emqttd:shutdown(). %%-------------------------------------------------------------------- %% Protocol Test @@ -147,31 +134,32 @@ connect_broker_(Packet, RecvSize) -> Data. mqtt_ssl_oneway(_) -> + emqttd:stop(), + change_opts(ssl_oneway), + emqttd:start(), {ok, SslOneWay} = emqttc:start_link([{host, "localhost"}, {port, 8883}, {client_id, <<"ssloneway">>}, ssl]), - timer:sleep(10), + timer:sleep(100), emqttc:subscribe(SslOneWay, <<"topic">>, qos1), {ok, Pub} = emqttc:start_link([{host, "localhost"}, {client_id, <<"pub">>}]), emqttc:publish(Pub, <<"topic">>, <<"SSL oneWay test">>, [{qos, 1}]), - timer:sleep(10), + timer:sleep(100), receive {publish, _Topic, RM} -> ?assertEqual(<<"SSL oneWay test">>, RM) after 1000 -> false end, + timer:sleep(100), emqttc:disconnect(SslOneWay), emqttc:disconnect(Pub). -mqtt_ssl_twoway(Config) -> - emqttd_cluster:prepare(), - DataDir = proplists:get_value(data_dir, Config), - EmqConfig = proplists:get_value(config, Config), - Vals = change_opts(ssl_twoway, DataDir, proplists:get_value(emqttd, EmqConfig)), - [application:set_env(emqttd, Par, Value) || {Par, Value} <- Vals], - emqttd_cluster:reboot(), - ClientSSl = [{Key, filename:join([DataDir, File])} || - {Key, File} <- ?MQTT_SSL_CLIENT ], +mqtt_ssl_twoway(_) -> + emqttd:stop(), + change_opts(ssl_twoway), + emqttd:start(), + ClientSSl = [{Key, local_path(["etc", File])} || + {Key, File} <- ?MQTT_SSL_CLIENT], {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"}, {port, 8883}, {client_id, <<"ssltwoway">>}, @@ -427,7 +415,7 @@ hook_fun8(arg, initArg) -> stop. request_status(_) -> {InternalStatus, _ProvidedStatus} = init:get_status(), AppStatus = - case lists:keysearch(emqttd, 1, application:which_applications()) of + case lists:keysearch(?APP, 1, application:which_applications()) of false -> not_running; {value, _Val} -> running end, @@ -469,79 +457,6 @@ websocket_test(_) -> ct:log("Req:~p", [Req]), emqttd_http:handle_request(Req). -%%-------------------------------------------------------------------- -%% cluster group -%%-------------------------------------------------------------------- -cluster_test(_Config) -> - Z = slave(emqttd, cluster_test_z), - wait_running(Z), - true = emqttd:is_running(Z), - Node = node(), - ok = rpc:call(Z, emqttd_cluster, join, [Node]), - [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)), - ct:log("Z:~p, Node:~p", [Z, Node]), - ok = rpc:call(Z, emqttd_cluster, leave, []), - [Node] = lists:sort(mnesia:system_info(running_db_nodes)), - ok = slave:stop(Z). - -cluster_join(_) -> - Z = slave(emqttd, cluster_join_z), - N = slave(node, cluster_join_n), - wait_running(Z), - true = emqttd:is_running(Z), - Node = node(), - {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node), - {error, {node_not_running, N}} = emqttd_cluster:join(N), - ok = emqttd_cluster:join(Z), - slave:stop(Z), - slave:stop(N). - -cluster_leave(_) -> - Z = slave(emqttd, cluster_leave_z), - wait_running(Z), - {error, node_not_in_cluster} = emqttd_cluster:leave(), - ok = emqttd_cluster:join(Z), - Node = node(), - [Z, Node] = emqttd_mnesia:running_nodes(), - ok = emqttd_cluster:leave(), - [Node] = emqttd_mnesia:running_nodes(), - slave:stop(Z). - -cluster_remove(_) -> - Z = slave(emqttd, cluster_remove_z), - wait_running(Z), - Node = node(), - {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node), - ok = emqttd_cluster:join(Z), - [Z, Node] = emqttd_mnesia:running_nodes(), - ok = emqttd_cluster:remove(Z), - [Node] = emqttd_mnesia:running_nodes(), - slave:stop(Z). - -cluster_remove2(_) -> - Z = slave(emqttd, cluster_remove2_z), - wait_running(Z), - ok = emqttd_cluster:join(Z), - Node = node(), - [Z, Node] = emqttd_mnesia:running_nodes(), - ok = emqttd_cluster:remove(Z), - ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []), - [Node] = emqttd_mnesia:running_nodes(), - slave:stop(Z). - -cluster_node_down(_) -> - Z = slave(emqttd, cluster_node_down), - timer:sleep(1000), - wait_running(Z), - ok = emqttd_cluster:join(Z), - ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]), - ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]), - Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), - ct:log("Routes: ~p~n", [Routes]), - [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes], - slave:stop(Z), - timer:sleep(1000), - [] = lists:sort(emqttd_router:match(<<"a/b/c">>)). set_alarms(_) -> AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, @@ -551,8 +466,6 @@ set_alarms(_) -> emqttd_alarm:clear_alarm(<<"1">>), [] = emqttd_alarm:get_alarms(). - - %%-------------------------------------------------------------------- %% Cli group %%-------------------------------------------------------------------- @@ -680,87 +593,56 @@ cleanSession_validate(_) -> emqttc:disconnect(Pub), emqttc:disconnect(C11). -cleanSession_validate1(_) -> - {ok, C1} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, true}]), - timer:sleep(10), - emqttc:subscribe(C1, <<"topic">>, qos1), - emqttc:disconnect(C1), - {ok, Pub} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"pub">>}]), - - emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 1}]), - timer:sleep(10), - {ok, C11} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), - timer:sleep(100), - Metrics = emqttd_metrics:all(), - ?assertEqual(0, proplists:get_value('messages/qos1/sent', Metrics)), - ?assertEqual(1, proplists:get_value('messages/qos1/received', Metrics)), - emqttc:disconnect(Pub), - emqttc:disconnect(C11). - - -ensure_ok(ok) -> ok; -ensure_ok({error, {already_started, _}}) -> ok. - -host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. - -wait_running(Node) -> - wait_running(Node, 30000). - -wait_running(Node, Timeout) when Timeout < 0 -> - throw({wait_timeout, Node}); - -wait_running(Node, Timeout) -> - case rpc:call(Node, emqttd, is_running, [Node]) of - true -> ok; - false -> timer:sleep(100), - wait_running(Node, Timeout - 100) - end. - -slave(emqttd, Node) -> - {ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), - rpc:call(Emq, application, ensure_all_started, [emqttd]), - Emq; - -slave(node, Node) -> - {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), - N. - -emqttd_config(DataDir) -> - Schema = cuttlefish_schema:files([filename:join([DataDir, "emqttd.schema"])]), - Conf = conf_parse:file(filename:join([DataDir, "emqttd.conf"])), - cuttlefish_generator:map(Schema, Conf). - -change_opts(SslType, DataDir, Vals) -> - Listeners = proplists:get_value(listeners, Vals), +change_opts(SslType) -> + {ok, Listeners} = application:get_env(?APP, listeners), NewListeners = lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> case Protocol of ssl -> SslOpts = proplists:get_value(sslopts, Opts), - Keyfile = filename:join([DataDir, proplists:get_value(keyfile, SslOpts)]), - Certfile = filename:join([DataDir, proplists:get_value(certfile, SslOpts)]), + Keyfile = local_path(["etc/certs", "key.pem"]), + Certfile = local_path(["etc/certs", "cert.pem"]), TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), TupleList3 = case SslType of ssl_twoway-> - CAfile = filename:join([DataDir, proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), + CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), lists:merge(TupleList2, MutSslList); _ -> - TupleList2 + lists:filter(fun ({cacertfile, _}) -> false; + ({verify, _}) -> false; + ({fail_if_no_peer_cert, _}) -> false; + (_) -> true + end, TupleList2) end, - [{Protocol, Port, [{ssl, TupleList3}]} | Acc]; + [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc]; _ -> [Listener | Acc] end end, [], Listeners), - lists:keyreplace(listeners, 1, Vals, {listeners, NewListeners}). + application:set_env(?APP, listeners, NewListeners). + +generate_config() -> + Schema = cuttlefish_schema:files([local_path(["priv", "emq.schema"])]), + Conf = conf_parse:file([local_path(["etc", "emq.conf"])]), + cuttlefish_generator:map(Schema, Conf). + +get_base_dir(Module) -> + {file, Here} = code:is_loaded(Module), + filename:dirname(filename:dirname(Here)). + +get_base_dir() -> + get_base_dir(?MODULE). + +local_path(Components, Module) -> + filename:join([get_base_dir(Module) | Components]). + +local_path(Components) -> + local_path(Components, ?MODULE). + +set_app_env({App, Lists}) -> + lists:foreach(fun({Par, Var}) -> + application:set_env(App, Par, Var) + end, Lists). diff --git a/test/emqttd_SUITE_data/certs/cacert.pem b/test/emqttd_SUITE_data/certs/cacert.pem deleted file mode 100644 index ca4948ed9..000000000 --- a/test/emqttd_SUITE_data/certs/cacert.pem +++ /dev/null @@ -1,17 +0,0 @@ ------BEGIN CERTIFICATE----- -MIICxjCCAa6gAwIBAgIJAPhU8tv3KMe/MA0GCSqGSIb3DQEBCwUAMBMxETAPBgNV -BAMMCE15VGVzdENBMB4XDTE2MTAzMTA3MTU0NVoXDTE3MTAzMTA3MTU0NVowEzER -MA8GA1UEAwwITXlUZXN0Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB -AQCtPcDnmjiVl7ScDhYvGaW+PUgfp7P5cM39mnrW6fkxhA0tgunWpWlYVKbcuh5y -4bTNYrOQpcFO3Zg62tva4XEL8O1huqTlGsAeysZ3vWE4/8NGN/3wZy0TKDvwiwOB -tbS3C5wcRQZohExL6yEL4XzDGk44x2mIs8/NzeG7Zycqybh9tsCJiHbLiTxnLa24 -v5USOtlvWye0hA0yUUqc2k7tKVmIMT4A4ulMb2sDVRrSLjyFDTI0c8grlPLfKbG8 -gpYLsHn9aAjqviyvmJdRLxwauqn+ghNWn1TyZwgAUxpoTtWeC0ilzEt18RP8vZjm -eCbEP4qQDDvSCdLrie5CezyxAgMBAAGjHTAbMAwGA1UdEwQFMAMBAf8wCwYDVR0P -BAQDAgEGMA0GCSqGSIb3DQEBCwUAA4IBAQBJ/I/QJjU+mgkIaaHImFcIYFrfBirC -vDiWo2W+zRh7CbcSf+jsksI99d230ixSDY36CPLKZeZhELST7xWKEELKbPdNbtOO -EM10+XteLSXKVNGXfrEbW973eum3FGLobMA9OcH6+qDaf08pibe7kuv10aAgSs/I -0Qg5H/UTAKQJKO9hhOgERM/FettuF+WGJaaZZZb9Y2YYBNRf/GtM8KHCjpCX9+XD -kdeQGO8Hn10H9tOmggyfdIpsunBcs2/6/exCp8RPBWurN2GSW2RcnS5xVL0r+SVW -VOhSDy1JwnNPczpqkqE74qAbAah0dTJFcFWzeGLVk7Kp+2pissAiU3gg ------END CERTIFICATE----- diff --git a/test/emqttd_SUITE_data/certs/cert.pem b/test/emqttd_SUITE_data/certs/cert.pem deleted file mode 100644 index 58aa2c4ef..000000000 --- a/test/emqttd_SUITE_data/certs/cert.pem +++ /dev/null @@ -1,18 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIC9jCCAd6gAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl -c3RDQTAeFw0xNjEwMzEwNzE1NDVaFw0xNzEwMzEwNzE1NDVaMDkxJjAkBgNVBAMT -HWRlbmdoYWlndWlkZU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZzZXJ2ZXIw -ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4Ena4vgWrzwUB0hGW1v0v -K986FhU5ZdYz5H5MGonfWwv89nR2DlftSDXEvKFyc2MT81GGm16VJv3mVpQJLuKA -xLBLY7a1zSrJdugXWy+mgJJTPW6KjTY4jPtfCl6x/yVr8YclVa8XO0JFzOme2LMV -Ylc/ixVEa66UpxRNrg5yWHS26KcB1lE3GLERoRBKF7nsyGqGY4X9TypBwglCVoqK -3dKVGwCvFur+oPnt/C5pwR6UmUV/Ppf1EaRD7Po+xcyJSeCvszG3FH4iHsDHnjLe -DR6lxouvMCb+aKJi9d0xowOjhbKoFMF179t4SVnptQeq+U6ui3cPKUjia7Zh1tZT -AgMBAAGjLzAtMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsG -AQUFBwMBMA0GCSqGSIb3DQEBCwUAA4IBAQB2jlDPiZfP/whsvvFn43g37QMwX5ST -Z5OpmEFnFjAH3ec0PPqPrKYEu00q5wEC+8L6uVH8FHOFf11JLH4wl11/C/mvE92D -qZtGG8KCnG2+rk5OJPGX+28Z+OnCZlXOjQ8qd2x5KtIW50JuXJ3cbDRHtF/TVanm -Exu+TCBeToNwbcU2sfQnbljkUTj4idUFz0pq3uvw3dA4R1J2foungPAYXSWcVhtb -RYtG8epIvkAyyUE5nY3kC05AUml6gSZkrJiYM5I1IJTX1lQ7Pv2yxRBZUtTx33rP -ccnsW6tbHTDBG8UDHx4LKHErdWFgCJWI81EUEcTip9g2zCOGTWKnpz+z ------END CERTIFICATE----- diff --git a/test/emqttd_SUITE_data/certs/client-cert.pem b/test/emqttd_SUITE_data/certs/client-cert.pem deleted file mode 100644 index e1690d9aa..000000000 --- a/test/emqttd_SUITE_data/certs/client-cert.pem +++ /dev/null @@ -1,18 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIC9jCCAd6gAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl -c3RDQTAeFw0xNjEwMzEwNzE1NDZaFw0xNzEwMzEwNzE1NDZaMDkxJjAkBgNVBAMT -HWRlbmdoYWlndWlkZU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZjbGllbnQw -ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCmPMkieMtJO4PGIQG30uxI -SEoRJoF2w0ufFhZGYCEaqFlHaSoc6nTiCUmnxadDpjkNBs4R6RDfM9zPJ0QdgSFO -OJsWgQEHym/EQTcEx11+/2NDZWMJyZdpWZlU57SwHfWDwYa2XFX1bV+pAvhB8cli -wCkygTwp1cZcwQpb8TfZySy8r5mwrWq2nhCQPtYqMxjNjpR/UeeZzt+Uh3CEXQ8h -omjGinDXnnGwrYwBEP9G6fzTvyCWTyrsWC1Q37oAMzbkwFRoIBSAQWXBv9hgI08s -IBYvXnRGKWOJZGxAP4a4TvpFS+nqi+fFVn4ktUfcH3PoSMh7PKavrFT2hQaryLt1 -AgMBAAGjLzAtMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsG -AQUFBwMCMA0GCSqGSIb3DQEBCwUAA4IBAQAeimI8AQBFWiE9/Nf/0radux355mod -5vPLbKn6I6nzb/sS/Ug8SMoFnkhncwj+XOgTSliUyWcwOB11UDVJbUIkB/x+Qo3w -hvrATTdby2WdFNQvH4X7PmP8asDDN7ZxoLyRmuhjL4avJ3giwRcuQK4cB35b+Lb2 -p1e7hW81RaV7OEc0o4/vJgPvv9N7wvUuipwJns6PrN7VDn99lT8zWrt2pQ06e2mk -jDuXulVpiUtLHJhTnABkCaKiHWCYAFfMjFeRb3gUXKqShzOyDSGWY91YMID/HE4r -sVLm2mD1zurue8EmYtQQ6uiJIW9SzvshMHG6EA5QWA1ytoalfePbvf+c ------END CERTIFICATE----- diff --git a/test/emqttd_SUITE_data/certs/client-key.pem b/test/emqttd_SUITE_data/certs/client-key.pem deleted file mode 100644 index 74cf487cb..000000000 --- a/test/emqttd_SUITE_data/certs/client-key.pem +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpQIBAAKCAQEApjzJInjLSTuDxiEBt9LsSEhKESaBdsNLnxYWRmAhGqhZR2kq -HOp04glJp8WnQ6Y5DQbOEekQ3zPczydEHYEhTjibFoEBB8pvxEE3BMddfv9jQ2Vj -CcmXaVmZVOe0sB31g8GGtlxV9W1fqQL4QfHJYsApMoE8KdXGXMEKW/E32cksvK+Z -sK1qtp4QkD7WKjMYzY6Uf1Hnmc7flIdwhF0PIaJoxopw155xsK2MARD/Run8078g -lk8q7FgtUN+6ADM25MBUaCAUgEFlwb/YYCNPLCAWL150RiljiWRsQD+GuE76RUvp -6ovnxVZ+JLVH3B9z6EjIezymr6xU9oUGq8i7dQIDAQABAoIBAFkHEMjPXD96ChZf -suXZpgUIAfKxZoBOEv+9+mvyK4h1RGsEHTOjNLmhM7sQFYYbTU52qIHbCdgflE+0 -vbv3XfjgQ96HdB/SAI1gR7DdfGr5JxX/BE1HkzkubPmVpaT0RnoreJPNW5O24ZZI -KuBWNv4V33pWz/uvqy4djAi1ZK3TPDhn9cVCMwV/ISCPlofrNDB/4ZNOMeaQgiR+ -sGqv+Q0ok2ao7Y04QHPh5i+5o+5oBoiJAO/49q9uPdpO181/8H71jll0QL+h5Off -nyWkAAOcgEeX9T4ZnfTUivGdSwB/Y+LS97Ozdr6kp5Fdk8WdDn0DL4fHRrnJ4IJD -EIAn/sECgYEA2oOCRBMccr49wbu+cKlkICt/4ARzJWKysdLlK0tYQknkDK1bzoHO -9JerRJL4E9bKp8zNlobfP1hWV0TFpwYsK3RvZoLvCwaSHeqUCZ4wQvKrWP1FieJ2 -5kjO5iMvXiy/kNHdTEXsj0x6RKuUSVgzNIuILvCCQ9Z7JVa/3NWS1SkCgYEAwsF0 -TWxCjryQv8y4mFSUlyF+y+ntnWAvpe/1Wv3+dNdhsccUfcq3zPMuLEj5DEoIvlTy -jLkFLVJ468Ou7S1oSVetVT3wWoLP2eFDEU/sYjjPdf4IMSO1jWIPLC3WV7zsFb62 -jwG2en1qfz8AxrVl+zj4lWCbgA9Soi41NMiCUW0CgYEAokQEST8T4hVp0OL1Qb5Y -bxc+Z4GGbF3Fqw2cRrE1wkwSwGNACLMWl0XF1i95b2oSpdcNWFmhkO2teDLGwAhy -ZnaZfzt9/ecMPJEFC7tfxWdlXLj/mawFdW7dzcKVG08JlqZxuoE2cRduuG3duTV5 -GO0A3TKW2X99hTXNVlV3KzkCgYEAsaE8cHkzY3h9FVKlctqCBC3atiWQQZ+/Fbv8 -rpdHBE6Fnl4TRIAmj9mk3WNZM2o6+04DQ3JlVGcKPw7ldxGZMnuzbjHmDMeOyAx6 -3UlmMlfacKXX1unY5zDu4b6U5sU7FsIxQ9GuG55UCebu0E4Wy8G0iJnqeix/k8hN -Yu0WXykCgYEAo0kIm7sh9j0+r419Lo2kT4zlzFlNdJEa4+lFVISRqouDuhUO8VFE -/ZpGRcqIM7dH6iBM2Htasf7l/hyWKzDEvWCEpa4icicFYAJ92AgK7UBWbNbhueof -PyVx5G2o7amvyZNtJYUo4TpJ9eH5YbsBRBqWCJcBUAfrItrprxB1LMs= ------END RSA PRIVATE KEY----- diff --git a/test/emqttd_SUITE_data/certs/key.pem b/test/emqttd_SUITE_data/certs/key.pem deleted file mode 100644 index 7001093ef..000000000 --- a/test/emqttd_SUITE_data/certs/key.pem +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpAIBAAKCAQEAuBJ2uL4Fq88FAdIRltb9LyvfOhYVOWXWM+R+TBqJ31sL/PZ0 -dg5X7Ug1xLyhcnNjE/NRhptelSb95laUCS7igMSwS2O2tc0qyXboF1svpoCSUz1u -io02OIz7Xwpesf8la/GHJVWvFztCRczpntizFWJXP4sVRGuulKcUTa4Oclh0tuin -AdZRNxixEaEQShe57MhqhmOF/U8qQcIJQlaKit3SlRsArxbq/qD57fwuacEelJlF -fz6X9RGkQ+z6PsXMiUngr7MxtxR+Ih7Ax54y3g0epcaLrzAm/miiYvXdMaMDo4Wy -qBTBde/beElZ6bUHqvlOrot3DylI4mu2YdbWUwIDAQABAoIBADXYWNhT5c7LYTiW -HcUVIL0CxWr1eMHwk0dcyME0Zi5rMMePxKOgMIJdxDTHxSZ4sHvuimOo4XMaE92k -Z+uDxohKgROcmJ735FNIsD3c08SOCb/F0adABaNnQkUcAHVrIKRB4/m85doS4KEQ -fyqTU1enC8Svx8nbAhfEBEFw8BLsZD9UnQAEAU5W9S5aKPHNrYRDz5UE0ZP28ixC -4PtCew96uCqA0u+xZnWCGawF27FD9P88pcYSJqebF1iFYkXrAwdhAbqewHOqQJXf -KJpbpjflBvZr/oTVZ3GAnnHnZDiusFmCKIHB9dKimHMdTFVIU2ikOeJZLtgXsBjb -Wn3Fa8kCgYEA2fK0t9NPmELw43D7VoCNeUmu6KmLLd7CeRiQ/OkPLKTqrudnUZGi -uMinPFijGTLX3SmByAVOkzMKBQOYF+eB1X24kbRLmL4JKzr04hSqOKqG5gJctC+x -V5qQX7ZxrNxFRiSodILbnQN/z1gwZMfrAU0t0EKIKjZR3lpj8CELv1cCgYEA2DWn -9V6PCZPcHzoFabhb8DJFglUTHk0zINVe97qldvMvn0MgsjgyS2j954nX8ef7uE1O -Cf+9nN709Fu8kEC7/KzWXxP3/O58TfJ6NivCQSr5i0OJLumQMVNrS+u/VG1PaVbS -2oCwP3QFayOxZSj9wq2MARd1JkqzHmi8skZLz2UCgYEAgtnv3En3CLBwFe14SPgH -eGFfrPpVwGV0luXD7sQyQxiEehwecN+iNZTqqxWAXpmi9np8G83r3f6PrnD4+Kka -z0Wa8Yewt3So5paP/chwZnMjaKbUZ64WqET5Fy3fU+wvfyx1IvaJydwW+TK2Y1uP -4Yknz1iSjd1tC7VzOPFuLyMCgYBrTFWKQ98glayMIrNFACVAUvKD98yBITbaeImk -z5AGNDHSC/JR/+mV2wkGuzXb65DUqiisdaqYC13tVwmBXV7tyqiojrRnZcNyu39D -GvxQcw9cuat/CJJyqD97cgeF0qmyUVBa97qAAwgdX51N4sXss0vjzsxosHGsCbZ7 -kr9UsQKBgQCMTtdCeA+uK/OeJtzf4CYZKR9xllQ+P6gCtbQ7WHuLBX/x+ZhvTC0p -qVLVWwFsJ6ivc1f74sy8hZPiePk9fqAqA1JIjDHrof0M3TxRVFvB7dej5XIYVirn -521DyZGfE+N7HA7qW5cGKZT0+UYLVp4gnv88nNKDuS18lafy8JRrfQ== ------END RSA PRIVATE KEY----- diff --git a/test/emqttd_SUITE_data/emqttd.conf b/test/emqttd_SUITE_data/emqttd.conf deleted file mode 100644 index c3694c4b9..000000000 --- a/test/emqttd_SUITE_data/emqttd.conf +++ /dev/null @@ -1,479 +0,0 @@ - -##=================================================================== -## EMQ Configuration R2.2 -##=================================================================== - -##-------------------------------------------------------------------- -## Cluster -##-------------------------------------------------------------------- - -## The cluster Id -cluster.id = emq - -## The multicast address and port. -cluster.multicast = 239.192.0.1:44369 - -##-------------------------------------------------------------------- -## Node Args -##-------------------------------------------------------------------- - -## Node name -node.name = emqttd@127.0.0.1 - -## Cookie for distributed node -node.cookie = emqsecretcookie - -## SMP support: enable, auto, disable -node.smp = auto - -## vm.args: -heart -## Heartbeat monitoring of an Erlang runtime system -## Value should be 'on' or comment the line -## node.heartbeat = on - -## Enable kernel poll -node.kernel_poll = on - -## async thread pool -node.async_threads = 32 - -## Erlang Process Limit -node.process_limit = 256000 - -## Sets the maximum number of simultaneously existing ports for this system -node.max_ports = 65536 - -## Set the distribution buffer busy limit (dist_buf_busy_limit) -node.dist_buffer_size = 32MB - -## Max ETS Tables. -## Note that mnesia and SSL will create temporary ets tables. -node.max_ets_tables = 256000 - -## Tweak GC to run more often -node.fullsweep_after = 1000 - -## Crash dump -node.crash_dump = {{ platform_log_dir }}/crash.dump - -## Distributed node ticktime -node.dist_net_ticktime = 60 - -## Distributed node port range -node.dist_listen_min = 6369 -node.dist_listen_max = 6369 - -##-------------------------------------------------------------------- -## Log -##-------------------------------------------------------------------- - -## Set the log dir -log.dir = {{ platform_log_dir }} - -## Console log. Enum: off, file, console, both -log.console = console - -## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency -log.console.level = error - -## Syslog. Enum: on, off -log.syslog = on - -## syslog level. Enum: debug, info, notice, warning, error, critical, alert, emergency -log.syslog.level = error - -## Console log file -## log.console.file = {{ platform_log_dir }}/console.log - -## Error log file -log.error.file = {{ platform_log_dir }}/error.log - -## Enable the crash log. Enum: on, off -log.crash = on - -log.crash.file = {{ platform_log_dir }}/crash.log - -##-------------------------------------------------------------------- -## Allow Anonymous and Default ACL -##-------------------------------------------------------------------- - -## Allow Anonymous authentication -mqtt.allow_anonymous = true - -## ACL nomatch -mqtt.acl_nomatch = allow - -## Default ACL File -mqtt.acl_file = {{ platform_etc_dir }}/acl.conf - -## Cache ACL for PUBLISH -mqtt.cache_acl = true - -##-------------------------------------------------------------------- -## MQTT Protocol -##-------------------------------------------------------------------- - -## Max ClientId Length Allowed. -mqtt.max_clientid_len = 1024 - -## Max Packet Size Allowed, 64K by default. -mqtt.max_packet_size = 64KB - -## Check Websocket Protocol Header. Enum: on, off -mqtt.websocket_protocol_header = on - -##-------------------------------------------------------------------- -## MQTT Connection -##-------------------------------------------------------------------- - -## Force GC: integer. Value 0 disabled the Force GC. -mqtt.conn.force_gc_count = 100 - -##-------------------------------------------------------------------- -## MQTT Client -##-------------------------------------------------------------------- - -## Client Idle Timeout (Second) -mqtt.client.idle_timeout = 30s - -## Max publish rate of Messages -## mqtt.client.max_publish_rate = 5 - -## Enable client Stats: on | off -mqtt.client.enable_stats = off - -##-------------------------------------------------------------------- -## MQTT Session -##-------------------------------------------------------------------- - -## Max Number of Subscriptions, 0 means no limit. -mqtt.session.max_subscriptions = 0 - -## Upgrade QoS? -mqtt.session.upgrade_qos = off - -## Max Size of the Inflight Window for QoS1 and QoS2 messages -## 0 means no limit -mqtt.session.max_inflight = 32 - -## Retry Interval for redelivering QoS1/2 messages. -mqtt.session.retry_interval = 20s - -## Client -> Broker: Max Packets Awaiting PUBREL, 0 means no limit -mqtt.session.max_awaiting_rel = 100 - -## Awaiting PUBREL Timeout -mqtt.session.await_rel_timeout = 20s - -## Enable Statistics: on | off -mqtt.session.enable_stats = off - -## Expired after 1 day: -## w - week -## d - day -## h - hour -## m - minute -## s - second -mqtt.session.expiry_interval = 2h - -## Ignore message from self publish -mqtt.session.ignore_loop_deliver = false - -##-------------------------------------------------------------------- -## MQTT Message Queue -##-------------------------------------------------------------------- - -## Type: simple | priority -mqtt.mqueue.type = simple - -## Topic Priority: 0~255, Default is 0 -## mqtt.mqueue.priority = topic/1=10,topic/2=8 - -## Max queue length. Enqueued messages when persistent client disconnected, -## or inflight window is full. 0 means no limit. -mqtt.mqueue.max_length = 1000 - -## Low-water mark of queued messages -mqtt.mqueue.low_watermark = 20% - -## High-water mark of queued messages -mqtt.mqueue.high_watermark = 60% - -## Queue Qos0 messages? -mqtt.mqueue.store_qos0 = true - -##-------------------------------------------------------------------- -## MQTT Broker and PubSub -##-------------------------------------------------------------------- - -## System Interval of publishing broker $SYS Messages -mqtt.broker.sys_interval = 60 - -## PubSub Pool Size. Default should be scheduler numbers. -mqtt.pubsub.pool_size = 8 - -mqtt.pubsub.by_clientid = true - -## Subscribe Asynchronously -mqtt.pubsub.async = true - -##-------------------------------------------------------------------- -## MQTT Bridge -##-------------------------------------------------------------------- - -## Bridge Queue Size -mqtt.bridge.max_queue_len = 10000 - -## Ping Interval of bridge node. Unit: Second -mqtt.bridge.ping_down_interval = 1 - -##------------------------------------------------------------------- -## MQTT Plugins -##------------------------------------------------------------------- - -## Dir of plugins' config -mqtt.plugins.etc_dir ={{ platform_etc_dir }}/plugins/ - -## File to store loaded plugin names. -mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins - -##-------------------------------------------------------------------- -## MQTT Listeners -##-------------------------------------------------------------------- - -##-------------------------------------------------------------------- -## External TCP Listener - -## External TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 -listener.tcp.external = 0.0.0.0:1883 - -## Size of acceptor pool -listener.tcp.external.acceptors = 16 - -## Maximum number of concurrent clients -listener.tcp.external.max_clients = 102400 - -#listener.tcp.external.mountpoint = external/ - -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -#listener.tcp.external.rate_limit = 100,10 - -#listener.tcp.external.access.1 = allow 192.168.0.0/24 - -listener.tcp.external.access.2 = allow all - -## Proxy Protocol V1/2 -## listener.tcp.external.proxy_protocol = on -## listener.tcp.external.proxy_protocol_timeout = 3s - -## TCP Socket Options -listener.tcp.external.backlog = 1024 - -#listener.tcp.external.recbuf = 4KB - -#listener.tcp.external.sndbuf = 4KB - -listener.tcp.external.buffer = 4KB - -listener.tcp.external.nodelay = true - -##-------------------------------------------------------------------- -## Internal TCP Listener - -## Internal TCP Listener: 11883, 127.0.0.1:11883, ::1:11883 -listener.tcp.internal = 127.0.0.1:11883 - -## Size of acceptor pool -listener.tcp.internal.acceptors = 16 - -## Maximum number of concurrent clients -listener.tcp.internal.max_clients = 102400 - -#listener.tcp.external.mountpoint = internal/ - -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## listener.tcp.internal.rate_limit = 1000,100 - -## TCP Socket Options -listener.tcp.internal.backlog = 512 - -listener.tcp.internal.tune_buffer = on - -listener.tcp.internal.buffer = 1MB - -listener.tcp.internal.recbuf = 4KB - -listener.tcp.internal.sndbuf = 1MB - -listener.tcp.internal.nodelay = true - -##-------------------------------------------------------------------- -## External SSL Listener - -## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 -listener.ssl.external = 8883 - -## Size of acceptor pool -listener.ssl.external.acceptors = 16 - -## Maximum number of concurrent clients -listener.ssl.external.max_clients = 1024 - -## listener.ssl.external.mountpoint = inbound/ - -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## listener.ssl.external.rate_limit = 100,10 - -## Proxy Protocol V1/2 -## listener.ssl.external.proxy_protocol = on -## listener.ssl.external.proxy_protocol_timeout = 3s - -listener.ssl.external.access.1 = allow all - -### SSL Options. See http://erlang.org/doc/man/ssl.html - -## Configuring SSL Options. See http://erlang.org/doc/man/ssl.html -### TLS only for POODLE attack -## listener.ssl.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1 - -### The Ephemeral Diffie-Helman key exchange is a very effective way of -### ensuring Forward Secrecy by exchanging a set of keys that never hit -### the wire. Since the DH key is effectively signed by the private key, -### it needs to be at least as strong as the private key. In addition, -### the default DH groups that most of the OpenSSL installations have -### are only a handful (since they are distributed with the OpenSSL -### package that has been built for the operating system it’s running on) -### and hence predictable (not to mention, 1024 bits only). - -### In order to escape this situation, first we need to generate a fresh, -### strong DH group, store it in a file and then use the option above, -### to force our SSL application to use the new DH group. Fortunately, -### OpenSSL provides us with a tool to do that. Simply run: -### openssl dhparam -out dh-params.pem 2048 - -listener.ssl.external.handshake_timeout = 15s - -listener.ssl.external.keyfile = certs/key.pem - -listener.ssl.external.certfile = certs/cert.pem - -## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem - -## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem - -## listener.ssl.external.verify = verify_peer - -## listener.ssl.external.fail_if_no_peer_cert = true - -### This is the single most important configuration option of an Erlang SSL application. -### Ciphers (and their ordering) define the way the client and server encrypt information -### over the wire, from the initial Diffie-Helman key exchange, the session key encryption -### algorithm and the message digest algorithm. Selecting a good cipher suite is critical -### for the application’s data security, confidentiality and performance. -### The cipher list above offers: -### -### A good balance between compatibility with older browsers. It can get stricter for Machine-To-Machine scenarios. -### Perfect Forward Secrecy. -### No old/insecure encryption and HMAC algorithms -### -### Most of it was copied from Mozilla’s Server Side TLS article -## listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA - -### SSL parameter renegotiation is a feature that allows a client and -### a server to renegotiate the parameters of the SSL connection on the fly. -### RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation, -### you drop support for the insecure renegotiation, prone to MitM attacks. -## listener.ssl.external.secure_renegotiate = off - -### A performance optimization setting, it allows clients to reuse -### pre-existing sessions, instead of initializing new ones. -### Read more about it here. -## listener.ssl.external.reuse_sessions = on - -### An important security setting, it forces the cipher to be set based on -### the server-specified order instead of the client-specified order, -### hence enforcing the (usually more properly configured) security -### ordering of the server administrator. -## listener.ssl.external.honor_cipher_order = on - -### Use the CN or DN value from the client certificate as a username. -### Notice: 'verify' should be configured as 'verify_peer' -## listener.ssl.external.peer_cert_as_username = cn - -##-------------------------------------------------------------------- -## External MQTT/WebSocket Listener - -listener.ws.external = 8083 - -listener.ws.external.acceptors = 4 - -listener.ws.external.max_clients = 64 - -listener.ws.external.access.1 = allow all - -## TCP Options -listener.ws.external.backlog = 1024 - -listener.ws.external.recbuf = 4KB - -listener.ws.external.sndbuf = 4KB - -listener.ws.external.buffer = 4KB - -listener.ws.external.nodelay = true - -##-------------------------------------------------------------------- -## External MQTT/WebSocket/SSL Listener - -listener.wss.external = 8084 - -listener.wss.external.acceptors = 4 - -listener.wss.external.max_clients = 64 - -listener.wss.external.access.1 = allow all - -## SSL Options -listener.wss.external.handshake_timeout = 15s - -listener.wss.external.keyfile = certs/key.pem - -listener.wss.external.certfile = certs/cert.pem - -## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem - -## listener.wss.external.verify = verify_peer - -## listener.wss.external.fail_if_no_peer_cert = true - -##-------------------------------------------------------------------- -## HTTP Management API Listener - -listener.api.mgmt = 127.0.0.1:8080 - -listener.api.mgmt.acceptors = 4 - -listener.api.mgmt.max_clients = 64 - -listener.api.mgmt.access.1 = allow all - -##------------------------------------------------------------------- -## System Monitor -##------------------------------------------------------------------- - -## Long GC, don't monitor in production mode for: -## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 -sysmon.long_gc = false - -## Long Schedule(ms) -sysmon.long_schedule = 240 - -## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. -sysmon.large_heap = 8MB - -## Busy Port -sysmon.busy_port = false - -## Busy Dist Port -sysmon.busy_dist_port = true - diff --git a/test/emqttd_SUITE_data/emqttd.schema b/test/emqttd_SUITE_data/emqttd.schema deleted file mode 100644 index a2860322c..000000000 --- a/test/emqttd_SUITE_data/emqttd.schema +++ /dev/null @@ -1,1173 +0,0 @@ -%%-*- mode: erlang -*- -%% EMQ config mapping - -%%-------------------------------------------------------------------- -%% Cluster -%%-------------------------------------------------------------------- - -%% Cluster ID -{mapping, "cluster.id", "emqttd.cluster", [ - {default, "emq"}, - {datatype, string} -]}. - -%% Cluster Multicast Addr -{mapping, "cluster.multicast", "emqttd.cluster", [ - {default, "239.192.0.1:44369"}, - {datatype, string} -]}. - -{translation, "emqttd.cluster", fun(Conf) -> - Multicast = cuttlefish:conf_get("cluster.multicast", Conf), - [Addr, Port] = string:tokens(Multicast, ":"), - {ok, Ip} = inet_parse:address(Addr), - [{id, cuttlefish:conf_get("cluster.id", Conf)}, - {multicast, {Ip, list_to_integer(Port)}}] -end}. - -%%-------------------------------------------------------------------- -%% Erlang Node -%%-------------------------------------------------------------------- - -%% @doc Erlang node name -{mapping, "node.name", "vm_args.-name", [ - {default, "emq@127.0.0.1"} -]}. - -%% @doc Secret cookie for distributed erlang node -{mapping, "node.cookie", "vm_args.-setcookie", [ - {default, "emqsecretcookie"} -]}. - -%% @doc SMP Support -{mapping, "node.smp", "vm_args.-smp", [ - {default, auto}, - {datatype, {enum, [enable, auto, disable]}}, - hidden -]}. - -%% @doc http://erlang.org/doc/man/heart.html -{mapping, "node.heartbeat", "vm_args.-heart", [ - {datatype, flag}, - hidden -]}. - -{translation, "vm_args.-heart", fun(Conf) -> - case cuttlefish:conf_get("node.heartbeat", Conf) of - true -> ""; - false -> cuttlefish:invalid("should be 'on' or comment the line!") - end -end}. - -%% @doc Enable Kernel Poll -{mapping, "node.kernel_poll", "vm_args.+K", [ - {default, on}, - {datatype, flag}, - hidden -]}. - -%% @doc More information at: http://erlang.org/doc/man/erl.html -{mapping, "node.async_threads", "vm_args.+A", [ - {default, 64}, - {datatype, integer}, - {validators, ["range:0-1024"]} -]}. - -%% @doc Erlang Process Limit -{mapping, "node.process_limit", "vm_args.+P", [ - {datatype, integer}, - {default, 256000}, - hidden -]}. - -%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q -%% @doc The number of concurrent ports/sockets -%% Valid range is 1024-134217727 -{mapping, "node.max_ports", - cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [ - {default, 262144}, - {datatype, integer}, - {validators, ["range4ports"]} -]}. - -{validator, "range4ports", "must be 1024 to 134217727", - fun(X) -> X >= 1024 andalso X =< 134217727 end}. - -%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl -{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ - {datatype, bytesize}, - {commented, "32MB"}, - hidden, - {validators, ["zdbbl_range"]} -]}. - -{translation, "vm_args.+zdbbl", - fun(Conf) -> - ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined), - case ZDBBL of - undefined -> undefined; - X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; - _ -> undefined - end - end -}. - -{validator, "zdbbl_range", "must be between 1KB and 2097151KB", - fun(ZDBBL) -> - %% 2097151KB = 2147482624 - ZDBBL >= 1024 andalso ZDBBL =< 2147482624 - end -}. - -%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 -{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ - {default, 1000}, - {datatype, integer}, - hidden, - {validators, ["positive_integer"]} -]}. - -{validator, "positive_integer", "must be a positive integer", - fun(X) -> X >= 0 end}. - -%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, -%% R16+ uses +e -%% @doc The ETS table limit -{mapping, "node.max_ets_tables", - cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [ - {default, 256000}, - {datatype, integer}, - hidden -]}. - -%% @doc Set the location of crash dumps -{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [ - {default, "{{crash_dump}}"}, - {datatype, file}, - hidden -]}. - -%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime -{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ - {commented, 60}, - {datatype, integer}, - hidden -]}. - -%% @doc http://www.erlang.org/doc/man/kernel_app.html -{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ - {commented, 6369}, - {datatype, integer}, - hidden -]}. - -%% @see node.dist_listen_min -{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ - {commented, 6369}, - {datatype, integer}, - hidden -]}. - -%%-------------------------------------------------------------------- -%% Log -%%-------------------------------------------------------------------- - -{mapping, "log.dir", "lager.log_dir", [ - {default, "log"}, - {datatype, string} -]}. - -{mapping, "log.console", "lager.handlers", [ - {default, file}, - {datatype, {enum, [off, file, console, both]}} -]}. - -{mapping, "log.console.level", "lager.handlers", [ - {default, info}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} -]}. - -{mapping, "log.console.file", "lager.handlers", [ - {default, "log/console.log"}, - {datatype, file} -]}. - -{mapping, "log.error.file", "lager.handlers", [ - {default, "log/error.log"}, - {datatype, file} -]}. - -{mapping, "log.syslog", "lager.handlers", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "log.syslog.identity", "lager.handlers", [ - {default, "emqttd"}, - {datatype, string} -]}. - -{mapping, "log.syslog.facility", "lager.handlers", [ - {default, local0}, - {datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}} -]}. - -{mapping, "log.syslog.level", "lager.handlers", [ - {default, error}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}} -]}. - -{mapping, "log.error.redirect", "lager.error_logger_redirect", [ - {default, on}, - {datatype, flag}, - hidden -]}. - -{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ - {default, 1000}, - {datatype, integer}, - hidden -]}. - -{translation, - "lager.handlers", - fun(Conf) -> - ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of - undefined -> []; - ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, - {level, error}, - {size, 10485760}, - {date, "$D0"}, - {count, 5}]}] - end, - - ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), - ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), - - ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, - ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, - {level, ConsoleLogLevel}, - {size, 10485760}, - {date, "$D0"}, - {count, 5}]}, - - ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of - off -> []; - file -> [ConsoleFileHandler]; - console -> [ConsoleHandler]; - both -> [ConsoleHandler, ConsoleFileHandler]; - _ -> [] - end, - - SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of - false -> []; - true -> [{lager_syslog_backend, - [cuttlefish:conf_get("log.syslog.identity", Conf), - cuttlefish:conf_get("log.syslog.facility", Conf), - cuttlefish:conf_get("log.syslog.level", Conf)]}] - end, - - ConsoleHandlers ++ ErrorHandler ++ SyslogHandler - end -}. - -{mapping, "log.crash", "lager.crash_log", [ - {default, on}, - {datatype, flag} -]}. - -{mapping, "log.crash.file", "lager.crash_log", [ - {default, "log/crash.log"}, - {datatype, file} -]}. - -{translation, - "lager.crash_log", - fun(Conf) -> - case cuttlefish:conf_get("log.crash", Conf) of - false -> undefined; - _ -> - cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") - end - end}. - -{mapping, "sasl", "sasl.sasl_error_logger", [ - {default, off}, - {datatype, flag}, - hidden -]}. - -%%-------------------------------------------------------------------- -%% Allow Anonymous and Default ACL -%%-------------------------------------------------------------------- - -%% @doc Allow Anonymous -{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -%% @doc ACL nomatch -{mapping, "mqtt.acl_nomatch", "emqttd.acl_nomatch", [ - {default, allow}, - {datatype, {enum, [allow, deny]}} -]}. - -%% @doc Default ACL File -{mapping, "mqtt.acl_file", "emqttd.acl_file", [ - {datatype, string}, - hidden -]}. - -%% @doc Cache ACL for PUBLISH -{mapping, "mqtt.cache_acl", "emqttd.cache_acl", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Protocol -%%-------------------------------------------------------------------- - -%% @doc Set the Max ClientId Length Allowed. -{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [ - {default, 1024}, - {datatype, integer} -]}. - -%% @doc Max Packet Size Allowed, 64K by default. -{mapping, "mqtt.max_packet_size", "emqttd.protocol", [ - {default, "64KB"}, - {datatype, bytesize} -]}. - -{translation, "emqttd.protocol", fun(Conf) -> - [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, - {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] -end}. - -{mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [ - {default, on}, - {datatype, flag} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Connection -%%-------------------------------------------------------------------- - -%% @doc Force the client to GC: integer -{mapping, "mqtt.conn.force_gc_count", "emqttd.conn_force_gc_count", [ - {datatype, integer} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Client -%%-------------------------------------------------------------------- - -%% @doc Max Publish Rate of Message -{mapping, "mqtt.client.max_publish_rate", "emqttd.client", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Client Idle Timeout. -{mapping, "mqtt.client.idle_timeout", "emqttd.client", [ - {default, "30s"}, - {datatype, {duration, ms}} -]}. - -%% @doc Enable Stats of Client. -{mapping, "mqtt.client.enable_stats", "emqttd.client", [ - {default, off}, - {datatype, flag} -]}. - -{translation, "emqttd.client", fun(Conf) -> - [{max_publish_rate, cuttlefish:conf_get("mqtt.client.max_publish_rate", Conf)}, - {client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, - {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT Session -%%-------------------------------------------------------------------- - -%% @doc Max Number of Subscriptions Allowed -{mapping, "mqtt.session.max_subscriptions", "emqttd.session", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Upgrade QoS? -{mapping, "mqtt.session.upgrade_qos", "emqttd.session", [ - {default, off}, - {datatype, flag} -]}. - -%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. -%% 0 means no limit -{mapping, "mqtt.session.max_inflight", "emqttd.session", [ - {default, 100}, - {datatype, integer} -]}. - -%% @doc Retry interval for redelivering QoS1/2 messages. -{mapping, "mqtt.session.retry_interval", "emqttd.session", [ - {default, "20s"}, - {datatype, {duration, ms}} -]}. - -%% @doc Max Packets that Awaiting PUBREL, 0 means no limit -{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Awaiting PUBREL Timeout -{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [ - {default, "20s"}, - {datatype, {duration, ms}} -]}. - -%% @doc Enable Stats -{mapping, "mqtt.session.enable_stats", "emqttd.session", [ - {default, off}, - {datatype, flag} -]}. - -%% @doc Session Expiry Interval -{mapping, "mqtt.session.expiry_interval", "emqttd.session", [ - {default, "2h"}, - {datatype, {duration, ms}} -]}. - -%% @doc Ignore message from self publish -{mapping, "mqtt.session.ignore_loop_deliver", "emqttd.session", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.session", fun(Conf) -> - [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)}, - {upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, - {max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, - {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, - {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, - {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, - {enable_stats, cuttlefish:conf_get("mqtt.session.enable_stats", Conf)}, - {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}, - {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT MQueue -%%-------------------------------------------------------------------- - -%% @doc Type: simple | priority -{mapping, "mqtt.mqueue.type", "emqttd.mqueue", [ - {default, simple}, - {datatype, atom} -]}. - -%% @doc Topic Priority: 0~255, Default is 0 -{mapping, "mqtt.mqueue.priority", "emqttd.mqueue", [ - {default, ""}, - {datatype, string} -]}. - -%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. 0 means no limit. -{mapping, "mqtt.mqueue.max_length", "emqttd.mqueue", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Low-water mark of queued messages -{mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [ - {default, "20%"}, - {datatype, string} -]}. - -%% @doc High-water mark of queued messages -{mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [ - {default, "60%"}, - {datatype, string} -]}. - -%% @doc Queue Qos0 messages? -{mapping, "mqtt.mqueue.store_qos0", "emqttd.mqueue", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.mqueue", fun(Conf) -> - Parse = fun(S) -> - {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), - list_to_integer(N) / 100 - end, - Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)}, - {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)}, - {low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))}, - {high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))}, - {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}], - case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of - undefined -> Opts; - V -> [{priority, - [begin [T, P] = string:tokens(S, "="), - {T, list_to_integer(P)} - end || S <- string:tokens(V, ",")]} | Opts] - end -end}. - -%%-------------------------------------------------------------------- -%% MQTT Broker -%%-------------------------------------------------------------------- - -{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [ - {default, 60}, - {datatype, integer} -]}. - -%%-------------------------------------------------------------------- -%% MQTT PubSub -%%-------------------------------------------------------------------- - -{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.pubsub", fun(Conf) -> - [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)}, - {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)}, - {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT Bridge -%%-------------------------------------------------------------------- - -{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [ - {default, 10000}, - {datatype, integer} -]}. - -{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [ - {default, 1}, - {datatype, integer} -]}. - -{translation, "emqttd.bridge", fun(Conf) -> - [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)}, - {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}] -end}. - -%%------------------------------------------------------------------- -%% MQTT Plugins -%%------------------------------------------------------------------- - -{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [ - {datatype, string} -]}. - -{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [ - {datatype, string} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Listeners -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% TCP Listeners - -{mapping, "listener.tcp.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.tcp.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.tcp.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.tcp.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.tcp.$name.mountpoint", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.tcp.$name.rate_limit", "emqttd.listeners", [ - {default, undefined}, - {datatype, string} -]}. - -{mapping, "listener.tcp.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.tcp.$name.proxy_protocol", "emqttd.listeners", [ - %%{default, off}, - {datatype, flag} -]}. - -{mapping, "listener.tcp.$name.proxy_protocol_timeout", "emqttd.listeners", [ - %%{default, "5s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.tcp.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.tcp.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.tcp.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.tcp.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -%%-------------------------------------------------------------------- -%% SSL Listeners - -{mapping, "listener.ssl.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.ssl.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.ssl.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.ssl.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.mountpoint", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.rate_limit", "emqttd.listeners", [ - {default, undefined}, - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.proxy_protocol", "emqttd.listeners", [ - %%{default, off}, - {datatype, flag} -]}. - -{mapping, "listener.ssl.$name.proxy_protocol_timeout", "emqttd.listeners", [ - %%{default, "5s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "listener.ssl.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ssl.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ssl.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ssl.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.ssl.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.ssl.$name.tls_versions", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.ciphers", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.handshake_timeout", "emqttd.listeners", [ - {default, "15s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "listener.ssl.$name.dhfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.keyfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.certfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.cacertfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.verify", "emqttd.listeners", [ - {datatype, atom} -]}. - -{mapping, "listener.ssl.$name.fail_if_no_peer_cert", "emqttd.listeners", [ - {datatype, {enum, [true, false]}} -]}. - -{mapping, "listener.ssl.$name.secure_renegotiate", "emqttd.listeners", [ - {datatype, flag} -]}. - -{mapping, "listener.ssl.$name.reuse_sessions", "emqttd.listeners", [ - {default, on}, - {datatype, flag} -]}. - -{mapping, "listener.ssl.$name.honor_cipher_order", "emqttd.listeners", [ - {datatype, flag} -]}. - -{mapping, "listener.ssl.$name.peer_cert_as_username", "emqttd.listeners", [ - {datatype, {enum, [cn, dn]}} -]}. - -%%-------------------------------------------------------------------- -%% MQTT/WebSocket Listeners - -{mapping, "listener.ws.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.ws.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.ws.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.ws.$name.rate_limit", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ws.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ws.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ws.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ws.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ws.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.ws.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -%%-------------------------------------------------------------------- -%% MQTT/WebSocket/SSL Listeners - -{mapping, "listener.wss.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.wss.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.wss.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.wss.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.mountpoint", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.rate_limit", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.wss.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.wss.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.wss.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.wss.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.wss.$name.handshake_timeout", "emqttd.listeners", [ - {default, "15s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "listener.wss.$name.keyfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.certfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.cacertfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.verify", "emqttd.listeners", [ - {datatype, atom} -]}. - -{mapping, "listener.wss.$name.fail_if_no_peer_cert", "emqttd.listeners", [ - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.listeners", fun(Conf) -> - - Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, - - Atom = fun(undefined) -> undefined; (S) -> list_to_atom(S) end, - - Access = fun(S) -> - [A, CIDR] = string:tokens(S, " "), - {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} - end, - - AccOpts = fun(Prefix) -> - case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of - [] -> []; - Rules -> [{access, [Access(Rule) || {_, Rule} <- Rules]}] - end - end, - - MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, - - ConnOpts = fun(Prefix) -> - Filter([{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, - {rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, - {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, - {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, - {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, - {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}]) - end, - - LisOpts = fun(Prefix) -> - Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, - {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, - {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)} | AccOpts(Prefix)]) - end, - TcpOpts = fun(Prefix) -> - Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, - {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, - {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, - {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, - {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}]) - end, - - SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, - - SslOpts = fun(Prefix) -> - Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of - undefined -> undefined; - L -> [list_to_atom(V) || V <- L] - end, - Filter([{versions, Versions}, - {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, - {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, - {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, - {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, - {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, - {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, - {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, - {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}, - {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)}, - {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)}, - {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}]) - end, - - TcpListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, {sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}] - end - end, - - SslListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, - {sockopts, TcpOpts(Prefix)}, - {sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}] - end - end, - - ApiListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - SslOpts1 = case SslOpts(Prefix) of - [] -> []; - SslOpts0 -> [{sslopts, SslOpts0}] - end, - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, - {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}] - end - end, - - - lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} - <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] - ++ - [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} - <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] - ++ - [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn} - <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) -end}. - -%%-------------------------------------------------------------------- -%% MQTT REST API Listeners - -{mapping, "listener.api.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [ - {datatype, {duration, ms}} -]}. - -{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.certfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.verify", "emqttd.listeners", [ - {datatype, atom} -]}. - -{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [ - {datatype, {enum, [true, false]}} -]}. - -%%-------------------------------------------------------------------- -%% System Monitor -%%-------------------------------------------------------------------- - -%% @doc Long GC, don't monitor in production mode for: -%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 -{mapping, "sysmon.long_gc", "emqttd.sysmon", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -%% @doc Long Schedule(ms) -{mapping, "sysmon.long_schedule", "emqttd.sysmon", [ - {default, 1000}, - {datatype, integer} -]}. - -%% @doc Large Heap -{mapping, "sysmon.large_heap", "emqttd.sysmon", [ - {default, "8MB"}, - {datatype, bytesize} -]}. - -%% @doc Monitor Busy Port -{mapping, "sysmon.busy_port", "emqttd.sysmon", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -%% @doc Monitor Busy Dist Port -{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.sysmon", fun(Conf) -> - [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, - {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, - {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, - {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, - {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] -end}. - diff --git a/test/emqttd_lib_SUITE.erl b/test/emqttd_lib_SUITE.erl index 344e185d0..a808fbcc8 100644 --- a/test/emqttd_lib_SUITE.erl +++ b/test/emqttd_lib_SUITE.erl @@ -34,7 +34,7 @@ all() -> [{group, guid}, {group, opts}, {group, ?PQ}, {group, time}, - {group, node}, {group, base62}]. + {group, base62}]. groups() -> [{guid, [], [guid_gen, guid_hexstr, guid_base62]}, @@ -42,7 +42,6 @@ groups() -> {?PQ, [], [priority_queue_plen, priority_queue_out2]}, {time, [], [time_now_to_]}, - {node, [], [node_is_aliving, node_parse_name]}, {base62, [], [base62_encode]}]. %%-------------------------------------------------------------------- @@ -144,19 +143,6 @@ time_now_to_(_) -> emqttd_time:now_secs(), emqttd_time:now_ms(). -%%-------------------------------------------------------------------- -%% emqttd_node -%%-------------------------------------------------------------------- - -node_is_aliving(_) -> - io:format("Node: ~p~n", [node()]), - true = emqttd_node:is_aliving(node()), - false = emqttd_node:is_aliving('x@127.0.0.1'). - -node_parse_name(_) -> - 'a@127.0.0.1' = emqttd_node:parse_name("a@127.0.0.1"), - 'b@127.0.0.1' = emqttd_node:parse_name("b"). - %%-------------------------------------------------------------------- %% base62 encode decode %%-------------------------------------------------------------------- diff --git a/test/emqttd_mqueue_SUITE.erl b/test/emqttd_mqueue_SUITE.erl index e56b08398..93ccc9833 100644 --- a/test/emqttd_mqueue_SUITE.erl +++ b/test/emqttd_mqueue_SUITE.erl @@ -28,7 +28,7 @@ all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_priority_mqueue, t_in(_) -> Opts = [{max_length, 5}, - {queue_qos0, true}], + {store_qos0, true}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), true = ?Q:is_empty(Q), Q1 = ?Q:in(#mqtt_message{}, Q), @@ -42,7 +42,7 @@ t_in(_) -> t_in_qos0(_) -> Opts = [{max_length, 5}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), Q1 = ?Q:in(#mqtt_message{}, Q), true = ?Q:is_empty(Q1), @@ -51,7 +51,7 @@ t_in_qos0(_) -> t_out(_) -> Opts = [{max_length, 5}, - {queue_qos0, true}], + {store_qos0, true}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), {empty, Q} = ?Q:out(Q), Q1 = ?Q:in(#mqtt_message{}, Q), @@ -64,7 +64,7 @@ t_simple_mqueue(_) -> {max_length, 3}, {low_watermark, 0.2}, {high_watermark, 0.6}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new("simple_queue", Opts, alarm_fun()), simple = ?Q:type(Q), 3 = ?Q:max_len(Q), @@ -81,18 +81,18 @@ t_simple_mqueue(_) -> t_infinity_simple_mqueue(_) -> Opts = [{type, simple}, - {max_length, infinity}, + {max_length, 0}, {low_watermark, 0.2}, {high_watermark, 0.6}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new("infinity_simple_queue", Opts, alarm_fun()), true = ?Q:is_empty(Q), - infinity = ?Q:max_len(Q), + 0 = ?Q:max_len(Q), Qx = lists:foldl(fun(I, AccQ) -> ?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ) end, Q, lists:seq(1, 255)), 255 = ?Q:len(Qx), - [{len, 255}, {max_len, infinity}, {dropped, 0}] = ?Q:stats(Qx), + [{len, 255}, {max_len, 0}, {dropped, 0}] = ?Q:stats(Qx), {{value, V}, _Qy} = ?Q:out(Qx), <<1>> = V#mqtt_message.payload. @@ -102,7 +102,7 @@ t_priority_mqueue(_) -> {max_length, 3}, {low_watermark, 0.2}, {high_watermark, 0.6}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new("priority_queue", Opts, alarm_fun()), priority = ?Q:type(Q), 3 = ?Q:max_len(Q), @@ -125,24 +125,24 @@ t_priority_mqueue(_) -> t_infinity_priority_mqueue(_) -> Opts = [{type, priority}, {priority, [{<<"t1">>, 10}, {<<"t2">>, 8}]}, - {max_length, infinity}, - {queue_qos0, false}], + {max_length, 0}, + {store_qos0, false}], Q = ?Q:new("infinity_priority_queue", Opts, alarm_fun()), - infinity = ?Q:max_len(Q), + 0 = ?Q:max_len(Q), Qx = lists:foldl(fun(I, AccQ) -> AccQ1 = ?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), ?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) end, Q, lists:seq(1, 255)), 510 = ?Q:len(Qx), - [{len, 510}, {max_len, infinity}, {dropped, 0}] = ?Q:stats(Qx). + [{len, 510}, {max_len, 0}, {dropped, 0}] = ?Q:stats(Qx). t_priority_mqueue2(_) -> Opts = [{type, priority}, {max_length, 2}, {low_watermark, 0.2}, {high_watermark, 0.6}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()), 2 = ?Q:max_len(Q), Q1 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), diff --git a/test/emqttd_trie_SUITE.erl b/test/emqttd_trie_SUITE.erl index 2394a902a..a81a132f5 100644 --- a/test/emqttd_trie_SUITE.erl +++ b/test/emqttd_trie_SUITE.erl @@ -28,14 +28,14 @@ all() -> [t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3]. init_per_suite(Config) -> - emqttd_mnesia:ensure_started(), + ekka_mnesia:ensure_started(), ?TRIE:mnesia(boot), ?TRIE:mnesia(copy), Config. end_per_suite(_Config) -> - emqttd_mnesia:ensure_stopped(), - emqttd_mnesia:delete_schema(). + ekka_mnesia:ensure_stopped(), + ekka_mnesia:delete_schema(). init_per_testcase(_TestCase, Config) -> Config.