code review

This commit is contained in:
Feng Lee 2015-04-22 01:31:42 +08:00
parent f7d44f88f1
commit 504fe99570
40 changed files with 324 additions and 337 deletions

View File

@ -1,7 +1,7 @@
{application, emqttd,
[
{description, "Erlang MQTT Broker"},
{vsn, "0.6.1"},
{vsn, "0.7.0"},
{modules, []},
{registered, []},
{applications, [kernel,

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_access_control).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
@ -55,9 +55,7 @@
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start access control server.
%%
%% @doc Start access control server
%% @end
%%------------------------------------------------------------------------------
-spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}.
@ -65,9 +63,7 @@ start_link(AcOpts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [AcOpts], []).
%%------------------------------------------------------------------------------
%% @doc
%% Authenticate client.
%%
%% @doc Authenticate MQTT Client
%% @end
%%------------------------------------------------------------------------------
-spec auth(mqtt_client(), undefined | binary()) -> ok | {error, string()}.
@ -83,9 +79,7 @@ auth(Client, Password, [{Mod, State} | Mods]) ->
end.
%%------------------------------------------------------------------------------
%% @doc
%% Check ACL.
%%
%% @doc Check ACL
%% @end
%%------------------------------------------------------------------------------
-spec check_acl(Client, PubSub, Topic) -> allow | deny when
@ -108,9 +102,7 @@ check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) ->
end.
%%------------------------------------------------------------------------------
%% @doc
%% Reload ACL.
%%
%% @doc Reload ACL
%% @end
%%------------------------------------------------------------------------------
-spec reload_acl() -> list() | {error, any()}.
@ -118,9 +110,7 @@ reload_acl() ->
[M:reload_acl(State) || {M, State} <- lookup_mods(acl)].
%%------------------------------------------------------------------------------
%% @doc
%% Register auth or ACL module.
%%
%% @doc Register authentication or ACL module
%% @end
%%------------------------------------------------------------------------------
-spec register_mod(Type :: auth | acl, Mod :: atom(), Opts :: list()) -> ok | {error, any()}.
@ -128,9 +118,7 @@ register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl->
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts}).
%%------------------------------------------------------------------------------
%% @doc
%% Unregister auth or ACL module.
%%
%% @doc Unregister authentication or ACL module
%% @end
%%------------------------------------------------------------------------------
-spec unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, any()}.
@ -138,9 +126,7 @@ unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
%%------------------------------------------------------------------------------
%% @doc
%% Lookup authentication or ACL modules.
%%
%% @doc Lookup authentication or ACL modules
%% @end
%%------------------------------------------------------------------------------
-spec lookup_mods(auth | acl) -> list().
@ -155,9 +141,7 @@ tab_key(acl) ->
acl_modules.
%%------------------------------------------------------------------------------
%% @doc
%% Stop access control server.
%%
%% @doc Stop access control server
%% @end
%%------------------------------------------------------------------------------
stop() ->

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_access_rule).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
@ -48,12 +48,10 @@
-export([compile/1, match/3]).
%%%-----------------------------------------------------------------------------
%% @doc
%% Compile rule.
%%
%%------------------------------------------------------------------------------
%% @doc Compile access rule
%% @end
%%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
compile({A, all}) when (A =:= allow) orelse (A =:= deny) ->
{A, all};
@ -92,12 +90,10 @@ bin(L) when is_list(L) ->
bin(B) when is_binary(B) ->
B.
%%%-----------------------------------------------------------------------------
%% @doc
%% Match rule.
%%
%%------------------------------------------------------------------------------
%% @doc Match rule
%% @end
%%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
-spec match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch.
match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
{matched, AllowDeny};

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_acl_internal).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
@ -45,7 +45,10 @@
%%% API
%%%=============================================================================
%% @doc Read all rules.
%%------------------------------------------------------------------------------
%% @doc Read all rules
%% @end
%%------------------------------------------------------------------------------
-spec all_rules() -> list(emqttd_access_rule:rule()).
all_rules() ->
case ets:lookup(?ACL_RULE_TAB, all_rules) of
@ -57,17 +60,20 @@ all_rules() ->
%%% ACL callbacks
%%%=============================================================================
%% @doc init internal ACL.
%%------------------------------------------------------------------------------
%% @doc Init internal ACL
%% @end
%%------------------------------------------------------------------------------
-spec init(AclOpts :: list()) -> {ok, State :: any()}.
init(AclOpts) ->
ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]),
AclFile = proplists:get_value(file, AclOpts),
Default = proplists:get_value(nomatch, AclOpts, allow),
State = #state{acl_file = AclFile, nomatch = Default},
load_rules(State),
load_rules_from_file(State),
{ok, State}.
load_rules(#state{acl_file = AclFile}) ->
load_rules_from_file(#state{acl_file = AclFile}) ->
{ok, Terms} = file:consult(AclFile),
Rules = [emqttd_access_rule:compile(Term) || Term <- Terms],
lists:foreach(fun(PubSub) ->
@ -89,7 +95,10 @@ filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) ->
filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.
%% @doc Check ACL.
%%------------------------------------------------------------------------------
%% @doc Check ACL
%% @end
%%------------------------------------------------------------------------------
-spec check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when
Client :: mqtt_client(),
PubSub :: pubsub(),
@ -117,15 +126,21 @@ match(Client, Topic, [Rule|Rules]) ->
{matched, AllowDeny} -> {matched, AllowDeny}
end.
%% @doc Reload ACL.
%%------------------------------------------------------------------------------
%% @doc Reload ACL
%% @end
%%------------------------------------------------------------------------------
-spec reload_acl(State :: #state{}) -> ok | {error, Reason :: any()}.
reload_acl(State) ->
case catch load_rules(State) of
case catch load_rules_from_file(State) of
{'EXIT', Error} -> {error, Error};
_ -> ok
end.
%% @doc ACL Description.
%%------------------------------------------------------------------------------
%% @doc ACL Module Description
%% @end
%%------------------------------------------------------------------------------
-spec description() -> string().
description() ->
"Internal ACL with etc/acl.config".

View File

@ -20,13 +20,13 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd ACL behaviour.
%%% ACL module behaviour.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_acl_mod).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_app).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behaviour(application).

View File

@ -20,13 +20,13 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd anonymous authentication.
%%% Anonymous authentication module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth_anonymous).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behaviour(emqttd_auth_mod).
@ -34,7 +34,7 @@
init(Opts) -> {ok, Opts}.
check(_User, _Password, _Opts) -> ok.
check(_Client, _Password, _Opts) -> ok.
description() -> "Anonymous authentication module".

