rewrite pubsub

This commit is contained in:
Ery Lee 2015-04-14 13:31:25 +08:00
parent e47e3c1fa8
commit d311a058cc
7 changed files with 205 additions and 158 deletions

View File

@ -41,27 +41,6 @@
%%------------------------------------------------------------------------------
-type pubsub() :: publish | subscribe.
%%------------------------------------------------------------------------------
%% MQTT Topic
%%------------------------------------------------------------------------------
-record(mqtt_topic, {
name :: binary(),
node :: node()
}).
-type mqtt_topic() :: #mqtt_topic{}.
%%------------------------------------------------------------------------------
%% MQTT Subscriber
%%------------------------------------------------------------------------------
-record(mqtt_subscriber, {
topic :: binary(),
qos = 0 :: 0 | 1 | 2,
subpid :: pid()
}).
-type mqtt_subscriber() :: #mqtt_subscriber{}.
%%------------------------------------------------------------------------------
%% MQTT Client
%%------------------------------------------------------------------------------

View File

@ -45,6 +45,7 @@
-define(QOS_0, 0).
-define(QOS_1, 1).
-define(QOS_2, 2).
-define(QOS_ERR, 128).
-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)).

View File

@ -0,0 +1,48 @@
%%------------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
%%% @doc
%%% emqtt topic header.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% MQTT Topic
%%------------------------------------------------------------------------------
-record(topic, {
name :: binary(),
node :: node()
}).
-type topic() :: #topic{}.
%%------------------------------------------------------------------------------
%% MQTT Topic Subscriber
%%------------------------------------------------------------------------------
-record(topic_subscriber, {
topic :: binary(),
qos = 0 :: 0 | 1 | 2,
subpid :: pid()
}).
-type topic_subscriber() :: #topic_subscriber{}.

View File

