fix subscribe bug

This commit is contained in:
Gilbert Wong 2018-08-26 22:02:39 +08:00
parent 397179bbda
commit a369fb6960
1 changed files with 18 additions and 7 deletions

View File

@ -309,18 +309,29 @@ handle_call(Req, _From, State) ->
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
{reply, ignored, State}.
resubscribe(From, {Subscriber, SubOpts, Topic}, State) ->
{SubPid, _} = Subscriber,
Group = maps:get(share, SubOpts, undefined),
true = do_subscribe(Group, Topic, Subscriber, SubOpts),
emqx_shared_sub:subscribe(Group, Topic, SubPid),
emqx_router:add_route(From, Topic, dest(Group)),
{noreply, monitor_subscriber(Subscriber, State)}.
handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) ->
Subscriber = {SubPid, SubId},
case ets:member(?SUBOPTION, {Topic, Subscriber}) of
false ->
Group = maps:get(share, SubOpts, undefined),
true = do_subscribe(Group, Topic, Subscriber, SubOpts),
emqx_shared_sub:subscribe(Group, Topic, SubPid),
emqx_router:add_route(From, Topic, dest(Group)),
{noreply, monitor_subscriber(Subscriber, State)};
resubscribe(From, {Subscriber, SubOpts, Topic}, State);
true ->
gen_server:reply(From, ok),
{noreply, State}
case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of
true ->
io:format("Ets: ~p, SubOpts: ~p", [ets:lookup_element(?SUBOPTION, Topic, Subscriber), SubOpts]),
gen_server:reply(From, ok),
{noreply, State};
false ->
resubscribe(From, {Subscriber, SubOpts, Topic}, State)
end
end;
handle_cast({From, #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}}, State) ->