View File

@ -20,13 +20,13 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd authentication with clientid.
%%% ClientId authentication module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth_clientid).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
@ -36,30 +36,58 @@
-behaviour(emqttd_auth_mod).
%% emqttd_auth callbacks
%% emqttd_auth_mod callbacks
-export([init/1, check/3, description/0]).
-define(AUTH_CLIENTID_TAB, mqtt_auth_clientid).
-record(?AUTH_CLIENTID_TAB, {clientid, ipaddr, password}).
%%%=============================================================================
%%% API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Add clientid
%% @end
%%------------------------------------------------------------------------------
add_clientid(ClientId) when is_binary(ClientId) ->
R = #mqtt_auth_clientid{clientid = ClientId},
mnesia:transaction(fun() -> mnesia:write(R) end).
%%------------------------------------------------------------------------------
%% @doc Add clientid with password
%% @end
%%------------------------------------------------------------------------------
add_clientid(ClientId, Password) ->
R = #mqtt_auth_clientid{clientid = ClientId, password = Password},
mnesia:transaction(fun() -> mnesia:write(R) end).
%%------------------------------------------------------------------------------
%% @doc Lookup clientid
%% @end
%%------------------------------------------------------------------------------
lookup_clientid(ClientId) ->
mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
%%------------------------------------------------------------------------------
%% @doc Lookup all clientids
%% @end
%%------------------------------------------------------------------------------
all_clientids() ->
mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
%%------------------------------------------------------------------------------
%% @doc Remove clientid
%% @end
%%------------------------------------------------------------------------------
remove_clientid(ClientId) ->
mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TAB, ClientId}) end).
%%%=============================================================================
%%% emqttd_auth_mod callbacks
%%%=============================================================================
init(Opts) ->
mnesia:create_table(?AUTH_CLIENTID_TAB, [
{ram_copies, [node()]},
@ -129,4 +157,3 @@ check_clientid_only(ClientId, IpAddr) ->
end
end.

View File

@ -26,13 +26,13 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_bridge).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
-include("emqttd.hrl").
-include_lib("emqtt/include/emqtt.hrl").
-include("emqttd.hrl").
-behaviour(gen_server).
%% API Function Exports
-export([start_link/3]).
@ -64,9 +64,7 @@
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start a bridge.
%%
%% @doc Start a bridge
%% @end
%%------------------------------------------------------------------------------
-spec start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}.
@ -103,7 +101,7 @@ parse_opts([{ping_down_interval, Interval} | Opts], State) ->
parse_opts(Opts, State#state{ping_down_interval = Interval*1000}).
handle_call(_Request, _From, State) ->
{reply, ok, State}.
{reply, error, State}.
handle_cast(_Msg, State) ->
{noreply, State}.

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_bridge_sup).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behavior(supervisor).
@ -42,23 +42,20 @@
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start bridge supervisor.
%%
%% @doc Start bridge supervisor
%% @end
%%------------------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%TODO: bridges...
-spec bridges() -> [{tuple(), pid()}].
bridges() ->
[{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _}
<- supervisor:which_children(?MODULE)].
%%------------------------------------------------------------------------------
%% @doc
%% Start a bridge.
%%
%% @doc Start a bridge
%% @end
%%------------------------------------------------------------------------------
-spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}.
@ -72,9 +69,7 @@ start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic)
supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)).
%%------------------------------------------------------------------------------
%% @doc
%% Stop a bridge.
%%
%% @doc Stop a bridge
%% @end
%%------------------------------------------------------------------------------
-spec stop_bridge(atom(), binary()) -> {ok, pid()} | ok.

