queue support...

This commit is contained in:
Feng Lee 2015-05-05 16:41:45 +08:00
parent 435ae67f2d
commit 201b7c414a
5 changed files with 125 additions and 18 deletions

View File

@ -32,7 +32,7 @@
-export([match/2, validate/1, triples/1, words/1, wildcard/1]).
-export([systop/1]).
-export([is_queue/1, systop/1]).
%-type type() :: static | dynamic.
@ -166,6 +166,16 @@ word(<<"+">>) -> '+';
word(<<"#">>) -> '#';
word(Bin) -> Bin.
%%------------------------------------------------------------------------------
%% @doc Queue is a special topic name that starts with "$Q/"
%% @end
%%------------------------------------------------------------------------------
-spec is_queue(binary()) -> boolean().
is_queue(<<"$Q/", _Queue/binary>>) ->
true;
is_queue(_) ->
false.
%%------------------------------------------------------------------------------
%% @doc '$SYS' Topic.
%% @end

View File

@ -62,6 +62,17 @@
-type mqtt_subscriber() :: #mqtt_subscriber{}.
%%------------------------------------------------------------------------------
%% P2P Queue Subscriber
%%------------------------------------------------------------------------------
-record(mqtt_queue, {
name :: binary(),
subpid :: pid(),
qos = 0 :: 0 | 1 | 2
}).
-type mqtt_queue() :: #mqtt_queue{}.
%%------------------------------------------------------------------------------
%% MQTT Client
%%------------------------------------------------------------------------------

View File

@ -59,7 +59,10 @@
%% $SYS Topics for Subscribers
%%------------------------------------------------------------------------------
-define(SYSTOP_PUBSUB, [
'queues/count', % ...
'queues/max', % ...
'topics/count', % ...
'topics/max', % ...
'subscribers/count', % ...
'subscribers/max' % ...
]).

View File

@ -139,6 +139,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
ReturnCode ->
{ReturnCode, State1}
end,
%%TODO: this is not right...
notify(connected, ReturnCode1, State2),
send(?CONNACK_PACKET(ReturnCode1), State2),
%%Starting session

View File

