diff --git a/Makefile b/Makefile index d42673d93..1bef9acb6 100644 --- a/Makefile +++ b/Makefile @@ -4,12 +4,12 @@ PROJECT_VERSION = 2.0 DEPS = gproc lager gen_logger gen_conf esockd mochiweb -dep_gproc = git https://github.com/uwiger/gproc.git -dep_lager = git https://github.com/basho/lager.git -dep_gen_conf = git https://github.com/emqtt/gen_conf.git -dep_gen_logger = git https://github.com/emqtt/gen_logger.git -dep_esockd = git https://github.com/emqtt/esockd.git emq20 -dep_mochiweb = git https://github.com/emqtt/mochiweb.git +dep_gproc = git https://github.com/uwiger/gproc +dep_lager = git https://github.com/basho/lager +dep_gen_conf = git https://github.com/emqtt/gen_conf +dep_gen_logger = git https://github.com/emqtt/gen_logger +dep_esockd = git https://github.com/emqtt/esockd emq20 +dep_mochiweb = git https://github.com/emqtt/mochiweb ERLC_OPTS += +'{parse_transform, lager_transform}' diff --git a/docs/source/changes.rst b/docs/source/changes.rst index 218d9e0a7..3d659666a 100644 --- a/docs/source/changes.rst +++ b/docs/source/changes.rst @@ -5,6 +5,27 @@ Changes ======= +.. _release_2.0_rc.1: + +---------------- +Version 2.0-rc.1 +---------------- + +*Release Date: 2016-10-03* + +1. `mqtt/superuser` POST called two times in `emqtt_auth_http` (#696) + +2. Close MQTT TCP connection if authentication failed (#707) + +3. Improve the plugin management. Developer don't need to add plugin's config to rel/sys.config + +4. Add `BUILD_DEPS` in the plugin's Makefile:: + + BUILD_DEPS = emqttd + dep_emqttd = git https://github.com/emqtt/emqttd emq20 + +5. Improve the design of Redis ACL. + .. _release_2.0_beta.3: ------------------ diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index aecc1966d..2b90427cb 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -415,26 +415,25 @@ etc/plugins/emqttd_auth_redis.conf: %% Variables: %u = username, %c = clientid - %% HMGET mqtt_user:%u is_superuser - {supercmd, ["HGET", "mqtt_user:%u", "is_superuser"]}. - %% HMGET mqtt_user:%u password - {authcmd, ["HGET", "mqtt_user:%u", "password"]}. + {authcmd, "HGET mqtt_user:%u password"}. %% Password hash algorithm: plain, md5, sha, sha256, pbkdf2? {password_hash, sha256}. - %% SMEMBERS mqtt_acl:%u - {aclcmd, ["SMEMBERS", "mqtt_acl:%u"]}. + %% HMGET mqtt_user:%u is_superuser + {supercmd, "HGET mqtt_user:%u is_superuser"}. + + %% HGETALL mqtt_acl:%u + {aclcmd, "HGETALL mqtt_acl:%u"}. %% If no rules matched, return... {acl_nomatch, deny}. %% Load Subscriptions form Redis when client connected. - {subcmd, ["HGETALL", "mqtt_subs:%u"]}. + {subcmd, "HGETALL mqtt_sub:%u"}. - -Redis User HASH +Redis User Hash --------------- Set a 'user' hash with 'password' field, for example:: @@ -442,16 +441,18 @@ Set a 'user' hash with 'password' field, for example:: HSET mqtt_user: is_superuser 1 HSET mqtt_user: password "passwd" -Redis ACL Rule SET ------------------- +Redis ACL Rule Hash +------------------- -The plugin uses a redis SET to store ACL rules:: +The plugin uses a redis Hash to store ACL rules:: - SADD mqtt_acl: "publish topic1" - SADD mqtt_acl: "subscribe topic2" - SADD mqtt_acl: "pubsub topic3" + HSET mqtt_acl: topic1 1 + HSET mqtt_acl: topic2 2 + HSET mqtt_acl: topic3 3 -Redis Subscription HASH +.. NOTE:: 1: subscribe, 2: publish, 3: pubsub + +Redis Subscription Hash ----------------------- The plugin can store static subscriptions in a redis Hash:: @@ -495,14 +496,14 @@ etc/plugins/emqttd_plugin_mongo.conf: %% Variables: %u = username, %c = clientid %% Superuser Query - {superquery, pool, [ + {superquery, [ {collection, "mqtt_user"}, {super_field, "is_superuser"}, {selector, {"username", "%u"}} ]}. %% Authentication Query - {authquery, pool, [ + {authquery, [ {collection, "mqtt_user"}, {password_field, "password"}, %% Hash Algorithm: plain, md5, sha, sha256, pbkdf2? @@ -511,7 +512,7 @@ etc/plugins/emqttd_plugin_mongo.conf: ]}. %% ACL Query: "%u" = username, "%c" = clientid - {aclquery, pool, [ + {aclquery, [ {collection, "mqtt_acl"}, {selector, {"username", "%u"}} ]}. diff --git a/rebar.config b/rebar.config index aedddfbe9..74a744b0d 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{gproc,".*",{git,"https://github.com/uwiger/gproc.git",""}},{lager,".*",{git,"https://github.com/basho/lager.git",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger.git",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf.git",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd.git","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb.git",""}} +{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}} ]}. {erl_opts, [{parse_transform,lager_transform}]}. diff --git a/src/emqttd.erl b/src/emqttd.erl index 9c5d652a9..07464ee56 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -33,7 +33,7 @@ is_subscribed/2, subscriber_down/1]). %% Hooks API --export([hook/4, hook/3, unhook/2, run_hooks/3]). +-export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]). %% Debug API -export([dump/0]). @@ -151,6 +151,10 @@ hook(Hook, Function, InitArgs, Priority) -> unhook(Hook, Function) -> emqttd_hook:delete(Hook, Function). +-spec(run_hooks(atom(), list(any())) -> ok | stop). +run_hooks(Hook, Args) -> + emqttd_hook:run(Hook, Args). + -spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}). run_hooks(Hook, Args, Acc) -> emqttd_hook:run(Hook, Args, Acc). diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index c2c8fb9b3..fb36892c7 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -60,6 +60,7 @@ auth(_Client, _Password, []) -> auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> case catch Mod:check(Client, Password, State) of ok -> ok; + {ok, IsSuper} -> {ok, IsSuper}; ignore -> auth(Client, Password, Mods); {error, Reason} -> {error, Reason}; {'EXIT', Error} -> {error, Error} diff --git a/src/emqttd_acl_mod.erl b/src/emqttd_acl_mod.erl index dfec11157..2eb09a0fe 100644 --- a/src/emqttd_acl_mod.erl +++ b/src/emqttd_acl_mod.erl @@ -24,16 +24,16 @@ -ifdef(use_specs). --callback init(AclOpts :: list()) -> {ok, State :: any()}. +-callback(init(AclOpts :: list()) -> {ok, State :: any()}). --callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when - Client :: mqtt_client(), - PubSub :: pubsub(), - Topic :: binary(). +-callback(check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when + Client :: mqtt_client(), + PubSub :: pubsub(), + Topic :: binary()). --callback reload_acl(State :: any()) -> ok | {error, any()}. +-callback(reload_acl(State :: any()) -> ok | {error, any()}). --callback description() -> string(). +-callback(description() -> string()). -else. diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 81f1d329c..2299a7ec0 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -48,6 +48,7 @@ start(_StartType, _StartArgs) -> start_servers(Sup), emqttd_cli:load(), load_all_mods(), + emqttd_plugins:init(), emqttd_plugins:load(), start_listeners(), register(emqttd, self()), diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index 09438703d..845d5a0fb 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -14,14 +14,13 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Authentication Behaviour. -module(emqttd_auth_mod). -include("emqttd.hrl"). -export([passwd_hash/2]). --type hash_type() :: plain | md5 | sha | sha256. +-type(hash_type() :: plain | md5 | sha | sha256). %%-------------------------------------------------------------------- %% Authentication behavihour @@ -29,14 +28,14 @@ -ifdef(use_specs). --callback init(AuthOpts :: list()) -> {ok, State :: any()}. +-callback(init(AuthOpts :: list()) -> {ok, State :: any()}). --callback check(Client, Password, State) -> ok | ignore | {error, string()} when - Client :: mqtt_client(), - Password :: binary(), - State :: any(). +-callback(check(Client, Password, State) -> ok | | {ok, boolean()} | ignore | {error, string()} when + Client :: mqtt_client(), + Password :: binary(), + State :: any()). --callback description() -> string(). +-callback(description() -> string()). -else. diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 6a0b1d17f..545bce7c3 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -160,5 +160,5 @@ md5_hash(SaltBin, Password) -> erlang:md5(<>). salt() -> - emqttd_time:seed(), Salt = random:uniform(16#ffffffff), <>. + emqttd_time:seed(), Salt = rand:uniform(16#ffffffff), <>. diff --git a/src/emqttd_hook.erl b/src/emqttd_hook.erl index f183c98d3..675698688 100644 --- a/src/emqttd_hook.erl +++ b/src/emqttd_hook.erl @@ -24,7 +24,7 @@ -export([start_link/0]). %% Hooks API --export([add/3, add/4, delete/2, run/3, lookup/1]). +-export([add/3, add/4, delete/2, run/2, run/3, lookup/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -63,10 +63,26 @@ add(HookPoint, Function, InitArgs, Priority) -> delete(HookPoint, Function) -> gen_server:call(?MODULE, {delete, HookPoint, Function}). --spec(run(atom(), list(any()), any()) -> any()). +%% @doc Run hooks without Acc. +-spec(run(atom(), list(Arg :: any())) -> ok | stop). +run(HookPoint, Args) -> + run_(lookup(HookPoint), Args). + +-spec(run(atom(), list(Arg :: any()), any()) -> any()). run(HookPoint, Args, Acc) -> run_(lookup(HookPoint), Args, Acc). +%% @private +run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args) -> + case apply(Fun, lists:append([Args, InitArgs])) of + ok -> run_(Callbacks, Args); + stop -> stop; + _Any -> run_(Callbacks, Args) + end; + +run_([], _Args) -> + ok. + %% @private run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) -> case apply(Fun, lists:append([Args, [Acc], InitArgs])) of diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 360e828f0..384a595de 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -18,12 +18,27 @@ -include("emqttd.hrl"). +-export([init/0]). + -export([load/0, unload/0]). -export([load/1, unload/1]). -export([list/0]). +init() -> + case emqttd:conf(plugins_etc_dir) of + {ok, PluginsEtc} -> + CfgFiles = filelib:wildcard("*.conf", PluginsEtc), + lists:foreach(fun(CfgFile) -> + App = app_name(CfgFile), + application:set_env(App, conf, filename:join(PluginsEtc, CfgFile)), + gen_conf:init(App) + end, CfgFiles); + undefined -> + ok + end. + %% @doc Load all plugins when the broker started. -spec(load() -> list() | {error, any()}). load() -> @@ -90,11 +105,11 @@ list() -> end. plugin(CfgFile) -> - [AppName | _] = string:tokens(CfgFile, "."), - {ok, Attrs} = application:get_all_key(list_to_atom(AppName)), + AppName = app_name(CfgFile), + {ok, Attrs} = application:get_all_key(AppName), Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), - #mqtt_plugin{name = list_to_atom(AppName), version = Ver, descr = Descr}. + #mqtt_plugin{name = AppName, version = Ver, descr = Descr}. %% @doc Load a Plugin -spec(load(atom()) -> ok | {error, any()}). @@ -185,6 +200,9 @@ stop_app(App) -> %% Internal functions %%-------------------------------------------------------------------- +app_name(File) -> + [AppName | _] = string:tokens(File, "."), list_to_atom(AppName). + names(plugin) -> names(list()); @@ -244,4 +262,3 @@ write_loaded(AppNames) -> lager:error("Open File ~p Error: ~p", [File, Error]), {error, Error} end. - diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 81593a7e5..f3579d46b 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -34,7 +34,7 @@ %% Protocol State -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, clean_sess, - proto_ver, proto_name, username, + proto_ver, proto_name, username, is_superuser = false, will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN, session, ws_initial_headers, %% Headers from first HTTP request for websocket client connected_at}). @@ -159,8 +159,8 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> {ReturnCode1, SessPresent, State3} = case validate_connect(Var, State1) of ?CONNACK_ACCEPT -> - case emqttd_access_control:auth(client(State1), Password) of - ok -> + case authenticate(client(State1), Password) of + {ok, IsSuperuser} -> %% Generate clientId if null State2 = maybe_set_clientid(State1), @@ -172,7 +172,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> %% Start keepalive start_keepalive(KeepAlive), %% ACCEPT - {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session}}; + {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {error, Error} -> exit({shutdown, Error}) end; @@ -186,14 +186,14 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> %% Run hooks emqttd:run_hooks('client.connected', [ReturnCode1], client(State3)), %% Send connack - send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3); + send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3), + %% stop if authentication failure + stop_if_auth_failure(ReturnCode1, State3); -process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> - case check_acl(publish, Topic, client(State)) of - allow -> - publish(Packet, State); - deny -> - ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State) +process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State = #proto_state{is_superuser = IsSuper}) -> + case IsSuper orelse allow == check_acl(publish, Topic, client(State)) of + true -> publish(Packet, State); + false -> ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State) end, {ok, State}; @@ -216,11 +216,14 @@ process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessi process(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); -process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{ - client_id = ClientId, username = Username, session = Session}) -> - Client = client(State), - TopicTable = parse_topic_table(RawTopicTable), - AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable], +%% TODO: refactor later... +process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session = Session, + client_id = ClientId, username = Username, is_superuser = IsSuperuser}) -> + Client = client(State), TopicTable = parse_topic_table(RawTopicTable), + AllowDenies = if + IsSuperuser -> []; + true -> [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable] + end, case lists:member(deny, AllowDenies) of true -> ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State), @@ -297,6 +300,12 @@ trace(send, Packet, ProtoState) -> redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). +stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH -> + {stop, {shutdown, auth_failure}, State}; + +stop_if_auth_failure(_RC, State) -> + {ok, State}. + shutdown(_Error, #proto_state{client_id = undefined}) -> ignore; @@ -430,6 +439,13 @@ parse_topic_table(TopicTable) -> parse_topics(Topics) -> [emqttd_topic:parse(Topic) || Topic <- Topics]. +authenticate(Client, Password) -> + case emqttd_access_control:auth(Client, Password) of + ok -> {ok, false}; + {ok, IsSuper} -> {ok, IsSuper}; + {error, Error} -> {error, Error} + end. + %% PUBLISH ACL is cached in process dictionary. check_acl(publish, Topic, Client) -> IfCache = emqttd:conf(cache_acl, true), diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 85701e249..b6276ab85 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -235,6 +235,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> collect_interval = get_value(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)), + emqttd:run_hooks('session.created', [ClientId, Username]), %% Start statistics {ok, start_collector(Session), hibernate}. @@ -519,7 +520,8 @@ handle_info(expired, Session) -> handle_info(Info, Session) -> ?UNEXPECTED_INFO(Info, Session). -terminate(_Reason, #session{client_id = ClientId}) -> +terminate(Reason, #session{client_id = ClientId, username = Username}) -> + emqttd:run_hooks('session.terminated', [ClientId, Username, Reason]), emqttd:subscriber_down(ClientId), emqttd_sm:unreg_session(ClientId). diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 5444a4f06..8a38cad90 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -37,7 +37,9 @@ all() -> {group, stats}, {group, hook}, {group, http}, + {group, cluster}, %%{group, backend}, + {group, alarms}, {group, cli}]. groups() -> @@ -69,7 +71,19 @@ groups() -> {http, [sequence], [request_status, request_publish + % websocket_test ]}, + {cluster, [sequence], + [cluster_test, + cluster_join, + cluster_leave, + cluster_remove, + cluster_remove2, + cluster_node_down + ]}, + {alarms, [sequence], + [set_alarms] + }, {cli, [sequence], [ctl_register_cmd, cli_status, @@ -323,11 +337,16 @@ add_delete_hook(_) -> [] = emqttd_hook:lookup(emqttd_hook). run_hooks(_) -> - emqttd:hook(test_hook, fun ?MODULE:hook_fun3/4, [init]), - emqttd:hook(test_hook, fun ?MODULE:hook_fun4/4, [init]), - emqttd:hook(test_hook, fun ?MODULE:hook_fun5/4, [init]), - {stop, [r3, r2]} = emqttd:run_hooks(test_hook, [arg1, arg2], []), - {ok, []} = emqttd:run_hooks(unknown_hook, [], []). + emqttd:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), + emqttd:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), + emqttd:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), + {stop, [r3, r2]} = emqttd:run_hooks(foldl_hook, [arg1, arg2], []), + {ok, []} = emqttd:run_hooks(unknown_hook, [], []), + + emqttd:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), + emqttd:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), + emqttd:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), + stop = emqttd:run_hooks(foreach_hook, [arg]). hook_fun1([]) -> ok. hook_fun2([]) -> {ok, []}. @@ -336,6 +355,10 @@ hook_fun3(arg1, arg2, _Acc, init) -> ok. hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. +hook_fun6(arg, initArg) -> ok. +hook_fun7(arg, initArg) -> any. +hook_fun8(arg, initArg) -> stop. + %%-------------------------------------------------------------------- %% HTTP Request Test %%-------------------------------------------------------------------- @@ -378,6 +401,102 @@ auth_header_(User, Pass) -> Encoded = base64:encode_to_string(lists:append([User,":",Pass])), {"Authorization","Basic " ++ Encoded}. +websocket_test(_) -> +% Conn = esockd_connection:new(esockd_transport, nil, []), +% Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1}, +% mochiweb_headers:make([{"Sec-WebSocket-Protocol","mqtt"}, +% {"Upgrade","websocket"} +% ])), + Req = "GET " ++ "/mqtt" ++" HTTP/1.1\r\nUpgrade: WebSocket\r\nConnection: Upgrade\r\n" ++ + "Host: " ++ "127.0.0.1"++ "\r\n" ++ + "Origin: http://" ++ "127.0.0.1" ++ "/\r\n\r\n", + + ct:log("Req:~p", [Req]), + emqttd_http:handle_request(Req). +%%-------------------------------------------------------------------- +%% cluster group +%%-------------------------------------------------------------------- +cluster_test(_Config) -> + Z = slave(emqttd, cluster_test_z), + wait_running(Z), + true = emqttd:is_running(Z), + Node = node(), + ok = rpc:call(Z, emqttd_cluster, join, [Node]), + [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)), + ct:log("Z:~p, Node:~p", [Z, Node]), + ok = rpc:call(Z, emqttd_cluster, leave, []), + [Node] = lists:sort(mnesia:system_info(running_db_nodes)), + ok = slave:stop(Z). + +cluster_join(_) -> + Z = slave(emqttd, cluster_join_z), + N = slave(node, cluster_join_n), + wait_running(Z), + true = emqttd:is_running(Z), + Node = node(), + {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node), + {error, {node_not_running, N}} = emqttd_cluster:join(N), + ok = emqttd_cluster:join(Z), + slave:stop(Z), + slave:stop(N). + +cluster_leave(_) -> + Z = slave(emqttd, cluster_leave_z), + wait_running(Z), + {error, node_not_in_cluster} = emqttd_cluster:leave(), + ok = emqttd_cluster:join(Z), + Node = node(), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = emqttd_cluster:leave(), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_remove(_) -> + Z = slave(emqttd, cluster_remove_z), + wait_running(Z), + Node = node(), + {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node), + ok = emqttd_cluster:join(Z), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = emqttd_cluster:remove(Z), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_remove2(_) -> + Z = slave(emqttd, cluster_remove2_z), + wait_running(Z), + ok = emqttd_cluster:join(Z), + Node = node(), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []), + ok = emqttd_cluster:remove(Z), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_node_down(_) -> + Z = slave(emqttd, cluster_node_down), + timer:sleep(1000), + wait_running(Z), + ok = emqttd_cluster:join(Z), + ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]), + ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]), + Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), + ct:log("Routes: ~p~n", [Routes]), + [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes], + slave:stop(Z), + timer:sleep(1000), + Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)). + +set_alarms(_) -> + AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, + emqttd_alarm:set_alarm(AlarmTest), + Alarms = emqttd_alarm:get_alarms(), + ?assertEqual(1, length(Alarms)), + emqttd_alarm:clear_alarm(<<"1">>), + [] = emqttd_alarm:get_alarms(). + + + %%-------------------------------------------------------------------- %% Cli group %%-------------------------------------------------------------------- @@ -451,3 +570,32 @@ cli_vm(_) -> emqttd_cli:vm([]), emqttd_cli:vm(["ports"]). + +ensure_ok(ok) -> ok; +ensure_ok({error, {already_started, _}}) -> ok. + +host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. + +wait_running(Node) -> + wait_running(Node, 30000). + +wait_running(Node, Timeout) when Timeout < 0 -> + throw({wait_timeout, Node}); + +wait_running(Node, Timeout) -> + case rpc:call(Node, emqttd, is_running, [Node]) of + true -> ok; + false -> timer:sleep(100), + wait_running(Node, Timeout - 100) + end. + +slave(emqttd, Node) -> + {ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), + rpc:call(Emq, application, ensure_all_started, [emqttd]), + Emq; + +slave(node, Node) -> + {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), + N. + +