From 767c4ccc6e3bdb58b458702b89c24353f4b4fc9e Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 11 Feb 2016 15:54:35 +0800 Subject: [PATCH] fix issue #449 - Improve the CLI of cluster --- src/emqttd_cli.erl | 101 +++++++++++++++++++++++---------------------- 1 file changed, 52 insertions(+), 49 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 284584704..645556044 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd cli -%% @author Feng Lee -module(emqttd_cli). -include("emqttd.hrl"). @@ -106,56 +104,47 @@ broker(_) -> {"broker metrics", "query broker metrics"}]). %%-------------------------------------------------------------------- -%% @doc Cluster with other node -cluster([]) -> - Nodes = emqttd_broker:running_nodes(), - ?PRINT("cluster nodes: ~p~n", [Nodes]); - -cluster(usage) -> - ?PRINT_CMD("cluster []", "cluster with node, query cluster info"); - -cluster([SNode]) -> - Node = emqttd_dist:parse_node(SNode), - case lists:member(Node, emqttd_broker:running_nodes()) of - true -> - ?PRINT("~s is already clustered~n", [Node]); - false -> - cluster(Node, fun() -> - emqttd_plugins:unload(), - stop_apps(), - emqttd_mnesia:cluster(Node), - start_apps() - end) +%% @doc Cluster with other nodes +cluster(["join", SNode]) -> + case emqttd_cluster:join(emqttd_node:parse_name(SNode)) of + ok -> + ?PRINT_MSG("Join the cluster successfully.~n"), + cluster(["status"]); + {error, Error} -> + ?PRINT("Failed to join the cluster: ~p~n", [Error]) end; +cluster(["leave"]) -> + case emqttd_cluster:leave() of + ok -> + ?PRINT_MSG("Leave the cluster successfully.~n"), + cluster(["status"]); + {error, Error} -> + ?PRINT("Failed to leave the cluster: ~p~n", [Error]) + end; + +cluster(["remove", SNode]) -> + case emqttd_cluster:remove(emqttd_node:parse_name(SNode)) of + ok -> + ?PRINT_MSG("Remove the node from cluster successfully.~n"), + cluster(["status"]); + {error, Error} -> + ?PRINT("Failed to remove the node from cluster: ~p~n", [Error]) + end; + +cluster(["status"]) -> + ?PRINT("Cluster status: ~p~n", [emqttd_cluster:status()]); + cluster(_) -> - cluster(usage). - -cluster(Node, DoCluster) -> - cluster(net_adm:ping(Node), Node, DoCluster). - -cluster(pong, Node, DoCluster) -> - case emqttd:is_running(Node) of - true -> - DoCluster(), - ?PRINT("cluster with ~s successfully.~n", [Node]); - false -> - ?PRINT("emqttd is not running on ~s~n", [Node]) - end; - -cluster(pang, Node, _DoCluster) -> - ?PRINT("Cannot connect to ~s~n", [Node]). - -stop_apps() -> - [application:stop(App) || App <- [emqttd, esockd, gproc]]. - -start_apps() -> - [application:start(App) || App <- [gproc, esockd, emqttd]]. + ?USAGE([{"cluster join ", "Join the cluster"}, + {"cluster leave", "Leave the cluster"}, + {"cluster remove ","Remove the node from cluster"}, + {"cluster status", "Cluster status"}]). %%-------------------------------------------------------------------- %% @doc Query clients clients(["list"]) -> - emqttd_mnesia:dump(ets, mqtt_client, fun print/1); + dump(ets, mqtt_client, fun print/1); clients(["show", ClientId]) -> if_client(ClientId, fun print/1); @@ -180,10 +169,10 @@ sessions(["list"]) -> [sessions(["list", Type]) || Type <- ["persistent", "transient"]]; sessions(["list", "persistent"]) -> - emqttd_mnesia:dump(ets, mqtt_persistent_session, fun print/1); + dump(ets, mqtt_persistent_session, fun print/1); sessions(["list", "transient"]) -> - emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1); + dump(ets, mqtt_transient_session, fun print/1); sessions(["show", ClientId]) -> MP = {{bin(ClientId), '_'}, '_'}, @@ -463,7 +452,7 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", [ClientId, CleanSess, Username, emqttd_net:format(Peername), - emqttd_util:now_to_secs(ConnectedAt)]); + emqttd_time:now_to_secs(ConnectedAt)]); print(#mqtt_topic{topic = Topic, node = Node}) -> ?PRINT("~s on ~s~n", [Topic, Node]); @@ -493,7 +482,7 @@ print(subscription, ClientId, Subscriptions) -> ?PRINT("~s: ~p~n", [ClientId, TopicTable]). format(created_at, Val) -> - emqttd_util:now_to_secs(Val); + emqttd_time:now_to_secs(Val); format(subscriptions, List) -> string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ","); @@ -503,3 +492,17 @@ format(_, Val) -> bin(S) -> iolist_to_binary(S). +%%TODO: ... +dump(ets, Table, Fun) -> + dump(ets, Table, ets:first(Table), Fun). + +dump(ets, _Table, '$end_of_table', _Fun) -> + ok; + +dump(ets, Table, Key, Fun) -> + case ets:lookup(Table, Key) of + [Record] -> Fun(Record); + [] -> ignore + end, + dump(ets, Table, ets:next(Table, Key), Fun). +