From 201b7c414ad8f47d2940c078b2152d8773e2cbc3 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 5 May 2015 16:41:45 +0800 Subject: [PATCH] queue support... --- apps/emqtt/src/emqtt_topic.erl | 12 ++- apps/emqttd/include/emqttd.hrl | 11 +++ apps/emqttd/include/emqttd_systop.hrl | 3 + apps/emqttd/src/emqttd_protocol.erl | 1 + apps/emqttd/src/emqttd_pubsub.erl | 116 ++++++++++++++++++++++---- 5 files changed, 125 insertions(+), 18 deletions(-) diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqtt/src/emqtt_topic.erl index 9d384dab0..c9a057aaa 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -32,7 +32,7 @@ -export([match/2, validate/1, triples/1, words/1, wildcard/1]). --export([systop/1]). +-export([is_queue/1, systop/1]). %-type type() :: static | dynamic. @@ -166,6 +166,16 @@ word(<<"+">>) -> '+'; word(<<"#">>) -> '#'; word(Bin) -> Bin. +%%------------------------------------------------------------------------------ +%% @doc Queue is a special topic name that starts with "$Q/" +%% @end +%%------------------------------------------------------------------------------ +-spec is_queue(binary()) -> boolean(). +is_queue(<<"$Q/", _Queue/binary>>) -> + true; +is_queue(_) -> + false. + %%------------------------------------------------------------------------------ %% @doc '$SYS' Topic. %% @end diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index a1523c22d..9c2ab934a 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -62,6 +62,17 @@ -type mqtt_subscriber() :: #mqtt_subscriber{}. +%%------------------------------------------------------------------------------ +%% P2P Queue Subscriber +%%------------------------------------------------------------------------------ +-record(mqtt_queue, { + name :: binary(), + subpid :: pid(), + qos = 0 :: 0 | 1 | 2 +}). + +-type mqtt_queue() :: #mqtt_queue{}. + %%------------------------------------------------------------------------------ %% MQTT Client %%------------------------------------------------------------------------------ diff --git a/apps/emqttd/include/emqttd_systop.hrl b/apps/emqttd/include/emqttd_systop.hrl index 93ceff73c..d2c929273 100644 --- a/apps/emqttd/include/emqttd_systop.hrl +++ b/apps/emqttd/include/emqttd_systop.hrl @@ -59,7 +59,10 @@ %% $SYS Topics for Subscribers %%------------------------------------------------------------------------------ -define(SYSTOP_PUBSUB, [ + 'queues/count', % ... + 'queues/max', % ... 'topics/count', % ... + 'topics/max', % ... 'subscribers/count', % ... 'subscribers/max' % ... ]). diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index d215fd852..ed621bffa 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -139,6 +139,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = ReturnCode -> {ReturnCode, State1} end, + %%TODO: this is not right... notify(connected, ReturnCode1, State2), send(?CONNACK_PACKET(ReturnCode1), State2), %%Starting session diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 1de9ec0f2..48abd56f8 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -64,6 +64,12 @@ %%% Mnesia callbacks %%%============================================================================= mnesia(boot) -> + %% p2p queue table + ok = emqttd_mnesia:create_table(queue, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, mqtt_queue}, + {attributes, record_info(fields, mqtt_queue)}]), %% topic table ok = emqttd_mnesia:create_table(topic, [ {type, bag}, @@ -80,6 +86,7 @@ mnesia(boot) -> {local_content, true}]); mnesia(copy) -> + ok = emqttd_mnesia:copy_table(queue), ok = emqttd_mnesia:copy_table(topic), ok = emqttd_mnesia:copy_table(subscriber). @@ -102,6 +109,9 @@ start_link(Id, Opts) -> %% @end %%------------------------------------------------------------------------------ -spec create(Topic :: binary()) -> ok | {error, Error :: any()}. +create(<<"$Q/", _Queue/binary>>) -> + %% protecte from queue + {error, cannot_create_queue}; create(Topic) when is_binary(Topic) -> TopicR = #mqtt_topic{topic = Topic, node = node()}, case mnesia:transaction(fun add_topic/1, [TopicR]) of @@ -148,7 +158,7 @@ cast(Msg) -> %% @end %%------------------------------------------------------------------------------ -spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok. -publish(From, Msg=#mqtt_message{topic=Topic}) -> +publish(From, #mqtt_message{topic=Topic} = Msg) -> lager:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s", [From, Topic]), %% Retain message first. Don't create retained topic. case emqttd_msg_store:retain(Msg) of @@ -158,6 +168,16 @@ publish(From, Msg=#mqtt_message{topic=Topic}) -> ignore -> publish(From, Topic, Msg) end. + +publish(_From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) -> + lists:foreach( + fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) -> + Msg1 = if + Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; + true -> Msg + end, + SubPid ! {dispatch, {self(), Msg1}} + end, mnesia:dirty_read(queue, Queue)); publish(_From, Topic, Msg) when is_binary(Topic) -> lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) -> @@ -172,7 +192,7 @@ publish(_From, Topic, Msg) when is_binary(Topic) -> %% @end %%------------------------------------------------------------------------------ -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). -dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> +dispatch(Topic, #mqtt_message{qos = Qos} = Msg ) when is_binary(Topic) -> Subscribers = mnesia:dirty_read(subscriber, Topic), setstats(dropped, Subscribers =:= []), %%TODO:... lists:foreach( @@ -200,23 +220,43 @@ init([Id, _Opts]) -> {ok, #state{id = Id, submap = maps:new()}}. handle_call({subscribe, SubPid, Topics}, _From, State) -> - TopicSubs = lists:map(fun({Topic, Qos}) -> - {#mqtt_topic{topic = Topic, node = node()}, - #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}} + TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) -> + #mqtt_queue{name = Queue, subpid = SubPid, qos = Qos}; + ({Topic, Qos}) -> + {#mqtt_topic{topic = Topic, node = node()}, + #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}} end, Topics), F = fun() -> - lists:map(fun add_subscriber/1, TopicSubs) + lists:map(fun(QueueR) when is_record(QueueR, mqtt_queue) -> + add_queue(QueueR); + (TopicSub) -> + add_subscriber(TopicSub) + end, TopicSubs) end, case mnesia:transaction(F) of {atomic, _Result} -> setstats(all), NewState = monitor_subscriber(SubPid, State), - %% grant all qos + %%TODO: grant all qos {reply, {ok, [Qos || {_Topic, Qos} <- Topics]}, NewState}; {aborted, Error} -> {reply, {error, Error}, State} end; +handle_call({subscribe, SubPid, <<"$Q/", _/binary>> = Queue, Qos}, _From, State) -> + case mnesia:dirty_read(queue, Queue) of + [OldQueueR] -> lager:error("Queue is overwrited by ~p: ~p", [SubPid, OldQueueR]); + [] -> ok + end, + QueueR = #mqtt_queue{name = Queue, subpid = SubPid, qos = Qos}, + case mnesia:transaction(fun add_queue/1, [QueueR]) of + {atomic, ok} -> + setstats(queues), + {reply, {ok, Qos}, monitor_subscriber(SubPid, State)}; + {aborted, Error} -> + {reply, {error, Error}, State} + end; + handle_call({subscribe, SubPid, Topic, Qos}, _From, State) -> TopicR = #mqtt_topic{topic = Topic, node = node()}, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}, @@ -233,11 +273,21 @@ handle_call(Req, _From, State) -> {reply, {error, badreq}, State}. handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> - TopicSubs = lists:map(fun(Topic) -> - {#mqtt_topic{topic = Topic, node = node()}, - #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}} + + TopicSubs = lists:map(fun(<<"$Q/", _/binary>> = Queue) -> + #mqtt_queue{name = Queue, subpid = SubPid}; + (Topic) -> + {#mqtt_topic{topic = Topic, node = node()}, + #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}} end, Topics), - F = fun() -> lists:foreach(fun remove_subscriber/1, TopicSubs) end, + F = fun() -> + lists:foreach( + fun(QueueR) when is_record(QueueR, mqtt_queue) -> + remove_queue(QueueR); + (TopicSub) -> + remove_subscriber(TopicSub) + end, TopicSubs) + end, case mnesia:transaction(F) of {atomic, _} -> ok; {aborted, Error} -> lager:error("unsubscribe ~p error: ~p", [Topics, Error]) @@ -245,6 +295,16 @@ handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> setstats(all), {noreply, State}; +handle_cast({unsubscribe, SubPid, <<"$Q/", _/binary>> = Queue}, State) -> + QueueR = #mqtt_queue{name = Queue, subpid = SubPid}, + case mnesia:transaction(fun remove_queue/1, [QueueR]) of + {atomic, _} -> + setstats(queues); + {aborted, Error} -> + lager:error("unsubscribe queue ~s error: ~p", [Queue, Error]) + end, + {noreply, State}; + handle_cast({unsubscribe, SubPid, Topic}, State) -> TopicR = #mqtt_topic{topic = Topic, node = node()}, Subscriber = #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}, @@ -264,6 +324,13 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa true -> Node = node(), F = fun() -> + %% remove queue... + Queues = mnesia:match_object(queue, #mqtt_queue{subpid = DownPid, _ = '_'}, write), + lists:foreach(fun(QueueR) -> + mnesia:delete_object(queue, QueueR, write) + end, Queues), + + %% remove subscribers... Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.pid), lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) -> mnesia:delete_object(subscriber, Sub, write), @@ -302,6 +369,9 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +add_queue(QueueR) -> + mnesia:write(queue, QueueR, write). + add_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:wread({topic, Topic}) of [] -> @@ -331,6 +401,14 @@ monitor_subscriber(SubPid, State = #state{submap = SubMap}) -> end, State#state{submap = NewSubMap}. +remove_queue(#mqtt_queue{name = Name, subpid = Pid}) -> + case mnesia:wread({queue, Name}) of + [R = #mqtt_queue{subpid = Pid}] -> + mnesia:delete(queue, R, write); + _ -> + ok + end. + remove_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> [mnesia:delete_object(subscriber, Sub, write) || Sub <- mnesia:match_object(subscriber, Subscriber, write)], @@ -351,20 +429,24 @@ try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> %%%============================================================================= %%% Stats functions %%%============================================================================= + setstats(all) -> - setstats(topics), - setstats(subscribers); + [setstats(Stat) || Stat <- [queues, topics, subscribers]]; + +setstats(queues) -> + emqttd_stats:setstats('queues/count', 'queues/max', + mnesia:table_info(queue, size)); + setstats(topics) -> - emqttd_stats:setstat('topics/count', + emqttd_stats:setstat('topics/count', 'topics/max', mnesia:table_info(topic, size)); setstats(subscribers) -> - emqttd_stats:setstats('subscribers/count', - 'subscribers/max', + emqttd_stats:setstats('subscribers/count', 'subscribers/max', mnesia:table_info(subscriber, size)). +%%TODO: queue dropped? setstats(dropped, false) -> ignore; setstats(dropped, true) -> emqttd_metrics:inc('messages/dropped'). -