local subscription, shared subscription

This commit is contained in:
Feng 2016-09-18 14:08:57 +08:00
parent 811f8a10eb
commit 1bb1799d34
7 changed files with 176 additions and 119 deletions

View File

@ -35,9 +35,6 @@
%% Hooks API
-export([hook/4, hook/3, unhook/2, run_hooks/3]).
%% Adapter
-export([adapter/1]).
%% Debug API
-export([dump/0]).
@ -99,12 +96,12 @@ subscribe(Topic, Subscriber) ->
-spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()).
subscribe(Topic, Subscriber, Options) ->
with_pubsub(fun(PS) -> PS:subscribe(iolist_to_binary(Topic), Subscriber, Options) end).
emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber, Options).
%% @doc Publish MQTT Message
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
publish(Msg) ->
with_pubsub(fun(PS) -> PS:publish(Msg) end).
emqttd_server:publish(Msg).
%% @doc Unsubscribe
-spec(unsubscribe(iodata()) -> ok | pubsub_error()).
@ -113,32 +110,30 @@ unsubscribe(Topic) ->
-spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()).
unsubscribe(Topic, Subscriber) ->
with_pubsub(fun(PS) -> PS:unsubscribe(iolist_to_binary(Topic), Subscriber) end).
emqttd_server:unsubscribe(iolist_to_binary(Topic), Subscriber).
-spec(setqos(binary(), subscriber(), mqtt_qos()) -> ok).
setqos(Topic, Subscriber, Qos) ->
with_pubsub(fun(PS) -> PS:setqos(iolist_to_binary(Topic), Subscriber, Qos) end).
emqttd_server:setqos(iolist_to_binary(Topic), Subscriber, Qos).
-spec(topics() -> [binary()]).
topics() -> emqttd_router:topics().
-spec(subscribers(iodata()) -> list(subscriber())).
subscribers(Topic) ->
with_pubsub(fun(PS) -> PS:subscribers(iolist_to_binary(Topic)) end).
emqttd_server:subscribers(iolist_to_binary(Topic)).
-spec(subscriptions(subscriber()) -> [{binary(), suboption()}]).
subscriptions(Subscriber) ->
with_pubsub(fun(PS) -> PS:subscriptions(Subscriber) end).
emqttd_server:subscriptions(Subscriber).
-spec(is_subscribed(iodata(), subscriber()) -> boolean()).
is_subscribed(Topic, Subscriber) ->
with_pubsub(fun(PS) -> PS:is_subscribed(iolist_to_binary(Topic), Subscriber) end).
emqttd_server:is_subscribed(iolist_to_binary(Topic), Subscriber).
-spec(subscriber_down(subscriber()) -> ok).
subscriber_down(Subscriber) ->
with_pubsub(fun(PS) -> PS:subscriber_down(Subscriber) end).
with_pubsub(Fun) -> Fun(env(pubsub_server, emqttd_server)).
emqttd_server:subscriber_down(Subscriber).
%%--------------------------------------------------------------------
%% Hooks API
@ -160,17 +155,9 @@ unhook(Hook, Function) ->
run_hooks(Hook, Args, Acc) ->
emqttd_hook:run(Hook, Args, Acc).
%%--------------------------------------------------------------------
%% Adapter
%%--------------------------------------------------------------------
adapter(server) -> env(pubsub_server, emqttd_server);
adapter(pubsub) -> env(pubsub_adapter, emqttd_pubsub);
adapter(bridge) -> env(bridge_adapter, emqttd_bridge).
%%--------------------------------------------------------------------
%% Debug
%%--------------------------------------------------------------------
dump() -> with_pubsub(fun(PS) -> lists:append([PS:dump(), emqttd_router:dump()]) end).
dump() -> lists:append([emqttd_server:dump(), emqttd_router:dump()]).

View File

