From c5bb1e031e8b99d519bee871e83badec2bcf20f5 Mon Sep 17 00:00:00 2001 From: huangdan Date: Wed, 21 Sep 2016 11:27:39 +0800 Subject: [PATCH] cluster ct --- test/emqttd_SUITE.erl | 113 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 5444a4f06..41ad9f60e 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -37,6 +37,7 @@ all() -> {group, stats}, {group, hook}, {group, http}, + {group, cluster}, %%{group, backend}, {group, cli}]. @@ -70,6 +71,14 @@ groups() -> [request_status, request_publish ]}, + {cluster, [sequence], + [cluster_test, + cluster_join, + cluster_leave, + cluster_remove, + cluster_remove2, + cluster_node_down + ]}, {cli, [sequence], [ctl_register_cmd, cli_status, @@ -378,6 +387,81 @@ auth_header_(User, Pass) -> Encoded = base64:encode_to_string(lists:append([User,":",Pass])), {"Authorization","Basic " ++ Encoded}. +%%-------------------------------------------------------------------- +%% cluster group +%%-------------------------------------------------------------------- +cluster_test(_Config) -> + Z = slave(emqttd, cluster_test_z), + wait_running(Z), + true = emqttd:is_running(Z), + Node = node(), + ok = rpc:call(Z, emqttd_cluster, join, [Node]), + [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)), + ct:log("Z:~p, Node:~p", [Z, Node]), + ok = rpc:call(Z, emqttd_cluster, leave, []), + [Node] = lists:sort(mnesia:system_info(running_db_nodes)), + ok = slave:stop(Z). + +cluster_join(_) -> + Z = slave(emqttd, cluster_join_z), + N = slave(node, cluster_join_n), + wait_running(Z), + true = emqttd:is_running(Z), + Node = node(), + {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node), + {error, {node_not_running, N}} = emqttd_cluster:join(N), + ok = emqttd_cluster:join(Z), + slave:stop(Z), + slave:stop(N). + +cluster_leave(_) -> + Z = slave(emqttd, cluster_leave_z), + wait_running(Z), + {error, node_not_in_cluster} = emqttd_cluster:leave(), + ok = emqttd_cluster:join(Z), + Node = node(), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = emqttd_cluster:leave(), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_remove(_) -> + Z = slave(emqttd, cluster_remove_z), + wait_running(Z), + Node = node(), + {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node), + ok = emqttd_cluster:join(Z), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = emqttd_cluster:remove(Z), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_remove2(_) -> + Z = slave(emqttd, cluster_remove2_z), + wait_running(Z), + ok = emqttd_cluster:join(Z), + Node = node(), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []), + ok = emqttd_cluster:remove(Z), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_node_down(_) -> + Z = slave(emqttd, cluster_node_down), + timer:sleep(1000), + wait_running(Z), + ok = emqttd_cluster:join(Z), + ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]), + ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]), + Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), + ct:log("Routes: ~p~n", [Routes]), + [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes], + slave:stop(Z), + timer:sleep(1000), + Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)). + + %%-------------------------------------------------------------------- %% Cli group %%-------------------------------------------------------------------- @@ -451,3 +535,32 @@ cli_vm(_) -> emqttd_cli:vm([]), emqttd_cli:vm(["ports"]). + +ensure_ok(ok) -> ok; +ensure_ok({error, {already_started, _}}) -> ok. + +host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. + +wait_running(Node) -> + wait_running(Node, 30000). + +wait_running(Node, Timeout) when Timeout < 0 -> + throw({wait_timeout, Node}); + +wait_running(Node, Timeout) -> + case rpc:call(Node, emqttd, is_running, [Node]) of + true -> ok; + false -> timer:sleep(100), + wait_running(Node, Timeout - 100) + end. + +slave(emqttd, Node) -> + {ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), + rpc:call(Emq, application, ensure_all_started, [emqttd]), + Emq; + +slave(node, Node) -> + {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), + N. + +