add emqttd_time module, fix comments and format

This commit is contained in:
Feng 2016-02-11 15:52:59 +08:00
parent 1560386ab2
commit 89052d8e6e
56 changed files with 61 additions and 181 deletions

View File

@ -14,11 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Authentication and ACL Control.
-module(emqttd_access_control). -module(emqttd_access_control).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-behaviour(gen_server). -behaviour(gen_server).
@ -43,6 +40,8 @@
-type password() :: undefined | binary(). -type password() :: undefined | binary().
-record(state, {}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -128,10 +127,9 @@ stop() -> gen_server:call(?MODULE, stop).
init([Opts]) -> init([Opts]) ->
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}), ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}),
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}), ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}),
{ok, state}. {ok, #state{}}.
init_mods(auth, AuthMods) -> init_mods(auth, AuthMods) ->
[init_mod(authmod(Name), Opts) || {Name, Opts} <- AuthMods]; [init_mod(authmod(Name), Opts) || {Name, Opts} <- AuthMods];
@ -151,7 +149,8 @@ handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) -> NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) ->
Seq1 >= Seq2 Seq1 >= Seq2
end, [{Mod, ModState, Seq} | Mods]), end, [{Mod, ModState, Seq} | Mods]),
ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), NewMods}); ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), NewMods}),
ok;
{error, Error} -> {error, Error} ->
lager:error("Access Control: register ~s error - ~p", [Mod, Error]), lager:error("Access Control: register ~s error - ~p", [Mod, Error]),
{error, Error}; {error, Error};

View File

@ -1,4 +1,4 @@
%% ------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
@ -12,13 +12,10 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%% ------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Access Rule.
-module(emqttd_access_rule). -module(emqttd_access_rule).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-type who() :: all | binary() | -type who() :: all | binary() |

View File

@ -14,17 +14,14 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Internal ACL that load rules from etc/acl.config
-module(emqttd_acl_internal). -module(emqttd_acl_internal).
-author("Feng Lee <feng@emqtt.io>"). -behaviour(emqttd_acl_mod).
-include("emqttd.hrl"). -include("emqttd.hrl").
-export([all_rules/0]). -export([all_rules/0]).
-behaviour(emqttd_acl_mod).
%% ACL callbacks %% ACL callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]). -export([init/1, check_acl/2, reload_acl/1, description/0]).
@ -55,10 +52,8 @@ init(AclOpts) ->
AclFile = proplists:get_value(file, AclOpts), AclFile = proplists:get_value(file, AclOpts),
Default = proplists:get_value(nomatch, AclOpts, allow), Default = proplists:get_value(nomatch, AclOpts, allow),
State = #state{acl_file = AclFile, nomatch = Default}, State = #state{acl_file = AclFile, nomatch = Default},
case load_rules_from_file(State) of true = load_rules_from_file(State),
ok -> {ok, State}; {ok, State}.
{error, Error} -> {error, Error}
end.
load_rules_from_file(#state{acl_file = AclFile}) -> load_rules_from_file(#state{acl_file = AclFile}) ->
{ok, Terms} = file:consult(AclFile), {ok, Terms} = file:consult(AclFile),

View File

@ -14,11 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc ACL module behaviour.
-module(emqttd_acl_mod). -module(emqttd_acl_mod).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -14,11 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd application.
-module(emqttd_app). -module(emqttd_app).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd_cli.hrl"). -include("emqttd_cli.hrl").
-behaviour(application). -behaviour(application).

View File

@ -17,8 +17,6 @@
%% @doc Anonymous Authentication Module %% @doc Anonymous Authentication Module
-module(emqttd_auth_anonymous). -module(emqttd_auth_anonymous).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(emqttd_auth_mod). -behaviour(emqttd_auth_mod).
-export([init/1, check/3, description/0]). -export([init/1, check/3, description/0]).

View File

@ -14,11 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc ClientId Authentication Module.
-module(emqttd_auth_clientid). -module(emqttd_auth_clientid).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-export([add_clientid/1, add_clientid/2, lookup_clientid/1, remove_clientid/1, -export([add_clientid/1, add_clientid/2, lookup_clientid/1, remove_clientid/1,

View File

@ -17,8 +17,6 @@
%% @doc LDAP Authentication Module %% @doc LDAP Authentication Module
-module(emqttd_auth_ldap). -module(emqttd_auth_ldap).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).

View File

@ -17,8 +17,6 @@
%% @doc Authentication Behaviour. %% @doc Authentication Behaviour.
-module(emqttd_auth_mod). -module(emqttd_auth_mod).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-export([passwd_hash/2]). -export([passwd_hash/2]).

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd bridge
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_bridge). -module(emqttd_bridge).
-behaviour(gen_server2). -behaviour(gen_server2).

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Bridge Supervisor
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_bridge_sup). -module(emqttd_bridge_sup).
-behavior(supervisor). -behavior(supervisor).
@ -57,8 +55,8 @@ start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -
stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
ChildId = ?BRIDGE_ID(Node, Topic), ChildId = ?BRIDGE_ID(Node, Topic),
case supervisor:terminate_child(?MODULE, ChildId) of case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId); ok -> supervisor:delete_child(?MODULE, ChildId);
{error, Reason} -> {error, Reason} Error -> Error
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT Client Connection %% @doc MQTT Client Connection
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_client). -module(emqttd_client).
-behaviour(gen_server). -behaviour(gen_server).

