diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index 5841321f9..3e637230f 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.6.1"}, + {vsn, "0.7.0"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/apps/emqttd/src/emqttd_access_control.erl b/apps/emqttd/src/emqttd_access_control.erl index 94b8025a1..68ea96d69 100644 --- a/apps/emqttd/src/emqttd_access_control.erl +++ b/apps/emqttd/src/emqttd_access_control.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_access_control). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -55,9 +55,7 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Start access control server. -%% +%% @doc Start access control server %% @end %%------------------------------------------------------------------------------ -spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}. @@ -65,9 +63,7 @@ start_link(AcOpts) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [AcOpts], []). %%------------------------------------------------------------------------------ -%% @doc -%% Authenticate client. -%% +%% @doc Authenticate MQTT Client %% @end %%------------------------------------------------------------------------------ -spec auth(mqtt_client(), undefined | binary()) -> ok | {error, string()}. @@ -83,9 +79,7 @@ auth(Client, Password, [{Mod, State} | Mods]) -> end. %%------------------------------------------------------------------------------ -%% @doc -%% Check ACL. -%% +%% @doc Check ACL %% @end %%------------------------------------------------------------------------------ -spec check_acl(Client, PubSub, Topic) -> allow | deny when @@ -108,9 +102,7 @@ check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) -> end. %%------------------------------------------------------------------------------ -%% @doc -%% Reload ACL. -%% +%% @doc Reload ACL %% @end %%------------------------------------------------------------------------------ -spec reload_acl() -> list() | {error, any()}. @@ -118,9 +110,7 @@ reload_acl() -> [M:reload_acl(State) || {M, State} <- lookup_mods(acl)]. %%------------------------------------------------------------------------------ -%% @doc -%% Register auth or ACL module. -%% +%% @doc Register authentication or ACL module %% @end %%------------------------------------------------------------------------------ -spec register_mod(Type :: auth | acl, Mod :: atom(), Opts :: list()) -> ok | {error, any()}. @@ -128,9 +118,7 @@ register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl-> gen_server:call(?SERVER, {register_mod, Type, Mod, Opts}). %%------------------------------------------------------------------------------ -%% @doc -%% Unregister auth or ACL module. -%% +%% @doc Unregister authentication or ACL module %% @end %%------------------------------------------------------------------------------ -spec unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, any()}. @@ -138,9 +126,7 @@ unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl -> gen_server:call(?SERVER, {unregister_mod, Type, Mod}). %%------------------------------------------------------------------------------ -%% @doc -%% Lookup authentication or ACL modules. -%% +%% @doc Lookup authentication or ACL modules %% @end %%------------------------------------------------------------------------------ -spec lookup_mods(auth | acl) -> list(). @@ -155,9 +141,7 @@ tab_key(acl) -> acl_modules. %%------------------------------------------------------------------------------ -%% @doc -%% Stop access control server. -%% +%% @doc Stop access control server %% @end %%------------------------------------------------------------------------------ stop() -> diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index fe4ea2e5e..82802fbdb 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_access_rule). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -48,12 +48,10 @@ -export([compile/1, match/3]). -%%%----------------------------------------------------------------------------- -%% @doc -%% Compile rule. -%% +%%------------------------------------------------------------------------------ +%% @doc Compile access rule %% @end -%%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ compile({A, all}) when (A =:= allow) orelse (A =:= deny) -> {A, all}; @@ -92,12 +90,10 @@ bin(L) when is_list(L) -> bin(B) when is_binary(B) -> B. -%%%----------------------------------------------------------------------------- -%% @doc -%% Match rule. -%% +%%------------------------------------------------------------------------------ +%% @doc Match rule %% @end -%%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch. match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> {matched, AllowDeny}; diff --git a/apps/emqttd/src/emqttd_acl_internal.erl b/apps/emqttd/src/emqttd_acl_internal.erl index c5c0ea0fb..ca1bdb29b 100644 --- a/apps/emqttd/src/emqttd_acl_internal.erl +++ b/apps/emqttd/src/emqttd_acl_internal.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_acl_internal). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -45,7 +45,10 @@ %%% API %%%============================================================================= -%% @doc Read all rules. +%%------------------------------------------------------------------------------ +%% @doc Read all rules +%% @end +%%------------------------------------------------------------------------------ -spec all_rules() -> list(emqttd_access_rule:rule()). all_rules() -> case ets:lookup(?ACL_RULE_TAB, all_rules) of @@ -57,17 +60,20 @@ all_rules() -> %%% ACL callbacks %%%============================================================================= -%% @doc init internal ACL. +%%------------------------------------------------------------------------------ +%% @doc Init internal ACL +%% @end +%%------------------------------------------------------------------------------ -spec init(AclOpts :: list()) -> {ok, State :: any()}. init(AclOpts) -> ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]), AclFile = proplists:get_value(file, AclOpts), Default = proplists:get_value(nomatch, AclOpts, allow), State = #state{acl_file = AclFile, nomatch = Default}, - load_rules(State), + load_rules_from_file(State), {ok, State}. -load_rules(#state{acl_file = AclFile}) -> +load_rules_from_file(#state{acl_file = AclFile}) -> {ok, Terms} = file:consult(AclFile), Rules = [emqttd_access_rule:compile(Term) || Term <- Terms], lists:foreach(fun(PubSub) -> @@ -89,7 +95,10 @@ filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) -> filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> false. -%% @doc Check ACL. +%%------------------------------------------------------------------------------ +%% @doc Check ACL +%% @end +%%------------------------------------------------------------------------------ -spec check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when Client :: mqtt_client(), PubSub :: pubsub(), @@ -117,15 +126,21 @@ match(Client, Topic, [Rule|Rules]) -> {matched, AllowDeny} -> {matched, AllowDeny} end. -%% @doc Reload ACL. +%%------------------------------------------------------------------------------ +%% @doc Reload ACL +%% @end +%%------------------------------------------------------------------------------ -spec reload_acl(State :: #state{}) -> ok | {error, Reason :: any()}. reload_acl(State) -> - case catch load_rules(State) of + case catch load_rules_from_file(State) of {'EXIT', Error} -> {error, Error}; _ -> ok end. -%% @doc ACL Description. +%%------------------------------------------------------------------------------ +%% @doc ACL Module Description +%% @end +%%------------------------------------------------------------------------------ -spec description() -> string(). description() -> "Internal ACL with etc/acl.config". diff --git a/apps/emqttd/src/emqttd_acl_mod.erl b/apps/emqttd/src/emqttd_acl_mod.erl index e6ca68e4c..b11b53809 100644 --- a/apps/emqttd/src/emqttd_acl_mod.erl +++ b/apps/emqttd/src/emqttd_acl_mod.erl @@ -20,13 +20,13 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd ACL behaviour. +%%% ACL module behaviour. %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_acl_mod). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index ffd9d4649..4188bd59f 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_app). --author('feng@emqtt.io'). +-author("Feng Lee "). -behaviour(application). diff --git a/apps/emqttd/src/emqttd_auth_anonymous.erl b/apps/emqttd/src/emqttd_auth_anonymous.erl index 4e4285fee..e7497a56b 100644 --- a/apps/emqttd/src/emqttd_auth_anonymous.erl +++ b/apps/emqttd/src/emqttd_auth_anonymous.erl @@ -20,13 +20,13 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd anonymous authentication. +%%% Anonymous authentication module. %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_anonymous). --author('feng@emqtt.io'). +-author("Feng Lee "). -behaviour(emqttd_auth_mod). @@ -34,7 +34,7 @@ init(Opts) -> {ok, Opts}. -check(_User, _Password, _Opts) -> ok. +check(_Client, _Password, _Opts) -> ok. description() -> "Anonymous authentication module". diff --git a/apps/emqttd/src/emqttd_auth_clientid.erl b/apps/emqttd/src/emqttd_auth_clientid.erl index 55ad8dcd2..56a300287 100644 --- a/apps/emqttd/src/emqttd_auth_clientid.erl +++ b/apps/emqttd/src/emqttd_auth_clientid.erl @@ -20,13 +20,13 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd authentication with clientid. +%%% ClientId authentication module. %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_clientid). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -36,30 +36,58 @@ -behaviour(emqttd_auth_mod). -%% emqttd_auth callbacks +%% emqttd_auth_mod callbacks -export([init/1, check/3, description/0]). -define(AUTH_CLIENTID_TAB, mqtt_auth_clientid). -record(?AUTH_CLIENTID_TAB, {clientid, ipaddr, password}). +%%%============================================================================= +%%% API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Add clientid +%% @end +%%------------------------------------------------------------------------------ add_clientid(ClientId) when is_binary(ClientId) -> R = #mqtt_auth_clientid{clientid = ClientId}, mnesia:transaction(fun() -> mnesia:write(R) end). +%%------------------------------------------------------------------------------ +%% @doc Add clientid with password +%% @end +%%------------------------------------------------------------------------------ add_clientid(ClientId, Password) -> R = #mqtt_auth_clientid{clientid = ClientId, password = Password}, mnesia:transaction(fun() -> mnesia:write(R) end). +%%------------------------------------------------------------------------------ +%% @doc Lookup clientid +%% @end +%%------------------------------------------------------------------------------ lookup_clientid(ClientId) -> mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId). +%%------------------------------------------------------------------------------ +%% @doc Lookup all clientids +%% @end +%%------------------------------------------------------------------------------ all_clientids() -> mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB). +%%------------------------------------------------------------------------------ +%% @doc Remove clientid +%% @end +%%------------------------------------------------------------------------------ remove_clientid(ClientId) -> mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TAB, ClientId}) end). +%%%============================================================================= +%%% emqttd_auth_mod callbacks +%%%============================================================================= + init(Opts) -> mnesia:create_table(?AUTH_CLIENTID_TAB, [ {ram_copies, [node()]}, @@ -129,4 +157,3 @@ check_clientid_only(ClientId, IpAddr) -> end end. - diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index 8e4896351..053b358e2 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -26,13 +26,13 @@ %%%----------------------------------------------------------------------------- -module(emqttd_bridge). --author('feng@emqtt.io'). +-author("Feng Lee "). --behaviour(gen_server). +-include("emqttd.hrl"). -include_lib("emqtt/include/emqtt.hrl"). --include("emqttd.hrl"). +-behaviour(gen_server). %% API Function Exports -export([start_link/3]). @@ -64,9 +64,7 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Start a bridge. -%% +%% @doc Start a bridge %% @end %%------------------------------------------------------------------------------ -spec start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}. @@ -103,7 +101,7 @@ parse_opts([{ping_down_interval, Interval} | Opts], State) -> parse_opts(Opts, State#state{ping_down_interval = Interval*1000}). handle_call(_Request, _From, State) -> - {reply, ok, State}. + {reply, error, State}. handle_cast(_Msg, State) -> {noreply, State}. diff --git a/apps/emqttd/src/emqttd_bridge_sup.erl b/apps/emqttd/src/emqttd_bridge_sup.erl index 3857461c9..0b1818538 100644 --- a/apps/emqttd/src/emqttd_bridge_sup.erl +++ b/apps/emqttd/src/emqttd_bridge_sup.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_bridge_sup). --author('feng@emqtt.io'). +-author("Feng Lee "). -behavior(supervisor). @@ -42,23 +42,20 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Start bridge supervisor. -%% +%% @doc Start bridge supervisor %% @end %%------------------------------------------------------------------------------ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). +%%TODO: bridges... -spec bridges() -> [{tuple(), pid()}]. bridges() -> [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} <- supervisor:which_children(?MODULE)]. %%------------------------------------------------------------------------------ -%% @doc -%% Start a bridge. -%% +%% @doc Start a bridge %% @end %%------------------------------------------------------------------------------ -spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}. @@ -72,9 +69,7 @@ start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic) supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)). %%------------------------------------------------------------------------------ -%% @doc -%% Stop a bridge. -%% +%% @doc Stop a bridge %% @end %%------------------------------------------------------------------------------ -spec stop_bridge(atom(), binary()) -> {ok, pid()} | ok. diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 4d4dd9b18..1933a180f 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -26,28 +26,32 @@ %%%----------------------------------------------------------------------------- -module(emqttd_broker). --include_lib("emqtt/include/emqtt.hrl"). +-author("Feng Lee "). -include("emqttd_systop.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). --define(BROKER_TAB, mqtt_broker). - %% API Function Exports -export([start_link/1]). -export([version/0, uptime/0, datetime/0, sysdescr/0]). %% statistics API. --export([getstats/0, getstat/1, setstat/2, setstats/3]). +-export([statsfun/1, statsfun/2, + getstats/0, getstat/1, + setstat/2, setstats/3]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-define(BROKER_TAB, mqtt_broker). + -record(state, {started_at, sys_interval, tick_timer}). %%%============================================================================= @@ -55,9 +59,7 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Start emqttd broker. -%% +%% @doc Start emqttd broker %% @end %%------------------------------------------------------------------------------ -spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}. @@ -65,9 +67,7 @@ start_link(Options) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []). %%------------------------------------------------------------------------------ -%% @doc -%% Get broker version. -%% +%% @doc Get broker version %% @end %%------------------------------------------------------------------------------ -spec version() -> string(). @@ -75,9 +75,7 @@ version() -> {ok, Version} = application:get_key(emqttd, vsn), Version. %%------------------------------------------------------------------------------ -%% @doc -%% Get broker description. -%% +%% @doc Get broker description %% @end %%------------------------------------------------------------------------------ -spec sysdescr() -> string(). @@ -85,9 +83,7 @@ sysdescr() -> {ok, Descr} = application:get_key(emqttd, description), Descr. %%------------------------------------------------------------------------------ -%% @doc -%% Get broker uptime. -%% +%% @doc Get broker uptime %% @end %%------------------------------------------------------------------------------ -spec uptime() -> string(). @@ -95,9 +91,7 @@ uptime() -> gen_server:call(?SERVER, uptime). %%------------------------------------------------------------------------------ -%% @doc -%% Get broker datetime. -%% +%% @doc Get broker datetime %% @end %%------------------------------------------------------------------------------ -spec datetime() -> string(). @@ -108,9 +102,19 @@ datetime() -> "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). %%------------------------------------------------------------------------------ -%% @doc -%% Get broker statistics. -%% +%% @doc Generate stats fun +%% @end +%%------------------------------------------------------------------------------ +-spec statsfun(Stat :: atom()) -> fun(). +statsfun(Stat) -> + fun(Val) -> setstat(Stat, Val) end. + +-spec statsfun(Stat :: atom(), MaxStat :: atom()) -> fun(). +statsfun(Stat, MaxStat) -> + fun(Val) -> setstats(Stat, MaxStat, Val) end. + +%%------------------------------------------------------------------------------ +%% @doc Get broker statistics %% @end %%------------------------------------------------------------------------------ -spec getstats() -> [{atom(), non_neg_integer()}]. @@ -118,9 +122,7 @@ getstats() -> ets:tab2list(?BROKER_TAB). %%------------------------------------------------------------------------------ -%% @doc -%% Get stats by name. -%% +%% @doc Get stats by name %% @end %%------------------------------------------------------------------------------ -spec getstat(atom()) -> non_neg_integer() | undefined. @@ -131,9 +133,7 @@ getstat(Name) -> end. %%------------------------------------------------------------------------------ -%% @doc -%% Set broker stats. -%% +%% @doc Set broker stats %% @end %%------------------------------------------------------------------------------ -spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean(). @@ -141,9 +141,7 @@ setstat(Stat, Val) -> ets:update_element(?BROKER_TAB, Stat, {2, Val}). %%------------------------------------------------------------------------------ -%% @doc -%% Set stats with max. -%% +%% @doc Set stats with max %% @end %%------------------------------------------------------------------------------ -spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean(). @@ -180,7 +178,7 @@ handle_call(uptime, _From, State) -> {reply, uptime(State), State}; handle_call(_Request, _From, State) -> - {reply, ok, State}. + {reply, error, State}. handle_cast(_Msg, State) -> {noreply, State}. @@ -252,4 +250,3 @@ tick(Delay, State) -> i2b(I) when is_integer(I) -> list_to_binary(integer_to_list(I)). - diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 44c728c78..5e32fcb94 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -20,29 +20,27 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd client. +%%% MQTT Client %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_client). --author('feng@emqtt.io'). - --behaviour(gen_server). - --export([start_link/2, info/1]). - --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - code_change/3, - terminate/2]). +-author("Feng Lee "). -include_lib("emqtt/include/emqtt.hrl"). -include_lib("emqtt/include/emqtt_packet.hrl"). +%% API Function Exports +-export([start_link/2, info/1]). + +-behaviour(gen_server). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2]). + %%Client State... -record(state, {transport, socket, diff --git a/apps/emqttd/src/emqttd_cluster.erl b/apps/emqttd/src/emqttd_cluster.erl index ad181c8ef..e7d6fe980 100644 --- a/apps/emqttd/src/emqttd_cluster.erl +++ b/apps/emqttd/src/emqttd_cluster.erl @@ -26,10 +26,14 @@ %%%----------------------------------------------------------------------------- -module(emqttd_cluster). --author('feng@emqtt.io'). +-author("Feng Lee "). -export([running_nodes/0]). +%%------------------------------------------------------------------------------ +%% @doc Get running nodes +%% @end +%%------------------------------------------------------------------------------ running_nodes() -> mnesia:system_info(running_db_nodes). diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 6b9d34848..5269138c8 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -20,13 +20,13 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd client manager. +%%% MQTT Client Manager %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_cm). --author('feng@emqtt.io'). +-author("Feng Lee "). -behaviour(gen_server). @@ -37,18 +37,11 @@ -export([lookup/1, register/1, unregister/1]). -%% Stats --export([getstats/0]). - %% gen_server Function Exports --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). --record(state, {tab}). +-record(state, {tab, statsfun}). -define(CLIENT_TAB, mqtt_client). @@ -57,9 +50,7 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Start client manager. -%% +%% @doc Start client manager %% @end %%------------------------------------------------------------------------------ -spec start_link() -> {ok, pid()} | ignore | {error, any()}. @@ -67,9 +58,7 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%------------------------------------------------------------------------------ -%% @doc -%% Lookup client pid with clientId. -%% +%% @doc Lookup client pid with clientId %% @end %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. @@ -93,25 +82,13 @@ register(ClientId) when is_binary(ClientId) -> end. %%------------------------------------------------------------------------------ -%% @doc -%% Unregister clientId with pid. -%% +%% @doc Unregister clientId with pid. %% @end %%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> gen_server:cast(?SERVER, {unregister, ClientId, self()}). -%%------------------------------------------------------------------------------ -%% @doc -%% Get statistics of client manager. -%% -%% @end -%%------------------------------------------------------------------------------ -getstats() -> - [{Name, emqttd_broker:getstat(Name)} || - Name <- ['clients/count', 'clients/max']]. - %%%============================================================================= %%% gen_server callbacks %%%============================================================================= @@ -121,7 +98,8 @@ init([]) -> named_table, public, {write_concurrency, true}]), - {ok, #state{tab = TabId}}. + StatsFun = emqttd_broker:statsfun('clients/count', 'clients/max'), + {ok, #state{tab = TabId, statsfun = StatsFun}}. handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), @@ -188,9 +166,7 @@ registerd(Tab, {ClientId, Pid}) -> false end. -setstats(State) -> - emqttd_broker:setstats('clients/count', - 'clients/max', - ets:info(?CLIENT_TAB, size)), State. +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(?CLIENT_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_config.erl b/apps/emqttd/src/emqttd_config.erl index ee6e862a4..38d6e6794 100644 --- a/apps/emqttd/src/emqttd_config.erl +++ b/apps/emqttd/src/emqttd_config.erl @@ -26,6 +26,8 @@ %%%----------------------------------------------------------------------------- -module(emqttd_config). +-author("Feng Lee "). + -define(SERVER, ?MODULE). -behaviour(gen_server). diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index 4fbaebd53..c32835eec 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_ctl). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -49,6 +49,10 @@ useradd/1, userdel/1]). +%%------------------------------------------------------------------------------ +%% @doc Query node status +%% @end +%%------------------------------------------------------------------------------ status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), @@ -59,6 +63,10 @@ status([]) -> ?PRINT_MSG("emqttd is running~n") end. +%%------------------------------------------------------------------------------ +%% @doc Cluster with other node +%% @end +%%------------------------------------------------------------------------------ cluster([]) -> Nodes = [node()|nodes()], ?PRINT("cluster nodes: ~p~n", [Nodes]); @@ -77,9 +85,17 @@ cluster([SNode]) -> ?PRINT("failed to connect to ~p~n", [Node]) end. +%%------------------------------------------------------------------------------ +%% @doc Add usern +%% @end +%%------------------------------------------------------------------------------ useradd([Username, Password]) -> ?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]). +%%------------------------------------------------------------------------------ +%% @doc Delete user +%% @end +%%------------------------------------------------------------------------------ userdel([Username]) -> ?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]). diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index a4c6a9252..b9ff574b8 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -26,27 +26,21 @@ %%%----------------------------------------------------------------------------- -module(emqttd_event). +-author("Feng Lee "). + -include_lib("emqtt/include/emqtt.hrl"). %% API Function Exports --export([start_link/0, - add_handler/2, - notify/1]). +-export([start_link/0, add_handler/2, notify/1]). %% gen_event Function Exports --export([init/1, - handle_event/2, - handle_call/2, - handle_info/2, - terminate/2, - code_change/3]). +-export([init/1, handle_event/2, handle_call/2, handle_info/2, + terminate/2, code_change/3]). -record(state, {systop}). %%------------------------------------------------------------------------------ -%% @doc -%% Start emqttd event manager. -%% +%% @doc Start event manager %% @end %%------------------------------------------------------------------------------ -spec start_link() -> {ok, pid()} | {error, any()}. @@ -64,6 +58,7 @@ add_handler(Handler, Args) -> notify(Event) -> gen_event:notify(?MODULE, Event). + %%%============================================================================= %%% gen_event callbacks %%%============================================================================= @@ -108,9 +103,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% Internal functions +%%%============================================================================= payload(connected, Params) -> From = proplists:get_value(from, Params), diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index c4f95f21a..06146c8c2 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -26,12 +26,12 @@ %%%----------------------------------------------------------------------------- -module(emqttd_http). --author('feng@emqtt.io'). - --include_lib("emqtt/include/emqtt.hrl"). +-author("Feng Lee "). -include("emqttd.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). + -import(proplists, [get_value/2, get_value/3]). -export([handle/1]). diff --git a/apps/emqttd/src/emqttd_keepalive.erl b/apps/emqttd/src/emqttd_keepalive.erl index 5bfb1e1a4..c7cf9917e 100644 --- a/apps/emqttd/src/emqttd_keepalive.erl +++ b/apps/emqttd/src/emqttd_keepalive.erl @@ -19,23 +19,25 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd keepalive. +%%% @doc client keepalive %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_keepalive). --author('feng@emqtt.io'). +-author("Feng Lee "). -export([new/3, resume/1, cancel/1]). --record(keepalive, {transport, socket, recv_oct, timeout_sec, timeout_msg, timer_ref}). +-record(keepalive, {transport, + socket, + recv_oct, + timeout_sec, + timeout_msg, + timer_ref}). %%------------------------------------------------------------------------------ -%% @doc -%% Create a keepalive. -%% +%% @doc Create a keepalive %% @end %%------------------------------------------------------------------------------ new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> @@ -49,9 +51,7 @@ new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> timer_ref = Ref}. %%------------------------------------------------------------------------------ -%% @doc -%% Try to resume keepalive, called when timeout. -%% +%% @doc Try to resume keepalive, called when timeout %% @end %%------------------------------------------------------------------------------ resume(KeepAlive = #keepalive {transport = Transport, @@ -72,9 +72,7 @@ resume(KeepAlive = #keepalive {transport = Transport, end. %%------------------------------------------------------------------------------ -%% @doc -%% Cancel Keepalive. -%% +%% @doc Cancel Keepalive %% @end %%------------------------------------------------------------------------------ cancel(#keepalive{timer_ref = Ref}) -> diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index 9d2f21c77..dcda649b1 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -26,18 +26,16 @@ %%%----------------------------------------------------------------------------- -module(emqttd_metrics). --author('feng@emqtt.io'). - --include_lib("emqtt/include/emqtt.hrl"). +-author("Feng Lee "). -include("emqttd_systop.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). --define(METRIC_TAB, mqtt_broker_metric). - %% API Function Exports -export([start_link/1]). @@ -50,6 +48,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-define(METRIC_TAB, mqtt_metric). + -record(state, {pub_interval, tick_timer}). %%%============================================================================= @@ -57,9 +57,7 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Start emqttd metrics. -%% +%% @doc Start metrics server %% @end %%------------------------------------------------------------------------------ -spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}. @@ -67,9 +65,7 @@ start_link(Options) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []). %%------------------------------------------------------------------------------ -%% @doc -%% Get all metrics. -%% +%% @doc Get all metrics %% @end %%------------------------------------------------------------------------------ -spec all() -> [{atom(), non_neg_integer()}]. @@ -84,9 +80,7 @@ all() -> end, #{}, ?METRIC_TAB)). %%------------------------------------------------------------------------------ -%% @doc -%% Get metric value. -%% +%% @doc Get metric value %% @end %%------------------------------------------------------------------------------ -spec value(atom()) -> non_neg_integer(). @@ -94,9 +88,7 @@ value(Metric) -> lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). %%------------------------------------------------------------------------------ -%% @doc -%% Increase counter. -%% +%% @doc Increase counter %% @end %%------------------------------------------------------------------------------ -spec inc(atom()) -> non_neg_integer(). @@ -104,9 +96,7 @@ inc(Metric) -> inc(counter, Metric, 1). %%------------------------------------------------------------------------------ -%% @doc -%% Increase metric value. -%% +%% @doc Increase metric value %% @end %%------------------------------------------------------------------------------ -spec inc(counter | gauge, atom()) -> non_neg_integer(). @@ -118,9 +108,7 @@ inc(Metric, Val) when is_atom(Metric) and is_integer(Val) -> inc(counter, Metric, Val). %%------------------------------------------------------------------------------ -%% @doc -%% Increase metric value. -%% +%% @doc Increase metric value %% @end %%------------------------------------------------------------------------------ -spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer(). @@ -130,9 +118,7 @@ inc(counter, Metric, Val) -> ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}). %%------------------------------------------------------------------------------ -%% @doc -%% Decrease metric value. -%% +%% @doc Decrease metric value %% @end %%------------------------------------------------------------------------------ -spec dec(gauge, atom()) -> integer(). @@ -140,9 +126,7 @@ dec(gauge, Metric) -> dec(gauge, Metric, 1). %%------------------------------------------------------------------------------ -%% @doc -%% Decrease metric value -%% +%% @doc Decrease metric value %% @end %%------------------------------------------------------------------------------ -spec dec(gauge, atom(), pos_integer()) -> integer(). @@ -150,9 +134,7 @@ dec(gauge, Metric, Val) -> ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}). %%------------------------------------------------------------------------------ -%% @doc -%% Set metric value. -%% +%% @doc Set metric value %% @end %%------------------------------------------------------------------------------ set(Metric, Val) when is_atom(Metric) -> @@ -161,10 +143,7 @@ set(gauge, Metric, Val) -> ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}). %%------------------------------------------------------------------------------ -%% @doc -%% @private -%% Metric Key -%% +%% @doc Metric Key %% @end %%------------------------------------------------------------------------------ key(gauge, Metric) -> @@ -192,19 +171,19 @@ init([Options]) -> end, {ok, tick(Delay, #state{pub_interval = PubInterval}), hibernate}. -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. +handle_call(_Req, _From, State) -> + {reply, {error, badreq}, State}. -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. +handle_cast(_Msg, State) -> + {noreply, State}. handle_info(tick, State) -> % publish metric message [publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()], {noreply, tick(State), hibernate}; -handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. +handle_info(_Info, State) -> + {noreply, State}. terminate(_Reason, _State) -> ok. @@ -241,4 +220,3 @@ tick(Delay, State) -> i2b(I) -> list_to_binary(integer_to_list(I)). - diff --git a/apps/emqttd/src/emqttd_msg_store.erl b/apps/emqttd/src/emqttd_msg_store.erl index 4e03c0e03..01f91fe1f 100644 --- a/apps/emqttd/src/emqttd_msg_store.erl +++ b/apps/emqttd/src/emqttd_msg_store.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_msg_store). --author('feng@slimpp.io'). +-author("Feng Lee "). -include_lib("emqtt/include/emqtt.hrl"). @@ -58,9 +58,7 @@ mnesia(copy) -> %%%============================================================================= %%%----------------------------------------------------------------------------- -%% @doc -%% Retain message. -%% +%% @doc Retain message %% @end %%%----------------------------------------------------------------------------- -spec retain(mqtt_message()) -> ok | ignore. @@ -100,7 +98,10 @@ env() -> Env end. -%% @doc redeliver retained messages to subscribed client. +%%%----------------------------------------------------------------------------- +%% @doc Redeliver retained messages to subscribed client +%% @end +%%%----------------------------------------------------------------------------- -spec redeliver(Topic, CPid) -> any() when Topic :: binary(), CPid :: pid(). @@ -126,4 +127,3 @@ dispatch(CPid, Msgs) when is_list(Msgs) -> dispatch(CPid, Msg) when is_record(Msg, mqtt_message) -> CPid ! {dispatch, {self(), Msg}}. - diff --git a/apps/emqttd/src/emqttd_net.erl b/apps/emqttd/src/emqttd_net.erl index 199fcd4fd..1a0b3d954 100644 --- a/apps/emqttd/src/emqttd_net.erl +++ b/apps/emqttd/src/emqttd_net.erl @@ -26,14 +26,14 @@ %%%----------------------------------------------------------------------------- -module(emqttd_net). --author('feng@emqtt.io'). +-author("Feng Lee "). + +-include_lib("kernel/include/inet.hrl"). -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). -export([peername/1, sockname/1, format/2, format/1, connection_string/2]). --include_lib("kernel/include/inet.hrl"). - -define(FIRST_TEST_BIND_PORT, 10000). %%-------------------------------------------------------------------- diff --git a/apps/emqttd/src/emqttd_opts.erl b/apps/emqttd/src/emqttd_opts.erl index b67f55665..b1f1e2ed5 100644 --- a/apps/emqttd/src/emqttd_opts.erl +++ b/apps/emqttd/src/emqttd_opts.erl @@ -26,8 +26,14 @@ %%%----------------------------------------------------------------------------- -module(emqttd_opts). +-author("Feng Lee "). + -export([merge/2]). +%%%----------------------------------------------------------------------------- +%% @doc Merge Options +%% @end +%%%----------------------------------------------------------------------------- merge(Defaults, Options) -> lists:foldl( fun({Opt, Val}, Acc) -> @@ -44,5 +50,3 @@ merge(Defaults, Options) -> end end, Defaults, Options). - - diff --git a/apps/emqttd/src/emqttd_plugin.erl b/apps/emqttd/src/emqttd_plugin.erl index 8963035e5..f90a96361 100644 --- a/apps/emqttd/src/emqttd_plugin.erl +++ b/apps/emqttd/src/emqttd_plugin.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_plugin). --author('feng@emqtt.io'). +-author("Feng Lee "). -behaviour(gen_server). diff --git a/apps/emqttd/src/emqttd_plugin_manager.erl b/apps/emqttd/src/emqttd_plugin_manager.erl index 37cfac690..244018d5d 100644 --- a/apps/emqttd/src/emqttd_plugin_manager.erl +++ b/apps/emqttd/src/emqttd_plugin_manager.erl @@ -26,34 +26,28 @@ %%%----------------------------------------------------------------------------- -module(emqttd_plugin_manager). +-author("Feng Lee "). + -export([list/0, load/1, unload/1]). %%------------------------------------------------------------------------------ -%% @doc -%% List all loaded plugins. -%% +%% @doc List all loaded plugins %% @end %%------------------------------------------------------------------------------ list() -> []. %%------------------------------------------------------------------------------ -%% @doc -%% Load Plugin. -%% +%% @doc Load Plugin %% @end %%------------------------------------------------------------------------------ load(Name) when is_atom(Name) -> ok. %%------------------------------------------------------------------------------ -%% @doc -%% Unload Plugin. -%% +%% @doc Unload Plugin %% @end %%------------------------------------------------------------------------------ unload(Name) when is_atom(Name) -> ok. - - diff --git a/apps/emqttd/src/emqttd_pooler.erl b/apps/emqttd/src/emqttd_pooler.erl index 09ad7bf7a..2d67ab238 100644 --- a/apps/emqttd/src/emqttd_pooler.erl +++ b/apps/emqttd/src/emqttd_pooler.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_pooler). --author('feng@emqtt.io'). +-author("Feng Lee "). -behaviour(gen_server). @@ -46,12 +46,24 @@ start_link(I) -> gen_server:start_link(?MODULE, [I], []). +%%------------------------------------------------------------------------------ +%% @doc Submit work to pooler +%% @end +%%------------------------------------------------------------------------------ submit(Fun) -> gen_server:call(gproc_pool:pick(pooler), {submit, Fun}, infinity). +%%------------------------------------------------------------------------------ +%% @doc Submit work to pooler asynchronously +%% @end +%%------------------------------------------------------------------------------ async_submit(Fun) -> gen_server:cast(gproc_pool:pick(pooler), {async_submit, Fun}). +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + init([I]) -> gproc_pool:connect_worker(pooler, {pooler, I}), {ok, #state{id = I}}. diff --git a/apps/emqttd/src/emqttd_pooler_sup.erl b/apps/emqttd/src/emqttd_pooler_sup.erl index c2a62bbbb..5714183aa 100644 --- a/apps/emqttd/src/emqttd_pooler_sup.erl +++ b/apps/emqttd/src/emqttd_pooler_sup.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_pooler_sup). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -55,3 +55,4 @@ init([PoolSize]) -> end, lists:seq(1, PoolSize)), {ok, {{one_for_all, 10, 100}, Children}}. + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 8f595632b..a073d3844 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -26,6 +26,8 @@ %%%----------------------------------------------------------------------------- -module(emqttd_protocol). +-author("Feng Lee "). + -include_lib("emqtt/include/emqtt.hrl"). -include_lib("emqtt/include/emqtt_packet.hrl"). diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 33193a397..98079ab7e 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_pubsub). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -88,7 +88,8 @@ mnesia(copy) -> %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc Start one pubsub. +%% @doc Start one pubsub server +%% @end %%------------------------------------------------------------------------------ -spec start_link(Id, Opts) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), @@ -97,30 +98,30 @@ start_link(Id, Opts) -> gen_server:start_link(?MODULE, [Id, Opts], []). %%------------------------------------------------------------------------------ -%% @doc Create topic. Notice That this transaction is not protected by pubsub pool. +%% @doc Create topic. Notice That this transaction is not protected by pubsub pool +%% @end %%------------------------------------------------------------------------------ -spec create(Topic :: binary()) -> ok | {error, Error :: any()}. create(Topic) when is_binary(Topic) -> - TopicR = #mqtt_topic{topic = Topic, node = node()}, - case mnesia:transaction(fun add_topic/1, [TopicR]) of - {atomic, ok} -> setstats(topics), ok; - {aborted, Error} -> {error, Error} - end. + call({create, Topic}). %%------------------------------------------------------------------------------ -%% @doc Subscribe topic. +%% @doc Subscribe topic +%% @end %%------------------------------------------------------------------------------ -spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), Qos :: mqtt_qos(). - subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) -> call({subscribe, self(), Topic, Qos}); subscribe(Topics = [{_Topic, _Qos} | _]) -> call({subscribe, self(), Topics}). +%%------------------------------------------------------------------------------ %% @doc Unsubscribe Topic or Topics +%% @end +%%------------------------------------------------------------------------------ -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> cast({unsubscribe, self(), Topic}); @@ -137,7 +138,8 @@ cast(Msg) -> gen_server:cast(Pid, Msg). %%------------------------------------------------------------------------------ -%% @doc Publish to cluster nodes. +%% @doc Publish to cluster nodes +%% @end %%------------------------------------------------------------------------------ -spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok. publish(From, Msg=#mqtt_message{topic=Topic}) -> @@ -159,7 +161,10 @@ publish(_From, Topic, Msg) when is_binary(Topic) -> end end, match(Topic)). +%%------------------------------------------------------------------------------ %% @doc Dispatch message locally. should only be called by publish. +%% @end +%%------------------------------------------------------------------------------ -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> Subscribers = mnesia:dirty_read(subscriber, Topic), @@ -188,6 +193,16 @@ init([Id, _Opts]) -> gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. +handle_call({create, Topic}, _From, State) -> + TopicR = #mqtt_topic{topic = Topic, node = node()}, + Reply = + case mnesia:transaction(fun add_topic/1, [TopicR]) of + {atomic, ok} -> ok; + {aborted, Error} -> {error, Error} + end, + setstats(topics), + {reply, Reply, State}; + handle_call({subscribe, SubPid, Topics}, _From, State) -> TopicSubs = lists:map(fun({Topic, Qos}) -> {#mqtt_topic{topic = Topic, node = node()}, diff --git a/apps/emqttd/src/emqttd_pubsub_sup.erl b/apps/emqttd/src/emqttd_pubsub_sup.erl index b6a7936f6..590615ae0 100644 --- a/apps/emqttd/src/emqttd_pubsub_sup.erl +++ b/apps/emqttd/src/emqttd_pubsub_sup.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_pubsub_sup). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). diff --git a/apps/emqttd/src/emqttd_queue.erl b/apps/emqttd/src/emqttd_queue.erl index f5ce38fce..58a600bda 100644 --- a/apps/emqttd/src/emqttd_queue.erl +++ b/apps/emqttd/src/emqttd_queue.erl @@ -29,6 +29,8 @@ -module(emqttd_queue). +-author("Feng Lee "). + -include_lib("emqtt/include/emqtt.hrl"). -export([new/1, new/2, in/3, all/1, clear/1]). diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index a921efd91..affe1ae33 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -26,10 +26,14 @@ %%%----------------------------------------------------------------------------- -module(emqttd_session). --include_lib("emqtt/include/emqtt.hrl"). --include_lib("emqtt/include/emqtt_packet.hrl"). +-author("Feng Lee "). + -include("emqttd.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). + +-include_lib("emqtt/include/emqtt_packet.hrl"). + %% API Function Exports -export([start/1, resume/3, @@ -67,9 +71,7 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Start Session. -%% +%% @doc Start Session %% @end %%------------------------------------------------------------------------------ -spec start({boolean(), binary(), pid()}) -> {ok, session()}. @@ -83,9 +85,7 @@ start({false = _CleanSess, ClientId, ClientPid}) -> {ok, SessPid}. %%------------------------------------------------------------------------------ -%% @doc -%% Resume Session. -%% +%% @doc Resume Session %% @end %%------------------------------------------------------------------------------ -spec resume(session(), binary(), pid()) -> session(). @@ -96,9 +96,7 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> SessPid. %%------------------------------------------------------------------------------ -%% @doc -%% Publish message. -%% +%% @doc Publish message %% @end %%------------------------------------------------------------------------------ -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). @@ -118,9 +116,7 @@ publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) -> SessPid. %%------------------------------------------------------------------------------ -%% @doc -%% PubAck message. -%% +%% @doc PubAck message %% @end %%------------------------------------------------------------------------------ -spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). @@ -172,9 +168,7 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid. %%------------------------------------------------------------------------------ -%% @doc -%% Subscribe Topics. -%% +%% @doc Subscribe Topics %% @end %%------------------------------------------------------------------------------ -spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. @@ -197,9 +191,7 @@ subscribe(SessPid, Topics) when is_pid(SessPid) -> {ok, SessPid, GrantedQos}. %%------------------------------------------------------------------------------ -%% @doc -%% Unsubscribe Topics. -%% +%% @doc Unsubscribe Topics %% @end %%------------------------------------------------------------------------------ -spec unsubscribe(session(), [binary()]) -> {ok, session()}. @@ -220,9 +212,7 @@ unsubscribe(SessPid, Topics) when is_pid(SessPid) -> {ok, SessPid}. %%------------------------------------------------------------------------------ -%% @doc -%% Destroy Session. -%% +%% @doc Destroy Session %% @end %%------------------------------------------------------------------------------ -spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok. diff --git a/apps/emqttd/src/emqttd_session_sup.erl b/apps/emqttd/src/emqttd_session_sup.erl index 723efc532..b1c89c879 100644 --- a/apps/emqttd/src/emqttd_session_sup.erl +++ b/apps/emqttd/src/emqttd_session_sup.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_session_sup). --author('feng@emqtt.io'). +-author("Feng Lee "). -behavior(supervisor). @@ -47,6 +47,7 @@ start_session(ClientId, ClientPid) -> %%%============================================================================= %%% Supervisor callbacks %%%============================================================================= + init([SessOpts]) -> {ok, {{simple_one_for_one, 0, 1}, [{session, {emqttd_session, start_link, [SessOpts]}, diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index 7c65bfb5d..c44b9dd5d 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -36,6 +36,8 @@ %%%----------------------------------------------------------------------------- -module(emqttd_sm). +-author("Feng Lee "). + %%cleanSess: true | false -include("emqttd.hrl"). @@ -44,8 +46,6 @@ -define(SERVER, ?MODULE). --define(SESSION_TAB, mqtt_session). - %% API Function Exports -export([start_link/0]). @@ -55,7 +55,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tab}). +-record(state, {tabid, statsfun}). + +-define(SESSION_TAB, mqtt_session). %%%============================================================================= %%% API @@ -65,9 +67,7 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%------------------------------------------------------------------------------ -%% @doc -%% Lookup Session Pid. -%% +%% @doc Lookup Session Pid %% @end %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. @@ -78,9 +78,7 @@ lookup_session(ClientId) -> end. %%------------------------------------------------------------------------------ -%% @doc -%% Start Session. -%% +%% @doc Start a session %% @end %%------------------------------------------------------------------------------ -spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. @@ -88,9 +86,7 @@ start_session(ClientId, ClientPid) -> gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). %%------------------------------------------------------------------------------ -%% @doc -%% Destroy Session. -%% +%% @doc Destroy a session %% @end %%------------------------------------------------------------------------------ -spec destroy_session(binary()) -> ok. @@ -104,9 +100,10 @@ destroy_session(ClientId) -> init([]) -> process_flag(trap_exit, true), TabId = ets:new(?SESSION_TAB, [set, protected, named_table]), - {ok, #state{tab = TabId}}. + StatsFun = emqttd_broker:statsfun('sessions/count', 'sessions/max'), + {ok, #state{tabid = TabId, statsfun = StatsFun}}. -handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Tab}) -> +handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> Reply = case ets:lookup(Tab, ClientId) of [{_, SessPid, _MRef}] -> @@ -124,7 +121,7 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Ta end, {reply, Reply, setstats(State)}; -handle_call({destroy_session, ClientId}, _From, State = #state{tab = Tab}) -> +handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) -> case ets:lookup(Tab, ClientId) of [{_, SessPid, MRef}] -> emqttd_session:destroy(SessPid, ClientId), @@ -141,7 +138,7 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = Tab}) -> +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Tab}) -> ets:match_delete(Tab, {'_', DownPid, MRef}), {noreply, setstats(State)}; @@ -158,9 +155,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State) -> - emqttd_broker:setstats('sessions/count', - 'sessions/max', - ets:info(?SESSION_TAB, size)), State. - +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(?SESSION_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_sup.erl b/apps/emqttd/src/emqttd_sup.erl index f894cafe7..184e9a45c 100644 --- a/apps/emqttd/src/emqttd_sup.erl +++ b/apps/emqttd/src/emqttd_sup.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_sup). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). diff --git a/apps/emqttd/src/emqttd_sysmon.erl b/apps/emqttd/src/emqttd_sysmon.erl index ed676b1dd..08892d203 100644 --- a/apps/emqttd/src/emqttd_sysmon.erl +++ b/apps/emqttd/src/emqttd_sysmon.erl @@ -29,7 +29,7 @@ -module(emqttd_sysmon). --author('feng@emqtt.io'). +-author("Feng Lee "). -behavior(gen_server). @@ -41,9 +41,7 @@ -record(state, {}). %%------------------------------------------------------------------------------ -%% @doc -%% Start emqttd monitor. -%% +%% @doc Start system monitor %% @end %%------------------------------------------------------------------------------ -spec start_link() -> {ok, pid()} | ignore | {error, term()}. @@ -93,4 +91,3 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - diff --git a/apps/emqttd/src/emqttd_throttle.erl b/apps/emqttd/src/emqttd_throttle.erl index ce410d317..f7cdc7c77 100644 --- a/apps/emqttd/src/emqttd_throttle.erl +++ b/apps/emqttd/src/emqttd_throttle.erl @@ -20,11 +20,13 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd throttle. +%%% emqttd client throttle. %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_throttle). -%% TODO:... 0.6.0... +-author("Feng Lee "). + +%% TODO:... 0.9.0... diff --git a/apps/emqttd/src/emqttd_trie.erl b/apps/emqttd/src/emqttd_trie.erl index b87cb23d2..e16ecc7a1 100644 --- a/apps/emqttd/src/emqttd_trie.erl +++ b/apps/emqttd/src/emqttd_trie.erl @@ -28,7 +28,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_trie). --author('feng@emqtt.io'). +-author("Feng Lee "). %% Mnesia Callbacks -export([mnesia/1]). @@ -62,9 +62,7 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Create trie tables. -%% +%% @doc Create trie tables %% @end %%------------------------------------------------------------------------------ -spec mnesia(boot | copy) -> ok. @@ -80,9 +78,7 @@ mnesia(boot) -> {attributes, record_info(fields, trie_node)}]); %%------------------------------------------------------------------------------ -%% @doc -%% Replicate trie tables. -%% +%% @doc Replicate trie tables %% @end %%------------------------------------------------------------------------------ mnesia(copy) -> @@ -94,9 +90,7 @@ mnesia(copy) -> %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc -%% Insert topic to trie tree. -%% +%% @doc Insert topic to trie tree %% @end %%------------------------------------------------------------------------------ -spec insert(Topic :: binary()) -> ok. @@ -114,9 +108,7 @@ insert(Topic) when is_binary(Topic) -> end. %%------------------------------------------------------------------------------ -%% @doc -%% Find trie nodes that match topic. -%% +%% @doc Find trie nodes that match topic %% @end %%------------------------------------------------------------------------------ -spec find(Topic :: binary()) -> list(MatchedTopic :: binary()). @@ -125,9 +117,7 @@ find(Topic) when is_binary(Topic) -> [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined]. %%------------------------------------------------------------------------------ -%% @doc -%% Delete topic from trie tree. -%% +%% @doc Delete topic from trie tree %% @end %%------------------------------------------------------------------------------ -spec delete(Topic :: binary()) -> ok. diff --git a/apps/emqttd/src/emqttd_utils.erl b/apps/emqttd/src/emqttd_utils.erl index c5ceecef9..112b6171a 100644 --- a/apps/emqttd/src/emqttd_utils.erl +++ b/apps/emqttd/src/emqttd_utils.erl @@ -26,6 +26,8 @@ %%%----------------------------------------------------------------------------- -module(emqttd_utils). +-author("Feng Lee "). + -export([apply_module_attributes/1, all_module_attributes/1]). diff --git a/apps/emqttd/src/emqttd_vm.erl b/apps/emqttd/src/emqttd_vm.erl index 9aeaed6ad..f65eb3e20 100644 --- a/apps/emqttd/src/emqttd_vm.erl +++ b/apps/emqttd/src/emqttd_vm.erl @@ -26,6 +26,8 @@ %%%----------------------------------------------------------------------------- -module(emqttd_vm). +-author("Feng Lee "). + -export([loads/0]). loads() ->