Fix subscription
This commit is contained in:
parent
d11e734dae
commit
e0eb76afa6
|
@ -258,7 +258,7 @@ subscriptions(Subscriber) ->
|
||||||
subscription(Topic, Subscriber);
|
subscription(Topic, Subscriber);
|
||||||
({_, Topic}) ->
|
({_, Topic}) ->
|
||||||
subscription(Topic, Subscriber)
|
subscription(Topic, Subscriber)
|
||||||
end, ets:lookup(?SUBSCRIPTION, Subscriber)).
|
end, ets:lookup(?SUBSCRIPTION, Subscriber)).
|
||||||
|
|
||||||
subscription(Topic, Subscriber) ->
|
subscription(Topic, Subscriber) ->
|
||||||
{Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}.
|
{Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}.
|
||||||
|
@ -336,15 +336,14 @@ handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, sub
|
||||||
Subscriber = {SubPid, SubId},
|
Subscriber = {SubPid, SubId},
|
||||||
case ets:member(?SUBOPTION, {Topic, Subscriber}) of
|
case ets:member(?SUBOPTION, {Topic, Subscriber}) of
|
||||||
false ->
|
false ->
|
||||||
resubscribe(From, {Subscriber, SubOpts, Topic}, State);
|
Group = maps:get(share, SubOpts, undefined),
|
||||||
|
true = do_subscribe(Group, Topic, Subscriber, SubOpts),
|
||||||
|
emqx_shared_sub:subscribe(Group, Topic, SubPid),
|
||||||
|
emqx_router:add_route(From, Topic, dest(Group)),
|
||||||
|
{noreply, monitor_subscriber(Subscriber, State)};
|
||||||
true ->
|
true ->
|
||||||
case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of
|
gen_server:reply(From, ok),
|
||||||
true ->
|
{noreply, State}
|
||||||
gen_server:reply(From, ok),
|
|
||||||
{noreply, State};
|
|
||||||
false ->
|
|
||||||
resubscribe(From, {Subscriber, SubOpts, Topic}, State)
|
|
||||||
end
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_cast({From, #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}}, State) ->
|
handle_cast({From, #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}}, State) ->
|
||||||
|
@ -390,26 +389,9 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
resubscribe(From, {Subscriber, SubOpts, Topic}, State) ->
|
|
||||||
{SubPid, _} = Subscriber,
|
|
||||||
Group = maps:get(share, SubOpts, undefined),
|
|
||||||
true = do_subscribe(Group, Topic, Subscriber, SubOpts),
|
|
||||||
emqx_shared_sub:subscribe(Group, Topic, SubPid),
|
|
||||||
emqx_router:add_route(From, Topic, dest(Group)),
|
|
||||||
{noreply, monitor_subscriber(Subscriber, State)}.
|
|
||||||
|
|
||||||
insert_subscriber(Group, Topic, Subscriber) ->
|
|
||||||
Subscribers = subscribers(Topic),
|
|
||||||
case lists:member(Subscriber, Subscribers) of
|
|
||||||
false ->
|
|
||||||
ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)});
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_subscribe(Group, Topic, Subscriber, SubOpts) ->
|
do_subscribe(Group, Topic, Subscriber, SubOpts) ->
|
||||||
ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
|
ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
|
||||||
insert_subscriber(Group, Topic, Subscriber),
|
ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}),
|
||||||
ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}).
|
ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}).
|
||||||
|
|
||||||
do_unsubscribe(Group, Topic, Subscriber) ->
|
do_unsubscribe(Group, Topic, Subscriber) ->
|
||||||
|
|
|
@ -92,10 +92,8 @@ pubsub(_) ->
|
||||||
true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}),
|
true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}),
|
||||||
#{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
|
#{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
|
||||||
ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }),
|
ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }),
|
||||||
#{qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
|
|
||||||
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
|
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
|
||||||
timer:sleep(10),
|
timer:sleep(10),
|
||||||
[{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber),
|
|
||||||
[{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>),
|
[{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>),
|
||||||
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
||||||
?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end),
|
?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end),
|
||||||
|
|
|
@ -161,7 +161,7 @@ basic_test(_Config) ->
|
||||||
ct:print("Basic test starting"),
|
ct:print("Basic test starting"),
|
||||||
{ok, C} = emqx_client:start_link(),
|
{ok, C} = emqx_client:start_link(),
|
||||||
{ok, _} = emqx_client:connect(C),
|
{ok, _} = emqx_client:connect(C),
|
||||||
|
{ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
|
||||||
{ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
|
{ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
|
||||||
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
|
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
|
||||||
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
|
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
|
||||||
|
|
Loading…
Reference in New Issue