View File

@ -15,9 +15,10 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT Client Manager %% @doc MQTT Client Manager
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_cm). -module(emqttd_cm).
-behaviour(gen_server2).
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_internal.hrl"). -include("emqttd_internal.hrl").
@ -27,8 +28,6 @@
-export([lookup/1, lookup_proc/1, register/1, unregister/1]). -export([lookup/1, lookup_proc/1, register/1, unregister/1]).
-behaviour(gen_server2).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Client Manager Supervisor. %% @doc Client Manager Supervisor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_cm_sup). -module(emqttd_cm_sup).
-behaviour(supervisor). -behaviour(supervisor).

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd control
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_ctl). -module(emqttd_ctl).
-behaviour(gen_server). -behaviour(gen_server).
@ -27,10 +25,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
%% API Function Exports %% API Function Exports
-export([start_link/0, -export([start_link/0, register_cmd/3, unregister_cmd/1, run/1]).
register_cmd/3,
unregister_cmd/1,
run/1]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd gen_mod behaviour %% @doc emqttd gen_mod behaviour
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_gen_mod). -module(emqttd_gen_mod).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -27,8 +27,6 @@
%% 4. Sequence: 2 bytes sequence in one process %% 4. Sequence: 2 bytes sequence in one process
%% %%
%% @end %% @end
%%
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_guid). -module(emqttd_guid).
-export([gen/0, new/0, timestamp/1]). -export([gen/0, new/0, timestamp/1]).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd http publish API and websocket client. %% @doc emqttd http publish API and websocket client.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_http). -module(emqttd_http).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -14,8 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc client keepalive %% @doc Client Keepalive
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_keepalive). -module(emqttd_keepalive).
-export([start/3, check/1, cancel/1]). -export([start/3, check/1, cancel/1]).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT Message Functions %% @doc MQTT Message Functions
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_message). -module(emqttd_message).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -14,8 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd metrics. responsible for collecting broker metrics %% @doc emqttd metrics. responsible for collecting broker metrics.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_metrics). -module(emqttd_metrics).
-behaviour(gen_server). -behaviour(gen_server).
@ -32,10 +31,7 @@
%% Received/Sent Metrics %% Received/Sent Metrics
-export([received/1, sent/1]). -export([received/1, sent/1]).
-export([all/0, value/1, -export([all/0, value/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]).
inc/1, inc/2, inc/3,
dec/2, dec/3,
set/2]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -240,7 +236,7 @@ key(counter, Metric) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
emqttd:seed_now(), emqttd_time:seed(),
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
% Create metrics table % Create metrics table
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
@ -276,8 +272,7 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
publish(Metric, Val) -> publish(Metric, Val) ->
Payload = emqttd_util:integer_to_binary(Val), Msg = emqttd_message:make(metrics, metric_topic(Metric), bin(Val)),
Msg = emqttd_message:make(metrics, metric_topic(Metric), Payload),
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
create_metric({gauge, Name}) -> create_metric({gauge, Name}) ->
@ -290,3 +285,5 @@ create_metric({counter, Name}) ->
metric_topic(Metric) -> metric_topic(Metric) ->
emqttd_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))). emqttd_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))).
bin(I) when is_integer(I) -> list_to_binary(integer_to_list(I)).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd presence management module %% @doc emqttd presence management module
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_mod_presence). -module(emqttd_mod_presence).
-behaviour(emqttd_gen_mod). -behaviour(emqttd_gen_mod).
@ -48,7 +47,7 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId,
{session, Sess}, {session, Sess},
{protocol, ProtoVer}, {protocol, ProtoVer},
{connack, ConnAck}, {connack, ConnAck},
{ts, emqttd_util:now_to_secs()}]), {ts, emqttd_time:now_to_secs()}]),
Msg = emqttd_message:make(presence, Msg = emqttd_message:make(presence,
proplists:get_value(qos, Opts, 0), proplists:get_value(qos, Opts, 0),
topic(connected, ClientId), topic(connected, ClientId),
@ -58,7 +57,7 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId,
client_disconnected(Reason, ClientId, Opts) -> client_disconnected(Reason, ClientId, Opts) ->
Json = mochijson2:encode([{clientid, ClientId}, Json = mochijson2:encode([{clientid, ClientId},
{reason, reason(Reason)}, {reason, reason(Reason)},
{ts, emqttd_util:now_to_secs()}]), {ts, emqttd_time:now_to_secs()}]),
Msg = emqttd_message:make(presence, Msg = emqttd_message:make(presence,
proplists:get_value(qos, Opts, 0), proplists:get_value(qos, Opts, 0),
topic(disconnected, ClientId), topic(disconnected, ClientId),

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd rewrite module %% @doc emqttd rewrite module
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_mod_rewrite). -module(emqttd_mod_rewrite).
-behaviour(emqttd_gen_mod). -behaviour(emqttd_gen_mod).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Subscription from Broker Side %% @doc Subscription from Broker Side
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_mod_subscription). -module(emqttd_mod_subscription).
-behaviour(emqttd_gen_mod). -behaviour(emqttd_gen_mod).

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd module supervisor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_mod_sup). -module(emqttd_mod_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -48,7 +46,6 @@ start_child(ChildSpec) when is_tuple(ChildSpec) ->
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Supervisor callbacks %% Supervisor callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -40,8 +40,7 @@
%% otherwise dropped the oldest one. %% otherwise dropped the oldest one.
%% %%
%% @end %% @end
%%
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_mqueue). -module(emqttd_mqueue).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -14,17 +14,16 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd net utility functions. some functions copied from rabbitmq. %% @doc emqttd net utilities.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_net). -module(emqttd_net).
-include_lib("kernel/include/inet.hrl"). -include_lib("kernel/include/inet.hrl").
-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2,
getaddr/2, port_to_listeners/1]). port_to_listeners/1]).
-export([peername/1, sockname/1, format/2, format/1, -export([peername/1, sockname/1, format/2, format/1, ntoa/1,
connection_string/2, ntoa/1]). connection_string/2]).
-define(FIRST_TEST_BIND_PORT, 10000). -define(FIRST_TEST_BIND_PORT, 10000).

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd options handler.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_opts). -module(emqttd_opts).
-export([merge/2, g/2, g/3]). -export([merge/2, g/2, g/3]).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT Packet Functions %% @doc MQTT Packet Functions
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_packet). -module(emqttd_packet).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT Packet Parser %% @doc MQTT Packet Parser
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_parser). -module(emqttd_parser).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd plugins.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_plugins). -module(emqttd_plugins).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Common Pool Supervisor %% @doc Common Pool Supervisor
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_pool_sup). -module(emqttd_pool_sup).
-behaviour(supervisor). -behaviour(supervisor).

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd pooler.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_pooler). -module(emqttd_pooler).
-behaviour(gen_server). -behaviour(gen_server).
@ -43,7 +41,7 @@ start_link() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}. -spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Pool, Id) -> start_link(Pool, Id) ->
gen_server:start_link({local, emqttd:reg_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
%% @doc Submit work to pooler %% @doc Submit work to pooler
submit(Fun) -> gen_server:call(worker(), {submit, Fun}, infinity). submit(Fun) -> gen_server:call(worker(), {submit, Fun}, infinity).

View File

@ -14,8 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd protocol. %% @doc MQTT Protocol Processor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_protocol). -module(emqttd_protocol).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Core PubSub
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_pubsub). -module(emqttd_pubsub).
-behaviour(gen_server2). -behaviour(gen_server2).
@ -119,7 +117,7 @@ cache_env(Key) ->
StatsFun :: fun((atom()) -> any()), StatsFun :: fun((atom()) -> any()),
Opts :: list(tuple()). Opts :: list(tuple()).
start_link(Pool, Id, StatsFun, Opts) -> start_link(Pool, Id, StatsFun, Opts) ->
gen_server2:start_link({local, emqttd:reg_name(?MODULE, Id)}, gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)},
?MODULE, [Pool, Id, StatsFun, Opts], []). ?MODULE, [Pool, Id, StatsFun, Opts], []).
%% @doc Create Topic or Subscription. %% @doc Create Topic or Subscription.
@ -356,7 +354,7 @@ add_subscription(SubId, {Topic, Qos}) ->
Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}, Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'},
Records = mnesia:match_object(subscription, Pattern, write), Records = mnesia:match_object(subscription, Pattern, write),
case lists:member(Subscription, Records) of case lists:member(Subscription, Records) of
true -> true ->
ok; ok;
false -> false ->
[delete_subscription(Record) || Record <- Records], [delete_subscription(Record) || Record <- Records],

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% @doc PubSub Helper. %%% @doc PubSub Helper.
%%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_pubsub_helper). -module(emqttd_pubsub_helper).
-behaviour(gen_server). -behaviour(gen_server).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc PubSub Supervisor. %% @doc PubSub Supervisor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_pubsub_sup). -module(emqttd_pubsub_sup).
-behaviour(supervisor). -behaviour(supervisor).

