diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 6d1ee98cb..429c6097d 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -87,7 +87,7 @@ subscribe(Topic) when is_binary(Topic) -> -spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok). subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - subscribe(Topic, SubId, #{}); + subscribe(Topic, SubId, #{qos => 0}); subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> subscribe(Topic, undefined, SubOpts). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 1e186b0ab..9117dd1b8 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -482,7 +482,7 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> case maps:find(Topic, SubMap) of {ok, SubOpts} -> - ok = emqx_broker:unsubscribe(Topic, ClientId), + ok = emqx_broker:unsubscribe(Topic), emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; error -> diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index f9a6dcf40..b2031a61c 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -36,7 +36,6 @@ groups() -> [ {pubsub, [sequence], [subscribe_unsubscribe, publish, pubsub, - t_local_subscribe, t_shared_subscribe, dispatch_with_no_sub, 'pubsub#', 'pubsub+']}, @@ -61,14 +60,14 @@ subscribe_unsubscribe(_) -> ok = emqx:subscribe(<<"topic">>, <<"clientId">>), ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }), ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }), - true = emqx:subscribed(<<"topic">>, <<"clientId">>), + true = emqx:subscribed(<<"clientId">>, <<"topic">>), Topics = emqx:topics(), lists:foreach(fun(Topic) -> ?assert(lists:member(Topic, Topics)) end, Topics), - ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), - ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>), - ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>). + ok = emqx:unsubscribe(<<"topic">>), + ok = emqx:unsubscribe(<<"topic/1">>), + ok = emqx:unsubscribe(<<"topic/2">>). publish(_) -> Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>), @@ -85,18 +84,25 @@ dispatch_with_no_sub(_) -> pubsub(_) -> true = emqx:is_running(node()), Self = self(), - Subscriber = {Self, <<"clientId">>}, - ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }), - #{qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), - #{qos := 1} = emqx:get_subopts(<<"a/b/c">>, Subscriber), - true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}), - #{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber), - ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }), + Subscriber = <<"clientId">>, + ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }), + #{qos := 1} = ets:lookup_element(emqx_suboption, {Self, <<"a/b/c">>}, 2), + #{qos := 1} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>), + true = emqx_broker:set_subopts(<<"a/b/c">>, #{qos => 0}), + #{qos := 0} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>), + ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }), %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]), timer:sleep(10), - [{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>), + [Self] = emqx_broker:subscribers(<<"a/b/c">>), emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), - ?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end), + ?assert( + receive {dispatch, <<"a/b/c">>, _ } -> + true; + P -> + ct:log("Receive Message: ~p~n",[P]) + after 2 -> + false + end), spawn(fun() -> emqx:subscribe(<<"a/b/c">>), emqx:subscribe(<<"c/d/e">>), @@ -106,38 +112,15 @@ pubsub(_) -> timer:sleep(20), emqx:unsubscribe(<<"a/b/c">>). -t_local_subscribe(_) -> - ok = emqx:subscribe(<<"$local/topic0">>), - ok = emqx:subscribe(<<"$local/topic1">>, <<"clientId">>), - ok = emqx:subscribe(<<"$local/topic2">>, <<"clientId">>, #{ qos => 2 }), - timer:sleep(10), - ?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")), - ?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")), - ?assertEqual([{<<"$local/topic1">>, #{ qos => 0 }}, - {<<"$local/topic2">>, #{ qos => 2 }}], - emqx:subscriptions({self(), <<"clientId">>})), - ?assertEqual(ok, emqx:unsubscribe("$local/topic0")), - ?assertEqual(ok, emqx:unsubscribe("$local/topic0")), - ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"clientId">>)), - ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"clientId">>)), - ?assertEqual([], emqx:subscribers("topic1")), - ?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})). - t_shared_subscribe(_) -> - emqx:subscribe("$local/$share/group1/topic1"), emqx:subscribe("$share/group2/topic2"), emqx:subscribe("$queue/topic3"), timer:sleep(10), - ct:log("share subscriptions: ~p~n", [emqx:subscriptions({self(), undefined})]), - ?assertEqual([{self(), undefined}], emqx:subscribers(<<"$local/$share/group1/topic1">>)), - ?assertEqual([{<<"$local/$share/group1/topic1">>, #{qos => 0}}, - {<<"$queue/topic3">>, #{qos => 0}}, - {<<"$share/group2/topic2">>, #{qos => 0}}], - lists:sort(emqx:subscriptions({self(), undefined}))), - emqx:unsubscribe("$local/$share/group1/topic1"), + ct:log("share subscriptions: ~p~n", [emqx:subscriptions(self())]), + ?assertEqual(2, length(emqx:subscriptions(self()))), emqx:unsubscribe("$share/group2/topic2"), emqx:unsubscribe("$queue/topic3"), - ?assertEqual([], lists:sort(emqx:subscriptions(self()))). + ?assertEqual(0, length(emqx:subscriptions(self()))). 'pubsub#'(_) -> emqx:subscribe(<<"a/#">>), diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index a04a0b82b..2e597340d 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -53,7 +53,7 @@ t_session_all(_) -> emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 2}}]), emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 1}}]), timer:sleep(200), - [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}), + [{<<"topic">>, _}] = emqx:subscriptions(SPid), emqx_session:publish(SPid, 1, Message1), timer:sleep(200), {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid), @@ -76,5 +76,5 @@ t_session_all(_) -> 1 = proplists:get_value(subscriptions_count, Stats), emqx_session:unsubscribe(SPid, [<<"topic">>]), timer:sleep(200), - [] = emqx:subscriptions({SPid, <<"clientId">>}), + [] = emqx:subscriptions(SPid), emqx_mock_client:close_session(ConnPid).