feat(retainer): CLI to list and clean retained messages

This commit is contained in:
ieQu1 2022-06-14 11:23:40 +02:00
parent 5817b7a9a9
commit 9ca2e3bc2a
4 changed files with 41 additions and 1 deletions

View File

@ -41,7 +41,8 @@
delete/1, delete/1,
page_read/3, page_read/3,
post_config_update/5, post_config_update/5,
stats_fun/0 stats_fun/0,
retained_count/0
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -152,6 +153,9 @@ clean() ->
delete(Topic) -> delete(Topic) ->
call({?FUNCTION_NAME, Topic}). call({?FUNCTION_NAME, Topic}).
retained_count() ->
call(?FUNCTION_NAME).
page_read(Topic, Page, Limit) -> page_read(Topic, Page, Limit) ->
call({?FUNCTION_NAME, Topic, Page, Limit}). call({?FUNCTION_NAME, Topic, Page, Limit}).

View File

@ -24,6 +24,7 @@
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/qlc.hrl").
%% emqx_retainer callbacks
-export([ -export([
delete_message/2, delete_message/2,
store_retained/2, store_retained/2,
@ -35,6 +36,9 @@
size/1 size/1
]). ]).
%% Management API:
-export([topics/0]).
-export([create_resource/1]). -export([create_resource/1]).
-export([reindex/2, reindex_status/0]). -export([reindex/2, reindex_status/0]).
@ -54,6 +58,13 @@
-define(REINDEX_BATCH_SIZE, 1000). -define(REINDEX_BATCH_SIZE, 1000).
-define(REINDEX_DISPATCH_WAIT, 30000). -define(REINDEX_DISPATCH_WAIT, 30000).
%%--------------------------------------------------------------------
%% Management API
%%--------------------------------------------------------------------
topics() ->
[emqx_topic:join(I) || I <- mnesia:dirty_all_keys(?TAB_MESSAGE)].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% emqx_retainer callbacks %% emqx_retainer callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -31,6 +31,15 @@
load() -> load() ->
ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []). ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []).
retainer(["info"]) ->
?PRINT("Number of retained messages: ~p~n", [emqx_retainer:retained_count()]);
retainer(["topics"]) ->
[?PRINT("~ts~n", [I]) || I <- emqx_retainer_mnesia:topics()],
ok;
retainer(["clean", Topic]) ->
emqx_retainer:delete(list_to_binary(Topic));
retainer(["clean"]) ->
emqx_retainer:clean();
retainer(["reindex", "status"]) -> retainer(["reindex", "status"]) ->
case emqx_retainer_mnesia:reindex_status() of case emqx_retainer_mnesia:reindex_status() of
true -> true ->
@ -66,6 +75,10 @@ retainer(["reindex", "start", ForceParam]) ->
retainer(_) -> retainer(_) ->
emqx_ctl:usage( emqx_ctl:usage(
[ [
{"retainer info", "Show the count of retained messages"},
{"retainer topics", "Show all topics of retained messages"},
{"retainer clean", "Clean all retained messages"},
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
{"retainer reindex status", "Show reindex status"}, {"retainer reindex status", "Show reindex status"},
{"retainer reindex start [force]", {"retainer reindex start [force]",
"Generate new retainer topic indices config settings.\n" "Generate new retainer topic indices config settings.\n"

View File

@ -38,6 +38,18 @@ end_per_suite(_Config) ->
t_reindex_status(_Config) -> t_reindex_status(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]). ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]).
t_info(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["info"]).
t_topics(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["topics"]).
t_clean(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["clean"]).
t_topic(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["clean", "foo/bar"]).
t_reindex(_Config) -> t_reindex(_Config) ->
{ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),