From 89052d8e6ec2001da53339f338b2380214f69a57 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 11 Feb 2016 15:52:59 +0800 Subject: [PATCH] add emqttd_time module, fix comments and format --- src/emqttd_access_control.erl | 11 +++++------ src/emqttd_access_rule.erl | 7 ++----- src/emqttd_acl_internal.erl | 11 +++-------- src/emqttd_acl_mod.erl | 3 --- src/emqttd_app.erl | 3 --- src/emqttd_auth_anonymous.erl | 2 -- src/emqttd_auth_clientid.erl | 3 --- src/emqttd_auth_ldap.erl | 2 -- src/emqttd_auth_mod.erl | 2 -- src/emqttd_bridge.erl | 2 -- src/emqttd_bridge_sup.erl | 6 ++---- src/emqttd_client.erl | 1 - src/emqttd_cm.erl | 5 ++--- src/emqttd_cm_sup.erl | 1 - src/emqttd_ctl.erl | 7 +------ src/emqttd_gen_mod.erl | 1 - src/emqttd_guid.erl | 2 -- src/emqttd_http.erl | 1 - src/emqttd_keepalive.erl | 3 +-- src/emqttd_message.erl | 1 - src/emqttd_metrics.erl | 15 ++++++--------- src/emqttd_mod_presence.erl | 5 ++--- src/emqttd_mod_rewrite.erl | 1 - src/emqttd_mod_subscription.erl | 1 - src/emqttd_mod_sup.erl | 3 --- src/emqttd_mqueue.erl | 3 +-- src/emqttd_net.erl | 11 +++++------ src/emqttd_opts.erl | 2 -- src/emqttd_packet.erl | 1 - src/emqttd_parser.erl | 1 - src/emqttd_plugins.erl | 2 -- src/emqttd_pool_sup.erl | 1 - src/emqttd_pooler.erl | 4 +--- src/emqttd_protocol.erl | 3 +-- src/emqttd_pubsub.erl | 6 ++---- src/emqttd_pubsub_helper.erl | 1 - src/emqttd_pubsub_sup.erl | 1 - src/emqttd_retainer.erl | 3 +-- src/emqttd_router.erl | 15 +++++++-------- src/emqttd_serializer.erl | 1 - src/emqttd_session.erl | 3 +-- src/emqttd_session_sup.erl | 1 - src/emqttd_sm.erl | 7 +++---- src/emqttd_sm_helper.erl | 1 - src/emqttd_sm_sup.erl | 4 +--- src/emqttd_stats.erl | 8 ++++---- src/emqttd_sup.erl | 6 +----- src/emqttd_sysmon.erl | 1 - src/emqttd_sysmon_sup.erl | 9 +++------ src/emqttd_topic.erl | 25 ------------------------- src/emqttd_trace.erl | 5 +++-- src/emqttd_trace_sup.erl | 8 +++----- src/emqttd_vm.erl | 2 -- src/emqttd_ws_client.erl | 3 --- src/lager_emqtt_backend.erl | 2 -- test/emqttd_tests.erl | 3 --- 56 files changed, 61 insertions(+), 181 deletions(-) diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index ead1dc727..7525ce355 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Authentication and ACL Control. -module(emqttd_access_control). --author("Feng Lee "). - -include("emqttd.hrl"). -behaviour(gen_server). @@ -43,6 +40,8 @@ -type password() :: undefined | binary(). +-record(state, {}). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -128,10 +127,9 @@ stop() -> gen_server:call(?MODULE, stop). init([Opts]) -> ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), - ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}), ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}), - {ok, state}. + {ok, #state{}}. init_mods(auth, AuthMods) -> [init_mod(authmod(Name), Opts) || {Name, Opts} <- AuthMods]; @@ -151,7 +149,8 @@ handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) -> Seq1 >= Seq2 end, [{Mod, ModState, Seq} | Mods]), - ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), NewMods}); + ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), NewMods}), + ok; {error, Error} -> lager:error("Access Control: register ~s error - ~p", [Mod, Error]), {error, Error}; diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 6527f3ec0..7f8d149fa 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -1,4 +1,4 @@ -%% ------------------------------------------------------------------- +%%-------------------------------------------------------------------- %% Copyright (c) 2012-2016 Feng Lee . %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,13 +12,10 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. -%% ------------------------------------------------------------------- +%%-------------------------------------------------------------------- -%% @doc Access Rule. -module(emqttd_access_rule). --author("Feng Lee "). - -include("emqttd.hrl"). -type who() :: all | binary() | diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index 2e9d959e5..1f9f2f434 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -14,17 +14,14 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Internal ACL that load rules from etc/acl.config -module(emqttd_acl_internal). --author("Feng Lee "). +-behaviour(emqttd_acl_mod). -include("emqttd.hrl"). -export([all_rules/0]). --behaviour(emqttd_acl_mod). - %% ACL callbacks -export([init/1, check_acl/2, reload_acl/1, description/0]). @@ -55,10 +52,8 @@ init(AclOpts) -> AclFile = proplists:get_value(file, AclOpts), Default = proplists:get_value(nomatch, AclOpts, allow), State = #state{acl_file = AclFile, nomatch = Default}, - case load_rules_from_file(State) of - ok -> {ok, State}; - {error, Error} -> {error, Error} - end. + true = load_rules_from_file(State), + {ok, State}. load_rules_from_file(#state{acl_file = AclFile}) -> {ok, Terms} = file:consult(AclFile), diff --git a/src/emqttd_acl_mod.erl b/src/emqttd_acl_mod.erl index 78b327c5e..dfec11157 100644 --- a/src/emqttd_acl_mod.erl +++ b/src/emqttd_acl_mod.erl @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc ACL module behaviour. -module(emqttd_acl_mod). --author("Feng Lee "). - -include("emqttd.hrl"). %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 447c3fd3b..0f8158c58 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd application. -module(emqttd_app). --author("Feng Lee "). - -include("emqttd_cli.hrl"). -behaviour(application). diff --git a/src/emqttd_auth_anonymous.erl b/src/emqttd_auth_anonymous.erl index 5799c28b1..8acdb7bf0 100644 --- a/src/emqttd_auth_anonymous.erl +++ b/src/emqttd_auth_anonymous.erl @@ -17,8 +17,6 @@ %% @doc Anonymous Authentication Module -module(emqttd_auth_anonymous). --author("Feng Lee "). - -behaviour(emqttd_auth_mod). -export([init/1, check/3, description/0]). diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index dc6a3568b..46d92d0e6 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc ClientId Authentication Module. -module(emqttd_auth_clientid). --author("Feng Lee "). - -include("emqttd.hrl"). -export([add_clientid/1, add_clientid/2, lookup_clientid/1, remove_clientid/1, diff --git a/src/emqttd_auth_ldap.erl b/src/emqttd_auth_ldap.erl index 026d16fdb..11e1f27f3 100644 --- a/src/emqttd_auth_ldap.erl +++ b/src/emqttd_auth_ldap.erl @@ -17,8 +17,6 @@ %% @doc LDAP Authentication Module -module(emqttd_auth_ldap). --author("Feng Lee "). - -include("emqttd.hrl"). -import(proplists, [get_value/2, get_value/3]). diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index 801b45c27..fda937bea 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -17,8 +17,6 @@ %% @doc Authentication Behaviour. -module(emqttd_auth_mod). --author("Feng Lee "). - -include("emqttd.hrl"). -export([passwd_hash/2]). diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index a0b56e639..32e1c02ee 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd bridge -%% @author Feng Lee -module(emqttd_bridge). -behaviour(gen_server2). diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index e2933436d..37183cc4b 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Bridge Supervisor -%% @author Feng Lee -module(emqttd_bridge_sup). -behavior(supervisor). @@ -57,8 +55,8 @@ start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) - stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> ChildId = ?BRIDGE_ID(Node, Topic), case supervisor:terminate_child(?MODULE, ChildId) of - ok -> supervisor:delete_child(?MODULE, ChildId); - {error, Reason} -> {error, Reason} + ok -> supervisor:delete_child(?MODULE, ChildId); + Error -> Error end. %%-------------------------------------------------------------------- diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 3b15ad510..3687eaa7f 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc MQTT Client Connection -%% @author Feng Lee -module(emqttd_client). -behaviour(gen_server). diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index f76fd869b..28edac37e 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -15,9 +15,10 @@ %%-------------------------------------------------------------------- %% @doc MQTT Client Manager -%% @author Feng Lee -module(emqttd_cm). +-behaviour(gen_server2). + -include("emqttd.hrl"). -include("emqttd_internal.hrl"). @@ -27,8 +28,6 @@ -export([lookup/1, lookup_proc/1, register/1, unregister/1]). --behaviour(gen_server2). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/emqttd_cm_sup.erl b/src/emqttd_cm_sup.erl index 05c86c529..0def00005 100644 --- a/src/emqttd_cm_sup.erl +++ b/src/emqttd_cm_sup.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc Client Manager Supervisor. -%% @author Feng Lee -module(emqttd_cm_sup). -behaviour(supervisor). diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 6a49932b0..a880a674f 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd control -%% @author Feng Lee -module(emqttd_ctl). -behaviour(gen_server). @@ -27,10 +25,7 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/0, - register_cmd/3, - unregister_cmd/1, - run/1]). +-export([start_link/0, register_cmd/3, unregister_cmd/1, run/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, diff --git a/src/emqttd_gen_mod.erl b/src/emqttd_gen_mod.erl index 7201533b3..043c6e177 100644 --- a/src/emqttd_gen_mod.erl +++ b/src/emqttd_gen_mod.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc emqttd gen_mod behaviour -%% @author Feng Lee -module(emqttd_gen_mod). -include("emqttd.hrl"). diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index e2540a668..c50294c65 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -27,8 +27,6 @@ %% 4. Sequence: 2 bytes sequence in one process %% %% @end -%% -%% @author Feng Lee -module(emqttd_guid). -export([gen/0, new/0, timestamp/1]). diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 60a7c30d7..d673b14d5 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc emqttd http publish API and websocket client. -%% @author Feng Lee -module(emqttd_http). -include("emqttd.hrl"). diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index 2154d96c5..e6c6f7809 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -14,8 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc client keepalive -%% @author Feng Lee +%% @doc Client Keepalive -module(emqttd_keepalive). -export([start/3, check/1, cancel/1]). diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index 80583113f..3508374c4 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc MQTT Message Functions -%% @author Feng Lee -module(emqttd_message). -include("emqttd.hrl"). diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index 7c3e5720d..6b6c6b7a8 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -14,8 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd metrics. responsible for collecting broker metrics -%% @author Feng Lee +%% @doc emqttd metrics. responsible for collecting broker metrics. -module(emqttd_metrics). -behaviour(gen_server). @@ -32,10 +31,7 @@ %% Received/Sent Metrics -export([received/1, sent/1]). --export([all/0, value/1, - inc/1, inc/2, inc/3, - dec/2, dec/3, - set/2]). +-export([all/0, value/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -240,7 +236,7 @@ key(counter, Metric) -> %%-------------------------------------------------------------------- init([]) -> - emqttd:seed_now(), + emqttd_time:seed(), Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), @@ -276,8 +272,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- publish(Metric, Val) -> - Payload = emqttd_util:integer_to_binary(Val), - Msg = emqttd_message:make(metrics, metric_topic(Metric), Payload), + Msg = emqttd_message:make(metrics, metric_topic(Metric), bin(Val)), emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). create_metric({gauge, Name}) -> @@ -290,3 +285,5 @@ create_metric({counter, Name}) -> metric_topic(Metric) -> emqttd_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))). +bin(I) when is_integer(I) -> list_to_binary(integer_to_list(I)). + diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 7262c025d..57a02c1bf 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc emqttd presence management module -%% @author Feng Lee -module(emqttd_mod_presence). -behaviour(emqttd_gen_mod). @@ -48,7 +47,7 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId, {session, Sess}, {protocol, ProtoVer}, {connack, ConnAck}, - {ts, emqttd_util:now_to_secs()}]), + {ts, emqttd_time:now_to_secs()}]), Msg = emqttd_message:make(presence, proplists:get_value(qos, Opts, 0), topic(connected, ClientId), @@ -58,7 +57,7 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId, client_disconnected(Reason, ClientId, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, {reason, reason(Reason)}, - {ts, emqttd_util:now_to_secs()}]), + {ts, emqttd_time:now_to_secs()}]), Msg = emqttd_message:make(presence, proplists:get_value(qos, Opts, 0), topic(disconnected, ClientId), diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index 35fbedf2a..b1bad9766 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc emqttd rewrite module -%% @author Feng Lee -module(emqttd_mod_rewrite). -behaviour(emqttd_gen_mod). diff --git a/src/emqttd_mod_subscription.erl b/src/emqttd_mod_subscription.erl index c581609ff..93c96d0d9 100644 --- a/src/emqttd_mod_subscription.erl +++ b/src/emqttd_mod_subscription.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc Subscription from Broker Side -%% @author Feng Lee -module(emqttd_mod_subscription). -behaviour(emqttd_gen_mod). diff --git a/src/emqttd_mod_sup.erl b/src/emqttd_mod_sup.erl index a394ab9ad..759886fd3 100644 --- a/src/emqttd_mod_sup.erl +++ b/src/emqttd_mod_sup.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd module supervisor. -%% @author Feng Lee -module(emqttd_mod_sup). -behaviour(supervisor). @@ -48,7 +46,6 @@ start_child(ChildSpec) when is_tuple(ChildSpec) -> start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). - %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index e5931899d..2c6a48ed7 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -40,8 +40,7 @@ %% otherwise dropped the oldest one. %% %% @end -%% -%% @author Feng Lee + -module(emqttd_mqueue). -include("emqttd.hrl"). diff --git a/src/emqttd_net.erl b/src/emqttd_net.erl index aff6032c8..a57e4a6ec 100644 --- a/src/emqttd_net.erl +++ b/src/emqttd_net.erl @@ -14,17 +14,16 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd net utility functions. some functions copied from rabbitmq. -%% @author Feng Lee +%% @doc emqttd net utilities. -module(emqttd_net). -include_lib("kernel/include/inet.hrl"). --export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, - getaddr/2, port_to_listeners/1]). +-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, + port_to_listeners/1]). --export([peername/1, sockname/1, format/2, format/1, - connection_string/2, ntoa/1]). +-export([peername/1, sockname/1, format/2, format/1, ntoa/1, + connection_string/2]). -define(FIRST_TEST_BIND_PORT, 10000). diff --git a/src/emqttd_opts.erl b/src/emqttd_opts.erl index 2685efb05..894d88f18 100644 --- a/src/emqttd_opts.erl +++ b/src/emqttd_opts.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd options handler. -%% @author Feng Lee -module(emqttd_opts). -export([merge/2, g/2, g/3]). diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index fdb42d9b7..cb93d5565 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc MQTT Packet Functions -%% @author Feng Lee -module(emqttd_packet). -include("emqttd.hrl"). diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index 94a18a20d..8423308a3 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc MQTT Packet Parser -%% @author Feng Lee -module(emqttd_parser). -include("emqttd.hrl"). diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index ac5c57455..accf2b03e 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd plugins. -%% @author Feng Lee -module(emqttd_plugins). -include("emqttd.hrl"). diff --git a/src/emqttd_pool_sup.erl b/src/emqttd_pool_sup.erl index 5a4e10d72..e1f883e3f 100644 --- a/src/emqttd_pool_sup.erl +++ b/src/emqttd_pool_sup.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc Common Pool Supervisor -%% @author Feng Lee -module(emqttd_pool_sup). -behaviour(supervisor). diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 0f3abe574..c22e501dc 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd pooler. -%% @author Feng Lee -module(emqttd_pooler). -behaviour(gen_server). @@ -43,7 +41,7 @@ start_link() -> %%-------------------------------------------------------------------- -spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}. start_link(Pool, Id) -> - gen_server:start_link({local, emqttd:reg_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). + gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). %% @doc Submit work to pooler submit(Fun) -> gen_server:call(worker(), {submit, Fun}, infinity). diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index c1457be41..554da0752 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -14,8 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd protocol. -%% @author Feng Lee +%% @doc MQTT Protocol Processor. -module(emqttd_protocol). -include("emqttd.hrl"). diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 23a36134a..4626f23ba 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Core PubSub -%% @author Feng Lee -module(emqttd_pubsub). -behaviour(gen_server2). @@ -119,7 +117,7 @@ cache_env(Key) -> StatsFun :: fun((atom()) -> any()), Opts :: list(tuple()). start_link(Pool, Id, StatsFun, Opts) -> - gen_server2:start_link({local, emqttd:reg_name(?MODULE, Id)}, + gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, StatsFun, Opts], []). %% @doc Create Topic or Subscription. @@ -356,7 +354,7 @@ add_subscription(SubId, {Topic, Qos}) -> Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}, Records = mnesia:match_object(subscription, Pattern, write), case lists:member(Subscription, Records) of - true -> + true -> ok; false -> [delete_subscription(Record) || Record <- Records], diff --git a/src/emqttd_pubsub_helper.erl b/src/emqttd_pubsub_helper.erl index 79923ca9e..6e697b060 100644 --- a/src/emqttd_pubsub_helper.erl +++ b/src/emqttd_pubsub_helper.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %%% @doc PubSub Helper. -%%% @author Feng Lee -module(emqttd_pubsub_helper). -behaviour(gen_server). diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index c5df349bc..532f3c2a0 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc PubSub Supervisor. -%% @author Feng Lee -module(emqttd_pubsub_sup). -behaviour(supervisor). diff --git a/src/emqttd_retainer.erl b/src/emqttd_retainer.erl index 7fc3ca268..f18050b28 100644 --- a/src/emqttd_retainer.erl +++ b/src/emqttd_retainer.erl @@ -16,7 +16,6 @@ %% TODO: should match topic tree %% @doc MQTT retained message storage. -%% @author Feng Lee -module(emqttd_retainer). -behaviour(gen_server). @@ -155,7 +154,7 @@ handle_info(expire, State = #state{expired_after = Never}) {noreply, State, hibernate}; handle_info(expire, State = #state{expired_after = ExpiredAfter}) -> - expire(emqttd_util:now_to_secs(os:timestamp()) - ExpiredAfter), + expire(emqttd_time:now_to_secs() - ExpiredAfter), {noreply, State, hibernate}; handle_info(Info, State) -> diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 472b13501..76816302e 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -14,8 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Message Router on local node. -%% @author Feng Lee +%% @doc MQTT Message Router -module(emqttd_router). -behaviour(gen_server2). @@ -51,7 +50,7 @@ %% @doc Start a local router. -spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}. start_link(Pool, Id, StatsFun, Env) -> - gen_server2:start_link({local, emqttd:reg_name(?MODULE,Id)}, + gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, StatsFun, Env], []). %% @doc Route Message on the local node. @@ -138,7 +137,7 @@ pick(Topic) -> gproc_pool:pick_worker(router, Topic). stop(Id) when is_integer(Id) -> - gen_server2:call(emqttd:reg_name(?MODULE, Id), stop). + gen_server2:call(?PROC_NAME(?MODULE, Id), stop). call(Router, Request) -> gen_server2:call(Router, Request, infinity). @@ -148,13 +147,13 @@ cast(Router, Msg) -> init([Pool, Id, StatsFun, Opts]) -> + emqttd_time:seed(), + %% Calls from pubsub should be scheduled first? process_flag(priority, high), ?GPROC_POOL(join, Pool, Id), - emqttd:seed_now(), - AgingSecs = proplists:get_value(route_aging, Opts, 5), %% Aging Timer @@ -208,7 +207,7 @@ handle_info({clean, aged}, State = #state{aging = Aging}) -> #aging{topics = Dict, time = Time} = Aging, - ByTime = emqttd_util:now_to_secs() - Time, + ByTime = emqttd_time:now_to_secs() - Time, Dict1 = try_clean(ByTime, dict:to_list(Dict)), @@ -269,7 +268,7 @@ delete_topic(TopicR) -> mnesia:delete_object(topic, TopicR, write). store_aged(Topic, Aging = #aging{topics = Dict}) -> - Now = emqttd_util:now_to_secs(), + Now = emqttd_time:now_to_secs(), Aging#aging{topics = dict:store(Topic, Now, Dict)}. setstats(State = #state{statsfun = StatsFun}) -> diff --git a/src/emqttd_serializer.erl b/src/emqttd_serializer.erl index 75222ffc4..3580e8507 100644 --- a/src/emqttd_serializer.erl +++ b/src/emqttd_serializer.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc MQTT Packet Serializer -%% @author Feng Lee -module(emqttd_serializer). -include("emqttd.hrl"). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 0196b2cd7..f8c8852a3 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -36,8 +36,7 @@ %% State of Message: newcome, inflight, pending %% %% @end -%% -%% @author Feng Lee + -module(emqttd_session). -include("emqttd.hrl"). diff --git a/src/emqttd_session_sup.erl b/src/emqttd_session_sup.erl index 820aff055..6bf703c6d 100644 --- a/src/emqttd_session_sup.erl +++ b/src/emqttd_session_sup.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc emqttd session supervisor. -%% @author Feng Lee -module(emqttd_session_sup). -behavior(supervisor). diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 28cb2e428..a16c13f71 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -15,9 +15,10 @@ %%-------------------------------------------------------------------- %% @doc Session Manager -%% @author Feng Lee -module(emqttd_sm). +-behaviour(gen_server2). + -include("emqttd.hrl"). -include("emqttd_internal.hrl"). @@ -35,8 +36,6 @@ -export([register_session/3, unregister_session/2]). --behaviour(gen_server2). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -75,7 +74,7 @@ mnesia(copy) -> %% @doc Start a session manager -spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}. start_link(Pool, Id) -> - gen_server2:start_link({local, emqttd:reg_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). + gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). %% @doc Start a session -spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}. diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 35c5676d2..bce724eb2 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc Session Helper. -%% @author Feng Lee -module(emqttd_sm_helper). -behaviour(gen_server). diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index 62b8de0ef..556d9540f 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc Session Manager Supervisor. -%% @author Feng Lee -module(emqttd_sm_sup). -behaviour(supervisor). @@ -26,8 +25,7 @@ -define(HELPER, emqttd_sm_helper). --define(TABS, [mqtt_transient_session, - mqtt_persistent_session]). +-define(TABS, [mqtt_transient_session, mqtt_persistent_session]). %% API -export([start_link/0]). diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 292adf4f3..c6a156d81 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc emqttd statistics -%% @author Feng Lee -module(emqttd_stats). -include("emqttd.hrl"). @@ -118,7 +117,7 @@ setstats(Stat, MaxStat, Val) -> %%-------------------------------------------------------------------- init([]) -> - emqttd:seed_now(), + emqttd_time:seed(), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), @@ -166,10 +165,11 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- publish(Stat, Val) -> - Msg = emqttd_message:make(stats, stats_topic(Stat), - emqttd_util:integer_to_binary(Val)), + Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)), emqttd_pubsub:publish(Msg). stats_topic(Stat) -> emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). +bin(I) when is_integer(I) -> list_to_binary(integer_to_list(I)). + diff --git a/src/emqttd_sup.erl b/src/emqttd_sup.erl index 2c78af8e3..2f21b51ed 100644 --- a/src/emqttd_sup.erl +++ b/src/emqttd_sup.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc emqttd top supervisor. -%% @author Feng Lee -module(emqttd_sup). -behaviour(supervisor). @@ -42,10 +41,7 @@ start_link() -> start_child(ChildSpec) when is_tuple(ChildSpec) -> supervisor:start_child(?MODULE, ChildSpec). -%% -%% start_child(Mod::atom(), Type::type()) -> {ok, pid()} -%% @type type() = worker | supervisor -%% +-spec start_child(Mod::atom(), Type :: worker | supervisor) -> {ok, pid()}. start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 14277c23e..5fa68bcec 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc VM System Monitor -%% @author Feng Lee -module(emqttd_sysmon). -behavior(gen_server). diff --git a/src/emqttd_sysmon_sup.erl b/src/emqttd_sysmon_sup.erl index fbbe2e436..02d60530a 100644 --- a/src/emqttd_sysmon_sup.erl +++ b/src/emqttd_sysmon_sup.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd sysmon supervisor. -%% @author Feng Lee -module(emqttd_sysmon_sup). -behaviour(supervisor). @@ -30,8 +28,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Env = emqttd:env(sysmon), - {ok, {{one_for_one, 10, 100}, - [{sysmon, {emqttd_sysmon, start_link, [Env]}, - permanent, 5000, worker, [emqttd_sysmon]}]}}. + Sysmon = {sysmon, {emqttd_sysmon, start_link, [emqttd:env(sysmon)]}, + permanent, 5000, worker, [emqttd_sysmon]} , + {ok, {{one_for_one, 10, 100}, [Sysmon]}}. diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 91ec90d0e..86e9d2d2d 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- %% @doc MQTT Topic Functions -%% @author Feng Lee -module(emqttd_topic). -import(lists, [reverse/1]). @@ -26,8 +25,6 @@ -type topic() :: binary(). -%-type type() :: static | dynamic. - -type word() :: '' | '+' | '#' | binary(). -type words() :: list(word()). @@ -38,10 +35,7 @@ -define(MAX_TOPIC_LEN, 4096). -%%%----------------------------------------------------------------------------- %% @doc Is wildcard topic? -%% @end -%%%----------------------------------------------------------------------------- -spec wildcard(topic()) -> true | false. wildcard(Topic) when is_binary(Topic) -> wildcard(words(Topic)); @@ -54,10 +48,7 @@ wildcard(['+'|_]) -> wildcard([_H|T]) -> wildcard(T). -%%------------------------------------------------------------------------------ %% @doc Match Topic name with filter -%% @end -%%------------------------------------------------------------------------------ -spec match(Name, Filter) -> boolean() when Name :: topic() | words(), Filter :: topic() | words(). @@ -82,10 +73,7 @@ match([_H1|_], []) -> match([], [_H|_T2]) -> false. -%%------------------------------------------------------------------------------ %% @doc Validate Topic -%% @end -%%------------------------------------------------------------------------------ -spec validate({name | filter, topic()}) -> boolean(). validate({_, <<>>}) -> false; @@ -120,10 +108,7 @@ validate3(<>) when C == $#; C == $+; C == 0 -> validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). -%%%----------------------------------------------------------------------------- %% @doc Topic to Triples -%% @end -%%%----------------------------------------------------------------------------- -spec triples(topic()) -> list(triple()). triples(Topic) when is_binary(Topic) -> triples(words(Topic), root, []). @@ -145,10 +130,7 @@ bin('+') -> <<"+">>; bin('#') -> <<"#">>; bin(B) when is_binary(B) -> B. -%%------------------------------------------------------------------------------ %% @doc Split Topic Path to Words -%% @end -%%------------------------------------------------------------------------------ -spec words(topic()) -> words(). words(Topic) when is_binary(Topic) -> [word(W) || W <- binary:split(Topic, <<"/">>, [global])]. @@ -158,21 +140,14 @@ word(<<"+">>) -> '+'; word(<<"#">>) -> '#'; word(Bin) -> Bin. -%%------------------------------------------------------------------------------ %% @doc Queue is a special topic name that starts with "$Q/" -%% @end -%%------------------------------------------------------------------------------ -spec is_queue(topic()) -> boolean(). is_queue(<<"$Q/", _Queue/binary>>) -> true; is_queue(_) -> false. -%%------------------------------------------------------------------------------ %% @doc '$SYS' Topic. -%% @end -%%------------------------------------------------------------------------------ - systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])); diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 9133bc1f1..e6681e86a 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -14,8 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Trace MQTT packets/messages by ClientID or Topic. -%% @author Feng Lee +%% @doc +%% Trace MQTT packets/messages by ClientID or Topic. +%% @end -module(emqttd_trace). -behaviour(gen_server). diff --git a/src/emqttd_trace_sup.erl b/src/emqttd_trace_sup.erl index 916dfaaf5..f0402a257 100644 --- a/src/emqttd_trace_sup.erl +++ b/src/emqttd_trace_sup.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd trace supervisor. -%% @author Feng Lee -module(emqttd_trace_sup). -behaviour(supervisor). @@ -30,7 +28,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, {{one_for_one, 10, 100}, - [{trace, {emqttd_trace, start_link, []}, - permanent, 5000, worker, [emqttd_trace]}]}}. + Trace = {trace, {emqttd_trace, start_link, []}, + permanent, 5000, worker, [emqttd_trace]}, + {ok, {{one_for_one, 10, 100}, [Trace]}}. diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index 3ba5fd598..bad4d7c96 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd erlang vm. -%% @author @huangdan -module(emqttd_vm). -export([schedulers/0]). diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 454d00344..776dc4ce5 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd websocket client -module(emqttd_ws_client). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/lager_emqtt_backend.erl b/src/lager_emqtt_backend.erl index 632623eba..9c28090bf 100644 --- a/src/lager_emqtt_backend.erl +++ b/src/lager_emqtt_backend.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Lager logger backend. -%% @author Feng Lee -module(lager_emqtt_backend). -behaviour(gen_event). diff --git a/test/emqttd_tests.erl b/test/emqttd_tests.erl index d50cc7a4d..535543e3a 100644 --- a/test/emqttd_tests.erl +++ b/test/emqttd_tests.erl @@ -20,8 +20,5 @@ -include_lib("eunit/include/eunit.hrl"). -seed_now_test() -> - ?assertNotEqual(emqttd:seed_now(), emqttd:seed_now()). - -endif.