View File

@ -26,28 +26,32 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_broker).
-include_lib("emqtt/include/emqtt.hrl").
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd_systop.hrl").
-include_lib("emqtt/include/emqtt.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(BROKER_TAB, mqtt_broker).
%% API Function Exports
-export([start_link/1]).
-export([version/0, uptime/0, datetime/0, sysdescr/0]).
%% statistics API.
-export([getstats/0, getstat/1, setstat/2, setstats/3]).
-export([statsfun/1, statsfun/2,
getstats/0, getstat/1,
setstat/2, setstats/3]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(BROKER_TAB, mqtt_broker).
-record(state, {started_at, sys_interval, tick_timer}).
%%%=============================================================================
@ -55,9 +59,7 @@
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start emqttd broker.
%%
%% @doc Start emqttd broker
%% @end
%%------------------------------------------------------------------------------
-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}.
@ -65,9 +67,7 @@ start_link(Options) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
%%------------------------------------------------------------------------------
%% @doc
%% Get broker version.
%%
%% @doc Get broker version
%% @end
%%------------------------------------------------------------------------------
-spec version() -> string().
@ -75,9 +75,7 @@ version() ->
{ok, Version} = application:get_key(emqttd, vsn), Version.
%%------------------------------------------------------------------------------
%% @doc
%% Get broker description.
%%
%% @doc Get broker description
%% @end
%%------------------------------------------------------------------------------
-spec sysdescr() -> string().
@ -85,9 +83,7 @@ sysdescr() ->
{ok, Descr} = application:get_key(emqttd, description), Descr.
%%------------------------------------------------------------------------------
%% @doc
%% Get broker uptime.
%%
%% @doc Get broker uptime
%% @end
%%------------------------------------------------------------------------------
-spec uptime() -> string().
@ -95,9 +91,7 @@ uptime() ->
gen_server:call(?SERVER, uptime).
%%------------------------------------------------------------------------------
%% @doc
%% Get broker datetime.
%%
%% @doc Get broker datetime
%% @end
%%------------------------------------------------------------------------------
-spec datetime() -> string().
@ -108,9 +102,19 @@ datetime() ->
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
%%------------------------------------------------------------------------------
%% @doc
%% Get broker statistics.
%%
%% @doc Generate stats fun
%% @end
%%------------------------------------------------------------------------------
-spec statsfun(Stat :: atom()) -> fun().
statsfun(Stat) ->
fun(Val) -> setstat(Stat, Val) end.
-spec statsfun(Stat :: atom(), MaxStat :: atom()) -> fun().
statsfun(Stat, MaxStat) ->
fun(Val) -> setstats(Stat, MaxStat, Val) end.
%%------------------------------------------------------------------------------
%% @doc Get broker statistics
%% @end
%%------------------------------------------------------------------------------
-spec getstats() -> [{atom(), non_neg_integer()}].
@ -118,9 +122,7 @@ getstats() ->
ets:tab2list(?BROKER_TAB).
%%------------------------------------------------------------------------------
%% @doc
%% Get stats by name.
%%
%% @doc Get stats by name
%% @end
%%------------------------------------------------------------------------------
-spec getstat(atom()) -> non_neg_integer() | undefined.
@ -131,9 +133,7 @@ getstat(Name) ->
end.
%%------------------------------------------------------------------------------
%% @doc
%% Set broker stats.
%%
%% @doc Set broker stats
%% @end
%%------------------------------------------------------------------------------
-spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean().
@ -141,9 +141,7 @@ setstat(Stat, Val) ->
ets:update_element(?BROKER_TAB, Stat, {2, Val}).
%%------------------------------------------------------------------------------
%% @doc
%% Set stats with max.
%%
%% @doc Set stats with max
%% @end
%%------------------------------------------------------------------------------
-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().
@ -180,7 +178,7 @@ handle_call(uptime, _From, State) ->
{reply, uptime(State), State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
{reply, error, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
@ -252,4 +250,3 @@ tick(Delay, State) ->
i2b(I) when is_integer(I) ->
list_to_binary(integer_to_list(I)).

View File

@ -20,29 +20,27 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd client.
%%% MQTT Client
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_client).
-author('feng@emqtt.io').
-behaviour(gen_server).
-export([start_link/2, info/1]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3,
terminate/2]).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").
%% API Function Exports
-export([start_link/2, info/1]).
-behaviour(gen_server).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
%%Client State...
-record(state, {transport,
socket,

View File

@ -26,10 +26,14 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_cluster).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-export([running_nodes/0]).
%%------------------------------------------------------------------------------
%% @doc Get running nodes
%% @end
%%------------------------------------------------------------------------------
running_nodes() ->
mnesia:system_info(running_db_nodes).

View File

@ -20,13 +20,13 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd client manager.
%%% MQTT Client Manager
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_cm).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
@ -37,18 +37,11 @@
-export([lookup/1, register/1, unregister/1]).
%% Stats
-export([getstats/0]).
%% gen_server Function Exports
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {tab}).
-record(state, {tab, statsfun}).
-define(CLIENT_TAB, mqtt_client).
@ -57,9 +50,7 @@
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start client manager.
%%
%% @doc Start client manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
@ -67,9 +58,7 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%% @doc
%% Lookup client pid with clientId.
%%
%% @doc Lookup client pid with clientId
%% @end
%%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> pid() | undefined.
@ -93,25 +82,13 @@ register(ClientId) when is_binary(ClientId) ->
end.
%%------------------------------------------------------------------------------
%% @doc
%% Unregister clientId with pid.
%%
%% @doc Unregister clientId with pid.
%% @end
%%------------------------------------------------------------------------------
-spec unregister(ClientId :: binary()) -> ok.
unregister(ClientId) when is_binary(ClientId) ->
gen_server:cast(?SERVER, {unregister, ClientId, self()}).
%%------------------------------------------------------------------------------
%% @doc
%% Get statistics of client manager.
%%
%% @end
%%------------------------------------------------------------------------------
getstats() ->
[{Name, emqttd_broker:getstat(Name)} ||
Name <- ['clients/count', 'clients/max']].
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
@ -121,7 +98,8 @@ init([]) ->
named_table,
public,
{write_concurrency, true}]),
{ok, #state{tab = TabId}}.
StatsFun = emqttd_broker:statsfun('clients/count', 'clients/max'),
{ok, #state{tab = TabId, statsfun = StatsFun}}.
handle_call(Req, _From, State) ->
lager:error("unexpected request: ~p", [Req]),
@ -188,9 +166,7 @@ registerd(Tab, {ClientId, Pid}) ->
false
end.
setstats(State) ->
emqttd_broker:setstats('clients/count',
'clients/max',
ets:info(?CLIENT_TAB, size)), State.
setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(?CLIENT_TAB, size)), State.

View File

@ -26,6 +26,8 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_config).
-author("Feng Lee <feng@emqtt.io>").
-define(SERVER, ?MODULE).
-behaviour(gen_server).

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_ctl).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
@ -49,6 +49,10 @@
useradd/1,
userdel/1]).
%%------------------------------------------------------------------------------
%% @doc Query node status
%% @end
%%------------------------------------------------------------------------------
status([]) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
@ -59,6 +63,10 @@ status([]) ->
?PRINT_MSG("emqttd is running~n")
end.
%%------------------------------------------------------------------------------
%% @doc Cluster with other node
%% @end
%%------------------------------------------------------------------------------
cluster([]) ->
Nodes = [node()|nodes()],
?PRINT("cluster nodes: ~p~n", [Nodes]);
@ -77,9 +85,17 @@ cluster([SNode]) ->
?PRINT("failed to connect to ~p~n", [Node])
end.
%%------------------------------------------------------------------------------
%% @doc Add usern
%% @end
%%------------------------------------------------------------------------------
useradd([Username, Password]) ->
?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]).
%%------------------------------------------------------------------------------
%% @doc Delete user
%% @end
%%------------------------------------------------------------------------------
userdel([Username]) ->
?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]).

