Merge pull request #12598 from JimMoen/EMQX-11912-fix-clients-subscribe-api

fix(mgmt): sub/unsub a share subscription to the client via http api
This commit is contained in:
JimMoen 2024-02-27 20:13:30 +08:00 committed by GitHub
commit 81d3f5b2c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 175 additions and 16 deletions

View File

@ -269,10 +269,11 @@ do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) ->
do_join(TopicAcc, [Word | Words]) ->
do_join(<<TopicAcc/binary, "/", (bin(Word))/binary>>, Words).
-spec parse(topic() | {topic(), map()}) -> {topic() | share(), map()}.
parse(TopicFilter) when is_binary(TopicFilter) ->
-spec parse(TF | {TF, map()}) -> {TF, map()} when
TF :: topic() | share().
parse(TopicFilter) when ?IS_TOPIC(TopicFilter) ->
parse(TopicFilter, #{});
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
parse({TopicFilter, Options}) when ?IS_TOPIC(TopicFilter) ->
parse(TopicFilter, Options).
-spec parse(topic() | share(), map()) -> {topic() | share(), map()}.
@ -282,6 +283,10 @@ parse(#share{topic = Topic = <<?QUEUE, "/", _/binary>>}, _Options) ->
error({invalid_topic_filter, Topic});
parse(#share{topic = Topic = <<?SHARE, "/", _/binary>>}, _Options) ->
error({invalid_topic_filter, Topic});
parse(#share{} = T, #{nl := 1} = _Options) ->
%% Protocol Error and Should Disconnect
%% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1]
error({invalid_subopts_nl, maybe_format_share(T)});
parse(<<?QUEUE, "/", Topic/binary>>, Options) ->
parse(#share{group = <<?QUEUE>>, topic = Topic}, Options);
parse(TopicFilter = <<?SHARE, "/", Rest/binary>>, Options) ->

View File

@ -2,7 +2,7 @@
{application, emqx_management, [
{description, "EMQX Management API and CLI"},
% strict semver, bump manually!
{vsn, "5.0.37"},
{vsn, "5.0.38"},
{modules, []},
{registered, [emqx_management_sup]},
{applications, [

View File

@ -758,6 +758,12 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
case do_subscribe(ClientID, Topic, Opts) of
{error, channel_not_found} ->
{404, ?CLIENTID_NOT_FOUND};
{error, invalid_subopts_nl} ->
{400, #{
code => <<"INVALID_PARAMETER">>,
message =>
<<"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]">>
}};
{error, Reason} ->
Message = list_to_binary(io_lib:format("~p", [Reason])),
{500, #{code => <<"UNKNOWN_ERROR">>, message => Message}};
@ -781,10 +787,13 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
end.
unsubscribe(#{clientid := ClientID, topic := Topic}) ->
{NTopic, _} = emqx_topic:parse(Topic),
case do_unsubscribe(ClientID, Topic) of
{error, channel_not_found} ->
{404, ?CLIENTID_NOT_FOUND};
{unsubscribe, [{Topic, #{}}]} ->
{unsubscribe, [{UnSubedT, #{}}]} when
(UnSubedT =:= NTopic) orelse (UnSubedT =:= Topic)
->
{204}
end.
@ -801,18 +810,25 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
%% internal function
do_subscribe(ClientID, Topic0, Options) ->
{Topic, Opts} = emqx_topic:parse(Topic0, Options),
TopicTable = [{Topic, Opts}],
case emqx_mgmt:subscribe(ClientID, TopicTable) of
{error, Reason} ->
{error, Reason};
{subscribe, Subscriptions, Node} ->
case proplists:is_defined(Topic, Subscriptions) of
true ->
{ok, Options#{node => Node, clientid => ClientID, topic => Topic}};
false ->
{error, unknow_error}
try emqx_topic:parse(Topic0, Options) of
{Topic, Opts} ->
TopicTable = [{Topic, Opts}],
case emqx_mgmt:subscribe(ClientID, TopicTable) of
{error, Reason} ->
{error, Reason};
{subscribe, Subscriptions, Node} ->
case proplists:is_defined(Topic, Subscriptions) of
true ->
{ok, Options#{node => Node, clientid => ClientID, topic => Topic0}};
false ->
{error, unknow_error}
end
end
catch
error:{invalid_subopts_nl, _} ->
{error, invalid_subopts_nl};
_:Reason ->
{error, Reason}
end.
-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->

View File

@ -17,8 +17,11 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -430,6 +433,132 @@ t_client_id_not_found(_Config) ->
{error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody])
).
t_subscribe_shared_topic(_Config) ->
ClientId = <<"client_subscribe_shared">>,
{ok, C} = emqtt:start_link(#{clientid => ClientId}),
{ok, _} = emqtt:connect(C),
ClientPuber = <<"publish_client">>,
{ok, PC} = emqtt:start_link(#{clientid => ClientPuber}),
{ok, _} = emqtt:connect(PC),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Http200 = {"HTTP/1.1", 200, "OK"},
Http204 = {"HTTP/1.1", 204, "No Content"},
PathFun = fun(Suffix) ->
emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix)
end,
PostFun = fun(Method, Path, Data) ->
emqx_mgmt_api_test_util:request_api(
Method, Path, "", AuthHeader, Data, #{return_all => true}
)
end,
SharedT = <<"$share/group/testtopic">>,
NonSharedT = <<"t/#">>,
SubBodyFun = fun(T) -> #{topic => T, qos => 1, nl => 0, rh => 1} end,
UnSubBodyFun = fun(T) -> #{topic => T} end,
%% ====================
%% Client Subscribe
?assertMatch(
{ok, {Http200, _, _}},
PostFun(post, PathFun(["subscribe"]), SubBodyFun(SharedT))
),
?assertMatch(
{ok, {Http200, _, _}},
PostFun(
post,
PathFun(["subscribe", "bulk"]),
[SubBodyFun(T) || T <- [SharedT, NonSharedT]]
)
),
%% assert subscription
?assertMatch(
[
{_, #share{group = <<"group">>, topic = <<"testtopic">>}},
{_, <<"t/#">>}
],
ets:tab2list(?SUBSCRIPTION)
),
?assertMatch(
[
{{#share{group = <<"group">>, topic = <<"testtopic">>}, _}, #{
nl := 0, qos := 1, rh := 1, rap := 0
}},
{{<<"t/#">>, _}, #{nl := 0, qos := 1, rh := 1, rap := 0}}
],
ets:tab2list(?SUBOPTION)
),
?assertMatch(
[{emqx_shared_subscription, <<"group">>, <<"testtopic">>, _}],
ets:tab2list(emqx_shared_subscription)
),
%% assert subscription virtual
_ = emqtt:publish(PC, <<"testtopic">>, <<"msg1">>, [{qos, 0}]),
?assertReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg1">>}}),
_ = emqtt:publish(PC, <<"t/1">>, <<"msg2">>, [{qos, 0}]),
?assertReceive({publish, #{topic := <<"t/1">>, payload := <<"msg2">>}}),
%% ====================
%% Client Unsubscribe
?assertMatch(
{ok, {Http204, _, _}},
PostFun(post, PathFun(["unsubscribe"]), UnSubBodyFun(SharedT))
),
?assertMatch(
{ok, {Http204, _, _}},
PostFun(
post,
PathFun(["unsubscribe", "bulk"]),
[UnSubBodyFun(T) || T <- [SharedT, NonSharedT]]
)
),
%% assert subscription
?assertEqual([], ets:tab2list(?SUBSCRIPTION)),
?assertEqual([], ets:tab2list(?SUBOPTION)),
?assertEqual([], ets:tab2list(emqx_shared_subscription)),
%% assert subscription virtual
_ = emqtt:publish(PC, <<"testtopic">>, <<"msg3">>, [{qos, 0}]),
_ = emqtt:publish(PC, <<"t/1">>, <<"msg4">>, [{qos, 0}]),
?assertNotReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg3">>}}),
?assertNotReceive({publish, #{topic := <<"t/1">>, payload := <<"msg4">>}}).
t_subscribe_shared_topic_nl(_Config) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Http400 = {"HTTP/1.1", 400, "Bad Request"},
Body =
"{\"code\":\"INVALID_PARAMETER\","
"\"message\":\"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]\"}",
ClientId = <<"client_subscribe_shared">>,
{ok, C} = emqtt:start_link(#{clientid => ClientId}),
{ok, _} = emqtt:connect(C),
PathFun = fun(Suffix) ->
emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix)
end,
PostFun = fun(Method, Path, Data) ->
emqx_mgmt_api_test_util:request_api(
Method, Path, "", AuthHeader, Data, #{return_all => true}
)
end,
T = <<"$share/group/testtopic">>,
?assertMatch(
{error, {Http400, _, Body}},
PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1})
).
time_string_to_epoch_millisecond(DateTime) ->
time_string_to_epoch(DateTime, millisecond).

View File

@ -0,0 +1,9 @@
Fixed an issue that unable to subscribe or unsubscribe a shared topic filter via HTTP API.
Releated APIs:
- `/clients/:clientid/subscribe`
- `/clients/:clientid/subscribe/bulk`
- `/clients/:clientid/unsubscribe`
- `/clients/:clientid/unsubscribe/bulk`