diff --git a/Makefile b/Makefile index 20420cf73..3738111b4 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,16 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker -PROJECT_VERSION = 2.2 +PROJECT_VERSION = 2.3 -DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt +DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master -dep_esockd = git https://github.com/emqtt/esockd emq22 -dep_mochiweb = git https://github.com/emqtt/mochiweb emq22 +dep_esockd = git https://github.com/emqtt/esockd master +dep_ekka = git https://github.com/emqtt/ekka master +dep_mochiweb = git https://github.com/emqtt/mochiweb master dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master diff --git a/README.md b/README.md index 9bbd5f8eb..a4f7432e6 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,8 @@ Please visit [emqtt.io](http://emqtt.io) for more service. Follow us on Twitter: * Passed eclipse paho interoperability tests * Local Subscription * Shared Subscription +* Proxy Protocol V1/2 +* Lua Hook and Web Hook ## Installation @@ -84,6 +86,8 @@ Plugin | Descrip [emq_auth_mongo](https://github.com/emqtt/emq_auth_mongo) | MongoDB Authentication/ACL Plugin [emq_auth_http](https://github.com/emqtt/emq_auth_http) | Authentication/ACL by HTTP API [emq_auth_ldap](https://github.com/emqtt/emq_auth_ldap) | LDAP Authentication Plugin +[emq_web_hook](https://github.com/emqtt/emq-web-hook) | Web Hook Plugin +[emq_lua_hook](https://github.com/emqtt/emq-lua-hook) | Lua Hook Plugin [emq_sn](https://github.com/emqtt/emq_sn) | MQTT-SN Protocol Plugin [emq_coap](https://github.com/emqtt/emq_coap) | CoAP Protocol Plugin [emq_stomp](https://github.com/emqtt/emq_stomp) | Stomp Protocol Plugin diff --git a/etc/emq.conf b/etc/emq.conf index ca694cf70..deb4c97a9 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -7,11 +7,71 @@ ## Cluster ##-------------------------------------------------------------------- -## The cluster Id -cluster.id = emq +## Cluster name +cluster.name = ekka -## The multicast address and port. -cluster.multicast = 239.192.0.1:44369 +## Cluster Cookie +cluster.cookie = ekkaclustercookie + +## Cluster Discovery: static | epmd | multicast | gossip | etcd | consul +cluster.discovery = static + +## Cluster Autoheal: on | off +cluster.autoheal = on + +## Clean down node of the cluster +cluster.clean_down = 1h + +##-------------------------------------------------------------------- +## Cluster with epmd + +cluster.epmd.seeds = a@127.0.0.1,b@127.0.0.1 + +##-------------------------------------------------------------------- +## Cluster with multicast + +## 1 second +cluster.mcast.period = 1s + +cluster.mcast.addr = 239.192.0.1:4369 + +cluster.mcast.iface = 0.0.0.0 + +cluster.mcast.ttl = 1 + +cluster.mcast.loop = on + +##-------------------------------------------------------------------- +## Cluster with Gossip + +cluster.gossip.seeds = 127.0.0.1:4369 + +cluster.gossip.protocol_period = 1s + +cluster.gossip.suspicion_factor = 3 + +##-------------------------------------------------------------------- +## Cluster with Etcd + +cluster.etcd.addr = 127.0.0.1:2367 + +cluster.etcd.prefix = emq + +cluster.etcd.node_ttl = 30m + +##-------------------------------------------------------------------- +## Cluster by Consul + +cluster.consul.addr = 127.0.0.1:8500 + +cluster.consul.acl_token = example-acl-token + +##-------------------------------------------------------------------- +## Discover by Kubernetes + +## cluster.k8s.selector = app=emq + +## cluster.k8s.node_basename = emq ##-------------------------------------------------------------------- ## Node Args @@ -21,7 +81,7 @@ cluster.multicast = 239.192.0.1:44369 node.name = emqttd@127.0.0.1 ## Cookie for distributed node -node.cookie = emqsecretcookie +## node.cookie = emqsecretcookie ## SMP support: enable, auto, disable node.smp = auto @@ -85,6 +145,9 @@ log.syslog.level = error ## Console log file ## log.console.file = {{ platform_log_dir }}/console.log +## Info log file +## log.info.file = {{ platform_log_dir }}/info.log + ## Error log file log.error.file = {{ platform_log_dir }}/error.log @@ -400,6 +463,17 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ### Notice: 'verify' should be configured as 'verify_peer' ## listener.ssl.external.peer_cert_as_username = cn +## SSL Socket Options +## listener.ssl.external.backlog = 1024 + +## listener.ssl.external.recbuf = 4KB + +## listener.ssl.external.sndbuf = 4KB + +## listener.ssl.external.buffer = 4KB + +## listener.ssl.external.nodelay = true + ##-------------------------------------------------------------------- ## External MQTT/WebSocket Listener diff --git a/priv/emq.schema b/priv/emq.schema index a2860322c..5ad6501d8 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -5,26 +5,168 @@ %% Cluster %%-------------------------------------------------------------------- -%% Cluster ID -{mapping, "cluster.id", "emqttd.cluster", [ - {default, "emq"}, +%% @doc Cluster name +{mapping, "cluster.name", "ekka.cluster_name", [ + {default, emqcl}, + {datatype, atom} +]}. + +%% @doc Secret cookie for the cluster +{mapping, "cluster.cookie", "vm_args.-setcookie", [ + {default, "emqclustercookie"} +]}. + +%% @doc Cluster discovery +{mapping, "cluster.discovery", "ekka.cluster_discovery", [ + {default, manual}, + {datatype, atom} +]}. + +%% @doc Cluster autoheal +{mapping, "cluster.autoheal", "ekka.cluster_autoheal", [ + {datatype, flag}, + {default, on} +]}. + + +%% @doc Clean down node from the cluster +{mapping, "cluster.clean_down", "ekka.cluster_clean_down", [ + {datatype, {duration, ms}}, + {default, "1h"} +]}. + +%%-------------------------------------------------------------------- +%% Cluster with epmd + +{mapping, "cluster.epmd.seeds", "ekka.cluster_discovery", [ {datatype, string} ]}. -%% Cluster Multicast Addr -{mapping, "cluster.multicast", "emqttd.cluster", [ - {default, "239.192.0.1:44369"}, +%%-------------------------------------------------------------------- +%% Cluster with IP Multicast + +{mapping, "cluster.mcast.addr", "ekka.cluster_discovery", [ + {datatype, ip} +]}. + +{mapping, "cluster.mcast.period", "ekka.cluster_discovery", [ + {datatype, {duration, ms}}, + {default, "1s"} +]}. + +{mapping, "cluster.mcast.iface", "ekka.cluster_discovery", [ + {datatype, string}, + {default, "0.0.0.0"} +]}. + +{mapping, "cluster.mcast.ttl", "ekka.cluster_discovery", [ + {datatype, integer}, + {default, 1} +]}. + +{mapping, "cluster.mcast.loop", "ekka.cluster_discovery", [ + {datatype, flag}, + {default, on} +]}. + +{mapping, "cluster.mcast.sndbuf", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "16KB"} +]}. + +{mapping, "cluster.mcast.recbuf", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "16KB"} +]}. + +{mapping, "cluster.mcast.buffer", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "32KB"} +]}. + +{mapping, "cluster.gossip.seeds", "ekka.cluster_discovery", [ {datatype, string} ]}. -{translation, "emqttd.cluster", fun(Conf) -> - Multicast = cuttlefish:conf_get("cluster.multicast", Conf), - [Addr, Port] = string:tokens(Multicast, ":"), - {ok, Ip} = inet_parse:address(Addr), - [{id, cuttlefish:conf_get("cluster.id", Conf)}, - {multicast, {Ip, list_to_integer(Port)}}] +{mapping, "cluster.gossip.protocol_period", "ekka.cluster_discovery", [ + {datatype, {duration, ms}}, + {default, "1s"} +]}. + +{mapping, "cluster.gossip.suspicion_factor", "ekka.cluster_discovery", [ + {datatype, integer}, + {default, 3} +]}. + +%%-------------------------------------------------------------------- +%% Cluster with Etcd + +{mapping, "cluster.etcd.addr", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.prefix", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.node_ttl", "ekka.cluster_discovery", [ + {datatype, {duration, ms}}, + {default, "1m"} +]}. + +%%-------------------------------------------------------------------- +%% Cluster with Consul + +{mapping, "cluster.consul.addr", "ekka.cluster_discovery", [ + {datatype, ip} +]}. + +{mapping, "cluster.consul.acl_token", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{translation, "ekka.cluster_discovery", fun(Conf) -> + Strategy = cuttlefish:conf_get("cluster.discovery", Conf), + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + IpPort = fun(S) -> + [Addr, Port] = string:tokens(S, ":"), + {ok, Ip} = inet:parse_address(Addr), + {Ip, Port} + end, + Options = fun(static) -> + [{seeds, cuttlefish:conf_get("cluster.epmd.seeds", Conf)}]; + (mcast) -> + {Addr, Port} = cuttlefish:conf_get("cluster.mcast.addr", Conf), + {ok, Ip} = inet:parse_address(Addr), + {ok, Iface} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.iface", Conf)), + [{addr, Ip}, {port, Port}, {iface, Iface}, + {period, cuttlefish:conf_get("cluster.mcast.period", Conf)}, + {ttl, cuttlefish:conf_get("cluster.mcast.ttl", Conf, 1)}, + {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}]; + (etcd) -> + [{seeds, cuttlefish:conf_get("cluster.epmd.seeds", Conf)}, + {clean_down, cuttlefish:conf_get("cluster.epmd.clean_down", Conf, undefined)}]; + (gossip) -> + [{seeds, [IpPort(S) || S <- string:tokens(",", cuttlefish:conf_get("cluster.gossip.seeds", Conf))]}, + {protocol_period, cuttlefish:conf_get("cluster.gossip.protocol_period", Conf)}, + {suspicion_factor, cuttlefish:conf_get("cluster.gossip.suspicion_factor", Conf, 3)}]; + (etcd) -> + [{addr, cuttlefish:conf_get("cluster.etcd.addr", Conf)}, + {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emq")}, + {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}]; + (consul) -> + [{addr, cuttlefish:conf_get("cluster.consul.addr", Conf)}, + {acl_token, cuttlefish:conf_get("cluster.consul.acl_token", Conf)}]; + (k8s) -> + [{host, cuttlefish:conf_get("cluster.k8s.selector", Conf)}, + {acl_token, cuttlefish:conf_get("cluster.k8s.node_basename", Conf)}]; + (manual) -> + [ ] + end, + {Strategy, Filter(Options(Strategy))} end}. + %%-------------------------------------------------------------------- %% Erlang Node %%-------------------------------------------------------------------- @@ -35,9 +177,9 @@ end}. ]}. %% @doc Secret cookie for distributed erlang node -{mapping, "node.cookie", "vm_args.-setcookie", [ - {default, "emqsecretcookie"} -]}. +%% {mapping, "node.cookie", "vm_args.-setcookie", [ +%% {default, "emqsecretcookie"} +%%]}. %% @doc SMP Support {mapping, "node.smp", "vm_args.-smp", [ @@ -258,7 +400,6 @@ end}. both -> [ConsoleHandler, ConsoleFileHandler]; _ -> [] end, - SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of false -> []; true -> [{lager_syslog_backend, diff --git a/rebar.config b/rebar.config index 28b47bf58..0ce2b3602 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq22"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","emq22"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} +{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{ekka,".*",{git,"https://github.com/emqtt/ekka","master"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","master"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} ]}. -{erl_opts, [{parse_transform,lager_transform}]}. +{erl_opts, [debug_info,{parse_transform,lager_transform}]}. diff --git a/src/emqttd.erl b/src/emqttd.erl index c60f8d590..d4cdd8437 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -40,6 +40,9 @@ %% Debug API -export([dump/0]). +%% Shutdown and reboot +-export([shutdown/0, shutdown/1, reboot/0]). + -type(subscriber() :: pid() | binary()). -type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). @@ -161,6 +164,21 @@ run_hooks(Hook, Args) -> run_hooks(Hook, Args, Acc) -> emqttd_hooks:run(Hook, Args, Acc). +%%-------------------------------------------------------------------- +%% Shutdown and reboot +%%-------------------------------------------------------------------- + +shutdown() -> + shutdown(normal). + +shutdown(Reason) -> + lager:error("EMQ shutdown for ~s", [Reason]), + emqttd_plugins:unload(), + lists:foreach(fun application:stop/1, [emqttd, ekka, mochiweb, esockd, gproc]). + +reboot() -> + lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, ekka, emqttd]). + %%-------------------------------------------------------------------- %% Debug %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index ad52416df..acca547be 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -34,18 +34,19 @@ -define(APP, emqttd). %%-------------------------------------------------------------------- -%% Application callbacks +%% Application Callbacks %%-------------------------------------------------------------------- start(_Type, _Args) -> print_banner(), - emqttd_mnesia:start(), + ekka:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd_cli:load(), register_acl_mod(), emqttd_plugins:init(), emqttd_plugins:load(), + init_cluster(), start_listeners(), register(emqttd, self()), print_vsn(), @@ -146,6 +147,14 @@ register_acl_mod() -> undefined -> ok end. +%%-------------------------------------------------------------------- +%% Init Cluster +%%-------------------------------------------------------------------- + +init_cluster() -> + ekka:callback(prepare, fun emqttd:shutdown/1), + ekka:callback(reboot, fun emqttd:reboot/0). + %%-------------------------------------------------------------------- %% Start Listeners %%-------------------------------------------------------------------- diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index e729ec9cb..c35f44ae3 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -40,7 +40,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {started_at, sys_interval, heartbeat, tick_tref, version, sysdescr}). +-record(state, {started_at, sys_interval, heartbeat, ticker, version, sysdescr}). -define(APP, emqttd). @@ -122,9 +122,9 @@ init([]) -> % Tick {ok, #state{started_at = os:timestamp(), heartbeat = start_tick(1000, heartbeat), - version = list_to_binary(version()), - sysdescr = list_to_binary(sysdescr()), - tick_tref = start_tick(tick)}, hibernate}. + version = list_to_binary(version()), + sysdescr = list_to_binary(sysdescr()), + ticker = start_tick(tick)}, hibernate}. handle_call(uptime, _From, State) -> {reply, uptime(State), State}; @@ -149,7 +149,7 @@ handle_info(tick, State = #state{version = Version, sysdescr = Descr}) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> +terminate(_Reason, #state{heartbeat = Hb, ticker = TRef}) -> stop_tick(Hb), stop_tick(TRef), ok. @@ -163,7 +163,7 @@ code_change(_OldVsn, State, _Extra) -> retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || - N <- emqttd_mnesia:running_nodes()], ",")), + N <- ekka_mnesia:running_nodes()], ",")), Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), emqttd:publish(emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg))). diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 369053909..d6301d484 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -111,7 +111,7 @@ broker(_) -> %% @doc Cluster with other nodes cluster(["join", SNode]) -> - case emqttd_cluster:join(emqttd_node:parse_name(SNode)) of + case ekka:join(ekka_node:parse_name(SNode)) of ok -> ?PRINT_MSG("Join the cluster successfully.~n"), cluster(["status"]); @@ -120,7 +120,7 @@ cluster(["join", SNode]) -> end; cluster(["leave"]) -> - case emqttd_cluster:leave() of + case ekka:leave() of ok -> ?PRINT_MSG("Leave the cluster successfully.~n"), cluster(["status"]); @@ -128,8 +128,8 @@ cluster(["leave"]) -> ?PRINT("Failed to leave the cluster: ~p~n", [Error]) end; -cluster(["remove", SNode]) -> - case emqttd_cluster:remove(emqttd_node:parse_name(SNode)) of +cluster(["force-leave", SNode]) -> + case ekka:force_leave(ekka_node:parse_name(SNode)) of ok -> ?PRINT_MSG("Remove the node from cluster successfully.~n"), cluster(["status"]); @@ -138,13 +138,13 @@ cluster(["remove", SNode]) -> end; cluster(["status"]) -> - ?PRINT("Cluster status: ~p~n", [emqttd_cluster:status()]); + ?PRINT("Cluster status: ~p~n", [ekka_cluster:status()]); cluster(_) -> - ?USAGE([{"cluster join ", "Join the cluster"}, - {"cluster leave", "Leave the cluster"}, - {"cluster remove ","Remove the node from cluster"}, - {"cluster status", "Cluster status"}]). + ?USAGE([{"cluster join ", "Join the cluster"}, + {"cluster leave", "Leave the cluster"}, + {"cluster force-leave ","Force the node leave from cluster"}, + {"cluster status", "Cluster status"}]). %%-------------------------------------------------------------------- %% @doc Users usage diff --git a/src/emqttd_cluster.erl b/src/emqttd_cluster.erl deleted file mode 100644 index 28b2d1723..000000000 --- a/src/emqttd_cluster.erl +++ /dev/null @@ -1,92 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_cluster). - --author("Feng Lee "). - --include("emqttd.hrl"). - -%% Cluster API --export([join/1, leave/0, status/0, remove/1]). - -%% RPC Call --export([prepare/0, reboot/0]). - -%% @doc Join cluster --spec(join(node()) -> ok | {error, any()}). -join(Node) when Node =:= node() -> - {error, {cannot_join_with_self, Node}}; - -join(Node) when is_atom(Node) -> - case {is_clustered(Node), emqttd:is_running(Node)} of - {false, true} -> - prepare(), ok = emqttd_mnesia:join_cluster(Node), reboot(); - {false, false} -> - {error, {node_not_running, Node}}; - {true, _} -> - {error, {already_clustered, Node}} - end. - -%% @doc Prepare to join or leave cluster. --spec(prepare() -> ok). -prepare() -> - emqttd_plugins:unload(), - lists:foreach(fun application:stop/1, [emqttd, mochiweb, esockd, gproc]). - -%% @doc Is node in cluster? --spec(is_clustered(node()) -> boolean()). -is_clustered(Node) -> - lists:member(Node, emqttd_mnesia:cluster_nodes(all)). - -%% @doc Reboot after join or leave cluster. --spec(reboot() -> ok). -reboot() -> - lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, emqttd]). - -%% @doc Leave from Cluster. --spec(leave() -> ok | {error, any()}). -leave() -> - case emqttd_mnesia:running_nodes() -- [node()] of - [_|_] -> - prepare(), ok = emqttd_mnesia:leave_cluster(), reboot(); - [] -> - {error, node_not_in_cluster} - end. - -%% @doc Remove a node from cluster. --spec(remove(node()) -> ok | {error, any()}). -remove(Node) when Node =:= node() -> - {error, {cannot_remove_self, Node}}; - -remove(Node) -> - case is_clustered(Node) andalso rpc:call(Node, ?MODULE, prepare, []) of - ok -> - case emqttd_mnesia:remove_from_cluster(Node) of - ok -> rpc:call(Node, ?MODULE, reboot, []); - Error -> Error - end; - false -> - {error, node_not_in_cluster}; - {badrpc, nodedown} -> - emqttd_mnesia:remove_from_cluster(Node); - {badrpc, Reason} -> - {error, Reason} - end. - -%% @doc Cluster status -status() -> emqttd_mnesia:cluster_status(). - diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index cf705ff00..e09154245 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -70,7 +70,7 @@ handle_request(Method, Path, Req) -> http_publish(Req) -> Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- mochiweb_request:parse_post(Req)], - lager:info("HTTP Publish: ~p", [Params]), + lager:debug("HTTP Publish: ~p", [Params]), Topics = topics(Params), ClientId = get_value(<<"client">>, Params, http), Qos = int(get_value(<<"qos">>, Params, "0")), diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl deleted file mode 100644 index 20c054ab2..000000000 --- a/src/emqttd_mnesia.erl +++ /dev/null @@ -1,268 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_mnesia). - --author("Feng Lee "). - --include("emqttd.hrl"). - --include("emqttd_internal.hrl"). - -%% Start and stop mnesia --export([start/0, ensure_started/0, ensure_stopped/0, connect/1]). - -%% Cluster mnesia --export([join_cluster/1, cluster_status/0, leave_cluster/0, - remove_from_cluster/1, cluster_nodes/1, running_nodes/0]). - -%% Schema and tables --export([copy_schema/1, delete_schema/0, del_schema_copy/1, - create_table/2, copy_table/1, copy_table/2]). - -%%-------------------------------------------------------------------- -%% Start and init mnesia -%%-------------------------------------------------------------------- - -%% @doc Start mnesia database. --spec(start() -> ok). -start() -> - ensure_ok(ensure_data_dir()), - ensure_ok(init_schema()), - ok = mnesia:start(), - init_tables(), - wait_for(tables). - -%% @private -ensure_data_dir() -> - Dir = mnesia:system_info(directory) ++ "/", - case filelib:ensure_dir(Dir) of - ok -> ok; - {error, Reason} -> {error, {mnesia_dir_error, Dir, Reason}} - end. - -%% @doc ensure mnesia started. --spec(ensure_started() -> ok | {error, any()}). -ensure_started() -> - ok = mnesia:start(), wait_for(start). - -%% @doc ensure mnesia stopped. --spec(ensure_stopped() -> ok | {error, any()}). -ensure_stopped() -> - stopped = mnesia:stop(), wait_for(stop). - -%% @private -%% @doc Init mnesia schema or tables. -init_schema() -> - case mnesia:system_info(extra_db_nodes) of - [] -> mnesia:create_schema([node()]); - [_|_] -> ok - end. - -%% @private -%% @doc Init mnesia tables. -init_tables() -> - case mnesia:system_info(extra_db_nodes) of - [] -> create_tables(); - [_|_] -> copy_tables() - end. - -%% @doc Create mnesia tables. -create_tables() -> - emqttd_boot:apply_module_attributes(boot_mnesia). - -%% @doc Copy mnesia tables. -copy_tables() -> - emqttd_boot:apply_module_attributes(copy_mnesia). - -%% @doc Create mnesia table. --spec(create_table(Name:: atom(), TabDef :: list()) -> ok | {error, any()}). -create_table(Name, TabDef) -> - ensure_tab(mnesia:create_table(Name, TabDef)). - -%% @doc Copy mnesia table. --spec(copy_table(Name :: atom()) -> ok). -copy_table(Name) -> - copy_table(Name, ram_copies). - --spec(copy_table(Name:: atom(), ram_copies | disc_copies) -> ok). -copy_table(Name, RamOrDisc) -> - ensure_tab(mnesia:add_table_copy(Name, node(), RamOrDisc)). - -%% @doc Copy schema. -copy_schema(Node) -> - case mnesia:change_table_copy_type(schema, Node, disc_copies) of - {atomic, ok} -> - ok; - {aborted, {already_exists, schema, Node, disc_copies}} -> - ok; - {aborted, Error} -> - {error, Error} - end. - -%% @doc Force to delete schema. -delete_schema() -> - mnesia:delete_schema([node()]). - -%% @doc Delete schema copy -del_schema_copy(Node) -> - case mnesia:del_table_copy(schema, Node) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} - end. - -%%-------------------------------------------------------------------- -%% Cluster mnesia -%%-------------------------------------------------------------------- - -%% @doc Join the mnesia cluster --spec(join_cluster(node()) -> ok). -join_cluster(Node) when Node =/= node() -> - %% Stop mnesia and delete schema first - ensure_ok(ensure_stopped()), - ensure_ok(delete_schema()), - %% Start mnesia and cluster to node - ensure_ok(ensure_started()), - ensure_ok(connect(Node)), - ensure_ok(copy_schema(node())), - %% Copy tables - copy_tables(), - ensure_ok(wait_for(tables)). - -%% @doc Cluster status --spec(cluster_status() -> list()). -cluster_status() -> - Running = mnesia:system_info(running_db_nodes), - Stopped = mnesia:system_info(db_nodes) -- Running, - ?IF(Stopped =:= [], [{running_nodes, Running}], - [{running_nodes, Running}, {stopped_nodes, Stopped}]). - -%% @doc This node try leave the cluster --spec(leave_cluster() -> ok | {error, any()}). -leave_cluster() -> - case running_nodes() -- [node()] of - [] -> - {error, node_not_in_cluster}; - Nodes -> - case lists:any(fun(Node) -> - case leave_cluster(Node) of - ok -> true; - {error, _Reason} -> false - end - end, Nodes) of - true -> ok; - false -> {error, {failed_to_leave, Nodes}} - end - end. - --spec(leave_cluster(node()) -> ok | {error, any()}). -leave_cluster(Node) when Node =/= node() -> - case is_running_db_node(Node) of - true -> - ensure_ok(ensure_stopped()), - ensure_ok(rpc:call(Node, ?MODULE, del_schema_copy, [node()])), - ensure_ok(delete_schema()); - %%ensure_ok(start()); %% restart? - false -> - {error, {node_not_running, Node}} - end. - -%% @doc Remove node from mnesia cluster. --spec(remove_from_cluster(node()) -> ok | {error, any()}). -remove_from_cluster(Node) when Node =/= node() -> - case {is_node_in_cluster(Node), is_running_db_node(Node)} of - {true, true} -> - ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])), - mnesia_lib:del(extra_db_nodes, Node), - ensure_ok(del_schema_copy(Node)), - ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); - {true, false} -> - mnesia_lib:del(extra_db_nodes, Node), - ensure_ok(del_schema_copy(Node)); - %ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); - {false, _} -> - {error, node_not_in_cluster} - end. - -%% @doc Is node in mnesia cluster. -is_node_in_cluster(Node) -> - lists:member(Node, mnesia:system_info(db_nodes)). - -%% @private -%% @doc Is running db node. -is_running_db_node(Node) -> - lists:member(Node, running_nodes()). - -%% @doc Cluster with node. --spec(connect(node()) -> ok | {error, any()}). -connect(Node) -> - case mnesia:change_config(extra_db_nodes, [Node]) of - {ok, [Node]} -> ok; - {ok, []} -> {error, {failed_to_connect_node, Node}}; - Error -> Error - end. - -%% @doc Running nodes. --spec(running_nodes() -> list(node())). -running_nodes() -> cluster_nodes(running). - -%% @doc Cluster nodes. --spec(cluster_nodes(all | running | stopped) -> [node()]). -cluster_nodes(all) -> - mnesia:system_info(db_nodes); -cluster_nodes(running) -> - mnesia:system_info(running_db_nodes); -cluster_nodes(stopped) -> - cluster_nodes(all) -- cluster_nodes(running). - -%% @private -ensure_ok(ok) -> ok; -ensure_ok({error, {_Node, {already_exists, _Node}}}) -> ok; -ensure_ok({badrpc, Reason}) -> throw({error, {badrpc, Reason}}); -ensure_ok({error, Reason}) -> throw({error, Reason}). - -%% @private -ensure_tab({atomic, ok}) -> ok; -ensure_tab({aborted, {already_exists, _Name}}) -> ok; -ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok; -ensure_tab({aborted, Error}) -> Error. - -%% @doc Wait for mnesia to start, stop or tables ready. --spec(wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}). -wait_for(start) -> - case mnesia:system_info(is_running) of - yes -> ok; - no -> {error, mnesia_unexpectedly_stopped}; - stopping -> {error, mnesia_unexpectedly_stopping}; - starting -> timer:sleep(1000), wait_for(start) - end; - -wait_for(stop) -> - case mnesia:system_info(is_running) of - no -> ok; - yes -> {error, mnesia_unexpectedly_running}; - starting -> {error, mnesia_unexpectedly_starting}; - stopping -> timer:sleep(1000), wait_for(stop) - end; - -wait_for(tables) -> - Tables = mnesia:system_info(local_tables), - case mnesia:wait_for_tables(Tables, 600000) of - ok -> ok; - {error, Reason} -> {error, Reason}; - {timeout, BadTables} -> {error, {timetout, BadTables}} - end. - diff --git a/src/emqttd_node.erl b/src/emqttd_node.erl deleted file mode 100644 index ecf44503f..000000000 --- a/src/emqttd_node.erl +++ /dev/null @@ -1,44 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_node). - --author("Feng Lee "). - --import(lists, [concat/1]). - --export([is_aliving/1, parse_name/1]). - -%% @doc Is node aliving --spec(is_aliving(node()) -> boolean()). -is_aliving(Node) -> - case net_adm:ping(Node) of - pong -> true; - pang -> false - end. - -%% @doc Parse node name --spec(parse_name(string()) -> atom()). -parse_name(Name) when is_list(Name) -> - case string:tokens(Name, "@") of - [_Node, _Host] -> list_to_atom(Name); - _ -> with_host(Name) - end. - -with_host(Name) -> - [_, Host] = string:tokens(atom_to_list(node()), "@"), - list_to_atom(concat([Name, "@", Host])). - diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index f126f631c..73f184a3a 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -156,8 +156,8 @@ load_app(App) -> start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> - lager:info("started Apps: ~p", [Started]), - lager:info("load plugin ~p successfully", [App]), + lager:info("Started Apps: ~p", [Started]), + lager:info("Load plugin ~p successfully", [App]), SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> @@ -196,11 +196,11 @@ unload_plugin(App, Persistent) -> stop_app(App) -> case application:stop(App) of ok -> - lager:info("stop plugin ~p successfully~n", [App]), ok; + lager:info("Stop plugin ~p successfully~n", [App]), ok; {error, {not_started, App}} -> - lager:error("plugin ~p is not started~n", [App]), ok; + lager:error("Plugin ~p is not started~n", [App]), ok; {error, Reason} -> - lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} + lager:error("Stop plugin ~p error: ~p", [App]), {error, Reason} end. %%-------------------------------------------------------------------- diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 433825253..b5be7c066 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -344,10 +344,10 @@ send(Packet = ?PACKET(Type), {ok, State#proto_state{stats_data = Stats1}}. trace(recv, Packet, ProtoState) -> - ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); + ?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); trace(send, Packet, ProtoState) -> - ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState). + ?LOG(debug, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState). inc_stats(_Direct, _Type, Stats = #proto_stats{enable_stats = false}) -> Stats; @@ -382,7 +382,7 @@ shutdown(conflict, #proto_state{client_id = _ClientId}) -> ignore; shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> - ?LOG(info, "Shutdown for ~p", [Error], State), + ?LOG(debug, "Shutdown for ~p", [Error], State), Client = client(State), send_willmsg(Client, WillMsg), emqttd_hooks:run('client.disconnected', [Error], Client), diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 4d7762e5d..9efe8b612 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -53,19 +53,19 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = emqttd_mnesia:create_table(mqtt_topic, [ + ok = ekka_mnesia:create_table(mqtt_topic, [ {ram_copies, [node()]}, {record_name, mqtt_topic}, {attributes, record_info(fields, mqtt_topic)}]), - ok = emqttd_mnesia:create_table(mqtt_route, [ + ok = ekka_mnesia:create_table(mqtt_route, [ {type, bag}, {ram_copies, [node()]}, {record_name, mqtt_route}, {attributes, record_info(fields, mqtt_route)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(mqtt_topic), - ok = emqttd_mnesia:copy_table(mqtt_route, ram_copies). + ok = ekka_mnesia:copy_table(mqtt_topic), + ok = ekka_mnesia:copy_table(mqtt_route, ram_copies). %%-------------------------------------------------------------------- %% Start the Router @@ -216,7 +216,7 @@ stop() -> gen_server:call(?ROUTER, stop). %%-------------------------------------------------------------------- init([]) -> - mnesia:subscribe(system), + ekka:monitor(membership), ets:new(mqtt_local_route, [set, named_table, protected]), {ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, #state{stats_timer = TRef}}. @@ -239,27 +239,13 @@ handle_cast({del_local_route, Topic}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({mnesia_system_event, {mnesia_up, Node}}, State) -> - lager:error("Mnesia up: ~p~n", [Node]), - {noreply, State}; - -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - lager:error("Mnesia down: ~p~n", [Node]), +handle_info({membership, {mnesia, down, Node}}, State) -> clean_routes_(Node), update_stats_(), {noreply, State, hibernate}; -handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) -> - %% 1. Backup and restart - %% 2. Set master nodes - lager:critical("Mnesia inconsistent_database event: ~p, ~p~n", [Context, Node]), - {noreply, State}; - -handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) -> - lager:critical("Mnesia overload: ~p~n", [Details]), - {noreply, State}; - -handle_info({mnesia_system_event, _Event}, State) -> +handle_info({membership, _Event}, State) -> + %% ignore {noreply, State}; handle_info(stats, State) -> @@ -271,7 +257,7 @@ handle_info(_Info, State) -> terminate(_Reason, #state{stats_timer = TRef}) -> timer:cancel(TRef), - mnesia:unsubscribe(system). + ekka:unmonitor(membership). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index ee79ac4fb..69d18e1e4 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -102,11 +102,11 @@ trace(publish, From, _Msg) when is_atom(From) -> %% Dont' trace '$SYS' publish ignore; trace(publish, {ClientId, Username}, #mqtt_message{topic = Topic, payload = Payload}) -> - lager:info([{client, ClientId}, {topic, Topic}], - "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); -trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) when is_binary(From); is_list(From) -> - lager:info([{client, From}, {topic, Topic}], - "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + lager:debug([{client, ClientId}, {topic, Topic}], + "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); +trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> + lager:debug([{client, From}, {topic, Topic}], + "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). %% @doc Unsubscribe -spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 864905b05..31934759f 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -379,7 +379,7 @@ handle_cast({subscribe, _From, TopicTable, AckFun}, State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> - ?LOG(info, "Subscribe ~p", [TopicTable], State), + ?LOG(debug, "Subscribe ~p", [TopicTable], State), {GrantedQos, Subscriptions1} = lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) -> NewQos = proplists:get_value(qos, Opts), @@ -407,7 +407,7 @@ handle_cast({unsubscribe, _From, TopicTable}, State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> - ?LOG(info, "Unsubscribe ~p", [TopicTable], State), + ?LOG(debug, "Unsubscribe ~p", [TopicTable], State), Subscriptions1 = lists:foldl(fun({Topic, Opts}, SubMap) -> case maps:find(Topic, SubMap) of @@ -482,7 +482,7 @@ handle_cast({resume, ClientId, ClientPid}, await_rel_timer = AwaitTimer, expiry_timer = ExpireTimer}) -> - ?LOG(info, "Resumed by ~p", [ClientPid], State), + ?LOG(debug, "Resumed by ~p", [ClientPid], State), %% Cancel Timers lists:foreach(fun emqttd_misc:cancel_timer/1, @@ -552,7 +552,7 @@ handle_info({timeout, _Timer, check_awaiting_rel}, State) -> hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); handle_info({timeout, _Timer, expired}, State) -> - ?LOG(info, "Expired, shutdown now.", [], State), + ?LOG(debug, "Expired, shutdown now.", [], State), shutdown(expired, State); handle_info({'EXIT', ClientPid, _Reason}, @@ -563,7 +563,7 @@ handle_info({'EXIT', ClientPid, Reason}, State = #state{clean_sess = false, client_pid = ClientPid, expiry_interval = Interval}) -> - ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), + ?LOG(debug, "Client ~p EXIT for ~p", [ClientPid, Reason], State), ExpireTimer = start_timer(Interval, expired), State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, hibernate(emit_stats(State1)); diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index d53ab8c3c..dacb0b6a1 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -62,14 +62,14 @@ mnesia(boot) -> %% Global Session Table - ok = emqttd_mnesia:create_table(mqtt_session, [ + ok = ekka_mnesia:create_table(mqtt_session, [ {type, set}, {ram_copies, [node()]}, {record_name, mqtt_session}, {attributes, record_info(fields, mqtt_session)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(mqtt_session). + ok = ekka_mnesia:copy_table(mqtt_session). %%-------------------------------------------------------------------- %% API diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 0f7cb7ef1..2081ea9e5 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -34,7 +34,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {stats_fun, tick_tref}). +-record(state, {stats_fun, ticker}). %% @doc Start a session helper -spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}). @@ -42,9 +42,9 @@ start_link(StatsFun) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). init([StatsFun]) -> - mnesia:subscribe(system), + ekka:monitor(membership), {ok, TRef} = timer:send_interval(timer:seconds(1), tick), - {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. + {ok, #state{stats_fun = StatsFun, ticker = TRef}}. handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). @@ -52,8 +52,7 @@ handle_call(Req, _From, State) -> handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - lager:error("!!!Mnesia node down: ~s", [Node]), +handle_info({membership, {mnesia, down, Node}}, State) -> Fun = fun() -> ClientIds = mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'}, @@ -63,7 +62,7 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> mnesia:async_dirty(Fun), {noreply, State}; -handle_info({mnesia_system_event, {mnesia_up, _Node}}, State) -> +handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(tick, State) -> @@ -72,9 +71,9 @@ handle_info(tick, State) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, _State = #state{tick_tref = TRef}) -> +terminate(_Reason, _State = #state{ticker = TRef}) -> timer:cancel(TRef), - mnesia:unsubscribe(system). + ekka:unmonitor(membership). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_trie.erl b/src/emqttd_trie.erl index 70b1c003f..5b36e6e04 100644 --- a/src/emqttd_trie.erl +++ b/src/emqttd_trie.erl @@ -41,21 +41,21 @@ -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> %% Trie Table - ok = emqttd_mnesia:create_table(mqtt_trie, [ + ok = ekka_mnesia:create_table(mqtt_trie, [ {ram_copies, [node()]}, {record_name, trie}, {attributes, record_info(fields, trie)}]), %% Trie Node Table - ok = emqttd_mnesia:create_table(mqtt_trie_node, [ + ok = ekka_mnesia:create_table(mqtt_trie_node, [ {ram_copies, [node()]}, {record_name, trie_node}, {attributes, record_info(fields, trie_node)}]); mnesia(copy) -> %% Copy Trie Table - ok = emqttd_mnesia:copy_table(mqtt_trie), + ok = ekka_mnesia:copy_table(mqtt_trie), %% Copy Trie Node Table - ok = emqttd_mnesia:copy_table(mqtt_trie_node). + ok = ekka_mnesia:copy_table(mqtt_trie_node). %%-------------------------------------------------------------------- %% Trie API diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index dbf0ea08c..c7d0b2119 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -39,7 +39,7 @@ handle_request(Req) -> %% MQTT Over WebSocket %%-------------------------------------------------------------------- handle_request('GET', "/mqtt", Req) -> - lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), + lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), Proto = check_protocol_header(Req), case {is_websocket(Upgrade), Proto} of diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index f59c8968b..3b1833cb2 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -45,7 +45,6 @@ all() -> {group, stats}, {group, hook}, {group, http}, - {group, cluster}, {group, alarms}, {group, cli}, {group, cleanSession}]. @@ -81,14 +80,6 @@ groups() -> request_publish % websocket_test ]}, - {cluster, [sequence], - [cluster_test, - cluster_join, - cluster_leave, - cluster_remove, - cluster_remove2, - cluster_node_down - ]}, {alarms, [sequence], [set_alarms] }, @@ -469,79 +460,6 @@ websocket_test(_) -> ct:log("Req:~p", [Req]), emqttd_http:handle_request(Req). -%%-------------------------------------------------------------------- -%% cluster group -%%-------------------------------------------------------------------- -cluster_test(_Config) -> - Z = slave(emqttd, cluster_test_z), - wait_running(Z), - true = emqttd:is_running(Z), - Node = node(), - ok = rpc:call(Z, emqttd_cluster, join, [Node]), - [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)), - ct:log("Z:~p, Node:~p", [Z, Node]), - ok = rpc:call(Z, emqttd_cluster, leave, []), - [Node] = lists:sort(mnesia:system_info(running_db_nodes)), - ok = slave:stop(Z). - -cluster_join(_) -> - Z = slave(emqttd, cluster_join_z), - N = slave(node, cluster_join_n), - wait_running(Z), - true = emqttd:is_running(Z), - Node = node(), - {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node), - {error, {node_not_running, N}} = emqttd_cluster:join(N), - ok = emqttd_cluster:join(Z), - slave:stop(Z), - slave:stop(N). - -cluster_leave(_) -> - Z = slave(emqttd, cluster_leave_z), - wait_running(Z), - {error, node_not_in_cluster} = emqttd_cluster:leave(), - ok = emqttd_cluster:join(Z), - Node = node(), - [Z, Node] = emqttd_mnesia:running_nodes(), - ok = emqttd_cluster:leave(), - [Node] = emqttd_mnesia:running_nodes(), - slave:stop(Z). - -cluster_remove(_) -> - Z = slave(emqttd, cluster_remove_z), - wait_running(Z), - Node = node(), - {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node), - ok = emqttd_cluster:join(Z), - [Z, Node] = emqttd_mnesia:running_nodes(), - ok = emqttd_cluster:remove(Z), - [Node] = emqttd_mnesia:running_nodes(), - slave:stop(Z). - -cluster_remove2(_) -> - Z = slave(emqttd, cluster_remove2_z), - wait_running(Z), - ok = emqttd_cluster:join(Z), - Node = node(), - [Z, Node] = emqttd_mnesia:running_nodes(), - ok = emqttd_cluster:remove(Z), - ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []), - [Node] = emqttd_mnesia:running_nodes(), - slave:stop(Z). - -cluster_node_down(_) -> - Z = slave(emqttd, cluster_node_down), - timer:sleep(1000), - wait_running(Z), - ok = emqttd_cluster:join(Z), - ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]), - ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]), - Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), - ct:log("Routes: ~p~n", [Routes]), - [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes], - slave:stop(Z), - timer:sleep(1000), - [] = lists:sort(emqttd_router:match(<<"a/b/c">>)). set_alarms(_) -> AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, @@ -551,8 +469,6 @@ set_alarms(_) -> emqttd_alarm:clear_alarm(<<"1">>), [] = emqttd_alarm:get_alarms(). - - %%-------------------------------------------------------------------- %% Cli group %%-------------------------------------------------------------------- @@ -705,34 +621,6 @@ cleanSession_validate1(_) -> emqttc:disconnect(Pub), emqttc:disconnect(C11). - -ensure_ok(ok) -> ok; -ensure_ok({error, {already_started, _}}) -> ok. - -host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. - -wait_running(Node) -> - wait_running(Node, 30000). - -wait_running(Node, Timeout) when Timeout < 0 -> - throw({wait_timeout, Node}); - -wait_running(Node, Timeout) -> - case rpc:call(Node, emqttd, is_running, [Node]) of - true -> ok; - false -> timer:sleep(100), - wait_running(Node, Timeout - 100) - end. - -slave(emqttd, Node) -> - {ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), - rpc:call(Emq, application, ensure_all_started, [emqttd]), - Emq; - -slave(node, Node) -> - {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), - N. - emqttd_config(DataDir) -> Schema = cuttlefish_schema:files([filename:join([DataDir, "emqttd.schema"])]), Conf = conf_parse:file(filename:join([DataDir, "emqttd.conf"])),