View File

@ -16,7 +16,6 @@
%% TODO: should match topic tree %% TODO: should match topic tree
%% @doc MQTT retained message storage. %% @doc MQTT retained message storage.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_retainer). -module(emqttd_retainer).
-behaviour(gen_server). -behaviour(gen_server).
@ -155,7 +154,7 @@ handle_info(expire, State = #state{expired_after = Never})
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(expire, State = #state{expired_after = ExpiredAfter}) -> handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
expire(emqttd_util:now_to_secs(os:timestamp()) - ExpiredAfter), expire(emqttd_time:now_to_secs() - ExpiredAfter),
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -14,8 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Message Router on local node. %% @doc MQTT Message Router
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_router). -module(emqttd_router).
-behaviour(gen_server2). -behaviour(gen_server2).
@ -51,7 +50,7 @@
%% @doc Start a local router. %% @doc Start a local router.
-spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}. -spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}.
start_link(Pool, Id, StatsFun, Env) -> start_link(Pool, Id, StatsFun, Env) ->
gen_server2:start_link({local, emqttd:reg_name(?MODULE,Id)}, gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)},
?MODULE, [Pool, Id, StatsFun, Env], []). ?MODULE, [Pool, Id, StatsFun, Env], []).
%% @doc Route Message on the local node. %% @doc Route Message on the local node.
@ -138,7 +137,7 @@ pick(Topic) ->
gproc_pool:pick_worker(router, Topic). gproc_pool:pick_worker(router, Topic).
stop(Id) when is_integer(Id) -> stop(Id) when is_integer(Id) ->
gen_server2:call(emqttd:reg_name(?MODULE, Id), stop). gen_server2:call(?PROC_NAME(?MODULE, Id), stop).
call(Router, Request) -> call(Router, Request) ->
gen_server2:call(Router, Request, infinity). gen_server2:call(Router, Request, infinity).
@ -148,13 +147,13 @@ cast(Router, Msg) ->
init([Pool, Id, StatsFun, Opts]) -> init([Pool, Id, StatsFun, Opts]) ->
emqttd_time:seed(),
%% Calls from pubsub should be scheduled first? %% Calls from pubsub should be scheduled first?
process_flag(priority, high), process_flag(priority, high),
?GPROC_POOL(join, Pool, Id), ?GPROC_POOL(join, Pool, Id),
emqttd:seed_now(),
AgingSecs = proplists:get_value(route_aging, Opts, 5), AgingSecs = proplists:get_value(route_aging, Opts, 5),
%% Aging Timer %% Aging Timer
@ -208,7 +207,7 @@ handle_info({clean, aged}, State = #state{aging = Aging}) ->
#aging{topics = Dict, time = Time} = Aging, #aging{topics = Dict, time = Time} = Aging,
ByTime = emqttd_util:now_to_secs() - Time, ByTime = emqttd_time:now_to_secs() - Time,
Dict1 = try_clean(ByTime, dict:to_list(Dict)), Dict1 = try_clean(ByTime, dict:to_list(Dict)),
@ -269,7 +268,7 @@ delete_topic(TopicR) ->
mnesia:delete_object(topic, TopicR, write). mnesia:delete_object(topic, TopicR, write).
store_aged(Topic, Aging = #aging{topics = Dict}) -> store_aged(Topic, Aging = #aging{topics = Dict}) ->
Now = emqttd_util:now_to_secs(), Now = emqttd_time:now_to_secs(),
Aging#aging{topics = dict:store(Topic, Now, Dict)}. Aging#aging{topics = dict:store(Topic, Now, Dict)}.
setstats(State = #state{statsfun = StatsFun}) -> setstats(State = #state{statsfun = StatsFun}) ->

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT Packet Serializer %% @doc MQTT Packet Serializer
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_serializer). -module(emqttd_serializer).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -36,8 +36,7 @@
%% State of Message: newcome, inflight, pending %% State of Message: newcome, inflight, pending
%% %%
%% @end %% @end
%%
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_session). -module(emqttd_session).
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd session supervisor. %% @doc emqttd session supervisor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_session_sup). -module(emqttd_session_sup).
-behavior(supervisor). -behavior(supervisor).

View File

@ -15,9 +15,10 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Session Manager %% @doc Session Manager
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_sm). -module(emqttd_sm).
-behaviour(gen_server2).
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_internal.hrl"). -include("emqttd_internal.hrl").
@ -35,8 +36,6 @@
-export([register_session/3, unregister_session/2]). -export([register_session/3, unregister_session/2]).
-behaviour(gen_server2).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -75,7 +74,7 @@ mnesia(copy) ->
%% @doc Start a session manager %% @doc Start a session manager
-spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}. -spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Pool, Id) -> start_link(Pool, Id) ->
gen_server2:start_link({local, emqttd:reg_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
%% @doc Start a session %% @doc Start a session
-spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}. -spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}.

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Session Helper. %% @doc Session Helper.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_sm_helper). -module(emqttd_sm_helper).
-behaviour(gen_server). -behaviour(gen_server).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Session Manager Supervisor. %% @doc Session Manager Supervisor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_sm_sup). -module(emqttd_sm_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -26,8 +25,7 @@
-define(HELPER, emqttd_sm_helper). -define(HELPER, emqttd_sm_helper).
-define(TABS, [mqtt_transient_session, -define(TABS, [mqtt_transient_session, mqtt_persistent_session]).
mqtt_persistent_session]).
%% API %% API
-export([start_link/0]). -export([start_link/0]).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd statistics %% @doc emqttd statistics
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_stats). -module(emqttd_stats).
-include("emqttd.hrl"). -include("emqttd.hrl").
@ -118,7 +117,7 @@ setstats(Stat, MaxStat, Val) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
emqttd:seed_now(), emqttd_time:seed(),
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]),
@ -166,10 +165,11 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
publish(Stat, Val) -> publish(Stat, Val) ->
Msg = emqttd_message:make(stats, stats_topic(Stat), Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)),
emqttd_util:integer_to_binary(Val)),
emqttd_pubsub:publish(Msg). emqttd_pubsub:publish(Msg).
stats_topic(Stat) -> stats_topic(Stat) ->
emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
bin(I) when is_integer(I) -> list_to_binary(integer_to_list(I)).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd top supervisor. %% @doc emqttd top supervisor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_sup). -module(emqttd_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -42,10 +41,7 @@ start_link() ->
start_child(ChildSpec) when is_tuple(ChildSpec) -> start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec). supervisor:start_child(?MODULE, ChildSpec).
%% -spec start_child(Mod::atom(), Type :: worker | supervisor) -> {ok, pid()}.
%% start_child(Mod::atom(), Type::type()) -> {ok, pid()}
%% @type type() = worker | supervisor
%%
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc VM System Monitor %% @doc VM System Monitor
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_sysmon). -module(emqttd_sysmon).
-behavior(gen_server). -behavior(gen_server).

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd sysmon supervisor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_sysmon_sup). -module(emqttd_sysmon_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -30,8 +28,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
Env = emqttd:env(sysmon), Sysmon = {sysmon, {emqttd_sysmon, start_link, [emqttd:env(sysmon)]},
{ok, {{one_for_one, 10, 100}, permanent, 5000, worker, [emqttd_sysmon]} ,
[{sysmon, {emqttd_sysmon, start_link, [Env]}, {ok, {{one_for_one, 10, 100}, [Sysmon]}}.
permanent, 5000, worker, [emqttd_sysmon]}]}}.

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc MQTT Topic Functions %% @doc MQTT Topic Functions
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_topic). -module(emqttd_topic).
-import(lists, [reverse/1]). -import(lists, [reverse/1]).
@ -26,8 +25,6 @@
-type topic() :: binary(). -type topic() :: binary().
%-type type() :: static | dynamic.
-type word() :: '' | '+' | '#' | binary(). -type word() :: '' | '+' | '#' | binary().
-type words() :: list(word()). -type words() :: list(word()).
@ -38,10 +35,7 @@
-define(MAX_TOPIC_LEN, 4096). -define(MAX_TOPIC_LEN, 4096).
%%%-----------------------------------------------------------------------------
%% @doc Is wildcard topic? %% @doc Is wildcard topic?
%% @end
%%%-----------------------------------------------------------------------------
-spec wildcard(topic()) -> true | false. -spec wildcard(topic()) -> true | false.
wildcard(Topic) when is_binary(Topic) -> wildcard(Topic) when is_binary(Topic) ->
wildcard(words(Topic)); wildcard(words(Topic));
@ -54,10 +48,7 @@ wildcard(['+'|_]) ->
wildcard([_H|T]) -> wildcard([_H|T]) ->
wildcard(T). wildcard(T).
%%------------------------------------------------------------------------------
%% @doc Match Topic name with filter %% @doc Match Topic name with filter
%% @end
%%------------------------------------------------------------------------------
-spec match(Name, Filter) -> boolean() when -spec match(Name, Filter) -> boolean() when
Name :: topic() | words(), Name :: topic() | words(),
Filter :: topic() | words(). Filter :: topic() | words().
@ -82,10 +73,7 @@ match([_H1|_], []) ->
match([], [_H|_T2]) -> match([], [_H|_T2]) ->
false. false.
%%------------------------------------------------------------------------------
%% @doc Validate Topic %% @doc Validate Topic
%% @end
%%------------------------------------------------------------------------------
-spec validate({name | filter, topic()}) -> boolean(). -spec validate({name | filter, topic()}) -> boolean().
validate({_, <<>>}) -> validate({_, <<>>}) ->
false; false;
@ -120,10 +108,7 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
validate3(<<_/utf8, Rest/binary>>) -> validate3(<<_/utf8, Rest/binary>>) ->
validate3(Rest). validate3(Rest).
%%%-----------------------------------------------------------------------------
%% @doc Topic to Triples %% @doc Topic to Triples
%% @end
%%%-----------------------------------------------------------------------------
-spec triples(topic()) -> list(triple()). -spec triples(topic()) -> list(triple()).
triples(Topic) when is_binary(Topic) -> triples(Topic) when is_binary(Topic) ->
triples(words(Topic), root, []). triples(words(Topic), root, []).
@ -145,10 +130,7 @@ bin('+') -> <<"+">>;
bin('#') -> <<"#">>; bin('#') -> <<"#">>;
bin(B) when is_binary(B) -> B. bin(B) when is_binary(B) -> B.
%%------------------------------------------------------------------------------
%% @doc Split Topic Path to Words %% @doc Split Topic Path to Words
%% @end
%%------------------------------------------------------------------------------
-spec words(topic()) -> words(). -spec words(topic()) -> words().
words(Topic) when is_binary(Topic) -> words(Topic) when is_binary(Topic) ->
[word(W) || W <- binary:split(Topic, <<"/">>, [global])]. [word(W) || W <- binary:split(Topic, <<"/">>, [global])].
@ -158,21 +140,14 @@ word(<<"+">>) -> '+';
word(<<"#">>) -> '#'; word(<<"#">>) -> '#';
word(Bin) -> Bin. word(Bin) -> Bin.
%%------------------------------------------------------------------------------
%% @doc Queue is a special topic name that starts with "$Q/" %% @doc Queue is a special topic name that starts with "$Q/"
%% @end
%%------------------------------------------------------------------------------
-spec is_queue(topic()) -> boolean(). -spec is_queue(topic()) -> boolean().
is_queue(<<"$Q/", _Queue/binary>>) -> is_queue(<<"$Q/", _Queue/binary>>) ->
true; true;
is_queue(_) -> is_queue(_) ->
false. false.
%%------------------------------------------------------------------------------
%% @doc '$SYS' Topic. %% @doc '$SYS' Topic.
%% @end
%%------------------------------------------------------------------------------
systop(Name) when is_atom(Name) -> systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])); list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name]));

View File

@ -14,8 +14,9 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Trace MQTT packets/messages by ClientID or Topic. %% @doc
%% @author Feng Lee <feng@emqtt.io> %% Trace MQTT packets/messages by ClientID or Topic.
%% @end
-module(emqttd_trace). -module(emqttd_trace).
-behaviour(gen_server). -behaviour(gen_server).

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd trace supervisor.
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_trace_sup). -module(emqttd_trace_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -30,7 +28,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
{ok, {{one_for_one, 10, 100}, Trace = {trace, {emqttd_trace, start_link, []},
[{trace, {emqttd_trace, start_link, []}, permanent, 5000, worker, [emqttd_trace]},
permanent, 5000, worker, [emqttd_trace]}]}}. {ok, {{one_for_one, 10, 100}, [Trace]}}.

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd erlang vm.
%% @author @huangdan
-module(emqttd_vm). -module(emqttd_vm).
-export([schedulers/0]). -export([schedulers/0]).

View File

@ -14,11 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd websocket client
-module(emqttd_ws_client). -module(emqttd_ws_client).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Lager logger backend.
%% @author Feng Lee <feng@emqtt.io>
-module(lager_emqtt_backend). -module(lager_emqtt_backend).
-behaviour(gen_event). -behaviour(gen_event).

View File

@ -20,8 +20,5 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
seed_now_test() ->
?assertNotEqual(emqttd:seed_now(), emqttd:seed_now()).
-endif. -endif.