diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b816a0a69..50fb06e0e 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -258,7 +258,7 @@ subscriptions(Subscriber) -> subscription(Topic, Subscriber); ({_, Topic}) -> subscription(Topic, Subscriber) - end, ets:lookup(?SUBSCRIPTION, Subscriber)). + end, ets:lookup(?SUBSCRIPTION, Subscriber)). subscription(Topic, Subscriber) -> {Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}. @@ -336,15 +336,14 @@ handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, sub Subscriber = {SubPid, SubId}, case ets:member(?SUBOPTION, {Topic, Subscriber}) of false -> - resubscribe(From, {Subscriber, SubOpts, Topic}, State); + Group = maps:get(share, SubOpts, undefined), + true = do_subscribe(Group, Topic, Subscriber, SubOpts), + emqx_shared_sub:subscribe(Group, Topic, SubPid), + emqx_router:add_route(From, Topic, dest(Group)), + {noreply, monitor_subscriber(Subscriber, State)}; true -> - case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of - true -> - gen_server:reply(From, ok), - {noreply, State}; - false -> - resubscribe(From, {Subscriber, SubOpts, Topic}, State) - end + gen_server:reply(From, ok), + {noreply, State} end; handle_cast({From, #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}}, State) -> @@ -390,26 +389,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -resubscribe(From, {Subscriber, SubOpts, Topic}, State) -> - {SubPid, _} = Subscriber, - Group = maps:get(share, SubOpts, undefined), - true = do_subscribe(Group, Topic, Subscriber, SubOpts), - emqx_shared_sub:subscribe(Group, Topic, SubPid), - emqx_router:add_route(From, Topic, dest(Group)), - {noreply, monitor_subscriber(Subscriber, State)}. - -insert_subscriber(Group, Topic, Subscriber) -> - Subscribers = subscribers(Topic), - case lists:member(Subscriber, Subscribers) of - false -> - ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}); - _ -> - ok - end. - do_subscribe(Group, Topic, Subscriber, SubOpts) -> ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}), - insert_subscriber(Group, Topic, Subscriber), + ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}), ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}). do_unsubscribe(Group, Topic, Subscriber) -> diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index b9bfb4257..f9a6dcf40 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -92,10 +92,8 @@ pubsub(_) -> true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}), #{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber), ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }), - #{qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]), timer:sleep(10), - [{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber), [{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>), emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), ?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end), diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 6de507fca..021109606 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -161,7 +161,7 @@ basic_test(_Config) -> ct:print("Basic test starting"), {ok, C} = emqx_client:start_link(), {ok, _} = emqx_client:connect(C), - + {ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1), {ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),