From 192dc853048a9eecbd61411034f4bc728d22db25 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 20:55:33 +0800 Subject: [PATCH 01/10] id --- src/emqttd.app.src | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 6bda6f2de..570dec511 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,6 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, + {id, "emqttd"}, {vsn, "0.12.1"}, {modules, []}, {registered, []}, From a055a0a0c8cb1ba9ace384173261431fece131c0 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 20:56:10 +0800 Subject: [PATCH 02/10] IS_PUBSUB --- src/emqttd_access_control.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index d53d25f19..320b6f75f 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_access_control). -author("Feng Lee "). @@ -61,8 +62,7 @@ %%------------------------------------------------------------------------------ -spec start_link() -> {ok, pid()} | ignore | {error, any()}. start_link() -> - {ok, AcOpts} = application:get_env(emqttd, access), - start_link(AcOpts). + start_link(emqttd:env(access)). -spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}. start_link(AcOpts) -> @@ -92,7 +92,7 @@ auth(Client, Password, [{Mod, State} | Mods]) -> Client :: mqtt_client(), PubSub :: pubsub(), Topic :: binary(). -check_acl(Client, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subscribe -> +check_acl(Client, PubSub, Topic) when ?IS_PUBSUB(PubSub) -> case lookup_mods(acl) of [] -> allow; AclMods -> check_acl(Client, PubSub, Topic, AclMods) From d3e39ae9a35d2807fd9dde5c664b6c068e833c51 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:01:53 +0800 Subject: [PATCH 03/10] src/emqttd_mod_rewrite.erl --- include/emqttd.hrl | 2 + src/emqttd.erl | 9 ++- src/emqttd_access_rule.erl | 1 + src/emqttd_net.erl | 6 +- src/emqttd_opts.erl | 1 + src/emqttd_plugins.erl | 6 +- src/emqttd_pooler.erl | 15 +++-- src/emqttd_protocol.erl | 135 ++++++++++++++++++------------------- src/emqttd_sm_helper.erl | 2 +- src/emqttd_sysmon.erl | 10 +-- src/emqttd_topic.erl | 2 +- src/emqttd_ws_client.erl | 59 ++++++++-------- 12 files changed, 125 insertions(+), 123 deletions(-) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 5c2f61019..51bce2f6d 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -47,6 +47,8 @@ %%------------------------------------------------------------------------------ -type pubsub() :: publish | subscribe. +-define(IS_PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)). + %%------------------------------------------------------------------------------ %% MQTT Topic %%------------------------------------------------------------------------------ diff --git a/src/emqttd.erl b/src/emqttd.erl index b09cd2d32..eeb5bab4d 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd). -author("Feng Lee "). @@ -41,7 +42,7 @@ {nodelay, true} ]). --type listener() :: {atom(), inet:port_number(), [esockd:option()]}. +-type listener() :: {atom(), inet:port_number(), [esockd:option()]}. %%------------------------------------------------------------------------------ %% @doc Start emqttd application. @@ -109,14 +110,12 @@ close_listeners(Listeners) when is_list(Listeners) -> close_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). - load_all_mods() -> - Mods = application:get_env(emqttd, modules, []), - lists:foreach(fun({Name, Opts}) -> + lists:foreach(fun({Name, Opts}) -> Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), Mod:load(Opts), lager:info("load module ~s successfully", [Name]) - end, Mods). + end, env(modules)). is_mod_enabled(Name) -> env(modules, Name) =/= undefined. diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 40158285d..fab9461b9 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_access_rule). -author("Feng Lee "). diff --git a/src/emqttd_net.erl b/src/emqttd_net.erl index 8488fe76e..627856cd8 100644 --- a/src/emqttd_net.erl +++ b/src/emqttd_net.erl @@ -30,9 +30,11 @@ -include_lib("kernel/include/inet.hrl"). --export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). +-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, + getaddr/2, port_to_listeners/1]). --export([peername/1, sockname/1, format/2, format/1, connection_string/2, ntoa/1]). +-export([peername/1, sockname/1, format/2, format/1, + connection_string/2, ntoa/1]). -define(FIRST_TEST_BIND_PORT, 10000). diff --git a/src/emqttd_opts.erl b/src/emqttd_opts.erl index dfd5b74b6..938d002cc 100644 --- a/src/emqttd_opts.erl +++ b/src/emqttd_opts.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_opts). -author("Feng Lee "). diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 8e23f41e1..b02b1d427 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -41,7 +41,6 @@ %% @doc Load all plugins when the broker started. %% @end %%------------------------------------------------------------------------------ - -spec load() -> list() | {error, any()}. load() -> case env(loaded_file) of @@ -64,7 +63,7 @@ with_loaded_file(File, SuccFun) -> load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), case NotFound of - [] -> ok; + [] -> ok; NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) end, NeedToLoad = Names -- NotFound -- names(started_app), @@ -78,7 +77,7 @@ load_plugins(Names, Persistent) -> unload() -> case env(loaded_file) of {ok, File} -> - with_loaded_file(File, fun(Names) -> stop_plugins(Names) end); + with_loaded_file(File, fun stop_plugins/1); undefined -> ignore end. @@ -128,7 +127,6 @@ plugin(PluginsDir, AppFile0) -> %% @doc Load One Plugin %% @end %%------------------------------------------------------------------------------ - -spec load(atom()) -> ok | {error, any()}. load(PluginName) when is_atom(PluginName) -> case lists:member(PluginName, names(started_app)) of diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 147c19ee5..bec9f287a 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -42,9 +42,12 @@ %%%============================================================================= %%% API %%%============================================================================= --spec start_link(I :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. -start_link(I) -> - gen_server:start_link(?MODULE, [I], []). +-spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. +start_link(Id) -> + gen_server:start_link({local, name(Id)}, ?MODULE, [Id], []). + +name(Id) -> + list_to_atom(lists:concat([?MODULE, "_", integer_to_list(Id)])). %%------------------------------------------------------------------------------ %% @doc Submit work to pooler @@ -64,9 +67,9 @@ async_submit(Fun) -> %%% gen_server callbacks %%%============================================================================= -init([I]) -> - gproc_pool:connect_worker(pooler, {pooler, I}), - {ok, #state{id = I}}. +init([Id]) -> + gproc_pool:connect_worker(pooler, {pooler, Id}), + {ok, #state{id = Id}}. handle_call({submit, Fun}, _From, State) -> {reply, run(Fun), State}; diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 840339819..9eab5fe14 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -38,7 +38,7 @@ -export([received/2, send/2, redeliver/2, shutdown/2]). --export([handle/2]). +-export([process/2]). %% Protocol State -record(proto_state, {peername, @@ -65,8 +65,8 @@ %%------------------------------------------------------------------------------ init(Peername, SendFun, Opts) -> - MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), - WsInitialHeaders = proplists:get_value(ws_initial_headers, Opts), + MaxLen = emqttd_opts:g(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), + WsInitialHeaders = emqttd_opts:g(ws_initial_headers, Opts), #proto_state{peername = Peername, sendfun = SendFun, max_clientid_len = MaxLen, @@ -130,7 +130,7 @@ session(#proto_state{session = Session}) -> %%A Client can only send the CONNECT Packet once over a Network Connection. -spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}. received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) -> - handle(Packet, State#proto_state{connected = true}); + process(Packet, State#proto_state{connected = true}); received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> {error, protocol_bad_connect, State}; @@ -143,19 +143,19 @@ received(Packet = ?PACKET(_Type), State) -> trace(recv, Packet, State), case validate_packet(Packet) of ok -> - handle(Packet, State); + process(Packet, State); {error, Reason} -> {error, Reason, State} end. -handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) -> +process(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) -> #mqtt_packet_connect{proto_ver = ProtoVer, proto_name = ProtoName, username = Username, password = Password, clean_sess = CleanSess, - keep_alive = KeepAlive, + keep_alive = KeepAlive, client_id = ClientId} = Var, State1 = State0#proto_state{proto_ver = ProtoVer, @@ -190,7 +190,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} exit({shutdown, Error}) end; {error, Reason}-> - lager:error("~s@~s: username '~s', login failed - ~s", + lager:error("~s@~s: username '~s' login failed for ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), {?CONNACK_CREDENTIALS, State1} @@ -203,8 +203,8 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} %% Send connack send(?CONNACK_PACKET(ReturnCode1), State3); -handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), - State = #proto_state{client_id = ClientId}) -> +process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), + State = #proto_state{client_id = ClientId}) -> case check_acl(publish, Topic, State) of allow -> @@ -214,70 +214,76 @@ handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), end, {ok, State}; -handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) -> - emqttd_session:puback(Session, PacketId), - {ok, State}; +process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) -> + emqttd_session:puback(Session, PacketId), {ok, State}; -handle(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) -> +process(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) -> emqttd_session:pubrec(Session, PacketId), send(?PUBREL_PACKET(PacketId), State); -handle(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) -> +process(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) -> emqttd_session:pubrel(Session, PacketId), send(?PUBACK_PACKET(?PUBCOMP, PacketId), State); -handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session}) -> - emqttd_session:pubcomp(Session, PacketId), - {ok, State}; +process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session})-> + emqttd_session:pubcomp(Session, PacketId), {ok, State}; %% protect from empty topic list -handle(?SUBSCRIBE_PACKET(PacketId, []), State) -> +process(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); -handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = ClientId, session = Session}) -> +process(?SUBSCRIBE_PACKET(PacketId, TopicTable), + State = #proto_state{client_id = ClientId, session = Session}) -> AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable], case lists:member(deny, AllowDenies) of true -> - %%TODO: return 128 QoS when deny... no need to SUBACK? - lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]); + lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), + send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State); false -> - Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end, - emqttd_session:subscribe(Session, TopicTable, Callback) - end, - {ok, State}; + AckFun = fun(GrantedQos) -> + send(?SUBACK_PACKET(PacketId, GrantedQos), State) + end, + emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State} + end; %% protect from empty topic list -handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> +process(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); -handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> +process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> emqttd_session:unsubscribe(Session, Topics), send(?UNSUBACK_PACKET(PacketId), State); -handle(?PACKET(?PINGREQ), State) -> +process(?PACKET(?PINGREQ), State) -> send(?PACKET(?PINGRESP), State); -handle(?PACKET(?DISCONNECT), State) -> +process(?PACKET(?DISCONNECT), State) -> % clean willmsg {stop, normal, State#proto_state{will_msg = undefined}}. -publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) -> - emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)); +publish(Packet = ?PUBLISH(?QOS_0, _PacketId), + #proto_state{client_id = ClientId, session = Session}) -> + Msg = emqttd_message:from_packet(ClientId, Packet), + emqttd_session:publish(Session, Msg); -publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> - case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of +publish(Packet = ?PUBLISH(?QOS_1, PacketId), + State = #proto_state{client_id = ClientId, session = Session}) -> + Msg = emqttd_message:from_packet(ClientId, Packet), + case emqttd_session:publish(Session, Msg) of ok -> send(?PUBACK_PACKET(?PUBACK, PacketId), State); {error, Error} -> - lager:error("Client ~s: publish qos1 error - ~p", [ClientId, Error]) + lager:error("Client(~s): publish qos1 error - ~p", [ClientId, Error]) end; -publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> - case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of +publish(Packet = ?PUBLISH(?QOS_2, PacketId), + State = #proto_state{client_id = ClientId, session = Session}) -> + Msg = emqttd_message:from_packet(ClientId, Packet), + case emqttd_session:publish(Session, Msg) of ok -> send(?PUBACK_PACKET(?PUBREC, PacketId), State); {error, Error} -> - lager:error("Client ~s: publish qos2 error - ~p", [ClientId, Error]) + lager:error("Client(~s): publish qos2 error - ~p", [ClientId, Error]) end. -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}. @@ -285,9 +291,9 @@ send(Msg, State) when is_record(Msg, mqtt_message) -> send(emqttd_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) - when is_record(Packet, mqtt_packet) -> + when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), - sent_stats(Packet), + emqttd_metrics:sent(Packet), Data = emqttd_serialiser:serialise(Packet), lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]), emqttd_metrics:inc('bytes/sent', size(Data)), @@ -370,28 +376,31 @@ validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_c true; %% MQTT3.1.1 allow null clientId. -validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState) +validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, + client_id = ClientId}, _ProtoState) when size(ClientId) =:= 0 -> true; -validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) -> +validate_clientid(#mqtt_packet_connect{proto_ver = Ver, + clean_sess = CleanSess, + client_id = ClientId}, _ProtoState) -> lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]), false. -validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH}, +validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH}, variable = #mqtt_packet_publish{topic_name = Topic}}) -> case emqttd_topic:validate({name, Topic}) of - true -> ok; - false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic} + true -> ok; + false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic} end; -validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE}, +validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE}, variable = #mqtt_packet_subscribe{topic_table = Topics}}) -> validate_topics(filter, Topics); -validate_packet(#mqtt_packet{ header = #mqtt_packet_header{type = ?UNSUBSCRIBE}, - variable = #mqtt_packet_subscribe{topic_table = Topics}}) -> +validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE}, + variable = #mqtt_packet_subscribe{topic_table = Topics}}) -> validate_topics(filter, Topics); @@ -406,13 +415,16 @@ validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter -> ErrTopics = [Topic || {Topic, Qos} <- Topics, not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))], case ErrTopics of - [] -> ok; - _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic} + [] -> ok; + _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic} end. -validate_qos(undefined) -> true; -validate_qos(Qos) when Qos =< ?QOS_2 -> true; -validate_qos(_) -> false. +validate_qos(undefined) -> + true; +validate_qos(Qos) when ?IS_QOS(Qos) -> + true; +validate_qos(_) -> + false. %% publish ACL is cached in process dictionary. check_acl(publish, Topic, State) -> @@ -428,20 +440,3 @@ check_acl(publish, Topic, State) -> check_acl(subscribe, Topic, State) -> emqttd_access_control:check_acl(client(State), subscribe, Topic). -sent_stats(?PACKET(Type)) -> - emqttd_metrics:inc('packets/sent'), - inc(Type). -inc(?CONNACK) -> - emqttd_metrics:inc('packets/connack'); -inc(?PUBLISH) -> - emqttd_metrics:inc('messages/sent'), - emqttd_metrics:inc('packets/publish/sent'); -inc(?SUBACK) -> - emqttd_metrics:inc('packets/suback'); -inc(?UNSUBACK) -> - emqttd_metrics:inc('packets/unsuback'); -inc(?PINGRESP) -> - emqttd_metrics:inc('packets/pingresp'); -inc(_) -> - ingore. - diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index a3387b6a3..509419335 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -54,7 +54,7 @@ start_link() -> init([]) -> mnesia:subscribe(system), - {ok, TRef} = timer:send_interval(1000, tick), + {ok, TRef} = timer:send_interval(timer:seconds(1), tick), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index a86cad01e..1031ee478 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -36,7 +36,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tref, events = []}). +-record(state, {tick_tref, events = []}). %%------------------------------------------------------------------------------ %% @doc Start system monitor @@ -53,8 +53,8 @@ start_link(Opts) -> init([Opts]) -> erlang:system_monitor(self(), parse_opt(Opts)), - {ok, TRef} = timer:send_interval(1000, reset), - {ok, #state{tref = TRef}}. + {ok, TRef} = timer:send_interval(timer:seconds(1), reset), + {ok, #state{tick_tref = TRef}}. parse_opt(Opts) -> parse_opt(Opts, []). @@ -134,8 +134,8 @@ handle_info(Info, State) -> lager:error("Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{tref = TRef}) -> - timer:cancel(TRef), ok. +terminate(_Reason, #state{tick_tref = TRef}) -> + timer:cancel(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 7ea5c3ac7..8e2b22973 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% MQTT Topic +%%% MQTT Topic Functions %%% %%% @end %%%----------------------------------------------------------------------------- diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index fb83a4539..3d06e8432 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -101,18 +101,18 @@ ws_loop(Data, State = #wsocket_state{request = Req, Peer = Req:get(peer), lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]), case Parser(iolist_to_binary(Data)) of - {more, NewParser} -> - State#wsocket_state{parser = NewParser}; - {ok, Packet, Rest} -> - gen_server:cast(ClientPid, {received, Packet}), - ws_loop(Rest, reset_parser(State), ReplyChannel); - {error, Error} -> - lager:error("MQTT(WebSocket) detected framing error ~p for connection ~s", [Error, Peer]), - exit({shutdown, Error}) + {more, NewParser} -> + State#wsocket_state{parser = NewParser}; + {ok, Packet, Rest} -> + gen_server:cast(ClientPid, {received, Packet}), + ws_loop(Rest, reset_parser(State), ReplyChannel); + {error, Error} -> + lager:error("MQTT(WebSocket) frame error ~p for connection ~s", [Error, Peer]), + exit({shutdown, Error}) end. reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> - State#wsocket_state{parser = emqttd_parser:new (PktOpts)}. + State#wsocket_state{parser = emqttd_parser:new(PktOpts)}. %%%============================================================================= %%% gen_fsm callbacks @@ -124,7 +124,8 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, Headers = mochiweb_request:get(headers, Req), HeadersList = mochiweb_headers:to_list(Headers), - ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]), + ProtoState = emqttd_protocol:init(Peername, SendFun, + [{ws_initial_headers, HeadersList}|PktOpts]), {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> @@ -149,15 +150,15 @@ handle_cast({unsubscribe, Topics}, State) -> handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of - {ok, ProtoState1} -> - noreply(State#client_state{proto_state = ProtoState1}); - {error, Error} -> - lager:error("MQTT protocol error ~p", [Error]), - stop({shutdown, Error}, State); - {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#client_state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#client_state{proto_state = ProtoState1}) + {ok, ProtoState1} -> + noreply(State#client_state{proto_state = ProtoState1}); + {error, Error} -> + lager:error("MQTT protocol error ~p", [Error]), + stop({shutdown, Error}, State); + {error, Error, ProtoState1} -> + stop({shutdown, Error}, State#client_state{proto_state = ProtoState1}); + {stop, Reason, ProtoState1} -> + stop(Reason, State#client_state{proto_state = ProtoState1}) end; handle_cast(_Msg, State) -> @@ -189,18 +190,18 @@ handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req} handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of - {ok, KeepAlive1} -> - lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), - noreply(State#client_state{keepalive = KeepAlive1}); - {error, timeout} -> - lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), - stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); - {error, Error} -> - lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), - stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) + {ok, KeepAlive1} -> + noreply(State#client_state{keepalive = KeepAlive1}); + {error, timeout} -> + lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), + stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); + {error, Error} -> + lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), + stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) end; -handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) -> +handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, + proto_state = ProtoState}) -> ClientId = emqttd_protocol:clientid(ProtoState), lager:warning("Websocket client ~s exit: reason=~p", [ClientId, Reason]), stop({shutdown, websocket_closed}, State); From 43523f2a1c18d3130b701890daefd945658f67df Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:03:01 +0800 Subject: [PATCH 04/10] received/sent metrics --- src/emqttd_metrics.erl | 88 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index 707c2a9f9..cebe8b562 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -31,6 +31,8 @@ -include("emqttd.hrl"). +-include("emqttd_protocol.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). @@ -38,6 +40,9 @@ %% API Function Exports -export([start_link/0]). +%% Received/Sent Metrics +-export([received/1, sent/1]). + -export([all/0, value/1, inc/1, inc/2, inc/3, dec/2, dec/3, @@ -65,6 +70,14 @@ {counter, 'packets/connack'}, % CONNACK Packets sent {counter, 'packets/publish/received'}, % PUBLISH packets received {counter, 'packets/publish/sent'}, % PUBLISH packets sent + {counter, 'packets/puback/received'}, % PUBACK packets received + {counter, 'packets/puback/sent'}, % PUBACK packets sent + {counter, 'packets/pubrec/received'}, % PUBREC packets received + {counter, 'packets/pubrec/sent'}, % PUBREC packets sent + {counter, 'packets/pubrel/received'}, % PUBREL packets received + {counter, 'packets/pubrel/sent'}, % PUBREL packets sent + {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received + {counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received {counter, 'packets/suback'}, % SUBACK packets sent {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received @@ -78,6 +91,12 @@ -define(SYSTOP_MESSAGES, [ {counter, 'messages/received'}, % Messages received {counter, 'messages/sent'}, % Messages sent + {counter, 'messages/qos0/received'}, % Messages received + {counter, 'messages/qos0/sent'}, % Messages sent + {counter, 'messages/qos1/received'}, % Messages received + {counter, 'messages/qos1/sent'}, % Messages sent + {counter, 'messages/qos2/received'}, % Messages received + {counter, 'messages/qos2/sent'}, % Messages sent {gauge, 'messages/retained'}, % Messagea retained {counter, 'messages/dropped'} % Messages dropped ]). @@ -94,6 +113,73 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +received(Packet = ?PACKET(Type)) -> + inc('packets/received'), + received(Type, Packet). +received(?CONNECT, _Packet) -> + inc('packets/connect'); +received(?PUBLISH, ?PUBLISH(Qos, _PktId)) -> + inc('packets/publish/received'), + inc('messages/received'), + qos_received(Qos); +received(?PUBACK, _Packet) -> + inc('packets/puback/received'); +received(?PUBREC, _Packet) -> + inc('packets/pubrec/received'); +received(?PUBREL, _Packet) -> + inc('packets/pubrel/received'); +received(?PUBCOMP, _Packet) -> + inc('packets/pubcomp/received'); +received(?SUBSCRIBE, _Packet) -> + inc('packets/subscribe'); +received(?UNSUBSCRIBE, _Packet) -> + inc('packets/unsubscribe'); +received(?PINGREQ, _Packet) -> + inc('packets/pingreq'); +received(?DISCONNECT, _Packet) -> + inc('packets/disconnect'); +received(_, _) -> ignore. + +qos_received(?QOS_0) -> + inc('messages/qos0/received'); +qos_received(?QOS_1) -> + inc('messages/qos1/received'); +qos_received(?QOS_2) -> + inc('messages/qos2/received'). + +sent(Packet = ?PACKET(Type)) -> + emqttd_metrics:inc('packets/sent'), + sent(Type, Packet). +sent(?CONNACK, _Packet) -> + inc('packets/connack'); +sent(?PUBLISH, ?PUBLISH(Qos, _PktId)) -> + inc('packets/publish/sent'), + inc('messages/sent'), + qos_sent(Qos); +sent(?PUBACK, _Packet) -> + inc('packets/puback/sent'); +sent(?PUBREC, _Packet) -> + inc('packets/pubrec/sent'); +sent(?PUBREL, _Packet) -> + inc('packets/pubrel/sent'); +sent(?PUBCOMP, _Packet) -> + inc('packets/pubcomp/sent'); +sent(?SUBACK, _Packet) -> + inc('packets/suback'); +sent(?UNSUBACK, _Packet) -> + inc('packets/unsuback'); +sent(?PINGRESP, _Packet) -> + inc('packets/pingresp'); +sent(_Type, _Packet) -> + ingore. + +qos_sent(?QOS_0) -> + inc('messages/qos0/sent'); +qos_sent(?QOS_1) -> + inc('messages/qos1/sent'); +qos_sent(?QOS_2) -> + inc('messages/qos2/sent'). + %%------------------------------------------------------------------------------ %% @doc Get all metrics %% @end @@ -198,7 +284,7 @@ init([]) -> {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. handle_call(_Req, _From, State) -> - {reply, error, State}. + {reply, error, State}. handle_cast(_Msg, State) -> {noreply, State}. From 5daeac083c3178d480464e83b1b4befffffbcf9c Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:03:25 +0800 Subject: [PATCH 05/10] spec --- src/emqttd_keepalive.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index e06382207..6b9042d13 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -34,10 +34,13 @@ tsec, tmsg, tref, repeat = 0}). +-type keepalive() :: #keepalive{}. + %%------------------------------------------------------------------------------ %% @doc Start a keepalive %% @end %%------------------------------------------------------------------------------ +-spec start(fun(), integer(), any()) -> undefined | keepalive(). start(_, 0, _) -> undefined; start(StatFun, TimeoutSec, TimeoutMsg) -> @@ -50,6 +53,7 @@ start(StatFun, TimeoutSec, TimeoutMsg) -> %% @doc Check keepalive, called when timeout. %% @end %%------------------------------------------------------------------------------ +-spec check(keepalive()) -> {ok, keepalive()} | {error, any()}. check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> case StatFun() of {ok, NewVal} -> @@ -71,6 +75,7 @@ resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> %% @doc Cancel Keepalive %% @end %%------------------------------------------------------------------------------ +-spec cancel(keepalive()) -> ok. cancel(#keepalive{tref = TRef}) -> cancel(TRef); cancel(undefined) -> From 6a66ca90b19a0ab6ce50682bc26e3b9da6bdbc78 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:04:15 +0800 Subject: [PATCH 06/10] concat --- src/emqttd_dist.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/emqttd_dist.erl b/src/emqttd_dist.erl index 13acedda4..25e977660 100644 --- a/src/emqttd_dist.erl +++ b/src/emqttd_dist.erl @@ -27,6 +27,8 @@ -module(emqttd_dist). +-import(lists, [concat/1]). + -export([parse_node/1]). parse_node(Name) when is_list(Name) -> @@ -40,10 +42,10 @@ parse_node(Name) when is_list(Name) -> with_domain(Name) -> case net_kernel:longnames() of true -> - Name ++ "@" ++ inet_db:gethostname() ++ - "." ++ inet_db:res_option(domain); + concat([Name, "@", inet_db:gethostname(), + ".", inet_db:res_option(domain)]); false -> - Name ++ "@" ++ inet_db:gethostname(); + concat([Name, "@", inet_db:gethostname()]); _ -> Name end. From ebf203a9311e285b14379a252e1848e1bbf5bb3c Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:05:11 +0800 Subject: [PATCH 07/10] refactor metrics --- src/emqttd_client.erl | 48 ++++++++++++++----------------------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 5acb9e5d3..ddddccbda 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -110,7 +110,7 @@ handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; handle_call(Req, _From, State = #state{peername = Peername}) -> - lager:error("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), + lager:error("Client(~s): unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. handle_cast({subscribe, TopicTable}, State) -> @@ -120,7 +120,7 @@ handle_cast({unsubscribe, Topics}, State) -> with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); handle_cast(Msg, State = #state{peername = Peername}) -> - lager:error("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), + lager:error("Client(~s): unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), {noreply, State}. handle_info(timeout, State) -> @@ -152,11 +152,12 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) -> - lager:error("Client ~s: unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]), + lager:error("Client(~s): unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]), {noreply, State}; handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> - lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), + lager:debug("Client(~s): Start KeepAlive with ~p seconds", + [emqttd_net:format(Peername), TimeoutSec]), StatFun = fun() -> case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; @@ -169,13 +170,12 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), noreply(State#state{keepalive = KeepAlive1}); {error, timeout} -> - lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]), + lager:debug("Client(~s): Keepalive Timeout!", [emqttd_net:format(Peername)]), stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); {error, Error} -> - lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]), + lager:debug("Client(~s): Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]), stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) end; @@ -183,10 +183,10 @@ handle_info(Info, State = #state{peername = Peername}) -> lager:error("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), {noreply, State}. -terminate(Reason, #state{peername = Peername, - transport = Transport, - socket = Socket, - keepalive = KeepAlive, +terminate(Reason, #state{peername = Peername, + transport = Transport, + socket = Socket, + keepalive = KeepAlive, proto_state = ProtoState}) -> lager:info("Client(~s) terminated, reason: ~p", [emqttd_net:format(Peername), Reason]), emqttd_keepalive:cancel(KeepAlive), @@ -228,9 +228,9 @@ received(Bytes, State = #state{packet_opts = PacketOpts, conn_name = ConnStr}) -> case Parser(Bytes) of {more, NewParser} -> - {noreply, control_throttle(State #state{parser = NewParser}), hibernate}; + noreply(control_throttle(State#state{parser = NewParser})); {ok, Packet, Rest} -> - received_stats(Packet), + emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), @@ -244,12 +244,12 @@ received(Bytes, State = #state{packet_opts = PacketOpts, stop(Reason, State#state{proto_state = ProtoState1}) end; {error, Error} -> - lager:error("MQTT detected framing error ~p for connection ~p", [Error, ConnStr]), + lager:error("MQTT framing error ~p for connection ~p", [Error, ConnStr]), stop({shutdown, Error}, State) end. network_error(Reason, State = #state{peername = Peername}) -> - lager:warning("Client ~s: MQTT detected network error '~p'", + lager:warning("Client(~s): MQTT detected network error '~p'", [emqttd_net:format(Peername), Reason]), stop({shutdown, conn_closed}, State). @@ -269,21 +269,3 @@ control_throttle(State = #state{conn_state = Flow, {_, _} -> run_socket(State) end. -received_stats(?PACKET(Type)) -> - emqttd_metrics:inc('packets/received'), inc(Type). -inc(?CONNECT) -> - emqttd_metrics:inc('packets/connect'); -inc(?PUBLISH) -> - emqttd_metrics:inc('messages/received'), - emqttd_metrics:inc('packets/publish/received'); -inc(?SUBSCRIBE) -> - emqttd_metrics:inc('packets/subscribe'); -inc(?UNSUBSCRIBE) -> - emqttd_metrics:inc('packets/unsubscribe'); -inc(?PINGREQ) -> - emqttd_metrics:inc('packets/pingreq'); -inc(?DISCONNECT) -> - emqttd_metrics:inc('packets/disconnect'); -inc(_) -> - ignore. - From 05ff1ab002b53b563d59f27919d2b6730246c873 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:08:56 +0800 Subject: [PATCH 08/10] gen_event --- src/emqttd_alarm.erl | 60 ++++++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index f3a25ff68..fc3ce729e 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -27,25 +27,35 @@ -module(emqttd_alarm). +-author("Feng Lee "). + -include("emqttd.hrl"). +-behaviour(gen_event). + +-define(ALARM_MGR, ?MODULE). + +%% API Function Exports -export([start_link/0, alarm_fun/0, get_alarms/0, set_alarm/1, clear_alarm/1, add_alarm_handler/1, add_alarm_handler/2, delete_alarm_handler/1]). +%% gen_event callbacks -export([init/1, handle_event/2, handle_call/2, handle_info/2, - terminate/2]). + terminate/2, code_change/3]). --define(SERVER, ?MODULE). +%%%============================================================================= +%%% API +%%%============================================================================= start_link() -> - case gen_event:start_link({local, ?SERVER}) of - {ok, Pid} -> - gen_event:add_handler(?SERVER, ?MODULE, []), - {ok, Pid}; - Error -> - Error + start_with(fun(Pid) -> gen_event:add_handler(Pid, ?MODULE, []) end). + +start_with(Fun) -> + case gen_event:start_link({local, ?ALARM_MGR}) of + {ok, Pid} -> Fun(Pid), {ok, Pid}; + Error -> Error end. alarm_fun() -> @@ -60,34 +70,36 @@ alarm_fun(Bool) -> -spec set_alarm(mqtt_alarm()) -> ok. set_alarm(Alarm) when is_record(Alarm, mqtt_alarm) -> - gen_event:notify(?SERVER, {set_alarm, Alarm}). + gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}). -spec clear_alarm(any()) -> ok. clear_alarm(AlarmId) when is_binary(AlarmId) -> - gen_event:notify(?SERVER, {clear_alarm, AlarmId}). + gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}). +-spec get_alarms() -> list(mqtt_alarm()). get_alarms() -> - gen_event:call(?SERVER, ?MODULE, get_alarms). + gen_event:call(?ALARM_MGR, ?MODULE, get_alarms). add_alarm_handler(Module) when is_atom(Module) -> - gen_event:add_handler(?SERVER, Module, []). + gen_event:add_handler(?ALARM_MGR, Module, []). add_alarm_handler(Module, Args) when is_atom(Module) -> - gen_event:add_handler(?SERVER, Module, Args). + gen_event:add_handler(?ALARM_MGR, Module, Args). delete_alarm_handler(Module) when is_atom(Module) -> - gen_event:delete_handler(?SERVER, Module, []). + gen_event:delete_handler(?ALARM_MGR, Module, []). + +%%%============================================================================= +%%% Default Alarm handler +%%%============================================================================= -%%----------------------------------------------------------------- -%% Default Alarm handler -%%----------------------------------------------------------------- init(_) -> {ok, []}. -handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId, +handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId, severity = Severity, - title = Title, - summary = Summary}}, Alarms)-> + title = Title, + summary = Summary}}, Alarms)-> Timestamp = os:timestamp(), Json = mochijson2:encode([{id, AlarmId}, {severity, Severity}, @@ -120,6 +132,13 @@ terminate(swap, Alarms) -> terminate(_, _) -> ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + alarm_msg(Type, AlarmId, Json) -> Msg = emqttd_message:make(alarm, topic(Type, AlarmId), @@ -132,4 +151,3 @@ topic(alert, AlarmId) -> topic(clear, AlarmId) -> emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>). - From a16e5279758f5697d7bd22ea225bb5b5c4a95026 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:09:13 +0800 Subject: [PATCH 09/10] 0.12.1 refactor --- src/emqttd_acl_internal.erl | 1 + src/emqttd_app.erl | 56 +++++++++++++++++------------------ src/emqttd_auth_anonymous.erl | 2 +- src/emqttd_auth_clientid.erl | 15 ++++++---- src/emqttd_auth_ldap.erl | 4 +-- src/emqttd_auth_mod.erl | 6 ++-- src/emqttd_auth_username.erl | 34 ++++++++++++++++----- src/emqttd_bridge.erl | 12 ++++---- src/emqttd_bridge_sup.erl | 10 +++---- src/emqttd_broker.erl | 11 +++---- src/emqttd_cli.erl | 8 +++-- src/emqttd_ctl.erl | 7 +++-- src/emqttd_gen_mod.erl | 4 +-- src/emqttd_http.erl | 15 +++++----- src/emqttd_log.erl | 1 + src/emqttd_mnesia.erl | 2 +- src/emqttd_mod_presence.erl | 6 ++-- src/emqttd_mod_rewrite.erl | 16 +++++----- 18 files changed, 122 insertions(+), 88 deletions(-) diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index b6dc0b81b..c2ce3184c 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_acl_internal). -author("Feng Lee "). diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 366df60cb..fa7799904 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -29,15 +29,13 @@ -author("Feng Lee "). +-include("emqttd_cli.hrl"). + -behaviour(application). %% Application callbacks -export([start/2, stop/1]). --define(PRINT_MSG(Msg), io:format(Msg)). - --define(PRINT(Format, Args), io:format(Format, Args)). - %%%============================================================================= %%% Application callbacks %%%============================================================================= @@ -106,35 +104,35 @@ start_server(Sup, {Name, Server, Opts}) -> start_child(Sup, Server, Opts), ?PRINT_MSG("[done]~n"). -start_child(Sup, {supervisor, Name}) -> - supervisor:start_child(Sup, supervisor_spec(Name)); -start_child(Sup, Name) when is_atom(Name) -> - {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name)). +start_child(Sup, {supervisor, Module}) -> + supervisor:start_child(Sup, supervisor_spec(Module)); -start_child(Sup, {supervisor, Name}, Opts) -> - supervisor:start_child(Sup, supervisor_spec(Name, Opts)); -start_child(Sup, Name, Opts) when is_atom(Name) -> - {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name, Opts)). +start_child(Sup, Module) when is_atom(Module) -> + {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Module)). -%%TODO: refactor... -supervisor_spec(Name) -> - {Name, - {Name, start_link, []}, - permanent, infinity, supervisor, [Name]}. +start_child(Sup, {supervisor, Module}, Opts) -> + supervisor:start_child(Sup, supervisor_spec(Module, Opts)); -supervisor_spec(Name, Opts) -> - {Name, - {Name, start_link, [Opts]}, - permanent, infinity, supervisor, [Name]}. +start_child(Sup, Module, Opts) when is_atom(Module) -> + supervisor:start_child(Sup, worker_spec(Module, Opts)). -worker_spec(Name) -> - {Name, - {Name, start_link, []}, - permanent, 10000, worker, [Name]}. -worker_spec(Name, Opts) -> - {Name, - {Name, start_link, [Opts]}, - permanent, 10000, worker, [Name]}. +supervisor_spec(Module) when is_atom(Module) -> + supervisor_spec(Module, start_link, []). + +supervisor_spec(Module, Opts) -> + supervisor_spec(Module, start_link, [Opts]). + +supervisor_spec(M, F, A) -> + {M, {M, F, A}, permanent, infinity, supervisor, [M]}. + +worker_spec(Module) when is_atom(Module) -> + worker_spec(Module, start_link, []). + +worker_spec(Module, Opts) when is_atom(Module) -> + worker_spec(Module, start_link, [Opts]). + +worker_spec(M, F, A) -> + {M, {M, F, A}, permanent, 10000, worker, [M]}. -spec stop(State :: term()) -> term(). stop(_State) -> diff --git a/src/emqttd_auth_anonymous.erl b/src/emqttd_auth_anonymous.erl index e410d3459..95592c22b 100644 --- a/src/emqttd_auth_anonymous.erl +++ b/src/emqttd_auth_anonymous.erl @@ -36,5 +36,5 @@ init(Opts) -> {ok, Opts}. check(_Client, _Password, _Opts) -> ok. -description() -> "Anonymous authentication module". +description() -> "Anonymous Authentication Module". diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 53cc5496f..0d28b8421 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% ClientId authentication module. +%%% ClientId Authentication Module. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -51,22 +51,25 @@ %% @doc Add clientid %% @end %%------------------------------------------------------------------------------ +-spec add_clientid(binary()) -> {atomic, ok} | {aborted, any()}. add_clientid(ClientId) when is_binary(ClientId) -> R = #mqtt_auth_clientid{client_id = ClientId}, - mnesia:transaction(fun() -> mnesia:write(R) end). + mnesia:transaction(fun mnesia:write/1, [R]). %%------------------------------------------------------------------------------ %% @doc Add clientid with password %% @end %%------------------------------------------------------------------------------ +-spec add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}. add_clientid(ClientId, Password) -> R = #mqtt_auth_clientid{client_id = ClientId, password = Password}, - mnesia:transaction(fun() -> mnesia:write(R) end). + mnesia:transaction(fun mnesia:write/1, [R]). %%------------------------------------------------------------------------------ %% @doc Lookup clientid %% @end %%------------------------------------------------------------------------------ +-spec lookup_clientid(binary()) -> list(). lookup_clientid(ClientId) -> mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId). @@ -74,6 +77,7 @@ lookup_clientid(ClientId) -> %% @doc Lookup all clientids %% @end %%------------------------------------------------------------------------------ +-spec all_clientids() -> list(binary()). all_clientids() -> mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB). @@ -81,8 +85,9 @@ all_clientids() -> %% @doc Remove clientid %% @end %%------------------------------------------------------------------------------ +-spec remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}. remove_clientid(ClientId) -> - mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TAB, ClientId}) end). + mnesia:transaction(fun mnesia:delete/1, [{?AUTH_CLIENTID_TAB, ClientId}]). %%%============================================================================= %%% emqttd_auth_mod callbacks @@ -95,7 +100,7 @@ init(Opts) -> mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies), case proplists:get_value(file, Opts) of undefined -> ok; - File -> load(File) + File -> load(File) end, {ok, Opts}. diff --git a/src/emqttd_auth_ldap.erl b/src/emqttd_auth_ldap.erl index 965aaeefe..09ca89142 100644 --- a/src/emqttd_auth_ldap.erl +++ b/src/emqttd_auth_ldap.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% LDAP Authentication Module. +%%% LDAP Authentication Module %%% %%% @end %%%----------------------------------------------------------------------------- @@ -28,7 +28,7 @@ -author("Feng Lee "). --include_lib("emqttd/include/emqttd.hrl"). +-include("emqttd.hrl"). -import(proplists, [get_value/2, get_value/3]). diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index a6298a68b..86e08df4b 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_auth_mod). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -50,9 +50,9 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{init, 1}, {check, 3}, {description, 0}]; + [{init, 1}, {check, 3}, {description, 0}]; behaviour_info(_Other) -> - undefined. + undefined. -endif. diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 39fa08a6a..392d32961 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -20,13 +20,13 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd authentication with username and password. +%%% Authentication with username and password. %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_username). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -65,16 +65,36 @@ cli(_) -> %%% API %%%============================================================================= +%%------------------------------------------------------------------------------ +%% @doc Add user +%% @end +%%------------------------------------------------------------------------------ +-spec add_user(binary(), binary()) -> {atomic, ok} | {aborted, any()}. add_user(Username, Password) -> - R = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, - mnesia:transaction(fun() -> mnesia:write(R) end). + User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, + mnesia:transaction(fun mnesia:write/1, [User]). +%%------------------------------------------------------------------------------ +%% @doc Lookup user by username +%% @end +%%------------------------------------------------------------------------------ +-spec lookup_user(binary()) -> list(). lookup_user(Username) -> mnesia:dirty_read(?AUTH_USERNAME_TAB, Username). +%%------------------------------------------------------------------------------ +%% @doc Remove user +%% @end +%%------------------------------------------------------------------------------ +-spec remove_user(binary()) -> {atomic, ok} | {aborted, any()}. remove_user(Username) -> - mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TAB, Username}) end). + mnesia:transaction(fun mnesia:delete/1, [{?AUTH_USERNAME_TAB, Username}]). +%%------------------------------------------------------------------------------ +%% @doc All usernames +%% @end +%%------------------------------------------------------------------------------ +-spec all_users() -> list(). all_users() -> mnesia:dirty_all_keys(?AUTH_USERNAME_TAB). @@ -104,7 +124,8 @@ check(#mqtt_client{username = Username}, Password, _Opts) -> end end. -description() -> "Username password authentication module". +description() -> + "Username password authentication module". %%%============================================================================= %%% Internal functions @@ -123,4 +144,3 @@ salt() -> Salt = random:uniform(16#ffffffff), <>. - diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index dfb68add3..cb6349803 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -144,12 +144,12 @@ handle_info({nodeup, Node}, State = #state{node = Node}) -> handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) -> Self = self(), spawn_link(fun() -> - case net_kernel:connect_node(Node) of - true -> %%TODO: this is not right... fixme later - Self ! {nodeup, Node}; - false -> - erlang:send_after(Interval, Self, ping_down_node) - end + case net_kernel:connect_node(Node) of + true -> %%TODO: this is not right... fixme later + Self ! {nodeup, Node}; + false -> + erlang:send_after(Interval, Self, ping_down_node) + end end), {noreply, State}; diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 132eca200..c780a6f6b 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -38,12 +38,6 @@ -export([init/1]). -%%%============================================================================= -%%% CLI -%%%============================================================================= - - - %%%============================================================================= %%% API %%%============================================================================= @@ -55,6 +49,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). +%%------------------------------------------------------------------------------ +%% @doc List all bridges +%% @end +%%------------------------------------------------------------------------------ -spec bridges() -> [{tuple(), pid()}]. bridges() -> [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 19dd6237a..88a3713cc 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -84,6 +84,7 @@ start_link() -> %% @doc Get running nodes %% @end %%------------------------------------------------------------------------------ +-spec running_nodes() -> list(node()). running_nodes() -> mnesia:system_info(running_db_nodes). @@ -109,7 +110,7 @@ notify(EventType, Event) -> %% @end %%------------------------------------------------------------------------------ env(Name) -> - proplists:get_value(Name, application:get_env(emqttd, broker, [])). + proplists:get_value(Name, emqttd:env(broker)). %%------------------------------------------------------------------------------ %% @doc Get broker version @@ -152,7 +153,7 @@ datetime() -> %%------------------------------------------------------------------------------ -spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}. hook(Hook, Name, MFA) -> - gen_server:call(?MODULE, {hook, Hook, Name, MFA}). + gen_server:call(?SERVER, {hook, Hook, Name, MFA}). %%------------------------------------------------------------------------------ %% @doc Unhook @@ -160,7 +161,7 @@ hook(Hook, Name, MFA) -> %%------------------------------------------------------------------------------ -spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}. unhook(Hook, Name) -> - gen_server:call(?MODULE, {unhook, Hook, Name}). + gen_server:call(?SERVER, {unhook, Hook, Name}). %%------------------------------------------------------------------------------ %% @doc Foreach hooks @@ -266,13 +267,13 @@ handle_cast(_Msg, State) -> handle_info(heartbeat, State) -> publish(uptime, list_to_binary(uptime(State))), publish(datetime, list_to_binary(datetime())), - {noreply, State, hibernate}; + {noreply, State}; handle_info(tick, State) -> retain(brokers), retain(version, list_to_binary(version())), retain(sysdescr, list_to_binary(sysdescr())), - {noreply, State, hibernate}; + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 72f0e580b..fd70ce4f6 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -223,8 +223,10 @@ plugins(["list"]) -> plugins(["load", Name]) -> case emqttd_plugins:load(list_to_atom(Name)) of - {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); - {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) + {ok, StartedApps} -> + ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> + ?PRINT("load plugin error: ~p~n", [Reason]) end; plugins(["unload", Name]) -> @@ -236,7 +238,7 @@ plugins(["unload", Name]) -> end; plugins(_) -> - ?USAGE([{"plugins list", "query loaded plugins"}, + ?USAGE([{"plugins list", "show loaded plugins"}, {"plugins load ", "load plugin"}, {"plugins unload ", "unload plugin"}]). diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index b02bab7ad..5d34bd449 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -139,7 +139,10 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal Function Definitions %%%============================================================================= -noreply(State) -> {noreply, State, hibernate}. +noreply(State) -> + {noreply, State, hibernate}. + +next_seq(State = #state{seq = Seq}) -> + State#state{seq = Seq + 1}. -next_seq(State = #state{seq = Seq}) -> State#state{seq = Seq + 1}. diff --git a/src/emqttd_gen_mod.erl b/src/emqttd_gen_mod.erl index 190971d0e..682b9ab05 100644 --- a/src/emqttd_gen_mod.erl +++ b/src/emqttd_gen_mod.erl @@ -41,9 +41,9 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{load, 1}, {unload, 1}]; + [{load, 1}, {unload, 1}]; behaviour_info(_Other) -> - undefined. + undefined. -endif. diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 946ad0b65..7f4251cd9 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -30,6 +30,7 @@ -author("Feng Lee "). -include("emqttd.hrl"). + -include("emqttd_protocol.hrl"). -import(proplists, [get_value/2, get_value/3]). @@ -46,7 +47,7 @@ handle_request('GET', "/mqtt/status", Req) -> false -> not_running; {value, _Ver} -> running end, - Status = io_lib:format("Node ~s is ~s~nemqttd is ~s~n", + Status = io_lib:format("Node ~s is ~s~nemqttd is ~s", [node(), InternalStatus, AppStatus]), Req:ok({"text/plain", iolist_to_binary(Status)}); @@ -59,15 +60,15 @@ handle_request('POST', "/mqtt/publish", Req) -> case authorized(Req) of true -> ClientId = get_value("client", Params, http), - Qos = int(get_value("qos", Params, "0")), - Retain = bool(get_value("retain", Params, "0")), - Topic = list_to_binary(get_value("topic", Params)), - Payload = list_to_binary(get_value("message", Params)), + Qos = int(get_value("qos", Params, "0")), + Retain = bool(get_value("retain", Params, "0")), + Topic = list_to_binary(get_value("topic", Params)), + Payload = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}), - Req:ok({"text/plain", <<"ok\n">>}); + Req:ok({"text/plain", <<"ok">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); {_, false} -> @@ -83,7 +84,7 @@ handle_request('POST', "/mqtt/publish", Req) -> handle_request('GET', "/mqtt", Req) -> lager:info("Websocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), - Proto = Req:get_header_value("Sec-WebSocket-Protocol"), + Proto = Req:get_header_value("Sec-WebSocket-Protocol"), case {is_websocket(Upgrade), Proto} of {true, "mqtt" ++ _Vsn} -> emqttd_ws_client:start_link(Req); diff --git a/src/emqttd_log.erl b/src/emqttd_log.erl index 96522ae9d..4be2f2999 100644 --- a/src/emqttd_log.erl +++ b/src/emqttd_log.erl @@ -30,3 +30,4 @@ -module(emqttd_log). +%%TODO: Hooks to log??? diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 355f3d547..3f94071ed 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -27,7 +27,7 @@ -module(emqttd_mnesia). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index b534695e0..da3a05c00 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -38,8 +38,10 @@ -export([client_connected/3, client_disconnected/3]). load(Opts) -> - emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), - emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), + emqttd_broker:hook('client.connected', {?MODULE, client_connected}, + {?MODULE, client_connected, [Opts]}), + emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, + {?MODULE, client_disconnected, [Opts]}), {ok, Opts}. client_connected(ConnAck, #mqtt_client{client_id = ClientId, diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index b15a0c700..09357211c 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -46,11 +46,11 @@ load(Opts) -> {ok, Terms} = file:consult(File), Sections = compile(Terms), emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe}, - {?MODULE, rewrite, [subscribe, Sections]}), + {?MODULE, rewrite, [subscribe, Sections]}), emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}, - {?MODULE, rewrite, [unsubscribe, Sections]}), + {?MODULE, rewrite, [unsubscribe, Sections]}), emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish}, - {?MODULE, rewrite, [publish, Sections]}). + {?MODULE, rewrite, [publish, Sections]}). rewrite(_ClientId, TopicTable, subscribe, Sections) -> lager:info("rewrite subscribe: ~p", [TopicTable]), @@ -83,9 +83,9 @@ reload(File) -> end. unload(_) -> - emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), - emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), - emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}). + emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), + emqttd_broker:unhook('client.unsubscribe',{?MODULE, rewrite_unsubscribe}), + emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}). %%%============================================================================= %%% Internal functions @@ -116,7 +116,8 @@ match_rule(Topic, []) -> match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> - Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), + Vars = lists:zip(["\\$" ++ integer_to_list(I) + || I <- lists:seq(1, length(Captured))], Captured), iolist_to_binary(lists:foldl( fun({Var, Val}, Acc) -> re:replace(Acc, Var, Val, [global]) @@ -124,3 +125,4 @@ match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> nomatch -> match_rule(Topic, Rules) end. + From 3dd42b65cff6cd82ada6c836c0344a93420f28d6 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 23:03:02 +0800 Subject: [PATCH 10/10] /status --- src/emqttd_http.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 7f4251cd9..48724e1e0 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -40,7 +40,7 @@ handle_request(Req) -> handle_request(Req:get(method), Req:get(path), Req). -handle_request('GET', "/mqtt/status", Req) -> +handle_request('GET', "/status", Req) -> {InternalStatus, _ProvidedStatus} = init:get_status(), AppStatus = case lists:keysearch(emqttd, 1, application:which_applications()) of