diff --git a/TODO b/TODO index 8a9926973..702b0e072 100644 --- a/TODO +++ b/TODO @@ -12,3 +12,5 @@ TODO 5. dashboard TODO 6. emqttd_ctl TODO 7. transaction on route, and topic? + +TODO 8. topics, subscriptions CLI diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 3abd3b39b..95b8148f1 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -8,7 +8,8 @@ {applications, [kernel, stdlib, gproc, - esockd]}, + esockd, + mochiweb]}, {mod, {emqttd_app, []}}, {env, []} ]}. diff --git a/src/emqttd.erl b/src/emqttd.erl index ba44d790d..c76fff74f 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -48,7 +48,7 @@ %%------------------------------------------------------------------------------ -spec start() -> ok | {error, any()}. start() -> - application:start(emqttd). + application:start(?APP). %%------------------------------------------------------------------------------ %% @doc Get environment @@ -56,7 +56,7 @@ start() -> %%------------------------------------------------------------------------------ -spec env(atom()) -> list(). env(Group) -> - application:get_env(emqttd, Group, []). + application:get_env(?APP, Group, []). -spec env(atom(), atom()) -> undefined | any(). env(Group, Name) -> @@ -111,11 +111,14 @@ stop_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). load_all_mods() -> - lists:foreach(fun({Name, Opts}) -> - Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), - Mod:load(Opts), - lager:info("load module ~s successfully", [Name]) - end, env(modules)). + lists:foreach(fun load_mod/1, env(modules)). + +load_mod({Name, Opts}) -> + Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), + case catch Mod:load(Opts) of + {ok, _State} -> lager:info("load module ~s successfully", [Name]); + {'EXIT', Reason} -> lager:error("load module ~s error: ~p", [Name, Reason]) + end. is_mod_enabled(Name) -> env(modules, Name) =/= undefined. @@ -125,7 +128,7 @@ is_mod_enabled(Name) -> %% @end %%------------------------------------------------------------------------------ is_running(Node) -> - case rpc:call(Node, erlang, whereis, [emqttd]) of + case rpc:call(Node, erlang, whereis, [?APP]) of {badrpc, _} -> false; undefined -> false; Pid when is_pid(Pid) -> true diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index eb385968c..fdc5b7fba 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -59,9 +59,9 @@ start_link() -> start_link(emqttd:env(access)). --spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}. -start_link(AcOpts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [AcOpts], []). +-spec start_link(Opts :: list()) -> {ok, pid()} | ignore | {error, any()}. +start_link(Opts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). %%------------------------------------------------------------------------------ %% @doc Authenticate MQTT Client @@ -73,10 +73,11 @@ auth(Client, Password) when is_record(Client, mqtt_client) -> auth(_Client, _Password, []) -> {error, "No auth module to check!"}; auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> - case Mod:check(Client, Password, State) of - ok -> ok; + case catch Mod:check(Client, Password, State) of + ok -> ok; + ignore -> auth(Client, Password, Mods); {error, Reason} -> {error, Reason}; - ignore -> auth(Client, Password, Mods) + {'EXIT', Error} -> {error, Error} end. %%------------------------------------------------------------------------------ @@ -114,7 +115,7 @@ reload_acl() -> %% @doc Register authentication or ACL module %% @end %%------------------------------------------------------------------------------ --spec register_mod(Type :: auth | acl, Mod :: atom(), Opts :: list()) -> ok | {error, any()}. +-spec register_mod(auth | acl, atom(), list()) -> ok | {error, any()}. register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl-> register_mod(Type, Mod, Opts, 0). @@ -140,10 +141,9 @@ lookup_mods(Type) -> [] -> []; [{_, Mods}] -> Mods end. -tab_key(auth) -> - auth_modules; -tab_key(acl) -> - acl_modules. + +tab_key(auth) -> auth_modules; +tab_key(acl) -> acl_modules. %%------------------------------------------------------------------------------ %% @doc Stop access control server @@ -156,10 +156,10 @@ stop() -> %%% gen_server callbacks %%%============================================================================= -init([AcOpts]) -> - ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), - ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, AcOpts))}), - ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, AcOpts))}), +init([Opts]) -> + ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected]), + ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}), + ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}), {ok, state}. init_mods(auth, AuthMods) -> @@ -230,8 +230,11 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= authmod(Name) when is_atom(Name) -> - list_to_atom(lists:concat(["emqttd_auth_", Name])). + mod(emqttd_auth_, Name). aclmod(Name) when is_atom(Name) -> - list_to_atom(lists:concat(["emqttd_acl_", Name])). + mod(emqttd_acl_, Name). + +mod(Prefix, Name) -> + list_to_atom(lists:concat([Prefix, Name])). diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index e2a35a8ee..fd5118cb6 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -63,7 +63,7 @@ all_rules() -> %%------------------------------------------------------------------------------ -spec init(AclOpts :: list()) -> {ok, State :: any()}. init(AclOpts) -> - ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]), + ets:new(?ACL_RULE_TAB, [set, public, named_table]), AclFile = proplists:get_value(file, AclOpts), Default = proplists:get_value(nomatch, AclOpts, allow), State = #state{acl_file = AclFile, nomatch = Default}, @@ -139,6 +139,5 @@ reload_acl(State) -> %% @end %%------------------------------------------------------------------------------ -spec description() -> string(). -description() -> - "Internal ACL with etc/acl.config". +description() -> "Internal ACL with etc/acl.config". diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index d0087bb23..f22621b75 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -105,10 +105,10 @@ handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId, emqttd_pubsub:publish(alarm_msg(alert, AlarmId, Json)), {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]}; -handle_event({clear_alarm, AlarmId}, Alarms)-> +handle_event({clear_alarm, AlarmId}, Alarms) -> Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_util:now_to_secs()}]), emqttd_pubsub:publish(alarm_msg(clear, AlarmId, Json)), - {ok, lists:keydelete(AlarmId, 2, Alarms)}; + {ok, lists:keydelete(AlarmId, 2, Alarms), hibernate}; handle_event(_, Alarms)-> {ok, Alarms}. diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index b236b41f7..09b20c440 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -78,7 +78,7 @@ start_servers(Sup) -> {"emqttd mod supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, - {"emqttd system monitor", emqttd_sysmon, emqttd:env(sysmon)}], + {"emqttd system monitor", {supervisor, emqttd_sysmon_sup}}], [start_server(Sup, Server) || Server <- Servers]. start_server(_Sup, {Name, F}) when is_function(F) -> diff --git a/src/emqttd_auth_ldap.erl b/src/emqttd_auth_ldap.erl index 64d02f9b1..d70aeaf47 100644 --- a/src/emqttd_auth_ldap.erl +++ b/src/emqttd_auth_ldap.erl @@ -81,6 +81,5 @@ ldap_bind(LDAP, UserDn, Password) -> fill(Username, UserDn) -> re:replace(UserDn, "\\$u", Username, [global, {return, list}]). -description() -> - "LDAP Authentication Module". +description() -> "LDAP Authentication Module". diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 79e28743b..98a20cd62 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -119,12 +119,12 @@ handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) -> handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) -> rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]), - {noreply, State}; + {noreply, State, hibernate}; handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> lager:warning("Bridge Node Down: ~p", [Node]), erlang:send_after(Interval, self(), ping_down_node), - {noreply, State#state{status = down}}; + {noreply, State#state{status = down}, hibernate}; handle_info({nodeup, Node}, State = #state{node = Node}) -> %% TODO: Really fast?? diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 242bfdcf0..dc6f4e3af 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -80,7 +80,7 @@ start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic) stop_bridge(Node, SubTopic) -> ChildId = bridge_id(Node, SubTopic), case supervisor:terminate_child(?MODULE, ChildId) of - ok -> + ok -> supervisor:delete_child(?MODULE, ChildId); {error, Reason} -> {error, Reason} diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 04cbafeeb..a35416606 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -216,7 +216,7 @@ stop_tick(TRef) -> %%%============================================================================= init([]) -> - random:seed(now()), + random:seed(os:timestamp()), ets:new(?BROKER_TAB, [set, public, named_table]), % Create $SYS Topics emqttd_pubsub:create(<<"$SYS/brokers">>), @@ -270,7 +270,7 @@ handle_info(tick, State) -> retain(brokers), retain(version, list_to_binary(version())), retain(sysdescr, list_to_binary(sysdescr())), - {noreply, State}; + {noreply, State, hibernate}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 6c8ceecf5..ff8e9806a 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -39,6 +39,8 @@ clients/1, sessions/1, plugins/1, listeners/1, vm/1, mnesia/1, trace/1]). +%% TODO: topics, subscriptions... + -define(PROC_INFOKEYS, [status, memory, message_queue_len, @@ -47,6 +49,8 @@ stack_size, reductions]). +-define(APP, emqttd). + load() -> Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds]. @@ -65,10 +69,10 @@ is_cmd(Fun) -> status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), - case lists:keysearch(emqttd, 1, application:which_applications()) of + case lists:keysearch(?APP, 1, application:which_applications()) of false -> ?PRINT_MSG("emqttd is not running~n"); - {value, {emqttd, _Desc, Vsn}} -> + {value, {?APP, _Desc, Vsn}} -> ?PRINT("emqttd ~s is running~n", [Vsn]) end; status(_) -> @@ -129,13 +133,9 @@ cluster([SNode]) -> false -> cluster(Node, fun() -> emqttd_plugins:unload(), - application:stop(emqttd), - application:stop(esockd), - application:stop(gproc), + stop_apps(), emqttd_mnesia:cluster(Node), - application:start(gproc), - application:start(esockd), - application:start(emqttd) + start_apps() end) end; @@ -157,6 +157,12 @@ cluster(pong, Node, DoCluster) -> cluster(pang, Node, _DoCluster) -> ?PRINT("Cannot connect to ~s~n", [Node]). +stop_apps() -> + [application:stop(App) || App <- [emqttd, esockd, gproc]]. + +start_apps() -> + [application:start(App) || App <- [gproc, esockd, emqttd]]. + %%------------------------------------------------------------------------------ %% @doc Query clients %% @end diff --git a/src/emqttd_sysmon_sup.erl b/src/emqttd_sysmon_sup.erl new file mode 100644 index 000000000..37bbd90bf --- /dev/null +++ b/src/emqttd_sysmon_sup.erl @@ -0,0 +1,45 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc emqttd sysmon supervisor. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- +-module(emqttd_sysmon_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + Env = emqttd:env(sysmon), + Sysmon = {sysmon, + {emqttd_sysmon, start_link, [Env]}, + permanent, 5000, worker, [emqttd_sysmon]}, + {ok, {{one_for_one, 10, 100}, [Sysmon]}}. +