diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 0d9e8eef6..6d919a38a 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -17,8 +17,11 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -435,7 +438,10 @@ t_subscribe_shared_topic(_Config) -> {ok, C} = emqtt:start_link(#{clientid => ClientId}), {ok, _} = emqtt:connect(C), - {ok, _, _} = emqtt:subscribe(C, <<"topic/1">>, 1), + + ClientPuber = <<"publish_client">>, + {ok, PC} = emqtt:start_link(#{clientid => ClientPuber}), + {ok, _} = emqtt:connect(PC), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), @@ -453,11 +459,12 @@ t_subscribe_shared_topic(_Config) -> end, SharedT = <<"$share/group/testtopic">>, - NonSharedT = <<"t/1">>, + NonSharedT = <<"t/#">>, - SubBodyFun = fun(T) -> #{topic => T, qos => 1, nl => 1, rh => 1} end, + SubBodyFun = fun(T) -> #{topic => T, qos => 1, nl => 0, rh => 1} end, UnSubBodyFun = fun(T) -> #{topic => T} end, + %% ==================== %% Client Subscribe ?assertMatch( {ok, {Http200, _, _}}, @@ -472,6 +479,36 @@ t_subscribe_shared_topic(_Config) -> ) ), + %% assert subscription + ?assertMatch( + [ + {_, #share{group = <<"group">>, topic = <<"testtopic">>}}, + {_, <<"t/#">>} + ], + ets:tab2list(?SUBSCRIPTION) + ), + + ?assertMatch( + [ + {{#share{group = <<"group">>, topic = <<"testtopic">>}, _}, #{ + nl := 0, qos := 1, rh := 1, rap := 0 + }}, + {{<<"t/#">>, _}, #{nl := 0, qos := 1, rh := 1, rap := 0}} + ], + ets:tab2list(?SUBOPTION) + ), + ?assertMatch( + [{emqx_shared_subscription, <<"group">>, <<"testtopic">>, _}], + ets:tab2list(emqx_shared_subscription) + ), + + %% assert subscription virtual + _ = emqtt:publish(PC, <<"testtopic">>, <<"msg1">>, [{qos, 0}]), + ?assertReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg1">>}}), + _ = emqtt:publish(PC, <<"t/1">>, <<"msg2">>, [{qos, 0}]), + ?assertReceive({publish, #{topic := <<"t/1">>, payload := <<"msg2">>}}), + + %% ==================== %% Client Unsubscribe ?assertMatch( {ok, {Http204, _, _}}, @@ -484,6 +521,42 @@ t_subscribe_shared_topic(_Config) -> PathFun(["unsubscribe", "bulk"]), [UnSubBodyFun(T) || T <- [SharedT, NonSharedT]] ) + ), + + %% assert subscription + ?assertEqual([], ets:tab2list(?SUBSCRIPTION)), + ?assertEqual([], ets:tab2list(?SUBOPTION)), + ?assertEqual([], ets:tab2list(emqx_shared_subscription)), + + %% assert subscription virtual + _ = emqtt:publish(PC, <<"testtopic">>, <<"msg3">>, [{qos, 0}]), + _ = emqtt:publish(PC, <<"t/1">>, <<"msg4">>, [{qos, 0}]), + ?assertNotReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg3">>}}), + ?assertNotReceive({publish, #{topic := <<"t/1">>, payload := <<"msg4">>}}). + +t_subscribe_shared_topic_nl(_Config) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Http400 = {"HTTP/1.1", 400, "Bad Request"}, + Body = + "{\"code\":\"INVALID_PARAMETER\"," + "\"message\":\"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]\"}", + ClientId = <<"client_subscribe_shared">>, + + {ok, C} = emqtt:start_link(#{clientid => ClientId}), + {ok, _} = emqtt:connect(C), + + PathFun = fun(Suffix) -> + emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix) + end, + PostFun = fun(Method, Path, Data) -> + emqx_mgmt_api_test_util:request_api( + Method, Path, "", AuthHeader, Data, #{return_all => true} + ) + end, + T = <<"$share/group/testtopic">>, + ?assertMatch( + {error, {Http400, _, Body}}, + PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1}) ). time_string_to_epoch_millisecond(DateTime) ->