Merge pull request #8209 from ieQu1/list-retained-message-topics
feat(retainer): CLI to list and clean retained messages
This commit is contained in:
commit
3b41c841a7
|
@ -41,7 +41,8 @@
|
|||
delete/1,
|
||||
page_read/3,
|
||||
post_config_update/5,
|
||||
stats_fun/0
|
||||
stats_fun/0,
|
||||
retained_count/0
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
@ -152,6 +153,9 @@ clean() ->
|
|||
delete(Topic) ->
|
||||
call({?FUNCTION_NAME, Topic}).
|
||||
|
||||
retained_count() ->
|
||||
call(?FUNCTION_NAME).
|
||||
|
||||
page_read(Topic, Page, Limit) ->
|
||||
call({?FUNCTION_NAME, Topic, Page, Limit}).
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
-include_lib("stdlib/include/qlc.hrl").
|
||||
|
||||
%% emqx_retainer callbacks
|
||||
-export([
|
||||
delete_message/2,
|
||||
store_retained/2,
|
||||
|
@ -35,6 +36,9 @@
|
|||
size/1
|
||||
]).
|
||||
|
||||
%% Management API:
|
||||
-export([topics/0]).
|
||||
|
||||
-export([create_resource/1]).
|
||||
|
||||
-export([reindex/2, reindex_status/0]).
|
||||
|
@ -54,6 +58,13 @@
|
|||
-define(REINDEX_BATCH_SIZE, 1000).
|
||||
-define(REINDEX_DISPATCH_WAIT, 30000).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Management API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
topics() ->
|
||||
[emqx_topic:join(I) || I <- mnesia:dirty_all_keys(?TAB_MESSAGE)].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% emqx_retainer callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -31,6 +31,15 @@
|
|||
load() ->
|
||||
ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []).
|
||||
|
||||
retainer(["info"]) ->
|
||||
?PRINT("Number of retained messages: ~p~n", [emqx_retainer:retained_count()]);
|
||||
retainer(["topics"]) ->
|
||||
[?PRINT("~ts~n", [I]) || I <- emqx_retainer_mnesia:topics()],
|
||||
ok;
|
||||
retainer(["clean", Topic]) ->
|
||||
emqx_retainer:delete(list_to_binary(Topic));
|
||||
retainer(["clean"]) ->
|
||||
emqx_retainer:clean();
|
||||
retainer(["reindex", "status"]) ->
|
||||
case emqx_retainer_mnesia:reindex_status() of
|
||||
true ->
|
||||
|
@ -66,6 +75,10 @@ retainer(["reindex", "start", ForceParam]) ->
|
|||
retainer(_) ->
|
||||
emqx_ctl:usage(
|
||||
[
|
||||
{"retainer info", "Show the count of retained messages"},
|
||||
{"retainer topics", "Show all topics of retained messages"},
|
||||
{"retainer clean", "Clean all retained messages"},
|
||||
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
|
||||
{"retainer reindex status", "Show reindex status"},
|
||||
{"retainer reindex start [force]",
|
||||
"Generate new retainer topic indices config settings.\n"
|
||||
|
|
|
@ -38,6 +38,18 @@ end_per_suite(_Config) ->
|
|||
t_reindex_status(_Config) ->
|
||||
ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]).
|
||||
|
||||
t_info(_Config) ->
|
||||
ok = emqx_retainer_mnesia_cli:retainer(["info"]).
|
||||
|
||||
t_topics(_Config) ->
|
||||
ok = emqx_retainer_mnesia_cli:retainer(["topics"]).
|
||||
|
||||
t_clean(_Config) ->
|
||||
ok = emqx_retainer_mnesia_cli:retainer(["clean"]).
|
||||
|
||||
t_topic(_Config) ->
|
||||
ok = emqx_retainer_mnesia_cli:retainer(["clean", "foo/bar"]).
|
||||
|
||||
t_reindex(_Config) ->
|
||||
{ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
|
|
Loading…
Reference in New Issue