@ -64,6 +64,12 @@
%%% Mnesia callbacks
%%%=============================================================================
mnesia(boot) ->
%% p2p queue table
ok = emqttd_mnesia:create_table(queue, [
{type, set},
{ram_copies, [node()]},
{record_name, mqtt_queue},
{attributes, record_info(fields, mqtt_queue)}]),
%% topic table
ok = emqttd_mnesia:create_table(topic, [
{type, bag},
@ -80,6 +86,7 @@ mnesia(boot) ->
{local_content, true}]);
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(queue),
ok = emqttd_mnesia:copy_table(topic),
ok = emqttd_mnesia:copy_table(subscriber).
@ -102,6 +109,9 @@ start_link(Id, Opts) ->
%% @end
%%------------------------------------------------------------------------------
-spec create(Topic :: binary()) -> ok | {error, Error :: any()}.
create(<<"$Q/", _Queue/binary>>) ->
%% protecte from queue
{error, cannot_create_queue};
create(Topic) when is_binary(Topic) ->
TopicR = #mqtt_topic{topic = Topic, node = node()},
case mnesia:transaction(fun add_topic/1, [TopicR]) of
@ -148,7 +158,7 @@ cast(Msg) ->
%% @end
%%------------------------------------------------------------------------------
-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok.
publish(From, Msg=#mqtt_message{topic=Topic}) ->
publish(From, #mqtt_message{topic=Topic} = Msg) ->
lager:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s", [From, Topic]),
%% Retain message first. Don't create retained topic.
case emqttd_msg_store:retain(Msg) of
@ -159,6 +169,16 @@ publish(From, Msg=#mqtt_message{topic=Topic}) ->
publish(From, Topic, Msg)
end.
publish(_From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
lists:foreach(
fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) ->
Msg1 = if
Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
true -> Msg
end,
SubPid ! {dispatch, {self(), Msg1}}
end, mnesia:dirty_read(queue, Queue));
publish(_From, Topic, Msg) when is_binary(Topic) ->
lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) ->
case Node =:= node() of
@ -172,7 +192,7 @@ publish(_From, Topic, Msg) when is_binary(Topic) ->
%% @end
%%------------------------------------------------------------------------------
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
dispatch(Topic, #mqtt_message{qos = Qos} = Msg ) when is_binary(Topic) ->
Subscribers = mnesia:dirty_read(subscriber, Topic),
setstats(dropped, Subscribers =:= []), %%TODO:...
lists:foreach(
@ -200,23 +220,43 @@ init([Id, _Opts]) ->
{ok, #state{id = Id, submap = maps:new()}}.
handle_call({subscribe, SubPid, Topics}, _From, State) ->
TopicSubs = lists:map(fun({Topic, Qos}) ->
{#mqtt_topic{topic = Topic, node = node()},
#mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}
TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) ->
#mqtt_queue{name = Queue, subpid = SubPid, qos = Qos};
({Topic, Qos}) ->
{#mqtt_topic{topic = Topic, node = node()},
#mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}
end, Topics),
F = fun() ->
lists:map(fun add_subscriber/1, TopicSubs)
lists:map(fun(QueueR) when is_record(QueueR, mqtt_queue) ->
add_queue(QueueR);
(TopicSub) ->
add_subscriber(TopicSub)
end, TopicSubs)
end,
case mnesia:transaction(F) of
{atomic, _Result} ->
setstats(all),
NewState = monitor_subscriber(SubPid, State),
%% grant all qos
%%TODO: grant all qos
{reply, {ok, [Qos || {_Topic, Qos} <- Topics]}, NewState};
{aborted, Error} ->
{reply, {error, Error}, State}
end;
handle_call({subscribe, SubPid, <<"$Q/", _/binary>> = Queue, Qos}, _From, State) ->
case mnesia:dirty_read(queue, Queue) of
[OldQueueR] -> lager:error("Queue is overwrited by ~p: ~p", [SubPid, OldQueueR]);
[] -> ok
end,
QueueR = #mqtt_queue{name = Queue, subpid = SubPid, qos = Qos},
case mnesia:transaction(fun add_queue/1, [QueueR]) of
{atomic, ok} ->
setstats(queues),
{reply, {ok, Qos}, monitor_subscriber(SubPid, State)};
{aborted, Error} ->
{reply, {error, Error}, State}
end;
handle_call({subscribe, SubPid, Topic, Qos}, _From, State) ->
TopicR = #mqtt_topic{topic = Topic, node = node()},
Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid},
@ -233,11 +273,21 @@ handle_call(Req, _From, State) ->
{reply, {error, badreq}, State}.
handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) ->
TopicSubs = lists:map(fun(Topic) ->
{#mqtt_topic{topic = Topic, node = node()},
#mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}}
TopicSubs = lists:map(fun(<<"$Q/", _/binary>> = Queue) ->
#mqtt_queue{name = Queue, subpid = SubPid};
(Topic) ->
{#mqtt_topic{topic = Topic, node = node()},
#mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}}
end, Topics),
F = fun() -> lists:foreach(fun remove_subscriber/1, TopicSubs) end,
F = fun() ->
lists:foreach(
fun(QueueR) when is_record(QueueR, mqtt_queue) ->
remove_queue(QueueR);
(TopicSub) ->
remove_subscriber(TopicSub)
end, TopicSubs)
end,
case mnesia:transaction(F) of
{atomic, _} -> ok;
{aborted, Error} -> lager:error("unsubscribe ~p error: ~p", [Topics, Error])
@ -245,6 +295,16 @@ handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) ->
setstats(all),
{noreply, State};
handle_cast({unsubscribe, SubPid, <<"$Q/", _/binary>> = Queue}, State) ->
QueueR = #mqtt_queue{name = Queue, subpid = SubPid},
case mnesia:transaction(fun remove_queue/1, [QueueR]) of
{atomic, _} ->
setstats(queues);
{aborted, Error} ->
lager:error("unsubscribe queue ~s error: ~p", [Queue, Error])
end,
{noreply, State};
handle_cast({unsubscribe, SubPid, Topic}, State) ->
TopicR = #mqtt_topic{topic = Topic, node = node()},
Subscriber = #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid},
@ -264,6 +324,13 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa
true ->
Node = node(),
F = fun() ->
%% remove queue...
Queues = mnesia:match_object(queue, #mqtt_queue{subpid = DownPid, _ = '_'}, write),
lists:foreach(fun(QueueR) ->
mnesia:delete_object(queue, QueueR, write)
end, Queues),
%% remove subscribers...
Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.pid),
lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) ->
mnesia:delete_object(subscriber, Sub, write),
@ -302,6 +369,9 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
add_queue(QueueR) ->
mnesia:write(queue, QueueR, write).
add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
case mnesia:wread({topic, Topic}) of
[] ->
@ -331,6 +401,14 @@ monitor_subscriber(SubPid, State = #state{submap = SubMap}) ->
end,
State#state{submap = NewSubMap}.
remove_queue(#mqtt_queue{name = Name, subpid = Pid}) ->
case mnesia:wread({queue, Name}) of
[R = #mqtt_queue{subpid = Pid}] ->
mnesia:delete(queue, R, write);
_ ->
ok
end.
remove_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) ->
[mnesia:delete_object(subscriber, Sub, write) ||
Sub <- mnesia:match_object(subscriber, Subscriber, write)],
@ -351,20 +429,24 @@ try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
%%%=============================================================================
%%% Stats functions
%%%=============================================================================
setstats(all) ->
setstats(topics),
setstats(subscribers);
[setstats(Stat) || Stat <- [queues, topics, subscribers]];
setstats(queues) ->
emqttd_stats:setstats('queues/count', 'queues/max',
mnesia:table_info(queue, size));
setstats(topics) ->
emqttd_stats:setstat('topics/count',
emqttd_stats:setstat('topics/count', 'topics/max',
mnesia:table_info(topic, size));
setstats(subscribers) ->
emqttd_stats:setstats('subscribers/count',
'subscribers/max',
emqttd_stats:setstats('subscribers/count', 'subscribers/max',
mnesia:table_info(subscriber, size)).
%%TODO: queue dropped?
setstats(dropped, false) ->
ignore;
setstats(dropped, true) ->
emqttd_metrics:inc('messages/dropped').