@ -25,7 +25,7 @@
-include("emqttd_internal.hrl").
%% API Function Exports
-export([start_link/3]).
-export([start_link/5]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -33,7 +33,8 @@
-define(PING_DOWN_INTERVAL, 1000).
-record(state, {node, subtopic,
-record(state, {pool, id,
node, subtopic,
qos = ?QOS_2,
topic_suffix = <<>>,
topic_prefix = <<>>,
@ -55,25 +56,28 @@
%%--------------------------------------------------------------------
%% @doc Start a bridge
-spec(start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}).
start_link(Node, Topic, Options) ->
gen_server2:start_link(?MODULE, [Node, Topic, Options], []).
-spec(start_link(any(), pos_integer(), atom(), binary(), [option()]) ->
{ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, Node, Topic, Options) ->
gen_server2:start_link(?MODULE, [Pool, Id, Node, Topic, Options], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Node, Topic, Options]) ->
init([Pool, Id, Node, Topic, Options]) ->
?GPROC_POOL(join, Pool, Id),
process_flag(trap_exit, true),
case net_kernel:connect_node(Node) of
true ->
true = erlang:monitor_node(Node, true),
Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
emqttd:subscribe(Topic, self(), [local, {share, Share}]),
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
MQueue = emqttd_mqueue:new(qname(Node, Topic),
[{max_len, State#state.max_queue_len}],
emqttd_alarm:alarm_fun()),
emqttd:subscribe(Topic),
{ok, State#state{mqueue = MQueue}};
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
false ->
{stop, {cannot_connect, Node}}
end.
@ -119,7 +123,7 @@ handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = I
handle_info({nodeup, Node}, State = #state{node = Node}) ->
%% TODO: Really fast??
case emqttd:is_running(Node) of
true ->
true ->
lager:warning("Bridge Node Up: ~p", [Node]),
{noreply, dequeue(State#state{status = up})};
false ->
@ -145,7 +149,8 @@ handle_info({'EXIT', _Pid, normal}, State) ->
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
terminate(_Reason, _State) ->
terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id),
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -16,27 +16,15 @@
-module(emqttd_bridge_sup).
-behavior(supervisor).
-export([start_link/3]).
-export([init/1]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Start bridge supervisor
%% @doc Start bridge pool supervisor
-spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
start_link(Node, Topic, Options) ->
supervisor:start_link(?MODULE, [Node, Topic, Options]).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([Node, Topic, Options]) ->
{ok, {{one_for_all, 10, 100},
[{bridge, {emqttd_bridge, start_link, [Node, Topic, Options]},
transient, 10000, worker, [emqttd_bridge]}]}}.
MFA = {emqttd_bridge, start_link, [Node, Topic, Options]},
emqttd_pool_sup:start_link({bridge, Node, Topic}, random, MFA).

View File

@ -66,11 +66,7 @@ init([]) ->
{ok, {{one_for_one, 10, 100}, []}}.
bridge_spec(Node, Topic, Options) ->
SupMod = sup_mod(emqttd:adapter(bridge)),
{?CHILD_ID(Node, Topic),
{SupMod, start_link, [Node, Topic, Options]},
permanent, infinity, supervisor, [SupMod]}.
sup_mod(Adaper) ->
list_to_atom(atom_to_list(Adaper) ++ "_sup").
{emqttd_bridge_sup, start_link, [Node, Topic, Options]},
permanent, infinity, supervisor, [emqttd_bridge_sup]}.

View File

@ -22,11 +22,14 @@
-include("emqttd_internal.hrl").
%% API Exports
-export([start_link/3, subscribe/2, unsubscribe/2, publish/2,
async_subscribe/2, async_unsubscribe/2]).
%% Start API.
-export([start_link/3]).
-export([subscribers/1, dispatch/2]).
%% PubSub API.
-export([subscribe/3, async_subscribe/3, publish/2, unsubscribe/3,
async_unsubscribe/3, subscribers/1]).
-export([dispatch/2]).
%% gen_server.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -36,24 +39,38 @@
-define(PUBSUB, ?MODULE).
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
-define(is_local(Options), lists:member(local, Options)).
%%--------------------------------------------------------------------
%% Start PubSub
%%--------------------------------------------------------------------
%% @doc Start one Pubsub
-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, any()}).
start_link(Pool, Id, Env) ->
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
-spec(subscribe(binary(), emqttd:subscriber()) -> ok).
subscribe(Topic, Subscriber) ->
call(pick(Topic), {subscribe, Topic, Subscriber}).
%%--------------------------------------------------------------------
%% PubSub API
%%--------------------------------------------------------------------
-spec(async_subscribe(binary(), emqttd:subscriber()) -> ok).
async_subscribe(Topic, Subscriber) ->
cast(pick(Topic), {subscribe, Topic, Subscriber}).
%% @doc Subscribe a Topic
-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
subscribe(Topic, Subscriber, Options) ->
call(pick(Topic), {subscribe, Topic, Subscriber, Options}).
-spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
async_subscribe(Topic, Subscriber, Options) ->
cast(pick(Topic), {subscribe, Topic, Subscriber, Options}).
%% @doc Publish MQTT Message to Topic
-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore).
publish(Topic, Msg) ->
route(emqttd_router:match(Topic), delivery(Msg)).
route(lists:append(emqttd_router:match(Topic),
emqttd_router:match_local(Topic)), delivery(Msg)).
route([], _Delivery) ->
ignore;
route([], #mqtt_delivery{message = #mqtt_message{topic = Topic}}) ->
dropped(Topic), ignore;
%% Dispatch on the local node
route([#mqtt_route{topic = To, node = Node}],
@ -83,7 +100,7 @@ dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) ->
dropped(Topic), {ok, Delivery};
[Sub] -> %% optimize?
dispatch(Sub, Topic, Msg),
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}};
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1}|Flows]}};
Subscribers ->
Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows],
lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers),
@ -93,10 +110,27 @@ dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) ->
dispatch(Pid, Topic, Msg) when is_pid(Pid) ->
Pid ! {dispatch, Topic, Msg};
dispatch(SubId, Topic, Msg) when is_binary(SubId) ->
emqttd_sm:dispatch(SubId, Topic, Msg).
emqttd_sm:dispatch(SubId, Topic, Msg);
dispatch({_Share, [Sub]}, Topic, Msg) ->
dispatch(Sub, Topic, Msg);
dispatch({_Share, []}, _Topic, _Msg) ->
ok;
dispatch({_Share, Subs}, Topic, Msg) ->
dispatch(lists:nth(rand:uniform(length(Subs)), Subs), Topic, Msg).
subscribers(Topic) ->
try ets:lookup_element(mqtt_subscriber, Topic, 2) catch error:badarg -> [] end.
group_by_share(try ets:lookup_element(mqtt_subscriber, Topic, 2) catch error:badarg -> [] end).
group_by_share([]) -> [];
group_by_share(Subscribers) ->
{Subs1, Shares1} =
lists:foldl(fun({Share, Sub}, {Subs, Shares}) ->
{Subs, dict:append(Share, Sub, Shares)};
(Sub, {Subs, Shares}) ->
{[Sub|Subs], Shares}
end, {[], dict:new()}, Subscribers),
lists:append(Subs1, dict:to_list(Shares1)).
%% @private
%% @doc Ingore $SYS Messages.
@ -105,45 +139,50 @@ dropped(<<"$SYS/", _/binary>>) ->
dropped(_Topic) ->
emqttd_metrics:inc('messages/dropped').
-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok).
unsubscribe(Topic, Subscriber) ->
call(pick(Topic), {unsubscribe, Topic, Subscriber}).
%% @doc Unsubscribe
-spec(unsubscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
unsubscribe(Topic, Subscriber, Options) ->
call(pick(Topic), {unsubscribe, Topic, Subscriber, Options}).
-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok).
async_unsubscribe(Topic, Subscriber) ->
cast(pick(Topic), {unsubscribe, Topic, Subscriber}).
-spec(async_unsubscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
async_unsubscribe(Topic, Subscriber, Options) ->
cast(pick(Topic), {unsubscribe, Topic, Subscriber, Options}).
call(Server, Req) ->
gen_server2:call(Server, Req, infinity).
call(PubSub, Req) when is_pid(PubSub) ->
gen_server2:call(PubSub, Req, infinity).
cast(Server, Msg) ->
gen_server2:cast(Server, Msg).
cast(PubSub, Msg) when is_pid(PubSub) ->
gen_server2:cast(PubSub, Msg).
pick(Topic) ->
gproc_pool:pick_worker(pubsub, Topic).
pick(Subscriber) ->
gproc_pool:pick_worker(pubsub, Subscriber).
%%--------------------------------------------------------------------
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([Pool, Id, Env]) ->
?GPROC_POOL(join, Pool, Id),
{ok, #state{pool = Pool, id = Id, env = Env}}.
handle_call({subscribe, Topic, Subscriber}, _From, State) ->
add_subscriber_(Topic, Subscriber),
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
add_subscriber(Topic, Subscriber, Options),
{reply, ok, setstats(State)};
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
del_subscriber_(Topic, Subscriber),
{reply, ok, setstats(State)};
handle_call({unsubscribe, Topic, Subscriber, Options}, _From, State) ->
del_subscriber(Topic, Subscriber, Options),
{reply, ok, setstats(State), hibernate};
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
handle_cast({subscribe, Topic, Subscriber}, State) ->
add_subscriber_(Topic, Subscriber),
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
add_subscriber(Topic, Subscriber, Options),
{noreply, setstats(State)};
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
del_subscriber_(Topic, Subscriber),
{noreply, setstats(State)};
handle_cast({unsubscribe, Topic, Subscriber, Options}, State) ->
del_subscriber(Topic, Subscriber, Options),
{noreply, setstats(State), hibernate};
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
@ -161,17 +200,42 @@ code_change(_OldVsn, State, _Extra) ->
%% Internel Functions
%%--------------------------------------------------------------------
add_subscriber_(Topic, Subscriber) ->
(not ets:member(mqtt_subscriber, Topic))
andalso emqttd_router:add_route(Topic),
ets:insert(mqtt_subscriber, {Topic, Subscriber}).
add_subscriber(Topic, Subscriber, Options) ->
Share = proplists:get_value(share, Options),
case ?is_local(Options) of
false -> add_subscriber_(Share, Topic, Subscriber);
true -> add_local_subscriber_(Share, Topic, Subscriber)
end.
del_subscriber_(Topic, Subscriber) ->
ets:delete_object(mqtt_subscriber, {Topic, Subscriber}),
(not ets:member(mqtt_subscriber, Topic))
andalso emqttd_router:del_route(Topic).
add_subscriber_(Share, Topic, Subscriber) ->
(not ets:member(mqtt_subscriber, Topic)) andalso emqttd_router:add_route(Topic),
ets:insert(mqtt_subscriber, {Topic, shared(Share, Subscriber)}).
add_local_subscriber_(Share, Topic, Subscriber) ->
(not ets:member(mqtt_subscriber, {local, Topic})) andalso emqttd_router:add_local_route(Topic),
ets:insert(mqtt_subscriber, {{local, Topic}, shared(Share, Subscriber)}).
del_subscriber(Topic, Subscriber, Options) ->
Share = proplists:get_value(share, Options),
case ?is_local(Options) of
false -> del_subscriber_(Share, Topic, Subscriber);
true -> del_local_subscriber_(Share, Topic, Subscriber)
end.
del_subscriber_(Share, Topic, Subscriber) ->
ets:delete_object(mqtt_subscriber, {Topic, shared(Share, Subscriber)}),
(not ets:member(mqtt_subscriber, Topic)) andalso emqttd_router:del_route(Topic).
del_local_subscriber_(Share, Topic, Subscriber) ->
ets:delete_object(mqtt_subscriber, {{local, Topic}, shared(Share, Subscriber)}),
(not ets:member(subscriber, {local, Topic})) andalso emqttd_router:del_local_route(Topic).
shared(undefined, Subscriber) ->
Subscriber;
shared(Share, Subscriber) ->
{Share, Subscriber}.
setstats(State) ->
emqttd_stats:setstats('subscribers/count', 'subscribers/max',
ets:info(mqtt_subscriber, size)), State.
emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(mqtt_subscriber, size)),
State.

View File

@ -38,7 +38,7 @@ pubsub_pool() ->
hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%% Supervisor Callbacks
%%--------------------------------------------------------------------
init([Env]) ->
@ -57,10 +57,10 @@ pool_size(Env) ->
pool_sup(Name, Env) ->
Pool = list_to_atom(atom_to_list(Name) ++ "_pool"),
MFA = {emqttd:adapter(Name), start_link, [Env]},
Mod = list_to_atom("emqttd_" ++ atom_to_list(Name)),
MFA = {Mod, start_link, [Env]},
emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]).
%%--------------------------------------------------------------------
%% Create PubSub Tables
%%--------------------------------------------------------------------

