Merge pull request #13524 from lafirest/feat/exclusive-cli
feat(exclusive): added CLI interface for exclusive topics
This commit is contained in:
commit
2d6b2bff8e
|
@ -23,6 +23,7 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
|
-define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
|
||||||
|
-define(EXCLUSIVE_TAB, emqx_exclusive_subscription).
|
||||||
|
|
||||||
-export([load/0]).
|
-export([load/0]).
|
||||||
|
|
||||||
|
@ -45,7 +46,8 @@
|
||||||
olp/1,
|
olp/1,
|
||||||
data/1,
|
data/1,
|
||||||
ds/1,
|
ds/1,
|
||||||
cluster_info/0
|
cluster_info/0,
|
||||||
|
exclusive/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-spec load() -> ok.
|
-spec load() -> ok.
|
||||||
|
@ -1024,7 +1026,9 @@ print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
|
||||||
NL = maps:get(nl, Options, 0),
|
NL = maps:get(nl, Options, 0),
|
||||||
RH = maps:get(rh, Options, 0),
|
RH = maps:get(rh, Options, 0),
|
||||||
RAP = maps:get(rap, Options, 0),
|
RAP = maps:get(rap, Options, 0),
|
||||||
emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]).
|
emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]);
|
||||||
|
print({exclusive, {exclusive_subscription, Topic, ClientId}}) ->
|
||||||
|
emqx_ctl:print("topic:~ts -> ClientId:~ts~n", [Topic, ClientId]).
|
||||||
|
|
||||||
format(_, undefined) ->
|
format(_, undefined) ->
|
||||||
undefined;
|
undefined;
|
||||||
|
@ -1085,3 +1089,19 @@ safe_call_mria(Fun, Args, OnFail) ->
|
||||||
}),
|
}),
|
||||||
OnFail
|
OnFail
|
||||||
end.
|
end.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% @doc Exclusive topics
|
||||||
|
exclusive(["list"]) ->
|
||||||
|
case ets:info(?EXCLUSIVE_TAB, size) of
|
||||||
|
0 -> emqx_ctl:print("No topics.~n");
|
||||||
|
_ -> dump(?EXCLUSIVE_TAB, exclusive)
|
||||||
|
end;
|
||||||
|
exclusive(["delete", Topic0]) ->
|
||||||
|
Topic = erlang:iolist_to_binary(Topic0),
|
||||||
|
emqx_exclusive_subscription:unsubscribe(Topic, #{is_exclusive => true}),
|
||||||
|
emqx_ctl:print("ok~n");
|
||||||
|
exclusive(_) ->
|
||||||
|
emqx_ctl:usage([
|
||||||
|
{"exclusive list", "List all exclusive topics"},
|
||||||
|
{"exclusive delete <Topic>", "Delete an exclusive topic"}
|
||||||
|
]).
|
||||||
|
|
|
@ -354,4 +354,9 @@ t_autocluster_leave(Config) ->
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_exclusive(_Config) ->
|
||||||
|
emqx_ctl:run_command(["exclusive", "list"]),
|
||||||
|
emqx_ctl:run_command(["exclusive", "delete", "t/1"]),
|
||||||
|
ok.
|
||||||
|
|
||||||
format(Str, Opts) -> io:format("str:~s: Opts:~p", [Str, Opts]).
|
format(Str, Opts) -> io:format("str:~s: Opts:~p", [Str, Opts]).
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Added CLI interface `emqx ctl exclusive` for the feature exclusive topics.
|
Loading…
Reference in New Issue