diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 6764f5e82..4eb358ade 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -41,7 +41,8 @@ delete/1, page_read/3, post_config_update/5, - stats_fun/0 + stats_fun/0, + retained_count/0 ]). %% gen_server callbacks @@ -152,6 +153,9 @@ clean() -> delete(Topic) -> call({?FUNCTION_NAME, Topic}). +retained_count() -> + call(?FUNCTION_NAME). + page_read(Topic, Page, Limit) -> call({?FUNCTION_NAME, Topic, Page, Limit}). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 235d30cc0..c861d27e4 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -24,6 +24,7 @@ -include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/qlc.hrl"). +%% emqx_retainer callbacks -export([ delete_message/2, store_retained/2, @@ -35,6 +36,9 @@ size/1 ]). +%% Management API: +-export([topics/0]). + -export([create_resource/1]). -export([reindex/2, reindex_status/0]). @@ -54,6 +58,13 @@ -define(REINDEX_BATCH_SIZE, 1000). -define(REINDEX_DISPATCH_WAIT, 30000). +%%-------------------------------------------------------------------- +%% Management API +%%-------------------------------------------------------------------- + +topics() -> + [emqx_topic:join(I) || I <- mnesia:dirty_all_keys(?TAB_MESSAGE)]. + %%-------------------------------------------------------------------- %% emqx_retainer callbacks %%-------------------------------------------------------------------- diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl index 8e0c81a6d..22eeafe08 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl @@ -31,6 +31,15 @@ load() -> ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []). +retainer(["info"]) -> + ?PRINT("Number of retained messages: ~p~n", [emqx_retainer:retained_count()]); +retainer(["topics"]) -> + [?PRINT("~ts~n", [I]) || I <- emqx_retainer_mnesia:topics()], + ok; +retainer(["clean", Topic]) -> + emqx_retainer:delete(list_to_binary(Topic)); +retainer(["clean"]) -> + emqx_retainer:clean(); retainer(["reindex", "status"]) -> case emqx_retainer_mnesia:reindex_status() of true -> @@ -66,6 +75,10 @@ retainer(["reindex", "start", ForceParam]) -> retainer(_) -> emqx_ctl:usage( [ + {"retainer info", "Show the count of retained messages"}, + {"retainer topics", "Show all topics of retained messages"}, + {"retainer clean", "Clean all retained messages"}, + {"retainer clean ", "Clean retained messages by the specified topic filter"}, {"retainer reindex status", "Show reindex status"}, {"retainer reindex start [force]", "Generate new retainer topic indices config settings.\n" diff --git a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl index 8bd6b80f2..8f1a75e30 100644 --- a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl @@ -38,6 +38,18 @@ end_per_suite(_Config) -> t_reindex_status(_Config) -> ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]). +t_info(_Config) -> + ok = emqx_retainer_mnesia_cli:retainer(["info"]). + +t_topics(_Config) -> + ok = emqx_retainer_mnesia_cli:retainer(["topics"]). + +t_clean(_Config) -> + ok = emqx_retainer_mnesia_cli:retainer(["clean"]). + +t_topic(_Config) -> + ok = emqx_retainer_mnesia_cli:retainer(["clean", "foo/bar"]). + t_reindex(_Config) -> {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C),