View File

@ -26,27 +26,21 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_event).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
%% API Function Exports
-export([start_link/0,
add_handler/2,
notify/1]).
-export([start_link/0, add_handler/2, notify/1]).
%% gen_event Function Exports
-export([init/1,
handle_event/2,
handle_call/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([init/1, handle_event/2, handle_call/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {systop}).
%%------------------------------------------------------------------------------
%% @doc
%% Start emqttd event manager.
%%
%% @doc Start event manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | {error, any()}.
@ -64,6 +58,7 @@ add_handler(Handler, Args) ->
notify(Event) ->
gen_event:notify(?MODULE, Event).
%%%=============================================================================
%%% gen_event callbacks
%%%=============================================================================
@ -108,9 +103,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
payload(connected, Params) ->
From = proplists:get_value(from, Params),

View File

@ -26,12 +26,12 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_http).
-author('feng@emqtt.io').
-include_lib("emqtt/include/emqtt.hrl").
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-include_lib("emqtt/include/emqtt.hrl").
-import(proplists, [get_value/2, get_value/3]).
-export([handle/1]).

View File

@ -19,23 +19,25 @@
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd keepalive.
%%% @doc client keepalive
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_keepalive).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-export([new/3, resume/1, cancel/1]).
-record(keepalive, {transport, socket, recv_oct, timeout_sec, timeout_msg, timer_ref}).
-record(keepalive, {transport,
socket,
recv_oct,
timeout_sec,
timeout_msg,
timer_ref}).
%%------------------------------------------------------------------------------
%% @doc
%% Create a keepalive.
%%
%% @doc Create a keepalive
%% @end
%%------------------------------------------------------------------------------
new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
@ -49,9 +51,7 @@ new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
timer_ref = Ref}.
%%------------------------------------------------------------------------------
%% @doc
%% Try to resume keepalive, called when timeout.
%%
%% @doc Try to resume keepalive, called when timeout
%% @end
%%------------------------------------------------------------------------------
resume(KeepAlive = #keepalive {transport = Transport,
@ -72,9 +72,7 @@ resume(KeepAlive = #keepalive {transport = Transport,
end.
%%------------------------------------------------------------------------------
%% @doc
%% Cancel Keepalive.
%%
%% @doc Cancel Keepalive
%% @end
%%------------------------------------------------------------------------------
cancel(#keepalive{timer_ref = Ref}) ->

View File

@ -26,18 +26,16 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_metrics).
-author('feng@emqtt.io').
-include_lib("emqtt/include/emqtt.hrl").
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd_systop.hrl").
-include_lib("emqtt/include/emqtt.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(METRIC_TAB, mqtt_broker_metric).
%% API Function Exports
-export([start_link/1]).
@ -50,6 +48,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(METRIC_TAB, mqtt_metric).
-record(state, {pub_interval, tick_timer}).
%%%=============================================================================
@ -57,9 +57,7 @@
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start emqttd metrics.
%%
%% @doc Start metrics server
%% @end
%%------------------------------------------------------------------------------
-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}.
@ -67,9 +65,7 @@ start_link(Options) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
%%------------------------------------------------------------------------------
%% @doc
%% Get all metrics.
%%
%% @doc Get all metrics
%% @end
%%------------------------------------------------------------------------------
-spec all() -> [{atom(), non_neg_integer()}].
@ -84,9 +80,7 @@ all() ->
end, #{}, ?METRIC_TAB)).
%%------------------------------------------------------------------------------
%% @doc
%% Get metric value.
%%
%% @doc Get metric value
%% @end
%%------------------------------------------------------------------------------
-spec value(atom()) -> non_neg_integer().
@ -94,9 +88,7 @@ value(Metric) ->
lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
%%------------------------------------------------------------------------------
%% @doc
%% Increase counter.
%%
%% @doc Increase counter
%% @end
%%------------------------------------------------------------------------------
-spec inc(atom()) -> non_neg_integer().
@ -104,9 +96,7 @@ inc(Metric) ->
inc(counter, Metric, 1).
%%------------------------------------------------------------------------------
%% @doc
%% Increase metric value.
%%
%% @doc Increase metric value
%% @end
%%------------------------------------------------------------------------------
-spec inc(counter | gauge, atom()) -> non_neg_integer().
@ -118,9 +108,7 @@ inc(Metric, Val) when is_atom(Metric) and is_integer(Val) ->
inc(counter, Metric, Val).
%%------------------------------------------------------------------------------
%% @doc
%% Increase metric value.
%%
%% @doc Increase metric value
%% @end
%%------------------------------------------------------------------------------
-spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer().
@ -130,9 +118,7 @@ inc(counter, Metric, Val) ->
ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}).
%%------------------------------------------------------------------------------
%% @doc
%% Decrease metric value.
%%
%% @doc Decrease metric value
%% @end
%%------------------------------------------------------------------------------
-spec dec(gauge, atom()) -> integer().
@ -140,9 +126,7 @@ dec(gauge, Metric) ->
dec(gauge, Metric, 1).
%%------------------------------------------------------------------------------
%% @doc
%% Decrease metric value
%%
%% @doc Decrease metric value
%% @end
%%------------------------------------------------------------------------------
-spec dec(gauge, atom(), pos_integer()) -> integer().
@ -150,9 +134,7 @@ dec(gauge, Metric, Val) ->
ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}).
%%------------------------------------------------------------------------------
%% @doc
%% Set metric value.
%%
%% @doc Set metric value
%% @end
%%------------------------------------------------------------------------------
set(Metric, Val) when is_atom(Metric) ->
@ -161,10 +143,7 @@ set(gauge, Metric, Val) ->
ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}).
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% Metric Key
%%
%% @doc Metric Key
%% @end
%%------------------------------------------------------------------------------
key(gauge, Metric) ->
@ -192,19 +171,19 @@ init([Options]) ->
end,
{ok, tick(Delay, #state{pub_interval = PubInterval}), hibernate}.
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_call(_Req, _From, State) ->
{reply, {error, badreq}, State}.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(tick, State) ->
% publish metric message
[publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()],
{noreply, tick(State), hibernate};
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
@ -241,4 +220,3 @@ tick(Delay, State) ->
i2b(I) ->
list_to_binary(integer_to_list(I)).

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_msg_store).
-author('feng@slimpp.io').
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
@ -58,9 +58,7 @@ mnesia(copy) ->
%%%=============================================================================
%%%-----------------------------------------------------------------------------
%% @doc
%% Retain message.
%%
%% @doc Retain message
%% @end
%%%-----------------------------------------------------------------------------
-spec retain(mqtt_message()) -> ok | ignore.
@ -100,7 +98,10 @@ env() ->
Env
end.
%% @doc redeliver retained messages to subscribed client.
%%%-----------------------------------------------------------------------------
%% @doc Redeliver retained messages to subscribed client
%% @end
%%%-----------------------------------------------------------------------------
-spec redeliver(Topic, CPid) -> any() when
Topic :: binary(),
CPid :: pid().
@ -126,4 +127,3 @@ dispatch(CPid, Msgs) when is_list(Msgs) ->
dispatch(CPid, Msg) when is_record(Msg, mqtt_message) ->
CPid ! {dispatch, {self(), Msg}}.

View File

@ -26,14 +26,14 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_net).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include_lib("kernel/include/inet.hrl").
-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]).
-export([peername/1, sockname/1, format/2, format/1, connection_string/2]).
-include_lib("kernel/include/inet.hrl").
-define(FIRST_TEST_BIND_PORT, 10000).
%%--------------------------------------------------------------------

