diff --git a/docs/source/changes.rst b/docs/source/changes.rst index 7616f8b76..218d9e0a7 100644 --- a/docs/source/changes.rst +++ b/docs/source/changes.rst @@ -5,6 +5,33 @@ Changes ======= +.. _release_2.0_beta.3: + +------------------ +Version 2.0-beta.3 +------------------ + +*Release Date: 2016-09-18* + +New Features +------------ + +Shared Suscriptions (#639, #416):: + + mosquitto_sub -t '$queue/topic' + mosquitto_sub -t '$share/group/topic' + +Local Subscriptions that will not create global routes:: + + mosquitto_sub -t '$local/topic' + +Bugfix +------ + +Error on Loading `emqttd_auth_http` (#691) + +Remove 'emqttd' application from dependencies (emqttd_coap PR#3) + .. _release_2.0_beta.2: ------------------ diff --git a/src/emqttd.erl b/src/emqttd.erl index 5221969de..9c5d652a9 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -35,9 +35,6 @@ %% Hooks API -export([hook/4, hook/3, unhook/2, run_hooks/3]). -%% Adapter --export([adapter/1]). - %% Debug API -export([dump/0]). @@ -99,12 +96,12 @@ subscribe(Topic, Subscriber) -> -spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()). subscribe(Topic, Subscriber, Options) -> - with_pubsub(fun(PS) -> PS:subscribe(iolist_to_binary(Topic), Subscriber, Options) end). + emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber, Options). %% @doc Publish MQTT Message -spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). publish(Msg) -> - with_pubsub(fun(PS) -> PS:publish(Msg) end). + emqttd_server:publish(Msg). %% @doc Unsubscribe -spec(unsubscribe(iodata()) -> ok | pubsub_error()). @@ -113,32 +110,30 @@ unsubscribe(Topic) -> -spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()). unsubscribe(Topic, Subscriber) -> - with_pubsub(fun(PS) -> PS:unsubscribe(iolist_to_binary(Topic), Subscriber) end). + emqttd_server:unsubscribe(iolist_to_binary(Topic), Subscriber). -spec(setqos(binary(), subscriber(), mqtt_qos()) -> ok). setqos(Topic, Subscriber, Qos) -> - with_pubsub(fun(PS) -> PS:setqos(iolist_to_binary(Topic), Subscriber, Qos) end). + emqttd_server:setqos(iolist_to_binary(Topic), Subscriber, Qos). -spec(topics() -> [binary()]). topics() -> emqttd_router:topics(). -spec(subscribers(iodata()) -> list(subscriber())). subscribers(Topic) -> - with_pubsub(fun(PS) -> PS:subscribers(iolist_to_binary(Topic)) end). + emqttd_server:subscribers(iolist_to_binary(Topic)). -spec(subscriptions(subscriber()) -> [{binary(), suboption()}]). subscriptions(Subscriber) -> - with_pubsub(fun(PS) -> PS:subscriptions(Subscriber) end). + emqttd_server:subscriptions(Subscriber). -spec(is_subscribed(iodata(), subscriber()) -> boolean()). is_subscribed(Topic, Subscriber) -> - with_pubsub(fun(PS) -> PS:is_subscribed(iolist_to_binary(Topic), Subscriber) end). + emqttd_server:is_subscribed(iolist_to_binary(Topic), Subscriber). -spec(subscriber_down(subscriber()) -> ok). subscriber_down(Subscriber) -> - with_pubsub(fun(PS) -> PS:subscriber_down(Subscriber) end). - -with_pubsub(Fun) -> Fun(env(pubsub_server, emqttd_server)). + emqttd_server:subscriber_down(Subscriber). %%-------------------------------------------------------------------- %% Hooks API @@ -160,17 +155,9 @@ unhook(Hook, Function) -> run_hooks(Hook, Args, Acc) -> emqttd_hook:run(Hook, Args, Acc). -%%-------------------------------------------------------------------- -%% Adapter -%%-------------------------------------------------------------------- - -adapter(server) -> env(pubsub_server, emqttd_server); -adapter(pubsub) -> env(pubsub_adapter, emqttd_pubsub); -adapter(bridge) -> env(bridge_adapter, emqttd_bridge). - %%-------------------------------------------------------------------- %% Debug %%-------------------------------------------------------------------- -dump() -> with_pubsub(fun(PS) -> lists:append([PS:dump(), emqttd_router:dump()]) end). +dump() -> lists:append([emqttd_server:dump(), emqttd_router:dump()]). diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 537f5a832..de51b8f5d 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -25,7 +25,7 @@ -include("emqttd_internal.hrl"). %% API Function Exports --export([start_link/3]). +-export([start_link/5]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -33,7 +33,8 @@ -define(PING_DOWN_INTERVAL, 1000). --record(state, {node, subtopic, +-record(state, {pool, id, + node, subtopic, qos = ?QOS_2, topic_suffix = <<>>, topic_prefix = <<>>, @@ -55,25 +56,28 @@ %%-------------------------------------------------------------------- %% @doc Start a bridge --spec(start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}). -start_link(Node, Topic, Options) -> - gen_server2:start_link(?MODULE, [Node, Topic, Options], []). +-spec(start_link(any(), pos_integer(), atom(), binary(), [option()]) -> + {ok, pid()} | ignore | {error, term()}). +start_link(Pool, Id, Node, Topic, Options) -> + gen_server2:start_link(?MODULE, [Pool, Id, Node, Topic, Options], []). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([Node, Topic, Options]) -> +init([Pool, Id, Node, Topic, Options]) -> + ?GPROC_POOL(join, Pool, Id), process_flag(trap_exit, true), case net_kernel:connect_node(Node) of true -> true = erlang:monitor_node(Node, true), + Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), + emqttd:subscribe(Topic, self(), [local, {share, Share}]), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), MQueue = emqttd_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}], emqttd_alarm:alarm_fun()), - emqttd:subscribe(Topic), - {ok, State#state{mqueue = MQueue}}; + {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; false -> {stop, {cannot_connect, Node}} end. @@ -119,7 +123,7 @@ handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = I handle_info({nodeup, Node}, State = #state{node = Node}) -> %% TODO: Really fast?? case emqttd:is_running(Node) of - true -> + true -> lager:warning("Bridge Node Up: ~p", [Node]), {noreply, dequeue(State#state{status = up})}; false -> @@ -145,7 +149,8 @@ handle_info({'EXIT', _Pid, normal}, State) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, _State) -> +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index e808ee72a..64c3fcf83 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -16,27 +16,15 @@ -module(emqttd_bridge_sup). --behavior(supervisor). - -export([start_link/3]). --export([init/1]). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -%% @doc Start bridge supervisor +%% @doc Start bridge pool supervisor -spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}). start_link(Node, Topic, Options) -> - supervisor:start_link(?MODULE, [Node, Topic, Options]). - -%%-------------------------------------------------------------------- -%% Supervisor callbacks -%%-------------------------------------------------------------------- - -init([Node, Topic, Options]) -> - {ok, {{one_for_all, 10, 100}, - [{bridge, {emqttd_bridge, start_link, [Node, Topic, Options]}, - transient, 10000, worker, [emqttd_bridge]}]}}. + MFA = {emqttd_bridge, start_link, [Node, Topic, Options]}, + emqttd_pool_sup:start_link({bridge, Node, Topic}, random, MFA). diff --git a/src/emqttd_bridge_sup_sup.erl b/src/emqttd_bridge_sup_sup.erl index 475149afe..82f7af3b0 100644 --- a/src/emqttd_bridge_sup_sup.erl +++ b/src/emqttd_bridge_sup_sup.erl @@ -66,11 +66,7 @@ init([]) -> {ok, {{one_for_one, 10, 100}, []}}. bridge_spec(Node, Topic, Options) -> - SupMod = sup_mod(emqttd:adapter(bridge)), {?CHILD_ID(Node, Topic), - {SupMod, start_link, [Node, Topic, Options]}, - permanent, infinity, supervisor, [SupMod]}. - -sup_mod(Adaper) -> - list_to_atom(atom_to_list(Adaper) ++ "_sup"). + {emqttd_bridge_sup, start_link, [Node, Topic, Options]}, + permanent, infinity, supervisor, [emqttd_bridge_sup]}. diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 487181a0e..4275f88fd 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -22,11 +22,14 @@ -include("emqttd_internal.hrl"). -%% API Exports --export([start_link/3, subscribe/2, unsubscribe/2, publish/2, - async_subscribe/2, async_unsubscribe/2]). +%% Start API. +-export([start_link/3]). --export([subscribers/1, dispatch/2]). +%% PubSub API. +-export([subscribe/3, async_subscribe/3, publish/2, unsubscribe/3, + async_unsubscribe/3, subscribers/1]). + +-export([dispatch/2]). %% gen_server. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -36,24 +39,38 @@ -define(PUBSUB, ?MODULE). --spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). +-define(is_local(Options), lists:member(local, Options)). + +%%-------------------------------------------------------------------- +%% Start PubSub +%%-------------------------------------------------------------------- + +%% @doc Start one Pubsub +-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, any()}). start_link(Pool, Id, Env) -> gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). --spec(subscribe(binary(), emqttd:subscriber()) -> ok). -subscribe(Topic, Subscriber) -> - call(pick(Topic), {subscribe, Topic, Subscriber}). +%%-------------------------------------------------------------------- +%% PubSub API +%%-------------------------------------------------------------------- --spec(async_subscribe(binary(), emqttd:subscriber()) -> ok). -async_subscribe(Topic, Subscriber) -> - cast(pick(Topic), {subscribe, Topic, Subscriber}). +%% @doc Subscribe a Topic +-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok). +subscribe(Topic, Subscriber, Options) -> + call(pick(Topic), {subscribe, Topic, Subscriber, Options}). +-spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok). +async_subscribe(Topic, Subscriber, Options) -> + cast(pick(Topic), {subscribe, Topic, Subscriber, Options}). + +%% @doc Publish MQTT Message to Topic -spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore). publish(Topic, Msg) -> - route(emqttd_router:match(Topic), delivery(Msg)). + route(lists:append(emqttd_router:match(Topic), + emqttd_router:match_local(Topic)), delivery(Msg)). -route([], _Delivery) -> - ignore; +route([], #mqtt_delivery{message = #mqtt_message{topic = Topic}}) -> + dropped(Topic), ignore; %% Dispatch on the local node route([#mqtt_route{topic = To, node = Node}], @@ -83,7 +100,7 @@ dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) -> dropped(Topic), {ok, Delivery}; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg), - {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}}; + {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1}|Flows]}}; Subscribers -> Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows], lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers), @@ -93,10 +110,27 @@ dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) -> dispatch(Pid, Topic, Msg) when is_pid(Pid) -> Pid ! {dispatch, Topic, Msg}; dispatch(SubId, Topic, Msg) when is_binary(SubId) -> - emqttd_sm:dispatch(SubId, Topic, Msg). + emqttd_sm:dispatch(SubId, Topic, Msg); +dispatch({_Share, [Sub]}, Topic, Msg) -> + dispatch(Sub, Topic, Msg); +dispatch({_Share, []}, _Topic, _Msg) -> + ok; +dispatch({_Share, Subs}, Topic, Msg) -> + dispatch(lists:nth(rand:uniform(length(Subs)), Subs), Topic, Msg). subscribers(Topic) -> - try ets:lookup_element(mqtt_subscriber, Topic, 2) catch error:badarg -> [] end. + group_by_share(try ets:lookup_element(mqtt_subscriber, Topic, 2) catch error:badarg -> [] end). + +group_by_share([]) -> []; + +group_by_share(Subscribers) -> + {Subs1, Shares1} = + lists:foldl(fun({Share, Sub}, {Subs, Shares}) -> + {Subs, dict:append(Share, Sub, Shares)}; + (Sub, {Subs, Shares}) -> + {[Sub|Subs], Shares} + end, {[], dict:new()}, Subscribers), + lists:append(Subs1, dict:to_list(Shares1)). %% @private %% @doc Ingore $SYS Messages. @@ -105,45 +139,50 @@ dropped(<<"$SYS/", _/binary>>) -> dropped(_Topic) -> emqttd_metrics:inc('messages/dropped'). --spec(unsubscribe(binary(), emqttd:subscriber()) -> ok). -unsubscribe(Topic, Subscriber) -> - call(pick(Topic), {unsubscribe, Topic, Subscriber}). +%% @doc Unsubscribe +-spec(unsubscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok). +unsubscribe(Topic, Subscriber, Options) -> + call(pick(Topic), {unsubscribe, Topic, Subscriber, Options}). --spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok). -async_unsubscribe(Topic, Subscriber) -> - cast(pick(Topic), {unsubscribe, Topic, Subscriber}). +-spec(async_unsubscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok). +async_unsubscribe(Topic, Subscriber, Options) -> + cast(pick(Topic), {unsubscribe, Topic, Subscriber, Options}). -call(Server, Req) -> - gen_server2:call(Server, Req, infinity). +call(PubSub, Req) when is_pid(PubSub) -> + gen_server2:call(PubSub, Req, infinity). -cast(Server, Msg) -> - gen_server2:cast(Server, Msg). +cast(PubSub, Msg) when is_pid(PubSub) -> + gen_server2:cast(PubSub, Msg). -pick(Topic) -> - gproc_pool:pick_worker(pubsub, Topic). +pick(Subscriber) -> + gproc_pool:pick_worker(pubsub, Subscriber). + +%%-------------------------------------------------------------------- +%% gen_server Callbacks +%%-------------------------------------------------------------------- init([Pool, Id, Env]) -> ?GPROC_POOL(join, Pool, Id), {ok, #state{pool = Pool, id = Id, env = Env}}. -handle_call({subscribe, Topic, Subscriber}, _From, State) -> - add_subscriber_(Topic, Subscriber), +handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> + add_subscriber(Topic, Subscriber, Options), {reply, ok, setstats(State)}; -handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> - del_subscriber_(Topic, Subscriber), - {reply, ok, setstats(State)}; +handle_call({unsubscribe, Topic, Subscriber, Options}, _From, State) -> + del_subscriber(Topic, Subscriber, Options), + {reply, ok, setstats(State), hibernate}; handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({subscribe, Topic, Subscriber}, State) -> - add_subscriber_(Topic, Subscriber), +handle_cast({subscribe, Topic, Subscriber, Options}, State) -> + add_subscriber(Topic, Subscriber, Options), {noreply, setstats(State)}; -handle_cast({unsubscribe, Topic, Subscriber}, State) -> - del_subscriber_(Topic, Subscriber), - {noreply, setstats(State)}; +handle_cast({unsubscribe, Topic, Subscriber, Options}, State) -> + del_subscriber(Topic, Subscriber, Options), + {noreply, setstats(State), hibernate}; handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). @@ -161,17 +200,42 @@ code_change(_OldVsn, State, _Extra) -> %% Internel Functions %%-------------------------------------------------------------------- -add_subscriber_(Topic, Subscriber) -> - (not ets:member(mqtt_subscriber, Topic)) - andalso emqttd_router:add_route(Topic), - ets:insert(mqtt_subscriber, {Topic, Subscriber}). +add_subscriber(Topic, Subscriber, Options) -> + Share = proplists:get_value(share, Options), + case ?is_local(Options) of + false -> add_subscriber_(Share, Topic, Subscriber); + true -> add_local_subscriber_(Share, Topic, Subscriber) + end. -del_subscriber_(Topic, Subscriber) -> - ets:delete_object(mqtt_subscriber, {Topic, Subscriber}), - (not ets:member(mqtt_subscriber, Topic)) - andalso emqttd_router:del_route(Topic). +add_subscriber_(Share, Topic, Subscriber) -> + (not ets:member(mqtt_subscriber, Topic)) andalso emqttd_router:add_route(Topic), + ets:insert(mqtt_subscriber, {Topic, shared(Share, Subscriber)}). + +add_local_subscriber_(Share, Topic, Subscriber) -> + (not ets:member(mqtt_subscriber, {local, Topic})) andalso emqttd_router:add_local_route(Topic), + ets:insert(mqtt_subscriber, {{local, Topic}, shared(Share, Subscriber)}). + +del_subscriber(Topic, Subscriber, Options) -> + Share = proplists:get_value(share, Options), + case ?is_local(Options) of + false -> del_subscriber_(Share, Topic, Subscriber); + true -> del_local_subscriber_(Share, Topic, Subscriber) + end. + +del_subscriber_(Share, Topic, Subscriber) -> + ets:delete_object(mqtt_subscriber, {Topic, shared(Share, Subscriber)}), + (not ets:member(mqtt_subscriber, Topic)) andalso emqttd_router:del_route(Topic). + +del_local_subscriber_(Share, Topic, Subscriber) -> + ets:delete_object(mqtt_subscriber, {{local, Topic}, shared(Share, Subscriber)}), + (not ets:member(subscriber, {local, Topic})) andalso emqttd_router:del_local_route(Topic). + +shared(undefined, Subscriber) -> + Subscriber; +shared(Share, Subscriber) -> + {Share, Subscriber}. setstats(State) -> - emqttd_stats:setstats('subscribers/count', 'subscribers/max', - ets:info(mqtt_subscriber, size)), State. + emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(mqtt_subscriber, size)), + State. diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index f87f87d48..e57eee517 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -38,7 +38,7 @@ pubsub_pool() -> hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). %%-------------------------------------------------------------------- -%% Supervisor callbacks +%% Supervisor Callbacks %%-------------------------------------------------------------------- init([Env]) -> @@ -57,10 +57,10 @@ pool_size(Env) -> pool_sup(Name, Env) -> Pool = list_to_atom(atom_to_list(Name) ++ "_pool"), - MFA = {emqttd:adapter(Name), start_link, [Env]}, + Mod = list_to_atom("emqttd_" ++ atom_to_list(Name)), + MFA = {Mod, start_link, [Env]}, emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]). - %%-------------------------------------------------------------------- %% Create PubSub Tables %%-------------------------------------------------------------------- diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index 7cf4c3007..83903736e 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -133,7 +133,9 @@ setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> -spec(subscriptions(emqttd:subscriber()) -> [{binary(), list(emqttd:suboption())}]). subscriptions(Subscriber) -> - lists:map(fun({_, Topic}) -> + lists:map(fun({_, {_Share, Topic}}) -> + subscription(Topic, Subscriber); + ({_, Topic}) -> subscription(Topic, Subscriber) end, ets:lookup(mqtt_subscription, Subscriber)). @@ -235,14 +237,20 @@ code_change(_OldVsn, State, _Extra) -> do_subscribe_(Topic, Subscriber, Options, State) -> case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of [] -> - emqttd_pubsub:async_subscribe(Topic, Subscriber), - ets:insert(mqtt_subscription, {Subscriber, Topic}), + emqttd_pubsub:async_subscribe(Topic, Subscriber, Options), + Share = proplists:get_value(share, Options), + add_subscription_(Share, Subscriber, Topic), ets:insert(mqtt_subproperty, {{Topic, Subscriber}, Options}), {ok, monitor_subpid(Subscriber, State)}; [_] -> {error, {already_subscribed, Topic}} end. +add_subscription_(undefined, Subscriber, Topic) -> + ets:insert(mqtt_subscription, {Subscriber, Topic}); +add_subscription_(Share, Subscriber, Topic) -> + ets:insert(mqtt_subscription, {Subscriber, {Share, Topic}}). + monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> State#state{submon = PMon:monitor(SubPid)}; monitor_subpid(_SubPid, State) -> @@ -250,9 +258,10 @@ monitor_subpid(_SubPid, State) -> do_unsubscribe_(Topic, Subscriber, State) -> case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of - [_] -> - emqttd_pubsub:async_unsubscribe(Topic, Subscriber), - ets:delete_object(mqtt_subscription, {Subscriber, Topic}), + [{_, Options}] -> + emqttd_pubsub:async_unsubscribe(Topic, Subscriber, Options), + Share = proplists:get_value(share, Options), + del_subscription_(Share, Subscriber, Topic), ets:delete(mqtt_subproperty, {Topic, Subscriber}), {ok, case ets:member(mqtt_subscription, Subscriber) of true -> State; @@ -262,24 +271,32 @@ do_unsubscribe_(Topic, Subscriber, State) -> {error, {subscription_not_found, Topic}} end. +del_subscription_(undefined, Subscriber, Topic) -> + ets:delete_object(mqtt_subscription, {Subscriber, Topic}); +del_subscription_(Share, Subscriber, Topic) -> + ets:delete_object(mqtt_subscription, {Subscriber, {Share, Topic}}). + demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> State#state{submon = PMon:demonitor(SubPid)}; demonitor_subpid(_SubPid, State) -> State. subscriber_down_(Subscriber) -> - lists:foreach(fun({_, Topic}) -> - subscriber_down_(Subscriber, Topic) + lists:foreach(fun({_, {Share, Topic}}) -> + subscriber_down_(Share, Subscriber, Topic); + ({_, Topic}) -> + subscriber_down_(undefined, Subscriber, Topic) end, ets:lookup(mqtt_subscription, Subscriber)), ets:delete(mqtt_subscription, Subscriber). -subscriber_down_(Subscriber, Topic) -> +subscriber_down_(Share, Subscriber, Topic) -> case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of - [] -> - %% here? - emqttd_pubsub:async_unsubscribe(Topic, Subscriber); - [_] -> - emqttd_pubsub:async_unsubscribe(Topic, Subscriber), + [] -> + %% TODO:....??? + Options = if Share == undefined -> []; true -> [{share, Share}] end, + emqttd_pubsub:async_unsubscribe(Topic, Subscriber, Options); + [{_, Options}] -> + emqttd_pubsub:async_unsubscribe(Topic, Subscriber, Options), ets:delete(mqtt_subproperty, {Topic, Subscriber}) end. diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 846ee1955..5444a4f06 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -22,6 +22,8 @@ -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + -define(CONTENT_TYPE, "application/x-www-form-urlencoded"). all() -> @@ -44,6 +46,8 @@ groups() -> {pubsub, [sequence], [subscribe_unsubscribe, publish, pubsub, + t_local_subscribe, + t_shared_subscribe, 'pubsub#', 'pubsub+']}, {router, [sequence], [router_add_del, @@ -148,6 +152,37 @@ pubsub(_) -> timer:sleep(20), emqttd:unsubscribe(<<"a/b/c">>). +t_local_subscribe(_) -> + emqttd:subscribe("$local/topic0"), + emqttd:subscribe("$local/topic1", <<"x">>), + emqttd:subscribe("$local/topic2", <<"x">>, [{qos, 2}]), + timer:sleep(10), + ?assertEqual([self()], emqttd:subscribers("$local/topic0")), + ?assertEqual([<<"x">>], emqttd:subscribers("$local/topic1")), + ?assertEqual([{<<"$local/topic1">>,<<"x">>,[]},{<<"$local/topic2">>,<<"x">>,[{qos,2}]}], emqttd:subscriptions(<<"x">>)), + + ?assertEqual(ok, emqttd:unsubscribe("$local/topic0")), + ?assertMatch({error, {subscription_not_found, _}}, emqttd:unsubscribe("$local/topic0")), + ?assertEqual(ok, emqttd:unsubscribe("$local/topic1", <<"x">>)), + ?assertEqual(ok, emqttd:unsubscribe("$local/topic2", <<"x">>)), + ?assertEqual([], emqttd:subscribers("topic1")), + ?assertEqual([], emqttd:subscriptions(<<"x">>)). + +t_shared_subscribe(_) -> + emqttd:subscribe("$local/$share/group1/topic1"), + emqttd:subscribe("$share/group2/topic2"), + emqttd:subscribe("$queue/topic3"), + timer:sleep(10), + ?assertEqual([self()], emqttd:subscribers(<<"$local/$share/group1/topic1">>)), + ?assertEqual([{<<"$local/$share/group1/topic1">>, self(), []}, + {<<"$queue/topic3">>, self(), []}, + {<<"$share/group2/topic2">>, self(), []}], + lists:sort(emqttd:subscriptions(self()))), + emqttd:unsubscribe("$local/$share/group1/topic1"), + emqttd:unsubscribe("$share/group2/topic2"), + emqttd:unsubscribe("$queue/topic3"), + ?assertEqual([], lists:sort(emqttd:subscriptions(self()))). + 'pubsub#'(_) -> emqttd:subscribe(<<"a/#">>), timer:sleep(10),