Merge pull request #11745 from zhongwencool/cli-fix
fix: cli list return nothing when list is empty
This commit is contained in:
commit
b1a3bc8565
|
@ -157,7 +157,10 @@ sort_map_list_field(Field, Map) ->
|
||||||
%% @doc Query clients
|
%% @doc Query clients
|
||||||
|
|
||||||
clients(["list"]) ->
|
clients(["list"]) ->
|
||||||
dump(?CHAN_TAB, client);
|
case ets:info(?CHAN_TAB, size) of
|
||||||
|
0 -> emqx_ctl:print("No clients.~n");
|
||||||
|
_ -> dump(?CHAN_TAB, client)
|
||||||
|
end;
|
||||||
clients(["show", ClientId]) ->
|
clients(["show", ClientId]) ->
|
||||||
if_client(ClientId, fun print/1);
|
if_client(ClientId, fun print/1);
|
||||||
clients(["kick", ClientId]) ->
|
clients(["kick", ClientId]) ->
|
||||||
|
@ -180,10 +183,15 @@ if_client(ClientId, Fun) ->
|
||||||
%% @doc Topics Command
|
%% @doc Topics Command
|
||||||
|
|
||||||
topics(["list"]) ->
|
topics(["list"]) ->
|
||||||
emqx_router:foldr_routes(
|
Res =
|
||||||
fun(Route, Acc) -> [print({emqx_topic, Route}) | Acc] end,
|
emqx_router:foldr_routes(
|
||||||
[]
|
fun(Route, Acc) -> [print({emqx_topic, Route}) | Acc] end,
|
||||||
);
|
[]
|
||||||
|
),
|
||||||
|
case Res of
|
||||||
|
[] -> emqx_ctl:print("No topics.~n");
|
||||||
|
_ -> ok
|
||||||
|
end;
|
||||||
topics(["show", Topic]) ->
|
topics(["show", Topic]) ->
|
||||||
Routes = emqx_router:lookup_routes(Topic),
|
Routes = emqx_router:lookup_routes(Topic),
|
||||||
[print({emqx_topic, Route}) || Route <- Routes];
|
[print({emqx_topic, Route}) || Route <- Routes];
|
||||||
|
@ -194,12 +202,17 @@ topics(_) ->
|
||||||
]).
|
]).
|
||||||
|
|
||||||
subscriptions(["list"]) ->
|
subscriptions(["list"]) ->
|
||||||
lists:foreach(
|
case ets:info(?SUBOPTION, size) of
|
||||||
fun(Suboption) ->
|
0 ->
|
||||||
print({?SUBOPTION, Suboption})
|
emqx_ctl:print("No subscriptions.~n");
|
||||||
end,
|
_ ->
|
||||||
ets:tab2list(?SUBOPTION)
|
lists:foreach(
|
||||||
);
|
fun(SubOption) ->
|
||||||
|
print({?SUBOPTION, SubOption})
|
||||||
|
end,
|
||||||
|
ets:tab2list(?SUBOPTION)
|
||||||
|
)
|
||||||
|
end;
|
||||||
subscriptions(["show", ClientId]) ->
|
subscriptions(["show", ClientId]) ->
|
||||||
case ets:lookup(emqx_subid, bin(ClientId)) of
|
case ets:lookup(emqx_subid, bin(ClientId)) of
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -207,7 +220,7 @@ subscriptions(["show", ClientId]) ->
|
||||||
[{_, Pid}] ->
|
[{_, Pid}] ->
|
||||||
case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of
|
case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of
|
||||||
[] -> emqx_ctl:print("Not Found.~n");
|
[] -> emqx_ctl:print("Not Found.~n");
|
||||||
Suboption -> [print({?SUBOPTION, Sub}) || Sub <- Suboption]
|
SubOption -> [print({?SUBOPTION, Sub}) || Sub <- SubOption]
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
subscriptions(["add", ClientId, Topic, QoS]) ->
|
subscriptions(["add", ClientId, Topic, QoS]) ->
|
||||||
|
@ -446,13 +459,20 @@ log(_) ->
|
||||||
%% @doc Trace Command
|
%% @doc Trace Command
|
||||||
|
|
||||||
trace(["list"]) ->
|
trace(["list"]) ->
|
||||||
lists:foreach(
|
case emqx_trace_handler:running() of
|
||||||
fun(Trace) ->
|
[] ->
|
||||||
#{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
|
emqx_ctl:print("Trace is empty~n", []);
|
||||||
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~0p)~n", [Type, Filter, Level, Dst])
|
Traces ->
|
||||||
end,
|
lists:foreach(
|
||||||
emqx_trace_handler:running()
|
fun(Trace) ->
|
||||||
);
|
#{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
|
||||||
|
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~0p)~n", [
|
||||||
|
Type, Filter, Level, Dst
|
||||||
|
])
|
||||||
|
end,
|
||||||
|
Traces
|
||||||
|
)
|
||||||
|
end;
|
||||||
trace(["stop", Operation, Filter0]) ->
|
trace(["stop", Operation, Filter0]) ->
|
||||||
case trace_type(Operation, Filter0) of
|
case trace_type(Operation, Filter0) of
|
||||||
{ok, Type, Filter} -> trace_off(Type, Filter);
|
{ok, Type, Filter} -> trace_off(Type, Filter);
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_modules, [
|
{application, emqx_modules, [
|
||||||
{description, "EMQX Modules"},
|
{description, "EMQX Modules"},
|
||||||
{vsn, "5.0.22"},
|
{vsn, "5.0.23"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{applications, [kernel, stdlib, emqx, emqx_ctl]},
|
{applications, [kernel, stdlib, emqx, emqx_ctl]},
|
||||||
{mod, {emqx_modules_app, []}},
|
{mod, {emqx_modules_app, []}},
|
||||||
|
|
|
@ -40,10 +40,18 @@ cmd(["bin_leak"]) ->
|
||||||
recon:bin_leak(100)
|
recon:bin_leak(100)
|
||||||
);
|
);
|
||||||
cmd(["load", Mod]) ->
|
cmd(["load", Mod]) ->
|
||||||
Module = list_to_existing_atom(Mod),
|
case nodes() of
|
||||||
Nodes = nodes(),
|
[] ->
|
||||||
Res = remote_load(Nodes, Module),
|
emqx_ctl:print("No other nodes in the cluster~n");
|
||||||
emqx_ctl:print("Loaded ~p module on ~p: ~p~n", [Module, Nodes, Res]);
|
Nodes ->
|
||||||
|
case emqx_utils:safe_to_existing_atom(Mod) of
|
||||||
|
{ok, Module} ->
|
||||||
|
Res = recon:remote_load(Nodes, Module),
|
||||||
|
emqx_ctl:print("Loaded ~p module on ~p: ~p~n", [Module, Nodes, Res]);
|
||||||
|
{error, Reason} ->
|
||||||
|
emqx_ctl:print("Module(~s) not found: ~p~n", [Mod, Reason])
|
||||||
|
end
|
||||||
|
end;
|
||||||
cmd(_) ->
|
cmd(_) ->
|
||||||
emqx_ctl:usage([
|
emqx_ctl:usage([
|
||||||
{"observer status", "Start observer in the current console"},
|
{"observer status", "Start observer in the current console"},
|
||||||
|
@ -51,12 +59,5 @@ cmd(_) ->
|
||||||
"Force all processes to perform garbage collection "
|
"Force all processes to perform garbage collection "
|
||||||
"and prints the top-100 processes that freed the "
|
"and prints the top-100 processes that freed the "
|
||||||
"biggest amount of binaries, potentially highlighting leaks."},
|
"biggest amount of binaries, potentially highlighting leaks."},
|
||||||
{"observer load Mod", "Ensure a module is loaded in all EMQX nodes in the cluster"}
|
{"observer load Mod", "Enhanced module synchronization across all cluster nodes"}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% recon:remote_load/1 has a bug, when nodes() returns [], it is
|
|
||||||
%% taken by recon as a node name.
|
|
||||||
%% before OTP 23, the call returns a 'badrpc' tuple
|
|
||||||
%% after OTP 23, it crashes with 'badarg' error
|
|
||||||
remote_load([], _Module) -> ok;
|
|
||||||
remote_load(Nodes, Module) -> recon:remote_load(Nodes, Module).
|
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx_retainer, [
|
{application, emqx_retainer, [
|
||||||
{description, "EMQX Retainer"},
|
{description, "EMQX Retainer"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "5.0.17"},
|
{vsn, "5.0.18"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_retainer_sup]},
|
{registered, [emqx_retainer_sup]},
|
||||||
{applications, [kernel, stdlib, emqx, emqx_ctl]},
|
{applications, [kernel, stdlib, emqx, emqx_ctl]},
|
||||||
|
|
|
@ -32,10 +32,11 @@ load() ->
|
||||||
ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []).
|
ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []).
|
||||||
|
|
||||||
retainer(["info"]) ->
|
retainer(["info"]) ->
|
||||||
?PRINT("Number of retained messages: ~p~n", [emqx_retainer:retained_count()]);
|
count();
|
||||||
retainer(["topics"]) ->
|
retainer(["topics"]) ->
|
||||||
[?PRINT("~ts~n", [I]) || I <- emqx_retainer_mnesia:topics()],
|
topic(1, 1000);
|
||||||
ok;
|
retainer(["topics", Start, Len]) ->
|
||||||
|
topic(list_to_integer(Start), list_to_integer(Len));
|
||||||
retainer(["clean", Topic]) ->
|
retainer(["clean", Topic]) ->
|
||||||
emqx_retainer:delete(list_to_binary(Topic));
|
emqx_retainer:delete(list_to_binary(Topic));
|
||||||
retainer(["clean"]) ->
|
retainer(["clean"]) ->
|
||||||
|
@ -65,7 +66,9 @@ retainer(_) ->
|
||||||
emqx_ctl:usage(
|
emqx_ctl:usage(
|
||||||
[
|
[
|
||||||
{"retainer info", "Show the count of retained messages"},
|
{"retainer info", "Show the count of retained messages"},
|
||||||
{"retainer topics", "Show all topics of retained messages"},
|
{"retainer topics", "Same as retainer topic 1 1000"},
|
||||||
|
{"retainer topics <Start> <Limit>",
|
||||||
|
"Show topics of retained messages by the specified range"},
|
||||||
{"retainer clean", "Clean all retained messages"},
|
{"retainer clean", "Clean all retained messages"},
|
||||||
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
|
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
|
||||||
{"retainer reindex status", "Show reindex status"},
|
{"retainer reindex status", "Show reindex status"},
|
||||||
|
@ -98,3 +101,12 @@ do_reindex(Force) ->
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
?PRINT_MSG("Reindexing finished~n").
|
?PRINT_MSG("Reindexing finished~n").
|
||||||
|
|
||||||
|
count() ->
|
||||||
|
?PRINT("Number of retained messages: ~p~n", [emqx_retainer:retained_count()]).
|
||||||
|
|
||||||
|
topic(Start, Len) ->
|
||||||
|
count(),
|
||||||
|
Topics = lists:sublist(emqx_retainer_mnesia:topics(), Start, Len),
|
||||||
|
[?PRINT("~ts~n", [I]) || I <- Topics],
|
||||||
|
ok.
|
||||||
|
|
|
@ -44,6 +44,9 @@ t_info(_Config) ->
|
||||||
t_topics(_Config) ->
|
t_topics(_Config) ->
|
||||||
ok = emqx_retainer_mnesia_cli:retainer(["topics"]).
|
ok = emqx_retainer_mnesia_cli:retainer(["topics"]).
|
||||||
|
|
||||||
|
t_topics_with_len(_Config) ->
|
||||||
|
ok = emqx_retainer_mnesia_cli:retainer(["topics", "100", "200"]).
|
||||||
|
|
||||||
t_clean(_Config) ->
|
t_clean(_Config) ->
|
||||||
ok = emqx_retainer_mnesia_cli:retainer(["clean"]).
|
ok = emqx_retainer_mnesia_cli:retainer(["clean"]).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue