fix topic record

This commit is contained in:
erylee 2012-12-24 11:59:02 +08:00
parent cd1e2c73d2
commit c082cbb27b
5 changed files with 79 additions and 69 deletions

0
INSTALL Executable file
View File

View File

View File

@ -26,11 +26,15 @@
-record(internal_user, {username, passwdhash}). -record(internal_user, {username, passwdhash}).
-record(topic, {words, path}). %name: <<"a/b/c">>
%node: node()
%words: [<<"a">>, <<"b">>, <<"c">>]
-record(topic, {name, node, words}).
%topic: topic name
-record(subscriber, {topic, qos, client, monref}). -record(subscriber, {topic, qos, client, monref}).
%% --------------------------------- %% ---------------------------------
%% Logging mechanism %% Logging mechanism

View File

@ -16,20 +16,19 @@
-define(CLIENT_ID_MAXLEN, 23). -define(CLIENT_ID_MAXLEN, 23).
-record(state, { socket, -record(state, {socket,
conn_name, conn_name,
await_recv, await_recv,
connection_state, connection_state,
conserve, conserve,
parse_state, parse_state,
message_id, message_id,
client_id, client_id,
clean_sess, clean_sess,
will_msg, will_msg,
keep_alive, keep_alive,
awaiting_ack, awaiting_ack,
subscriptions subscriptions}).
}).
-define(FRAME_TYPE(Frame, Type), -define(FRAME_TYPE(Frame, Type),
@ -144,35 +143,36 @@ async_recv(Sock, Length, infinity) when is_port(Sock) ->
async_recv(Sock, Length, Timeout) when is_port(Sock) -> async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout). prim_inet:async_recv(Sock, Length, Timeout).
%-------------------------------------------------------
% receive and parse tcp data
%-------------------------------------------------------
process_received_bytes(<<>>, State) -> process_received_bytes(<<>>, State) ->
{noreply, State}; {noreply, State};
process_received_bytes(Bytes, process_received_bytes(Bytes,
State = #state{ parse_state = ParseState, State = #state{ parse_state = ParseState,
conn_name = ConnStr }) -> conn_name = ConnStr }) ->
case case emqtt_frame:parse(Bytes, ParseState) of
emqtt_frame:parse(Bytes, ParseState) of {more, ParseState1} ->
{more, ParseState1} -> {noreply,
{noreply, control_throttle( State #state{ parse_state = ParseState1 }),
control_throttle( State #state{ parse_state = ParseState1 }), hibernate};
hibernate}; {ok, Frame, Rest} ->
{ok, Frame, Rest} -> case process_frame(Frame, State) of
case process_frame(Frame, State) of {ok, State1} ->
{ok, State1} -> PS = emqtt_frame:initial_state(),
PS = emqtt_frame:initial_state(), process_received_bytes(
process_received_bytes( Rest,
Rest, State1 #state{ parse_state = PS});
State1 #state{ parse_state = PS}); {err, Reason, State1} ->
{err, Reason, State1} -> ?ERROR("MQTT protocol error ~p for connection ~p~n", [Reason, ConnStr]),
?ERROR("MQTT protocol error ~p for connection ~p~n", stop({shutdown, Reason}, State1);
[Reason, ConnStr]), {stop, State1} ->
stop({shutdown, Reason}, State1); stop(normal, State1)
{stop, State1} -> end;
stop(normal, State1) {error, Error} ->
end; ?ERROR("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
{error, Error} -> stop({shutdown, Error}, State)
?ERROR("MQTT detected framing error ~p for connection ~p~n",
[ConnStr, Error]),
stop({shutdown, Error}, State)
end. end.
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},

View File

@ -4,11 +4,14 @@
-include("emqtt_frame.hrl"). -include("emqtt_frame.hrl").
-include_lib("stdlib/include/qlc.hrl").
-export([start_link/0]). -export([start_link/0]).
-export([subscribe/2, -export([subscribe/2,
unsubscribe/2, unsubscribe/2,
publish/2]). publish/2,
route/2]).
-behaviour(gen_server). -behaviour(gen_server).
@ -30,31 +33,39 @@ subscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) ->
unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) ->
gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}). gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}).
%publish to cluster node.
publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) -> publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) ->
[ lists:foreach(fun(#topic{name=Name, node=Node}) ->
[Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Path)] case Node == node() of
|| #topic{path=Path} <- match(Topic)]. true -> route(Name, Msg);
false -> rpc:call(Node, ?MODULE, route, [Name, Msg])
end
end, match(Topic)).
%route locally, should only be called by publish
route(Topic, Msg) ->
[Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Topic)].
match(Topic) when is_binary(Topic) -> match(Topic) when is_binary(Topic) ->
Words = topic_split(Topic), DirectMatches = mnesia:dirty_read(direct_topic, Topic),
DirectMatches = mnesia:dirty_read(direct_topic, Words), TopicWords = topic_split(Topic),
WildcardMatches = lists:append([ WildcardQuery = qlc:q([T || T = #topic{words=Words}
mnesia:dirty_read(wildcard_topic, Key) || <- mnesia:table(wildcard_topic),
Key <- mnesia:dirty_all_keys(wildcard_topic), topic_match(TopicWords, Words)]), %
topic_match(Words, Key)
]), {atomic, WildcardMatches} = mnesia:transaction(fun() -> qlc:e(WildcardQuery) end), %mnesia:async_dirty(fun qlc:e/1, WildcardQuery),
?INFO("~p", [WildcardMatches]),
DirectMatches ++ WildcardMatches. DirectMatches ++ WildcardMatches.
init([]) -> init([]) ->
mnesia:create_table( mnesia:create_table(direct_topic, [
direct_topic, [ {type, bag},
{record_name, topic}, {record_name, topic},
{ram_copies, [node()]}, {ram_copies, [node()]},
{attributes, record_info(fields, topic)}]), {attributes, record_info(fields, topic)}]),
mnesia:add_table_copy(direct_topic, node(), ram_copies), mnesia:add_table_copy(direct_topic, node(), ram_copies),
mnesia:create_table( mnesia:create_table(wildcard_topic, [
wildcard_topic, [ {type, bag},
{record_name, topic}, {record_name, topic},
{ram_copies, [node()]}, {ram_copies, [node()]},
{attributes, record_info(fields, topic)}]), {attributes, record_info(fields, topic)}]),
@ -63,17 +74,17 @@ init([]) ->
?INFO_MSG("emqtt_router is started."), ?INFO_MSG("emqtt_router is started."),
{ok, #state{}}. {ok, #state{}}.
handle_call({subscribe, Topic, Client}, _From, State) -> handle_call({subscribe, Name, Client}, _From, State) ->
Words = topic_split(Topic), Topic = #topic{name=Name, node=node(), words=topic_split(Name)},
case topic_type(Words) of case topic_type(Topic) of
direct -> direct ->
ok = mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic}); ok = mnesia:dirty_write(direct_topic, Topic);
wildcard -> wildcard ->
ok = mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic}) ok = mnesia:dirty_write(wildcard_topic, Topic)
end, end,
Ref = erlang:monitor(process, Client), Ref = erlang:monitor(process, Client),
ets:insert(subscriber, #subscriber{topic=Topic, client=Client, monref=Ref}), ets:insert(subscriber, #subscriber{topic=Name, client=Client, monref=Ref}),
emqtt_retained:send(Topic, Client), emqtt_retained:send(Name, Client),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -111,7 +122,8 @@ code_change(_OldVsn, State, _Extra) ->
%-------------------------------------- %--------------------------------------
% internal functions % internal functions
%-------------------------------------- %--------------------------------------
topic_type(#topic{words=Words}) ->
topic_type(Words);
topic_type([]) -> topic_type([]) ->
direct; direct;
topic_type([<<"#">>]) -> topic_type([<<"#">>]) ->
@ -123,23 +135,17 @@ topic_type([_|T]) ->
topic_match([], []) -> topic_match([], []) ->
true; true;
topic_match([H|T1], [H|T2]) -> topic_match([H|T1], [H|T2]) ->
topic_match(T1, T2); topic_match(T1, T2);
topic_match([_H|T1], [<<"+">>|T2]) -> topic_match([_H|T1], [<<"+">>|T2]) ->
topic_match(T1, T2); topic_match(T1, T2);
topic_match(_, [<<"#">>]) -> topic_match(_, [<<"#">>]) ->
true; true;
topic_match([_H1|_], [_H2|_]) -> topic_match([_H1|_], [_H2|_]) ->
false; false;
topic_match([], [_H|_T2]) -> topic_match([], [_H|_T2]) ->
false. false.
topic_split(S) -> topic_split(S) ->
binary:split(S, [<<"/">>], [global]). binary:split(S, [<<"/">>], [global]).