refactor: cleanup list subscriptions
This commit is contained in:
parent
0d2ce85776
commit
0d357f7038
|
@ -66,13 +66,8 @@
|
|||
|
||||
%% Subscriptions
|
||||
-export([
|
||||
list_subscriptions/1,
|
||||
list_subscriptions_via_topic/2,
|
||||
list_subscriptions_via_topic/3,
|
||||
lookup_subscriptions/1,
|
||||
lookup_subscriptions/2,
|
||||
|
||||
do_list_subscriptions/0
|
||||
list_subscriptions_via_topic/3
|
||||
]).
|
||||
|
||||
%% PubSub
|
||||
|
@ -400,21 +395,6 @@ call_client(Node, ClientId, Req) ->
|
|||
%% Subscriptions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec do_list_subscriptions() -> [map()].
|
||||
do_list_subscriptions() ->
|
||||
case check_max_table_size([mqtt_subproperty]) of
|
||||
false ->
|
||||
throw(max_row_limit);
|
||||
true ->
|
||||
[
|
||||
#{topic => Topic, clientid => ClientId, options => Options}
|
||||
|| {{Topic, ClientId}, Options} <- ets:tab2list(mqtt_subproperty)
|
||||
]
|
||||
end.
|
||||
|
||||
list_subscriptions(Node) ->
|
||||
unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)).
|
||||
|
||||
list_subscriptions_via_topic(Topic, FormatFun) ->
|
||||
lists:append([
|
||||
list_subscriptions_via_topic(Node, Topic, FormatFun)
|
||||
|
@ -427,12 +407,6 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
|
|||
Result -> M:F(Result)
|
||||
end.
|
||||
|
||||
lookup_subscriptions(ClientId) ->
|
||||
lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
|
||||
|
||||
lookup_subscriptions(Node, ClientId) ->
|
||||
unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% PubSub
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -568,17 +542,6 @@ unwrap_rpc(Res) ->
|
|||
otp_rel() ->
|
||||
iolist_to_binary([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]).
|
||||
|
||||
check_max_table_size(Tables) ->
|
||||
check_max_table_size(Tables, ?MAX_TABLE_SIZE).
|
||||
|
||||
check_max_table_size([], _Limit) ->
|
||||
true;
|
||||
check_max_table_size([Tab | Tables], Limit) ->
|
||||
case table_size(Tab) > Limit of
|
||||
true -> false;
|
||||
false -> check_max_table_size(Tables, Limit)
|
||||
end.
|
||||
|
||||
check_results(Results) ->
|
||||
case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
||||
true -> ok;
|
||||
|
@ -587,5 +550,3 @@ check_results(Results) ->
|
|||
|
||||
default_row_limit() ->
|
||||
?DEFAULT_ROW_LIMIT.
|
||||
|
||||
table_size(Tab) -> ets:info(Tab, size).
|
||||
|
|
|
@ -166,6 +166,77 @@ t_list_client_subscriptions(Config) ->
|
|||
?assertMatch({_, [{<<"t/#">>, _Opts}]}, emqx_mgmt:list_client_subscriptions(<<"client1">>)),
|
||||
?assertEqual({error, not_found}, emqx_mgmt:list_client_subscriptions(<<"notfound">>)).
|
||||
|
||||
t_clean_cache(init, Config) ->
|
||||
setup_clients(Config);
|
||||
t_clean_cache('end', Config) ->
|
||||
disconnect_clients(Config).
|
||||
|
||||
t_clean_cache(_Config) ->
|
||||
?assertNotMatch(
|
||||
{error, _},
|
||||
emqx_mgmt:clean_authz_cache(<<"client1">>)
|
||||
),
|
||||
?assertNotMatch(
|
||||
{error, _},
|
||||
emqx_mgmt:clean_authz_cache_all()
|
||||
),
|
||||
?assertNotMatch(
|
||||
{error, _},
|
||||
emqx_mgmt:clean_pem_cache_all()
|
||||
).
|
||||
|
||||
t_set_client_props(init, Config) ->
|
||||
setup_clients(Config);
|
||||
t_set_client_props('end', Config) ->
|
||||
disconnect_clients(Config).
|
||||
|
||||
t_set_client_props(_Config) ->
|
||||
?assertEqual(
|
||||
% [FIXME] not implemented at this point?
|
||||
ignored,
|
||||
emqx_mgmt:set_ratelimit_policy(<<"client1">>, foo)
|
||||
),
|
||||
?assertEqual(
|
||||
{error, not_found},
|
||||
emqx_mgmt:set_ratelimit_policy(<<"notfound">>, foo)
|
||||
),
|
||||
?assertEqual(
|
||||
% [FIXME] not implemented at this point?
|
||||
ignored,
|
||||
emqx_mgmt:set_quota_policy(<<"client1">>, foo)
|
||||
),
|
||||
?assertEqual(
|
||||
{error, not_found},
|
||||
emqx_mgmt:set_quota_policy(<<"notfound">>, foo)
|
||||
),
|
||||
?assertEqual(
|
||||
ok,
|
||||
emqx_mgmt:set_keepalive(<<"client1">>, 3600)
|
||||
),
|
||||
?assertMatch(
|
||||
{error, _},
|
||||
emqx_mgmt:set_keepalive(<<"client1">>, true)
|
||||
),
|
||||
?assertEqual(
|
||||
{error, not_found},
|
||||
emqx_mgmt:set_keepalive(<<"notfound">>, 3600)
|
||||
),
|
||||
ok.
|
||||
|
||||
t_list_subscriptions_via_topic(init, Config) ->
|
||||
setup_clients(Config);
|
||||
t_list_subscriptions_via_topic('end', Config) ->
|
||||
disconnect_clients(Config).
|
||||
|
||||
t_list_subscriptions_via_topic(Config) ->
|
||||
[Client | _] = ?config(clients, Config),
|
||||
?assertEqual([], emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN)),
|
||||
emqtt:subscribe(Client, <<"t/#">>),
|
||||
?assertMatch(
|
||||
[{{<<"t/#">>, _SubPid}, _Opts}],
|
||||
emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN)
|
||||
).
|
||||
|
||||
%%% helpers
|
||||
ident(Arg) ->
|
||||
Arg.
|
||||
|
|
|
@ -114,7 +114,7 @@ t_clients(_) ->
|
|||
SubscribeBody
|
||||
),
|
||||
timer:sleep(100),
|
||||
[{AfterSubTopic, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
|
||||
{_, [{AfterSubTopic, #{qos := AfterSubQos}}]} = emqx_mgmt:list_client_subscriptions(ClientId1),
|
||||
?assertEqual(AfterSubTopic, Topic),
|
||||
?assertEqual(AfterSubQos, Qos),
|
||||
|
||||
|
@ -159,7 +159,7 @@ t_clients(_) ->
|
|||
UnSubscribeBody
|
||||
),
|
||||
timer:sleep(100),
|
||||
?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)),
|
||||
?assertEqual([], emqx_mgmt:list_client_subscriptions(ClientId1)),
|
||||
|
||||
%% testcase cleanup, kickout client1
|
||||
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path),
|
||||
|
|
Loading…
Reference in New Issue