diff --git a/src/emqx.erl b/src/emqx.erl index 15704f64d..44dd41f7e 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -18,21 +18,43 @@ -include("types.hrl"). %% Start/Stop the application --export([start/0, restart/1, is_running/1, stop/0]). +-export([ start/0 + , restart/1 + , is_running/1 + , stop/0 + ]). %% PubSub API --export([subscribe/1, subscribe/2, subscribe/3]). +-export([ subscribe/1 + , subscribe/2 + , subscribe/3 + ]). + -export([publish/1]). + -export([unsubscribe/1]). %% PubSub management API --export([topics/0, subscriptions/1, subscribers/1, subscribed/2]). +-export([ topics/0 + , subscriptions/1 + , subscribers/1 + , subscribed/2 + ]). %% Hooks API --export([hook/2, hook/3, hook/4, unhook/2, run_hook/2, run_fold_hook/3]). +-export([ hook/2 + , hook/3 + , hook/4 + , unhook/2 + , run_hook/2 + , run_fold_hook/3 + ]). %% Shutdown and reboot --export([shutdown/0, shutdown/1, reboot/0]). +-export([ shutdown/0 + , shutdown/1 + , reboot/0 + ]). -define(APP, ?MODULE). diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 8a73e5dca..4fda19b08 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -18,7 +18,10 @@ -include("logger.hrl"). -export([authenticate/1]). --export([check_acl/3, reload_acl/0]). + +-export([ check_acl/3 + , reload_acl/0 + ]). %%------------------------------------------------------------------------------ %% APIs @@ -57,12 +60,14 @@ do_check_acl(#{zone := Zone} = Credentials, PubSub, Topic) -> _ -> deny end. --spec(reload_acl() -> list(ok | {error, term()})). +-spec(reload_acl() -> ok | {error, term()}). reload_acl() -> + emqx_acl_cache:is_enabled() andalso + emqx_acl_cache:empty_acl_cache(), emqx_mod_acl_internal:reload_acl(). init_auth_result(Credentials) -> case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of true -> success; false -> not_authorized - end. \ No newline at end of file + end. diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index c14cbec7f..5d40341cc 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -16,6 +16,11 @@ -include("emqx.hrl"). +%% APIs +-export([ match/3 + , compile/1 + ]). + -type(acl_result() :: allow | deny). -type(who() :: all | binary() | @@ -30,12 +35,13 @@ -export_type([rule/0]). --export([compile/1]). --export([match/3]). - -define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))). -define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= publish) orelse (A =:= pubsub))). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + %% @doc Compile Access Rule. compile({A, all}) when ?ALLOW_DENY(A) -> {A, all}; diff --git a/src/emqx_acl_cache.erl b/src/emqx_acl_cache.erl index fb63e0ad5..92d4d8328 100644 --- a/src/emqx_acl_cache.erl +++ b/src/emqx_acl_cache.erl @@ -16,19 +16,19 @@ -include("emqx.hrl"). --export([ get_acl_cache/2 - , put_acl_cache/3 - , cleanup_acl_cache/0 - , empty_acl_cache/0 - , dump_acl_cache/0 - , get_cache_size/0 - , get_cache_max_size/0 - , get_newest_key/0 - , get_oldest_key/0 - , cache_k/2 - , cache_v/1 - , is_enabled/0 - ]). +-export([ get_acl_cache/2 + , put_acl_cache/3 + , cleanup_acl_cache/0 + , empty_acl_cache/0 + , dump_acl_cache/0 + , get_cache_size/0 + , get_cache_max_size/0 + , get_newest_key/0 + , get_oldest_key/0 + , cache_k/2 + , cache_v/1 + , is_enabled/0 + ]). -type(acl_result() :: allow | deny). diff --git a/src/emqx_acl_mod.erl b/src/emqx_acl_mod.erl deleted file mode 100644 index c75532441..000000000 --- a/src/emqx_acl_mod.erl +++ /dev/null @@ -1,44 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_acl_mod). - --include("emqx.hrl"). - -%%-------------------------------------------------------------------- -%% ACL behavihour -%%-------------------------------------------------------------------- - --ifdef(use_specs). - --callback(init(AclOpts :: list()) -> {ok, State :: term()}). - --callback(check_acl({credentials(), pubsub(), topic()}, State :: term()) - -> allow | deny | ignore). - --callback(reload_acl(State :: term()) -> ok | {error, term()}). - --callback(description() -> string()). - --else. - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [{init, 1}, {check_acl, 2}, {reload_acl, 1}, {description, 0}]; -behaviour_info(_Other) -> - undefined. - --endif. - diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index cb65c1338..6c7d1a470 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -25,15 +25,18 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --export([init/1, - handle_event/2, - handle_call/2, - handle_info/2, - terminate/2]). +%% gen_server callbacks +-export([ init/1 + , handle_event/2 + , handle_call/2 + , handle_info/2 + , terminate/2 + ]). --export([load/0, - unload/0, - get_alarms/0]). +-export([ load/0 + , unload/0 + , get_alarms/0 + ]). -record(common_alarm, {id, desc}). -record(alarm_history, {id, clear_at}). @@ -171,4 +174,3 @@ set_alarm_history(Id) -> mnesia:dirty_write(?ALARM_HISTORY_TAB, #alarm_history{id = Id, clear_at = undefined}). - diff --git a/src/emqx_app.erl b/src/emqx_app.erl index e7bcca398..2ef660521 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -16,7 +16,9 @@ -behaviour(application). --export([start/2, stop/1]). +-export([ start/2 + , stop/1 + ]). -define(APP, emqx). diff --git a/src/emqx_auth_mod.erl b/src/emqx_auth_mod.erl deleted file mode 100644 index 615b4e528..000000000 --- a/src/emqx_auth_mod.erl +++ /dev/null @@ -1,41 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_auth_mod). - --include("emqx.hrl"). - -%%-------------------------------------------------------------------- -%% Authentication behavihour -%%-------------------------------------------------------------------- - --ifdef(use_specs). - --callback(init(AuthOpts :: list()) -> {ok, State :: term()}). - --callback(check(credentials(), password(), State :: term()) - -> ok | {ok, boolean()} | {ok, map()} | - {continue, map()} | ignore | {error, term()}). --callback(description() -> string()). - --else. - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [{init, 1}, {check, 3}, {description, 0}]; -behaviour_info(_Other) -> - undefined. - --endif. diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index a35365d4e..9e7c2d4ae 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -27,11 +27,20 @@ -copy_mnesia({mnesia, [copy]}). -export([start_link/0]). --export([check/1]). --export([add/1, delete/1]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ add/1 + , delete/1 + , check/1 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(TAB, ?MODULE). diff --git a/src/emqx_base62.erl b/src/emqx_base62.erl index 79c19a508..77d04fc4d 100644 --- a/src/emqx_base62.erl +++ b/src/emqx_base62.erl @@ -14,15 +14,19 @@ -module(emqx_base62). --export([encode/1, - encode/2, - decode/1, - decode/2]). +%% APIs +-export([ encode/1 + , encode/2 + , decode/1 + , decode/2 + ]). + +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ %% @doc Encode any data to base62 binary --spec encode(string() - | integer() - | binary()) -> binary(). +-spec encode(string() | integer() | binary()) -> binary(). encode(I) when is_integer(I) -> encode(integer_to_binary(I)); encode(S) when is_list(S)-> @@ -39,11 +43,9 @@ decode(L) when is_list(L) -> decode(B) when is_binary(B) -> decode(B, <<>>). - - -%%==================================================================== -%% Internal functions -%%==================================================================== +%%------------------------------------------------------------------------------ +%% Interval Functions +%%------------------------------------------------------------------------------ encode(D, string) -> binary_to_list(encode(D)); @@ -110,3 +112,4 @@ decode_char(I) when I >= $A andalso I =< $Z-> decode_char(9, I) -> I + 61 - $A. + diff --git a/src/emqx_batch.erl b/src/emqx_batch.erl index 9fb59eb1f..b7e341367 100644 --- a/src/emqx_batch.erl +++ b/src/emqx_batch.erl @@ -14,29 +14,38 @@ -module(emqx_batch). --export([init/1, push/2, commit/1]). --export([size/1, items/1]). +%% APIs +-export([ init/1 + , push/2 + , commit/1 + , size/1 + , items/1 + ]). --type(options() :: #{ - batch_size => non_neg_integer(), - linger_ms => pos_integer(), - commit_fun := function() +-record(batch, + { batch_size :: non_neg_integer() + , batch_q :: list(any()) + , linger_ms :: pos_integer() + , linger_timer :: reference() | undefined + , commit_fun :: function() + }). + +-type(options() :: + #{ batch_size => non_neg_integer() + , linger_ms => pos_integer() + , commit_fun := function() }). --export_type([options/0]). - --record(batch, { - batch_size :: non_neg_integer(), - batch_q :: list(any()), - linger_ms :: pos_integer(), - linger_timer :: reference() | undefined, - commit_fun :: function() - }). - -opaque(batch() :: #batch{}). +-export_type([options/0]). + -export_type([batch/0]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + -spec(init(options()) -> batch()). init(Opts) when is_map(Opts) -> #batch{batch_size = maps:get(batch_size, Opts, 1000), diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index cdb4183fd..5bd865ccf 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -61,25 +61,47 @@ -behaviour(gen_statem). %% APIs --export([start_link/2, - import_batch/2, - handle_ack/2, - stop/1]). +-export([ start_link/2 + , import_batch/2 + , handle_ack/2 + , stop/1 + ]). %% gen_statem callbacks --export([terminate/3, code_change/4, init/1, callback_mode/0]). +-export([ terminate/3 + , code_change/4 + , init/1 + , callback_mode/0 + ]). %% state functions --export([standing_by/3, connecting/3, connected/3]). +-export([ standing_by/3 + , connecting/3 + , connected/3 + ]). %% management APIs --export([ensure_started/1, ensure_started/2, ensure_stopped/1, ensure_stopped/2, status/1]). --export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]). --export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]). +-export([ ensure_started/1 + , ensure_started/2 + , ensure_stopped/1 + , ensure_stopped/2 + , status/1 + ]). --export_type([config/0, - batch/0, - ack_ref/0]). +-export([ get_forwards/1 + , ensure_forward_present/2 + , ensure_forward_absent/2 + ]). + +-export([ get_subscriptions/1 + , ensure_subscription_present/3 + , ensure_subscription_absent/2 + ]). + +-export_type([ config/0 + , batch/0 + , ack_ref/0 + ]). -type id() :: atom() | string() | pid(). -type qos() :: emqx_mqtt_types:qos(). @@ -552,3 +574,4 @@ name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). id(Pid) when is_pid(Pid) -> Pid; id(Name) -> name(Name). + diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl index b2781cc2c..49fa2b478 100644 --- a/src/emqx_bridge_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -69,3 +69,4 @@ obfuscate(Map) -> is_sensitive(password) -> true; is_sensitive(_) -> false. + diff --git a/src/emqx_bridge_mqtt.erl b/src/emqx_bridge_mqtt.erl index 486f3206a..10f56d7c5 100644 --- a/src/emqx_bridge_mqtt.erl +++ b/src/emqx_bridge_mqtt.erl @@ -15,17 +15,18 @@ %% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol -module(emqx_bridge_mqtt). + -behaviour(emqx_bridge_connect). %% behaviour callbacks --export([start/1, - send/2, - stop/2 +-export([ start/1 + , send/2 + , stop/2 ]). %% optional behaviour callbacks --export([ensure_subscribed/3, - ensure_unsubscribed/2 +-export([ ensure_subscribed/3 + , ensure_unsubscribed/2 ]). -include("emqx_mqtt.hrl"). @@ -39,6 +40,10 @@ -define(ACKED(AnyPktId), {acked, AnyPktId}). -define(STOP(Ref), {stop, Ref}). +%%------------------------------------------------------------------------------ +%% emqx_bridge_connect callbacks +%%------------------------------------------------------------------------------ + start(Config = #{address := Address}) -> Ref = make_ref(), Parent = self(), @@ -183,3 +188,4 @@ subscribe_remote_topics(ClientPid, Subscriptions) -> Error -> throw(Error) end end, Subscriptions). + diff --git a/src/emqx_bridge_rpc.erl b/src/emqx_bridge_rpc.erl index b818d65da..9674fdcf1 100644 --- a/src/emqx_bridge_rpc.erl +++ b/src/emqx_bridge_rpc.erl @@ -18,9 +18,9 @@ -behaviour(emqx_bridge_connect). %% behaviour callbacks --export([start/1, - send/2, - stop/2 +-export([ start/1 + , send/2 + , stop/2 ]). %% Internal exports diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index bcacb411c..fc0be3995 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -17,8 +17,17 @@ -include("logger.hrl"). --export([start_link/0, start_link/1, bridges/0]). --export([create_bridge/2, drop_bridge/1]). +%% APIs +-export([ start_link/0 + , start_link/1 + , bridges/0 + ]). + +-export([ create_bridge/2 + , drop_bridge/1 + ]). + +%% supervisor callbacks -export([init/1]). -define(SUP, ?MODULE). @@ -60,3 +69,4 @@ drop_bridge(Id) -> ?LOG(error, "[Bridge] Delete bridge failed", [Error]), Error end. + diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index c63831ab9..33fc24056 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -21,21 +21,46 @@ -include("types.hrl"). -export([start_link/2]). --export([subscribe/1, subscribe/2, subscribe/3]). + +%% PubSub +-export([ subscribe/1 + , subscribe/2 + , subscribe/3 + ]). + -export([unsubscribe/1]). + -export([subscriber_down/1]). --export([publish/1, safe_publish/1]). + +-export([ publish/1 + , safe_publish/1 + ]). + -export([dispatch/2]). --export([subscriptions/1, subscribers/1, subscribed/2]). --export([get_subopts/2, set_subopts/2]). + +%% PubSub Infos +-export([ subscriptions/1 + , subscribers/1 + , subscribed/2 + ]). + +-export([ get_subopts/2 + , set_subopts/2 + ]). + -export([topics/0]). %% Stats fun -export([stats_fun/0]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -import(emqx_tables, [lookup_value/2, lookup_value/3]). @@ -444,4 +469,4 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Internal functions -%%------------------------------------------------------------------------------ \ No newline at end of file +%%------------------------------------------------------------------------------ diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index f5b880163..cd92ec89b 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -20,13 +20,24 @@ -include("types.hrl"). -export([start_link/0]). --export([register_sub/2]). --export([lookup_subid/1, lookup_subpid/1]). --export([get_sub_shard/2]). --export([create_seq/1, reclaim_seq/1]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +%% APIs +-export([ register_sub/2 + , lookup_subid/1 + , lookup_subpid/1 + , get_sub_shard/2 + , create_seq/1 + , reclaim_seq/1 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(HELPER, ?MODULE). -define(SUBID, emqx_subid). diff --git a/src/emqx_cli.erl b/src/emqx_cli.erl index 37b2f17fb..e681936c3 100644 --- a/src/emqx_cli.erl +++ b/src/emqx_cli.erl @@ -14,7 +14,11 @@ -module(emqx_cli). --export([print/1, print/2, usage/1, usage/2]). +-export([ print/1 + , print/2 + , usage/1 + , usage/2 + ]). print(Msg) -> io:format(Msg), lists:flatten(io_lib:format("~p", [Msg])). diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 4ec28d2d9..73622a2a6 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -20,27 +20,71 @@ -include("emqx_client.hrl"). -export([start_link/0, start_link/1]). --export([connect/1]). --export([subscribe/2, subscribe/3, subscribe/4]). --export([publish/2, publish/3, publish/4, publish/5]). --export([unsubscribe/2, unsubscribe/3]). + +-export([ connect/1 + , disconnect/1 + , disconnect/2 + , disconnect/3 + ]). + -export([ping/1]). --export([disconnect/1, disconnect/2, disconnect/3]). --export([puback/2, puback/3, puback/4]). --export([pubrec/2, pubrec/3, pubrec/4]). --export([pubrel/2, pubrel/3, pubrel/4]). --export([pubcomp/2, pubcomp/3, pubcomp/4]). + +%% PubSub +-export([ subscribe/2 + , subscribe/3 + , subscribe/4 + , publish/2 + , publish/3 + , publish/4 + , publish/5 + , unsubscribe/2 + , unsubscribe/3 + ]). + +%% Puback... +-export([ puback/2 + , puback/3 + , puback/4 + , pubrec/2 + , pubrec/3 + , pubrec/4 + , pubrel/2 + , pubrel/3 + , pubrel/4 + , pubcomp/2 + , pubcomp/3 + , pubcomp/4 + ]). + -export([subscriptions/1]). + -export([info/1, stop/1]). + %% For test cases -export([pause/1, resume/1]). --export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]). --export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). +-export([ initialized/3 + , waiting_for_connack/3 + , connected/3 + , inflight_full/3 + ]). --export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, mqtt_msg/0]). +-export([ init/1 + , callback_mode/0 + , handle_event/4 + , terminate/3 + , code_change/4 + ]). --export_type([host/0, option/0]). +-export_type([ host/0 + , client/0 + , option/0 + , properties/0 + , payload/0 + , pubopt/0 + , subopt/0 + , mqtt_msg/0 + ]). %% Default timeout -define(DEFAULT_KEEPALIVE, 60000). @@ -1186,3 +1230,4 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) -> -spec next_packet_id(packet_id()) -> packet_id(). next_packet_id(?MAX_PACKET_ID) -> 1; next_packet_id(Id) -> Id + 1. + diff --git a/src/emqx_client_sock.erl b/src/emqx_client_sock.erl index e71eef169..ae9b03305 100644 --- a/src/emqx_client_sock.erl +++ b/src/emqx_client_sock.erl @@ -14,9 +14,15 @@ -module(emqx_client_sock). --export([connect/4, send/2, close/1]). +-export([ connect/4 + , send/2 + , close/1 + ]). --export([sockname/1, setopts/2, getstat/2]). +-export([ sockname/1 + , setopts/2 + , getstat/2 + ]). -record(ssl_socket, {tcp, ssl}). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 01bab4c78..37642a17f 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -22,17 +22,34 @@ -export([start_link/0]). --export([register_connection/1, register_connection/2]). --export([unregister_connection/1, unregister_connection/2]). --export([get_conn_attrs/1, get_conn_attrs/2]). --export([set_conn_attrs/2, set_conn_attrs/3]). --export([get_conn_stats/1, get_conn_stats/2]). --export([set_conn_stats/2, set_conn_stats/3]). +-export([ register_connection/1 + , register_connection/2 + , unregister_connection/1 + , unregister_connection/2 + ]). + +-export([ get_conn_attrs/1 + , get_conn_attrs/2 + , set_conn_attrs/2 + , set_conn_attrs/3 + ]). + +-export([ get_conn_stats/1 + , get_conn_stats/2 + , set_conn_stats/2 + , set_conn_stats/3 + ]). + -export([lookup_conn_pid/1]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). %% internal export -export([stats_fun/0]). diff --git a/src/emqx_config.erl b/src/emqx_config.erl index 5be039cf1..3d37fc001 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -23,11 +23,20 @@ -module(emqx_config). --export([get_env/1, get_env/2]). - -export([populate/1]). --export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]). +-export([ read/1 + , write/2 + , dump/2 + , reload/1 + ]). + +-export([ set/3 + , get/2 + , get/3 + , get_env/1 + , get_env/2 + ]). -type(env() :: {atom(), term()}). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 74f98b2f0..5e5d758f8 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -21,15 +21,28 @@ -include("logger.hrl"). -export([start_link/3]). + +%% APIs -export([info/1]). + -export([attrs/1]). + -export([stats/1]). + -export([kick/1]). + -export([session/1]). %% gen_statem callbacks --export([idle/3, connected/3]). --export([init/1, callback_mode/0, code_change/4, terminate/3]). +-export([ idle/3 + , connected/3 + ]). + +-export([ init/1 + , callback_mode/0 + , code_change/4 + , terminate/3 + ]). -record(state, { transport, diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index df7c1d1c9..55be098ad 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -19,11 +19,25 @@ -include("logger.hrl"). -export([start_link/0]). --export([register_command/2, register_command/3, unregister_command/1]). --export([run_command/1, run_command/2, lookup_command/1]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ register_command/2 + , register_command/3 + , unregister_command/1 + ]). + +-export([ run_command/1 + , run_command/2 + , lookup_command/1 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -record(state, {seq = 0}). diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index c3d0d8abd..ed1d3e0c8 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -23,10 +23,18 @@ -export([start_link/0]). --export([is_banned/1, banned/1]). +-export([ is_banned/1 + , banned/1 + ]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(SERVER, ?MODULE). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 19c6ca87b..bdc440215 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -17,9 +17,14 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([initial_state/0, initial_state/1]). --export([parse/2]). --export([serialize/1, serialize/2]). +-export([ initial_state/0 + , initial_state/1 + ]). + +-export([ parse/2 + , serialize/1 + , serialize/2 + ]). -type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE, version => emqx_mqtt_types:version()}). diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 1c438ddf1..fa0d3d6ed 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -23,7 +23,11 @@ -include("types.hrl"). --export([init/1, run/3, info/1, reset/1]). +-export([ init/1 + , run/3 + , info/1 + , reset/1 + ]). -type(opts() :: #{count => integer(), bytes => integer()}). diff --git a/src/emqx_guid.erl b/src/emqx_guid.erl index 57baebaaf..2e54e9aaa 100644 --- a/src/emqx_guid.erl +++ b/src/emqx_guid.erl @@ -28,7 +28,14 @@ -module(emqx_guid). --export([gen/0, new/0, timestamp/1, to_hexstr/1, from_hexstr/1, to_base62/1, from_base62/1]). +-export([ gen/0 + , new/0 + , timestamp/1 + , to_hexstr/1 + , from_hexstr/1 + , to_base62/1 + , from_base62/1 + ]). -define(MAX_SEQ, 16#FFFF). diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index b17507275..4f8e674ba 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -22,11 +22,23 @@ -export([start_link/0, stop/0]). %% Hooks API --export([add/2, add/3, add/4, del/2, run/2, run_fold/3, lookup/1]). +-export([ add/2 + , add/3 + , add/4 + , del/2 + , run/2 + , run_fold/3 + , lookup/1 + ]). %% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). %% Multiple callbacks can be registered on a hookpoint. %% The execution order depends on the priority value: diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index 7e1fc8a67..7a41e4ec7 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -14,11 +14,27 @@ -module(emqx_inflight). --export([new/1, contain/2, lookup/2, insert/3, update/3, update_size/2, delete/2, - values/1, to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]). +%% APIs +-export([ new/1 + , contain/2 + , lookup/2 + , insert/3 + , update/3 + , update_size/2 + , delete/2 + , values/1 + , to_list/1 + , size/1 + , max_size/1 + , is_full/1 + , is_empty/1 + , window/1 + ]). -type(key() :: term()). + -type(max_size() :: pos_integer()). + -opaque(inflight() :: {?MODULE, max_size(), gb_trees:tree()}). -define(Inflight(Tree), {?MODULE, _MaxSize, Tree}). @@ -26,6 +42,10 @@ -export_type([inflight/0]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + -spec(new(non_neg_integer()) -> inflight()). new(MaxSize) when MaxSize >= 0 -> {?MODULE, MaxSize, gb_trees:empty()}. diff --git a/src/emqx_json.erl b/src/emqx_json.erl index 76eeab0be..07a2e9a23 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -14,8 +14,17 @@ -module(emqx_json). --export([encode/1, encode/2, safe_encode/1, safe_encode/2]). --export([decode/1, decode/2, safe_decode/1, safe_decode/2]). +-export([ encode/1 + , encode/2 + , safe_encode/1 + , safe_encode/2 + ]). + +-export([ decode/1 + , decode/2 + , safe_decode/1 + , safe_decode/2 + ]). -spec(encode(jsx:json_term()) -> jsx:json_text()). encode(Term) -> diff --git a/src/emqx_keepalive.erl b/src/emqx_keepalive.erl index 4cc71764a..ebd5e18d4 100644 --- a/src/emqx_keepalive.erl +++ b/src/emqx_keepalive.erl @@ -14,7 +14,11 @@ -module(emqx_keepalive). --export([start/3, check/1, cancel/1]). +%% APIs +-export([ start/3 + , check/1 + , cancel/1 + ]). -record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}). @@ -22,6 +26,10 @@ -export_type([keepalive/0]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + %% @doc Start a keepalive -spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). start(_, 0, _) -> diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index f52a376ba..20be60602 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -17,13 +17,26 @@ -include("emqx_mqtt.hrl"). --export([start/0, restart/0, stop/0]). --export([start_listener/1, start_listener/3]). --export([restart_listener/1, restart_listener/3]). --export([stop_listener/1, stop_listener/3]). +%% APIs +-export([ start/0 + , restart/0 + , stop/0 + ]). + +-export([ start_listener/1 + , start_listener/3 + , stop_listener/1 + , stop_listener/3 + , restart_listener/1 + , restart_listener/3 + ]). -type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + %% @doc Start all listeners. -spec(start() -> ok). start() -> diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index 9d215affa..b77fa2d70 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -16,18 +16,41 @@ -compile({no_auto_import,[error/1]}). --export([debug/1, debug/2, debug/3]). --export([info/1, info/2, info/3]). --export([warning/1, warning/2, warning/3]). --export([error/1, error/2, error/3]). --export([critical/1, critical/2, critical/3]). +%% Logs +-export([ debug/1 + , debug/2 + , debug/3 + , info/1 + , info/2 + , info/3 + , warning/1 + , warning/2 + , warning/3 + , error/1 + , error/2 + , error/3 + , critical/1 + , critical/2 + , critical/3 + ]). --export([set_metadata_peername/1, set_metadata_client_id/1]). --export([set_proc_metadata/1]). +%% Configs +-export([ set_metadata_peername/1 + , set_metadata_client_id/1 + , set_proc_metadata/1 + , set_primary_log_level/1 + , set_log_handler_level/2 + , set_log_level/1 + ]). --export([get_primary_log_level/0, set_primary_log_level/1]). --export([get_log_handlers/0, get_log_handler/1, set_log_handler_level/2]). --export([set_log_level/1]). +-export([ get_primary_log_level/0 + , get_log_handlers/0 + , get_log_handler/1 + ]). + +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ debug(Msg) -> logger:debug(Msg). @@ -97,9 +120,10 @@ set_log_level(Level) -> {error, Error} -> {error, {primary_logger_level, Error}} end. -%%======================== +%%------------------------------------------------------------------------------ %% Internal Functions -%%======================== +%%------------------------------------------------------------------------------ + log_hanlder_info(#{id := Id, level := Level, module := logger_std_h, config := #{type := Type}}) when Type =:= standard_io; Type =:= standard_error -> @@ -135,3 +159,4 @@ rollback([{ID, Level} | List]) -> emqx_logger:set_log_handler_level(ID, Level), rollback(List); rollback([]) -> ok. + diff --git a/src/emqx_logger_formatter.erl b/src/emqx_logger_formatter.erl index dd94cceb6..92c194883 100644 --- a/src/emqx_logger_formatter.erl +++ b/src/emqx_logger_formatter.erl @@ -24,6 +24,7 @@ -module(emqx_logger_formatter). -export([format/2]). + -export([check_config/1]). -define(DEFAULT_FORMAT_TEMPLATE_SINGLE, [time," ",level,": ",msg,"\n"]). diff --git a/src/emqx_logger_handler.erl b/src/emqx_logger_handler.erl index 5a8dc08d1..e0f5d9af4 100644 --- a/src/emqx_logger_handler.erl +++ b/src/emqx_logger_handler.erl @@ -15,6 +15,7 @@ -module(emqx_logger_handler). -export([log/2]). + -export([init/0]). init() -> diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 25e6492bd..e0f876719 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -17,13 +17,35 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([make/2, make/3, make/4]). --export([set_flags/2]). --export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). --export([set_headers/2]). --export([get_header/2, get_header/3, set_header/3]). --export([is_expired/1, update_expiry/1]). --export([remove_topic_alias/1]). +-export([ make/2 + , make/3 + , make/4 + ]). + +-export([ get_flag/2 + , get_flag/3 + , set_flag/2 + , set_flag/3 + , set_flags/2 + , unset_flag/2 + ]). + +-export([ get_headers/1 + , get_header/2 + , get_header/3 + , set_header/3 + , set_headers/2 + , remove_header/2 + ]). + +-export([ is_expired/1 + , update_expiry/1 + ]). + +-export([ to_map/1 + , to_list/1 + ]). + -export([format/1]). -type(flag() :: atom()). @@ -40,13 +62,13 @@ make(From, Topic, Payload) -> -spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(), emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). make(From, QoS, Topic, Payload) -> - #message{id = emqx_guid:gen(), - qos = QoS, - from = From, - flags = #{dup => false}, - topic = Topic, - payload = Payload, - timestamp = os:timestamp()}. + #message{id = emqx_guid:gen(), + qos = QoS, + from = From, + flags = #{dup => false}, + topic = Topic, + payload = Payload, + timestamp = os:timestamp()}. -spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()). set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> @@ -88,6 +110,10 @@ set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> Msg#message{headers = maps:merge(Old, New)}; set_headers(undefined, Msg) -> Msg. +-spec(get_headers(emqx_types:message()) -> map()). +get_headers(Msg) -> + Msg#message.headers. + -spec(get_header(term(), emqx_types:message()) -> term()). get_header(Hdr, Msg) -> get_header(Hdr, Msg, undefined). @@ -101,14 +127,24 @@ set_header(Hdr, Val, Msg = #message{headers = undefined}) -> set_header(Hdr, Val, Msg = #message{headers = Headers}) -> Msg#message{headers = maps:put(Hdr, Val, Headers)}. +-spec(remove_header(term(), emqx_types:message()) -> emqx_types:message()). +remove_header(Hdr, Msg = #message{headers = Headers}) -> + case maps:is_key(Hdr, Headers) of + true -> + Msg#message{headers = maps:remove(Hdr, Headers)}; + false -> Msg + end. + -spec(is_expired(emqx_types:message()) -> boolean()). -is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> +is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, + timestamp = CreatedAt}) -> elapsed(CreatedAt) > timer:seconds(Interval); is_expired(_Msg) -> false. -spec(update_expiry(emqx_types:message()) -> emqx_types:message()). -update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> +update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, + timestamp = CreatedAt}) -> case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg); @@ -116,8 +152,15 @@ update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, end; update_expiry(Msg) -> Msg. -remove_topic_alias(Msg = #message{headers = Headers}) -> - Msg#message{headers = maps:remove('Topic-Alias', Headers)}. +%% @doc Message to map +-spec(to_map(emqx_types:message()) -> map()). +to_map(Msg) -> + maps:from_list(to_list(Msg)). + +%% @doc Message to tuple list +-spec(to_list(emqx_types:message()) -> map()). +to_list(Msg) -> + lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))). %% MilliSeconds elapsed(Since) -> @@ -133,3 +176,4 @@ format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> io_lib:format("~p", [Headers]). + diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 41176c7e2..827324bf4 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -14,20 +14,46 @@ -module(emqx_metrics). +-behavior(gen_server). + -include("logger.hrl"). -include("types.hrl"). -include("emqx_mqtt.hrl"). -export([start_link/0]). --export([new/1, all/0]). --export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]). --export([trans/2, trans/3, trans/4, commit/0]). + +-export([ new/1 + , all/0 + ]). + +-export([ val/1 + , inc/1 + , inc/2 + , inc/3 + , dec/2 + , dec/3 + , set/2 + ]). + +-export([ trans/2 + , trans/3 + , trans/4 + , commit/0 + ]). + %% Received/sent metrics --export([received/1, sent/1]). +-export([ received/1 + , sent/1 + ]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). %% Bytes sent and received of Broker -define(BYTES_METRICS, [ diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index c8f5449b8..e236b560f 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -14,10 +14,18 @@ -module(emqx_misc). --export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_name/2, proc_stats/0, proc_stats/1]). +-export([ merge_opts/2 + , start_timer/2 + , start_timer/3 + , cancel_timer/1 + , proc_name/2 + , proc_stats/0 + , proc_stats/1 + ]). --export([init_proc_mng_policy/1, conn_proc_mng_policy/1]). +-export([ init_proc_mng_policy/1 + , conn_proc_mng_policy/1 + ]). -export([drain_down/1]). diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 0c4152c4e..a09d5645d 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -19,17 +19,20 @@ -include("emqx.hrl"). -include("logger.hrl"). --export([load/1, unload/1]). +%% APIs +-export([ all_rules/0 + , check_acl/5 + , reload_acl/0 + ]). --export([all_rules/0]). +%% emqx_gen_mod callbacks +-export([ load/1 + , unload/1 + ]). --export([check_acl/5, reload_acl/0]). +-define(MFA(M, F, A), {M, F, A}). --define(ACL_RULE_TAB, emqx_acl_rule). - --define(FUNC(M, F, A), {M, F, A}). - --type(acl_rules() :: #{publish => [emqx_access_rule:rule()], +-type(acl_rules() :: #{publish => [emqx_access_rule:rule()], subscribe => [emqx_access_rule:rule()]}). %%------------------------------------------------------------------------------ @@ -37,28 +40,64 @@ %%------------------------------------------------------------------------------ load(_Env) -> - Rules = load_rules_from_file(acl_file()), - emqx_hooks:add('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules]), -1). + Rules = rules_from_file(acl_file()), + emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1). unload(_Env) -> - Rules = load_rules_from_file(acl_file()), - emqx_hooks:del('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules])). + Rules = rules_from_file(acl_file()), + emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])). %% @doc Read all rules -spec(all_rules() -> list(emqx_access_rule:rule())). all_rules() -> - load_rules_from_file(acl_file()). + rules_from_file(acl_file()). %%------------------------------------------------------------------------------ %% ACL callbacks %%------------------------------------------------------------------------------ -load_rules_from_file(AclFile) -> +%% @doc Check ACL +-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(), + emqx_access_rule:acl_result(), acl_rules()) + -> {ok, allow} | {ok, deny} | ok). +check_acl(Credentials, PubSub, Topic, _AclResult, Rules) -> + case match(Credentials, Topic, lookup(PubSub, Rules)) of + {matched, allow} -> {ok, allow}; + {matched, deny} -> {ok, deny}; + nomatch -> ok + end. + +-spec(reload_acl() -> ok | {error, term()}). +reload_acl() -> + unload([]), load([]). + +%%------------------------------------------------------------------------------ +%% Internal Functions +%%------------------------------------------------------------------------------ + +acl_file() -> + emqx_config:get_env(acl_file). + +lookup(PubSub, Rules) -> + maps:get(PubSub, Rules, []). + +match(_Credentials, _Topic, []) -> + nomatch; +match(Credentials, Topic, [Rule|Rules]) -> + case emqx_access_rule:match(Credentials, Topic, Rule) of + nomatch -> + match(Credentials, Topic, Rules); + {matched, AllowDeny} -> + {matched, AllowDeny} + end. + +-spec(rules_from_file(file:filename()) -> map()). +rules_from_file(AclFile) -> case file:consult(AclFile) of {ok, Terms} -> Rules = [emqx_access_rule:compile(Term) || Term <- Terms], - #{publish => lists:filter(fun(Rule) -> filter(publish, Rule) end, Rules), - subscribe => lists:filter(fun(Rule) -> filter(subscribe, Rule) end, Rules)}; + #{publish => [Rule || Rule <- Rules, filter(publish, Rule)], + subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]}; {error, Reason} -> ?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), #{} @@ -77,43 +116,3 @@ filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) -> filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> false. -%% @doc Check ACL --spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(), - emqx_access_rule:acl_result(), acl_rules()) - -> {ok, allow} | {ok, deny} | ok). -check_acl(Credentials, PubSub, Topic, _AclResult, Rules) -> - case match(Credentials, Topic, lookup(PubSub, Rules)) of - {matched, allow} -> {ok, allow}; - {matched, deny} -> {ok, deny}; - nomatch -> ok - end. - -lookup(PubSub, Rules) -> - maps:get(PubSub, Rules, []). - -match(_Credentials, _Topic, []) -> - nomatch; -match(Credentials, Topic, [Rule|Rules]) -> - case emqx_access_rule:match(Credentials, Topic, Rule) of - nomatch -> - match(Credentials, Topic, Rules); - {matched, AllowDeny} -> - {matched, AllowDeny} - end. - --spec(reload_acl() -> ok | {error, term()}). -reload_acl() -> - try load_rules_from_file(acl_file()) of - _ -> - emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]), - ok; - {error, Error} -> - {error, Error} - catch - error:Reason:StackTrace -> - ?LOG(error, "Reload acl failed. StackTrace: ~p", [StackTrace]), - {error, Reason} - end. - -acl_file() -> - emqx_config:get_env(acl_file). diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 090a846cd..1d45fee2e 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -18,12 +18,22 @@ -include("emqx.hrl"). --export([load/1, unload/1]). +%% APIs +-export([ on_client_connected/4 + , on_client_disconnected/3 + ]). --export([on_client_connected/4, on_client_disconnected/3]). +%% emqx_gen_mod callbacks +-export([ load/1 + , unload/1 + ]). -define(ATTR_KEYS, [clean_start, proto_ver, proto_name, keepalive]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + load(Env) -> emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]), emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index b07694c56..b06428e9f 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -14,12 +14,21 @@ -module(emqx_mod_rewrite). +-behavior(emqx_gen_mod). + -include_lib("emqx.hrl"). -include_lib("emqx_mqtt.hrl"). --export([load/1, unload/1]). +%% APIs +-export([ rewrite_subscribe/3 + , rewrite_unsubscribe/3 + , rewrite_publish/2 + ]). --export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]). +%% emqx_gen_mod callbacks +-export([ load/1 + , unload/1 + ]). %%------------------------------------------------------------------------------ %% Load/Unload diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index bba4e2ebf..c674adf97 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -19,11 +19,16 @@ -include_lib("emqx.hrl"). -include_lib("emqx_mqtt.hrl"). --export([load/1, on_session_created/3, unload/1]). +%% APIs +-export([on_session_created/3]). -%%-------------------------------------------------------------------- +%% emqx_gen_mod callbacks +-export([ load/1 + , unload/1 + ]). +%%------------------------------------------------------------------------------ %% Load/Unload Hook -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ load(Topics) -> emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]). @@ -38,9 +43,9 @@ on_session_created(#{client_id := ClientId}, SessAttrs, Topics) -> unload(_) -> emqx_hooks:del('session.created', fun ?MODULE:on_session_created/3). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ rep(<<"%c">>, ClientId, Topic) -> emqx_topic:feed_var(<<"%c">>, ClientId, Topic); diff --git a/src/emqx_mod_sup.erl b/src/emqx_mod_sup.erl index 1361854ea..54a77feda 100644 --- a/src/emqx_mod_sup.erl +++ b/src/emqx_mod_sup.erl @@ -16,7 +16,12 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2, stop_child/1]). +-export([ start_link/0 + , start_child/1 + , start_child/2 + , stop_child/1 + ]). + -export([init/1]). %% Helper macro for declaring children of supervisor diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 777db54fc..93110cafe 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -14,7 +14,9 @@ -module(emqx_modules). --export([load/0, unload/0]). +-export([ load/0 + , unload/0 + ]). -spec(load() -> ok). load() -> diff --git a/src/emqx_mountpoint.erl b/src/emqx_mountpoint.erl index 8b85ebb94..ef5d28e4d 100644 --- a/src/emqx_mountpoint.erl +++ b/src/emqx_mountpoint.erl @@ -17,12 +17,20 @@ -include("emqx.hrl"). -include("logger.hrl"). --export([mount/2, unmount/2]). +-export([ mount/2 + , unmount/2 + ]). + -export([replvar/2]). -type(mountpoint() :: binary()). + -export_type([mountpoint/0]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + mount(undefined, Any) -> Any; mount(MountPoint, Msg = #message{topic = Topic}) -> @@ -53,3 +61,4 @@ feed_var({<<"%u">>, undefined}, MountPoint) -> MountPoint; feed_var({<<"%u">>, Username}, MountPoint) -> emqx_topic:feed_var(<<"%u">>, Username, MountPoint). + diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 8a3211ec0..ff38f687e 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -18,8 +18,11 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([check_pub/2, check_sub/2]). --export([get_caps/1, get_caps/2]). +-export([ check_pub/2 + , check_sub/2 + , get_caps/1 + , get_caps/2 + ]). -type(caps() :: #{max_packet_size => integer(), max_clientid_len => integer(), diff --git a/src/emqx_mqtt_props.erl b/src/emqx_mqtt_props.erl index 71767537b..686bb5bc8 100644 --- a/src/emqx_mqtt_props.erl +++ b/src/emqx_mqtt_props.erl @@ -17,7 +17,11 @@ -include("emqx_mqtt.hrl"). --export([id/1, name/1, filter/2, validate/1]). +-export([ id/1 + , name/1 + , filter/2 + , validate/1 + ]). -define(PROPS_TABLE, #{16#01 => {'Payload-Format-Indicator', 'Byte', [?PUBLISH]}, diff --git a/src/emqx_mqtt_types.erl b/src/emqx_mqtt_types.erl index cb797e36a..0274b6ac3 100644 --- a/src/emqx_mqtt_types.erl +++ b/src/emqx_mqtt_types.erl @@ -40,3 +40,4 @@ }). -type(topic_filters() :: [{emqx_topic:topic(), subopts()}]). -type(packet() :: #mqtt_packet{}). + diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 016ff007f..594599daf 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -50,10 +50,15 @@ -include("emqx_mqtt.hrl"). -export([init/1]). --export([is_empty/1]). --export([len/1, max_len/1]). --export([in/2, out/1]). --export([stats/1, dropped/1]). + +-export([ is_empty/1 + , len/1 + , max_len/1 + , in/2 + , out/1 + , stats/1 + , dropped/1 + ]). -export_type([mqueue/0, options/0]). diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 3d186e93a..f8d8f41e1 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -20,31 +20,34 @@ -export([start_link/1]). --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). --export([get_cpu_check_interval/0, - set_cpu_check_interval/1, - get_cpu_high_watermark/0, - set_cpu_high_watermark/1, - get_cpu_low_watermark/0, - set_cpu_low_watermark/1, - get_mem_check_interval/0, - set_mem_check_interval/1, - get_sysmem_high_watermark/0, - set_sysmem_high_watermark/1, - get_procmem_high_watermark/0, - set_procmem_high_watermark/1]). +-export([ get_cpu_check_interval/0 + , set_cpu_check_interval/1 + , get_cpu_high_watermark/0 + , set_cpu_high_watermark/1 + , get_cpu_low_watermark/0 + , set_cpu_low_watermark/1 + , get_mem_check_interval/0 + , set_mem_check_interval/1 + , get_sysmem_high_watermark/0 + , set_sysmem_high_watermark/1 + , get_procmem_high_watermark/0 + , set_procmem_high_watermark/1 + ]). -define(OS_MON, ?MODULE). -%%---------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% API -%%---------------------------------------------------------------------- +%%------------------------------------------------------------------------------ start_link(Opts) -> gen_server:start_link({local, ?OS_MON}, ?MODULE, [Opts], []). @@ -85,9 +88,9 @@ get_procmem_high_watermark() -> set_procmem_high_watermark(Float) -> memsup:set_procmem_high_watermark(Float). -%%---------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%---------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([Opts]) -> _ = cpu_sup:util(), @@ -148,11 +151,12 @@ terminate(_Reason, #{timer := Timer}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%---------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%---------------------------------------------------------------------- +%%------------------------------------------------------------------------------ call(Req) -> gen_server:call(?OS_MON, Req, infinity). ensure_check_timer(State = #{cpu_check_interval := Interval}) -> - State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. \ No newline at end of file + State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. + diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 0224c239a..3e5b0ec0c 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -17,12 +17,14 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([protocol_name/1]). --export([type_name/1]). --export([validate/1]). --export([format/1]). --export([to_message/2, from_message/2]). --export([will_msg/1]). +-export([ protocol_name/1 + , type_name/1 + , validate/1 + , format/1 + , to_message/2 + , from_message/2 + , will_msg/1 + ]). %% @doc Protocol name of version -spec(protocol_name(emqx_mqtt_types:version()) -> binary()). @@ -254,3 +256,4 @@ format_password(_Password) -> '******'. i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. + diff --git a/src/emqx_pd.erl b/src/emqx_pd.erl index 12603ef38..04b4d6075 100644 --- a/src/emqx_pd.erl +++ b/src/emqx_pd.erl @@ -17,7 +17,10 @@ -include("types.hrl"). --export([update_counter/2, get_counter/1, reset_counter/1]). +-export([ update_counter/2 + , get_counter/1 + , reset_counter/1 + ]). -type(key() :: term()). diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 164655d47..83d6863e8 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -18,15 +18,18 @@ -export([init/0]). --export([load/0, unload/0]). +-export([ load/0 + , load/1 + , unload/0 + , unload/1 + , list/0 + , find_plugin/1 + , load_expand_plugin/1 + ]). --export([load/1, unload/1]). - --export([list/0]). - --export([find_plugin/1]). - --export([load_expand_plugin/1]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ %% @doc Init plugins' config -spec(init() -> ok). @@ -302,3 +305,4 @@ write_loaded(AppNames) -> emqx_logger:error("Open File ~p Error: ~p", [File, Error]), {error, Error} end. + diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index a16154dce..6aa880f90 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -17,15 +17,26 @@ -compile({no_auto_import, [monitor/3]}). -export([new/0]). --export([monitor/2, monitor/3]). --export([demonitor/2]). --export([find/2]). --export([erase/2, erase_all/2]). + +-export([ monitor/2 + , monitor/3 + , demonitor/2 + ]). + +-export([ find/2 + , erase/2 + , erase_all/2 + ]). + -export([count/1]). -type(pmon() :: {?MODULE, map()}). -export_type([pmon/0]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + -spec(new() -> pmon()). new() -> {?MODULE, maps:new()}. diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 6f952db72..db5f8db35 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -19,21 +19,36 @@ -include("logger.hrl"). -include("types.hrl"). +%% APIs -export([start_link/2]). --export([submit/1, submit/2]). --export([async_submit/1, async_submit/2]). + +-export([ submit/1 + , submit/2 + , async_submit/1 + , async_submit/2 + ]). + -ifdef(TEST). -export([worker/0]). -endif. %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(POOL, ?MODULE). -type(task() :: fun() | mfa() | {fun(), Args :: list(any())}). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + %% @doc Start pool. -spec(start_link(atom(), pos_integer()) -> startlink_ret()). start_link(Pool, Id) -> diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index aebf48c20..1e884bafe 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -17,7 +17,11 @@ -behaviour(supervisor). -export([spec/1, spec/2]). --export([start_link/0, start_link/3, start_link/4]). + +-export([ start_link/0 + , start_link/3 + , start_link/4 + ]). -export([init/1]). diff --git a/src/emqx_pqueue.erl b/src/emqx_pqueue.erl index f9392d5f6..06ea192fb 100644 --- a/src/emqx_pqueue.erl +++ b/src/emqx_pqueue.erl @@ -39,8 +39,23 @@ -module(emqx_pqueue). --export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1, - in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]). +-export([ new/0 + , is_queue/1 + , is_empty/1 + , len/1 + , plen/2 + , to_list/1 + , from_list/1 + , in/2 + , in/3 + , out/1 + , out/2 + , out_p/1 + , join/2 + , filter/2 + , fold/3 + , highest/1 + ]). %%---------------------------------------------------------------------------- diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 4548821b0..cd5756671 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -18,21 +18,22 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). --export([init/2]). --export([info/1]). --export([attrs/1]). --export([attr/2]). --export([caps/1]). --export([stats/1]). --export([client_id/1]). --export([credentials/1]). --export([parser/1]). --export([session/1]). --export([received/2]). --export([process/2]). --export([deliver/2]). --export([send/2]). --export([terminate/2]). +-export([ init/2 + , info/1 + , attrs/1 + , attr/2 + , caps/1 + , stats/1 + , client_id/1 + , credentials/1 + , parser/1 + , session/1 + , received/2 + , process/2 + , deliver/2 + , send/2 + , terminate/2 + ]). -export_type([state/0]). @@ -654,7 +655,7 @@ deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg), Msg1 = emqx_message:update_expiry(Msg0), Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), - send(emqx_packet:from_message(PacketId, emqx_message:remove_topic_alias(Msg2)), PState); + send(emqx_packet:from_message(PacketId, Msg2), PState); deliver({puback, PacketId, ReasonCode}, PState) -> send(?PUBACK_PACKET(PacketId, ReasonCode), PState); diff --git a/src/emqx_rate_limiter.erl b/src/emqx_rate_limiter.erl index c01af7eda..5298651a6 100644 --- a/src/emqx_rate_limiter.erl +++ b/src/emqx_rate_limiter.erl @@ -20,25 +20,30 @@ -export([start_link/0]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(SERVER, ?MODULE). -record(state, {}). -%%%=================================================================== +%%------------------------------------------------------------------------------ %%% API -%%%=================================================================== +%%------------------------------------------------------------------------------ %% @doc Starts the server -spec(start_link() -> {ok, pid()} | ignore | {error, any()}). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -%%%=================================================================== +%%------------------------------------------------------------------------------ %%% gen_server callbacks -%%%=================================================================== +%%------------------------------------------------------------------------------ init([]) -> {ok, #state{}}. @@ -59,7 +64,7 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%%=================================================================== +%%------------------------------------------------------------------------------ %%% Internal functions -%%%=================================================================== +%%------------------------------------------------------------------------------ diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index ebe992ffe..b8a9b9e4b 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -17,7 +17,11 @@ -include("emqx_mqtt.hrl"). --export([name/2, text/1, connack_error/1]). +-export([ name/2 + , text/1 + , connack_error/1 + ]). + -export([compat/2]). name(I, Ver) when Ver >= ?MQTT_PROTO_V5 -> @@ -146,6 +150,7 @@ compat(unsuback, _Code) -> undefined. connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID; connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; +connack_error(bad_clientid_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; connack_error(username_or_password_undefined) -> ?RC_BAD_USER_NAME_OR_PASSWORD; connack_error(password_error) -> ?RC_BAD_USER_NAME_OR_PASSWORD; connack_error(not_authorized) -> ?RC_NOT_AUTHORIZED; diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 224a14e48..868094609 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -30,19 +30,38 @@ -export([start_link/2]). %% Route APIs --export([add_route/1, add_route/2]). --export([do_add_route/1, do_add_route/2]). --export([match_routes/1, lookup_routes/1, has_routes/1]). --export([delete_route/1, delete_route/2]). --export([do_delete_route/1, do_delete_route/2]). +-export([ add_route/1 + , add_route/2 + , do_add_route/1 + , do_add_route/2 + ]). + +-export([ delete_route/1 + , delete_route/2 + , do_delete_route/1 + , do_delete_route/2 + ]). + +-export([ match_routes/1 + , lookup_routes/1 + , has_routes/1 + ]). + -export([print_routes/1]). + -export([topics/0]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -type(group() :: binary()). + -type(destination() :: node() | {group(), node()}). -define(ROUTE, emqx_route). diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 9772f8050..0e2abb731 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -27,15 +27,22 @@ -copy_mnesia({mnesia, [copy]}). %% API --export([start_link/0, monitor/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ start_link/0 + , monitor/1 + ]). %% Internal export -export([stats_fun/0]). +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + -record(routing_node, {name, const = unused}). -define(ROUTE, emqx_route). diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index b658023fd..4d01b6229 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -15,8 +15,10 @@ %% @doc wrap gen_rpc? -module(emqx_rpc). --export([call/4, cast/4]). --export([multicall/4]). +-export([ call/4 + , cast/4 + , multicall/4 + ]). -define(RPC, gen_rpc). diff --git a/src/emqx_sequence.erl b/src/emqx_sequence.erl index 4c25108ea..f02165ea6 100644 --- a/src/emqx_sequence.erl +++ b/src/emqx_sequence.erl @@ -14,14 +14,25 @@ -module(emqx_sequence). --export([create/1, nextval/2, currval/2, reclaim/2, delete/1]). +-export([ create/1 + , nextval/2 + , currval/2 + , reclaim/2 + , delete/1 + ]). -type(key() :: term()). + -type(name() :: atom()). + -type(seqid() :: non_neg_integer()). -export_type([seqid/0]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + %% @doc Create a sequence. -spec(create(name()) -> ok). create(Name) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 8be299ced..49c63b539 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -46,21 +46,40 @@ -include("types.hrl"). -export([start_link/1]). --export([info/1, attrs/1]). --export([stats/1]). --export([resume/2, discard/2]). --export([update_expiry_interval/2]). --export([subscribe/2, subscribe/4]). --export([publish/3]). --export([puback/2, puback/3]). --export([pubrec/2, pubrec/3]). --export([pubrel/3, pubcomp/3]). --export([unsubscribe/2, unsubscribe/4]). + +-export([ info/1 + , attrs/1 + , stats/1 + ]). + +-export([ resume/2 + , discard/2 + , update_expiry_interval/2 + ]). + +-export([ subscribe/2 + , subscribe/4 + , unsubscribe/2 + , unsubscribe/4 + , publish/3 + , puback/2 + , puback/3 + , pubrec/2 + , pubrec/3 + , pubrel/3 + , pubcomp/3 + ]). + -export([close/1]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -import(emqx_zone, [get_env/2, get_env/3]). diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index 3e360794d..4672687a3 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -20,20 +20,28 @@ -include("types.hrl"). -export([start_link/1]). --export([start_session/1, count_sessions/0]). + +-export([ start_session/1 + , count_sessions/0 + ]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -type(shutdown() :: brutal_kill | infinity | pos_integer()). --record(state, { - sessions :: #{pid() => emqx_types:client_id()}, - mfargs :: mfa(), - shutdown :: shutdown(), - clean_down :: fun() - }). +-record(state, + { sessions :: #{pid() => emqx_types:client_id()} + , mfargs :: mfa() + , shutdown :: shutdown() + , clean_down :: fun() + }). -define(SUP, ?MODULE). -define(BATCH_EXIT, 100000). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 65f535059..d25910e31 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -27,18 +27,32 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). +%% APIs -export([start_link/0]). --export([subscribe/3, unsubscribe/3]). +-export([ subscribe/3 + , unsubscribe/3 + ]). + -export([dispatch/3]). --export([maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, is_ack_required/1]). + +-export([ maybe_ack/1 + , maybe_nack_dropped/1 + , nack_no_connection/1 + , is_ack_required/1 + ]). %% for testing -export([subscribers/2]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(SERVER, ?MODULE). -define(TAB, emqx_shared_subscription). diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 2feb42433..900e7ca1a 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -20,17 +20,30 @@ -include("logger.hrl"). -include("types.hrl"). +%% APIs -export([start_link/0]). --export([open_session/1, close_session/1]). --export([resume_session/2]). --export([discard_session/1, discard_session/2]). --export([register_session/1, register_session/2]). --export([unregister_session/1, unregister_session/2]). --export([get_session_attrs/1, get_session_attrs/2, - set_session_attrs/2, set_session_attrs/3]). --export([get_session_stats/1, get_session_stats/2, - set_session_stats/2, set_session_stats/3]). +-export([ open_session/1 + , close_session/1 + , resume_session/2 + , discard_session/1 + , discard_session/2 + , register_session/1 + , register_session/2 + , unregister_session/1 + , unregister_session/2 + ]). + +-export([ get_session_attrs/1 + , get_session_attrs/2 + , set_session_attrs/2 + , set_session_attrs/3 + , get_session_stats/1 + , get_session_stats/2 + , set_session_stats/2 + , set_session_stats/3 + ]). + -export([lookup_session_pids/1]). %% Internal functions for rpc @@ -43,8 +56,13 @@ -export([clean_down/1]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(SM, ?MODULE). @@ -56,6 +74,10 @@ -define(BATCH_SIZE, 100000). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + -spec(start_link() -> startlink_ret()). start_link() -> gen_server:start_link({local, ?SM}, ?MODULE, [], []). diff --git a/src/emqx_sm_locker.erl b/src/emqx_sm_locker.erl index b7361f102..854b95653 100644 --- a/src/emqx_sm_locker.erl +++ b/src/emqx_sm_locker.erl @@ -19,8 +19,16 @@ -export([start_link/0]). --export([trans/2, trans/3]). --export([lock/1, lock/2, unlock/1]). +-export([ trans/2 + , trans/3 + , lock/1 + , lock/2 + , unlock/1 + ]). + +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ -spec(start_link() -> startlink_ret()). start_link() -> diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index d2aede1ed..e49b76e2c 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -21,12 +21,21 @@ -include("types.hrl"). -export([start_link/0]). --export([is_enabled/0]). --export([register_session/1, lookup_session/1, unregister_session/1]). + +-export([ is_enabled/0 + , register_session/1 + , lookup_session/1 + , unregister_session/1 + ]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(REGISTRY, ?MODULE). -define(TAB, emqx_session_registry). @@ -36,6 +45,10 @@ -type(session_pid() :: pid()). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + %% @doc Start the global session manager. -spec(start_link() -> startlink_ret()). start_link() -> diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index a7cbbb314..cef81b8c7 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -20,17 +20,32 @@ -include("logger.hrl"). -include("types.hrl"). --export([start_link/0, start_link/1, stop/0]). +%% APIs +-export([ start_link/0 + , start_link/1 + , stop/0 + ]). %% Stats API. --export([getstats/0, getstat/1]). --export([setstat/2, setstat/3]). --export([statsfun/1, statsfun/2]). --export([update_interval/2, update_interval/3, cancel_update/1]). +-export([ getstats/0 + , getstat/1 + , setstat/2 + , setstat/3 + , statsfun/1 + , statsfun/2 + , update_interval/2 + , update_interval/3 + , cancel_update/1 + ]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -record(update, {name, countdown, interval, func}). -record(state, {timer, updates :: [#update{}], diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 72ec6431f..dc30f4af1 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -16,7 +16,11 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2, stop_child/1]). +-export([ start_link/0 + , start_child/1 + , start_child/2 + , stop_child/1 + ]). -export([init/1]). @@ -26,13 +30,13 @@ -define(SUPERVISOR, ?MODULE). -start_link() -> - supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- +start_link() -> + supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). + -spec(start_child(supervisor:child_spec()) -> startchild_ret()). start_child(ChildSpec) when is_tuple(ChildSpec) -> supervisor:start_child(?SUPERVISOR, ChildSpec). diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 3164cd92c..a0448fb18 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -20,10 +20,24 @@ -include("logger.hrl"). -export([start_link/0]). --export([version/0, uptime/0, datetime/0, sysdescr/0, sys_interval/0]). + +-export([ version/0 + , uptime/0 + , datetime/0 + , sysdescr/0 + , sys_interval/0 + ]). + -export([info/0]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -import(emqx_topic, [systop/1]). -import(emqx_misc, [start_timer/2]). @@ -40,6 +54,10 @@ sysdescr % Broker description ]). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + -spec(start_link() -> {ok, pid()} | ignore | {error, any()}). start_link() -> gen_server:start_link({local, ?SYS}, ?MODULE, [], []). diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 11a3c8ebe..d6c894cc7 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -20,11 +20,19 @@ -include("types.hrl"). -export([start_link/1]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). + %% compress unused warning -export([procinfo/1]). +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + -type(option() :: {long_gc, false | pos_integer()} | {long_schedule, false | pos_integer()} | {large_heap, pos_integer()} @@ -33,6 +41,10 @@ -define(SYSMON, ?MODULE). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + %% @doc Start system monitor -spec(start_link(list(option())) -> startlink_ret()). start_link(Opts) -> diff --git a/src/emqx_tables.erl b/src/emqx_tables.erl index 837a9d3e9..2c11b9d88 100644 --- a/src/emqx_tables.erl +++ b/src/emqx_tables.erl @@ -15,7 +15,10 @@ -module(emqx_tables). -export([new/2]). --export([lookup_value/2, lookup_value/3]). + +-export([ lookup_value/2 + , lookup_value/3 + ]). %% Create a named_table ets. -spec(new(atom(), list()) -> ok). diff --git a/src/emqx_time.erl b/src/emqx_time.erl index 60787fa37..e0ef8e5fb 100644 --- a/src/emqx_time.erl +++ b/src/emqx_time.erl @@ -14,7 +14,13 @@ -module(emqx_time). --export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1, ts_from_ms/1]). +-export([ seed/0 + , now_secs/0 + , now_secs/1 + , now_ms/0 + , now_ms/1 + , ts_from_ms/1 + ]). seed() -> rand:seed(exsplus, erlang:timestamp()). diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 913946a79..f1d15d8e2 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -16,17 +16,22 @@ -include("emqx_mqtt.hrl"). --export([match/2]). --export([validate/1, validate/2]). --export([levels/1]). --export([triples/1]). --export([tokens/1]). --export([words/1]). --export([wildcard/1]). --export([join/1, prepend/2]). --export([feed_var/3]). --export([systop/1]). --export([parse/1, parse/2]). +%% APIs +-export([ match/2 + , validate/1 + , validate/2 + , levels/1 + , triples/1 + , tokens/1 + , words/1 + , wildcard/1 + , join/1 + , prepend/2 + , feed_var/3 + , systop/1 + , parse/1 + , parse/2 + ]). -type(group() :: binary()). -type(topic() :: binary()). @@ -38,6 +43,10 @@ -define(MAX_TOPIC_LEN, 4096). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + %% @doc Is wildcard topic? -spec(wildcard(topic() | words()) -> true | false). wildcard(Topic) when is_binary(Topic) -> @@ -221,4 +230,3 @@ parse(Topic, Options = #{qos := QoS}) -> {Topic, Options#{rc => QoS}}; parse(Topic, Options) -> {Topic, Options}. - diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index e88442f61..051c9f6f2 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -18,12 +18,23 @@ -include("emqx.hrl"). +%% APIs -export([start_link/0]). --export([trace/2]). --export([start_trace/3, lookup_traces/0, stop_trace/1]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ trace/2 + , start_trace/3 + , lookup_traces/0 + , stop_trace/1 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -record(state, {traces}). @@ -42,6 +53,10 @@ []}]}, msg,"\n"]}}). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + -spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> gen_server:start_link({local, ?TRACER}, ?MODULE, [], []). diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 9b9436ce0..1fb5e0ead 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -23,7 +23,12 @@ -copy_mnesia({mnesia, [copy]}). %% Trie APIs --export([insert/1, match/1, lookup/1, delete/1]). +-export([ insert/1 + , match/1 + , lookup/1 + , delete/1 + ]). + -export([empty/0]). %% Mnesia tables diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 586a1aef0..c8c274b70 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -18,14 +18,43 @@ -include("types.hrl"). -export_type([zone/0]). --export_type([pubsub/0, topic/0, subid/0, subopts/0]). --export_type([client_id/0, username/0, password/0, peername/0, protocol/0]). --export_type([credentials/0, session/0]). --export_type([subscription/0, subscriber/0, topic_table/0]). --export_type([payload/0, message/0]). --export_type([delivery/0, deliver_results/0]). + +-export_type([ pubsub/0 + , topic/0 + , subid/0 + , subopts/0 + ]). + +-export_type([ client_id/0 + , username/0 + , password/0 + , peername/0 + , protocol/0 + ]). + +-export_type([ credentials/0 + , session/0 + ]). + +-export_type([ subscription/0 + , subscriber/0 + , topic_table/0 + ]). + +-export_type([ payload/0 + , message/0 + ]). + +-export_type([ delivery/0 + , deliver_results/0 + ]). + -export_type([route/0]). --export_type([alarm/0, plugin/0, command/0]). + +-export_type([ alarm/0 + , plugin/0 + , command/0 + ]). -type(zone() :: atom()). -type(pubsub() :: publish | subscribe). @@ -43,6 +72,7 @@ -type(auth_result() :: success | client_identifier_not_valid | bad_username_or_password + | bad_clientid_or_password | not_authorized | server_unavailable | server_busy @@ -52,7 +82,7 @@ -type(credentials() :: #{client_id := client_id(), username := username(), peername := peername(), - result := auth_result(), + auth_result := auth_result(), zone => zone(), atom() => term() }). @@ -68,3 +98,4 @@ -type(alarm() :: #alarm{}). -type(plugin() :: #plugin{}). -type(command() :: #command{}). + diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index 0e12067c0..fcc9b3067 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -14,17 +14,36 @@ -module(emqx_vm). --export([schedulers/0]). --export([microsecs/0]). --export([loads/0, get_system_info/0, get_system_info/1, mem_info/0, scheduler_usage/1]). --export([get_memory/0]). --export([get_process_list/0, get_process_info/0, get_process_info/1, - get_process_gc/0, get_process_gc/1, - get_process_group_leader_info/1, - get_process_limit/0]). --export([get_ets_list/0, get_ets_info/0, get_ets_info/1, - get_ets_object/0, get_ets_object/1]). --export([get_port_types/0, get_port_info/0, get_port_info/1]). +-export([ schedulers/0 + , scheduler_usage/1 + , microsecs/0 + , get_system_info/0 + , get_system_info/1 + , get_memory/0 + , mem_info/0 + , loads/0 + ]). + +-export([ get_process_list/0 + , get_process_info/0 + , get_process_info/1 + , get_process_gc/0 + , get_process_gc/1 + , get_process_group_leader_info/1 + , get_process_limit/0 + ]). + +-export([ get_ets_list/0 + , get_ets_info/0 + , get_ets_info/1 + , get_ets_object/0 + , get_ets_object/1 + ]). + +-export([ get_port_types/0 + , get_port_info/0 + , get_port_info/1 + ]). -define(UTIL_ALLOCATORS, [temp_alloc, eheap_alloc, diff --git a/src/emqx_vm_mon.erl b/src/emqx_vm_mon.erl index be4f5749e..f91fc519c 100644 --- a/src/emqx_vm_mon.erl +++ b/src/emqx_vm_mon.erl @@ -16,21 +16,25 @@ -behaviour(gen_server). +%% APIs -export([start_link/1]). --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). +-export([ get_check_interval/0 + , set_check_interval/1 + , get_process_high_watermark/0 + , set_process_high_watermark/1 + , get_process_low_watermark/0 + , set_process_low_watermark/1 + ]). --export([get_check_interval/0, - set_check_interval/1, - get_process_high_watermark/0, - set_process_high_watermark/1, - get_process_low_watermark/0, - set_process_low_watermark/1]). +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(VM_MON, ?MODULE). @@ -121,4 +125,4 @@ call(Req) -> gen_server:call(?VM_MON, Req, infinity). ensure_check_timer(State = #{check_interval := Interval}) -> - State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. \ No newline at end of file + State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 27d52ab00..6d47e16ad 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -18,18 +18,20 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). --export([info/1]). --export([attrs/1]). --export([stats/1]). --export([kick/1]). --export([session/1]). +-export([ info/1 + , attrs/1 + , stats/1 + , kick/1 + , session/1 + ]). %% websocket callbacks --export([init/2]). --export([websocket_init/1]). --export([websocket_handle/2]). --export([websocket_info/2]). --export([terminate/3]). +-export([ init/2 + , websocket_init/1 + , websocket_handle/2 + , websocket_info/2 + , terminate/3 + ]). -record(state, { request, @@ -306,3 +308,4 @@ shutdown(Reason, State) -> wsock_stats() -> [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. + diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 522da1271..8a821713b 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -20,20 +20,34 @@ -include("logger.hrl"). -include("types.hrl"). +%% APIs -export([start_link/0]). --export([get_env/2, get_env/3]). --export([set_env/3]). --export([force_reload/0]). + +-export([ get_env/2 + , get_env/3 + , set_env/3 + , force_reload/0 + ]). + %% for test -export([stop/0]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + -spec(start_link() -> startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 0975585cc..74f6702ce 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -24,12 +24,12 @@ -include_lib("eunit/include/eunit.hrl"). all() -> - [ - message_make, - message_flag, - message_header, - message_format, - message_expired + [ message_make + , message_flag + , message_header + , message_format + , message_expired + , message_to_map ]. message_make(_) -> @@ -60,7 +60,9 @@ message_header(_) -> Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), Msg2 = emqx_message:set_header(c, 3, Msg1), ?assertEqual(1, emqx_message:get_header(a, Msg2)), - ?assertEqual(4, emqx_message:get_header(d, Msg2, 4)). + ?assertEqual(4, emqx_message:get_header(d, Msg2, 4)), + Msg3 = emqx_message:remove_header(a, Msg2), + ?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg3)). message_format(_) -> io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]). @@ -75,3 +77,17 @@ message_expired(_) -> timer:sleep(1000), Msg2 = emqx_message:update_expiry(Msg1), ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). + +message_to_map(_) -> + Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>), + List = [{id, Msg#message.id}, + {qos, ?QOS_1}, + {from, <<"clientid">>}, + {flags, #{dup => false}}, + {headers, #{}}, + {topic, <<"topic">>}, + {payload, <<"payload">>}, + {timestamp, Msg#message.timestamp}], + ?assertEqual(List, emqx_message:to_list(Msg)), + ?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)). +