diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 24cd27ab8..7184bb1b3 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -309,18 +309,29 @@ handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), {reply, ignored, State}. +resubscribe(From, {Subscriber, SubOpts, Topic}, State) -> + {SubPid, _} = Subscriber, + Group = maps:get(share, SubOpts, undefined), + true = do_subscribe(Group, Topic, Subscriber, SubOpts), + emqx_shared_sub:subscribe(Group, Topic, SubPid), + emqx_router:add_route(From, Topic, dest(Group)), + {noreply, monitor_subscriber(Subscriber, State)}. + + handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) -> Subscriber = {SubPid, SubId}, case ets:member(?SUBOPTION, {Topic, Subscriber}) of false -> - Group = maps:get(share, SubOpts, undefined), - true = do_subscribe(Group, Topic, Subscriber, SubOpts), - emqx_shared_sub:subscribe(Group, Topic, SubPid), - emqx_router:add_route(From, Topic, dest(Group)), - {noreply, monitor_subscriber(Subscriber, State)}; + resubscribe(From, {Subscriber, SubOpts, Topic}, State); true -> - gen_server:reply(From, ok), - {noreply, State} + case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of + true -> + io:format("Ets: ~p, SubOpts: ~p", [ets:lookup_element(?SUBOPTION, Topic, Subscriber), SubOpts]), + gen_server:reply(From, ok), + {noreply, State}; + false -> + resubscribe(From, {Subscriber, SubOpts, Topic}, State) + end end; handle_cast({From, #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}}, State) ->