support topic cluster

This commit is contained in:
erylee 2012-12-24 16:59:12 +08:00
parent 39ab7df38a
commit 76169eb446
6 changed files with 277 additions and 91 deletions

View File

@ -33,7 +33,7 @@
%topic: topic name
-record(subscriber, {topic, qos, client, monref}).
-record(subscriber, {topic, qos, client}).
%% ---------------------------------
%% Logging mechanism

View File

@ -28,7 +28,7 @@
will_msg,
keep_alive,
awaiting_ack,
subscriptions}).
subtopics}).
-define(FRAME_TYPE(Frame, Type),
@ -58,7 +58,9 @@ handle_call({go, Sock}, _From, _State) ->
ok = throw_on_error(
inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end),
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
error_logger:info_msg("accepting MQTT connection (~s)~n", [ConnStr]),
%FIXME: merge to registry
emqtt_client_monitor:mon(self()),
?INFO("accepting MQTT connection (~s)~n", [ConnStr]),
{reply, ok,
control_throttle(
#state{ socket = Sock,
@ -68,7 +70,7 @@ handle_call({go, Sock}, _From, _State) ->
conserve = false,
parse_state = emqtt_frame:initial_state(),
message_id = 1,
subscriptions = dict:new(),
subtopics = [],
awaiting_ack = gb_trees:empty()})}.
handle_cast(Msg, State) ->
@ -240,53 +242,58 @@ process_request(?PUBLISH,
dup = Dup,
message_id = MessageId,
payload = Payload },
emqtt_router:publish(Topic, Msg),
%Retained?
retained(Retain, Topic, Msg),
case emqtt_topic:validate({publish, Topic}) of
true ->
emqtt_router:publish(Topic, Msg),
%Retained?
retained(Retain, Topic, Msg);
false ->
?ERROR("badtopic: ~p", [Topic])
end,
send_frame(Sock,
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId}}),
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?SUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{ message_id = MessageId,
topic_table = Topics },
payload = undefined },
#state{socket=Sock} = State0) ->
QosResponse =
lists:foldl(fun (#mqtt_topic{ name = TopicName,
qos = Qos }, QosList) ->
SupportedQos = supported_subs_qos(Qos),
[SupportedQos | QosList]
end, [], Topics),
variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics},
payload = undefined},
#state{socket=Sock} = State) ->
[emqtt_router:subscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
Topics1 = [Topic#mqtt_topic{qos=supported_subs_qos(Qos)}
|| Topic = #mqtt_topic{name=Name, qos=Qos} <- Topics,
emqtt_topic:validate({subscribe, Name})],
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK },
variable = #mqtt_frame_suback{
message_id = MessageId,
qos_table = QosResponse }}),
[emqtt_router:subscribe({Name, Qos}, self()) ||
#mqtt_topic{name=Name, qos=Qos} <- Topics1],
{ok, State0};
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics1],
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
variable = #mqtt_frame_suback{
message_id = MessageId,
qos_table = GrantedQos}}),
{ok, State#state{subtopics=Topics1}};
process_request(?UNSUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{ message_id = MessageId,
topic_table = Topics },
payload = undefined }, #state{ socket = Sock, client_id = ClientId,
subscriptions = Subs0} = State) ->
variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics },
payload = undefined}, #state{socket = Sock, client_id = ClientId,
subtopics = Subs0} = State) ->
[emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
[emqtt_router:unsubscribe(Name, self()) ||
#mqtt_topic{name=Name} <- Topics, emqtt_topic:validate(Name)],
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
variable = #mqtt_frame_suback{ message_id = MessageId }}),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
variable = #mqtt_frame_suback{message_id = MessageId }}),
{ok, State #state{ subscriptions = Subs0 }};
%TODO: fixme later
{ok, State #state{subtopics = Subs0}};
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
%Keep alive timer

View File

@ -0,0 +1,66 @@
-module(emqtt_client_monitor).
-include("emqtt.hrl").
-export([start_link/0, mon/1]).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
mon(Client) when is_pid(Client) ->
gen_server2:cast(?MODULE, {monitor, Client}).
%%----------------------------------------------------------------------------
start_link() ->
gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
ets:new(clientmon, [set, protected, named_table]),
ets:new(clientmon_reverse, [set, protected, named_table]),
?INFO("~p is started.", [?MODULE]),
{ok, #state{}}.
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({monitor, Client}, State) ->
Ref = erlang:monitor(process, Client),
ets:insert(clientmon, {Client, Ref}),
ets:insert(clientmon_reverse, {Ref, Client}),
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info({'DOWN', MRef, _Type, _Object, _Info}, State) ->
case ets:lookup(clientmon_reverse, MRef) of
[{_, Client}] ->
emqtt_router:down(Client),
ets:delete(clientmon, Client),
ets:delete(clientmon_reverse, MRef);
[] ->
ignore
end,
{noreply, State};
handle_info(Info, State) ->
{stop, {badinfo, Info},State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -8,10 +8,12 @@
-export([start_link/0]).
-export([subscribe/2,
-export([topics/1,
subscribe/2,
unsubscribe/2,
publish/2,
route/2]).
route/2,
down/1]).
-behaviour(gen_server).
@ -27,8 +29,14 @@
start_link() ->
gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
subscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) ->
gen_server2:call(?MODULE, {subscribe, Topic, Client}).
topics(direct) ->
mnesia:dirty_all_keys(direct_topic);
topics(wildcard) ->
mnesia:dirty_all_keys(wildcard_topic).
subscribe({Topic, Qos}, Client) when is_pid(Client) ->
gen_server2:call(?MODULE, {subscribe, {Topic, Qos}, Client}).
unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) ->
gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}).
@ -48,15 +56,19 @@ route(Topic, Msg) ->
match(Topic) when is_binary(Topic) ->
DirectMatches = mnesia:dirty_read(direct_topic, Topic),
TopicWords = topic_split(Topic),
TopicWords = emqtt_topic:words(Topic),
WildcardQuery = qlc:q([T || T = #topic{words=Words}
<- mnesia:table(wildcard_topic),
topic_match(TopicWords, Words)]), %
emqtt_topic:match(TopicWords, Words)]), %
{atomic, WildcardMatches} = mnesia:transaction(fun() -> qlc:e(WildcardQuery) end), %mnesia:async_dirty(fun qlc:e/1, WildcardQuery),
?INFO("~p", [WildcardMatches]),
{atomic, WildcardMatches} = mnesia:transaction(fun() -> qlc:e(WildcardQuery) end),
%mnesia:async_dirty(fun qlc:e/1, WildcardQuery),
%?INFO("~p", [WildcardMatches]),
DirectMatches ++ WildcardMatches.
down(Client) when is_pid(Client) ->
gen_server2:cast(?MODULE, {down, Client}).
init([]) ->
mnesia:create_table(direct_topic, [
{type, bag},
@ -66,6 +78,7 @@ init([]) ->
mnesia:add_table_copy(direct_topic, node(), ram_copies),
mnesia:create_table(wildcard_topic, [
{type, bag},
{index, [#topic.words]},
{record_name, topic},
{ram_copies, [node()]},
{attributes, record_info(fields, topic)}]),
@ -74,16 +87,15 @@ init([]) ->
?INFO_MSG("emqtt_router is started."),
{ok, #state{}}.
handle_call({subscribe, Name, Client}, _From, State) ->
Topic = #topic{name=Name, node=node(), words=topic_split(Name)},
case topic_type(Topic) of
handle_call({subscribe, {Name, Qos}, Client}, _From, State) ->
Topic = #topic{name=Name, node=node(), words=emqtt_topic:words(Name)},
case emqtt_topic:type(Topic) of
direct ->
ok = mnesia:dirty_write(direct_topic, Topic);
wildcard ->
ok = mnesia:dirty_write(wildcard_topic, Topic)
end,
Ref = erlang:monitor(process, Client),
ets:insert(subscriber, #subscriber{topic=Name, client=Client, monref=Ref}),
ets:insert(subscriber, #subscriber{topic=Name, qos=Qos, client=Client}),
emqtt_retained:send(Name, Client),
{reply, ok, State};
@ -91,24 +103,23 @@ handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({unsubscribe, Topic, Client}, State) ->
ets:match_delete(subscriber, #subscriber{topic=Topic, client=Client}),
%TODO: how to remove topic
%
%Words = topic_split(Topic),
%case topic_type(Words) of
%direct ->
% mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic});
%wildcard ->
% mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic})
%end,
ets:match_delete(subscriber, {subscriber, Topic, '_', Client}),
try_remove_topic(Topic),
{noreply, State};
handle_cast({down, Client}, State) ->
case ets:match_object(subscriber, {subscriber, '_', '_', Client}) of
[] ->
ignore;
Subs ->
[ets:delete_object(subscriber, Sub) || Sub <- Subs],
[try_remove_topic(Topic) || #subscriber{topic=Topic} <- Subs]
end,
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info({'DOWN', MonitorRef, _Type, _Object, _Info}, State) ->
ets:match_delete(subscriber, #subscriber{monref=MonitorRef}),
{noreply, State};
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
@ -119,33 +130,19 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%--------------------------------------
% internal functions
%--------------------------------------
topic_type(#topic{words=Words}) ->
topic_type(Words);
topic_type([]) ->
direct;
topic_type([<<"#">>]) ->
wildcard;
topic_type([<<"+">>|_T]) ->
wildcard;
topic_type([_|T]) ->
topic_type(T).
topic_match([], []) ->
true;
topic_match([H|T1], [H|T2]) ->
topic_match(T1, T2);
topic_match([_H|T1], [<<"+">>|T2]) ->
topic_match(T1, T2);
topic_match(_, [<<"#">>]) ->
true;
topic_match([_H1|_], [_H2|_]) ->
false;
topic_match([], [_H|_T2]) ->
false.
topic_split(S) ->
binary:split(S, [<<"/">>], [global]).
%% ------------------------------------------------------------------------
%% internal functions
%% ------------------------------------------------------------------------
try_remove_topic(Name) ->
case ets:member(subscriber, Name) of
false ->
Topic = emqtt_topic:new(Name),
case emqtt_topic:type(Topic) of
direct ->
mnesia:dirty_delete_object(direct_topic, Topic);
wildcard ->
mnesia:dirty_delete_object(wildcard_topic, Topic)
end;
true -> ok
end.

View File

@ -30,6 +30,7 @@ init([Listeners]) ->
?CHILD(emqtt_retained, worker),
?CHILD(emqtt_router, worker),
?CHILD(emqtt_registry, worker),
?CHILD(emqtt_client_monitor, worker),
?CHILD(emqtt_client_sup, supervisor)
| listener_children(Listeners) ]}
}.

115
src/emqtt_topic.erl Normal file
View File

@ -0,0 +1,115 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% Developer of the eMQTT Code is <ery.lee@gmail.com>
%% Copyright (c) 2012 Ery Lee. All rights reserved.
%%
-module(emqtt_topic).
%% ------------------------------------------------------------------------
%% Topic semantics and usage
%% ------------------------------------------------------------------------
%% A topic must be at least one character long.
%%
%% Topic names are case sensitive. For example, ACCOUNTS and Accounts are two different topics.
%%
%% Topic names can include the space character. For example, Accounts payable is a valid topic.
%%
%% A leading "/" creates a distinct topic. For example, /finance is different from finance. /finance matches "+/+" and "/+", but not "+".
%%
%% Do not include the null character (Unicode \x0000) in any topic.
%%
%% The following principles apply to the construction and content of a topic tree:
%%
%% The length is limited to 64k but within that there are no limits to the number of levels in a topic tree.
%%
%% There can be any number of root nodes; that is, there can be any number of topic trees.
%% ------------------------------------------------------------------------
-include("emqtt.hrl").
-export([new/1, type/1, match/2, validate/1, words/1]).
-export([test/0]).
-define(MAX_LEN, 64*1024).
new(Name) when is_binary(Name) ->
#topic{name=Name, node=node(), words=words(Name)}.
%% ------------------------------------------------------------------------
%% topic type: direct or wildcard
%% ------------------------------------------------------------------------
type(#topic{words=Words}) ->
type(Words);
type([]) ->
direct;
type([<<"#">>]) ->
wildcard;
type([<<"+">>|_T]) ->
wildcard;
type([_|T]) ->
type(T).
%% ------------------------------------------------------------------------
%% topic match
%% ------------------------------------------------------------------------
match([], []) ->
true;
match([H|T1], [H|T2]) ->
match(T1, T2);
match([_H|T1], [<<"+">>|T2]) ->
match(T1, T2);
match(_, [<<"#">>]) ->
true;
match([_H1|_], [_H2|_]) ->
false;
match([], [_H|_T2]) ->
false.
%% ------------------------------------------------------------------------
%% topic validate
%% ------------------------------------------------------------------------
validate({_, <<>>}) ->
false;
validate({_, Topic}) when size(Topic) > ?MAX_LEN ->
false;
validate({subscribe, Topic}) when is_binary(Topic) ->
valid(words(Topic));
validate({publish, Topic}) when is_binary(Topic) ->
Words = words(Topic),
valid(Words) and (not include_wildcard(Words)).
words(Topic) when is_binary(Topic) ->
binary:split(Topic, [<<"/">>], [global]).
valid([<<>>|Words]) -> valid2(Words);
valid(Words) -> valid2(Words).
valid2([<<>>|_Words]) -> false;
valid2([<<"#">>|Words]) when length(Words) > 0 -> false;
valid2([_|Words]) -> valid2(Words);
valid2([]) -> true.
include_wildcard([]) -> false;
include_wildcard([<<"#">>|_T]) -> true;
include_wildcard([<<"+">>|_T]) -> true;
include_wildcard([_H|T]) -> include_wildcard(T).
test() ->
true = validate({subscribe, <<"a/b/c">>}),
true = validate({subscribe, <<"/a/b">>}),
true = validate({subscribe, <<"/+/x">>}),
true = validate({subscribe, <<"/a/b/c/#">>}),
false = validate({subscribe, <<"a/#/c">>}),
ok.