View File

@ -26,8 +26,14 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_opts).
-author("Feng Lee <feng@emqtt.io>").
-export([merge/2]).
%%%-----------------------------------------------------------------------------
%% @doc Merge Options
%% @end
%%%-----------------------------------------------------------------------------
merge(Defaults, Options) ->
lists:foldl(
fun({Opt, Val}, Acc) ->
@ -44,5 +50,3 @@ merge(Defaults, Options) ->
end
end, Defaults, Options).

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).

View File

@ -26,34 +26,28 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin_manager).
-author("Feng Lee <feng@emqtt.io>").
-export([list/0, load/1, unload/1]).
%%------------------------------------------------------------------------------
%% @doc
%% List all loaded plugins.
%%
%% @doc List all loaded plugins
%% @end
%%------------------------------------------------------------------------------
list() ->
[].
%%------------------------------------------------------------------------------
%% @doc
%% Load Plugin.
%%
%% @doc Load Plugin
%% @end
%%------------------------------------------------------------------------------
load(Name) when is_atom(Name) ->
ok.
%%------------------------------------------------------------------------------
%% @doc
%% Unload Plugin.
%%
%% @doc Unload Plugin
%% @end
%%------------------------------------------------------------------------------
unload(Name) when is_atom(Name) ->
ok.

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_pooler).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
@ -46,12 +46,24 @@
start_link(I) ->
gen_server:start_link(?MODULE, [I], []).
%%------------------------------------------------------------------------------
%% @doc Submit work to pooler
%% @end
%%------------------------------------------------------------------------------
submit(Fun) ->
gen_server:call(gproc_pool:pick(pooler), {submit, Fun}, infinity).
%%------------------------------------------------------------------------------
%% @doc Submit work to pooler asynchronously
%% @end
%%------------------------------------------------------------------------------
async_submit(Fun) ->
gen_server:cast(gproc_pool:pick(pooler), {async_submit, Fun}).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([I]) ->
gproc_pool:connect_worker(pooler, {pooler, I}),
{ok, #state{id = I}}.

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_pooler_sup).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
@ -55,3 +55,4 @@ init([PoolSize]) ->
end, lists:seq(1, PoolSize)),
{ok, {{one_for_all, 10, 100}, Children}}.

