diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 15f05da04..866efb267 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -157,7 +157,10 @@ sort_map_list_field(Field, Map) -> %% @doc Query clients clients(["list"]) -> - dump(?CHAN_TAB, client); + case ets:info(?CHAN_TAB, size) of + 0 -> emqx_ctl:print("No clients.~n"); + _ -> dump(?CHAN_TAB, client) + end; clients(["show", ClientId]) -> if_client(ClientId, fun print/1); clients(["kick", ClientId]) -> @@ -180,10 +183,15 @@ if_client(ClientId, Fun) -> %% @doc Topics Command topics(["list"]) -> - emqx_router:foldr_routes( - fun(Route, Acc) -> [print({emqx_topic, Route}) | Acc] end, - [] - ); + Res = + emqx_router:foldr_routes( + fun(Route, Acc) -> [print({emqx_topic, Route}) | Acc] end, + [] + ), + case Res of + [] -> emqx_ctl:print("No topics.~n"); + _ -> ok + end; topics(["show", Topic]) -> Routes = emqx_router:lookup_routes(Topic), [print({emqx_topic, Route}) || Route <- Routes]; @@ -194,12 +202,17 @@ topics(_) -> ]). subscriptions(["list"]) -> - lists:foreach( - fun(Suboption) -> - print({?SUBOPTION, Suboption}) - end, - ets:tab2list(?SUBOPTION) - ); + case ets:info(?SUBOPTION, size) of + 0 -> + emqx_ctl:print("No subscriptions.~n"); + _ -> + lists:foreach( + fun(SubOption) -> + print({?SUBOPTION, SubOption}) + end, + ets:tab2list(?SUBOPTION) + ) + end; subscriptions(["show", ClientId]) -> case ets:lookup(emqx_subid, bin(ClientId)) of [] -> @@ -207,7 +220,7 @@ subscriptions(["show", ClientId]) -> [{_, Pid}] -> case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of [] -> emqx_ctl:print("Not Found.~n"); - Suboption -> [print({?SUBOPTION, Sub}) || Sub <- Suboption] + SubOption -> [print({?SUBOPTION, Sub}) || Sub <- SubOption] end end; subscriptions(["add", ClientId, Topic, QoS]) -> @@ -446,13 +459,20 @@ log(_) -> %% @doc Trace Command trace(["list"]) -> - lists:foreach( - fun(Trace) -> - #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace, - emqx_ctl:print("Trace(~s=~s, level=~s, destination=~0p)~n", [Type, Filter, Level, Dst]) - end, - emqx_trace_handler:running() - ); + case emqx_trace_handler:running() of + [] -> + emqx_ctl:print("Trace is empty~n", []); + Traces -> + lists:foreach( + fun(Trace) -> + #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace, + emqx_ctl:print("Trace(~s=~s, level=~s, destination=~0p)~n", [ + Type, Filter, Level, Dst + ]) + end, + Traces + ) + end; trace(["stop", Operation, Filter0]) -> case trace_type(Operation, Filter0) of {ok, Type, Filter} -> trace_off(Type, Filter); diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src index 09a404a44..e986a3fe1 100644 --- a/apps/emqx_modules/src/emqx_modules.app.src +++ b/apps/emqx_modules/src/emqx_modules.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_modules, [ {description, "EMQX Modules"}, - {vsn, "5.0.22"}, + {vsn, "5.0.23"}, {modules, []}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, {mod, {emqx_modules_app, []}}, diff --git a/apps/emqx_modules/src/emqx_observer_cli.erl b/apps/emqx_modules/src/emqx_observer_cli.erl index abed31edc..0be17decb 100644 --- a/apps/emqx_modules/src/emqx_observer_cli.erl +++ b/apps/emqx_modules/src/emqx_observer_cli.erl @@ -40,10 +40,18 @@ cmd(["bin_leak"]) -> recon:bin_leak(100) ); cmd(["load", Mod]) -> - Module = list_to_existing_atom(Mod), - Nodes = nodes(), - Res = remote_load(Nodes, Module), - emqx_ctl:print("Loaded ~p module on ~p: ~p~n", [Module, Nodes, Res]); + case nodes() of + [] -> + emqx_ctl:print("No other nodes in the cluster~n"); + Nodes -> + case emqx_utils:safe_to_existing_atom(Mod) of + {ok, Module} -> + Res = recon:remote_load(Nodes, Module), + emqx_ctl:print("Loaded ~p module on ~p: ~p~n", [Module, Nodes, Res]); + {error, Reason} -> + emqx_ctl:print("Module(~s) not found: ~p~n", [Mod, Reason]) + end + end; cmd(_) -> emqx_ctl:usage([ {"observer status", "Start observer in the current console"}, @@ -51,12 +59,5 @@ cmd(_) -> "Force all processes to perform garbage collection " "and prints the top-100 processes that freed the " "biggest amount of binaries, potentially highlighting leaks."}, - {"observer load Mod", "Ensure a module is loaded in all EMQX nodes in the cluster"} + {"observer load Mod", "Enhanced module synchronization across all cluster nodes"} ]). - -%% recon:remote_load/1 has a bug, when nodes() returns [], it is -%% taken by recon as a node name. -%% before OTP 23, the call returns a 'badrpc' tuple -%% after OTP 23, it crashes with 'badarg' error -remote_load([], _Module) -> ok; -remote_load(Nodes, Module) -> recon:remote_load(Nodes, Module). diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 8f7c9aa17..cab070826 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.17"}, + {vsn, "5.0.18"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl index 5710e4df3..3fef4c8b0 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl @@ -32,10 +32,11 @@ load() -> ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []). retainer(["info"]) -> - ?PRINT("Number of retained messages: ~p~n", [emqx_retainer:retained_count()]); + count(); retainer(["topics"]) -> - [?PRINT("~ts~n", [I]) || I <- emqx_retainer_mnesia:topics()], - ok; + topic(1, 1000); +retainer(["topics", Start, Len]) -> + topic(list_to_integer(Start), list_to_integer(Len)); retainer(["clean", Topic]) -> emqx_retainer:delete(list_to_binary(Topic)); retainer(["clean"]) -> @@ -65,7 +66,9 @@ retainer(_) -> emqx_ctl:usage( [ {"retainer info", "Show the count of retained messages"}, - {"retainer topics", "Show all topics of retained messages"}, + {"retainer topics", "Same as retainer topic 1 1000"}, + {"retainer topics ", + "Show topics of retained messages by the specified range"}, {"retainer clean", "Clean all retained messages"}, {"retainer clean ", "Clean retained messages by the specified topic filter"}, {"retainer reindex status", "Show reindex status"}, @@ -98,3 +101,12 @@ do_reindex(Force) -> end ), ?PRINT_MSG("Reindexing finished~n"). + +count() -> + ?PRINT("Number of retained messages: ~p~n", [emqx_retainer:retained_count()]). + +topic(Start, Len) -> + count(), + Topics = lists:sublist(emqx_retainer_mnesia:topics(), Start, Len), + [?PRINT("~ts~n", [I]) || I <- Topics], + ok. diff --git a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl index bddad5fb3..c04f7a6de 100644 --- a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl @@ -44,6 +44,9 @@ t_info(_Config) -> t_topics(_Config) -> ok = emqx_retainer_mnesia_cli:retainer(["topics"]). +t_topics_with_len(_Config) -> + ok = emqx_retainer_mnesia_cli:retainer(["topics", "100", "200"]). + t_clean(_Config) -> ok = emqx_retainer_mnesia_cli:retainer(["clean"]).