From 226933018a6679f6df2060f510ec149ab7374522 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 8 Aug 2016 13:49:52 +0800 Subject: [PATCH] improve the design of pubsub and router --- include/emqttd.hrl | 7 +- src/emqttd.erl | 120 +++++++----- src/emqttd_app.erl | 1 + src/emqttd_backend.erl | 95 ++++++++++ src/emqttd_cluster.erl | 3 +- src/emqttd_pubsub.erl | 387 ++++++++++++++++++++++++-------------- src/emqttd_pubsub_sup.erl | 42 +---- src/emqttd_server.erl | 278 --------------------------- src/emqttd_session.erl | 4 +- 9 files changed, 431 insertions(+), 506 deletions(-) create mode 100644 src/emqttd_backend.erl delete mode 100644 src/emqttd_server.erl diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 1e5ed2d3a..6142407d1 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -156,11 +156,12 @@ %% MQTT Delivery %%-------------------------------------------------------------------- -record(mqtt_delivery, { - message :: mqtt_message(), %% Message - dispatched = [] :: list(), - flow_through :: [node()] + message :: mqtt_message(), %% Message + flows :: list() }). +-type(mqtt_delivery() :: #mqtt_delivery{}). + %%-------------------------------------------------------------------- %% MQTT Alarm %%-------------------------------------------------------------------- diff --git a/src/emqttd.erl b/src/emqttd.erl index 70b1459a3..6b73d9027 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -16,6 +16,8 @@ -module(emqttd). +-author("Feng Lee "). + -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). @@ -23,19 +25,31 @@ -export([start/0, conf/1, conf/2, env/1, env/2, is_running/1]). %% PubSub API --export([create/2, lookup/2, publish/1, subscribe/1, subscribe/3, - unsubscribe/1, unsubscribe/3]). +-export([subscribe/1, subscribe/2, subscribe/3, publish/1, + unsubscribe/1, unsubscribe/2]). -%% Route and Forward API -%% -export([route/2, forward/2]). +%% PubSub Management API +-export([topics/0, subscribers/1, subscriptions/1]). %% Hooks API -export([hook/4, hook/3, unhook/2, run_hooks/3]). +%% Debug API +-export([dump/0]). + +-type(subscriber() :: pid() | binary() | function()). + +-type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). + +-type(pubsub_error() :: {error, {already_subscribed, binary()} + | {subscription_not_found, binary()}}). + +-export_type([subscriber/0, suboption/0, pubsub_error/0]). + -define(APP, ?MODULE). %%-------------------------------------------------------------------- -%% Bootstrap, environment, is_running... +%% Bootstrap, environment, configuration, is_running... %%-------------------------------------------------------------------- %% @doc Start emqttd application. @@ -67,52 +81,62 @@ is_running(Node) -> end. %%-------------------------------------------------------------------- -%% PubSub APIs that wrap emqttd_server, emqttd_pubsub +%% PubSub APIs that wrap emqttd_pubsub %%-------------------------------------------------------------------- -%% @doc Lookup Topic or Subscription --spec(lookup(topic, binary()) -> [mqtt_topic()]; - (subscription, binary()) -> [mqtt_subscription()]). -lookup(topic, Topic) when is_binary(Topic) -> - emqttd_pubsub:lookup_topic(Topic); +%% @doc Subscribe +-spec(subscribe(iodata()) -> ok | {error, any()}). +subscribe(Topic) -> + subscribe(Topic, self()). -lookup(subscription, ClientId) when is_binary(ClientId) -> - emqttd_server:lookup_subscription(ClientId). +-spec(subscribe(iodata(), subscriber()) -> ok | {error, any()}). +subscribe(Topic, Subscriber) -> + subscribe(Topic, Subscriber, []). -%% @doc Create a Topic or Subscription --spec(create(topic | subscription, binary()) -> ok | {error, any()}). -create(topic, Topic) when is_binary(Topic) -> - emqttd_pubsub:create_topic(Topic); - -create(subscription, {ClientId, Topic, Qos}) -> - Subscription = #mqtt_subscription{subid = ClientId, topic = Topic, qos = ?QOS_I(Qos)}, - emqttd_backend:add_subscription(Subscription). +-spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()). +subscribe(Topic, Subscriber, Options) -> + with_pubsub(fun(PubSub) -> PubSub:subscribe(iolist_to_binary(Topic), Subscriber, Options) end). %% @doc Publish MQTT Message --spec(publish(mqtt_message()) -> ok). -publish(Msg) when is_record(Msg, mqtt_message) -> - emqttd_server:publish(Msg), ok. - -%% @doc Subscribe --spec(subscribe(binary()) -> ok; - ({binary(), binary(), mqtt_qos()}) -> ok). -subscribe(Topic) when is_binary(Topic) -> - emqttd_server:subscribe(Topic); -subscribe({ClientId, Topic, Qos}) -> - subscribe(ClientId, Topic, Qos). - --spec(subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}). -subscribe(ClientId, Topic, Qos) -> - emqttd_server:subscribe(ClientId, Topic, Qos). +-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). +publish(Msg = #mqtt_message{from = From}) -> + trace(publish, From, Msg), + case run_hooks('message.publish', [], Msg) of + {ok, Msg1 = #mqtt_message{topic = Topic}} -> + %% Retain message first. Don't create retained topic. + Msg2 = case emqttd_retainer:retain(Msg1) of + ok -> emqttd_message:unset_flag(Msg1); + ignore -> Msg1 + end, + with_pubsub(fun(PubSub) -> PubSub:publish(Topic, Msg2) end); + {stop, Msg1} -> + lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]), + ignore + end. %% @doc Unsubscribe --spec(unsubscribe(binary()) -> ok). -unsubscribe(Topic) when is_binary(Topic) -> - emqttd_server:unsubscribe(Topic). +-spec(unsubscribe(iodata()) -> ok | pubsub_error()). +unsubscribe(Topic) -> + unsubscribe(Topic, self()). --spec(unsubscribe(binary(), binary(), mqtt_qos()) -> ok). -unsubscribe(ClientId, Topic, Qos) -> - emqttd_server:unsubscribe(ClientId, Topic, Qos). +-spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()). +unsubscribe(Topic, Subscriber) -> + with_pubsub(fun(PubSub) -> PubSub:unsubscribe(iolist_to_binary(Topic), Subscriber) end). + +-spec(topics() -> [binary()]). +topics() -> with_pubsub(fun(PubSub) -> PubSub:topics() end). + +-spec(subscribers(iodata()) -> list(subscriber())). +subscribers(Topic) -> + with_pubsub(fun(PubSub) -> PubSub:subscribers(iolist_to_binary(Topic)) end). + +-spec(subscriptions(subscriber()) -> [{binary(), suboption()}]). +subscriptions(Subscriber) -> + with_pubsub(fun(PubSub) -> PubSub:subscriptions(Subscriber) end). + +with_pubsub(Fun) -> Fun(conf(pubsub_adapter)). + +dump() -> with_pubsub(fun(PubSub) -> lists:append(PubSub:dump(), zenmq_router:dump()) end). %%-------------------------------------------------------------------- %% Hooks API @@ -134,3 +158,15 @@ unhook(Hook, Function) -> run_hooks(Hook, Args, Acc) -> emqttd_hook:run(Hook, Args, Acc). +%%-------------------------------------------------------------------- +%% Trace Functions +%%-------------------------------------------------------------------- + +trace(publish, From, _Msg) when is_atom(From) -> + %% Dont' trace '$SYS' publish + ignore; + +trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> + lager:info([{client, From}, {topic, Topic}], + "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 870a349a6..81640c884 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -77,6 +77,7 @@ print_vsn() -> start_servers(Sup) -> Servers = [{"emqttd ctl", emqttd_ctl}, {"emqttd hook", emqttd_hook}, + {"emqttd router", emqttd_router}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, diff --git a/src/emqttd_backend.erl b/src/emqttd_backend.erl new file mode 100644 index 000000000..5515b2ac8 --- /dev/null +++ b/src/emqttd_backend.erl @@ -0,0 +1,95 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_backend). + +-include("emqttd.hrl"). + +-include_lib("stdlib/include/ms_transform.hrl"). + +%% Mnesia Callbacks +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% Retained Message API +-export([retain_message/1, read_messages/1, match_messages/1, delete_message/1, + expire_messages/1, retained_count/0]). + +-record(retained_message, {topic, msg}). + +%%-------------------------------------------------------------------- +%% Mnesia callbacks +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = emqttd_mnesia:create_table(retained_message, [ + {type, ordered_set}, + {disc_copies, [node()]}, + {record_name, retained_message}, + {attributes, record_info(fields, retained_message)}, + {storage_properties, [{ets, [compressed]}, + {dets, [{auto_save, 1000}]}]}]); + +mnesia(copy) -> + ok = emqttd_mnesia:copy_table(retained_message). + +%%-------------------------------------------------------------------- +%% Retained Message +%%-------------------------------------------------------------------- + +-spec(retain_message(mqtt_message()) -> ok). +retain_message(Msg = #mqtt_message{topic = Topic}) -> + mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}). + +-spec(read_messages(binary()) -> [mqtt_message()]). +read_messages(Topic) -> + [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)]. + +-spec(match_messages(binary()) -> [mqtt_message()]). +match_messages(Filter) -> + %% TODO: optimize later... + Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) -> + case emqttd_topic:match(Name, Filter) of + true -> [Msg|Acc]; + false -> Acc + end + end, + mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]). + +-spec(delete_message(binary()) -> ok). +delete_message(Topic) -> + mnesia:dirty_delete(retained_message, Topic). + +-spec(expire_messages(pos_integer()) -> any()). +expire_messages(Time) when is_integer(Time) -> + mnesia:transaction( + fun() -> + Match = ets:fun2ms( + fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}}) + when Time > (MegaSecs * 1000000 + Secs) -> Topic + end), + Topics = mnesia:select(retained_message, Match, write), + lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages + (Topic) -> mnesia:delete({retained_message, Topic}) + end, Topics) + end). + +-spec(retained_count() -> non_neg_integer()). +retained_count() -> + mnesia:table_info(retained_message, size). + diff --git a/src/emqttd_cluster.erl b/src/emqttd_cluster.erl index 05c2ecf70..834de2d71 100644 --- a/src/emqttd_cluster.erl +++ b/src/emqttd_cluster.erl @@ -82,6 +82,5 @@ remove(Node) -> end. %% @doc Cluster status -status() -> - emqttd_mnesia:cluster_status(). +status() -> emqttd_mnesia:cluster_status(). diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 8bcaa7dc2..cdf5cba31 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -16,6 +16,8 @@ -module(emqttd_pubsub). +-author("Feng Lee "). + -behaviour(gen_server2). -include("emqttd.hrl"). @@ -24,121 +26,170 @@ -include("emqttd_internal.hrl"). -%% Mnesia Callbacks --export([mnesia/1]). +%% Init And Start +-export([init_tabs/0, start_link/3]). --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). +%% PubSub API. +-export([subscribe/1, subscribe/2, subscribe/3, publish/2, + unsubscribe/1, unsubscribe/2]). -%% API Exports --export([start_link/3, create_topic/1, lookup_topic/1]). +%% Async PubSub API. +-export([async_subscribe/1, async_subscribe/2, async_subscribe/3, + async_unsubscribe/1, async_unsubscribe/2]). --export([subscribe/2, unsubscribe/2, publish/2, dispatch/2, - async_subscribe/2, async_unsubscribe/2]). +%% Management API. +-export([setqos/3, topics/0, subscribers/1, is_subscribed/2, subscriptions/1]). + +%% Route API +-export([forward/3, dispatch/2]). + +%% Debug API +-export([dump/0]). %% gen_server. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id, env}). +-record(state, {pool, id, env, submon :: emqttd_pmon:pmon()}). + +-define(PUBSUB, ?MODULE). + +-define(is_local(Options), lists:member(local, Options)). + +-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). %%-------------------------------------------------------------------- -%% Mnesia callbacks +%% Init ETS Tables %%-------------------------------------------------------------------- -mnesia(boot) -> - ok = emqttd_mnesia:create_table(topic, [ - {ram_copies, [node()]}, - {record_name, mqtt_topic}, - {attributes, record_info(fields, mqtt_topic)}]); +init_tabs() -> + %% Create ETS Tabs + lists:foreach(fun create_tab/1, [subscriber, subscription, subproperty]). -mnesia(copy) -> - ok = emqttd_mnesia:copy_table(topic). +create_tab(subscriber) -> + %% Subscriber: Topic -> Sub1, {Share, Sub2}, {Share, Sub3}, ..., SubN + %% duplicate_bag: o(1) insert + ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); + +create_tab(subscription) -> + %% Subscription: Sub -> Topic1, {Share, Topic2}, {Share, Topic3}, ..., TopicN + %% bag: o(n) insert + ensure_tab(subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]); + +create_tab(subproperty) -> + %% Subproperty: {Topic, Sub} -> [local, {qos, 1}, {share, <<"share">>}] + ensure_tab(subproperty, [public, named_table, ordered_set | ?CONCURRENCY_OPTS]). + +ensure_tab(Tab, Opts) -> + case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end. %%-------------------------------------------------------------------- %% Start PubSub %%-------------------------------------------------------------------- %% @doc Start one pubsub --spec(start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when - Pool :: atom(), - Id :: pos_integer(), - Env :: list(tuple())). +-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). start_link(Pool, Id, Env) -> - gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). - -%% @doc Create a Topic. --spec(create_topic(binary()) -> ok | {error, any()}). -create_topic(Topic) when is_binary(Topic) -> - case mnesia:transaction(fun add_topic_/2, [Topic, [static]]) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end. - -%% @doc Lookup a Topic. --spec(lookup_topic(binary()) -> list(mqtt_topic())). -lookup_topic(Topic) when is_binary(Topic) -> - mnesia:dirty_read(topic, Topic). + gen_server2:start_link({local, ?PROC_NAME(?PUBSUB, Id)}, ?MODULE, [Pool, Id, Env], []). %%-------------------------------------------------------------------- %% PubSub API %%-------------------------------------------------------------------- %% @doc Subscribe a Topic --spec(subscribe(binary(), pid()) -> ok). -subscribe(Topic, SubPid) when is_binary(Topic) -> - call(pick(Topic), {subscribe, Topic, SubPid}). +-spec(subscribe(binary()) -> ok | emqttd:pubsub_error()). +subscribe(Topic) when is_binary(Topic) -> + subscribe(Topic, self()). -%% @doc Asynchronous Subscribe --spec(async_subscribe(binary(), pid()) -> ok). -async_subscribe(Topic, SubPid) when is_binary(Topic) -> - cast(pick(Topic), {subscribe, Topic, SubPid}). +-spec(subscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()). +subscribe(Topic, Subscriber) when is_binary(Topic) -> + subscribe(Topic, Subscriber, []). + +-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> + ok | emqttd:pubsub_error()). +subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> + call(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). + +%% @doc Subscribe a Topic Asynchronously +-spec(async_subscribe(binary()) -> ok). +async_subscribe(Topic) when is_binary(Topic) -> + async_subscribe(Topic, self()). + +-spec(async_subscribe(binary(), emqttd:subscriber()) -> ok). +async_subscribe(Topic, Subscriber) when is_binary(Topic) -> + async_subscribe(Topic, Subscriber, []). + +-spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok). +async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> + cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). %% @doc Publish message to Topic. --spec(publish(binary(), any()) -> any()). -publish(Topic, Msg) -> - lists:foreach( - fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() -> - ?MODULE:dispatch(To, Msg); - (#mqtt_route{topic = To, node = Node}) -> - rpc:cast(Node, ?MODULE, dispatch, [To, Msg]) - end, emqttd_router:match(Topic)). +-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore). +publish(Topic, Msg) when is_binary(Topic) -> + route(emqttd_router:match(Topic), delivery(Msg)). + +%% Dispatch on the local node +route([#mqtt_route{topic = To, node = Node}], + Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> + dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); +%% Forward to other nodes +route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) -> + forward(Node, To, Delivery#mqtt_delivery{flows = [{route, Node, To}|Flows]}); + +route(Routes, Delivery) -> + {ok, lists:foldl(fun(Route, DelAcc) -> + {ok, DelAcc1} = route([Route], DelAcc), DelAcc1 + end, Delivery, Routes)}. + +delivery(Msg) -> #mqtt_delivery{message = Msg, flows = []}. + +%% @doc Forward message to another node... +forward(Node, To, Delivery) -> + rpc:cast(Node, ?PUBSUB, dispatch, [To, Delivery]), {ok, Delivery}. %% @doc Dispatch Message to Subscribers --spec(dispatch(binary(), mqtt_message()) -> ok). -dispatch(Queue = <<"$queue/", _Q/binary>>, Msg) -> - case subscribers(Queue) of - [] -> - dropped(Queue); - [SubPid] -> - SubPid ! {dispatch, Queue, Msg}; - SubPids -> - Idx = crypto:rand_uniform(1, length(SubPids) + 1), - SubPid = lists:nth(Idx, SubPids), - SubPid ! {dispatch, Queue, Msg} - end; - -dispatch(Topic, Msg) -> +-spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()). +dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) -> case subscribers(Topic) of [] -> - dropped(Topic); - [SubPid] -> - SubPid ! {dispatch, Topic, Msg}; - SubPids -> - lists:foreach(fun(SubPid) -> - SubPid ! {dispatch, Topic, Msg} - end, SubPids) + dropped(Topic), {ok, Delivery}; + [Sub] -> + dispatch(Sub, Topic, Msg), + {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1}|Flows]}}; + Subscribers -> + Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows], + lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers), + {ok, Delivery#mqtt_delivery{flows = Flows1}} end. -%% @private -%% @doc Find all subscribers +dispatch(Pid, Topic, Msg) when is_pid(Pid) -> + Pid ! {dispatch, Topic, Msg}; +dispatch(SubId, Topic, Msg) when is_binary(SubId) -> + emqttd_sm:dispatch(SubId, Topic, Msg). + +topics() -> emqttd_router:topics(). + subscribers(Topic) -> - case ets:member(subscriber, Topic) of - true -> %% faster then lookup? - try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end; - false -> - [] - end. + try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end. + +subscriptions(Subscriber) -> + lists:map(fun({_, Topic}) -> + subscription(Topic, Subscriber) + end, ets:lookup(subscription, Subscriber)). + +subscription(Topic, Subscriber) -> + {Topic, ets:lookup_element(subproperty, {Topic, Subscriber}, 2)}. + +is_subscribed(Topic, Subscriber) when is_binary(Topic) -> + ets:member(subproperty, {Topic, Subscriber}). + +setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> + call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}). + +dump() -> + [{subscriber, ets:tab2list(subscriber)}, + {subscription, ets:tab2list(subscription)}, + {subproperty, ets:tab2list(subproperty)}]. %% @private %% @doc Ingore $SYS Messages. @@ -148,14 +199,23 @@ dropped(_Topic) -> emqttd_metrics:inc('messages/dropped'). %% @doc Unsubscribe --spec(unsubscribe(binary(), pid()) -> ok). -unsubscribe(Topic, SubPid) when is_binary(Topic) -> - call(pick(Topic), {unsubscribe, Topic, SubPid}). +-spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()). +unsubscribe(Topic) when is_binary(Topic) -> + unsubscribe(Topic, self()). -%% @doc Asynchronous Unsubscribe --spec(async_unsubscribe(binary(), pid()) -> ok). -async_unsubscribe(Topic, SubPid) when is_binary(Topic) -> - cast(pick(Topic), {unsubscribe, Topic, SubPid}). +%% @doc Unsubscribe +-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()). +unsubscribe(Topic, Subscriber) when is_binary(Topic) -> + call(pick(Subscriber), {unsubscribe, Topic, Subscriber}). + +%% @doc Async Unsubscribe +-spec(async_unsubscribe(binary()) -> ok). +async_unsubscribe(Topic) when is_binary(Topic) -> + async_unsubscribe(Topic, self()). + +-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok). +async_unsubscribe(Topic, Subscriber) when is_binary(Topic) -> + cast(pick(Subscriber), {unsubscribe, Topic, Subscriber}). call(PubSub, Req) when is_pid(PubSub) -> gen_server2:call(PubSub, Req, infinity). @@ -172,30 +232,56 @@ pick(Topic) -> init([Pool, Id, Env]) -> ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, env = Env}}. + {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}. -handle_call({subscribe, Topic, SubPid}, _From, State) -> - add_subscriber_(Topic, SubPid), - {reply, ok, setstats(State)}; +handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> + case do_subscribe_(Topic, Subscriber, Options, State) of + {ok, NewState} -> {reply, ok, setstats(NewState)}; + {error, Error} -> {reply, {error, Error}, State} + end; -handle_call({unsubscribe, Topic, SubPid}, _From, State) -> - del_subscriber_(Topic, SubPid), - {reply, ok, setstats(State)}; +handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> + case do_unsubscribe_(Topic, Subscriber, State) of + {ok, NewState} -> {reply, ok, setstats(NewState), hibernate}; + {error, Error} -> {reply, {error, Error}, State} + end; + +handle_call({setqos, Topic, Subscriber, Qos}, _From, State) -> + Key = {Topic, Subscriber}, + case ets:lookup(subproperty, Key) of + [{_, Opts}] -> + Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts), + ets:insert(subproperty, {Key, Opts1}), + {reply, ok, State}; + [] -> + {reply, {error, {subscription_not_found, Topic}}, State} + end; handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({subscribe, Topic, SubPid}, State) -> - add_subscriber_(Topic, SubPid), - {noreply, setstats(State)}; +handle_cast({subscribe, Topic, Subscriber, Options}, State) -> + case do_subscribe_(Topic, Subscriber, Options, State) of + {ok, NewState} -> {noreply, setstats(NewState)}; + {error, _Error} -> {noreply, State} + end; -handle_cast({unsubscribe, Topic, SubPid}, State) -> - del_subscriber_(Topic, SubPid), - {noreply, setstats(State)}; +handle_cast({unsubscribe, Topic, Subscriber}, State) -> + case do_unsubscribe_(Topic, Subscriber, State) of + {ok, NewState} -> {noreply, setstats(NewState), hibernate}; + {error, _Error} -> {noreply, State} + end; handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) -> + lists:foreach(fun({_, Topic}) -> + subscriber_down_(DownPid, Topic) + end, ets:lookup(subscription, DownPid)), + ets:delete(subscription, DownPid), + {noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate}; + handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). @@ -209,62 +295,79 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%-------------------------------------------------------------------- -add_subscriber_(Topic, SubPid) -> - case ets:member(subscriber, Topic) of - false -> - mnesia:transaction(fun add_topic_route_/2, [Topic, node()]), - setstats(topic); - true -> - ok - end, - ets:insert(subscriber, {Topic, SubPid}). - -del_subscriber_(Topic, SubPid) -> - ets:delete_object(subscriber, {Topic, SubPid}), - case ets:lookup(subscriber, Topic) of +do_subscribe_(Topic, Subscriber, Options, State) -> + case ets:lookup(subproperty, {Topic, Subscriber}) of [] -> - mnesia:transaction(fun del_topic_route_/2, [Topic, node()]), - setstats(topic); - [_|_] -> - ok + do_subscribe2_(Topic, Subscriber, Options), + ets:insert(subproperty, {{Topic, Subscriber}, Options}), + {ok, monitor_subpid(Subscriber, State)}; + [_] -> + {error, {already_subscribed, Topic}} end. -add_topic_route_(Topic, Node) -> - add_topic_(Topic), emqttd_router:add_route(Topic, Node). +do_subscribe2_(Topic, Subscriber, _Options) -> + add_subscription_(Subscriber, Topic), + add_subscriber_(Topic, Subscriber). -add_topic_(Topic) -> - add_topic_(Topic, []). +add_subscription_(Subscriber, Topic) -> + ets:insert(subscription, {Subscriber, Topic}). -add_topic_(Topic, Flags) -> - Record = #mqtt_topic{topic = Topic, flags = Flags}, - case mnesia:wread({topic, Topic}) of - [] -> mnesia:write(topic, Record, write); - [_] -> ok +add_subscriber_(Topic, Subscriber) -> + %%TODO: LOCK here... + case ets:member(subscriber, Topic) of + false -> emqttd_router:add_route(Topic, node()); + true -> ok + end, + ets:insert(subscriber, {Topic, Subscriber}). + +do_unsubscribe_(Topic, Subscriber, State) -> + case ets:lookup(subproperty, {Topic, Subscriber}) of + [_] -> + del_subscriber_(Topic, Subscriber), + del_subscription(Subscriber, Topic), + ets:delete(subproperty, {Topic, Subscriber}), + {ok, case ets:member(subscription, Subscriber) of + true -> State; + false -> demonitor_subpid(Subscriber, State) + end}; + [] -> + {error, {subscription_not_found, Topic}} end. -del_topic_route_(Topic, Node) -> - emqttd_router:del_route(Topic, Node), del_topic_(Topic). +del_subscription(Subscriber, Topic) -> + ets:delete_object(subscription, {Subscriber, Topic}). -del_topic_(Topic) -> - case emqttd_router:has_route(Topic) of - true -> ok; - false -> do_del_topic_(Topic) +del_subscriber_(Topic, Subscriber) -> + ets:delete_object(subscriber, {Topic, Subscriber}), + %%TODO: LOCK TOPIC + case ets:member(subscriber, Topic) of + false -> emqttd_router:del_route(Topic, node()); + true -> ok end. -do_del_topic_(Topic) -> - case mnesia:wread({topic, Topic}) of - [#mqtt_topic{flags = []}] -> - mnesia:delete(topic, Topic, write); - _ -> - ok +subscriber_down_(DownPid, Topic) -> + case ets:lookup(subproperty, {Topic, DownPid}) of + [] -> del_subscriber_(Topic, DownPid); %%TODO: warning? + [_] -> del_subscriber_(Topic, DownPid), + ets:delete(subproperty, {Topic, DownPid}) end. +monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> + State#state{submon = PMon:monitor(SubPid)}; +monitor_subpid(_SubPid, State) -> + State. + +demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> + State#state{submon = PMon:demonitor(SubPid)}; +demonitor_subpid(_SubPid, State) -> + State. + setstats(State) when is_record(State, state) -> - setstats(subscriber), State; - -setstats(topic) -> - emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size)); + setstats(subscriber), setstats(subscription), State; setstats(subscriber) -> - emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)). + emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)); + +setstats(subscription) -> + emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(subscription, size)). diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 1677cec8d..72d4fce68 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -19,10 +19,6 @@ -behaviour(supervisor). --include("emqttd.hrl"). - --define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). - %% API -export([start_link/0, pubsub_pool/0]). @@ -36,41 +32,13 @@ pubsub_pool() -> hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). init([Env]) -> - - %% Create ETS Tabs - create_tab(subscriber), create_tab(subscribed), - - %% Router - Router = {router, {emqttd_router, start_link, []}, - permanent, 5000, worker, [emqttd_router]}, - - %% PubSub Pool Sup - PubSubMFA = {emqttd_pubsub, start_link, [Env]}, - PubSubPoolSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]), - - %% Server Pool Sup - ServerMFA = {emqttd_server, start_link, [Env]}, - ServerPoolSup = emqttd_pool_sup:spec(server_pool, [server, hash, pool_size(Env), ServerMFA]), - - {ok, {{one_for_all, 5, 60}, [Router, PubSubPoolSup, ServerPoolSup]}}. + PubSub = emqttd:conf(pubsub_adapter), + PubSubMFA = {PubSub, start_link, [Env]}, + PoolArgs = [pubsub, hash, pool_size(Env), PubSubMFA], + PubSubPoolSup = emqttd_pool_sup:spec(pubsub_pool, PoolArgs), + {ok, { {one_for_all, 10, 3600}, [PubSubPoolSup]} }. pool_size(Env) -> Schedulers = erlang:system_info(schedulers), proplists:get_value(pool_size, Env, Schedulers). -create_tab(subscriber) -> - %% subscriber: Topic -> Pid1, Pid2, ..., PidN - %% duplicate_bag: o(1) insert - ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); - -create_tab(subscribed) -> - %% subscribed: Pid -> Topic1, Topic2, ..., TopicN - %% bag: o(n) insert - ensure_tab(subscribed, [public, named_table, bag | ?CONCURRENCY_OPTS]). - -ensure_tab(Tab, Opts) -> - case ets:info(Tab, name) of - undefined -> ets:new(Tab, Opts); - _ -> ok - end. - diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl deleted file mode 100644 index 1466d8a7a..000000000 --- a/src/emqttd_server.erl +++ /dev/null @@ -1,278 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_server). - --behaviour(gen_server2). - --include("emqttd.hrl"). - --include("emqttd_protocol.hrl"). - --include("emqttd_internal.hrl"). - -%% Mnesia Callbacks --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -%% API Exports --export([start_link/3]). - -%% PubSub API --export([subscribe/1, subscribe/3, publish/1, unsubscribe/1, unsubscribe/3, - lookup_subscription/1, update_subscription/4]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {pool, id, env, monitors}). - -%%-------------------------------------------------------------------- -%% Mnesia callbacks -%%-------------------------------------------------------------------- - -mnesia(boot) -> - ok = emqttd_mnesia:create_table(subscription, [ - {type, bag}, - {ram_copies, [node()]}, - {local_content, true}, %% subscription table is local - {record_name, mqtt_subscription}, - {attributes, record_info(fields, mqtt_subscription)}]); - -mnesia(copy) -> - ok = emqttd_mnesia:copy_table(subscription). - -%%-------------------------------------------------------------------- -%% Start server -%%-------------------------------------------------------------------- - -%% @doc Start a Server --spec(start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when - Pool :: atom(), - Id :: pos_integer(), - Env :: list(tuple())). -start_link(Pool, Id, Env) -> - gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). - -%%-------------------------------------------------------------------- -%% PubSub API -%%-------------------------------------------------------------------- - -%% @doc Subscribe a Topic --spec(subscribe(binary()) -> ok). -subscribe(Topic) when is_binary(Topic) -> - From = self(), call(server(From), {subscribe, From, Topic}). - -%% @doc Subscribe from a MQTT session. --spec(subscribe(binary(), binary(), mqtt_qos()) -> ok). -subscribe(ClientId, Topic, Qos) -> - From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}). - -%% @doc Lookup subscriptions. --spec(lookup_subscription(binary()) -> [#mqtt_subscription{}]). -lookup_subscription(ClientId) -> - mnesia:dirty_read(subscription, ClientId). - -%% @doc Update a subscription. --spec(update_subscription(binary(), binary(), mqtt_qos(), mqtt_qos()) -> ok). -update_subscription(ClientId, Topic, OldQos, NewQos) -> - call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}). - -%% @doc Publish a Message --spec(publish(Msg :: mqtt_message()) -> any()). -publish(Msg = #mqtt_message{from = From}) -> - trace(publish, From, Msg), - case emqttd:run_hooks('message.publish', [], Msg) of - {ok, Msg1 = #mqtt_message{topic = Topic}} -> - %% Retain message first. Don't create retained topic. - Msg2 = case emqttd_retainer:retain(Msg1) of - ok -> emqttd_message:unset_flag(Msg1); - ignore -> Msg1 - end, - emqttd_pubsub:publish(Topic, Msg2); - {stop, Msg1} -> - lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]) - end. - -%% @doc Unsubscribe a Topic --spec(unsubscribe(binary()) -> ok). -unsubscribe(Topic) when is_binary(Topic) -> - From = self(), call(server(From), {unsubscribe, From, Topic}). - -%% @doc Unsubscribe a Topic from a MQTT session --spec(unsubscribe(binary(), binary(), mqtt_qos()) -> ok). -unsubscribe(ClientId, Topic, Qos) -> - From = self(), call(server(From), {unsubscribe, From, ClientId, Topic, Qos}). - -call(Server, Req) -> - gen_server2:call(Server, Req, infinity). - -server(From) -> - gproc_pool:pick_worker(server, From). - -%%-------------------------------------------------------------------- -%% gen_server Callbacks -%%-------------------------------------------------------------------- - -init([Pool, Id, Env]) -> - ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, env = Env, monitors = dict:new()}}. - -handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> - pubsub_subscribe_(SubPid, Topic), - if_subsciption(State, fun() -> - add_subscription_(ClientId, Topic, Qos), - set_subscription_stats() - end), - ok(monitor_subscriber_(ClientId, SubPid, State)); - -handle_call({subscribe, SubPid, Topic}, _From, State) -> - pubsub_subscribe_(SubPid, Topic), - ok(monitor_subscriber_(undefined, SubPid, State)); - -handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) -> - if_subsciption(State, fun() -> - OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos}, - NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos}, - mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]), - set_subscription_stats() - end), ok(State); - -handle_call({unsubscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> - pubsub_unsubscribe_(SubPid, Topic), - if_subsciption(State, fun() -> - del_subscription_(ClientId, Topic, Qos), - set_subscription_stats() - end), ok(State); - -handle_call({unsubscribe, SubPid, Topic}, _From, State) -> - pubsub_unsubscribe_(SubPid, Topic), ok(State); - -handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). - -handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). - -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{monitors = Monitors}) -> - %% unsubscribe - lists:foreach(fun({_, Topic}) -> - emqttd_pubsub:async_unsubscribe(Topic, DownPid) - end, ets:lookup(subscribed, DownPid)), - ets:delete(subscribed, DownPid), - - %% clean subscriptions - case dict:find(DownPid, Monitors) of - {ok, {undefined, _}} -> ok; - {ok, {ClientId, _}} -> mnesia:dirty_delete(subscription, ClientId); - error -> ok - end, - {noreply, State#state{monitors = dict:erase(DownPid, Monitors)}, hibernate}; - -handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal Functions -%%-------------------------------------------------------------------- - -if_subsciption(#state{env = Env}, Fun) -> - case proplists:get_value(subscription, Env, true) of - false -> ok; - _true -> Fun() - end. - -%% @private -%% @doc Add a subscription. --spec(add_subscription_(binary(), binary(), mqtt_qos()) -> ok). -add_subscription_(ClientId, Topic, Qos) -> - add_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}). - --spec(add_subscription_(mqtt_subscription()) -> ok). -add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) -> - mnesia:dirty_write(subscription, Subscription). - -update_subscription_(OldSub, NewSub) -> - mnesia:delete_object(subscription, OldSub, write), - mnesia:write(subscription, NewSub, write). - -%% @private -%% @doc Delete a subscription --spec(del_subscription_(binary(), binary(), mqtt_qos()) -> ok). -del_subscription_(ClientId, Topic, Qos) -> - del_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}). - -del_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) -> - mnesia:dirty_delete_object(subscription, Subscription). - -%% @private -%% @doc Call pubsub to subscribe -pubsub_subscribe_(SubPid, Topic) -> - case ets:match(subscribed, {SubPid, Topic}) of - [] -> - emqttd_pubsub:async_subscribe(Topic, SubPid), - ets:insert(subscribed, {SubPid, Topic}); - [_] -> - false - end. - -%% @private -pubsub_unsubscribe_(SubPid, Topic) -> - emqttd_pubsub:async_unsubscribe(Topic, SubPid), - ets:delete_object(subscribed, {SubPid, Topic}). - -monitor_subscriber_(ClientId, SubPid, State = #state{monitors = Monitors}) -> - case dict:find(SubPid, Monitors) of - {ok, _} -> - State; - error -> - MRef = erlang:monitor(process, SubPid), - State#state{monitors = dict:store(SubPid, {ClientId, MRef}, Monitors)} - end. - -%%-------------------------------------------------------------------- -%% Trace Functions -%%-------------------------------------------------------------------- - -trace(publish, From, _Msg) when is_atom(From) -> - %% Dont' trace '$SYS' publish - ignore; - -trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> - lager:info([{client, From}, {topic, Topic}], - "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). - -%%-------------------------------------------------------------------- -%% Subscription Statistics -%%-------------------------------------------------------------------- - -set_subscription_stats() -> - emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', - mnesia:table_info(subscription, size)). - -%%-------------------------------------------------------------------- - -ok(State) -> {reply, ok, State}. - diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 6e0586b94..6f2b3c6dd 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -297,11 +297,11 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), SubDict; {ok, OldQos} -> - emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos), + emqttd_pubsub:setqos(Topic, ClientId, Qos), ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), dict:store(Topic, Qos, SubDict); error -> - emqttd:subscribe(ClientId, Topic, Qos), + emqttd:subscribe(Topic, ClientId, [{qos, Qos}]), %%TODO: the design is ugly... %% : 3.8.4 %% Where the Topic Filter is not identical to any existing Subscription’s filter,