View File

@ -26,6 +26,8 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_protocol).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_pubsub).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
@ -88,7 +88,8 @@ mnesia(copy) ->
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Start one pubsub.
%% @doc Start one pubsub server
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Id, Opts) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(),
@ -97,30 +98,30 @@ start_link(Id, Opts) ->
gen_server:start_link(?MODULE, [Id, Opts], []).
%%------------------------------------------------------------------------------
%% @doc Create topic. Notice That this transaction is not protected by pubsub pool.
%% @doc Create topic. Notice That this transaction is not protected by pubsub pool
%% @end
%%------------------------------------------------------------------------------
-spec create(Topic :: binary()) -> ok | {error, Error :: any()}.
create(Topic) when is_binary(Topic) ->
TopicR = #mqtt_topic{topic = Topic, node = node()},
case mnesia:transaction(fun add_topic/1, [TopicR]) of
{atomic, ok} -> setstats(topics), ok;
{aborted, Error} -> {error, Error}
end.
call({create, Topic}).
%%------------------------------------------------------------------------------
%% @doc Subscribe topic.
%% @doc Subscribe topic
%% @end
%%------------------------------------------------------------------------------
-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when
Topic :: binary(),
Qos :: mqtt_qos().
subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
call({subscribe, self(), Topic, Qos});
subscribe(Topics = [{_Topic, _Qos} | _]) ->
call({subscribe, self(), Topics}).
%%------------------------------------------------------------------------------
%% @doc Unsubscribe Topic or Topics
%% @end
%%------------------------------------------------------------------------------
-spec unsubscribe(binary() | list(binary())) -> ok.
unsubscribe(Topic) when is_binary(Topic) ->
cast({unsubscribe, self(), Topic});
@ -137,7 +138,8 @@ cast(Msg) ->
gen_server:cast(Pid, Msg).
%%------------------------------------------------------------------------------
%% @doc Publish to cluster nodes.
%% @doc Publish to cluster nodes
%% @end
%%------------------------------------------------------------------------------
-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok.
publish(From, Msg=#mqtt_message{topic=Topic}) ->
@ -159,7 +161,10 @@ publish(_From, Topic, Msg) when is_binary(Topic) ->
end
end, match(Topic)).
%%------------------------------------------------------------------------------
%% @doc Dispatch message locally. should only be called by publish.
%% @end
%%------------------------------------------------------------------------------
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
Subscribers = mnesia:dirty_read(subscriber, Topic),
@ -188,6 +193,16 @@ init([Id, _Opts]) ->
gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
{ok, #state{id = Id, submap = maps:new()}}.
handle_call({create, Topic}, _From, State) ->
TopicR = #mqtt_topic{topic = Topic, node = node()},
Reply =
case mnesia:transaction(fun add_topic/1, [TopicR]) of
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end,
setstats(topics),
{reply, Reply, State};
handle_call({subscribe, SubPid, Topics}, _From, State) ->
TopicSubs = lists:map(fun({Topic, Qos}) ->
{#mqtt_topic{topic = Topic, node = node()},

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_pubsub_sup).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").

View File

@ -29,6 +29,8 @@
-module(emqttd_queue).
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
-export([new/1, new/2, in/3, all/1, clear/1]).

View File

@ -26,10 +26,14 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_session).
-include_lib("emqtt/include/emqtt.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-include_lib("emqtt/include/emqtt.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").
%% API Function Exports
-export([start/1,
resume/3,
@ -67,9 +71,7 @@
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start Session.
%%
%% @doc Start Session
%% @end
%%------------------------------------------------------------------------------
-spec start({boolean(), binary(), pid()}) -> {ok, session()}.
@ -83,9 +85,7 @@ start({false = _CleanSess, ClientId, ClientPid}) ->
{ok, SessPid}.
%%------------------------------------------------------------------------------
%% @doc
%% Resume Session.
%%
%% @doc Resume Session
%% @end
%%------------------------------------------------------------------------------
-spec resume(session(), binary(), pid()) -> session().
@ -96,9 +96,7 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
SessPid.
%%------------------------------------------------------------------------------
%% @doc
%% Publish message.
%%
%% @doc Publish message
%% @end
%%------------------------------------------------------------------------------
-spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session().
@ -118,9 +116,7 @@ publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) ->
SessPid.
%%------------------------------------------------------------------------------
%% @doc
%% PubAck message.
%%
%% @doc PubAck message
%% @end
%%------------------------------------------------------------------------------
-spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session().
@ -172,9 +168,7 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid.
%%------------------------------------------------------------------------------
%% @doc
%% Subscribe Topics.
%%
%% @doc Subscribe Topics
%% @end
%%------------------------------------------------------------------------------
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
@ -197,9 +191,7 @@ subscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, SessPid, GrantedQos}.
%%------------------------------------------------------------------------------
%% @doc
%% Unsubscribe Topics.
%%
%% @doc Unsubscribe Topics
%% @end
%%------------------------------------------------------------------------------
-spec unsubscribe(session(), [binary()]) -> {ok, session()}.
@ -220,9 +212,7 @@ unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, SessPid}.
%%------------------------------------------------------------------------------
%% @doc
%% Destroy Session.
%%
%% @doc Destroy Session
%% @end
%%------------------------------------------------------------------------------
-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok.

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_session_sup).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behavior(supervisor).
@ -47,6 +47,7 @@ start_session(ClientId, ClientPid) ->
%%%=============================================================================
%%% Supervisor callbacks
%%%=============================================================================
init([SessOpts]) ->
{ok, {{simple_one_for_one, 0, 1},
[{session, {emqttd_session, start_link, [SessOpts]},

View File

@ -36,6 +36,8 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_sm).
-author("Feng Lee <feng@emqtt.io>").
%%cleanSess: true | false
-include("emqttd.hrl").
@ -44,8 +46,6 @@
-define(SERVER, ?MODULE).
-define(SESSION_TAB, mqtt_session).
%% API Function Exports
-export([start_link/0]).
@ -55,7 +55,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {tab}).
-record(state, {tabid, statsfun}).
-define(SESSION_TAB, mqtt_session).
%%%=============================================================================
%%% API
@ -65,9 +67,7 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%% @doc
%% Lookup Session Pid.
%%
%% @doc Lookup Session Pid
%% @end
%%------------------------------------------------------------------------------
-spec lookup_session(binary()) -> pid() | undefined.
@ -78,9 +78,7 @@ lookup_session(ClientId) ->
end.
%%------------------------------------------------------------------------------
%% @doc
%% Start Session.
%%
%% @doc Start a session
%% @end
%%------------------------------------------------------------------------------
-spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}.
@ -88,9 +86,7 @@ start_session(ClientId, ClientPid) ->
gen_server:call(?SERVER, {start_session, ClientId, ClientPid}).
%%------------------------------------------------------------------------------
%% @doc
%% Destroy Session.
%%
%% @doc Destroy a session
%% @end
%%------------------------------------------------------------------------------
-spec destroy_session(binary()) -> ok.
@ -104,9 +100,10 @@ destroy_session(ClientId) ->
init([]) ->
process_flag(trap_exit, true),
TabId = ets:new(?SESSION_TAB, [set, protected, named_table]),
{ok, #state{tab = TabId}}.
StatsFun = emqttd_broker:statsfun('sessions/count', 'sessions/max'),
{ok, #state{tabid = TabId, statsfun = StatsFun}}.
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Tab}) ->
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) ->
Reply =
case ets:lookup(Tab, ClientId) of
[{_, SessPid, _MRef}] ->
@ -124,7 +121,7 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Ta
end,
{reply, Reply, setstats(State)};
handle_call({destroy_session, ClientId}, _From, State = #state{tab = Tab}) ->
handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) ->
case ets:lookup(Tab, ClientId) of
[{_, SessPid, MRef}] ->
emqttd_session:destroy(SessPid, ClientId),
@ -141,7 +138,7 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = Tab}) ->
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Tab}) ->
ets:match_delete(Tab, {'_', DownPid, MRef}),
{noreply, setstats(State)};
@ -158,9 +155,6 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
setstats(State) ->
emqttd_broker:setstats('sessions/count',
'sessions/max',
ets:info(?SESSION_TAB, size)), State.
setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(?SESSION_TAB, size)), State.

