diff --git a/Makefile b/Makefile index 3738111b4..9270a562e 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master dep_esockd = git https://github.com/emqtt/esockd master -dep_ekka = git https://github.com/emqtt/ekka master +dep_ekka = git https://github.com/emqtt/ekka develop dep_mochiweb = git https://github.com/emqtt/mochiweb master dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog diff --git a/etc/emq.conf b/etc/emq.conf index deb4c97a9..82209dde7 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -1,6 +1,5 @@ - ##=================================================================== -## EMQ Configuration R2.2 +## EMQ Configuration R2.3 ##=================================================================== ##-------------------------------------------------------------------- @@ -8,80 +7,73 @@ ##-------------------------------------------------------------------- ## Cluster name -cluster.name = ekka +cluster.name = emqcl -## Cluster Cookie -cluster.cookie = ekkaclustercookie - -## Cluster Discovery: static | epmd | multicast | gossip | etcd | consul -cluster.discovery = static +## Cluster discovery strategy: manual | static | mcast | dns | etcd | k8s +cluster.discovery = manual ## Cluster Autoheal: on | off cluster.autoheal = on ## Clean down node of the cluster -cluster.clean_down = 1h +cluster.autoclean = 5m ##-------------------------------------------------------------------- -## Cluster with epmd +## Cluster with static node list -cluster.epmd.seeds = a@127.0.0.1,b@127.0.0.1 +## cluster.static.seeds = emq1@127.0.0.1,emq2@127.0.0.1 ##-------------------------------------------------------------------- ## Cluster with multicast -## 1 second -cluster.mcast.period = 1s +## cluster.mcast.addr = 239.192.0.1 -cluster.mcast.addr = 239.192.0.1:4369 +## cluster.mcast.ports = 4369,4370 -cluster.mcast.iface = 0.0.0.0 +## cluster.mcast.iface = 0.0.0.0 -cluster.mcast.ttl = 1 +## cluster.mcast.ttl = 255 -cluster.mcast.loop = on +## cluster.mcast.loop = on ##-------------------------------------------------------------------- -## Cluster with Gossip +## Cluster with DNS -cluster.gossip.seeds = 127.0.0.1:4369 +## cluster.dns.name = localhost -cluster.gossip.protocol_period = 1s - -cluster.gossip.suspicion_factor = 3 +## cluster.dns.app = emq ##-------------------------------------------------------------------- ## Cluster with Etcd -cluster.etcd.addr = 127.0.0.1:2367 +## cluster.etcd.server = http://127.0.0.1:2379 -cluster.etcd.prefix = emq +## cluster.etcd.prefix = emqcl -cluster.etcd.node_ttl = 30m +## cluster.etcd.node_ttl = 1m ##-------------------------------------------------------------------- -## Cluster by Consul +## Cluster with k8s -cluster.consul.addr = 127.0.0.1:8500 +## cluster.k8s.apiserver = http://10.110.111.204:8080 -cluster.consul.acl_token = example-acl-token +## cluster.k8s.service_name = emq -##-------------------------------------------------------------------- -## Discover by Kubernetes +## Address Type: ip | dns +## cluster.k8s.address_type = ip -## cluster.k8s.selector = app=emq - -## cluster.k8s.node_basename = emq +## The Erlang application name +## cluster.k8s.app_name = emq ##-------------------------------------------------------------------- ## Node Args ##-------------------------------------------------------------------- ## Node name -node.name = emqttd@127.0.0.1 +node.name = emq@127.0.0.1 ## Cookie for distributed node -## node.cookie = emqsecretcookie +node.cookie = emqsecretcookie ## SMP support: enable, auto, disable node.smp = auto @@ -121,7 +113,7 @@ node.dist_net_ticktime = 60 ## Distributed node port range node.dist_listen_min = 6369 -node.dist_listen_max = 6369 +node.dist_listen_max = 6379 ##-------------------------------------------------------------------- ## Log diff --git a/priv/emq.schema b/priv/emq.schema index 5ad6501d8..f4c369b9c 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1,5 +1,5 @@ %%-*- mode: erlang -*- -%% EMQ config mapping +%% EMQ R2.3 config mapping %%-------------------------------------------------------------------- %% Cluster @@ -11,47 +11,41 @@ {datatype, atom} ]}. -%% @doc Secret cookie for the cluster -{mapping, "cluster.cookie", "vm_args.-setcookie", [ - {default, "emqclustercookie"} -]}. - %% @doc Cluster discovery {mapping, "cluster.discovery", "ekka.cluster_discovery", [ {default, manual}, {datatype, atom} ]}. +%% @doc Clean down node from the cluster +{mapping, "cluster.autoclean", "ekka.cluster_autoclean", [ + {datatype, {duration, ms}} +]}. + %% @doc Cluster autoheal {mapping, "cluster.autoheal", "ekka.cluster_autoheal", [ {datatype, flag}, - {default, on} -]}. - - -%% @doc Clean down node from the cluster -{mapping, "cluster.clean_down", "ekka.cluster_clean_down", [ - {datatype, {duration, ms}}, - {default, "1h"} + {default, off} ]}. %%-------------------------------------------------------------------- -%% Cluster with epmd +%% Cluster by static node list -{mapping, "cluster.epmd.seeds", "ekka.cluster_discovery", [ +{mapping, "cluster.static.seeds", "ekka.cluster_discovery", [ {datatype, string} ]}. %%-------------------------------------------------------------------- -%% Cluster with IP Multicast +%% Cluster by UDP Multicast {mapping, "cluster.mcast.addr", "ekka.cluster_discovery", [ - {datatype, ip} + {default, "239.192.0.1"}, + {datatype, string} ]}. -{mapping, "cluster.mcast.period", "ekka.cluster_discovery", [ - {datatype, {duration, ms}}, - {default, "1s"} +{mapping, "cluster.mcast.ports", "ekka.cluster_discovery", [ + {default, "4369"}, + {datatype, string} ]}. {mapping, "cluster.mcast.iface", "ekka.cluster_discovery", [ @@ -61,7 +55,7 @@ {mapping, "cluster.mcast.ttl", "ekka.cluster_discovery", [ {datatype, integer}, - {default, 1} + {default, 255} ]}. {mapping, "cluster.mcast.loop", "ekka.cluster_discovery", [ @@ -84,24 +78,21 @@ {default, "32KB"} ]}. -{mapping, "cluster.gossip.seeds", "ekka.cluster_discovery", [ +%%-------------------------------------------------------------------- +%% Cluster by DNS A Record + +{mapping, "cluster.dns.name", "ekka.cluster_discovery", [ {datatype, string} ]}. -{mapping, "cluster.gossip.protocol_period", "ekka.cluster_discovery", [ - {datatype, {duration, ms}}, - {default, "1s"} -]}. - -{mapping, "cluster.gossip.suspicion_factor", "ekka.cluster_discovery", [ - {datatype, integer}, - {default, 3} +{mapping, "cluster.dns.app", "ekka.cluster_discovery", [ + {datatype, string} ]}. %%-------------------------------------------------------------------- -%% Cluster with Etcd +%% Cluster using etcd -{mapping, "cluster.etcd.addr", "ekka.cluster_discovery", [ +{mapping, "cluster.etcd.server", "ekka.cluster_discovery", [ {datatype, string} ]}. @@ -115,13 +106,21 @@ ]}. %%-------------------------------------------------------------------- -%% Cluster with Consul +%% Cluster on K8s -{mapping, "cluster.consul.addr", "ekka.cluster_discovery", [ - {datatype, ip} +{mapping, "cluster.k8s.apiserver", "ekka.cluster_discovery", [ + {datatype, string} ]}. -{mapping, "cluster.consul.acl_token", "ekka.cluster_discovery", [ +{mapping, "cluster.k8s.service_name", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.k8s.address_type", "ekka.cluster_discovery", [ + {datatype, {enum, [ip, dns]}} +]}. + +{mapping, "cluster.k8s.app_name", "ekka.cluster_discovery", [ {datatype, string} ]}. @@ -134,39 +133,32 @@ {Ip, Port} end, Options = fun(static) -> - [{seeds, cuttlefish:conf_get("cluster.epmd.seeds", Conf)}]; + [{seeds, [list_to_atom(S) || S <- string:tokens(cuttlefish:conf_get("cluster.static.seeds", Conf, ""), ",")]}]; (mcast) -> - {Addr, Port} = cuttlefish:conf_get("cluster.mcast.addr", Conf), - {ok, Ip} = inet:parse_address(Addr), + {ok, Addr} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.addr", Conf)), {ok, Iface} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.iface", Conf)), - [{addr, Ip}, {port, Port}, {iface, Iface}, - {period, cuttlefish:conf_get("cluster.mcast.period", Conf)}, + Ports = [list_to_integer(S) || S <- string:tokens(cuttlefish:conf_get("cluster.mcast.ports", Conf), ",")], + [{addr, Addr}, {ports, Ports}, {iface, Iface}, {ttl, cuttlefish:conf_get("cluster.mcast.ttl", Conf, 1)}, {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}]; + (dns) -> + [{name, cuttlefish:conf_get("cluster.dns.name", Conf)}, + {app, cuttlefish:conf_get("cluster.dns.app", Conf)}]; (etcd) -> - [{seeds, cuttlefish:conf_get("cluster.epmd.seeds", Conf)}, - {clean_down, cuttlefish:conf_get("cluster.epmd.clean_down", Conf, undefined)}]; - (gossip) -> - [{seeds, [IpPort(S) || S <- string:tokens(",", cuttlefish:conf_get("cluster.gossip.seeds", Conf))]}, - {protocol_period, cuttlefish:conf_get("cluster.gossip.protocol_period", Conf)}, - {suspicion_factor, cuttlefish:conf_get("cluster.gossip.suspicion_factor", Conf, 3)}]; - (etcd) -> - [{addr, cuttlefish:conf_get("cluster.etcd.addr", Conf)}, - {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emq")}, + [{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")}, + {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")}, {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}]; - (consul) -> - [{addr, cuttlefish:conf_get("cluster.consul.addr", Conf)}, - {acl_token, cuttlefish:conf_get("cluster.consul.acl_token", Conf)}]; (k8s) -> - [{host, cuttlefish:conf_get("cluster.k8s.selector", Conf)}, - {acl_token, cuttlefish:conf_get("cluster.k8s.node_basename", Conf)}]; + [{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)}, + {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, + {address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)}, + {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)}]; (manual) -> [ ] end, {Strategy, Filter(Options(Strategy))} end}. - %%-------------------------------------------------------------------- %% Erlang Node %%-------------------------------------------------------------------- @@ -177,9 +169,9 @@ end}. ]}. %% @doc Secret cookie for distributed erlang node -%% {mapping, "node.cookie", "vm_args.-setcookie", [ -%% {default, "emqsecretcookie"} -%%]}. +{mapping, "node.cookie", "vm_args.-setcookie", [ + {default, "emqsecretcookie"} +]}. %% @doc SMP Support {mapping, "node.smp", "vm_args.-smp", [ diff --git a/rebar.config b/rebar.config index 0ce2b3602..834382f05 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{ekka,".*",{git,"https://github.com/emqtt/ekka","master"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","master"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} +{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{ekka,".*",{git,"https://github.com/emqtt/ekka","develop"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","master"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} ]}. {erl_opts, [debug_info,{parse_transform,lager_transform}]}. diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 3a7ed3482..0b85fe0f0 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,6 +1,6 @@ {application,emqttd, [{description,"Erlang MQTT Broker"}, - {vsn,"2.2"}, + {vsn,"2.3"}, {modules,[]}, {registered,[emqttd_sup]}, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index acca547be..b85834722 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -44,10 +44,7 @@ start(_Type, _Args) -> start_servers(Sup), emqttd_cli:load(), register_acl_mod(), - emqttd_plugins:init(), - emqttd_plugins:load(), - init_cluster(), - start_listeners(), + start_autocluster(), register(emqttd, self()), print_vsn(), {ok, Sup}. @@ -148,12 +145,18 @@ register_acl_mod() -> end. %%-------------------------------------------------------------------- -%% Init Cluster +%% Autocluster %%-------------------------------------------------------------------- -init_cluster() -> +start_autocluster() -> ekka:callback(prepare, fun emqttd:shutdown/1), - ekka:callback(reboot, fun emqttd:reboot/0). + ekka:callback(reboot, fun emqttd:reboot/0), + ekka:autocluster(fun after_autocluster/0). + +after_autocluster() -> + emqttd_plugins:init(), + emqttd_plugins:load(), + start_listeners(). %%-------------------------------------------------------------------- %% Start Listeners diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 9efe8b612..b3dd8b4ad 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -48,6 +48,8 @@ -define(ROUTER, ?MODULE). +-define(LOCK, {?ROUTER, clean_routes}). + %%-------------------------------------------------------------------- %% Mnesia Bootstrap %%-------------------------------------------------------------------- @@ -218,7 +220,7 @@ stop() -> gen_server:call(?ROUTER, stop). init([]) -> ekka:monitor(membership), ets:new(mqtt_local_route, [set, named_table, protected]), - {ok, TRef} = timer:send_interval(timer:seconds(1), stats), + {ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, #state{stats_timer = TRef}}. handle_call(stop, _From, State) -> @@ -231,7 +233,7 @@ handle_cast({add_local_route, Topic}, State) -> %% why node()...? ets:insert(mqtt_local_route, {Topic, node()}), {noreply, State}; - + handle_cast({del_local_route, Topic}, State) -> ets:delete(mqtt_local_route, Topic), {noreply, State}; @@ -240,8 +242,11 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State) -> - clean_routes_(Node), - update_stats_(), + global:trans({?LOCK, self()}, + fun() -> + clean_routes_(Node), + update_stats_() + end), {noreply, State, hibernate}; handle_info({membership, _Event}, State) -> diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 2081ea9e5..2e8e9a749 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -36,6 +36,8 @@ -record(state, {stats_fun, ticker}). +-define(LOCK, {?MODULE, clean_sessions}). + %% @doc Start a session helper -spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}). start_link(StatsFun) -> @@ -59,8 +61,8 @@ handle_info({membership, {mnesia, down, Node}}, State) -> [{'==', {node, '$2'}, Node}], ['$1']}]), lists:foreach(fun(ClientId) -> mnesia:delete({mqtt_session, ClientId}) end, ClientIds) end, - mnesia:async_dirty(Fun), - {noreply, State}; + global:trans({?LOCK, self()}, fun() -> mnesia:async_dirty(Fun) end), + {noreply, State, hibernate}; handle_info({membership, _Event}, State) -> {noreply, State}; diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 3b1833cb2..160e023f2 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -24,6 +24,8 @@ -include_lib("common_test/include/ct.hrl"). +-define(APP, emqttd). + -define(CONTENT_TYPE, "application/x-www-form-urlencoded"). -define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"}, @@ -52,8 +54,9 @@ all() -> groups() -> [{protocol, [sequence], [mqtt_connect, - mqtt_ssl_oneway, - mqtt_ssl_twoway]}, + mqtt_ssl_twoway, + mqtt_ssl_oneway + ]}, {pubsub, [sequence], [subscribe_unsubscribe, publish, pubsub, @@ -100,24 +103,17 @@ groups() -> ]}, cli_vm]}, {cleanSession, [sequence], - [cleanSession_validate, - cleanSession_validate1 + [cleanSession_validate ]}]. init_per_suite(Config) -> - application:start(lager), - DataDir = proplists:get_value(data_dir, Config), - NewConfig = emqttd_config(DataDir), - Vals = change_opts(ssl_oneway, DataDir, proplists:get_value(emqttd, NewConfig)), - [application:set_env(emqttd, Par, Value) || {Par, Value} <- Vals], - application:ensure_all_started(emqttd), - [{config, NewConfig} | Config]. + NewConfig = generate_config(), + lists:foreach(fun set_app_env/1, NewConfig), + application:ensure_all_started(?APP), + Config. end_per_suite(_Config) -> - application:stop(emqttd), - application:stop(esockd), - application:stop(gproc), - emqttd_mnesia:ensure_stopped(). + emqttd:shutdown(). %%-------------------------------------------------------------------- %% Protocol Test @@ -138,31 +134,32 @@ connect_broker_(Packet, RecvSize) -> Data. mqtt_ssl_oneway(_) -> + emqttd:stop(), + change_opts(ssl_oneway), + emqttd:start(), {ok, SslOneWay} = emqttc:start_link([{host, "localhost"}, {port, 8883}, {client_id, <<"ssloneway">>}, ssl]), - timer:sleep(10), + timer:sleep(100), emqttc:subscribe(SslOneWay, <<"topic">>, qos1), {ok, Pub} = emqttc:start_link([{host, "localhost"}, {client_id, <<"pub">>}]), emqttc:publish(Pub, <<"topic">>, <<"SSL oneWay test">>, [{qos, 1}]), - timer:sleep(10), + timer:sleep(100), receive {publish, _Topic, RM} -> ?assertEqual(<<"SSL oneWay test">>, RM) after 1000 -> false end, + timer:sleep(100), emqttc:disconnect(SslOneWay), emqttc:disconnect(Pub). -mqtt_ssl_twoway(Config) -> - emqttd_cluster:prepare(), - DataDir = proplists:get_value(data_dir, Config), - EmqConfig = proplists:get_value(config, Config), - Vals = change_opts(ssl_twoway, DataDir, proplists:get_value(emqttd, EmqConfig)), - [application:set_env(emqttd, Par, Value) || {Par, Value} <- Vals], - emqttd_cluster:reboot(), - ClientSSl = [{Key, filename:join([DataDir, File])} || - {Key, File} <- ?MQTT_SSL_CLIENT ], +mqtt_ssl_twoway(_) -> + emqttd:stop(), + change_opts(ssl_twoway), + emqttd:start(), + ClientSSl = [{Key, local_path(["etc", File])} || + {Key, File} <- ?MQTT_SSL_CLIENT], {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"}, {port, 8883}, {client_id, <<"ssltwoway">>}, @@ -418,7 +415,7 @@ hook_fun8(arg, initArg) -> stop. request_status(_) -> {InternalStatus, _ProvidedStatus} = init:get_status(), AppStatus = - case lists:keysearch(emqttd, 1, application:which_applications()) of + case lists:keysearch(?APP, 1, application:which_applications()) of false -> not_running; {value, _Val} -> running end, @@ -596,59 +593,56 @@ cleanSession_validate(_) -> emqttc:disconnect(Pub), emqttc:disconnect(C11). -cleanSession_validate1(_) -> - {ok, C1} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, true}]), - timer:sleep(10), - emqttc:subscribe(C1, <<"topic">>, qos1), - emqttc:disconnect(C1), - {ok, Pub} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"pub">>}]), - - emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 1}]), - timer:sleep(10), - {ok, C11} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), - timer:sleep(100), - Metrics = emqttd_metrics:all(), - ?assertEqual(0, proplists:get_value('messages/qos1/sent', Metrics)), - ?assertEqual(1, proplists:get_value('messages/qos1/received', Metrics)), - emqttc:disconnect(Pub), - emqttc:disconnect(C11). - -emqttd_config(DataDir) -> - Schema = cuttlefish_schema:files([filename:join([DataDir, "emqttd.schema"])]), - Conf = conf_parse:file(filename:join([DataDir, "emqttd.conf"])), - cuttlefish_generator:map(Schema, Conf). - -change_opts(SslType, DataDir, Vals) -> - Listeners = proplists:get_value(listeners, Vals), +change_opts(SslType) -> + {ok, Listeners} = application:get_env(?APP, listeners), NewListeners = lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> case Protocol of ssl -> SslOpts = proplists:get_value(sslopts, Opts), - Keyfile = filename:join([DataDir, proplists:get_value(keyfile, SslOpts)]), - Certfile = filename:join([DataDir, proplists:get_value(certfile, SslOpts)]), + Keyfile = local_path(["etc/certs", "key.pem"]), + Certfile = local_path(["etc/certs", "cert.pem"]), TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), TupleList3 = case SslType of ssl_twoway-> - CAfile = filename:join([DataDir, proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), + CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), lists:merge(TupleList2, MutSslList); _ -> - TupleList2 + lists:filter(fun ({cacertfile, _}) -> false; + ({verify, _}) -> false; + ({fail_if_no_peer_cert, _}) -> false; + (_) -> true + end, TupleList2) end, - [{Protocol, Port, [{ssl, TupleList3}]} | Acc]; + [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc]; _ -> [Listener | Acc] end end, [], Listeners), - lists:keyreplace(listeners, 1, Vals, {listeners, NewListeners}). + application:set_env(?APP, listeners, NewListeners). + +generate_config() -> + Schema = cuttlefish_schema:files([local_path(["priv", "emq.schema"])]), + Conf = conf_parse:file([local_path(["etc", "emq.conf"])]), + cuttlefish_generator:map(Schema, Conf). + +get_base_dir(Module) -> + {file, Here} = code:is_loaded(Module), + filename:dirname(filename:dirname(Here)). + +get_base_dir() -> + get_base_dir(?MODULE). + +local_path(Components, Module) -> + filename:join([get_base_dir(Module) | Components]). + +local_path(Components) -> + local_path(Components, ?MODULE). + +set_app_env({App, Lists}) -> + lists:foreach(fun({Par, Var}) -> + application:set_env(App, Par, Var) + end, Lists). diff --git a/test/emqttd_SUITE_data/certs/cacert.pem b/test/emqttd_SUITE_data/certs/cacert.pem deleted file mode 100644 index ca4948ed9..000000000 --- a/test/emqttd_SUITE_data/certs/cacert.pem +++ /dev/null @@ -1,17 +0,0 @@ ------BEGIN CERTIFICATE----- -MIICxjCCAa6gAwIBAgIJAPhU8tv3KMe/MA0GCSqGSIb3DQEBCwUAMBMxETAPBgNV -BAMMCE15VGVzdENBMB4XDTE2MTAzMTA3MTU0NVoXDTE3MTAzMTA3MTU0NVowEzER -MA8GA1UEAwwITXlUZXN0Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB -AQCtPcDnmjiVl7ScDhYvGaW+PUgfp7P5cM39mnrW6fkxhA0tgunWpWlYVKbcuh5y -4bTNYrOQpcFO3Zg62tva4XEL8O1huqTlGsAeysZ3vWE4/8NGN/3wZy0TKDvwiwOB -tbS3C5wcRQZohExL6yEL4XzDGk44x2mIs8/NzeG7Zycqybh9tsCJiHbLiTxnLa24 -v5USOtlvWye0hA0yUUqc2k7tKVmIMT4A4ulMb2sDVRrSLjyFDTI0c8grlPLfKbG8 -gpYLsHn9aAjqviyvmJdRLxwauqn+ghNWn1TyZwgAUxpoTtWeC0ilzEt18RP8vZjm -eCbEP4qQDDvSCdLrie5CezyxAgMBAAGjHTAbMAwGA1UdEwQFMAMBAf8wCwYDVR0P -BAQDAgEGMA0GCSqGSIb3DQEBCwUAA4IBAQBJ/I/QJjU+mgkIaaHImFcIYFrfBirC -vDiWo2W+zRh7CbcSf+jsksI99d230ixSDY36CPLKZeZhELST7xWKEELKbPdNbtOO -EM10+XteLSXKVNGXfrEbW973eum3FGLobMA9OcH6+qDaf08pibe7kuv10aAgSs/I -0Qg5H/UTAKQJKO9hhOgERM/FettuF+WGJaaZZZb9Y2YYBNRf/GtM8KHCjpCX9+XD -kdeQGO8Hn10H9tOmggyfdIpsunBcs2/6/exCp8RPBWurN2GSW2RcnS5xVL0r+SVW -VOhSDy1JwnNPczpqkqE74qAbAah0dTJFcFWzeGLVk7Kp+2pissAiU3gg ------END CERTIFICATE----- diff --git a/test/emqttd_SUITE_data/certs/cert.pem b/test/emqttd_SUITE_data/certs/cert.pem deleted file mode 100644 index 58aa2c4ef..000000000 --- a/test/emqttd_SUITE_data/certs/cert.pem +++ /dev/null @@ -1,18 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIC9jCCAd6gAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl -c3RDQTAeFw0xNjEwMzEwNzE1NDVaFw0xNzEwMzEwNzE1NDVaMDkxJjAkBgNVBAMT -HWRlbmdoYWlndWlkZU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZzZXJ2ZXIw -ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4Ena4vgWrzwUB0hGW1v0v -K986FhU5ZdYz5H5MGonfWwv89nR2DlftSDXEvKFyc2MT81GGm16VJv3mVpQJLuKA -xLBLY7a1zSrJdugXWy+mgJJTPW6KjTY4jPtfCl6x/yVr8YclVa8XO0JFzOme2LMV -Ylc/ixVEa66UpxRNrg5yWHS26KcB1lE3GLERoRBKF7nsyGqGY4X9TypBwglCVoqK -3dKVGwCvFur+oPnt/C5pwR6UmUV/Ppf1EaRD7Po+xcyJSeCvszG3FH4iHsDHnjLe -DR6lxouvMCb+aKJi9d0xowOjhbKoFMF179t4SVnptQeq+U6ui3cPKUjia7Zh1tZT -AgMBAAGjLzAtMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsG -AQUFBwMBMA0GCSqGSIb3DQEBCwUAA4IBAQB2jlDPiZfP/whsvvFn43g37QMwX5ST -Z5OpmEFnFjAH3ec0PPqPrKYEu00q5wEC+8L6uVH8FHOFf11JLH4wl11/C/mvE92D -qZtGG8KCnG2+rk5OJPGX+28Z+OnCZlXOjQ8qd2x5KtIW50JuXJ3cbDRHtF/TVanm -Exu+TCBeToNwbcU2sfQnbljkUTj4idUFz0pq3uvw3dA4R1J2foungPAYXSWcVhtb -RYtG8epIvkAyyUE5nY3kC05AUml6gSZkrJiYM5I1IJTX1lQ7Pv2yxRBZUtTx33rP -ccnsW6tbHTDBG8UDHx4LKHErdWFgCJWI81EUEcTip9g2zCOGTWKnpz+z ------END CERTIFICATE----- diff --git a/test/emqttd_SUITE_data/certs/client-cert.pem b/test/emqttd_SUITE_data/certs/client-cert.pem deleted file mode 100644 index e1690d9aa..000000000 --- a/test/emqttd_SUITE_data/certs/client-cert.pem +++ /dev/null @@ -1,18 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIC9jCCAd6gAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl -c3RDQTAeFw0xNjEwMzEwNzE1NDZaFw0xNzEwMzEwNzE1NDZaMDkxJjAkBgNVBAMT -HWRlbmdoYWlndWlkZU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZjbGllbnQw -ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCmPMkieMtJO4PGIQG30uxI -SEoRJoF2w0ufFhZGYCEaqFlHaSoc6nTiCUmnxadDpjkNBs4R6RDfM9zPJ0QdgSFO -OJsWgQEHym/EQTcEx11+/2NDZWMJyZdpWZlU57SwHfWDwYa2XFX1bV+pAvhB8cli -wCkygTwp1cZcwQpb8TfZySy8r5mwrWq2nhCQPtYqMxjNjpR/UeeZzt+Uh3CEXQ8h -omjGinDXnnGwrYwBEP9G6fzTvyCWTyrsWC1Q37oAMzbkwFRoIBSAQWXBv9hgI08s -IBYvXnRGKWOJZGxAP4a4TvpFS+nqi+fFVn4ktUfcH3PoSMh7PKavrFT2hQaryLt1 -AgMBAAGjLzAtMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsG -AQUFBwMCMA0GCSqGSIb3DQEBCwUAA4IBAQAeimI8AQBFWiE9/Nf/0radux355mod -5vPLbKn6I6nzb/sS/Ug8SMoFnkhncwj+XOgTSliUyWcwOB11UDVJbUIkB/x+Qo3w -hvrATTdby2WdFNQvH4X7PmP8asDDN7ZxoLyRmuhjL4avJ3giwRcuQK4cB35b+Lb2 -p1e7hW81RaV7OEc0o4/vJgPvv9N7wvUuipwJns6PrN7VDn99lT8zWrt2pQ06e2mk -jDuXulVpiUtLHJhTnABkCaKiHWCYAFfMjFeRb3gUXKqShzOyDSGWY91YMID/HE4r -sVLm2mD1zurue8EmYtQQ6uiJIW9SzvshMHG6EA5QWA1ytoalfePbvf+c ------END CERTIFICATE----- diff --git a/test/emqttd_SUITE_data/certs/client-key.pem b/test/emqttd_SUITE_data/certs/client-key.pem deleted file mode 100644 index 74cf487cb..000000000 --- a/test/emqttd_SUITE_data/certs/client-key.pem +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpQIBAAKCAQEApjzJInjLSTuDxiEBt9LsSEhKESaBdsNLnxYWRmAhGqhZR2kq -HOp04glJp8WnQ6Y5DQbOEekQ3zPczydEHYEhTjibFoEBB8pvxEE3BMddfv9jQ2Vj -CcmXaVmZVOe0sB31g8GGtlxV9W1fqQL4QfHJYsApMoE8KdXGXMEKW/E32cksvK+Z -sK1qtp4QkD7WKjMYzY6Uf1Hnmc7flIdwhF0PIaJoxopw155xsK2MARD/Run8078g -lk8q7FgtUN+6ADM25MBUaCAUgEFlwb/YYCNPLCAWL150RiljiWRsQD+GuE76RUvp -6ovnxVZ+JLVH3B9z6EjIezymr6xU9oUGq8i7dQIDAQABAoIBAFkHEMjPXD96ChZf -suXZpgUIAfKxZoBOEv+9+mvyK4h1RGsEHTOjNLmhM7sQFYYbTU52qIHbCdgflE+0 -vbv3XfjgQ96HdB/SAI1gR7DdfGr5JxX/BE1HkzkubPmVpaT0RnoreJPNW5O24ZZI -KuBWNv4V33pWz/uvqy4djAi1ZK3TPDhn9cVCMwV/ISCPlofrNDB/4ZNOMeaQgiR+ -sGqv+Q0ok2ao7Y04QHPh5i+5o+5oBoiJAO/49q9uPdpO181/8H71jll0QL+h5Off -nyWkAAOcgEeX9T4ZnfTUivGdSwB/Y+LS97Ozdr6kp5Fdk8WdDn0DL4fHRrnJ4IJD -EIAn/sECgYEA2oOCRBMccr49wbu+cKlkICt/4ARzJWKysdLlK0tYQknkDK1bzoHO -9JerRJL4E9bKp8zNlobfP1hWV0TFpwYsK3RvZoLvCwaSHeqUCZ4wQvKrWP1FieJ2 -5kjO5iMvXiy/kNHdTEXsj0x6RKuUSVgzNIuILvCCQ9Z7JVa/3NWS1SkCgYEAwsF0 -TWxCjryQv8y4mFSUlyF+y+ntnWAvpe/1Wv3+dNdhsccUfcq3zPMuLEj5DEoIvlTy -jLkFLVJ468Ou7S1oSVetVT3wWoLP2eFDEU/sYjjPdf4IMSO1jWIPLC3WV7zsFb62 -jwG2en1qfz8AxrVl+zj4lWCbgA9Soi41NMiCUW0CgYEAokQEST8T4hVp0OL1Qb5Y -bxc+Z4GGbF3Fqw2cRrE1wkwSwGNACLMWl0XF1i95b2oSpdcNWFmhkO2teDLGwAhy -ZnaZfzt9/ecMPJEFC7tfxWdlXLj/mawFdW7dzcKVG08JlqZxuoE2cRduuG3duTV5 -GO0A3TKW2X99hTXNVlV3KzkCgYEAsaE8cHkzY3h9FVKlctqCBC3atiWQQZ+/Fbv8 -rpdHBE6Fnl4TRIAmj9mk3WNZM2o6+04DQ3JlVGcKPw7ldxGZMnuzbjHmDMeOyAx6 -3UlmMlfacKXX1unY5zDu4b6U5sU7FsIxQ9GuG55UCebu0E4Wy8G0iJnqeix/k8hN -Yu0WXykCgYEAo0kIm7sh9j0+r419Lo2kT4zlzFlNdJEa4+lFVISRqouDuhUO8VFE -/ZpGRcqIM7dH6iBM2Htasf7l/hyWKzDEvWCEpa4icicFYAJ92AgK7UBWbNbhueof -PyVx5G2o7amvyZNtJYUo4TpJ9eH5YbsBRBqWCJcBUAfrItrprxB1LMs= ------END RSA PRIVATE KEY----- diff --git a/test/emqttd_SUITE_data/certs/key.pem b/test/emqttd_SUITE_data/certs/key.pem deleted file mode 100644 index 7001093ef..000000000 --- a/test/emqttd_SUITE_data/certs/key.pem +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpAIBAAKCAQEAuBJ2uL4Fq88FAdIRltb9LyvfOhYVOWXWM+R+TBqJ31sL/PZ0 -dg5X7Ug1xLyhcnNjE/NRhptelSb95laUCS7igMSwS2O2tc0qyXboF1svpoCSUz1u -io02OIz7Xwpesf8la/GHJVWvFztCRczpntizFWJXP4sVRGuulKcUTa4Oclh0tuin -AdZRNxixEaEQShe57MhqhmOF/U8qQcIJQlaKit3SlRsArxbq/qD57fwuacEelJlF -fz6X9RGkQ+z6PsXMiUngr7MxtxR+Ih7Ax54y3g0epcaLrzAm/miiYvXdMaMDo4Wy -qBTBde/beElZ6bUHqvlOrot3DylI4mu2YdbWUwIDAQABAoIBADXYWNhT5c7LYTiW -HcUVIL0CxWr1eMHwk0dcyME0Zi5rMMePxKOgMIJdxDTHxSZ4sHvuimOo4XMaE92k -Z+uDxohKgROcmJ735FNIsD3c08SOCb/F0adABaNnQkUcAHVrIKRB4/m85doS4KEQ -fyqTU1enC8Svx8nbAhfEBEFw8BLsZD9UnQAEAU5W9S5aKPHNrYRDz5UE0ZP28ixC -4PtCew96uCqA0u+xZnWCGawF27FD9P88pcYSJqebF1iFYkXrAwdhAbqewHOqQJXf -KJpbpjflBvZr/oTVZ3GAnnHnZDiusFmCKIHB9dKimHMdTFVIU2ikOeJZLtgXsBjb -Wn3Fa8kCgYEA2fK0t9NPmELw43D7VoCNeUmu6KmLLd7CeRiQ/OkPLKTqrudnUZGi -uMinPFijGTLX3SmByAVOkzMKBQOYF+eB1X24kbRLmL4JKzr04hSqOKqG5gJctC+x -V5qQX7ZxrNxFRiSodILbnQN/z1gwZMfrAU0t0EKIKjZR3lpj8CELv1cCgYEA2DWn -9V6PCZPcHzoFabhb8DJFglUTHk0zINVe97qldvMvn0MgsjgyS2j954nX8ef7uE1O -Cf+9nN709Fu8kEC7/KzWXxP3/O58TfJ6NivCQSr5i0OJLumQMVNrS+u/VG1PaVbS -2oCwP3QFayOxZSj9wq2MARd1JkqzHmi8skZLz2UCgYEAgtnv3En3CLBwFe14SPgH -eGFfrPpVwGV0luXD7sQyQxiEehwecN+iNZTqqxWAXpmi9np8G83r3f6PrnD4+Kka -z0Wa8Yewt3So5paP/chwZnMjaKbUZ64WqET5Fy3fU+wvfyx1IvaJydwW+TK2Y1uP -4Yknz1iSjd1tC7VzOPFuLyMCgYBrTFWKQ98glayMIrNFACVAUvKD98yBITbaeImk -z5AGNDHSC/JR/+mV2wkGuzXb65DUqiisdaqYC13tVwmBXV7tyqiojrRnZcNyu39D -GvxQcw9cuat/CJJyqD97cgeF0qmyUVBa97qAAwgdX51N4sXss0vjzsxosHGsCbZ7 -kr9UsQKBgQCMTtdCeA+uK/OeJtzf4CYZKR9xllQ+P6gCtbQ7WHuLBX/x+ZhvTC0p -qVLVWwFsJ6ivc1f74sy8hZPiePk9fqAqA1JIjDHrof0M3TxRVFvB7dej5XIYVirn -521DyZGfE+N7HA7qW5cGKZT0+UYLVp4gnv88nNKDuS18lafy8JRrfQ== ------END RSA PRIVATE KEY----- diff --git a/test/emqttd_SUITE_data/emqttd.conf b/test/emqttd_SUITE_data/emqttd.conf deleted file mode 100644 index c3694c4b9..000000000 --- a/test/emqttd_SUITE_data/emqttd.conf +++ /dev/null @@ -1,479 +0,0 @@ - -##=================================================================== -## EMQ Configuration R2.2 -##=================================================================== - -##-------------------------------------------------------------------- -## Cluster -##-------------------------------------------------------------------- - -## The cluster Id -cluster.id = emq - -## The multicast address and port. -cluster.multicast = 239.192.0.1:44369 - -##-------------------------------------------------------------------- -## Node Args -##-------------------------------------------------------------------- - -## Node name -node.name = emqttd@127.0.0.1 - -## Cookie for distributed node -node.cookie = emqsecretcookie - -## SMP support: enable, auto, disable -node.smp = auto - -## vm.args: -heart -## Heartbeat monitoring of an Erlang runtime system -## Value should be 'on' or comment the line -## node.heartbeat = on - -## Enable kernel poll -node.kernel_poll = on - -## async thread pool -node.async_threads = 32 - -## Erlang Process Limit -node.process_limit = 256000 - -## Sets the maximum number of simultaneously existing ports for this system -node.max_ports = 65536 - -## Set the distribution buffer busy limit (dist_buf_busy_limit) -node.dist_buffer_size = 32MB - -## Max ETS Tables. -## Note that mnesia and SSL will create temporary ets tables. -node.max_ets_tables = 256000 - -## Tweak GC to run more often -node.fullsweep_after = 1000 - -## Crash dump -node.crash_dump = {{ platform_log_dir }}/crash.dump - -## Distributed node ticktime -node.dist_net_ticktime = 60 - -## Distributed node port range -node.dist_listen_min = 6369 -node.dist_listen_max = 6369 - -##-------------------------------------------------------------------- -## Log -##-------------------------------------------------------------------- - -## Set the log dir -log.dir = {{ platform_log_dir }} - -## Console log. Enum: off, file, console, both -log.console = console - -## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency -log.console.level = error - -## Syslog. Enum: on, off -log.syslog = on - -## syslog level. Enum: debug, info, notice, warning, error, critical, alert, emergency -log.syslog.level = error - -## Console log file -## log.console.file = {{ platform_log_dir }}/console.log - -## Error log file -log.error.file = {{ platform_log_dir }}/error.log - -## Enable the crash log. Enum: on, off -log.crash = on - -log.crash.file = {{ platform_log_dir }}/crash.log - -##-------------------------------------------------------------------- -## Allow Anonymous and Default ACL -##-------------------------------------------------------------------- - -## Allow Anonymous authentication -mqtt.allow_anonymous = true - -## ACL nomatch -mqtt.acl_nomatch = allow - -## Default ACL File -mqtt.acl_file = {{ platform_etc_dir }}/acl.conf - -## Cache ACL for PUBLISH -mqtt.cache_acl = true - -##-------------------------------------------------------------------- -## MQTT Protocol -##-------------------------------------------------------------------- - -## Max ClientId Length Allowed. -mqtt.max_clientid_len = 1024 - -## Max Packet Size Allowed, 64K by default. -mqtt.max_packet_size = 64KB - -## Check Websocket Protocol Header. Enum: on, off -mqtt.websocket_protocol_header = on - -##-------------------------------------------------------------------- -## MQTT Connection -##-------------------------------------------------------------------- - -## Force GC: integer. Value 0 disabled the Force GC. -mqtt.conn.force_gc_count = 100 - -##-------------------------------------------------------------------- -## MQTT Client -##-------------------------------------------------------------------- - -## Client Idle Timeout (Second) -mqtt.client.idle_timeout = 30s - -## Max publish rate of Messages -## mqtt.client.max_publish_rate = 5 - -## Enable client Stats: on | off -mqtt.client.enable_stats = off - -##-------------------------------------------------------------------- -## MQTT Session -##-------------------------------------------------------------------- - -## Max Number of Subscriptions, 0 means no limit. -mqtt.session.max_subscriptions = 0 - -## Upgrade QoS? -mqtt.session.upgrade_qos = off - -## Max Size of the Inflight Window for QoS1 and QoS2 messages -## 0 means no limit -mqtt.session.max_inflight = 32 - -## Retry Interval for redelivering QoS1/2 messages. -mqtt.session.retry_interval = 20s - -## Client -> Broker: Max Packets Awaiting PUBREL, 0 means no limit -mqtt.session.max_awaiting_rel = 100 - -## Awaiting PUBREL Timeout -mqtt.session.await_rel_timeout = 20s - -## Enable Statistics: on | off -mqtt.session.enable_stats = off - -## Expired after 1 day: -## w - week -## d - day -## h - hour -## m - minute -## s - second -mqtt.session.expiry_interval = 2h - -## Ignore message from self publish -mqtt.session.ignore_loop_deliver = false - -##-------------------------------------------------------------------- -## MQTT Message Queue -##-------------------------------------------------------------------- - -## Type: simple | priority -mqtt.mqueue.type = simple - -## Topic Priority: 0~255, Default is 0 -## mqtt.mqueue.priority = topic/1=10,topic/2=8 - -## Max queue length. Enqueued messages when persistent client disconnected, -## or inflight window is full. 0 means no limit. -mqtt.mqueue.max_length = 1000 - -## Low-water mark of queued messages -mqtt.mqueue.low_watermark = 20% - -## High-water mark of queued messages -mqtt.mqueue.high_watermark = 60% - -## Queue Qos0 messages? -mqtt.mqueue.store_qos0 = true - -##-------------------------------------------------------------------- -## MQTT Broker and PubSub -##-------------------------------------------------------------------- - -## System Interval of publishing broker $SYS Messages -mqtt.broker.sys_interval = 60 - -## PubSub Pool Size. Default should be scheduler numbers. -mqtt.pubsub.pool_size = 8 - -mqtt.pubsub.by_clientid = true - -## Subscribe Asynchronously -mqtt.pubsub.async = true - -##-------------------------------------------------------------------- -## MQTT Bridge -##-------------------------------------------------------------------- - -## Bridge Queue Size -mqtt.bridge.max_queue_len = 10000 - -## Ping Interval of bridge node. Unit: Second -mqtt.bridge.ping_down_interval = 1 - -##------------------------------------------------------------------- -## MQTT Plugins -##------------------------------------------------------------------- - -## Dir of plugins' config -mqtt.plugins.etc_dir ={{ platform_etc_dir }}/plugins/ - -## File to store loaded plugin names. -mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins - -##-------------------------------------------------------------------- -## MQTT Listeners -##-------------------------------------------------------------------- - -##-------------------------------------------------------------------- -## External TCP Listener - -## External TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 -listener.tcp.external = 0.0.0.0:1883 - -## Size of acceptor pool -listener.tcp.external.acceptors = 16 - -## Maximum number of concurrent clients -listener.tcp.external.max_clients = 102400 - -#listener.tcp.external.mountpoint = external/ - -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -#listener.tcp.external.rate_limit = 100,10 - -#listener.tcp.external.access.1 = allow 192.168.0.0/24 - -listener.tcp.external.access.2 = allow all - -## Proxy Protocol V1/2 -## listener.tcp.external.proxy_protocol = on -## listener.tcp.external.proxy_protocol_timeout = 3s - -## TCP Socket Options -listener.tcp.external.backlog = 1024 - -#listener.tcp.external.recbuf = 4KB - -#listener.tcp.external.sndbuf = 4KB - -listener.tcp.external.buffer = 4KB - -listener.tcp.external.nodelay = true - -##-------------------------------------------------------------------- -## Internal TCP Listener - -## Internal TCP Listener: 11883, 127.0.0.1:11883, ::1:11883 -listener.tcp.internal = 127.0.0.1:11883 - -## Size of acceptor pool -listener.tcp.internal.acceptors = 16 - -## Maximum number of concurrent clients -listener.tcp.internal.max_clients = 102400 - -#listener.tcp.external.mountpoint = internal/ - -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## listener.tcp.internal.rate_limit = 1000,100 - -## TCP Socket Options -listener.tcp.internal.backlog = 512 - -listener.tcp.internal.tune_buffer = on - -listener.tcp.internal.buffer = 1MB - -listener.tcp.internal.recbuf = 4KB - -listener.tcp.internal.sndbuf = 1MB - -listener.tcp.internal.nodelay = true - -##-------------------------------------------------------------------- -## External SSL Listener - -## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 -listener.ssl.external = 8883 - -## Size of acceptor pool -listener.ssl.external.acceptors = 16 - -## Maximum number of concurrent clients -listener.ssl.external.max_clients = 1024 - -## listener.ssl.external.mountpoint = inbound/ - -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## listener.ssl.external.rate_limit = 100,10 - -## Proxy Protocol V1/2 -## listener.ssl.external.proxy_protocol = on -## listener.ssl.external.proxy_protocol_timeout = 3s - -listener.ssl.external.access.1 = allow all - -### SSL Options. See http://erlang.org/doc/man/ssl.html - -## Configuring SSL Options. See http://erlang.org/doc/man/ssl.html -### TLS only for POODLE attack -## listener.ssl.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1 - -### The Ephemeral Diffie-Helman key exchange is a very effective way of -### ensuring Forward Secrecy by exchanging a set of keys that never hit -### the wire. Since the DH key is effectively signed by the private key, -### it needs to be at least as strong as the private key. In addition, -### the default DH groups that most of the OpenSSL installations have -### are only a handful (since they are distributed with the OpenSSL -### package that has been built for the operating system it’s running on) -### and hence predictable (not to mention, 1024 bits only). - -### In order to escape this situation, first we need to generate a fresh, -### strong DH group, store it in a file and then use the option above, -### to force our SSL application to use the new DH group. Fortunately, -### OpenSSL provides us with a tool to do that. Simply run: -### openssl dhparam -out dh-params.pem 2048 - -listener.ssl.external.handshake_timeout = 15s - -listener.ssl.external.keyfile = certs/key.pem - -listener.ssl.external.certfile = certs/cert.pem - -## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem - -## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem - -## listener.ssl.external.verify = verify_peer - -## listener.ssl.external.fail_if_no_peer_cert = true - -### This is the single most important configuration option of an Erlang SSL application. -### Ciphers (and their ordering) define the way the client and server encrypt information -### over the wire, from the initial Diffie-Helman key exchange, the session key encryption -### algorithm and the message digest algorithm. Selecting a good cipher suite is critical -### for the application’s data security, confidentiality and performance. -### The cipher list above offers: -### -### A good balance between compatibility with older browsers. It can get stricter for Machine-To-Machine scenarios. -### Perfect Forward Secrecy. -### No old/insecure encryption and HMAC algorithms -### -### Most of it was copied from Mozilla’s Server Side TLS article -## listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA - -### SSL parameter renegotiation is a feature that allows a client and -### a server to renegotiate the parameters of the SSL connection on the fly. -### RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation, -### you drop support for the insecure renegotiation, prone to MitM attacks. -## listener.ssl.external.secure_renegotiate = off - -### A performance optimization setting, it allows clients to reuse -### pre-existing sessions, instead of initializing new ones. -### Read more about it here. -## listener.ssl.external.reuse_sessions = on - -### An important security setting, it forces the cipher to be set based on -### the server-specified order instead of the client-specified order, -### hence enforcing the (usually more properly configured) security -### ordering of the server administrator. -## listener.ssl.external.honor_cipher_order = on - -### Use the CN or DN value from the client certificate as a username. -### Notice: 'verify' should be configured as 'verify_peer' -## listener.ssl.external.peer_cert_as_username = cn - -##-------------------------------------------------------------------- -## External MQTT/WebSocket Listener - -listener.ws.external = 8083 - -listener.ws.external.acceptors = 4 - -listener.ws.external.max_clients = 64 - -listener.ws.external.access.1 = allow all - -## TCP Options -listener.ws.external.backlog = 1024 - -listener.ws.external.recbuf = 4KB - -listener.ws.external.sndbuf = 4KB - -listener.ws.external.buffer = 4KB - -listener.ws.external.nodelay = true - -##-------------------------------------------------------------------- -## External MQTT/WebSocket/SSL Listener - -listener.wss.external = 8084 - -listener.wss.external.acceptors = 4 - -listener.wss.external.max_clients = 64 - -listener.wss.external.access.1 = allow all - -## SSL Options -listener.wss.external.handshake_timeout = 15s - -listener.wss.external.keyfile = certs/key.pem - -listener.wss.external.certfile = certs/cert.pem - -## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem - -## listener.wss.external.verify = verify_peer - -## listener.wss.external.fail_if_no_peer_cert = true - -##-------------------------------------------------------------------- -## HTTP Management API Listener - -listener.api.mgmt = 127.0.0.1:8080 - -listener.api.mgmt.acceptors = 4 - -listener.api.mgmt.max_clients = 64 - -listener.api.mgmt.access.1 = allow all - -##------------------------------------------------------------------- -## System Monitor -##------------------------------------------------------------------- - -## Long GC, don't monitor in production mode for: -## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 -sysmon.long_gc = false - -## Long Schedule(ms) -sysmon.long_schedule = 240 - -## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. -sysmon.large_heap = 8MB - -## Busy Port -sysmon.busy_port = false - -## Busy Dist Port -sysmon.busy_dist_port = true - diff --git a/test/emqttd_SUITE_data/emqttd.schema b/test/emqttd_SUITE_data/emqttd.schema deleted file mode 100644 index a2860322c..000000000 --- a/test/emqttd_SUITE_data/emqttd.schema +++ /dev/null @@ -1,1173 +0,0 @@ -%%-*- mode: erlang -*- -%% EMQ config mapping - -%%-------------------------------------------------------------------- -%% Cluster -%%-------------------------------------------------------------------- - -%% Cluster ID -{mapping, "cluster.id", "emqttd.cluster", [ - {default, "emq"}, - {datatype, string} -]}. - -%% Cluster Multicast Addr -{mapping, "cluster.multicast", "emqttd.cluster", [ - {default, "239.192.0.1:44369"}, - {datatype, string} -]}. - -{translation, "emqttd.cluster", fun(Conf) -> - Multicast = cuttlefish:conf_get("cluster.multicast", Conf), - [Addr, Port] = string:tokens(Multicast, ":"), - {ok, Ip} = inet_parse:address(Addr), - [{id, cuttlefish:conf_get("cluster.id", Conf)}, - {multicast, {Ip, list_to_integer(Port)}}] -end}. - -%%-------------------------------------------------------------------- -%% Erlang Node -%%-------------------------------------------------------------------- - -%% @doc Erlang node name -{mapping, "node.name", "vm_args.-name", [ - {default, "emq@127.0.0.1"} -]}. - -%% @doc Secret cookie for distributed erlang node -{mapping, "node.cookie", "vm_args.-setcookie", [ - {default, "emqsecretcookie"} -]}. - -%% @doc SMP Support -{mapping, "node.smp", "vm_args.-smp", [ - {default, auto}, - {datatype, {enum, [enable, auto, disable]}}, - hidden -]}. - -%% @doc http://erlang.org/doc/man/heart.html -{mapping, "node.heartbeat", "vm_args.-heart", [ - {datatype, flag}, - hidden -]}. - -{translation, "vm_args.-heart", fun(Conf) -> - case cuttlefish:conf_get("node.heartbeat", Conf) of - true -> ""; - false -> cuttlefish:invalid("should be 'on' or comment the line!") - end -end}. - -%% @doc Enable Kernel Poll -{mapping, "node.kernel_poll", "vm_args.+K", [ - {default, on}, - {datatype, flag}, - hidden -]}. - -%% @doc More information at: http://erlang.org/doc/man/erl.html -{mapping, "node.async_threads", "vm_args.+A", [ - {default, 64}, - {datatype, integer}, - {validators, ["range:0-1024"]} -]}. - -%% @doc Erlang Process Limit -{mapping, "node.process_limit", "vm_args.+P", [ - {datatype, integer}, - {default, 256000}, - hidden -]}. - -%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q -%% @doc The number of concurrent ports/sockets -%% Valid range is 1024-134217727 -{mapping, "node.max_ports", - cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [ - {default, 262144}, - {datatype, integer}, - {validators, ["range4ports"]} -]}. - -{validator, "range4ports", "must be 1024 to 134217727", - fun(X) -> X >= 1024 andalso X =< 134217727 end}. - -%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl -{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ - {datatype, bytesize}, - {commented, "32MB"}, - hidden, - {validators, ["zdbbl_range"]} -]}. - -{translation, "vm_args.+zdbbl", - fun(Conf) -> - ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined), - case ZDBBL of - undefined -> undefined; - X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; - _ -> undefined - end - end -}. - -{validator, "zdbbl_range", "must be between 1KB and 2097151KB", - fun(ZDBBL) -> - %% 2097151KB = 2147482624 - ZDBBL >= 1024 andalso ZDBBL =< 2147482624 - end -}. - -%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 -{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ - {default, 1000}, - {datatype, integer}, - hidden, - {validators, ["positive_integer"]} -]}. - -{validator, "positive_integer", "must be a positive integer", - fun(X) -> X >= 0 end}. - -%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, -%% R16+ uses +e -%% @doc The ETS table limit -{mapping, "node.max_ets_tables", - cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [ - {default, 256000}, - {datatype, integer}, - hidden -]}. - -%% @doc Set the location of crash dumps -{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [ - {default, "{{crash_dump}}"}, - {datatype, file}, - hidden -]}. - -%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime -{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ - {commented, 60}, - {datatype, integer}, - hidden -]}. - -%% @doc http://www.erlang.org/doc/man/kernel_app.html -{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ - {commented, 6369}, - {datatype, integer}, - hidden -]}. - -%% @see node.dist_listen_min -{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ - {commented, 6369}, - {datatype, integer}, - hidden -]}. - -%%-------------------------------------------------------------------- -%% Log -%%-------------------------------------------------------------------- - -{mapping, "log.dir", "lager.log_dir", [ - {default, "log"}, - {datatype, string} -]}. - -{mapping, "log.console", "lager.handlers", [ - {default, file}, - {datatype, {enum, [off, file, console, both]}} -]}. - -{mapping, "log.console.level", "lager.handlers", [ - {default, info}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} -]}. - -{mapping, "log.console.file", "lager.handlers", [ - {default, "log/console.log"}, - {datatype, file} -]}. - -{mapping, "log.error.file", "lager.handlers", [ - {default, "log/error.log"}, - {datatype, file} -]}. - -{mapping, "log.syslog", "lager.handlers", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "log.syslog.identity", "lager.handlers", [ - {default, "emqttd"}, - {datatype, string} -]}. - -{mapping, "log.syslog.facility", "lager.handlers", [ - {default, local0}, - {datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}} -]}. - -{mapping, "log.syslog.level", "lager.handlers", [ - {default, error}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}} -]}. - -{mapping, "log.error.redirect", "lager.error_logger_redirect", [ - {default, on}, - {datatype, flag}, - hidden -]}. - -{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ - {default, 1000}, - {datatype, integer}, - hidden -]}. - -{translation, - "lager.handlers", - fun(Conf) -> - ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of - undefined -> []; - ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, - {level, error}, - {size, 10485760}, - {date, "$D0"}, - {count, 5}]}] - end, - - ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), - ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), - - ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, - ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, - {level, ConsoleLogLevel}, - {size, 10485760}, - {date, "$D0"}, - {count, 5}]}, - - ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of - off -> []; - file -> [ConsoleFileHandler]; - console -> [ConsoleHandler]; - both -> [ConsoleHandler, ConsoleFileHandler]; - _ -> [] - end, - - SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of - false -> []; - true -> [{lager_syslog_backend, - [cuttlefish:conf_get("log.syslog.identity", Conf), - cuttlefish:conf_get("log.syslog.facility", Conf), - cuttlefish:conf_get("log.syslog.level", Conf)]}] - end, - - ConsoleHandlers ++ ErrorHandler ++ SyslogHandler - end -}. - -{mapping, "log.crash", "lager.crash_log", [ - {default, on}, - {datatype, flag} -]}. - -{mapping, "log.crash.file", "lager.crash_log", [ - {default, "log/crash.log"}, - {datatype, file} -]}. - -{translation, - "lager.crash_log", - fun(Conf) -> - case cuttlefish:conf_get("log.crash", Conf) of - false -> undefined; - _ -> - cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") - end - end}. - -{mapping, "sasl", "sasl.sasl_error_logger", [ - {default, off}, - {datatype, flag}, - hidden -]}. - -%%-------------------------------------------------------------------- -%% Allow Anonymous and Default ACL -%%-------------------------------------------------------------------- - -%% @doc Allow Anonymous -{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -%% @doc ACL nomatch -{mapping, "mqtt.acl_nomatch", "emqttd.acl_nomatch", [ - {default, allow}, - {datatype, {enum, [allow, deny]}} -]}. - -%% @doc Default ACL File -{mapping, "mqtt.acl_file", "emqttd.acl_file", [ - {datatype, string}, - hidden -]}. - -%% @doc Cache ACL for PUBLISH -{mapping, "mqtt.cache_acl", "emqttd.cache_acl", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Protocol -%%-------------------------------------------------------------------- - -%% @doc Set the Max ClientId Length Allowed. -{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [ - {default, 1024}, - {datatype, integer} -]}. - -%% @doc Max Packet Size Allowed, 64K by default. -{mapping, "mqtt.max_packet_size", "emqttd.protocol", [ - {default, "64KB"}, - {datatype, bytesize} -]}. - -{translation, "emqttd.protocol", fun(Conf) -> - [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, - {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] -end}. - -{mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [ - {default, on}, - {datatype, flag} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Connection -%%-------------------------------------------------------------------- - -%% @doc Force the client to GC: integer -{mapping, "mqtt.conn.force_gc_count", "emqttd.conn_force_gc_count", [ - {datatype, integer} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Client -%%-------------------------------------------------------------------- - -%% @doc Max Publish Rate of Message -{mapping, "mqtt.client.max_publish_rate", "emqttd.client", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Client Idle Timeout. -{mapping, "mqtt.client.idle_timeout", "emqttd.client", [ - {default, "30s"}, - {datatype, {duration, ms}} -]}. - -%% @doc Enable Stats of Client. -{mapping, "mqtt.client.enable_stats", "emqttd.client", [ - {default, off}, - {datatype, flag} -]}. - -{translation, "emqttd.client", fun(Conf) -> - [{max_publish_rate, cuttlefish:conf_get("mqtt.client.max_publish_rate", Conf)}, - {client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, - {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT Session -%%-------------------------------------------------------------------- - -%% @doc Max Number of Subscriptions Allowed -{mapping, "mqtt.session.max_subscriptions", "emqttd.session", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Upgrade QoS? -{mapping, "mqtt.session.upgrade_qos", "emqttd.session", [ - {default, off}, - {datatype, flag} -]}. - -%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. -%% 0 means no limit -{mapping, "mqtt.session.max_inflight", "emqttd.session", [ - {default, 100}, - {datatype, integer} -]}. - -%% @doc Retry interval for redelivering QoS1/2 messages. -{mapping, "mqtt.session.retry_interval", "emqttd.session", [ - {default, "20s"}, - {datatype, {duration, ms}} -]}. - -%% @doc Max Packets that Awaiting PUBREL, 0 means no limit -{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Awaiting PUBREL Timeout -{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [ - {default, "20s"}, - {datatype, {duration, ms}} -]}. - -%% @doc Enable Stats -{mapping, "mqtt.session.enable_stats", "emqttd.session", [ - {default, off}, - {datatype, flag} -]}. - -%% @doc Session Expiry Interval -{mapping, "mqtt.session.expiry_interval", "emqttd.session", [ - {default, "2h"}, - {datatype, {duration, ms}} -]}. - -%% @doc Ignore message from self publish -{mapping, "mqtt.session.ignore_loop_deliver", "emqttd.session", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.session", fun(Conf) -> - [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)}, - {upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, - {max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, - {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, - {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, - {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, - {enable_stats, cuttlefish:conf_get("mqtt.session.enable_stats", Conf)}, - {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}, - {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT MQueue -%%-------------------------------------------------------------------- - -%% @doc Type: simple | priority -{mapping, "mqtt.mqueue.type", "emqttd.mqueue", [ - {default, simple}, - {datatype, atom} -]}. - -%% @doc Topic Priority: 0~255, Default is 0 -{mapping, "mqtt.mqueue.priority", "emqttd.mqueue", [ - {default, ""}, - {datatype, string} -]}. - -%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. 0 means no limit. -{mapping, "mqtt.mqueue.max_length", "emqttd.mqueue", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Low-water mark of queued messages -{mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [ - {default, "20%"}, - {datatype, string} -]}. - -%% @doc High-water mark of queued messages -{mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [ - {default, "60%"}, - {datatype, string} -]}. - -%% @doc Queue Qos0 messages? -{mapping, "mqtt.mqueue.store_qos0", "emqttd.mqueue", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.mqueue", fun(Conf) -> - Parse = fun(S) -> - {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), - list_to_integer(N) / 100 - end, - Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)}, - {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)}, - {low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))}, - {high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))}, - {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}], - case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of - undefined -> Opts; - V -> [{priority, - [begin [T, P] = string:tokens(S, "="), - {T, list_to_integer(P)} - end || S <- string:tokens(V, ",")]} | Opts] - end -end}. - -%%-------------------------------------------------------------------- -%% MQTT Broker -%%-------------------------------------------------------------------- - -{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [ - {default, 60}, - {datatype, integer} -]}. - -%%-------------------------------------------------------------------- -%% MQTT PubSub -%%-------------------------------------------------------------------- - -{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.pubsub", fun(Conf) -> - [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)}, - {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)}, - {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT Bridge -%%-------------------------------------------------------------------- - -{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [ - {default, 10000}, - {datatype, integer} -]}. - -{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [ - {default, 1}, - {datatype, integer} -]}. - -{translation, "emqttd.bridge", fun(Conf) -> - [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)}, - {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}] -end}. - -%%------------------------------------------------------------------- -%% MQTT Plugins -%%------------------------------------------------------------------- - -{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [ - {datatype, string} -]}. - -{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [ - {datatype, string} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Listeners -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% TCP Listeners - -{mapping, "listener.tcp.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.tcp.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.tcp.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.tcp.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.tcp.$name.mountpoint", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.tcp.$name.rate_limit", "emqttd.listeners", [ - {default, undefined}, - {datatype, string} -]}. - -{mapping, "listener.tcp.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.tcp.$name.proxy_protocol", "emqttd.listeners", [ - %%{default, off}, - {datatype, flag} -]}. - -{mapping, "listener.tcp.$name.proxy_protocol_timeout", "emqttd.listeners", [ - %%{default, "5s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.tcp.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.tcp.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.tcp.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.tcp.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -%%-------------------------------------------------------------------- -%% SSL Listeners - -{mapping, "listener.ssl.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.ssl.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.ssl.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.ssl.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.mountpoint", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.rate_limit", "emqttd.listeners", [ - {default, undefined}, - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.proxy_protocol", "emqttd.listeners", [ - %%{default, off}, - {datatype, flag} -]}. - -{mapping, "listener.ssl.$name.proxy_protocol_timeout", "emqttd.listeners", [ - %%{default, "5s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "listener.ssl.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ssl.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ssl.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ssl.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.ssl.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.ssl.$name.tls_versions", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.ciphers", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.handshake_timeout", "emqttd.listeners", [ - {default, "15s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "listener.ssl.$name.dhfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.keyfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.certfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.cacertfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ssl.$name.verify", "emqttd.listeners", [ - {datatype, atom} -]}. - -{mapping, "listener.ssl.$name.fail_if_no_peer_cert", "emqttd.listeners", [ - {datatype, {enum, [true, false]}} -]}. - -{mapping, "listener.ssl.$name.secure_renegotiate", "emqttd.listeners", [ - {datatype, flag} -]}. - -{mapping, "listener.ssl.$name.reuse_sessions", "emqttd.listeners", [ - {default, on}, - {datatype, flag} -]}. - -{mapping, "listener.ssl.$name.honor_cipher_order", "emqttd.listeners", [ - {datatype, flag} -]}. - -{mapping, "listener.ssl.$name.peer_cert_as_username", "emqttd.listeners", [ - {datatype, {enum, [cn, dn]}} -]}. - -%%-------------------------------------------------------------------- -%% MQTT/WebSocket Listeners - -{mapping, "listener.ws.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.ws.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.ws.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.ws.$name.rate_limit", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ws.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ws.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ws.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ws.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.ws.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.ws.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -%%-------------------------------------------------------------------- -%% MQTT/WebSocket/SSL Listeners - -{mapping, "listener.wss.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.wss.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.wss.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.wss.$name.zone", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.mountpoint", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.rate_limit", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.wss.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.wss.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.wss.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.wss.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.wss.$name.handshake_timeout", "emqttd.listeners", [ - {default, "15s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "listener.wss.$name.keyfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.certfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.cacertfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.verify", "emqttd.listeners", [ - {datatype, atom} -]}. - -{mapping, "listener.wss.$name.fail_if_no_peer_cert", "emqttd.listeners", [ - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.listeners", fun(Conf) -> - - Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, - - Atom = fun(undefined) -> undefined; (S) -> list_to_atom(S) end, - - Access = fun(S) -> - [A, CIDR] = string:tokens(S, " "), - {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} - end, - - AccOpts = fun(Prefix) -> - case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of - [] -> []; - Rules -> [{access, [Access(Rule) || {_, Rule} <- Rules]}] - end - end, - - MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, - - ConnOpts = fun(Prefix) -> - Filter([{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, - {rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, - {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, - {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, - {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, - {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}]) - end, - - LisOpts = fun(Prefix) -> - Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, - {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, - {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)} | AccOpts(Prefix)]) - end, - TcpOpts = fun(Prefix) -> - Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, - {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, - {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, - {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, - {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}]) - end, - - SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, - - SslOpts = fun(Prefix) -> - Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of - undefined -> undefined; - L -> [list_to_atom(V) || V <- L] - end, - Filter([{versions, Versions}, - {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, - {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, - {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, - {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, - {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, - {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, - {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, - {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}, - {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)}, - {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)}, - {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}]) - end, - - TcpListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, {sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}] - end - end, - - SslListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, - {sockopts, TcpOpts(Prefix)}, - {sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}] - end - end, - - ApiListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - SslOpts1 = case SslOpts(Prefix) of - [] -> []; - SslOpts0 -> [{sslopts, SslOpts0}] - end, - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, - {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}] - end - end, - - - lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} - <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] - ++ - [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} - <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] - ++ - [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn} - <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) -end}. - -%%-------------------------------------------------------------------- -%% MQTT REST API Listeners - -{mapping, "listener.api.$name", "emqttd.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.buffer", "emqttd.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [ - {datatype, {duration, ms}} -]}. - -{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.certfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.verify", "emqttd.listeners", [ - {datatype, atom} -]}. - -{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [ - {datatype, {enum, [true, false]}} -]}. - -%%-------------------------------------------------------------------- -%% System Monitor -%%-------------------------------------------------------------------- - -%% @doc Long GC, don't monitor in production mode for: -%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 -{mapping, "sysmon.long_gc", "emqttd.sysmon", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -%% @doc Long Schedule(ms) -{mapping, "sysmon.long_schedule", "emqttd.sysmon", [ - {default, 1000}, - {datatype, integer} -]}. - -%% @doc Large Heap -{mapping, "sysmon.large_heap", "emqttd.sysmon", [ - {default, "8MB"}, - {datatype, bytesize} -]}. - -%% @doc Monitor Busy Port -{mapping, "sysmon.busy_port", "emqttd.sysmon", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -%% @doc Monitor Busy Dist Port -{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqttd.sysmon", fun(Conf) -> - [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, - {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, - {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, - {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, - {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] -end}. - diff --git a/test/emqttd_lib_SUITE.erl b/test/emqttd_lib_SUITE.erl index 344e185d0..a808fbcc8 100644 --- a/test/emqttd_lib_SUITE.erl +++ b/test/emqttd_lib_SUITE.erl @@ -34,7 +34,7 @@ all() -> [{group, guid}, {group, opts}, {group, ?PQ}, {group, time}, - {group, node}, {group, base62}]. + {group, base62}]. groups() -> [{guid, [], [guid_gen, guid_hexstr, guid_base62]}, @@ -42,7 +42,6 @@ groups() -> {?PQ, [], [priority_queue_plen, priority_queue_out2]}, {time, [], [time_now_to_]}, - {node, [], [node_is_aliving, node_parse_name]}, {base62, [], [base62_encode]}]. %%-------------------------------------------------------------------- @@ -144,19 +143,6 @@ time_now_to_(_) -> emqttd_time:now_secs(), emqttd_time:now_ms(). -%%-------------------------------------------------------------------- -%% emqttd_node -%%-------------------------------------------------------------------- - -node_is_aliving(_) -> - io:format("Node: ~p~n", [node()]), - true = emqttd_node:is_aliving(node()), - false = emqttd_node:is_aliving('x@127.0.0.1'). - -node_parse_name(_) -> - 'a@127.0.0.1' = emqttd_node:parse_name("a@127.0.0.1"), - 'b@127.0.0.1' = emqttd_node:parse_name("b"). - %%-------------------------------------------------------------------- %% base62 encode decode %%-------------------------------------------------------------------- diff --git a/test/emqttd_mqueue_SUITE.erl b/test/emqttd_mqueue_SUITE.erl index e56b08398..93ccc9833 100644 --- a/test/emqttd_mqueue_SUITE.erl +++ b/test/emqttd_mqueue_SUITE.erl @@ -28,7 +28,7 @@ all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_priority_mqueue, t_in(_) -> Opts = [{max_length, 5}, - {queue_qos0, true}], + {store_qos0, true}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), true = ?Q:is_empty(Q), Q1 = ?Q:in(#mqtt_message{}, Q), @@ -42,7 +42,7 @@ t_in(_) -> t_in_qos0(_) -> Opts = [{max_length, 5}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), Q1 = ?Q:in(#mqtt_message{}, Q), true = ?Q:is_empty(Q1), @@ -51,7 +51,7 @@ t_in_qos0(_) -> t_out(_) -> Opts = [{max_length, 5}, - {queue_qos0, true}], + {store_qos0, true}], Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), {empty, Q} = ?Q:out(Q), Q1 = ?Q:in(#mqtt_message{}, Q), @@ -64,7 +64,7 @@ t_simple_mqueue(_) -> {max_length, 3}, {low_watermark, 0.2}, {high_watermark, 0.6}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new("simple_queue", Opts, alarm_fun()), simple = ?Q:type(Q), 3 = ?Q:max_len(Q), @@ -81,18 +81,18 @@ t_simple_mqueue(_) -> t_infinity_simple_mqueue(_) -> Opts = [{type, simple}, - {max_length, infinity}, + {max_length, 0}, {low_watermark, 0.2}, {high_watermark, 0.6}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new("infinity_simple_queue", Opts, alarm_fun()), true = ?Q:is_empty(Q), - infinity = ?Q:max_len(Q), + 0 = ?Q:max_len(Q), Qx = lists:foldl(fun(I, AccQ) -> ?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ) end, Q, lists:seq(1, 255)), 255 = ?Q:len(Qx), - [{len, 255}, {max_len, infinity}, {dropped, 0}] = ?Q:stats(Qx), + [{len, 255}, {max_len, 0}, {dropped, 0}] = ?Q:stats(Qx), {{value, V}, _Qy} = ?Q:out(Qx), <<1>> = V#mqtt_message.payload. @@ -102,7 +102,7 @@ t_priority_mqueue(_) -> {max_length, 3}, {low_watermark, 0.2}, {high_watermark, 0.6}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new("priority_queue", Opts, alarm_fun()), priority = ?Q:type(Q), 3 = ?Q:max_len(Q), @@ -125,24 +125,24 @@ t_priority_mqueue(_) -> t_infinity_priority_mqueue(_) -> Opts = [{type, priority}, {priority, [{<<"t1">>, 10}, {<<"t2">>, 8}]}, - {max_length, infinity}, - {queue_qos0, false}], + {max_length, 0}, + {store_qos0, false}], Q = ?Q:new("infinity_priority_queue", Opts, alarm_fun()), - infinity = ?Q:max_len(Q), + 0 = ?Q:max_len(Q), Qx = lists:foldl(fun(I, AccQ) -> AccQ1 = ?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), ?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) end, Q, lists:seq(1, 255)), 510 = ?Q:len(Qx), - [{len, 510}, {max_len, infinity}, {dropped, 0}] = ?Q:stats(Qx). + [{len, 510}, {max_len, 0}, {dropped, 0}] = ?Q:stats(Qx). t_priority_mqueue2(_) -> Opts = [{type, priority}, {max_length, 2}, {low_watermark, 0.2}, {high_watermark, 0.6}, - {queue_qos0, false}], + {store_qos0, false}], Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()), 2 = ?Q:max_len(Q), Q1 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), diff --git a/test/emqttd_trie_SUITE.erl b/test/emqttd_trie_SUITE.erl index 2394a902a..a81a132f5 100644 --- a/test/emqttd_trie_SUITE.erl +++ b/test/emqttd_trie_SUITE.erl @@ -28,14 +28,14 @@ all() -> [t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3]. init_per_suite(Config) -> - emqttd_mnesia:ensure_started(), + ekka_mnesia:ensure_started(), ?TRIE:mnesia(boot), ?TRIE:mnesia(copy), Config. end_per_suite(_Config) -> - emqttd_mnesia:ensure_stopped(), - emqttd_mnesia:delete_schema(). + ekka_mnesia:ensure_stopped(), + ekka_mnesia:delete_schema(). init_per_testcase(_TestCase, Config) -> Config.