diff --git a/include/emqtt.hrl b/include/emqtt.hrl index 109c492c0..a4872e26b 100644 --- a/include/emqtt.hrl +++ b/include/emqtt.hrl @@ -33,7 +33,7 @@ %topic: topic name --record(subscriber, {topic, qos, client, monref}). +-record(subscriber, {topic, qos, client}). %% --------------------------------- %% Logging mechanism diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 6bff83dc7..4c8784341 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -28,7 +28,7 @@ will_msg, keep_alive, awaiting_ack, - subscriptions}). + subtopics}). -define(FRAME_TYPE(Frame, Type), @@ -58,7 +58,9 @@ handle_call({go, Sock}, _From, _State) -> ok = throw_on_error( inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end), {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), - error_logger:info_msg("accepting MQTT connection (~s)~n", [ConnStr]), + %FIXME: merge to registry + emqtt_client_monitor:mon(self()), + ?INFO("accepting MQTT connection (~s)~n", [ConnStr]), {reply, ok, control_throttle( #state{ socket = Sock, @@ -68,7 +70,7 @@ handle_call({go, Sock}, _From, _State) -> conserve = false, parse_state = emqtt_frame:initial_state(), message_id = 1, - subscriptions = dict:new(), + subtopics = [], awaiting_ack = gb_trees:empty()})}. handle_cast(Msg, State) -> @@ -240,53 +242,58 @@ process_request(?PUBLISH, dup = Dup, message_id = MessageId, payload = Payload }, - - emqtt_router:publish(Topic, Msg), - - %Retained? - retained(Retain, Topic, Msg), - + case emqtt_topic:validate({publish, Topic}) of + true -> + emqtt_router:publish(Topic, Msg), + %Retained? + retained(Retain, Topic, Msg); + false -> + ?ERROR("badtopic: ~p", [Topic]) + end, send_frame(Sock, - #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, - variable = #mqtt_frame_publish{ message_id = MsgId}}), + #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ message_id = MsgId}}), {ok, State}; process_request(?SUBSCRIBE, #mqtt_frame{ - variable = #mqtt_frame_subscribe{ message_id = MessageId, - topic_table = Topics }, - payload = undefined }, - #state{socket=Sock} = State0) -> - QosResponse = - lists:foldl(fun (#mqtt_topic{ name = TopicName, - qos = Qos }, QosList) -> - SupportedQos = supported_subs_qos(Qos), - [SupportedQos | QosList] - end, [], Topics), + variable = #mqtt_frame_subscribe{message_id = MessageId, + topic_table = Topics}, + payload = undefined}, + #state{socket=Sock} = State) -> - [emqtt_router:subscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], + Topics1 = [Topic#mqtt_topic{qos=supported_subs_qos(Qos)} + || Topic = #mqtt_topic{name=Name, qos=Qos} <- Topics, + emqtt_topic:validate({subscribe, Name})], - send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, - variable = #mqtt_frame_suback{ - message_id = MessageId, - qos_table = QosResponse }}), + [emqtt_router:subscribe({Name, Qos}, self()) || + #mqtt_topic{name=Name, qos=Qos} <- Topics1], - {ok, State0}; + GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics1], + + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, + variable = #mqtt_frame_suback{ + message_id = MessageId, + qos_table = GrantedQos}}), + + {ok, State#state{subtopics=Topics1}}; process_request(?UNSUBSCRIBE, #mqtt_frame{ - variable = #mqtt_frame_subscribe{ message_id = MessageId, - topic_table = Topics }, - payload = undefined }, #state{ socket = Sock, client_id = ClientId, - subscriptions = Subs0} = State) -> + variable = #mqtt_frame_subscribe{message_id = MessageId, + topic_table = Topics }, + payload = undefined}, #state{socket = Sock, client_id = ClientId, + subtopics = Subs0} = State) -> - [emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], + [emqtt_router:unsubscribe(Name, self()) || + #mqtt_topic{name=Name} <- Topics, emqtt_topic:validate(Name)], - send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, - variable = #mqtt_frame_suback{ message_id = MessageId }}), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK }, + variable = #mqtt_frame_suback{message_id = MessageId }}), - {ok, State #state{ subscriptions = Subs0 }}; + %TODO: fixme later + {ok, State #state{subtopics = Subs0}}; process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> %Keep alive timer diff --git a/src/emqtt_client_monitor.erl b/src/emqtt_client_monitor.erl new file mode 100644 index 000000000..cdc2c6597 --- /dev/null +++ b/src/emqtt_client_monitor.erl @@ -0,0 +1,66 @@ +-module(emqtt_client_monitor). + +-include("emqtt.hrl"). + +-export([start_link/0, mon/1]). + +-behaviour(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {}). + +mon(Client) when is_pid(Client) -> + gen_server2:cast(?MODULE, {monitor, Client}). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + ets:new(clientmon, [set, protected, named_table]), + ets:new(clientmon_reverse, [set, protected, named_table]), + ?INFO("~p is started.", [?MODULE]), + {ok, #state{}}. + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast({monitor, Client}, State) -> + Ref = erlang:monitor(process, Client), + ets:insert(clientmon, {Client, Ref}), + ets:insert(clientmon_reverse, {Ref, Client}), + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info({'DOWN', MRef, _Type, _Object, _Info}, State) -> + case ets:lookup(clientmon_reverse, MRef) of + [{_, Client}] -> + emqtt_router:down(Client), + ets:delete(clientmon, Client), + ets:delete(clientmon_reverse, MRef); + [] -> + ignore + end, + {noreply, State}; + +handle_info(Info, State) -> + {stop, {badinfo, Info},State}. + + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + + diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index 9976ebb64..78f397504 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -8,10 +8,12 @@ -export([start_link/0]). --export([subscribe/2, +-export([topics/1, + subscribe/2, unsubscribe/2, publish/2, - route/2]). + route/2, + down/1]). -behaviour(gen_server). @@ -27,8 +29,14 @@ start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). -subscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> - gen_server2:call(?MODULE, {subscribe, Topic, Client}). +topics(direct) -> + mnesia:dirty_all_keys(direct_topic); + +topics(wildcard) -> + mnesia:dirty_all_keys(wildcard_topic). + +subscribe({Topic, Qos}, Client) when is_pid(Client) -> + gen_server2:call(?MODULE, {subscribe, {Topic, Qos}, Client}). unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}). @@ -48,15 +56,19 @@ route(Topic, Msg) -> match(Topic) when is_binary(Topic) -> DirectMatches = mnesia:dirty_read(direct_topic, Topic), - TopicWords = topic_split(Topic), + TopicWords = emqtt_topic:words(Topic), WildcardQuery = qlc:q([T || T = #topic{words=Words} <- mnesia:table(wildcard_topic), - topic_match(TopicWords, Words)]), % + emqtt_topic:match(TopicWords, Words)]), % - {atomic, WildcardMatches} = mnesia:transaction(fun() -> qlc:e(WildcardQuery) end), %mnesia:async_dirty(fun qlc:e/1, WildcardQuery), - ?INFO("~p", [WildcardMatches]), + {atomic, WildcardMatches} = mnesia:transaction(fun() -> qlc:e(WildcardQuery) end), + %mnesia:async_dirty(fun qlc:e/1, WildcardQuery), + %?INFO("~p", [WildcardMatches]), DirectMatches ++ WildcardMatches. +down(Client) when is_pid(Client) -> + gen_server2:cast(?MODULE, {down, Client}). + init([]) -> mnesia:create_table(direct_topic, [ {type, bag}, @@ -66,6 +78,7 @@ init([]) -> mnesia:add_table_copy(direct_topic, node(), ram_copies), mnesia:create_table(wildcard_topic, [ {type, bag}, + {index, [#topic.words]}, {record_name, topic}, {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), @@ -74,16 +87,15 @@ init([]) -> ?INFO_MSG("emqtt_router is started."), {ok, #state{}}. -handle_call({subscribe, Name, Client}, _From, State) -> - Topic = #topic{name=Name, node=node(), words=topic_split(Name)}, - case topic_type(Topic) of +handle_call({subscribe, {Name, Qos}, Client}, _From, State) -> + Topic = #topic{name=Name, node=node(), words=emqtt_topic:words(Name)}, + case emqtt_topic:type(Topic) of direct -> ok = mnesia:dirty_write(direct_topic, Topic); wildcard -> ok = mnesia:dirty_write(wildcard_topic, Topic) end, - Ref = erlang:monitor(process, Client), - ets:insert(subscriber, #subscriber{topic=Name, client=Client, monref=Ref}), + ets:insert(subscriber, #subscriber{topic=Name, qos=Qos, client=Client}), emqtt_retained:send(Name, Client), {reply, ok, State}; @@ -91,24 +103,23 @@ handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. handle_cast({unsubscribe, Topic, Client}, State) -> - ets:match_delete(subscriber, #subscriber{topic=Topic, client=Client}), - %TODO: how to remove topic - % - %Words = topic_split(Topic), - %case topic_type(Words) of - %direct -> - % mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic}); - %wildcard -> - % mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic}) - %end, + ets:match_delete(subscriber, {subscriber, Topic, '_', Client}), + try_remove_topic(Topic), + {noreply, State}; + +handle_cast({down, Client}, State) -> + case ets:match_object(subscriber, {subscriber, '_', '_', Client}) of + [] -> + ignore; + Subs -> + [ets:delete_object(subscriber, Sub) || Sub <- Subs], + [try_remove_topic(Topic) || #subscriber{topic=Topic} <- Subs] + end, {noreply, State}; handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. -handle_info({'DOWN', MonitorRef, _Type, _Object, _Info}, State) -> - ets:match_delete(subscriber, #subscriber{monref=MonitorRef}), - {noreply, State}; handle_info(Info, State) -> {stop, {badinfo, Info}, State}. @@ -119,33 +130,19 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%-------------------------------------- -% internal functions -%-------------------------------------- -topic_type(#topic{words=Words}) -> - topic_type(Words); -topic_type([]) -> - direct; -topic_type([<<"#">>]) -> - wildcard; -topic_type([<<"+">>|_T]) -> - wildcard; -topic_type([_|T]) -> - topic_type(T). - -topic_match([], []) -> - true; -topic_match([H|T1], [H|T2]) -> - topic_match(T1, T2); -topic_match([_H|T1], [<<"+">>|T2]) -> - topic_match(T1, T2); -topic_match(_, [<<"#">>]) -> - true; -topic_match([_H1|_], [_H2|_]) -> - false; -topic_match([], [_H|_T2]) -> - false. - -topic_split(S) -> - binary:split(S, [<<"/">>], [global]). +%% ------------------------------------------------------------------------ +%% internal functions +%% ------------------------------------------------------------------------ +try_remove_topic(Name) -> + case ets:member(subscriber, Name) of + false -> + Topic = emqtt_topic:new(Name), + case emqtt_topic:type(Topic) of + direct -> + mnesia:dirty_delete_object(direct_topic, Topic); + wildcard -> + mnesia:dirty_delete_object(wildcard_topic, Topic) + end; + true -> ok + end. diff --git a/src/emqtt_sup.erl b/src/emqtt_sup.erl index 5bdb3699a..b257bf1aa 100644 --- a/src/emqtt_sup.erl +++ b/src/emqtt_sup.erl @@ -30,6 +30,7 @@ init([Listeners]) -> ?CHILD(emqtt_retained, worker), ?CHILD(emqtt_router, worker), ?CHILD(emqtt_registry, worker), + ?CHILD(emqtt_client_monitor, worker), ?CHILD(emqtt_client_sup, supervisor) | listener_children(Listeners) ]} }. diff --git a/src/emqtt_topic.erl b/src/emqtt_topic.erl new file mode 100644 index 000000000..6fb4b3e8a --- /dev/null +++ b/src/emqtt_topic.erl @@ -0,0 +1,115 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% Developer of the eMQTT Code is +%% Copyright (c) 2012 Ery Lee. All rights reserved. +%% +-module(emqtt_topic). + +%% ------------------------------------------------------------------------ +%% Topic semantics and usage +%% ------------------------------------------------------------------------ +%% A topic must be at least one character long. +%% +%% Topic names are case sensitive. For example, ACCOUNTS and Accounts are two different topics. +%% +%% Topic names can include the space character. For example, Accounts payable is a valid topic. +%% +%% A leading "/" creates a distinct topic. For example, /finance is different from finance. /finance matches "+/+" and "/+", but not "+". +%% +%% Do not include the null character (Unicode \x0000) in any topic. +%% +%% The following principles apply to the construction and content of a topic tree: +%% +%% The length is limited to 64k but within that there are no limits to the number of levels in a topic tree. +%% +%% There can be any number of root nodes; that is, there can be any number of topic trees. +%% ------------------------------------------------------------------------ + +-include("emqtt.hrl"). + +-export([new/1, type/1, match/2, validate/1, words/1]). + +-export([test/0]). + +-define(MAX_LEN, 64*1024). + +new(Name) when is_binary(Name) -> + #topic{name=Name, node=node(), words=words(Name)}. + +%% ------------------------------------------------------------------------ +%% topic type: direct or wildcard +%% ------------------------------------------------------------------------ +type(#topic{words=Words}) -> + type(Words); +type([]) -> + direct; +type([<<"#">>]) -> + wildcard; +type([<<"+">>|_T]) -> + wildcard; +type([_|T]) -> + type(T). + +%% ------------------------------------------------------------------------ +%% topic match +%% ------------------------------------------------------------------------ +match([], []) -> + true; +match([H|T1], [H|T2]) -> + match(T1, T2); +match([_H|T1], [<<"+">>|T2]) -> + match(T1, T2); +match(_, [<<"#">>]) -> + true; +match([_H1|_], [_H2|_]) -> + false; +match([], [_H|_T2]) -> + false. + + +%% ------------------------------------------------------------------------ +%% topic validate +%% ------------------------------------------------------------------------ +validate({_, <<>>}) -> + false; +validate({_, Topic}) when size(Topic) > ?MAX_LEN -> + false; +validate({subscribe, Topic}) when is_binary(Topic) -> + valid(words(Topic)); +validate({publish, Topic}) when is_binary(Topic) -> + Words = words(Topic), + valid(Words) and (not include_wildcard(Words)). + +words(Topic) when is_binary(Topic) -> + binary:split(Topic, [<<"/">>], [global]). + +valid([<<>>|Words]) -> valid2(Words); +valid(Words) -> valid2(Words). + +valid2([<<>>|_Words]) -> false; +valid2([<<"#">>|Words]) when length(Words) > 0 -> false; +valid2([_|Words]) -> valid2(Words); +valid2([]) -> true. + +include_wildcard([]) -> false; +include_wildcard([<<"#">>|_T]) -> true; +include_wildcard([<<"+">>|_T]) -> true; +include_wildcard([_H|T]) -> include_wildcard(T). + + +test() -> + true = validate({subscribe, <<"a/b/c">>}), + true = validate({subscribe, <<"/a/b">>}), + true = validate({subscribe, <<"/+/x">>}), + true = validate({subscribe, <<"/a/b/c/#">>}), + false = validate({subscribe, <<"a/#/c">>}), + ok. +