View File

@ -26,7 +26,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_sup).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").

View File

@ -29,7 +29,7 @@
-module(emqttd_sysmon).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
-behavior(gen_server).
@ -41,9 +41,7 @@
-record(state, {}).
%%------------------------------------------------------------------------------
%% @doc
%% Start emqttd monitor.
%%
%% @doc Start system monitor
%% @end
%%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
@ -93,4 +91,3 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -20,11 +20,13 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd throttle.
%%% emqttd client throttle.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_throttle).
%% TODO:... 0.6.0...
-author("Feng Lee <feng@emqtt.io>").
%% TODO:... 0.9.0...

View File

@ -28,7 +28,7 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_trie).
-author('feng@emqtt.io').
-author("Feng Lee <feng@emqtt.io>").
%% Mnesia Callbacks
-export([mnesia/1]).
@ -62,9 +62,7 @@
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Create trie tables.
%%
%% @doc Create trie tables
%% @end
%%------------------------------------------------------------------------------
-spec mnesia(boot | copy) -> ok.
@ -80,9 +78,7 @@ mnesia(boot) ->
{attributes, record_info(fields, trie_node)}]);
%%------------------------------------------------------------------------------
%% @doc
%% Replicate trie tables.
%%
%% @doc Replicate trie tables
%% @end
%%------------------------------------------------------------------------------
mnesia(copy) ->
@ -94,9 +90,7 @@ mnesia(copy) ->
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Insert topic to trie tree.
%%
%% @doc Insert topic to trie tree
%% @end
%%------------------------------------------------------------------------------
-spec insert(Topic :: binary()) -> ok.
@ -114,9 +108,7 @@ insert(Topic) when is_binary(Topic) ->
end.
%%------------------------------------------------------------------------------
%% @doc
%% Find trie nodes that match topic.
%%
%% @doc Find trie nodes that match topic
%% @end
%%------------------------------------------------------------------------------
-spec find(Topic :: binary()) -> list(MatchedTopic :: binary()).
@ -125,9 +117,7 @@ find(Topic) when is_binary(Topic) ->
[Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined].
%%------------------------------------------------------------------------------
%% @doc
%% Delete topic from trie tree.
%%
%% @doc Delete topic from trie tree
%% @end
%%------------------------------------------------------------------------------
-spec delete(Topic :: binary()) -> ok.

View File

@ -26,6 +26,8 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_utils).
-author("Feng Lee <feng@emqtt.io>").
-export([apply_module_attributes/1,
all_module_attributes/1]).

View File

@ -26,6 +26,8 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_vm).
-author("Feng Lee <feng@emqtt.io>").
-export([loads/0]).
loads() ->