emqx/apps/emqx_gateway/src/emqx_gateway_cli.erl

328 lines
9.4 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The Command-Line-Interface module for Gateway Application
-module(emqx_gateway_cli).
-export([
load/0,
unload/0
]).
-export([
gateway/1,
'gateway-registry'/1,
'gateway-clients'/1,
'gateway-metrics'/1
%, 'gateway-banned'/1
]).
-elvis([{elvis_style, function_naming_convention, disable}]).
-spec load() -> ok.
load() ->
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
lists:foreach(
fun(Cmd) ->
emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, [])
end,
Cmds
).
-spec unload() -> ok.
unload() ->
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
lists:foreach(fun(Cmd) -> emqx_ctl:unregister_command(Cmd) end, Cmds).
is_cmd(Fun) ->
case atom_to_list(Fun) of
"gateway" ++ _ -> true;
_ -> false
end.
%%--------------------------------------------------------------------
%% Cmds
gateway(["list"]) ->
lists:foreach(
fun(GwSummary) ->
print(format_gw_summary(GwSummary))
end,
emqx_gateway_http:gateways(all)
);
gateway(["lookup", Name]) ->
case emqx_gateway:lookup(atom(Name)) of
undefined ->
print("undefined\n");
Gateway ->
print(format_gateway(Gateway))
end;
gateway(["load", Name, Conf]) ->
case
emqx_gateway_conf:load_gateway(
bin(Name),
emqx_json:decode(Conf, [return_maps])
)
of
{ok, _} ->
print("ok\n");
{error, Reason} ->
print("Error: ~ts\n", [format_error(Reason)])
end;
gateway(["unload", Name]) ->
case emqx_gateway_conf:unload_gateway(bin(Name)) of
ok ->
print("ok\n");
{error, Reason} ->
print("Error: ~ts\n", [format_error(Reason)])
end;
gateway(["stop", Name]) ->
case
emqx_gateway_conf:update_gateway(
bin(Name),
#{<<"enable">> => <<"false">>}
)
of
{ok, _} ->
print("ok\n");
{error, Reason} ->
print("Error: ~ts\n", [format_error(Reason)])
end;
gateway(["start", Name]) ->
case
emqx_gateway_conf:update_gateway(
bin(Name),
#{<<"enable">> => <<"true">>}
)
of
{ok, _} ->
print("ok\n");
{error, Reason} ->
print("Error: ~ts\n", [format_error(Reason)])
end;
gateway(_) ->
emqx_ctl:usage(
[
{"gateway list", "List all gateway"},
{"gateway lookup <Name>", "Lookup a gateway detailed information"},
{"gateway load <Name> <JsonConf>", "Load a gateway with config"},
{"gateway unload <Name>", "Unload the gateway"},
{"gateway stop <Name>", "Stop the gateway"},
{"gateway start <Name>", "Start the gateway"}
]
).
'gateway-registry'(["list"]) ->
lists:foreach(
fun({Name, #{cbkmod := CbMod}}) ->
print("Registered Name: ~ts, Callback Module: ~ts\n", [Name, CbMod])
end,
emqx_gateway_registry:list()
);
'gateway-registry'(_) ->
emqx_ctl:usage([{"gateway-registry list", "List all registered gateways"}]).
'gateway-clients'(["list", Name]) ->
%% XXX: page me?
InfoTab = emqx_gateway_cm:tabname(info, Name),
case ets:info(InfoTab) of
undefined ->
print("Bad Gateway Name.\n");
_ ->
dump(InfoTab, client)
end;
'gateway-clients'(["lookup", Name, ClientId]) ->
ChanTab = emqx_gateway_cm:tabname(chan, Name),
case ets:info(ChanTab) of
undefined ->
print("Bad Gateway Name.\n");
_ ->
case ets:lookup(ChanTab, bin(ClientId)) of
[] ->
print("Not Found.\n");
[Chann] ->
InfoTab = emqx_gateway_cm:tabname(info, Name),
[ChannInfo] = ets:lookup(InfoTab, Chann),
print_record({client, ChannInfo})
end
end;
'gateway-clients'(["kick", Name, ClientId]) ->
case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of
ok -> print("ok\n");
_ -> print("Not Found.\n")
end;
'gateway-clients'(_) ->
emqx_ctl:usage([
{"gateway-clients list <Name>", "List all clients for a gateway"},
{"gateway-clients lookup <Name> <ClientId>", "Lookup the Client Info for specified client"},
{"gateway-clients kick <Name> <ClientId>", "Kick out a client"}
]).
'gateway-metrics'([Name]) ->
case emqx_gateway_metrics:lookup(atom(Name)) of
undefined ->
print("Bad Gateway Name.\n");
Metrics ->
lists:foreach(
fun({K, V}) -> print("~-30s: ~w\n", [K, V]) end,
Metrics
)
end;
'gateway-metrics'(_) ->
emqx_ctl:usage([{"gateway-metrics <Name>", "List all metrics for a gateway"}]).
atom(Id) ->
try
list_to_existing_atom(Id)
catch
_:_ -> undefined
end.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
bin(S) -> iolist_to_binary(S).
dump(Table, Tag) ->
dump(Table, Tag, ets:first(Table), []).
dump(_Table, _, '$end_of_table', Result) ->
lists:reverse(Result);
dump(Table, Tag, Key, Result) ->
PrintValue = [print_record({Tag, Record}) || Record <- ets:lookup(Table, Key)],
dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
print_record({client, {_, Infos, Stats}}) ->
ClientInfo = maps:get(clientinfo, Infos, #{}),
ConnInfo = maps:get(conninfo, Infos, #{}),
_Session = maps:get(session, Infos, #{}),
SafeGet = fun(K, M) -> maps:get(K, M, undefined) end,
StatsGet = fun(K) -> proplists:get_value(K, Stats, 0) end,
ConnectedAt = SafeGet(connected_at, ConnInfo),
InfoKeys = [
clientid,
username,
peername,
clean_start,
keepalive,
subscriptions_cnt,
send_msg,
connected,
created_at,
connected_at
],
Info = #{
clientid => SafeGet(clientid, ClientInfo),
username => SafeGet(username, ClientInfo),
peername => SafeGet(peername, ConnInfo),
clean_start => SafeGet(clean_start, ConnInfo),
keepalive => SafeGet(keepalive, ConnInfo),
subscriptions_cnt => StatsGet(subscriptions_cnt),
send_msg => StatsGet(send_msg),
connected => SafeGet(conn_state, Infos) == connected,
created_at => ConnectedAt,
connected_at => ConnectedAt
},
print(
"Client(~ts, username=~ts, peername=~ts, "
"clean_start=~ts, keepalive=~w, "
"subscriptions=~w, delivered_msgs=~w, "
"connected=~ts, created_at=~w, connected_at=~w)\n",
[format(K, maps:get(K, Info)) || K <- InfoKeys]
).
print(S) -> emqx_ctl:print(S).
print(S, A) -> emqx_ctl:print(S, A).
format(_, undefined) ->
undefined;
format(peername, {IPAddr, Port}) ->
IPStr = emqx_mgmt_util:ntoa(IPAddr),
io_lib:format("~ts:~p", [IPStr, Port]);
format(_, Val) ->
Val.
format_gw_summary(#{name := Name, status := unloaded}) ->
io_lib:format("Gateway(name=~ts, status=unloaded)\n", [Name]);
format_gw_summary(#{
name := Name,
status := stopped,
stopped_at := StoppedAt
}) ->
io_lib:format(
"Gateway(name=~ts, status=stopped, stopped_at=~ts)\n",
[Name, StoppedAt]
);
format_gw_summary(#{
name := Name,
status := running,
current_connections := ConnCnt,
started_at := StartedAt
}) ->
io_lib:format(
"Gateway(name=~ts, status=running, clients=~w, "
"started_at=~ts)\n",
[Name, ConnCnt, StartedAt]
).
format_gateway(#{
name := Name,
status := unloaded
}) ->
io_lib:format(
"name: ~ts\n"
"status: unloaded\n",
[Name]
);
format_gateway(
Gw =
#{
name := Name,
status := Status,
created_at := CreatedAt,
config := Config
}
) ->
{StopOrStart, Timestamp} =
case Status of
stopped -> {stopped_at, maps:get(stopped_at, Gw)};
running -> {started_at, maps:get(started_at, Gw)}
end,
io_lib:format(
"name: ~ts\n"
"status: ~ts\n"
"created_at: ~ts\n"
"~ts: ~ts\n"
"config: ~p\n",
[
Name,
Status,
emqx_gateway_utils:unix_ts_to_rfc3339(CreatedAt),
StopOrStart,
emqx_gateway_utils:unix_ts_to_rfc3339(Timestamp),
Config
]
).
format_error(Reason) ->
case emqx_gateway_http:reason2msg(Reason) of
error -> io_lib:format("~p", [Reason]);
Msg -> Msg
end.