@ -85,27 +85,9 @@ init_tables() ->
%%------------------------------------------------------------------------------
create_tables() ->
%% trie tree tables
ok = create_table(topic_trie_node, [
{ram_copies, [node()]},
{record_name, topic_trie_node},
{attributes, record_info(fields, topic_trie_node)}]),
ok = create_table(topic_trie, [
{ram_copies, [node()]},
{record_name, topic_trie},
{attributes, record_info(fields, topic_trie)}]),
%% topic table
ok = create_table(topic, [
{type, bag},
{ram_copies, [node()]},
{record_name, topic},
{attributes, record_info(fields, topic)}]),
%% local subscriber table, not shared with other nodes
ok = create_table(topic_subscriber, [
{type, bag},
{ram_copies, [node()]},
{attributes, record_info(fields, topic_subscriber)},
{index, [subpid]},
{local_content, true}]),
%%TODO: should use module 'mnesia_create' attribute...
ok = emqttd_trie:mnesia(create),
ok = emqttd_pubsub:mnesia(create),
%% TODO: retained messages, this table should not be copied...
ok = create_table(message_retained, [
{type, ordered_set},

View File

@ -20,7 +20,7 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd core pubsub.
%%% emqttd pubsub.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
@ -30,12 +30,20 @@
-include("emqttd.hrl").
-include("emqttd_topic.hrl").
-include("emqttd_packet.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% Mnesia Callbacks
-export([mnesia/1]).
-mnesia_create({mnesia, [create]}).
-mnesia_replicate({mnesia, [replicate]}).
%% API Exports
-export([start_link/0]).
@ -49,7 +57,30 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {}).
-record(state, {submap :: map()}).
%%%=============================================================================
%%% Mnesia callbacks
%%%=============================================================================
mnesia(create) ->
%% topic table
ok = emqttd_mnesia:create_table(topic, [
{type, bag},
{ram_copies, [node()]},
{record_name, topic},
{attributes, record_info(fields, topic)}]),
%% local subscriber table, not shared with other nodes
ok = emqttd_mnesia:create_table(topic_subscriber, [
{type, bag},
{ram_copies, [node()]},
{record_name, topic_subscriber},
{attributes, record_info(fields, topic_subscriber)},
{index, [subpid]},
{local_content, true}]);
mnesia(replicate) ->
ok = emqttd_mnesia:copy_table(topic),
ok = emqttd_mnesia:copy_table(topic_subscriber).
%%%=============================================================================
%%% API
@ -84,13 +115,20 @@ create(Topic) when is_binary(Topic) ->
-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when
Topic :: binary(),
Qos :: mqtt_qos().
subscribe(Topics = [{Topic, Qos}|_]) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
subscribe2(Topics, []).
subscribe(Topics = [{_Topic, _Qos}|_]) ->
{ok, lists:map(fun({Topic, Qos}) ->
case subscribe(Topic, Qos) of
{ok, GrantedQos} ->
GrantedQos;
Error ->
lager:error("Failed to subscribe '~s': ~p", [Topic, Error]), ?QOS_ERR
end
end, Topics)}.
-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}.
subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
TopicRecord = emqttd_topic:new(Topic),
Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()},
Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = self()},
F = fun() ->
case insert_topic(TopicRecord) of
ok -> insert_subscriber(Subscriber);
@ -99,20 +137,9 @@ subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
end,
case mnesia:transaction(F) of
{atomic, ok} -> {ok, Qos};
Error -> Error
{aborted, Reason} -> {error, Reason}
end.
subscribe2([], QosAcc) ->
{ok, lists:reverse(QosAcc)};
subscribe2([{Topic, Qos}|Topics], Acc) ->
case subscribe(Topic, Qos) of
{ok, GrantedQos} ->
subscribe2(Topics, [GrantedQos|Acc]);
Error ->
Error
end.
%%------------------------------------------------------------------------------
%% @doc
%% Unsubscribe Topic or Topics
@ -121,24 +148,17 @@ subscribe2([{Topic, Qos}|Topics], Acc) ->
%%------------------------------------------------------------------------------
-spec unsubscribe(binary() | list(binary())) -> ok.
unsubscribe(Topic) when is_binary(Topic) ->
unsubscribe([Topic]);
unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) ->
unsubscribe(Topics, self()).
unsubscribe(Topics, SubPid) ->
SubPid = self(),
TopicRecord = emqttd_topic:new(Topic),
F = fun() ->
Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid),
lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) ->
case lists:member(Topic, Topics) of
true -> mneisa:delete_object(Sub);
false -> ok
end
end, Subscribers)
%TODO: try to remove topic??? if topic is dynamic...
%%try_remove_topic(Topic)
Pattern = #topic_subscriber{topic = Topic, _ = '_', subpid = SubPid},
[mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)],
try_remove_topic(TopicRecord)
end,
{atomic, _} = mneisa:transaction(F), ok.
{atomic, _} = mneisa:transaction(F), ok;
unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
lists:foreach(fun(T) -> unsubscribe(T) end, Topics).
%%------------------------------------------------------------------------------
%% @doc
@ -152,14 +172,12 @@ publish(Msg=#mqtt_message{topic=Topic}) ->
-spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any().
publish(Topic, Msg) when is_binary(Topic) ->
Count =
lists:foldl(fun(#topic{name=Name, node=Node}, Acc) ->
lists:foreach(fun(#topic{name=Name, node=Node}) ->
case Node =:= node() of
true -> dispatch(Name, Msg) + Acc;
false -> rpc:call(Node, ?MODULE, dispatch, [Name, Msg]) + Acc
true -> dispatch(Name, Msg);
false -> rpc:cast(Node, ?MODULE, dispatch, [Name, Msg])
end
end, 0, match(Topic)),
dropped(Count =:= 0).
end, match(Topic)).
%%------------------------------------------------------------------------------
%% @doc
@ -169,7 +187,11 @@ publish(Topic, Msg) when is_binary(Topic) ->
%%------------------------------------------------------------------------------
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
Subscribers = mnesia:dirty_read(topic_subscriber, Topic),
case mnesia:dirty_read(topic_subscriber, Topic) of
[] ->
%%TODO: not right when clusted...
setstats(dropped);
Subscribers ->
lists:foreach(
fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) ->
Msg1 = if
@ -177,8 +199,8 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
true -> Msg
end,
SubPid ! {dispatch, {self(), Msg1}}
end, Subscribers),
length(Subscribers).
end, Subscribers)
end.
%%------------------------------------------------------------------------------
%% @doc
@ -189,60 +211,78 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
%%------------------------------------------------------------------------------
-spec match(Topic :: binary()) -> [topic()].
match(Topic) when is_binary(Topic) ->
TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqttd_topic:words(Topic)]),
Names = [Name || #topic_trie_node{topic=Name} <- TrieNodes, Name=/= undefined],
lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]).
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]),
lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([]) ->
%%TODO: really need?
process_flag(priority, high),
process_flag(min_heap_size, 1024*1024),
mnesia:subscribe({table, topic, simple}),
%% trie and topic tables, will be copied by all nodes.
mnesia:subscribe({table, topic_subscriber, simple}),
{ok, #state{}}.
{ok, #state{submap = maps:new()}}.
handle_call(Req, _From, State) ->
lager:error("Bad Req: ~p", [Req]),
{reply, error, State}.
lager:error("Bad Request: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast(Msg, State) ->
lager:error("Bad Msg: ~p", [Msg]),
{noreply, State}.
%% a new record has been written.
handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) ->
%%TODO: rewrite...
erlang:monitor(process, Pid),
upstats(subscriber),
handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}},
State = #state{submap = SubMap}) ->
case maps:is_key(Pid, SubMap) of
false ->
maps:put(Pid, erlang:monitor(process, Pid));
true ->
ignore
end,
setstats(subscribers),
{noreply, State};
%% TODO:...
handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) ->
upstats(topic),
%%TODO: this is not right when clusterd.
setstats(topics),
{noreply, State};
%% {write, #topic{}, _ActivityId}
%% {delete_object, _OldRecord, _ActivityId}
%% {delete, {Tab, Key}, ActivityId}
handle_info({mnesia_table_event, _Event}, State) ->
upstats(),
setstats(topics),
setstats(subscribers),
{noreply, State};
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMap}) ->
case maps:is_key(DownPid, SubMap) of
true ->
Node = node(),
F = fun() ->
%%TODO: how to read with write lock?
[mnesia:delete_object(Sub) || Sub <- mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid)]
%%TODO: try to remove dynamic topics without subscribers
%% [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs]
Subscribers = mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid),
lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) ->
mnesia:delete_object(Sub),
try_remove_topic(#topic{name = Topic, node = Node})
end, Subscribers)
end,
NewState =
case catch mnesia:transaction(F) of
{atomic, _} -> ok;
{aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason])
{atomic, _} ->
State#state{submap = maps:remove(DownPid, SubMap)};
{aborted, Reason} ->
lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]),
State
end,
upstats(),
{noreply, State};
setstats(topics), setstats(subscribers),
{noreply, NewState};
false ->
lager:error("Unexpected 'DOWN' from ~p", [DownPid]),
{noreply, State}
end;
handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]),
@ -260,27 +300,10 @@ code_change(_OldVsn, State, _Extra) ->
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
%%try_remove_topic(Name) when is_binary(Name) ->
%% case ets:member(topic_subscriber, Name) of
%% false ->
%% Topic = emqttd_topic:new(Name),
%% Fun = fun() ->
%% mnesia:delete_object(Topic),
%% case mnesia:read(topic, Name) of
%% [] -> trie_delete(Name);
%% _ -> ignore
%% end
%% end,
%% mnesia:transaction(Fun);
%% true ->
%% ok
%% end.
%%
insert_topic(Topic = #mqtt_topic{name = Name, node = Node}) ->
insert_topic(Topic = #topic{name = Name}) ->
case mnesia:wread(topic, Name) of
[] ->
trie_add(Name),
ok = emqttd_trie:insert(Name),
mnesia:write(Topic);
Topics ->
case lists:member(Topic, Topics) of
@ -289,20 +312,30 @@ insert_topic(Topic = #mqtt_topic{name = Name, node = Node}) ->
end
end.
insert_subscriber(Subscriber) ->
mnesia:write(Subscriber).
upstats() ->
upstats(topic), upstats(subscribe).
try_remove_topic(Topic = #topic{name = Name}) ->
%%TODO: is this ok in transaction?
case ets:member(topic_subscriber, Name) of
false ->
mnesia:delete_object(Topic),
case mnesia:read(topic, Name) of
[] -> emqttd_trie:delete(Name);
_ -> ok
end;
true ->
ok
end.
upstats(topic) ->
setstats(topics) ->
emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size));
upstats(subscribe) ->
setstats(subscribers) ->
emqttd_broker:setstats('subscribers/count',
'subscribers/max',
mnesia:table_info(topic_subscriber, size)).
mnesia:table_info(topic_subscriber, size));
setstats(dropped) ->
emqttd_metrics:inc('messages/dropped').
dropped(true) ->
emqttd_metrics:inc('messages/dropped');
dropped(false) ->
ok.

View File

@ -28,7 +28,7 @@
-author('feng@emqtt.io').
-include("emqttd.hrl").
-include("emqttd_topic.hrl").
-import(lists, [reverse/1]).
@ -52,9 +52,9 @@
%%
%% @end
%%%-----------------------------------------------------------------------------
-spec new(binary()) -> mqtt_topic().
-spec new(binary()) -> topic().
new(Name) when is_binary(Name) ->
#mqtt_topic{name = Name, node = node()}.
#topic{name = Name, node = node()}.
%%%-----------------------------------------------------------------------------
%% @doc
@ -62,8 +62,8 @@ new(Name) when is_binary(Name) ->
%%
%% @end
%%%-----------------------------------------------------------------------------
-spec wildcard(mqtt_topic() | binary()) -> true | false.
wildcard(#mqtt_topic{name = Name}) when is_binary(Name) ->
-spec wildcard(topic() | binary()) -> true | false.
wildcard(#topic{name = Name}) when is_binary(Name) ->
wildcard(Name);
wildcard(Topic) when is_binary(Topic) ->
wildcard(words(Topic));

View File

@ -99,6 +99,7 @@ mnesia(replicate) ->
%%
%% @end
%%------------------------------------------------------------------------------
-spec insert(Topic :: binary()) -> ok.
insert(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of
[#trie_node{topic=Topic}] ->
@ -118,8 +119,10 @@ insert(Topic) when is_binary(Topic) ->
%%
%% @end
%%------------------------------------------------------------------------------
-spec find(Topic :: binary()) -> list(MatchedTopic :: binary()).
find(Topic) when is_binary(Topic) ->
match_node(root, emqttd_topic:words(Topic), []).
TrieNodes = match_node(root, emqttd_topic:words(Topic), []),
[Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined].
%%------------------------------------------------------------------------------
%% @doc
@ -127,6 +130,7 @@ find(Topic) when is_binary(Topic) ->
%%
%% @end
%%------------------------------------------------------------------------------
-spec delete(Topic :: binary()) -> ok.
delete(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of
[#trie_node{edge_count=0}] ->
@ -135,7 +139,7 @@ delete(Topic) when is_binary(Topic) ->
[TrieNode] ->
mnesia:write(TrieNode#trie_node{topic=Topic});
[] ->
ignore
ok
end.
%%%=============================================================================