View File

@ -133,7 +133,9 @@ setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
-spec(subscriptions(emqttd:subscriber()) -> [{binary(), list(emqttd:suboption())}]).
subscriptions(Subscriber) ->
lists:map(fun({_, Topic}) ->
lists:map(fun({_, {_Share, Topic}}) ->
subscription(Topic, Subscriber);
({_, Topic}) ->
subscription(Topic, Subscriber)
end, ets:lookup(mqtt_subscription, Subscriber)).
@ -235,14 +237,20 @@ code_change(_OldVsn, State, _Extra) ->
do_subscribe_(Topic, Subscriber, Options, State) ->
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
[] ->
emqttd_pubsub:async_subscribe(Topic, Subscriber),
ets:insert(mqtt_subscription, {Subscriber, Topic}),
emqttd_pubsub:async_subscribe(Topic, Subscriber, Options),
Share = proplists:get_value(share, Options),
add_subscription_(Share, Subscriber, Topic),
ets:insert(mqtt_subproperty, {{Topic, Subscriber}, Options}),
{ok, monitor_subpid(Subscriber, State)};
[_] ->
{error, {already_subscribed, Topic}}
end.
add_subscription_(undefined, Subscriber, Topic) ->
ets:insert(mqtt_subscription, {Subscriber, Topic});
add_subscription_(Share, Subscriber, Topic) ->
ets:insert(mqtt_subscription, {Subscriber, {Share, Topic}}).
monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
State#state{submon = PMon:monitor(SubPid)};
monitor_subpid(_SubPid, State) ->
@ -250,9 +258,10 @@ monitor_subpid(_SubPid, State) ->
do_unsubscribe_(Topic, Subscriber, State) ->
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
[_] ->
emqttd_pubsub:async_unsubscribe(Topic, Subscriber),
ets:delete_object(mqtt_subscription, {Subscriber, Topic}),
[{_, Options}] ->
emqttd_pubsub:async_unsubscribe(Topic, Subscriber, Options),
Share = proplists:get_value(share, Options),
del_subscription_(Share, Subscriber, Topic),
ets:delete(mqtt_subproperty, {Topic, Subscriber}),
{ok, case ets:member(mqtt_subscription, Subscriber) of
true -> State;
@ -262,24 +271,32 @@ do_unsubscribe_(Topic, Subscriber, State) ->
{error, {subscription_not_found, Topic}}
end.
del_subscription_(undefined, Subscriber, Topic) ->
ets:delete_object(mqtt_subscription, {Subscriber, Topic});
del_subscription_(Share, Subscriber, Topic) ->
ets:delete_object(mqtt_subscription, {Subscriber, {Share, Topic}}).
demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
State#state{submon = PMon:demonitor(SubPid)};
demonitor_subpid(_SubPid, State) ->
State.
subscriber_down_(Subscriber) ->
lists:foreach(fun({_, Topic}) ->
subscriber_down_(Subscriber, Topic)
lists:foreach(fun({_, {Share, Topic}}) ->
subscriber_down_(Share, Subscriber, Topic);
({_, Topic}) ->
subscriber_down_(undefined, Subscriber, Topic)
end, ets:lookup(mqtt_subscription, Subscriber)),
ets:delete(mqtt_subscription, Subscriber).
subscriber_down_(Subscriber, Topic) ->
subscriber_down_(Share, Subscriber, Topic) ->
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
[] ->
%% here?
emqttd_pubsub:async_unsubscribe(Topic, Subscriber);
[_] ->
emqttd_pubsub:async_unsubscribe(Topic, Subscriber),
[] ->
%% TODO:....???
Options = if Share == undefined -> []; true -> [{share, Share}] end,
emqttd_pubsub:async_unsubscribe(Topic, Subscriber, Options);
[{_, Options}] ->
emqttd_pubsub:async_unsubscribe(Topic, Subscriber, Options),
ets:delete(mqtt_subproperty, {Topic, Subscriber})
end.