diff --git a/INSTALL b/INSTALL new file mode 100755 index 000000000..e69de29bb diff --git a/README.md b/README similarity index 100% rename from README.md rename to README diff --git a/include/emqtt.hrl b/include/emqtt.hrl index 2d3c57029..109c492c0 100644 --- a/include/emqtt.hrl +++ b/include/emqtt.hrl @@ -26,11 +26,15 @@ -record(internal_user, {username, passwdhash}). --record(topic, {words, path}). +%name: <<"a/b/c">> +%node: node() +%words: [<<"a">>, <<"b">>, <<"c">>] +-record(topic, {name, node, words}). + +%topic: topic name -record(subscriber, {topic, qos, client, monref}). - %% --------------------------------- %% Logging mechanism diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index ea2eb9772..6bff83dc7 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -16,20 +16,19 @@ -define(CLIENT_ID_MAXLEN, 23). --record(state, { socket, - conn_name, - await_recv, - connection_state, - conserve, - parse_state, - message_id, - client_id, - clean_sess, - will_msg, - keep_alive, - awaiting_ack, - subscriptions - }). +-record(state, {socket, + conn_name, + await_recv, + connection_state, + conserve, + parse_state, + message_id, + client_id, + clean_sess, + will_msg, + keep_alive, + awaiting_ack, + subscriptions}). -define(FRAME_TYPE(Frame, Type), @@ -144,35 +143,36 @@ async_recv(Sock, Length, infinity) when is_port(Sock) -> async_recv(Sock, Length, Timeout) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, Timeout). +%------------------------------------------------------- +% receive and parse tcp data +%------------------------------------------------------- process_received_bytes(<<>>, State) -> {noreply, State}; + process_received_bytes(Bytes, State = #state{ parse_state = ParseState, conn_name = ConnStr }) -> - case - emqtt_frame:parse(Bytes, ParseState) of - {more, ParseState1} -> - {noreply, - control_throttle( State #state{ parse_state = ParseState1 }), - hibernate}; - {ok, Frame, Rest} -> - case process_frame(Frame, State) of - {ok, State1} -> - PS = emqtt_frame:initial_state(), - process_received_bytes( - Rest, - State1 #state{ parse_state = PS}); - {err, Reason, State1} -> - ?ERROR("MQTT protocol error ~p for connection ~p~n", - [Reason, ConnStr]), - stop({shutdown, Reason}, State1); - {stop, State1} -> - stop(normal, State1) - end; - {error, Error} -> - ?ERROR("MQTT detected framing error ~p for connection ~p~n", - [ConnStr, Error]), - stop({shutdown, Error}, State) + case emqtt_frame:parse(Bytes, ParseState) of + {more, ParseState1} -> + {noreply, + control_throttle( State #state{ parse_state = ParseState1 }), + hibernate}; + {ok, Frame, Rest} -> + case process_frame(Frame, State) of + {ok, State1} -> + PS = emqtt_frame:initial_state(), + process_received_bytes( + Rest, + State1 #state{ parse_state = PS}); + {err, Reason, State1} -> + ?ERROR("MQTT protocol error ~p for connection ~p~n", [Reason, ConnStr]), + stop({shutdown, Reason}, State1); + {stop, State1} -> + stop(normal, State1) + end; + {error, Error} -> + ?ERROR("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), + stop({shutdown, Error}, State) end. process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index 4a743a3d7..9976ebb64 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -4,11 +4,14 @@ -include("emqtt_frame.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + -export([start_link/0]). -export([subscribe/2, unsubscribe/2, - publish/2]). + publish/2, + route/2]). -behaviour(gen_server). @@ -30,31 +33,39 @@ subscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}). +%publish to cluster node. publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) -> - [ - [Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Path)] - || #topic{path=Path} <- match(Topic)]. + lists:foreach(fun(#topic{name=Name, node=Node}) -> + case Node == node() of + true -> route(Name, Msg); + false -> rpc:call(Node, ?MODULE, route, [Name, Msg]) + end + end, match(Topic)). +%route locally, should only be called by publish +route(Topic, Msg) -> + [Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Topic)]. match(Topic) when is_binary(Topic) -> - Words = topic_split(Topic), - DirectMatches = mnesia:dirty_read(direct_topic, Words), - WildcardMatches = lists:append([ - mnesia:dirty_read(wildcard_topic, Key) || - Key <- mnesia:dirty_all_keys(wildcard_topic), - topic_match(Words, Key) - ]), + DirectMatches = mnesia:dirty_read(direct_topic, Topic), + TopicWords = topic_split(Topic), + WildcardQuery = qlc:q([T || T = #topic{words=Words} + <- mnesia:table(wildcard_topic), + topic_match(TopicWords, Words)]), % + + {atomic, WildcardMatches} = mnesia:transaction(fun() -> qlc:e(WildcardQuery) end), %mnesia:async_dirty(fun qlc:e/1, WildcardQuery), + ?INFO("~p", [WildcardMatches]), DirectMatches ++ WildcardMatches. init([]) -> - mnesia:create_table( - direct_topic, [ + mnesia:create_table(direct_topic, [ + {type, bag}, {record_name, topic}, {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), mnesia:add_table_copy(direct_topic, node(), ram_copies), - mnesia:create_table( - wildcard_topic, [ + mnesia:create_table(wildcard_topic, [ + {type, bag}, {record_name, topic}, {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), @@ -63,17 +74,17 @@ init([]) -> ?INFO_MSG("emqtt_router is started."), {ok, #state{}}. -handle_call({subscribe, Topic, Client}, _From, State) -> - Words = topic_split(Topic), - case topic_type(Words) of +handle_call({subscribe, Name, Client}, _From, State) -> + Topic = #topic{name=Name, node=node(), words=topic_split(Name)}, + case topic_type(Topic) of direct -> - ok = mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic}); + ok = mnesia:dirty_write(direct_topic, Topic); wildcard -> - ok = mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic}) + ok = mnesia:dirty_write(wildcard_topic, Topic) end, Ref = erlang:monitor(process, Client), - ets:insert(subscriber, #subscriber{topic=Topic, client=Client, monref=Ref}), - emqtt_retained:send(Topic, Client), + ets:insert(subscriber, #subscriber{topic=Name, client=Client, monref=Ref}), + emqtt_retained:send(Name, Client), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -111,7 +122,8 @@ code_change(_OldVsn, State, _Extra) -> %-------------------------------------- % internal functions %-------------------------------------- - +topic_type(#topic{words=Words}) -> + topic_type(Words); topic_type([]) -> direct; topic_type([<<"#">>]) -> @@ -123,23 +135,17 @@ topic_type([_|T]) -> topic_match([], []) -> true; - topic_match([H|T1], [H|T2]) -> topic_match(T1, T2); - topic_match([_H|T1], [<<"+">>|T2]) -> topic_match(T1, T2); - topic_match(_, [<<"#">>]) -> true; - topic_match([_H1|_], [_H2|_]) -> false; - topic_match([], [_H|_T2]) -> false. topic_split(S) -> binary:split(S, [<<"/">>], [global]). -