This commit is contained in:
Feng 2015-12-10 15:04:13 +08:00
parent cb4ad87010
commit 2dd6d160f3
21 changed files with 275 additions and 293 deletions

View File

@ -1,11 +1,11 @@
% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ft=erlang ts=4 sw=4 et
[{kernel,
[{start_timer, true},
{start_pg2, true}
[{start_timer, true},
{start_pg2, true}
]},
{sasl, [
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
]},
{ssl, [
%{versions, ['tlsv1.2', 'tlsv1.1']}
@ -13,29 +13,29 @@
{lager, [
{colored, true},
{async_threshold, 1000},
{error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"},
{handlers, [
{lager_console_backend, info},
{error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"},
{handlers, [
{lager_console_backend, info},
%%NOTICE: Level >= error
%%{lager_emqtt_backend, error},
{lager_file_backend, [
%%{lager_emqtt_backend, error},
{lager_file_backend, [
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
{file, "log/emqttd_info.log"},
{level, info},
{size, 104857600},
{date, "$D0"},
{count, 30}
]},
{lager_file_backend, [
{file, "log/emqttd_info.log"},
{level, info},
{size, 104857600},
{date, "$D0"},
{count, 30}
]},
{lager_file_backend, [
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
{file, "log/emqttd_error.log"},
{level, error},
{size, 104857600},
{date, "$D0"},
{count, 30}
]}
]}
{file, "log/emqttd_error.log"},
{level, error},
{size, 104857600},
{date, "$D0"},
{count, 30}
]}
]}
]},
{esockd, [
{logger, {lager, info}}
@ -278,7 +278,6 @@
%% Erlang System Monitor
{sysmon, [
%% Long GC
{long_gc, 100},

View File

@ -1,11 +1,11 @@
% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ft=erlang ts=4 sw=4 et
[{kernel,
[{start_timer, true},
{start_pg2, true}
[{kernel,
[{start_timer, true},
{start_pg2, true}
]},
{sasl, [
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
]},
{ssl, [
%{versions, ['tlsv1.2', 'tlsv1.1']}
@ -13,21 +13,21 @@
{lager, [
{colored, true},
{async_threshold, 5000},
{error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"},
{handlers, [
%%{lager_console_backend, info},
{error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"},
{handlers, [
%%{lager_console_backend, info},
%%NOTICE: Level >= error
%%{lager_emqtt_backend, error},
{lager_file_backend, [
%%{lager_emqtt_backend, error},
{lager_file_backend, [
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
{file, "log/emqttd_error.log"},
{level, error},
{size, 104857600},
{date, "$D0"},
{count, 30}
]}
]}
{file, "log/emqttd_error.log"},
{level, error},
{size, 104857600},
{date, "$D0"},
{count, 30}
]}
]}
]},
{esockd, [
{logger, {lager, error}}
@ -270,7 +270,6 @@
%% Erlang System Monitor
{sysmon, [
%% Long GC, don't monitor in production mode for:
%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
{long_gc, false},

View File

@ -31,12 +31,11 @@
is_running/1]).
-define(MQTT_SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}
]).
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}]).
-define(APP, ?MODULE).
@ -83,12 +82,12 @@ start_listener({mqtts, Port, Options}) ->
%% Start http listener
start_listener({http, Port, Options}) ->
MFArgs = {emqttd_http, handle_request, []},
mochiweb:start_http(Port, Options, MFArgs);
mochiweb:start_http(Port, Options, MFArgs);
%% Start https listener
start_listener({https, Port, Options}) ->
MFArgs = {emqttd_http, handle_request, []},
mochiweb:start_http(Port, Options, MFArgs).
mochiweb:start_http(Port, Options, MFArgs).
start_listener(Protocol, Port, Options) ->
MFArgs = {emqttd_client, start_link, [env(mqtt)]},

View File

@ -157,11 +157,11 @@ stop() ->
%%%=============================================================================
init([Opts]) ->
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}),
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}),
{ok, state}.
{ok, state}.
init_mods(auth, AuthMods) ->
[init_mod(fun authmod/1, Name, Opts) || {Name, Opts} <- AuthMods];
@ -215,16 +215,16 @@ handle_call(Req, _From, State) ->
{reply, {error, badreq}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
{noreply, State}.
terminate(_Reason, _State) ->
ok.
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
{ok, State}.
%%%=============================================================================
%%% Internal functions

View File

@ -68,7 +68,7 @@ add_clientid(ClientId, Password) ->
%%------------------------------------------------------------------------------
-spec lookup_clientid(binary()) -> list().
lookup_clientid(ClientId) ->
mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
%%------------------------------------------------------------------------------
%% @doc Lookup all clientids
@ -76,7 +76,7 @@ lookup_clientid(ClientId) ->
%%------------------------------------------------------------------------------
-spec all_clientids() -> list(binary()).
all_clientids() ->
mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
%%------------------------------------------------------------------------------
%% @doc Remove clientid
@ -91,15 +91,15 @@ remove_clientid(ClientId) ->
%%%=============================================================================
init(Opts) ->
mnesia:create_table(?AUTH_CLIENTID_TAB, [
{ram_copies, [node()]},
{attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]),
mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies),
mnesia:create_table(?AUTH_CLIENTID_TAB, [
{ram_copies, [node()]},
{attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]),
mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies),
case proplists:get_value(file, Opts) of
undefined -> ok;
File -> load(File)
end,
{ok, Opts}.
{ok, Opts}.
check(#mqtt_client{client_id = undefined}, _Password, []) ->
{error, "ClientId undefined"};

View File

@ -99,10 +99,10 @@ all_users() ->
%%% emqttd_auth callbacks
%%%=============================================================================
init(Opts) ->
mnesia:create_table(?AUTH_USERNAME_TAB, [
{disc_copies, [node()]},
{attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies),
mnesia:create_table(?AUTH_USERNAME_TAB, [
{disc_copies, [node()]},
{attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies),
emqttd_ctl:register_cmd(users, {?MODULE, cli}, []),
{ok, Opts}.
@ -111,7 +111,7 @@ check(#mqtt_client{username = undefined}, _Password, _Opts) ->
check(_User, undefined, _Opts) ->
{error, "Password undefined"};
check(#mqtt_client{username = Username}, Password, _Opts) ->
case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of
case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of
[] ->
{error, "Username Not Found"};
[#?AUTH_USERNAME_TAB{password = <<Salt:4/binary, Hash/binary>>}] ->
@ -119,8 +119,8 @@ check(#mqtt_client{username = Username}, Password, _Opts) ->
true -> ok;
false -> {error, "Password Not Right"}
end
end.
end.
description() ->
"Username password authentication module".

View File

@ -84,7 +84,7 @@ stop_bridge(Node, SubTopic) ->
supervisor:delete_child(?MODULE, ChildId);
{error, Reason} ->
{error, Reason}
end.
end.
%%%=============================================================================
%%% Supervisor callbacks

View File

@ -71,7 +71,7 @@ lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(mqtt_client, ClientId) of
[Client] -> Client;
[] -> undefined
end.
end.
%%------------------------------------------------------------------------------
%% @doc Lookup client pid by clientId
@ -136,10 +136,10 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId,
_ ->
ets:insert(mqtt_client, Client),
{noreply, setstats(monitor_client(ClientId, Pid, State))}
end;
end;
handle_cast({unregister, ClientId, Pid}, State) ->
case lookup_proc(ClientId) of
case lookup_proc(ClientId) of
Pid ->
ets:delete(mqtt_client, ClientId),
{noreply, setstats(State)};

View File

@ -53,8 +53,8 @@ handle_request('GET', "/status", Req) ->
handle_request('POST', "/mqtt/publish", Req) ->
Params = mochiweb_request:parse_post(Req),
lager:info("HTTP Publish: ~p", [Params]),
case authorized(Req) of
true ->
case authorized(Req) of
true ->
ClientId = get_value("client", Params, http),
Qos = int(get_value("qos", Params, "0")),
Retain = bool(get_value("retain", Params, "0")),
@ -70,9 +70,9 @@ handle_request('POST', "/mqtt/publish", Req) ->
{_, false} ->
Req:respond({400, [], <<"Bad Topic">>})
end;
false ->
Req:respond({401, [], <<"Fobbiden">>})
end;
false ->
Req:respond({401, [], <<"Fobbiden">>})
end;
%%------------------------------------------------------------------------------
%% MQTT Over WebSocket
@ -101,16 +101,16 @@ handle_request('GET', "/" ++ File, Req) ->
handle_request(Method, Path, Req) ->
lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
Req:not_found().
Req:not_found().
%%------------------------------------------------------------------------------
%% basic authorization
%%------------------------------------------------------------------------------
authorized(Req) ->
case Req:get_header_value("Authorization") of
undefined ->
false;
"Basic " ++ BasicAuth ->
undefined ->
false;
"Basic " ++ BasicAuth ->
{Username, Password} = user_passwd(BasicAuth),
case emqttd_access_control:auth(#mqtt_client{username = Username}, Password) of
ok ->
@ -119,10 +119,10 @@ authorized(Req) ->
lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]),
false
end
end.
end.
user_passwd(BasicAuth) ->
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
validate(qos, Qos) ->
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);

View File

@ -42,7 +42,7 @@ start(_, 0, _) ->
undefined;
start(StatFun, TimeoutSec, TimeoutMsg) ->
{ok, StatVal} = StatFun(),
#keepalive{statfun = StatFun, statval = StatVal,
#keepalive{statfun = StatFun, statval = StatVal,
tsec = TimeoutSec, tmsg = TimeoutMsg,
tref = timer(TimeoutSec, TimeoutMsg)}.
@ -76,9 +76,9 @@ resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
cancel(#keepalive{tref = TRef}) ->
cancel(TRef);
cancel(undefined) ->
ok;
ok;
cancel(TRef) ->
catch erlang:cancel_timer(TRef).
catch erlang:cancel_timer(TRef).
timer(Sec, Msg) ->
erlang:send_after(timer:seconds(Sec), self(), Msg).

View File

@ -118,7 +118,7 @@ to_packet(#mqtt_message{pktid = PkgId,
topic = Topic,
payload = Payload}) ->
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = Qos,
retain = Retain,
dup = Dup},

View File

@ -46,14 +46,14 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
supervisor:start_child(?MODULE, ChildSpec).
%%
%% start_child(Mod::atom(), Type::type()) -> {ok, pid()}
%% @type type() = worker | supervisor
%%
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
%%%=============================================================================
%%% Supervisor callbacks

View File

@ -156,17 +156,17 @@ tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
lists:flatten(
io_lib:format(
"~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))).
io_lib:format(
"~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))).
connection_string(Sock, Direction) ->
case socket_ends(Sock, Direction) of
{ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
{ok, lists:flatten(
io_lib:format(
"~s:~p -> ~s:~p",
[maybe_ntoab(FromAddress), FromPort,
maybe_ntoab(ToAddress), ToPort]))};
io_lib:format(
"~s:~p -> ~s:~p",
[maybe_ntoab(FromAddress), FromPort,
maybe_ntoab(ToAddress), ToPort]))};
Error ->
Error
end.

View File

@ -62,7 +62,7 @@
init(Peername, SendFun, Opts) ->
MaxLen = emqttd_opts:g(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
WsInitialHeaders = emqttd_opts:g(ws_initial_headers, Opts),
#proto_state{peername = Peername,
#proto_state{peername = Peername,
sendfun = SendFun,
max_clientid_len = MaxLen,
client_pid = self(),
@ -118,12 +118,12 @@ received(_Packet, State = #proto_state{connected = false}) ->
received(Packet = ?PACKET(_Type), State) ->
trace(recv, Packet, State),
case validate_packet(Packet) of
case validate_packet(Packet) of
ok ->
process(Packet, State);
{error, Reason} ->
{error, Reason, State}
end.
end.
process(Packet = ?CONNECT_PACKET(Var), State0) ->
@ -185,7 +185,7 @@ process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
deny ->
?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State)
end,
{ok, State};
{ok, State};
process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
emqttd_session:puback(Session, PacketId),
@ -254,7 +254,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
send(Msg, State) when is_record(Msg, mqtt_message) ->
send(emqttd_message:to_packet(Msg), State);
send(emqttd_message:to_packet(Msg), State);
send(Packet, State = #proto_state{sendfun = SendFun})
when is_record(Packet, mqtt_packet) ->
@ -352,10 +352,10 @@ validate_clientid(#mqtt_packet_connect{proto_ver = ProtoVer,
false.
validate_packet(?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload)) ->
case emqttd_topic:validate({name, Topic}) of
case emqttd_topic:validate({name, Topic}) of
true -> ok;
false -> {error, badtopic}
end;
end;
validate_packet(?SUBSCRIBE_PACKET(_PacketId, TopicTable)) ->
validate_topics(filter, TopicTable);
@ -374,16 +374,16 @@ validate_topics(Type, TopicTable = [{_Topic, _Qos}|_])
Valid = fun(Topic, Qos) ->
emqttd_topic:validate({Type, Topic}) and validate_qos(Qos)
end,
case [Topic || {Topic, Qos} <- TopicTable, not Valid(Topic, Qos)] of
case [Topic || {Topic, Qos} <- TopicTable, not Valid(Topic, Qos)] of
[] -> ok;
_ -> {error, badtopic}
end;
end;
validate_topics(Type, Topics = [Topic0|_]) when is_binary(Topic0) ->
case [Topic || Topic <- Topics, not emqttd_topic:validate({Type, Topic})] of
case [Topic || Topic <- Topics, not emqttd_topic:validate({Type, Topic})] of
[] -> ok;
_ -> {error, badtopic}
end.
end.
validate_qos(undefined) ->
true;

View File

@ -224,7 +224,7 @@ publish(Msg = #mqtt_message{from = From}) ->
end.
publish(To, Msg) ->
lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) ->
lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) ->
case Node =:= node() of
true -> ?ROUTER:route(Topic, Msg);
false -> rpc:cast(Node, ?ROUTER, route, [Topic, Msg])
@ -237,9 +237,9 @@ publish(To, Msg) ->
%%------------------------------------------------------------------------------
-spec match(binary()) -> [mqtt_topic()].
match(To) ->
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]),
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]),
%% ets:lookup for topic table will be replicated.
lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]).
lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]).
%%%=============================================================================
%%% gen_server callbacks

View File

@ -83,7 +83,7 @@
old_client_pid :: pid(),
%% Last packet id of the session
packet_id = 1,
packet_id = 1,
%% Clients subscriptions.
subscriptions :: dict:dict(),
@ -198,7 +198,7 @@ publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
%% publish qos1 directly, and client will puback automatically
emqttd_pubsub:publish(Msg);
emqttd_pubsub:publish(Msg);
publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
%% publish qos2 by session
@ -701,9 +701,9 @@ timer(TimeoutSec, TimeoutMsg) ->
erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg).
cancel_timer(undefined) ->
undefined;
undefined;
cancel_timer(Ref) ->
catch erlang:cancel_timer(Ref).
catch erlang:cancel_timer(Ref).
noreply(State) ->
{noreply, State}.

View File

@ -47,14 +47,14 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
supervisor:start_child(?MODULE, ChildSpec).
%%
%% start_child(Mod::atom(), Type::type()) -> {ok, pid()}
%% @type type() = worker | supervisor
%%
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
%%%=============================================================================
%%% Supervisor callbacks

View File

@ -49,7 +49,7 @@
%%%-----------------------------------------------------------------------------
-spec wildcard(binary()) -> true | false.
wildcard(Topic) when is_binary(Topic) ->
wildcard(words(Topic));
wildcard(words(Topic));
wildcard([]) ->
false;
wildcard(['#'|_]) ->
@ -67,25 +67,25 @@ wildcard([_H|T]) ->
Name :: binary() | words(),
Filter :: binary() | words().
match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
match(words(Name), words(Filter));
match(words(Name), words(Filter));
match([], []) ->
true;
true;
match([H|T1], [H|T2]) ->
match(T1, T2);
match(T1, T2);
match([<<$$, _/binary>>|_], ['+'|_]) ->
false;
match([_H|T1], ['+'|T2]) ->
match(T1, T2);
match(T1, T2);
match([<<$$, _/binary>>|_], ['#']) ->
false;
match(_, ['#']) ->
true;
true;
match([_H1|_], [_H2|_]) ->
false;
false;
match([_H1|_], []) ->
false;
false;
match([], [_H|_T2]) ->
false.
false.
%%------------------------------------------------------------------------------
%% @doc Validate Topic
@ -93,14 +93,14 @@ match([], [_H|_T2]) ->
%%------------------------------------------------------------------------------
-spec validate({name | filter, binary()}) -> boolean().
validate({_, <<>>}) ->
false;
false;
validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
false;
false;
validate({filter, Topic}) when is_binary(Topic) ->
validate2(words(Topic));
validate2(words(Topic));
validate({name, Topic}) when is_binary(Topic) ->
Words = words(Topic),
validate2(Words) and (not wildcard(Words)).
Words = words(Topic),
validate2(Words) and (not wildcard(Words)).
validate2([]) ->
true;
@ -131,7 +131,7 @@ validate3(<<_/utf8, Rest/binary>>) ->
%%%-----------------------------------------------------------------------------
-spec triples(binary()) -> list(triple()).
triples(Topic) when is_binary(Topic) ->
triples(words(Topic), root, []).
triples(words(Topic), root, []).
triples([], _Parent, Acc) ->
reverse(Acc);

View File

@ -20,7 +20,7 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% MQTT Topic Trie Tree.
%%% MQTT Topic Trie.
%%%
%%% [Trie](http://en.wikipedia.org/wiki/Trie)
%%%
@ -42,19 +42,19 @@
-type node_id() :: binary() | atom().
-record(trie_node, {
node_id :: node_id(),
node_id :: node_id(),
edge_count = 0 :: non_neg_integer(),
topic :: binary() | undefined
topic :: binary() | undefined
}).
-record(trie_edge, {
node_id :: node_id(),
word :: binary() | atom()
node_id :: node_id(),
word :: binary() | atom()
}).
-record(trie, {
edge :: #trie_edge{},
node_id :: node_id()
edge :: #trie_edge{},
node_id :: node_id()
}).
%%%=============================================================================
@ -98,17 +98,17 @@ mnesia(copy) ->
%%------------------------------------------------------------------------------
-spec insert(Topic :: binary()) -> ok.
insert(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of
[#trie_node{topic=Topic}] ->
case mnesia:read(trie_node, Topic) of
[#trie_node{topic=Topic}] ->
ok;
[TrieNode=#trie_node{topic=undefined}] ->
mnesia:write(TrieNode#trie_node{topic=Topic});
[] ->
%add trie path
[add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
%add last node
mnesia:write(#trie_node{node_id=Topic, topic=Topic})
end.
[TrieNode=#trie_node{topic=undefined}] ->
mnesia:write(TrieNode#trie_node{topic=Topic});
[] ->
%add trie path
[add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
%add last node
mnesia:write(#trie_node{node_id=Topic, topic=Topic})
end.
%%------------------------------------------------------------------------------
%% @doc Find trie nodes that match topic
@ -117,23 +117,23 @@ insert(Topic) when is_binary(Topic) ->
-spec match(Topic :: binary()) -> list(MatchedTopic :: binary()).
match(Topic) when is_binary(Topic) ->
TrieNodes = match_node(root, emqttd_topic:words(Topic)),
[Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined].
[Name || #trie_node{topic=Name} <- TrieNodes, Name =/= undefined].
%%------------------------------------------------------------------------------
%% @doc Delete topic from trie tree
%% @doc Delete topic from trie
%% @end
%%------------------------------------------------------------------------------
-spec delete(Topic :: binary()) -> ok.
delete(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of
[#trie_node{edge_count=0}] ->
mnesia:delete({trie_node, Topic}),
delete_path(lists:reverse(emqttd_topic:triples(Topic)));
[TrieNode] ->
mnesia:write(TrieNode#trie_node{topic=Topic});
[] ->
ok
end.
case mnesia:read(trie_node, Topic) of
[#trie_node{edge_count=0}] ->
mnesia:delete({trie_node, Topic}),
delete_path(lists:reverse(emqttd_topic:triples(Topic)));
[TrieNode] ->
mnesia:write(TrieNode#trie_node{topic=Topic});
[] ->
ok
end.
%%%=============================================================================
%%% Internal functions
@ -147,21 +147,21 @@ delete(Topic) when is_binary(Topic) ->
%% @end
%%------------------------------------------------------------------------------
add_path({Node, Word, Child}) ->
Edge = #trie_edge{node_id=Node, word=Word},
case mnesia:read(trie_node, Node) of
[TrieNode = #trie_node{edge_count=Count}] ->
case mnesia:wread({trie, Edge}) of
[] ->
mnesia:write(TrieNode#trie_node{edge_count=Count+1}),
mnesia:write(#trie{edge=Edge, node_id=Child});
[_] ->
ok
end;
[] ->
mnesia:write(#trie_node{node_id=Node, edge_count=1}),
mnesia:write(#trie{edge=Edge, node_id=Child})
end.
Edge = #trie_edge{node_id=Node, word=Word},
case mnesia:read(trie_node, Node) of
[TrieNode = #trie_node{edge_count=Count}] ->
case mnesia:wread({trie, Edge}) of
[] ->
mnesia:write(TrieNode#trie_node{edge_count=Count+1}),
mnesia:write(#trie{edge=Edge, node_id=Child});
[_] ->
ok
end;
[] ->
mnesia:write(#trie_node{node_id=Node, edge_count=1}),
mnesia:write(#trie{edge=Edge, node_id=Child})
end.
%%------------------------------------------------------------------------------
%% @doc
%% @private
@ -177,15 +177,15 @@ match_node(NodeId, Words) ->
match_node(NodeId, Words, []).
match_node(NodeId, [], ResAcc) ->
mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc);
mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc);
match_node(NodeId, [W|Words], ResAcc) ->
lists:foldl(fun(WArg, Acc) ->
case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of
[#trie{node_id=ChildId}] -> match_node(ChildId, Words, Acc);
[] -> Acc
end
end, 'match_#'(NodeId, ResAcc), [W, '+']).
lists:foldl(fun(WArg, Acc) ->
case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of
[#trie{node_id=ChildId}] -> match_node(ChildId, Words, Acc);
[] -> Acc
end
end, 'match_#'(NodeId, ResAcc), [W, '+']).
%%------------------------------------------------------------------------------
%% @doc
@ -195,12 +195,12 @@ match_node(NodeId, [W|Words], ResAcc) ->
%% @end
%%------------------------------------------------------------------------------
'match_#'(NodeId, ResAcc) ->
case mnesia:read(trie, #trie_edge{node_id=NodeId, word = '#'}) of
[#trie{node_id=ChildId}] ->
mnesia:read(trie_node, ChildId) ++ ResAcc;
[] ->
ResAcc
end.
case mnesia:read(trie, #trie_edge{node_id=NodeId, word = '#'}) of
[#trie{node_id=ChildId}] ->
mnesia:read(trie_node, ChildId) ++ ResAcc;
[] ->
ResAcc
end.
%%------------------------------------------------------------------------------
%% @doc
@ -210,18 +210,18 @@ match_node(NodeId, [W|Words], ResAcc) ->
%% @end
%%------------------------------------------------------------------------------
delete_path([]) ->
ok;
ok;
delete_path([{NodeId, Word, _} | RestPath]) ->
mnesia:delete({trie, #trie_edge{node_id=NodeId, word=Word}}),
case mnesia:read(trie_node, NodeId) of
[#trie_node{edge_count=1, topic=undefined}] ->
mnesia:delete({trie_node, NodeId}),
delete_path(RestPath);
[TrieNode=#trie_node{edge_count=1, topic=_}] ->
mnesia:write(TrieNode#trie_node{edge_count=0});
[TrieNode=#trie_node{edge_count=C}] ->
mnesia:write(TrieNode#trie_node{edge_count=C-1});
[] ->
throw({notfound, NodeId})
end.
mnesia:delete({trie, #trie_edge{node_id=NodeId, word=Word}}),
case mnesia:read(trie_node, NodeId) of
[#trie_node{edge_count=1, topic=undefined}] ->
mnesia:delete({trie_node, NodeId}),
delete_path(RestPath);
[TrieNode=#trie_node{edge_count=1, topic=_}] ->
mnesia:write(TrieNode#trie_node{edge_count=0});
[TrieNode=#trie_node{edge_count=C}] ->
mnesia:write(TrieNode#trie_node{edge_count=C-1});
[] ->
throw({notfound, NodeId})
end.

View File

@ -81,9 +81,9 @@ ignore_lib_apps(Apps) ->
cancel_timer(undefined) ->
undefined;
undefined;
cancel_timer(Ref) ->
catch erlang:cancel_timer(Ref).
catch erlang:cancel_timer(Ref).
integer_to_binary(I) when is_integer(I) ->
list_to_binary(integer_to_list(I)).

View File

@ -33,23 +33,15 @@
-export([get_memory/0]).
-export([get_process_list/0,
get_process_info/0,
get_process_info/1,
get_process_gc/0,
get_process_gc/1,
-export([get_process_list/0, get_process_info/0, get_process_info/1,
get_process_gc/0, get_process_gc/1,
get_process_group_leader_info/1,
get_process_limit/0]).
-export([get_ets_list/0,
get_ets_info/0,
get_ets_info/1,
get_ets_object/0,
get_ets_object/1]).
-export([get_port_types/0,
get_port_info/0,
get_port_info/1]).
-export([get_ets_list/0, get_ets_info/0, get_ets_info/1,
get_ets_object/0, get_ets_object/1]).
-export([get_port_types/0, get_port_info/0, get_port_info/1]).
-define(UTIL_ALLOCATORS, [temp_alloc,
eheap_alloc,
@ -182,7 +174,7 @@ get_system_info() ->
get_system_info(Key) ->
try erlang:system_info(Key) catch
error:badarg->undefined
error:badarg->undefined
end.
%% conversion functions for erlang:system_info(Key)
@ -192,9 +184,8 @@ format_system_info(allocator, {_,_,_,List}) ->
List;
format_system_info(dist_ctrl, List) ->
lists:map(fun({Node, Socket}) ->
{ok, Stats} = inet:getstat(Socket),
{Node, Stats}
end, List);
{ok, Stats} = inet:getstat(Socket), {Node, Stats}
end, List);
format_system_info(driver_version, Value) ->
list_to_binary(Value);
format_system_info(machine, Value) ->
@ -238,22 +229,21 @@ scheduler_usage(Interval) when is_integer(Interval) ->
scheduler_usage_diff(First, Last).
scheduler_usage_diff(First, Last) ->
lists:map(
fun({{I, A0, T0},{I, A1, T1}}) ->{I, (A1 - A0)/(T1 - T0)}end,
lists:zip(lists:sort(First), lists:sort(Last))
).
lists:map(fun({{I, A0, T0},{I, A1, T1}}) ->
{I, (A1 - A0)/(T1 - T0)}
end, lists:zip(lists:sort(First), lists:sort(Last))).
get_memory()->
[{Key, get_memory(Key, current)} || Key <- [used, allocated, unused, usage]] ++ erlang:memory().
get_memory(used, Keyword) ->
lists:sum(lists:map(fun({_, Prop}) ->
container_size(Prop, Keyword, blocks_size)
end, util_alloc()));
container_size(Prop, Keyword, blocks_size)
end, util_alloc()));
get_memory(allocated, Keyword) ->
lists:sum(lists:map(fun({_, Prop})->
container_size(Prop, Keyword, carriers_size)
end, util_alloc()));
container_size(Prop, Keyword, carriers_size)
end, util_alloc()));
get_memory(unused, Keyword) ->
get_memory(allocated, Keyword) - get_memory(used, Keyword);
get_memory(usage, Keyword) ->
@ -274,11 +264,9 @@ snapshot_int() ->
allocators() ->
UtilAllocators = erlang:system_info(alloc_util_allocators),
Allocators = [sys_alloc, mseg_alloc|UtilAllocators],
[{{A, N},lists:sort(proplists:delete(versions, Props))} ||
A <- Allocators,
Allocs <- [erlang:system_info({allocator, A})],
Allocs =/= false,
{_, N, Props} <- Allocs].
[{{A, N},lists:sort(proplists:delete(versions, Props))} ||
A <- Allocators, Allocs <- [erlang:system_info({allocator, A})],
Allocs =/= false, {_, N, Props} <- Allocs].
container_size(Prop, Keyword, Container) ->
Sbcs = container_value(Prop, Keyword, sbcs, Container),
@ -289,10 +277,11 @@ container_value(Prop, Keyword, Type, Container) when is_atom(Keyword)->
container_value(Prop, 2, Type, Container);
container_value(Props, Pos, mbcs = Type, Container) when is_integer(Pos)->
Pool = case proplists:get_value(mbcs_pool, Props) of
PoolProps when PoolProps =/= undefined ->
element(Pos, lists:keyfind(Container, 1, PoolProps));
_ -> 0
end,
PoolProps when PoolProps =/= undefined ->
element(Pos, lists:keyfind(Container, 1, PoolProps));
_ ->
0
end,
TypeProps = proplists:get_value(Type, Props),
Pool + element(Pos, lists:keyfind(Container, 1, TypeProps));
@ -330,11 +319,11 @@ get_ets_info() ->
get_ets_info(Tab) ->
case ets:info(Tab) of
undefined ->
[];
Entries when is_list(Entries) ->
mapping(Entries)
end.
undefined ->
[];
Entries when is_list(Entries) ->
mapping(Entries)
end.
get_ets_object() ->
[{Tab, get_ets_object(Tab)} || Tab <- ets:all()].
@ -343,15 +332,15 @@ get_ets_object(Tab) ->
TabInfo = ets:info(Tab),
Size = proplists:get_value(size, TabInfo),
NameTab = proplists:get_value(named_table, TabInfo),
if (Size == 0) or (NameTab == false) ->
[];
if (Size == 0) or (NameTab == false) ->
[];
true ->
ets:tab2list(Tab)
ets:tab2list(Tab)
end.
get_port_types() ->
lists:usort(fun({KA, VA},{KB, VB})-> {VA, KB} >{VB, KA} end,
ports_type_count([Type || {_Port, Type} <- ports_type_list()])).
ports_type_count([Type || {_Port, Type} <- ports_type_list()])).
get_port_info() ->
[get_port_info(Port) ||Port <- erlang:ports()].
@ -361,11 +350,11 @@ get_port_info(PortTerm) ->
[port_info(Port, Type) || Type <- [meta, signals, io, memory_used, specific]].
port_info(Port, meta) ->
{meta, List} = port_info_type(Port, meta, [id, name, os_pid]),
case port_info(Port, registered_name) of
[] -> {meta, List};
Name -> {meta, [Name | List]}
end;
{meta, List} = port_info_type(Port, meta, [id, name, os_pid]),
case port_info(Port, registered_name) of
[] -> {meta, List};
Name -> {meta, [Name | List]}
end;
port_info(PortTerm, signals) ->
port_info_type(PortTerm, signals, [connected, links, monitors]);
@ -377,43 +366,43 @@ port_info(PortTerm, memory_used) ->
port_info_type(PortTerm, memory_used, [memory, queue_size]);
port_info(PortTerm, specific) ->
Port = transform_port(PortTerm),
Props = case erlang:port_info(Port, name) of
{_, Type} when Type =:= "udp_inet";
Type =:= "tcp_inet";
Type =:= "sctp_inet" ->
case catch inet:getstat(Port) of
{ok, Stats} -> [{statistics, Stats}];
_ ->[]
end ++
case catch inet:peername(Port) of
{ok, Peer} ->[{peername, Peer}];
{error, _} ->[]
end ++
case catch inet:sockname(Port) of
{ok, Local} ->[{sockname, Local}];
{error, _} -> []
end ++
case catch inet:getopts(Port, ?SOCKET_OPTS ) of
{ok, Opts} -> [{options, Opts}];
{error, _} -> []
end;
{_, "efile"} ->
[];
_ ->[]
end,
{specific, Props};
Port = transform_port(PortTerm),
Props = case erlang:port_info(Port, name) of
{_, Type} when Type =:= "udp_inet";
Type =:= "tcp_inet";
Type =:= "sctp_inet" ->
case catch inet:getstat(Port) of
{ok, Stats} -> [{statistics, Stats}];
_ -> []
end ++
case catch inet:peername(Port) of
{ok, Peer} -> [{peername, Peer}];
{error, _} -> []
end ++
case catch inet:sockname(Port) of
{ok, Local} -> [{sockname, Local}];
{error, _} -> []
end ++
case catch inet:getopts(Port, ?SOCKET_OPTS ) of
{ok, Opts} -> [{options, Opts}];
{error, _} -> []
end;
{_, "efile"} ->
[];
_ ->
[]
end,
{specific, Props};
port_info(PortTerm, Keys) when is_list(Keys) ->
Port = transform_port(PortTerm),
[erlang:port_info(Port, Key) || Key <- Keys];
Port = transform_port(PortTerm),
[erlang:port_info(Port, Key) || Key <- Keys];
port_info(PortTerm, Key) when is_atom(Key) ->
Port = transform_port(PortTerm),
erlang:port_info(Port, Key).
Port = transform_port(PortTerm),
erlang:port_info(Port, Key).
port_info_type(PortTerm, Type, Keys) ->
Port = transform_port(PortTerm),
{Type, [erlang:port_info(Port, Key) || Key <- Keys]}.
Port = transform_port(PortTerm),
{Type, [erlang:port_info(Port, Key) || Key <- Keys]}.
transform_port(Port) when is_port(Port) -> Port;
transform_port("#Port<0." ++ Id) ->
@ -423,21 +412,17 @@ transform_port(N) when is_integer(N) ->
Name = iolist_to_binary(atom_to_list(node())),
NameLen = iolist_size(Name),
Vsn = binary:last(term_to_binary(self())),
Bin = <<131, 102, 100,
NameLen:2/unit:8,
Name:NameLen/binary,
N:4/unit:8,
Vsn:8>>,
Bin = <<131, 102, 100, NameLen:2/unit:8, Name:NameLen/binary, N:4/unit:8, Vsn:8>>,
binary_to_term(Bin).
ports_type_list() ->
[{Port, PortType} || Port <- erlang:ports(),
{_, PortType} <- [erlang:port_info(Port, name)]].
{_, PortType} <- [erlang:port_info(Port, name)]].
ports_type_count(Types) ->
DictTypes = lists:foldl(fun(Type, Acc)->
dict:update_counter(Type, 1, Acc)
end, dict:new(), Types),
dict:update_counter(Type, 1, Acc)
end, dict:new(), Types),
dict:to_list(DictTypes).
mapping(Entries) ->