diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index 32320d50d..50339361c 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -1,11 +1,11 @@ % -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- %% ex: ft=erlang ts=4 sw=4 et [{kernel, - [{start_timer, true}, - {start_pg2, true} + [{start_timer, true}, + {start_pg2, true} ]}, {sasl, [ - {sasl_error_logger, {file, "log/emqttd_sasl.log"}} + {sasl_error_logger, {file, "log/emqttd_sasl.log"}} ]}, {ssl, [ %{versions, ['tlsv1.2', 'tlsv1.1']} @@ -13,29 +13,29 @@ {lager, [ {colored, true}, {async_threshold, 1000}, - {error_logger_redirect, false}, - {crash_log, "log/emqttd_crash.log"}, - {handlers, [ - {lager_console_backend, info}, + {error_logger_redirect, false}, + {crash_log, "log/emqttd_crash.log"}, + {handlers, [ + {lager_console_backend, info}, %%NOTICE: Level >= error - %%{lager_emqtt_backend, error}, - {lager_file_backend, [ + %%{lager_emqtt_backend, error}, + {lager_file_backend, [ {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, - {file, "log/emqttd_info.log"}, - {level, info}, - {size, 104857600}, - {date, "$D0"}, - {count, 30} - ]}, - {lager_file_backend, [ + {file, "log/emqttd_info.log"}, + {level, info}, + {size, 104857600}, + {date, "$D0"}, + {count, 30} + ]}, + {lager_file_backend, [ {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, - {file, "log/emqttd_error.log"}, - {level, error}, - {size, 104857600}, - {date, "$D0"}, - {count, 30} - ]} - ]} + {file, "log/emqttd_error.log"}, + {level, error}, + {size, 104857600}, + {date, "$D0"}, + {count, 30} + ]} + ]} ]}, {esockd, [ {logger, {lager, info}} @@ -278,7 +278,6 @@ %% Erlang System Monitor {sysmon, [ - %% Long GC {long_gc, 100}, diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 987ab4876..e602e60ba 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -1,11 +1,11 @@ % -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- %% ex: ft=erlang ts=4 sw=4 et -[{kernel, - [{start_timer, true}, - {start_pg2, true} +[{kernel, + [{start_timer, true}, + {start_pg2, true} ]}, {sasl, [ - {sasl_error_logger, {file, "log/emqttd_sasl.log"}} + {sasl_error_logger, {file, "log/emqttd_sasl.log"}} ]}, {ssl, [ %{versions, ['tlsv1.2', 'tlsv1.1']} @@ -13,21 +13,21 @@ {lager, [ {colored, true}, {async_threshold, 5000}, - {error_logger_redirect, false}, - {crash_log, "log/emqttd_crash.log"}, - {handlers, [ - %%{lager_console_backend, info}, + {error_logger_redirect, false}, + {crash_log, "log/emqttd_crash.log"}, + {handlers, [ + %%{lager_console_backend, info}, %%NOTICE: Level >= error - %%{lager_emqtt_backend, error}, - {lager_file_backend, [ + %%{lager_emqtt_backend, error}, + {lager_file_backend, [ {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]}, - {file, "log/emqttd_error.log"}, - {level, error}, - {size, 104857600}, - {date, "$D0"}, - {count, 30} - ]} - ]} + {file, "log/emqttd_error.log"}, + {level, error}, + {size, 104857600}, + {date, "$D0"}, + {count, 30} + ]} + ]} ]}, {esockd, [ {logger, {lager, error}} @@ -270,7 +270,6 @@ %% Erlang System Monitor {sysmon, [ - %% Long GC, don't monitor in production mode for: %% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 {long_gc, false}, diff --git a/src/emqttd.erl b/src/emqttd.erl index c76fff74f..b99869025 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -31,12 +31,11 @@ is_running/1]). -define(MQTT_SOCKOPTS, [ - binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 512}, - {nodelay, true} -]). + binary, + {packet, raw}, + {reuseaddr, true}, + {backlog, 512}, + {nodelay, true}]). -define(APP, ?MODULE). @@ -83,12 +82,12 @@ start_listener({mqtts, Port, Options}) -> %% Start http listener start_listener({http, Port, Options}) -> MFArgs = {emqttd_http, handle_request, []}, - mochiweb:start_http(Port, Options, MFArgs); + mochiweb:start_http(Port, Options, MFArgs); %% Start https listener start_listener({https, Port, Options}) -> MFArgs = {emqttd_http, handle_request, []}, - mochiweb:start_http(Port, Options, MFArgs). + mochiweb:start_http(Port, Options, MFArgs). start_listener(Protocol, Port, Options) -> MFArgs = {emqttd_client, start_link, [env(mqtt)]}, diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index c04cd920b..43e0600a4 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -157,11 +157,11 @@ stop() -> %%%============================================================================= init([Opts]) -> - ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), + ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}), ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}), - {ok, state}. + {ok, state}. init_mods(auth, AuthMods) -> [init_mod(fun authmod/1, Name, Opts) || {Name, Opts} <- AuthMods]; @@ -215,16 +215,16 @@ handle_call(Req, _From, State) -> {reply, {error, badreq}, State}. handle_cast(_Msg, State) -> - {noreply, State}. + {noreply, State}. handle_info(_Info, State) -> - {noreply, State}. + {noreply, State}. terminate(_Reason, _State) -> - ok. + ok. code_change(_OldVsn, State, _Extra) -> - {ok, State}. + {ok, State}. %%%============================================================================= %%% Internal functions diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 2ab37ca08..2ed60e1b6 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -68,7 +68,7 @@ add_clientid(ClientId, Password) -> %%------------------------------------------------------------------------------ -spec lookup_clientid(binary()) -> list(). lookup_clientid(ClientId) -> - mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId). + mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId). %%------------------------------------------------------------------------------ %% @doc Lookup all clientids @@ -76,7 +76,7 @@ lookup_clientid(ClientId) -> %%------------------------------------------------------------------------------ -spec all_clientids() -> list(binary()). all_clientids() -> - mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB). + mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB). %%------------------------------------------------------------------------------ %% @doc Remove clientid @@ -91,15 +91,15 @@ remove_clientid(ClientId) -> %%%============================================================================= init(Opts) -> - mnesia:create_table(?AUTH_CLIENTID_TAB, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]), - mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies), + mnesia:create_table(?AUTH_CLIENTID_TAB, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]), + mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies), case proplists:get_value(file, Opts) of undefined -> ok; File -> load(File) end, - {ok, Opts}. + {ok, Opts}. check(#mqtt_client{client_id = undefined}, _Password, []) -> {error, "ClientId undefined"}; diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 69fff4a77..aa831642c 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -99,10 +99,10 @@ all_users() -> %%% emqttd_auth callbacks %%%============================================================================= init(Opts) -> - mnesia:create_table(?AUTH_USERNAME_TAB, [ - {disc_copies, [node()]}, - {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), - mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies), + mnesia:create_table(?AUTH_USERNAME_TAB, [ + {disc_copies, [node()]}, + {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), + mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies), emqttd_ctl:register_cmd(users, {?MODULE, cli}, []), {ok, Opts}. @@ -111,7 +111,7 @@ check(#mqtt_client{username = undefined}, _Password, _Opts) -> check(_User, undefined, _Opts) -> {error, "Password undefined"}; check(#mqtt_client{username = Username}, Password, _Opts) -> - case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of + case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of [] -> {error, "Username Not Found"}; [#?AUTH_USERNAME_TAB{password = <>}] -> @@ -119,8 +119,8 @@ check(#mqtt_client{username = Username}, Password, _Opts) -> true -> ok; false -> {error, "Password Not Right"} end - end. - + end. + description() -> "Username password authentication module". diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index dc6f4e3af..0c1a7e701 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -84,7 +84,7 @@ stop_bridge(Node, SubTopic) -> supervisor:delete_child(?MODULE, ChildId); {error, Reason} -> {error, Reason} - end. + end. %%%============================================================================= %%% Supervisor callbacks diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 9b3bffe2b..c1a226aea 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -71,7 +71,7 @@ lookup(ClientId) when is_binary(ClientId) -> case ets:lookup(mqtt_client, ClientId) of [Client] -> Client; [] -> undefined - end. + end. %%------------------------------------------------------------------------------ %% @doc Lookup client pid by clientId @@ -136,10 +136,10 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId, _ -> ets:insert(mqtt_client, Client), {noreply, setstats(monitor_client(ClientId, Pid, State))} - end; + end; handle_cast({unregister, ClientId, Pid}, State) -> - case lookup_proc(ClientId) of + case lookup_proc(ClientId) of Pid -> ets:delete(mqtt_client, ClientId), {noreply, setstats(State)}; diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 22f5e5f8c..a3228f7ff 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -53,8 +53,8 @@ handle_request('GET', "/status", Req) -> handle_request('POST', "/mqtt/publish", Req) -> Params = mochiweb_request:parse_post(Req), lager:info("HTTP Publish: ~p", [Params]), - case authorized(Req) of - true -> + case authorized(Req) of + true -> ClientId = get_value("client", Params, http), Qos = int(get_value("qos", Params, "0")), Retain = bool(get_value("retain", Params, "0")), @@ -70,9 +70,9 @@ handle_request('POST', "/mqtt/publish", Req) -> {_, false} -> Req:respond({400, [], <<"Bad Topic">>}) end; - false -> - Req:respond({401, [], <<"Fobbiden">>}) - end; + false -> + Req:respond({401, [], <<"Fobbiden">>}) + end; %%------------------------------------------------------------------------------ %% MQTT Over WebSocket @@ -101,16 +101,16 @@ handle_request('GET', "/" ++ File, Req) -> handle_request(Method, Path, Req) -> lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), - Req:not_found(). + Req:not_found(). %%------------------------------------------------------------------------------ %% basic authorization %%------------------------------------------------------------------------------ authorized(Req) -> case Req:get_header_value("Authorization") of - undefined -> - false; - "Basic " ++ BasicAuth -> + undefined -> + false; + "Basic " ++ BasicAuth -> {Username, Password} = user_passwd(BasicAuth), case emqttd_access_control:auth(#mqtt_client{username = Username}, Password) of ok -> @@ -119,10 +119,10 @@ authorized(Req) -> lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]), false end - end. + end. user_passwd(BasicAuth) -> - list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). + list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). validate(qos, Qos) -> (Qos >= ?QOS_0) and (Qos =< ?QOS_2); diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index 95517b297..2616f746b 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -42,7 +42,7 @@ start(_, 0, _) -> undefined; start(StatFun, TimeoutSec, TimeoutMsg) -> {ok, StatVal} = StatFun(), - #keepalive{statfun = StatFun, statval = StatVal, + #keepalive{statfun = StatFun, statval = StatVal, tsec = TimeoutSec, tmsg = TimeoutMsg, tref = timer(TimeoutSec, TimeoutMsg)}. @@ -76,9 +76,9 @@ resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> cancel(#keepalive{tref = TRef}) -> cancel(TRef); cancel(undefined) -> - ok; + ok; cancel(TRef) -> - catch erlang:cancel_timer(TRef). + catch erlang:cancel_timer(TRef). timer(Sec, Msg) -> erlang:send_after(timer:seconds(Sec), self(), Msg). diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index 8b0cdfc2d..831528cfd 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -118,7 +118,7 @@ to_packet(#mqtt_message{pktid = PkgId, topic = Topic, payload = Payload}) -> - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = Qos, retain = Retain, dup = Dup}, diff --git a/src/emqttd_mod_sup.erl b/src/emqttd_mod_sup.erl index 07062c604..76c60d164 100644 --- a/src/emqttd_mod_sup.erl +++ b/src/emqttd_mod_sup.erl @@ -46,14 +46,14 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). start_child(ChildSpec) when is_tuple(ChildSpec) -> - supervisor:start_child(?MODULE, ChildSpec). + supervisor:start_child(?MODULE, ChildSpec). %% %% start_child(Mod::atom(), Type::type()) -> {ok, pid()} %% @type type() = worker | supervisor %% start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> - supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). + supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). %%%============================================================================= %%% Supervisor callbacks diff --git a/src/emqttd_net.erl b/src/emqttd_net.erl index ec961c32e..1242ae9ee 100644 --- a/src/emqttd_net.erl +++ b/src/emqttd_net.erl @@ -156,17 +156,17 @@ tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> list_to_atom( lists:flatten( - io_lib:format( - "~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))). + io_lib:format( + "~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))). connection_string(Sock, Direction) -> case socket_ends(Sock, Direction) of {ok, {FromAddress, FromPort, ToAddress, ToPort}} -> {ok, lists:flatten( - io_lib:format( - "~s:~p -> ~s:~p", - [maybe_ntoab(FromAddress), FromPort, - maybe_ntoab(ToAddress), ToPort]))}; + io_lib:format( + "~s:~p -> ~s:~p", + [maybe_ntoab(FromAddress), FromPort, + maybe_ntoab(ToAddress), ToPort]))}; Error -> Error end. diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index b056a6f05..122a47304 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -62,7 +62,7 @@ init(Peername, SendFun, Opts) -> MaxLen = emqttd_opts:g(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), WsInitialHeaders = emqttd_opts:g(ws_initial_headers, Opts), - #proto_state{peername = Peername, + #proto_state{peername = Peername, sendfun = SendFun, max_clientid_len = MaxLen, client_pid = self(), @@ -118,12 +118,12 @@ received(_Packet, State = #proto_state{connected = false}) -> received(Packet = ?PACKET(_Type), State) -> trace(recv, Packet, State), - case validate_packet(Packet) of + case validate_packet(Packet) of ok -> process(Packet, State); {error, Reason} -> {error, Reason, State} - end. + end. process(Packet = ?CONNECT_PACKET(Var), State0) -> @@ -185,7 +185,7 @@ process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> deny -> ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State) end, - {ok, State}; + {ok, State}; process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) -> emqttd_session:puback(Session, PacketId), @@ -254,7 +254,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}. send(Msg, State) when is_record(Msg, mqtt_message) -> - send(emqttd_message:to_packet(Msg), State); + send(emqttd_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun}) when is_record(Packet, mqtt_packet) -> @@ -352,10 +352,10 @@ validate_clientid(#mqtt_packet_connect{proto_ver = ProtoVer, false. validate_packet(?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload)) -> - case emqttd_topic:validate({name, Topic}) of + case emqttd_topic:validate({name, Topic}) of true -> ok; false -> {error, badtopic} - end; + end; validate_packet(?SUBSCRIBE_PACKET(_PacketId, TopicTable)) -> validate_topics(filter, TopicTable); @@ -374,16 +374,16 @@ validate_topics(Type, TopicTable = [{_Topic, _Qos}|_]) Valid = fun(Topic, Qos) -> emqttd_topic:validate({Type, Topic}) and validate_qos(Qos) end, - case [Topic || {Topic, Qos} <- TopicTable, not Valid(Topic, Qos)] of + case [Topic || {Topic, Qos} <- TopicTable, not Valid(Topic, Qos)] of [] -> ok; _ -> {error, badtopic} - end; + end; validate_topics(Type, Topics = [Topic0|_]) when is_binary(Topic0) -> - case [Topic || Topic <- Topics, not emqttd_topic:validate({Type, Topic})] of + case [Topic || Topic <- Topics, not emqttd_topic:validate({Type, Topic})] of [] -> ok; _ -> {error, badtopic} - end. + end. validate_qos(undefined) -> true; diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 96851ae79..bb8032c79 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -224,7 +224,7 @@ publish(Msg = #mqtt_message{from = From}) -> end. publish(To, Msg) -> - lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) -> + lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) -> case Node =:= node() of true -> ?ROUTER:route(Topic, Msg); false -> rpc:cast(Node, ?ROUTER, route, [Topic, Msg]) @@ -237,9 +237,9 @@ publish(To, Msg) -> %%------------------------------------------------------------------------------ -spec match(binary()) -> [mqtt_topic()]. match(To) -> - MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), + MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), %% ets:lookup for topic table will be replicated. - lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]). + lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]). %%%============================================================================= %%% gen_server callbacks diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index a2289920d..1049a8173 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -83,7 +83,7 @@ old_client_pid :: pid(), %% Last packet id of the session - packet_id = 1, + packet_id = 1, %% Client’s subscriptions. subscriptions :: dict:dict(), @@ -198,7 +198,7 @@ publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) -> publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) -> %% publish qos1 directly, and client will puback automatically - emqttd_pubsub:publish(Msg); + emqttd_pubsub:publish(Msg); publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) -> %% publish qos2 by session @@ -701,9 +701,9 @@ timer(TimeoutSec, TimeoutMsg) -> erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg). cancel_timer(undefined) -> - undefined; + undefined; cancel_timer(Ref) -> - catch erlang:cancel_timer(Ref). + catch erlang:cancel_timer(Ref). noreply(State) -> {noreply, State}. diff --git a/src/emqttd_sup.erl b/src/emqttd_sup.erl index 2785ba04d..111dba0c4 100644 --- a/src/emqttd_sup.erl +++ b/src/emqttd_sup.erl @@ -47,14 +47,14 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). start_child(ChildSpec) when is_tuple(ChildSpec) -> - supervisor:start_child(?MODULE, ChildSpec). + supervisor:start_child(?MODULE, ChildSpec). %% %% start_child(Mod::atom(), Type::type()) -> {ok, pid()} %% @type type() = worker | supervisor %% start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> - supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). + supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). %%%============================================================================= %%% Supervisor callbacks diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 05189aba8..a77f330cc 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -49,7 +49,7 @@ %%%----------------------------------------------------------------------------- -spec wildcard(binary()) -> true | false. wildcard(Topic) when is_binary(Topic) -> - wildcard(words(Topic)); + wildcard(words(Topic)); wildcard([]) -> false; wildcard(['#'|_]) -> @@ -67,25 +67,25 @@ wildcard([_H|T]) -> Name :: binary() | words(), Filter :: binary() | words(). match(Name, Filter) when is_binary(Name) and is_binary(Filter) -> - match(words(Name), words(Filter)); + match(words(Name), words(Filter)); match([], []) -> - true; + true; match([H|T1], [H|T2]) -> - match(T1, T2); + match(T1, T2); match([<<$$, _/binary>>|_], ['+'|_]) -> false; match([_H|T1], ['+'|T2]) -> - match(T1, T2); + match(T1, T2); match([<<$$, _/binary>>|_], ['#']) -> false; match(_, ['#']) -> - true; + true; match([_H1|_], [_H2|_]) -> - false; + false; match([_H1|_], []) -> - false; + false; match([], [_H|_T2]) -> - false. + false. %%------------------------------------------------------------------------------ %% @doc Validate Topic @@ -93,14 +93,14 @@ match([], [_H|_T2]) -> %%------------------------------------------------------------------------------ -spec validate({name | filter, binary()}) -> boolean(). validate({_, <<>>}) -> - false; + false; validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) -> - false; + false; validate({filter, Topic}) when is_binary(Topic) -> - validate2(words(Topic)); + validate2(words(Topic)); validate({name, Topic}) when is_binary(Topic) -> - Words = words(Topic), - validate2(Words) and (not wildcard(Words)). + Words = words(Topic), + validate2(Words) and (not wildcard(Words)). validate2([]) -> true; @@ -131,7 +131,7 @@ validate3(<<_/utf8, Rest/binary>>) -> %%%----------------------------------------------------------------------------- -spec triples(binary()) -> list(triple()). triples(Topic) when is_binary(Topic) -> - triples(words(Topic), root, []). + triples(words(Topic), root, []). triples([], _Parent, Acc) -> reverse(Acc); diff --git a/src/emqttd_trie.erl b/src/emqttd_trie.erl index dd3920fa6..a5f9ac9e5 100644 --- a/src/emqttd_trie.erl +++ b/src/emqttd_trie.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% MQTT Topic Trie Tree. +%%% MQTT Topic Trie. %%% %%% [Trie](http://en.wikipedia.org/wiki/Trie) %%% @@ -42,19 +42,19 @@ -type node_id() :: binary() | atom(). -record(trie_node, { - node_id :: node_id(), + node_id :: node_id(), edge_count = 0 :: non_neg_integer(), - topic :: binary() | undefined + topic :: binary() | undefined }). -record(trie_edge, { - node_id :: node_id(), - word :: binary() | atom() + node_id :: node_id(), + word :: binary() | atom() }). -record(trie, { - edge :: #trie_edge{}, - node_id :: node_id() + edge :: #trie_edge{}, + node_id :: node_id() }). %%%============================================================================= @@ -98,17 +98,17 @@ mnesia(copy) -> %%------------------------------------------------------------------------------ -spec insert(Topic :: binary()) -> ok. insert(Topic) when is_binary(Topic) -> - case mnesia:read(trie_node, Topic) of - [#trie_node{topic=Topic}] -> + case mnesia:read(trie_node, Topic) of + [#trie_node{topic=Topic}] -> ok; - [TrieNode=#trie_node{topic=undefined}] -> - mnesia:write(TrieNode#trie_node{topic=Topic}); - [] -> - %add trie path - [add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], - %add last node - mnesia:write(#trie_node{node_id=Topic, topic=Topic}) - end. + [TrieNode=#trie_node{topic=undefined}] -> + mnesia:write(TrieNode#trie_node{topic=Topic}); + [] -> + %add trie path + [add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], + %add last node + mnesia:write(#trie_node{node_id=Topic, topic=Topic}) + end. %%------------------------------------------------------------------------------ %% @doc Find trie nodes that match topic @@ -117,23 +117,23 @@ insert(Topic) when is_binary(Topic) -> -spec match(Topic :: binary()) -> list(MatchedTopic :: binary()). match(Topic) when is_binary(Topic) -> TrieNodes = match_node(root, emqttd_topic:words(Topic)), - [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined]. + [Name || #trie_node{topic=Name} <- TrieNodes, Name =/= undefined]. %%------------------------------------------------------------------------------ -%% @doc Delete topic from trie tree +%% @doc Delete topic from trie %% @end %%------------------------------------------------------------------------------ -spec delete(Topic :: binary()) -> ok. delete(Topic) when is_binary(Topic) -> - case mnesia:read(trie_node, Topic) of - [#trie_node{edge_count=0}] -> - mnesia:delete({trie_node, Topic}), - delete_path(lists:reverse(emqttd_topic:triples(Topic))); - [TrieNode] -> - mnesia:write(TrieNode#trie_node{topic=Topic}); - [] -> - ok - end. + case mnesia:read(trie_node, Topic) of + [#trie_node{edge_count=0}] -> + mnesia:delete({trie_node, Topic}), + delete_path(lists:reverse(emqttd_topic:triples(Topic))); + [TrieNode] -> + mnesia:write(TrieNode#trie_node{topic=Topic}); + [] -> + ok + end. %%%============================================================================= %%% Internal functions @@ -147,21 +147,21 @@ delete(Topic) when is_binary(Topic) -> %% @end %%------------------------------------------------------------------------------ add_path({Node, Word, Child}) -> - Edge = #trie_edge{node_id=Node, word=Word}, - case mnesia:read(trie_node, Node) of - [TrieNode = #trie_node{edge_count=Count}] -> - case mnesia:wread({trie, Edge}) of - [] -> - mnesia:write(TrieNode#trie_node{edge_count=Count+1}), - mnesia:write(#trie{edge=Edge, node_id=Child}); - [_] -> - ok - end; - [] -> - mnesia:write(#trie_node{node_id=Node, edge_count=1}), - mnesia:write(#trie{edge=Edge, node_id=Child}) - end. - + Edge = #trie_edge{node_id=Node, word=Word}, + case mnesia:read(trie_node, Node) of + [TrieNode = #trie_node{edge_count=Count}] -> + case mnesia:wread({trie, Edge}) of + [] -> + mnesia:write(TrieNode#trie_node{edge_count=Count+1}), + mnesia:write(#trie{edge=Edge, node_id=Child}); + [_] -> + ok + end; + [] -> + mnesia:write(#trie_node{node_id=Node, edge_count=1}), + mnesia:write(#trie{edge=Edge, node_id=Child}) + end. + %%------------------------------------------------------------------------------ %% @doc %% @private @@ -177,15 +177,15 @@ match_node(NodeId, Words) -> match_node(NodeId, Words, []). match_node(NodeId, [], ResAcc) -> - mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc); + mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc); match_node(NodeId, [W|Words], ResAcc) -> - lists:foldl(fun(WArg, Acc) -> - case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of - [#trie{node_id=ChildId}] -> match_node(ChildId, Words, Acc); - [] -> Acc - end - end, 'match_#'(NodeId, ResAcc), [W, '+']). + lists:foldl(fun(WArg, Acc) -> + case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of + [#trie{node_id=ChildId}] -> match_node(ChildId, Words, Acc); + [] -> Acc + end + end, 'match_#'(NodeId, ResAcc), [W, '+']). %%------------------------------------------------------------------------------ %% @doc @@ -195,12 +195,12 @@ match_node(NodeId, [W|Words], ResAcc) -> %% @end %%------------------------------------------------------------------------------ 'match_#'(NodeId, ResAcc) -> - case mnesia:read(trie, #trie_edge{node_id=NodeId, word = '#'}) of - [#trie{node_id=ChildId}] -> - mnesia:read(trie_node, ChildId) ++ ResAcc; - [] -> - ResAcc - end. + case mnesia:read(trie, #trie_edge{node_id=NodeId, word = '#'}) of + [#trie{node_id=ChildId}] -> + mnesia:read(trie_node, ChildId) ++ ResAcc; + [] -> + ResAcc + end. %%------------------------------------------------------------------------------ %% @doc @@ -210,18 +210,18 @@ match_node(NodeId, [W|Words], ResAcc) -> %% @end %%------------------------------------------------------------------------------ delete_path([]) -> - ok; + ok; delete_path([{NodeId, Word, _} | RestPath]) -> - mnesia:delete({trie, #trie_edge{node_id=NodeId, word=Word}}), - case mnesia:read(trie_node, NodeId) of - [#trie_node{edge_count=1, topic=undefined}] -> - mnesia:delete({trie_node, NodeId}), - delete_path(RestPath); - [TrieNode=#trie_node{edge_count=1, topic=_}] -> - mnesia:write(TrieNode#trie_node{edge_count=0}); - [TrieNode=#trie_node{edge_count=C}] -> - mnesia:write(TrieNode#trie_node{edge_count=C-1}); - [] -> - throw({notfound, NodeId}) - end. + mnesia:delete({trie, #trie_edge{node_id=NodeId, word=Word}}), + case mnesia:read(trie_node, NodeId) of + [#trie_node{edge_count=1, topic=undefined}] -> + mnesia:delete({trie_node, NodeId}), + delete_path(RestPath); + [TrieNode=#trie_node{edge_count=1, topic=_}] -> + mnesia:write(TrieNode#trie_node{edge_count=0}); + [TrieNode=#trie_node{edge_count=C}] -> + mnesia:write(TrieNode#trie_node{edge_count=C-1}); + [] -> + throw({notfound, NodeId}) + end. diff --git a/src/emqttd_util.erl b/src/emqttd_util.erl index 312471e8f..fcb06d4c3 100644 --- a/src/emqttd_util.erl +++ b/src/emqttd_util.erl @@ -81,9 +81,9 @@ ignore_lib_apps(Apps) -> cancel_timer(undefined) -> - undefined; + undefined; cancel_timer(Ref) -> - catch erlang:cancel_timer(Ref). + catch erlang:cancel_timer(Ref). integer_to_binary(I) when is_integer(I) -> list_to_binary(integer_to_list(I)). diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index 9a2449281..2b0e1b85f 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -33,23 +33,15 @@ -export([get_memory/0]). --export([get_process_list/0, - get_process_info/0, - get_process_info/1, - get_process_gc/0, - get_process_gc/1, +-export([get_process_list/0, get_process_info/0, get_process_info/1, + get_process_gc/0, get_process_gc/1, get_process_group_leader_info/1, get_process_limit/0]). - --export([get_ets_list/0, - get_ets_info/0, - get_ets_info/1, - get_ets_object/0, - get_ets_object/1]). --export([get_port_types/0, - get_port_info/0, - get_port_info/1]). +-export([get_ets_list/0, get_ets_info/0, get_ets_info/1, + get_ets_object/0, get_ets_object/1]). + +-export([get_port_types/0, get_port_info/0, get_port_info/1]). -define(UTIL_ALLOCATORS, [temp_alloc, eheap_alloc, @@ -182,7 +174,7 @@ get_system_info() -> get_system_info(Key) -> try erlang:system_info(Key) catch - error:badarg->undefined + error:badarg->undefined end. %% conversion functions for erlang:system_info(Key) @@ -192,9 +184,8 @@ format_system_info(allocator, {_,_,_,List}) -> List; format_system_info(dist_ctrl, List) -> lists:map(fun({Node, Socket}) -> - {ok, Stats} = inet:getstat(Socket), - {Node, Stats} - end, List); + {ok, Stats} = inet:getstat(Socket), {Node, Stats} + end, List); format_system_info(driver_version, Value) -> list_to_binary(Value); format_system_info(machine, Value) -> @@ -238,22 +229,21 @@ scheduler_usage(Interval) when is_integer(Interval) -> scheduler_usage_diff(First, Last). scheduler_usage_diff(First, Last) -> - lists:map( - fun({{I, A0, T0},{I, A1, T1}}) ->{I, (A1 - A0)/(T1 - T0)}end, - lists:zip(lists:sort(First), lists:sort(Last)) - ). + lists:map(fun({{I, A0, T0},{I, A1, T1}}) -> + {I, (A1 - A0)/(T1 - T0)} + end, lists:zip(lists:sort(First), lists:sort(Last))). get_memory()-> [{Key, get_memory(Key, current)} || Key <- [used, allocated, unused, usage]] ++ erlang:memory(). get_memory(used, Keyword) -> lists:sum(lists:map(fun({_, Prop}) -> - container_size(Prop, Keyword, blocks_size) - end, util_alloc())); + container_size(Prop, Keyword, blocks_size) + end, util_alloc())); get_memory(allocated, Keyword) -> lists:sum(lists:map(fun({_, Prop})-> - container_size(Prop, Keyword, carriers_size) - end, util_alloc())); + container_size(Prop, Keyword, carriers_size) + end, util_alloc())); get_memory(unused, Keyword) -> get_memory(allocated, Keyword) - get_memory(used, Keyword); get_memory(usage, Keyword) -> @@ -274,11 +264,9 @@ snapshot_int() -> allocators() -> UtilAllocators = erlang:system_info(alloc_util_allocators), Allocators = [sys_alloc, mseg_alloc|UtilAllocators], - [{{A, N},lists:sort(proplists:delete(versions, Props))} || - A <- Allocators, - Allocs <- [erlang:system_info({allocator, A})], - Allocs =/= false, - {_, N, Props} <- Allocs]. + [{{A, N},lists:sort(proplists:delete(versions, Props))} || + A <- Allocators, Allocs <- [erlang:system_info({allocator, A})], + Allocs =/= false, {_, N, Props} <- Allocs]. container_size(Prop, Keyword, Container) -> Sbcs = container_value(Prop, Keyword, sbcs, Container), @@ -289,10 +277,11 @@ container_value(Prop, Keyword, Type, Container) when is_atom(Keyword)-> container_value(Prop, 2, Type, Container); container_value(Props, Pos, mbcs = Type, Container) when is_integer(Pos)-> Pool = case proplists:get_value(mbcs_pool, Props) of - PoolProps when PoolProps =/= undefined -> - element(Pos, lists:keyfind(Container, 1, PoolProps)); - _ -> 0 - end, + PoolProps when PoolProps =/= undefined -> + element(Pos, lists:keyfind(Container, 1, PoolProps)); + _ -> + 0 + end, TypeProps = proplists:get_value(Type, Props), Pool + element(Pos, lists:keyfind(Container, 1, TypeProps)); @@ -330,11 +319,11 @@ get_ets_info() -> get_ets_info(Tab) -> case ets:info(Tab) of - undefined -> - []; - Entries when is_list(Entries) -> - mapping(Entries) - end. + undefined -> + []; + Entries when is_list(Entries) -> + mapping(Entries) + end. get_ets_object() -> [{Tab, get_ets_object(Tab)} || Tab <- ets:all()]. @@ -343,15 +332,15 @@ get_ets_object(Tab) -> TabInfo = ets:info(Tab), Size = proplists:get_value(size, TabInfo), NameTab = proplists:get_value(named_table, TabInfo), - if (Size == 0) or (NameTab == false) -> - []; + if (Size == 0) or (NameTab == false) -> + []; true -> - ets:tab2list(Tab) + ets:tab2list(Tab) end. get_port_types() -> lists:usort(fun({KA, VA},{KB, VB})-> {VA, KB} >{VB, KA} end, - ports_type_count([Type || {_Port, Type} <- ports_type_list()])). + ports_type_count([Type || {_Port, Type} <- ports_type_list()])). get_port_info() -> [get_port_info(Port) ||Port <- erlang:ports()]. @@ -361,11 +350,11 @@ get_port_info(PortTerm) -> [port_info(Port, Type) || Type <- [meta, signals, io, memory_used, specific]]. port_info(Port, meta) -> - {meta, List} = port_info_type(Port, meta, [id, name, os_pid]), - case port_info(Port, registered_name) of - [] -> {meta, List}; - Name -> {meta, [Name | List]} -end; + {meta, List} = port_info_type(Port, meta, [id, name, os_pid]), + case port_info(Port, registered_name) of + [] -> {meta, List}; + Name -> {meta, [Name | List]} + end; port_info(PortTerm, signals) -> port_info_type(PortTerm, signals, [connected, links, monitors]); @@ -377,43 +366,43 @@ port_info(PortTerm, memory_used) -> port_info_type(PortTerm, memory_used, [memory, queue_size]); port_info(PortTerm, specific) -> - Port = transform_port(PortTerm), - Props = case erlang:port_info(Port, name) of - {_, Type} when Type =:= "udp_inet"; - Type =:= "tcp_inet"; - Type =:= "sctp_inet" -> - case catch inet:getstat(Port) of - {ok, Stats} -> [{statistics, Stats}]; - _ ->[] - - end ++ - case catch inet:peername(Port) of - {ok, Peer} ->[{peername, Peer}]; - {error, _} ->[] - end ++ - case catch inet:sockname(Port) of - {ok, Local} ->[{sockname, Local}]; - {error, _} -> [] -end ++ -case catch inet:getopts(Port, ?SOCKET_OPTS ) of - {ok, Opts} -> [{options, Opts}]; - {error, _} -> [] -end; - {_, "efile"} -> - []; - _ ->[] -end, - {specific, Props}; + Port = transform_port(PortTerm), + Props = case erlang:port_info(Port, name) of + {_, Type} when Type =:= "udp_inet"; + Type =:= "tcp_inet"; + Type =:= "sctp_inet" -> + case catch inet:getstat(Port) of + {ok, Stats} -> [{statistics, Stats}]; + _ -> [] + end ++ + case catch inet:peername(Port) of + {ok, Peer} -> [{peername, Peer}]; + {error, _} -> [] + end ++ + case catch inet:sockname(Port) of + {ok, Local} -> [{sockname, Local}]; + {error, _} -> [] + end ++ + case catch inet:getopts(Port, ?SOCKET_OPTS ) of + {ok, Opts} -> [{options, Opts}]; + {error, _} -> [] + end; + {_, "efile"} -> + []; + _ -> + [] + end, + {specific, Props}; port_info(PortTerm, Keys) when is_list(Keys) -> - Port = transform_port(PortTerm), - [erlang:port_info(Port, Key) || Key <- Keys]; + Port = transform_port(PortTerm), + [erlang:port_info(Port, Key) || Key <- Keys]; port_info(PortTerm, Key) when is_atom(Key) -> - Port = transform_port(PortTerm), - erlang:port_info(Port, Key). + Port = transform_port(PortTerm), + erlang:port_info(Port, Key). port_info_type(PortTerm, Type, Keys) -> - Port = transform_port(PortTerm), - {Type, [erlang:port_info(Port, Key) || Key <- Keys]}. + Port = transform_port(PortTerm), + {Type, [erlang:port_info(Port, Key) || Key <- Keys]}. transform_port(Port) when is_port(Port) -> Port; transform_port("#Port<0." ++ Id) -> @@ -423,21 +412,17 @@ transform_port(N) when is_integer(N) -> Name = iolist_to_binary(atom_to_list(node())), NameLen = iolist_size(Name), Vsn = binary:last(term_to_binary(self())), - Bin = <<131, 102, 100, - NameLen:2/unit:8, - Name:NameLen/binary, - N:4/unit:8, - Vsn:8>>, + Bin = <<131, 102, 100, NameLen:2/unit:8, Name:NameLen/binary, N:4/unit:8, Vsn:8>>, binary_to_term(Bin). ports_type_list() -> [{Port, PortType} || Port <- erlang:ports(), - {_, PortType} <- [erlang:port_info(Port, name)]]. + {_, PortType} <- [erlang:port_info(Port, name)]]. ports_type_count(Types) -> DictTypes = lists:foldl(fun(Type, Acc)-> - dict:update_counter(Type, 1, Acc) - end, dict:new(), Types), + dict:update_counter(Type, 1, Acc) + end, dict:new(), Types), dict:to_list(DictTypes). mapping(Entries) ->