diff --git a/apps/emqttd/src/emqttd_bridge_sup.erl b/apps/emqttd/src/emqttd_bridge_sup.erl index 542cabd42..3857461c9 100644 --- a/apps/emqttd/src/emqttd_bridge_sup.erl +++ b/apps/emqttd/src/emqttd_bridge_sup.erl @@ -31,6 +31,7 @@ -behavior(supervisor). -export([start_link/0, + bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]). @@ -49,6 +50,11 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). +-spec bridges() -> [{tuple(), pid()}]. +bridges() -> + [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} + <- supervisor:which_children(?MODULE)]. + %%------------------------------------------------------------------------------ %% @doc %% Start a bridge. diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index 62273e52f..a1347f1b8 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -36,16 +36,11 @@ -define(PRINT(Format, Args), io:format(Format, Args)). --export([status/1, - cluster/1, - useradd/1, - userdel/1]). - -%TODO: add comment -% bridge -% metrics -% broker -% sockets +-export([status/1, cluster/1, + listeners/1, + useradd/1, userdel/1, + broker/1, + bridges/1, start_bridge/1, stop_bridge/1]). status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), @@ -84,6 +79,41 @@ useradd([Username, Password]) -> userdel([Username]) -> ?PRINT("~p", [emqttd_auth:delete(list_to_binary(Username))]). +broker([]) -> + Funs = [sysdescr, version, uptime, datetime], + [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]; + +broker(["stats"]) -> + [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_broker:getstats()]; + +broker(["metrics"]) -> + [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. + +listeners([]) -> + lists:foreach(fun({{Protocol, Port}, Pid}) -> + ?PRINT("listener ~s:~p~n", [Protocol, Port]), + ?PRINT(" acceptor_pool: ~p~n", [esockd:get_acceptor_pool(Pid)]), + ?PRINT(" max_clients: ~p~n", [esockd:get_max_clients(Pid)]), + ?PRINT(" current_clients: ~p~n", [esockd:get_current_clients(Pid)]) + end, esockd:listeners()). + +bridges([]) -> + lists:foreach(fun({{Node, Topic}, _Pid}) -> + ?PRINT("bridge: ~s ~s~n", [Node, Topic]) + end, emqttd_bridge_sup:bridges()). + +start_bridge([SNode, Topic]) -> + case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end. + +stop_bridge([SNode, Topic]) -> + case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + ok -> ?PRINT_MSG("bridge is stopped.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end. + node_name(SNode) -> SNode1 = case string:tokens(SNode, "@") of @@ -101,3 +131,4 @@ node_name(SNode) -> end end, list_to_atom(SNode1). +