diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index bc6946a43..b48c05af5 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -269,10 +269,11 @@ do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) -> do_join(TopicAcc, [Word | Words]) -> do_join(<>, Words). --spec parse(topic() | {topic(), map()}) -> {topic() | share(), map()}. -parse(TopicFilter) when is_binary(TopicFilter) -> +-spec parse(TF | {TF, map()}) -> {TF, map()} when + TF :: topic() | share(). +parse(TopicFilter) when ?IS_TOPIC(TopicFilter) -> parse(TopicFilter, #{}); -parse({TopicFilter, Options}) when is_binary(TopicFilter) -> +parse({TopicFilter, Options}) when ?IS_TOPIC(TopicFilter) -> parse(TopicFilter, Options). -spec parse(topic() | share(), map()) -> {topic() | share(), map()}. @@ -282,6 +283,10 @@ parse(#share{topic = Topic = <>}, _Options) -> error({invalid_topic_filter, Topic}); parse(#share{topic = Topic = <>}, _Options) -> error({invalid_topic_filter, Topic}); +parse(#share{} = T, #{nl := 1} = _Options) -> + %% Protocol Error and Should Disconnect + %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] + error({invalid_subopts_nl, maybe_format_share(T)}); parse(<>, Options) -> parse(#share{group = <>, topic = Topic}, Options); parse(TopicFilter = <>, Options) -> diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index a44983596..ad9e12b90 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.0.37"}, + {vsn, "5.0.38"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [ diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index f394ffefa..e6a9b9555 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -758,6 +758,12 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) -> case do_subscribe(ClientID, Topic, Opts) of {error, channel_not_found} -> {404, ?CLIENTID_NOT_FOUND}; + {error, invalid_subopts_nl} -> + {400, #{ + code => <<"INVALID_PARAMETER">>, + message => + <<"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]">> + }}; {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOWN_ERROR">>, message => Message}}; @@ -781,10 +787,13 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) -> end. unsubscribe(#{clientid := ClientID, topic := Topic}) -> + {NTopic, _} = emqx_topic:parse(Topic), case do_unsubscribe(ClientID, Topic) of {error, channel_not_found} -> {404, ?CLIENTID_NOT_FOUND}; - {unsubscribe, [{Topic, #{}}]} -> + {unsubscribe, [{UnSubedT, #{}}]} when + (UnSubedT =:= NTopic) orelse (UnSubedT =:= Topic) + -> {204} end. @@ -801,18 +810,25 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) -> %% internal function do_subscribe(ClientID, Topic0, Options) -> - {Topic, Opts} = emqx_topic:parse(Topic0, Options), - TopicTable = [{Topic, Opts}], - case emqx_mgmt:subscribe(ClientID, TopicTable) of - {error, Reason} -> - {error, Reason}; - {subscribe, Subscriptions, Node} -> - case proplists:is_defined(Topic, Subscriptions) of - true -> - {ok, Options#{node => Node, clientid => ClientID, topic => Topic}}; - false -> - {error, unknow_error} + try emqx_topic:parse(Topic0, Options) of + {Topic, Opts} -> + TopicTable = [{Topic, Opts}], + case emqx_mgmt:subscribe(ClientID, TopicTable) of + {error, Reason} -> + {error, Reason}; + {subscribe, Subscriptions, Node} -> + case proplists:is_defined(Topic, Subscriptions) of + true -> + {ok, Options#{node => Node, clientid => ClientID, topic => Topic0}}; + false -> + {error, unknow_error} + end end + catch + error:{invalid_subopts_nl, _} -> + {error, invalid_subopts_nl}; + _:Reason -> + {error, Reason} end. -spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 6daf918f1..6d919a38a 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -17,8 +17,11 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -430,6 +433,132 @@ t_client_id_not_found(_Config) -> {error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody]) ). +t_subscribe_shared_topic(_Config) -> + ClientId = <<"client_subscribe_shared">>, + + {ok, C} = emqtt:start_link(#{clientid => ClientId}), + {ok, _} = emqtt:connect(C), + + ClientPuber = <<"publish_client">>, + {ok, PC} = emqtt:start_link(#{clientid => ClientPuber}), + {ok, _} = emqtt:connect(PC), + + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + + Http200 = {"HTTP/1.1", 200, "OK"}, + Http204 = {"HTTP/1.1", 204, "No Content"}, + + PathFun = fun(Suffix) -> + emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix) + end, + + PostFun = fun(Method, Path, Data) -> + emqx_mgmt_api_test_util:request_api( + Method, Path, "", AuthHeader, Data, #{return_all => true} + ) + end, + + SharedT = <<"$share/group/testtopic">>, + NonSharedT = <<"t/#">>, + + SubBodyFun = fun(T) -> #{topic => T, qos => 1, nl => 0, rh => 1} end, + UnSubBodyFun = fun(T) -> #{topic => T} end, + + %% ==================== + %% Client Subscribe + ?assertMatch( + {ok, {Http200, _, _}}, + PostFun(post, PathFun(["subscribe"]), SubBodyFun(SharedT)) + ), + ?assertMatch( + {ok, {Http200, _, _}}, + PostFun( + post, + PathFun(["subscribe", "bulk"]), + [SubBodyFun(T) || T <- [SharedT, NonSharedT]] + ) + ), + + %% assert subscription + ?assertMatch( + [ + {_, #share{group = <<"group">>, topic = <<"testtopic">>}}, + {_, <<"t/#">>} + ], + ets:tab2list(?SUBSCRIPTION) + ), + + ?assertMatch( + [ + {{#share{group = <<"group">>, topic = <<"testtopic">>}, _}, #{ + nl := 0, qos := 1, rh := 1, rap := 0 + }}, + {{<<"t/#">>, _}, #{nl := 0, qos := 1, rh := 1, rap := 0}} + ], + ets:tab2list(?SUBOPTION) + ), + ?assertMatch( + [{emqx_shared_subscription, <<"group">>, <<"testtopic">>, _}], + ets:tab2list(emqx_shared_subscription) + ), + + %% assert subscription virtual + _ = emqtt:publish(PC, <<"testtopic">>, <<"msg1">>, [{qos, 0}]), + ?assertReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg1">>}}), + _ = emqtt:publish(PC, <<"t/1">>, <<"msg2">>, [{qos, 0}]), + ?assertReceive({publish, #{topic := <<"t/1">>, payload := <<"msg2">>}}), + + %% ==================== + %% Client Unsubscribe + ?assertMatch( + {ok, {Http204, _, _}}, + PostFun(post, PathFun(["unsubscribe"]), UnSubBodyFun(SharedT)) + ), + ?assertMatch( + {ok, {Http204, _, _}}, + PostFun( + post, + PathFun(["unsubscribe", "bulk"]), + [UnSubBodyFun(T) || T <- [SharedT, NonSharedT]] + ) + ), + + %% assert subscription + ?assertEqual([], ets:tab2list(?SUBSCRIPTION)), + ?assertEqual([], ets:tab2list(?SUBOPTION)), + ?assertEqual([], ets:tab2list(emqx_shared_subscription)), + + %% assert subscription virtual + _ = emqtt:publish(PC, <<"testtopic">>, <<"msg3">>, [{qos, 0}]), + _ = emqtt:publish(PC, <<"t/1">>, <<"msg4">>, [{qos, 0}]), + ?assertNotReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg3">>}}), + ?assertNotReceive({publish, #{topic := <<"t/1">>, payload := <<"msg4">>}}). + +t_subscribe_shared_topic_nl(_Config) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Http400 = {"HTTP/1.1", 400, "Bad Request"}, + Body = + "{\"code\":\"INVALID_PARAMETER\"," + "\"message\":\"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]\"}", + ClientId = <<"client_subscribe_shared">>, + + {ok, C} = emqtt:start_link(#{clientid => ClientId}), + {ok, _} = emqtt:connect(C), + + PathFun = fun(Suffix) -> + emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix) + end, + PostFun = fun(Method, Path, Data) -> + emqx_mgmt_api_test_util:request_api( + Method, Path, "", AuthHeader, Data, #{return_all => true} + ) + end, + T = <<"$share/group/testtopic">>, + ?assertMatch( + {error, {Http400, _, Body}}, + PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1}) + ). + time_string_to_epoch_millisecond(DateTime) -> time_string_to_epoch(DateTime, millisecond). diff --git a/changes/ce/fix-12598.en.md b/changes/ce/fix-12598.en.md new file mode 100644 index 000000000..fd077d492 --- /dev/null +++ b/changes/ce/fix-12598.en.md @@ -0,0 +1,9 @@ +Fixed an issue that unable to subscribe or unsubscribe a shared topic filter via HTTP API. + +Releated APIs: + +- `/clients/:clientid/subscribe` +- `/clients/:clientid/subscribe/bulk` + +- `/clients/:clientid/unsubscribe` +- `/clients/:clientid/unsubscribe/bulk`