Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-03-23 04:22:32 +08:00
commit 3d45da8e03
87 changed files with 1341 additions and 575 deletions

View File

@ -18,21 +18,43 @@
-include("types.hrl"). -include("types.hrl").
%% Start/Stop the application %% Start/Stop the application
-export([start/0, restart/1, is_running/1, stop/0]). -export([ start/0
, restart/1
, is_running/1
, stop/0
]).
%% PubSub API %% PubSub API
-export([subscribe/1, subscribe/2, subscribe/3]). -export([ subscribe/1
, subscribe/2
, subscribe/3
]).
-export([publish/1]). -export([publish/1]).
-export([unsubscribe/1]). -export([unsubscribe/1]).
%% PubSub management API %% PubSub management API
-export([topics/0, subscriptions/1, subscribers/1, subscribed/2]). -export([ topics/0
, subscriptions/1
, subscribers/1
, subscribed/2
]).
%% Hooks API %% Hooks API
-export([hook/2, hook/3, hook/4, unhook/2, run_hook/2, run_fold_hook/3]). -export([ hook/2
, hook/3
, hook/4
, unhook/2
, run_hook/2
, run_fold_hook/3
]).
%% Shutdown and reboot %% Shutdown and reboot
-export([shutdown/0, shutdown/1, reboot/0]). -export([ shutdown/0
, shutdown/1
, reboot/0
]).
-define(APP, ?MODULE). -define(APP, ?MODULE).

View File

@ -18,7 +18,10 @@
-include("logger.hrl"). -include("logger.hrl").
-export([authenticate/1]). -export([authenticate/1]).
-export([check_acl/3, reload_acl/0]).
-export([ check_acl/3
, reload_acl/0
]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
@ -57,8 +60,10 @@ do_check_acl(#{zone := Zone} = Credentials, PubSub, Topic) ->
_ -> deny _ -> deny
end. end.
-spec(reload_acl() -> list(ok | {error, term()})). -spec(reload_acl() -> ok | {error, term()}).
reload_acl() -> reload_acl() ->
emqx_acl_cache:is_enabled() andalso
emqx_acl_cache:empty_acl_cache(),
emqx_mod_acl_internal:reload_acl(). emqx_mod_acl_internal:reload_acl().
init_auth_result(Credentials) -> init_auth_result(Credentials) ->

View File

@ -16,6 +16,11 @@
-include("emqx.hrl"). -include("emqx.hrl").
%% APIs
-export([ match/3
, compile/1
]).
-type(acl_result() :: allow | deny). -type(acl_result() :: allow | deny).
-type(who() :: all | binary() | -type(who() :: all | binary() |
@ -30,12 +35,13 @@
-export_type([rule/0]). -export_type([rule/0]).
-export([compile/1]).
-export([match/3]).
-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))). -define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))).
-define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= publish) orelse (A =:= pubsub))). -define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= publish) orelse (A =:= pubsub))).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Compile Access Rule. %% @doc Compile Access Rule.
compile({A, all}) when ?ALLOW_DENY(A) -> compile({A, all}) when ?ALLOW_DENY(A) ->
{A, all}; {A, all};

View File

@ -16,19 +16,19 @@
-include("emqx.hrl"). -include("emqx.hrl").
-export([ get_acl_cache/2 -export([ get_acl_cache/2
, put_acl_cache/3 , put_acl_cache/3
, cleanup_acl_cache/0 , cleanup_acl_cache/0
, empty_acl_cache/0 , empty_acl_cache/0
, dump_acl_cache/0 , dump_acl_cache/0
, get_cache_size/0 , get_cache_size/0
, get_cache_max_size/0 , get_cache_max_size/0
, get_newest_key/0 , get_newest_key/0
, get_oldest_key/0 , get_oldest_key/0
, cache_k/2 , cache_k/2
, cache_v/1 , cache_v/1
, is_enabled/0 , is_enabled/0
]). ]).
-type(acl_result() :: allow | deny). -type(acl_result() :: allow | deny).

View File

@ -1,44 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_acl_mod).
-include("emqx.hrl").
%%--------------------------------------------------------------------
%% ACL behavihour
%%--------------------------------------------------------------------
-ifdef(use_specs).
-callback(init(AclOpts :: list()) -> {ok, State :: term()}).
-callback(check_acl({credentials(), pubsub(), topic()}, State :: term())
-> allow | deny | ignore).
-callback(reload_acl(State :: term()) -> ok | {error, term()}).
-callback(description() -> string()).
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{init, 1}, {check_acl, 2}, {reload_acl, 1}, {description, 0}];
behaviour_info(_Other) ->
undefined.
-endif.

View File

@ -25,15 +25,18 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
-export([init/1, %% gen_server callbacks
handle_event/2, -export([ init/1
handle_call/2, , handle_event/2
handle_info/2, , handle_call/2
terminate/2]). , handle_info/2
, terminate/2
]).
-export([load/0, -export([ load/0
unload/0, , unload/0
get_alarms/0]). , get_alarms/0
]).
-record(common_alarm, {id, desc}). -record(common_alarm, {id, desc}).
-record(alarm_history, {id, clear_at}). -record(alarm_history, {id, clear_at}).
@ -171,4 +174,3 @@ set_alarm_history(Id) ->
mnesia:dirty_write(?ALARM_HISTORY_TAB, #alarm_history{id = Id, mnesia:dirty_write(?ALARM_HISTORY_TAB, #alarm_history{id = Id,
clear_at = undefined}). clear_at = undefined}).

View File

@ -16,7 +16,9 @@
-behaviour(application). -behaviour(application).
-export([start/2, stop/1]). -export([ start/2
, stop/1
]).
-define(APP, emqx). -define(APP, emqx).

View File

@ -1,41 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_auth_mod).
-include("emqx.hrl").
%%--------------------------------------------------------------------
%% Authentication behavihour
%%--------------------------------------------------------------------
-ifdef(use_specs).
-callback(init(AuthOpts :: list()) -> {ok, State :: term()}).
-callback(check(credentials(), password(), State :: term())
-> ok | {ok, boolean()} | {ok, map()} |
{continue, map()} | ignore | {error, term()}).
-callback(description() -> string()).
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{init, 1}, {check, 3}, {description, 0}];
behaviour_info(_Other) ->
undefined.
-endif.

View File

@ -27,11 +27,20 @@
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
-export([start_link/0]). -export([start_link/0]).
-export([check/1]).
-export([add/1, delete/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ add/1
code_change/3]). , delete/1
, check/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).

View File

@ -14,15 +14,19 @@
-module(emqx_base62). -module(emqx_base62).
-export([encode/1, %% APIs
encode/2, -export([ encode/1
decode/1, , encode/2
decode/2]). , decode/1
, decode/2
]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Encode any data to base62 binary %% @doc Encode any data to base62 binary
-spec encode(string() -spec encode(string() | integer() | binary()) -> binary().
| integer()
| binary()) -> binary().
encode(I) when is_integer(I) -> encode(I) when is_integer(I) ->
encode(integer_to_binary(I)); encode(integer_to_binary(I));
encode(S) when is_list(S)-> encode(S) when is_list(S)->
@ -39,11 +43,9 @@ decode(L) when is_list(L) ->
decode(B) when is_binary(B) -> decode(B) when is_binary(B) ->
decode(B, <<>>). decode(B, <<>>).
%%------------------------------------------------------------------------------
%% Interval Functions
%%==================================================================== %%------------------------------------------------------------------------------
%% Internal functions
%%====================================================================
encode(D, string) -> encode(D, string) ->
binary_to_list(encode(D)); binary_to_list(encode(D));
@ -110,3 +112,4 @@ decode_char(I) when I >= $A andalso I =< $Z->
decode_char(9, I) -> decode_char(9, I) ->
I + 61 - $A. I + 61 - $A.

View File

@ -14,29 +14,38 @@
-module(emqx_batch). -module(emqx_batch).
-export([init/1, push/2, commit/1]). %% APIs
-export([size/1, items/1]). -export([ init/1
, push/2
, commit/1
, size/1
, items/1
]).
-type(options() :: #{ -record(batch,
batch_size => non_neg_integer(), { batch_size :: non_neg_integer()
linger_ms => pos_integer(), , batch_q :: list(any())
commit_fun := function() , linger_ms :: pos_integer()
, linger_timer :: reference() | undefined
, commit_fun :: function()
}).
-type(options() ::
#{ batch_size => non_neg_integer()
, linger_ms => pos_integer()
, commit_fun := function()
}). }).
-export_type([options/0]).
-record(batch, {
batch_size :: non_neg_integer(),
batch_q :: list(any()),
linger_ms :: pos_integer(),
linger_timer :: reference() | undefined,
commit_fun :: function()
}).
-opaque(batch() :: #batch{}). -opaque(batch() :: #batch{}).
-export_type([options/0]).
-export_type([batch/0]). -export_type([batch/0]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(init(options()) -> batch()). -spec(init(options()) -> batch()).
init(Opts) when is_map(Opts) -> init(Opts) when is_map(Opts) ->
#batch{batch_size = maps:get(batch_size, Opts, 1000), #batch{batch_size = maps:get(batch_size, Opts, 1000),

View File

@ -61,25 +61,47 @@
-behaviour(gen_statem). -behaviour(gen_statem).
%% APIs %% APIs
-export([start_link/2, -export([ start_link/2
import_batch/2, , import_batch/2
handle_ack/2, , handle_ack/2
stop/1]). , stop/1
]).
%% gen_statem callbacks %% gen_statem callbacks
-export([terminate/3, code_change/4, init/1, callback_mode/0]). -export([ terminate/3
, code_change/4
, init/1
, callback_mode/0
]).
%% state functions %% state functions
-export([standing_by/3, connecting/3, connected/3]). -export([ standing_by/3
, connecting/3
, connected/3
]).
%% management APIs %% management APIs
-export([ensure_started/1, ensure_started/2, ensure_stopped/1, ensure_stopped/2, status/1]). -export([ ensure_started/1
-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]). , ensure_started/2
-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]). , ensure_stopped/1
, ensure_stopped/2
, status/1
]).
-export_type([config/0, -export([ get_forwards/1
batch/0, , ensure_forward_present/2
ack_ref/0]). , ensure_forward_absent/2
]).
-export([ get_subscriptions/1
, ensure_subscription_present/3
, ensure_subscription_absent/2
]).
-export_type([ config/0
, batch/0
, ack_ref/0
]).
-type id() :: atom() | string() | pid(). -type id() :: atom() | string() | pid().
-type qos() :: emqx_mqtt_types:qos(). -type qos() :: emqx_mqtt_types:qos().
@ -552,3 +574,4 @@ name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
id(Pid) when is_pid(Pid) -> Pid; id(Pid) when is_pid(Pid) -> Pid;
id(Name) -> name(Name). id(Name) -> name(Name).

View File

@ -69,3 +69,4 @@ obfuscate(Map) ->
is_sensitive(password) -> true; is_sensitive(password) -> true;
is_sensitive(_) -> false. is_sensitive(_) -> false.

View File

@ -15,17 +15,18 @@
%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol %% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol
-module(emqx_bridge_mqtt). -module(emqx_bridge_mqtt).
-behaviour(emqx_bridge_connect). -behaviour(emqx_bridge_connect).
%% behaviour callbacks %% behaviour callbacks
-export([start/1, -export([ start/1
send/2, , send/2
stop/2 , stop/2
]). ]).
%% optional behaviour callbacks %% optional behaviour callbacks
-export([ensure_subscribed/3, -export([ ensure_subscribed/3
ensure_unsubscribed/2 , ensure_unsubscribed/2
]). ]).
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
@ -39,6 +40,10 @@
-define(ACKED(AnyPktId), {acked, AnyPktId}). -define(ACKED(AnyPktId), {acked, AnyPktId}).
-define(STOP(Ref), {stop, Ref}). -define(STOP(Ref), {stop, Ref}).
%%------------------------------------------------------------------------------
%% emqx_bridge_connect callbacks
%%------------------------------------------------------------------------------
start(Config = #{address := Address}) -> start(Config = #{address := Address}) ->
Ref = make_ref(), Ref = make_ref(),
Parent = self(), Parent = self(),
@ -183,3 +188,4 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
Error -> throw(Error) Error -> throw(Error)
end end
end, Subscriptions). end, Subscriptions).

View File

@ -18,9 +18,9 @@
-behaviour(emqx_bridge_connect). -behaviour(emqx_bridge_connect).
%% behaviour callbacks %% behaviour callbacks
-export([start/1, -export([ start/1
send/2, , send/2
stop/2 , stop/2
]). ]).
%% Internal exports %% Internal exports

View File

@ -17,8 +17,17 @@
-include("logger.hrl"). -include("logger.hrl").
-export([start_link/0, start_link/1, bridges/0]). %% APIs
-export([create_bridge/2, drop_bridge/1]). -export([ start_link/0
, start_link/1
, bridges/0
]).
-export([ create_bridge/2
, drop_bridge/1
]).
%% supervisor callbacks
-export([init/1]). -export([init/1]).
-define(SUP, ?MODULE). -define(SUP, ?MODULE).
@ -60,3 +69,4 @@ drop_bridge(Id) ->
?LOG(error, "[Bridge] Delete bridge failed", [Error]), ?LOG(error, "[Bridge] Delete bridge failed", [Error]),
Error Error
end. end.

View File

@ -21,21 +21,46 @@
-include("types.hrl"). -include("types.hrl").
-export([start_link/2]). -export([start_link/2]).
-export([subscribe/1, subscribe/2, subscribe/3]).
%% PubSub
-export([ subscribe/1
, subscribe/2
, subscribe/3
]).
-export([unsubscribe/1]). -export([unsubscribe/1]).
-export([subscriber_down/1]). -export([subscriber_down/1]).
-export([publish/1, safe_publish/1]).
-export([ publish/1
, safe_publish/1
]).
-export([dispatch/2]). -export([dispatch/2]).
-export([subscriptions/1, subscribers/1, subscribed/2]).
-export([get_subopts/2, set_subopts/2]). %% PubSub Infos
-export([ subscriptions/1
, subscribers/1
, subscribed/2
]).
-export([ get_subopts/2
, set_subopts/2
]).
-export([topics/0]). -export([topics/0]).
%% Stats fun %% Stats fun
-export([stats_fun/0]). -export([stats_fun/0]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-import(emqx_tables, [lookup_value/2, lookup_value/3]). -import(emqx_tables, [lookup_value/2, lookup_value/3]).

View File

@ -20,13 +20,24 @@
-include("types.hrl"). -include("types.hrl").
-export([start_link/0]). -export([start_link/0]).
-export([register_sub/2]).
-export([lookup_subid/1, lookup_subpid/1]).
-export([get_sub_shard/2]).
-export([create_seq/1, reclaim_seq/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, %% APIs
code_change/3]). -export([ register_sub/2
, lookup_subid/1
, lookup_subpid/1
, get_sub_shard/2
, create_seq/1
, reclaim_seq/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(HELPER, ?MODULE). -define(HELPER, ?MODULE).
-define(SUBID, emqx_subid). -define(SUBID, emqx_subid).

View File

@ -14,7 +14,11 @@
-module(emqx_cli). -module(emqx_cli).
-export([print/1, print/2, usage/1, usage/2]). -export([ print/1
, print/2
, usage/1
, usage/2
]).
print(Msg) -> print(Msg) ->
io:format(Msg), lists:flatten(io_lib:format("~p", [Msg])). io:format(Msg), lists:flatten(io_lib:format("~p", [Msg])).

View File

@ -20,27 +20,71 @@
-include("emqx_client.hrl"). -include("emqx_client.hrl").
-export([start_link/0, start_link/1]). -export([start_link/0, start_link/1]).
-export([connect/1]).
-export([subscribe/2, subscribe/3, subscribe/4]). -export([ connect/1
-export([publish/2, publish/3, publish/4, publish/5]). , disconnect/1
-export([unsubscribe/2, unsubscribe/3]). , disconnect/2
, disconnect/3
]).
-export([ping/1]). -export([ping/1]).
-export([disconnect/1, disconnect/2, disconnect/3]).
-export([puback/2, puback/3, puback/4]). %% PubSub
-export([pubrec/2, pubrec/3, pubrec/4]). -export([ subscribe/2
-export([pubrel/2, pubrel/3, pubrel/4]). , subscribe/3
-export([pubcomp/2, pubcomp/3, pubcomp/4]). , subscribe/4
, publish/2
, publish/3
, publish/4
, publish/5
, unsubscribe/2
, unsubscribe/3
]).
%% Puback...
-export([ puback/2
, puback/3
, puback/4
, pubrec/2
, pubrec/3
, pubrec/4
, pubrel/2
, pubrel/3
, pubrel/4
, pubcomp/2
, pubcomp/3
, pubcomp/4
]).
-export([subscriptions/1]). -export([subscriptions/1]).
-export([info/1, stop/1]). -export([info/1, stop/1]).
%% For test cases %% For test cases
-export([pause/1, resume/1]). -export([pause/1, resume/1]).
-export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]). -export([ initialized/3
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). , waiting_for_connack/3
, connected/3
, inflight_full/3
]).
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, mqtt_msg/0]). -export([ init/1
, callback_mode/0
, handle_event/4
, terminate/3
, code_change/4
]).
-export_type([host/0, option/0]). -export_type([ host/0
, client/0
, option/0
, properties/0
, payload/0
, pubopt/0
, subopt/0
, mqtt_msg/0
]).
%% Default timeout %% Default timeout
-define(DEFAULT_KEEPALIVE, 60000). -define(DEFAULT_KEEPALIVE, 60000).
@ -1186,3 +1230,4 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) ->
-spec next_packet_id(packet_id()) -> packet_id(). -spec next_packet_id(packet_id()) -> packet_id().
next_packet_id(?MAX_PACKET_ID) -> 1; next_packet_id(?MAX_PACKET_ID) -> 1;
next_packet_id(Id) -> Id + 1. next_packet_id(Id) -> Id + 1.

View File

@ -14,9 +14,15 @@
-module(emqx_client_sock). -module(emqx_client_sock).
-export([connect/4, send/2, close/1]). -export([ connect/4
, send/2
, close/1
]).
-export([sockname/1, setopts/2, getstat/2]). -export([ sockname/1
, setopts/2
, getstat/2
]).
-record(ssl_socket, {tcp, ssl}). -record(ssl_socket, {tcp, ssl}).

View File

@ -22,17 +22,34 @@
-export([start_link/0]). -export([start_link/0]).
-export([register_connection/1, register_connection/2]). -export([ register_connection/1
-export([unregister_connection/1, unregister_connection/2]). , register_connection/2
-export([get_conn_attrs/1, get_conn_attrs/2]). , unregister_connection/1
-export([set_conn_attrs/2, set_conn_attrs/3]). , unregister_connection/2
-export([get_conn_stats/1, get_conn_stats/2]). ]).
-export([set_conn_stats/2, set_conn_stats/3]).
-export([ get_conn_attrs/1
, get_conn_attrs/2
, set_conn_attrs/2
, set_conn_attrs/3
]).
-export([ get_conn_stats/1
, get_conn_stats/2
, set_conn_stats/2
, set_conn_stats/3
]).
-export([lookup_conn_pid/1]). -export([lookup_conn_pid/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% internal export %% internal export
-export([stats_fun/0]). -export([stats_fun/0]).

View File

@ -23,11 +23,20 @@
-module(emqx_config). -module(emqx_config).
-export([get_env/1, get_env/2]).
-export([populate/1]). -export([populate/1]).
-export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]). -export([ read/1
, write/2
, dump/2
, reload/1
]).
-export([ set/3
, get/2
, get/3
, get_env/1
, get_env/2
]).
-type(env() :: {atom(), term()}). -type(env() :: {atom(), term()}).

View File

@ -21,15 +21,28 @@
-include("logger.hrl"). -include("logger.hrl").
-export([start_link/3]). -export([start_link/3]).
%% APIs
-export([info/1]). -export([info/1]).
-export([attrs/1]). -export([attrs/1]).
-export([stats/1]). -export([stats/1]).
-export([kick/1]). -export([kick/1]).
-export([session/1]). -export([session/1]).
%% gen_statem callbacks %% gen_statem callbacks
-export([idle/3, connected/3]). -export([ idle/3
-export([init/1, callback_mode/0, code_change/4, terminate/3]). , connected/3
]).
-export([ init/1
, callback_mode/0
, code_change/4
, terminate/3
]).
-record(state, { -record(state, {
transport, transport,

View File

@ -19,11 +19,25 @@
-include("logger.hrl"). -include("logger.hrl").
-export([start_link/0]). -export([start_link/0]).
-export([register_command/2, register_command/3, unregister_command/1]).
-export([run_command/1, run_command/2, lookup_command/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ register_command/2
code_change/3]). , register_command/3
, unregister_command/1
]).
-export([ run_command/1
, run_command/2
, lookup_command/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {seq = 0}). -record(state, {seq = 0}).

View File

@ -23,10 +23,18 @@
-export([start_link/0]). -export([start_link/0]).
-export([is_banned/1, banned/1]). -export([ is_banned/1
, banned/1
]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, %% gen_server callbacks
terminate/2, code_change/3]). -export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).

View File

@ -17,9 +17,14 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([initial_state/0, initial_state/1]). -export([ initial_state/0
-export([parse/2]). , initial_state/1
-export([serialize/1, serialize/2]). ]).
-export([ parse/2
, serialize/1
, serialize/2
]).
-type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE, -type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE,
version => emqx_mqtt_types:version()}). version => emqx_mqtt_types:version()}).

View File

@ -23,7 +23,11 @@
-include("types.hrl"). -include("types.hrl").
-export([init/1, run/3, info/1, reset/1]). -export([ init/1
, run/3
, info/1
, reset/1
]).
-type(opts() :: #{count => integer(), -type(opts() :: #{count => integer(),
bytes => integer()}). bytes => integer()}).

View File

@ -28,7 +28,14 @@
-module(emqx_guid). -module(emqx_guid).
-export([gen/0, new/0, timestamp/1, to_hexstr/1, from_hexstr/1, to_base62/1, from_base62/1]). -export([ gen/0
, new/0
, timestamp/1
, to_hexstr/1
, from_hexstr/1
, to_base62/1
, from_base62/1
]).
-define(MAX_SEQ, 16#FFFF). -define(MAX_SEQ, 16#FFFF).

View File

@ -22,11 +22,23 @@
-export([start_link/0, stop/0]). -export([start_link/0, stop/0]).
%% Hooks API %% Hooks API
-export([add/2, add/3, add/4, del/2, run/2, run_fold/3, lookup/1]). -export([ add/2
, add/3
, add/4
, del/2
, run/2
, run_fold/3
, lookup/1
]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% Multiple callbacks can be registered on a hookpoint. %% Multiple callbacks can be registered on a hookpoint.
%% The execution order depends on the priority value: %% The execution order depends on the priority value:

View File

@ -14,11 +14,27 @@
-module(emqx_inflight). -module(emqx_inflight).
-export([new/1, contain/2, lookup/2, insert/3, update/3, update_size/2, delete/2, %% APIs
values/1, to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]). -export([ new/1
, contain/2
, lookup/2
, insert/3
, update/3
, update_size/2
, delete/2
, values/1
, to_list/1
, size/1
, max_size/1
, is_full/1
, is_empty/1
, window/1
]).
-type(key() :: term()). -type(key() :: term()).
-type(max_size() :: pos_integer()). -type(max_size() :: pos_integer()).
-opaque(inflight() :: {?MODULE, max_size(), gb_trees:tree()}). -opaque(inflight() :: {?MODULE, max_size(), gb_trees:tree()}).
-define(Inflight(Tree), {?MODULE, _MaxSize, Tree}). -define(Inflight(Tree), {?MODULE, _MaxSize, Tree}).
@ -26,6 +42,10 @@
-export_type([inflight/0]). -export_type([inflight/0]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(new(non_neg_integer()) -> inflight()). -spec(new(non_neg_integer()) -> inflight()).
new(MaxSize) when MaxSize >= 0 -> new(MaxSize) when MaxSize >= 0 ->
{?MODULE, MaxSize, gb_trees:empty()}. {?MODULE, MaxSize, gb_trees:empty()}.

View File

@ -14,8 +14,17 @@
-module(emqx_json). -module(emqx_json).
-export([encode/1, encode/2, safe_encode/1, safe_encode/2]). -export([ encode/1
-export([decode/1, decode/2, safe_decode/1, safe_decode/2]). , encode/2
, safe_encode/1
, safe_encode/2
]).
-export([ decode/1
, decode/2
, safe_decode/1
, safe_decode/2
]).
-spec(encode(jsx:json_term()) -> jsx:json_text()). -spec(encode(jsx:json_term()) -> jsx:json_text()).
encode(Term) -> encode(Term) ->

View File

@ -14,7 +14,11 @@
-module(emqx_keepalive). -module(emqx_keepalive).
-export([start/3, check/1, cancel/1]). %% APIs
-export([ start/3
, check/1
, cancel/1
]).
-record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}). -record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}).
@ -22,6 +26,10 @@
-export_type([keepalive/0]). -export_type([keepalive/0]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Start a keepalive %% @doc Start a keepalive
-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). -spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}).
start(_, 0, _) -> start(_, 0, _) ->

View File

@ -17,13 +17,26 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([start/0, restart/0, stop/0]). %% APIs
-export([start_listener/1, start_listener/3]). -export([ start/0
-export([restart_listener/1, restart_listener/3]). , restart/0
-export([stop_listener/1, stop_listener/3]). , stop/0
]).
-export([ start_listener/1
, start_listener/3
, stop_listener/1
, stop_listener/3
, restart_listener/1
, restart_listener/3
]).
-type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). -type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Start all listeners. %% @doc Start all listeners.
-spec(start() -> ok). -spec(start() -> ok).
start() -> start() ->

View File

@ -16,18 +16,41 @@
-compile({no_auto_import,[error/1]}). -compile({no_auto_import,[error/1]}).
-export([debug/1, debug/2, debug/3]). %% Logs
-export([info/1, info/2, info/3]). -export([ debug/1
-export([warning/1, warning/2, warning/3]). , debug/2
-export([error/1, error/2, error/3]). , debug/3
-export([critical/1, critical/2, critical/3]). , info/1
, info/2
, info/3
, warning/1
, warning/2
, warning/3
, error/1
, error/2
, error/3
, critical/1
, critical/2
, critical/3
]).
-export([set_metadata_peername/1, set_metadata_client_id/1]). %% Configs
-export([set_proc_metadata/1]). -export([ set_metadata_peername/1
, set_metadata_client_id/1
, set_proc_metadata/1
, set_primary_log_level/1
, set_log_handler_level/2
, set_log_level/1
]).
-export([get_primary_log_level/0, set_primary_log_level/1]). -export([ get_primary_log_level/0
-export([get_log_handlers/0, get_log_handler/1, set_log_handler_level/2]). , get_log_handlers/0
-export([set_log_level/1]). , get_log_handler/1
]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
debug(Msg) -> debug(Msg) ->
logger:debug(Msg). logger:debug(Msg).
@ -97,9 +120,10 @@ set_log_level(Level) ->
{error, Error} -> {error, {primary_logger_level, Error}} {error, Error} -> {error, {primary_logger_level, Error}}
end. end.
%%======================== %%------------------------------------------------------------------------------
%% Internal Functions %% Internal Functions
%%======================== %%------------------------------------------------------------------------------
log_hanlder_info(#{id := Id, level := Level, module := logger_std_h, log_hanlder_info(#{id := Id, level := Level, module := logger_std_h,
config := #{type := Type}}) when Type =:= standard_io; config := #{type := Type}}) when Type =:= standard_io;
Type =:= standard_error -> Type =:= standard_error ->
@ -135,3 +159,4 @@ rollback([{ID, Level} | List]) ->
emqx_logger:set_log_handler_level(ID, Level), emqx_logger:set_log_handler_level(ID, Level),
rollback(List); rollback(List);
rollback([]) -> ok. rollback([]) -> ok.

View File

@ -24,6 +24,7 @@
-module(emqx_logger_formatter). -module(emqx_logger_formatter).
-export([format/2]). -export([format/2]).
-export([check_config/1]). -export([check_config/1]).
-define(DEFAULT_FORMAT_TEMPLATE_SINGLE, [time," ",level,": ",msg,"\n"]). -define(DEFAULT_FORMAT_TEMPLATE_SINGLE, [time," ",level,": ",msg,"\n"]).

View File

@ -15,6 +15,7 @@
-module(emqx_logger_handler). -module(emqx_logger_handler).
-export([log/2]). -export([log/2]).
-export([init/0]). -export([init/0]).
init() -> init() ->

View File

@ -17,13 +17,35 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([make/2, make/3, make/4]). -export([ make/2
-export([set_flags/2]). , make/3
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). , make/4
-export([set_headers/2]). ]).
-export([get_header/2, get_header/3, set_header/3]).
-export([is_expired/1, update_expiry/1]). -export([ get_flag/2
-export([remove_topic_alias/1]). , get_flag/3
, set_flag/2
, set_flag/3
, set_flags/2
, unset_flag/2
]).
-export([ get_headers/1
, get_header/2
, get_header/3
, set_header/3
, set_headers/2
, remove_header/2
]).
-export([ is_expired/1
, update_expiry/1
]).
-export([ to_map/1
, to_list/1
]).
-export([format/1]). -export([format/1]).
-type(flag() :: atom()). -type(flag() :: atom()).
@ -40,13 +62,13 @@ make(From, Topic, Payload) ->
-spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(), -spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(),
emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
make(From, QoS, Topic, Payload) -> make(From, QoS, Topic, Payload) ->
#message{id = emqx_guid:gen(), #message{id = emqx_guid:gen(),
qos = QoS, qos = QoS,
from = From, from = From,
flags = #{dup => false}, flags = #{dup => false},
topic = Topic, topic = Topic,
payload = Payload, payload = Payload,
timestamp = os:timestamp()}. timestamp = os:timestamp()}.
-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()). -spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) ->
@ -88,6 +110,10 @@ set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
Msg#message{headers = maps:merge(Old, New)}; Msg#message{headers = maps:merge(Old, New)};
set_headers(undefined, Msg) -> Msg. set_headers(undefined, Msg) -> Msg.
-spec(get_headers(emqx_types:message()) -> map()).
get_headers(Msg) ->
Msg#message.headers.
-spec(get_header(term(), emqx_types:message()) -> term()). -spec(get_header(term(), emqx_types:message()) -> term()).
get_header(Hdr, Msg) -> get_header(Hdr, Msg) ->
get_header(Hdr, Msg, undefined). get_header(Hdr, Msg, undefined).
@ -101,14 +127,24 @@ set_header(Hdr, Val, Msg = #message{headers = undefined}) ->
set_header(Hdr, Val, Msg = #message{headers = Headers}) -> set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
Msg#message{headers = maps:put(Hdr, Val, Headers)}. Msg#message{headers = maps:put(Hdr, Val, Headers)}.
-spec(remove_header(term(), emqx_types:message()) -> emqx_types:message()).
remove_header(Hdr, Msg = #message{headers = Headers}) ->
case maps:is_key(Hdr, Headers) of
true ->
Msg#message{headers = maps:remove(Hdr, Headers)};
false -> Msg
end.
-spec(is_expired(emqx_types:message()) -> boolean()). -spec(is_expired(emqx_types:message()) -> boolean()).
is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval},
timestamp = CreatedAt}) ->
elapsed(CreatedAt) > timer:seconds(Interval); elapsed(CreatedAt) > timer:seconds(Interval);
is_expired(_Msg) -> is_expired(_Msg) ->
false. false.
-spec(update_expiry(emqx_types:message()) -> emqx_types:message()). -spec(update_expiry(emqx_types:message()) -> emqx_types:message()).
update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval},
timestamp = CreatedAt}) ->
case elapsed(CreatedAt) of case elapsed(CreatedAt) of
Elapsed when Elapsed > 0 -> Elapsed when Elapsed > 0 ->
set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg); set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg);
@ -116,8 +152,15 @@ update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval},
end; end;
update_expiry(Msg) -> Msg. update_expiry(Msg) -> Msg.
remove_topic_alias(Msg = #message{headers = Headers}) -> %% @doc Message to map
Msg#message{headers = maps:remove('Topic-Alias', Headers)}. -spec(to_map(emqx_types:message()) -> map()).
to_map(Msg) ->
maps:from_list(to_list(Msg)).
%% @doc Message to tuple list
-spec(to_list(emqx_types:message()) -> map()).
to_list(Msg) ->
lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
%% MilliSeconds %% MilliSeconds
elapsed(Since) -> elapsed(Since) ->
@ -133,3 +176,4 @@ format(flags, Flags) ->
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
format(headers, Headers) -> format(headers, Headers) ->
io_lib:format("~p", [Headers]). io_lib:format("~p", [Headers]).

View File

@ -14,20 +14,46 @@
-module(emqx_metrics). -module(emqx_metrics).
-behavior(gen_server).
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([start_link/0]). -export([start_link/0]).
-export([new/1, all/0]).
-export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]). -export([ new/1
-export([trans/2, trans/3, trans/4, commit/0]). , all/0
]).
-export([ val/1
, inc/1
, inc/2
, inc/3
, dec/2
, dec/3
, set/2
]).
-export([ trans/2
, trans/3
, trans/4
, commit/0
]).
%% Received/sent metrics %% Received/sent metrics
-export([received/1, sent/1]). -export([ received/1
, sent/1
]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% Bytes sent and received of Broker %% Bytes sent and received of Broker
-define(BYTES_METRICS, [ -define(BYTES_METRICS, [

View File

@ -14,10 +14,18 @@
-module(emqx_misc). -module(emqx_misc).
-export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, -export([ merge_opts/2
proc_name/2, proc_stats/0, proc_stats/1]). , start_timer/2
, start_timer/3
, cancel_timer/1
, proc_name/2
, proc_stats/0
, proc_stats/1
]).
-export([init_proc_mng_policy/1, conn_proc_mng_policy/1]). -export([ init_proc_mng_policy/1
, conn_proc_mng_policy/1
]).
-export([drain_down/1]). -export([drain_down/1]).

View File

@ -19,17 +19,20 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
-export([load/1, unload/1]). %% APIs
-export([ all_rules/0
, check_acl/5
, reload_acl/0
]).
-export([all_rules/0]). %% emqx_gen_mod callbacks
-export([ load/1
, unload/1
]).
-export([check_acl/5, reload_acl/0]). -define(MFA(M, F, A), {M, F, A}).
-define(ACL_RULE_TAB, emqx_acl_rule). -type(acl_rules() :: #{publish => [emqx_access_rule:rule()],
-define(FUNC(M, F, A), {M, F, A}).
-type(acl_rules() :: #{publish => [emqx_access_rule:rule()],
subscribe => [emqx_access_rule:rule()]}). subscribe => [emqx_access_rule:rule()]}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -37,28 +40,64 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
load(_Env) -> load(_Env) ->
Rules = load_rules_from_file(acl_file()), Rules = rules_from_file(acl_file()),
emqx_hooks:add('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules]), -1). emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1).
unload(_Env) -> unload(_Env) ->
Rules = load_rules_from_file(acl_file()), Rules = rules_from_file(acl_file()),
emqx_hooks:del('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules])). emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])).
%% @doc Read all rules %% @doc Read all rules
-spec(all_rules() -> list(emqx_access_rule:rule())). -spec(all_rules() -> list(emqx_access_rule:rule())).
all_rules() -> all_rules() ->
load_rules_from_file(acl_file()). rules_from_file(acl_file()).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% ACL callbacks %% ACL callbacks
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
load_rules_from_file(AclFile) -> %% @doc Check ACL
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(),
emqx_access_rule:acl_result(), acl_rules())
-> {ok, allow} | {ok, deny} | ok).
check_acl(Credentials, PubSub, Topic, _AclResult, Rules) ->
case match(Credentials, Topic, lookup(PubSub, Rules)) of
{matched, allow} -> {ok, allow};
{matched, deny} -> {ok, deny};
nomatch -> ok
end.
-spec(reload_acl() -> ok | {error, term()}).
reload_acl() ->
unload([]), load([]).
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
acl_file() ->
emqx_config:get_env(acl_file).
lookup(PubSub, Rules) ->
maps:get(PubSub, Rules, []).
match(_Credentials, _Topic, []) ->
nomatch;
match(Credentials, Topic, [Rule|Rules]) ->
case emqx_access_rule:match(Credentials, Topic, Rule) of
nomatch ->
match(Credentials, Topic, Rules);
{matched, AllowDeny} ->
{matched, AllowDeny}
end.
-spec(rules_from_file(file:filename()) -> map()).
rules_from_file(AclFile) ->
case file:consult(AclFile) of case file:consult(AclFile) of
{ok, Terms} -> {ok, Terms} ->
Rules = [emqx_access_rule:compile(Term) || Term <- Terms], Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
#{publish => lists:filter(fun(Rule) -> filter(publish, Rule) end, Rules), #{publish => [Rule || Rule <- Rules, filter(publish, Rule)],
subscribe => lists:filter(fun(Rule) -> filter(subscribe, Rule) end, Rules)}; subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]};
{error, Reason} -> {error, Reason} ->
?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), ?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
#{} #{}
@ -77,43 +116,3 @@ filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) ->
filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false. false.
%% @doc Check ACL
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(),
emqx_access_rule:acl_result(), acl_rules())
-> {ok, allow} | {ok, deny} | ok).
check_acl(Credentials, PubSub, Topic, _AclResult, Rules) ->
case match(Credentials, Topic, lookup(PubSub, Rules)) of
{matched, allow} -> {ok, allow};
{matched, deny} -> {ok, deny};
nomatch -> ok
end.
lookup(PubSub, Rules) ->
maps:get(PubSub, Rules, []).
match(_Credentials, _Topic, []) ->
nomatch;
match(Credentials, Topic, [Rule|Rules]) ->
case emqx_access_rule:match(Credentials, Topic, Rule) of
nomatch ->
match(Credentials, Topic, Rules);
{matched, AllowDeny} ->
{matched, AllowDeny}
end.
-spec(reload_acl() -> ok | {error, term()}).
reload_acl() ->
try load_rules_from_file(acl_file()) of
_ ->
emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]),
ok;
{error, Error} ->
{error, Error}
catch
error:Reason:StackTrace ->
?LOG(error, "Reload acl failed. StackTrace: ~p", [StackTrace]),
{error, Reason}
end.
acl_file() ->
emqx_config:get_env(acl_file).

View File

@ -18,12 +18,22 @@
-include("emqx.hrl"). -include("emqx.hrl").
-export([load/1, unload/1]). %% APIs
-export([ on_client_connected/4
, on_client_disconnected/3
]).
-export([on_client_connected/4, on_client_disconnected/3]). %% emqx_gen_mod callbacks
-export([ load/1
, unload/1
]).
-define(ATTR_KEYS, [clean_start, proto_ver, proto_name, keepalive]). -define(ATTR_KEYS, [clean_start, proto_ver, proto_name, keepalive]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
load(Env) -> load(Env) ->
emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]), emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]),
emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).

View File

@ -14,12 +14,21 @@
-module(emqx_mod_rewrite). -module(emqx_mod_rewrite).
-behavior(emqx_gen_mod).
-include_lib("emqx.hrl"). -include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl"). -include_lib("emqx_mqtt.hrl").
-export([load/1, unload/1]). %% APIs
-export([ rewrite_subscribe/3
, rewrite_unsubscribe/3
, rewrite_publish/2
]).
-export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]). %% emqx_gen_mod callbacks
-export([ load/1
, unload/1
]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Load/Unload %% Load/Unload

View File

@ -19,11 +19,16 @@
-include_lib("emqx.hrl"). -include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl"). -include_lib("emqx_mqtt.hrl").
-export([load/1, on_session_created/3, unload/1]). %% APIs
-export([on_session_created/3]).
%%-------------------------------------------------------------------- %% emqx_gen_mod callbacks
-export([ load/1
, unload/1
]).
%%------------------------------------------------------------------------------
%% Load/Unload Hook %% Load/Unload Hook
%%-------------------------------------------------------------------- %%------------------------------------------------------------------------------
load(Topics) -> load(Topics) ->
emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]). emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]).
@ -38,9 +43,9 @@ on_session_created(#{client_id := ClientId}, SessAttrs, Topics) ->
unload(_) -> unload(_) ->
emqx_hooks:del('session.created', fun ?MODULE:on_session_created/3). emqx_hooks:del('session.created', fun ?MODULE:on_session_created/3).
%%-------------------------------------------------------------------- %%------------------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%------------------------------------------------------------------------------
rep(<<"%c">>, ClientId, Topic) -> rep(<<"%c">>, ClientId, Topic) ->
emqx_topic:feed_var(<<"%c">>, ClientId, Topic); emqx_topic:feed_var(<<"%c">>, ClientId, Topic);

View File

@ -16,7 +16,12 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([start_link/0, start_child/1, start_child/2, stop_child/1]). -export([ start_link/0
, start_child/1
, start_child/2
, stop_child/1
]).
-export([init/1]). -export([init/1]).
%% Helper macro for declaring children of supervisor %% Helper macro for declaring children of supervisor

View File

@ -14,7 +14,9 @@
-module(emqx_modules). -module(emqx_modules).
-export([load/0, unload/0]). -export([ load/0
, unload/0
]).
-spec(load() -> ok). -spec(load() -> ok).
load() -> load() ->

View File

@ -17,12 +17,20 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
-export([mount/2, unmount/2]). -export([ mount/2
, unmount/2
]).
-export([replvar/2]). -export([replvar/2]).
-type(mountpoint() :: binary()). -type(mountpoint() :: binary()).
-export_type([mountpoint/0]). -export_type([mountpoint/0]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
mount(undefined, Any) -> mount(undefined, Any) ->
Any; Any;
mount(MountPoint, Msg = #message{topic = Topic}) -> mount(MountPoint, Msg = #message{topic = Topic}) ->
@ -53,3 +61,4 @@ feed_var({<<"%u">>, undefined}, MountPoint) ->
MountPoint; MountPoint;
feed_var({<<"%u">>, Username}, MountPoint) -> feed_var({<<"%u">>, Username}, MountPoint) ->
emqx_topic:feed_var(<<"%u">>, Username, MountPoint). emqx_topic:feed_var(<<"%u">>, Username, MountPoint).

View File

@ -18,8 +18,11 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([check_pub/2, check_sub/2]). -export([ check_pub/2
-export([get_caps/1, get_caps/2]). , check_sub/2
, get_caps/1
, get_caps/2
]).
-type(caps() :: #{max_packet_size => integer(), -type(caps() :: #{max_packet_size => integer(),
max_clientid_len => integer(), max_clientid_len => integer(),

View File

@ -17,7 +17,11 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([id/1, name/1, filter/2, validate/1]). -export([ id/1
, name/1
, filter/2
, validate/1
]).
-define(PROPS_TABLE, -define(PROPS_TABLE,
#{16#01 => {'Payload-Format-Indicator', 'Byte', [?PUBLISH]}, #{16#01 => {'Payload-Format-Indicator', 'Byte', [?PUBLISH]},

View File

@ -40,3 +40,4 @@
}). }).
-type(topic_filters() :: [{emqx_topic:topic(), subopts()}]). -type(topic_filters() :: [{emqx_topic:topic(), subopts()}]).
-type(packet() :: #mqtt_packet{}). -type(packet() :: #mqtt_packet{}).

View File

@ -50,10 +50,15 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([init/1]). -export([init/1]).
-export([is_empty/1]).
-export([len/1, max_len/1]). -export([ is_empty/1
-export([in/2, out/1]). , len/1
-export([stats/1, dropped/1]). , max_len/1
, in/2
, out/1
, stats/1
, dropped/1
]).
-export_type([mqueue/0, options/0]). -export_type([mqueue/0, options/0]).

View File

@ -20,31 +20,34 @@
-export([start_link/1]). -export([start_link/1]).
-export([init/1, %% gen_server callbacks
handle_call/3, -export([ init/1
handle_cast/2, , handle_call/3
handle_info/2, , handle_cast/2
terminate/2, , handle_info/2
code_change/3]). , terminate/2
, code_change/3
]).
-export([get_cpu_check_interval/0, -export([ get_cpu_check_interval/0
set_cpu_check_interval/1, , set_cpu_check_interval/1
get_cpu_high_watermark/0, , get_cpu_high_watermark/0
set_cpu_high_watermark/1, , set_cpu_high_watermark/1
get_cpu_low_watermark/0, , get_cpu_low_watermark/0
set_cpu_low_watermark/1, , set_cpu_low_watermark/1
get_mem_check_interval/0, , get_mem_check_interval/0
set_mem_check_interval/1, , set_mem_check_interval/1
get_sysmem_high_watermark/0, , get_sysmem_high_watermark/0
set_sysmem_high_watermark/1, , set_sysmem_high_watermark/1
get_procmem_high_watermark/0, , get_procmem_high_watermark/0
set_procmem_high_watermark/1]). , set_procmem_high_watermark/1
]).
-define(OS_MON, ?MODULE). -define(OS_MON, ?MODULE).
%%---------------------------------------------------------------------- %%------------------------------------------------------------------------------
%% API %% API
%%---------------------------------------------------------------------- %%------------------------------------------------------------------------------
start_link(Opts) -> start_link(Opts) ->
gen_server:start_link({local, ?OS_MON}, ?MODULE, [Opts], []). gen_server:start_link({local, ?OS_MON}, ?MODULE, [Opts], []).
@ -85,9 +88,9 @@ get_procmem_high_watermark() ->
set_procmem_high_watermark(Float) -> set_procmem_high_watermark(Float) ->
memsup:set_procmem_high_watermark(Float). memsup:set_procmem_high_watermark(Float).
%%---------------------------------------------------------------------- %%------------------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%---------------------------------------------------------------------- %%------------------------------------------------------------------------------
init([Opts]) -> init([Opts]) ->
_ = cpu_sup:util(), _ = cpu_sup:util(),
@ -148,11 +151,12 @@ terminate(_Reason, #{timer := Timer}) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%%---------------------------------------------------------------------- %%------------------------------------------------------------------------------
%% Internal functions %% Internal functions
%%---------------------------------------------------------------------- %%------------------------------------------------------------------------------
call(Req) -> call(Req) ->
gen_server:call(?OS_MON, Req, infinity). gen_server:call(?OS_MON, Req, infinity).
ensure_check_timer(State = #{cpu_check_interval := Interval}) -> ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}.

View File

@ -17,12 +17,14 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([protocol_name/1]). -export([ protocol_name/1
-export([type_name/1]). , type_name/1
-export([validate/1]). , validate/1
-export([format/1]). , format/1
-export([to_message/2, from_message/2]). , to_message/2
-export([will_msg/1]). , from_message/2
, will_msg/1
]).
%% @doc Protocol name of version %% @doc Protocol name of version
-spec(protocol_name(emqx_mqtt_types:version()) -> binary()). -spec(protocol_name(emqx_mqtt_types:version()) -> binary()).
@ -254,3 +256,4 @@ format_password(_Password) -> '******'.
i(true) -> 1; i(true) -> 1;
i(false) -> 0; i(false) -> 0;
i(I) when is_integer(I) -> I. i(I) when is_integer(I) -> I.

View File

@ -17,7 +17,10 @@
-include("types.hrl"). -include("types.hrl").
-export([update_counter/2, get_counter/1, reset_counter/1]). -export([ update_counter/2
, get_counter/1
, reset_counter/1
]).
-type(key() :: term()). -type(key() :: term()).

View File

@ -18,15 +18,18 @@
-export([init/0]). -export([init/0]).
-export([load/0, unload/0]). -export([ load/0
, load/1
, unload/0
, unload/1
, list/0
, find_plugin/1
, load_expand_plugin/1
]).
-export([load/1, unload/1]). %%------------------------------------------------------------------------------
%% APIs
-export([list/0]). %%------------------------------------------------------------------------------
-export([find_plugin/1]).
-export([load_expand_plugin/1]).
%% @doc Init plugins' config %% @doc Init plugins' config
-spec(init() -> ok). -spec(init() -> ok).
@ -302,3 +305,4 @@ write_loaded(AppNames) ->
emqx_logger:error("Open File ~p Error: ~p", [File, Error]), emqx_logger:error("Open File ~p Error: ~p", [File, Error]),
{error, Error} {error, Error}
end. end.

View File

@ -17,15 +17,26 @@
-compile({no_auto_import, [monitor/3]}). -compile({no_auto_import, [monitor/3]}).
-export([new/0]). -export([new/0]).
-export([monitor/2, monitor/3]).
-export([demonitor/2]). -export([ monitor/2
-export([find/2]). , monitor/3
-export([erase/2, erase_all/2]). , demonitor/2
]).
-export([ find/2
, erase/2
, erase_all/2
]).
-export([count/1]). -export([count/1]).
-type(pmon() :: {?MODULE, map()}). -type(pmon() :: {?MODULE, map()}).
-export_type([pmon/0]). -export_type([pmon/0]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(new() -> pmon()). -spec(new() -> pmon()).
new() -> new() ->
{?MODULE, maps:new()}. {?MODULE, maps:new()}.

View File

@ -19,21 +19,36 @@
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
%% APIs
-export([start_link/2]). -export([start_link/2]).
-export([submit/1, submit/2]).
-export([async_submit/1, async_submit/2]). -export([ submit/1
, submit/2
, async_submit/1
, async_submit/2
]).
-ifdef(TEST). -ifdef(TEST).
-export([worker/0]). -export([worker/0]).
-endif. -endif.
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(POOL, ?MODULE). -define(POOL, ?MODULE).
-type(task() :: fun() | mfa() | {fun(), Args :: list(any())}). -type(task() :: fun() | mfa() | {fun(), Args :: list(any())}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Start pool. %% @doc Start pool.
-spec(start_link(atom(), pos_integer()) -> startlink_ret()). -spec(start_link(atom(), pos_integer()) -> startlink_ret()).
start_link(Pool, Id) -> start_link(Pool, Id) ->

View File

@ -17,7 +17,11 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([spec/1, spec/2]). -export([spec/1, spec/2]).
-export([start_link/0, start_link/3, start_link/4]).
-export([ start_link/0
, start_link/3
, start_link/4
]).
-export([init/1]). -export([init/1]).

View File

@ -39,8 +39,23 @@
-module(emqx_pqueue). -module(emqx_pqueue).
-export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1, -export([ new/0
in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]). , is_queue/1
, is_empty/1
, len/1
, plen/2
, to_list/1
, from_list/1
, in/2
, in/3
, out/1
, out/2
, out_p/1
, join/2
, filter/2
, fold/3
, highest/1
]).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------

View File

@ -18,21 +18,22 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("logger.hrl"). -include("logger.hrl").
-export([init/2]). -export([ init/2
-export([info/1]). , info/1
-export([attrs/1]). , attrs/1
-export([attr/2]). , attr/2
-export([caps/1]). , caps/1
-export([stats/1]). , stats/1
-export([client_id/1]). , client_id/1
-export([credentials/1]). , credentials/1
-export([parser/1]). , parser/1
-export([session/1]). , session/1
-export([received/2]). , received/2
-export([process/2]). , process/2
-export([deliver/2]). , deliver/2
-export([send/2]). , send/2
-export([terminate/2]). , terminate/2
]).
-export_type([state/0]). -export_type([state/0]).
@ -654,7 +655,7 @@ deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg), Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg0), Msg1 = emqx_message:update_expiry(Msg0),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
send(emqx_packet:from_message(PacketId, emqx_message:remove_topic_alias(Msg2)), PState); send(emqx_packet:from_message(PacketId, Msg2), PState);
deliver({puback, PacketId, ReasonCode}, PState) -> deliver({puback, PacketId, ReasonCode}, PState) ->
send(?PUBACK_PACKET(PacketId, ReasonCode), PState); send(?PUBACK_PACKET(PacketId, ReasonCode), PState);

View File

@ -20,25 +20,30 @@
-export([start_link/0]). -export([start_link/0]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([ init/1
terminate/2, code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-record(state, {}). -record(state, {}).
%%%=================================================================== %%------------------------------------------------------------------------------
%%% API %%% API
%%%=================================================================== %%------------------------------------------------------------------------------
%% @doc Starts the server %% @doc Starts the server
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%=================================================================== %%------------------------------------------------------------------------------
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%------------------------------------------------------------------------------
init([]) -> init([]) ->
{ok, #state{}}. {ok, #state{}}.
@ -59,7 +64,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%%%=================================================================== %%------------------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%%=================================================================== %%------------------------------------------------------------------------------

View File

@ -17,7 +17,11 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([name/2, text/1, connack_error/1]). -export([ name/2
, text/1
, connack_error/1
]).
-export([compat/2]). -export([compat/2]).
name(I, Ver) when Ver >= ?MQTT_PROTO_V5 -> name(I, Ver) when Ver >= ?MQTT_PROTO_V5 ->
@ -146,6 +150,7 @@ compat(unsuback, _Code) -> undefined.
connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID; connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID;
connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(bad_clientid_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(username_or_password_undefined) -> ?RC_BAD_USER_NAME_OR_PASSWORD; connack_error(username_or_password_undefined) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(password_error) -> ?RC_BAD_USER_NAME_OR_PASSWORD; connack_error(password_error) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(not_authorized) -> ?RC_NOT_AUTHORIZED; connack_error(not_authorized) -> ?RC_NOT_AUTHORIZED;

View File

@ -30,19 +30,38 @@
-export([start_link/2]). -export([start_link/2]).
%% Route APIs %% Route APIs
-export([add_route/1, add_route/2]). -export([ add_route/1
-export([do_add_route/1, do_add_route/2]). , add_route/2
-export([match_routes/1, lookup_routes/1, has_routes/1]). , do_add_route/1
-export([delete_route/1, delete_route/2]). , do_add_route/2
-export([do_delete_route/1, do_delete_route/2]). ]).
-export([ delete_route/1
, delete_route/2
, do_delete_route/1
, do_delete_route/2
]).
-export([ match_routes/1
, lookup_routes/1
, has_routes/1
]).
-export([print_routes/1]). -export([print_routes/1]).
-export([topics/0]). -export([topics/0]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-type(group() :: binary()). -type(group() :: binary()).
-type(destination() :: node() | {group(), node()}). -type(destination() :: node() | {group(), node()}).
-define(ROUTE, emqx_route). -define(ROUTE, emqx_route).

View File

@ -27,15 +27,22 @@
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% API %% API
-export([start_link/0, monitor/1]). -export([ start_link/0
, monitor/1
%% gen_server callbacks ]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
%% Internal export %% Internal export
-export([stats_fun/0]). -export([stats_fun/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(routing_node, {name, const = unused}). -record(routing_node, {name, const = unused}).
-define(ROUTE, emqx_route). -define(ROUTE, emqx_route).

View File

@ -15,8 +15,10 @@
%% @doc wrap gen_rpc? %% @doc wrap gen_rpc?
-module(emqx_rpc). -module(emqx_rpc).
-export([call/4, cast/4]). -export([ call/4
-export([multicall/4]). , cast/4
, multicall/4
]).
-define(RPC, gen_rpc). -define(RPC, gen_rpc).

View File

@ -14,14 +14,25 @@
-module(emqx_sequence). -module(emqx_sequence).
-export([create/1, nextval/2, currval/2, reclaim/2, delete/1]). -export([ create/1
, nextval/2
, currval/2
, reclaim/2
, delete/1
]).
-type(key() :: term()). -type(key() :: term()).
-type(name() :: atom()). -type(name() :: atom()).
-type(seqid() :: non_neg_integer()). -type(seqid() :: non_neg_integer()).
-export_type([seqid/0]). -export_type([seqid/0]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Create a sequence. %% @doc Create a sequence.
-spec(create(name()) -> ok). -spec(create(name()) -> ok).
create(Name) -> create(Name) ->

View File

@ -46,21 +46,40 @@
-include("types.hrl"). -include("types.hrl").
-export([start_link/1]). -export([start_link/1]).
-export([info/1, attrs/1]).
-export([stats/1]). -export([ info/1
-export([resume/2, discard/2]). , attrs/1
-export([update_expiry_interval/2]). , stats/1
-export([subscribe/2, subscribe/4]). ]).
-export([publish/3]).
-export([puback/2, puback/3]). -export([ resume/2
-export([pubrec/2, pubrec/3]). , discard/2
-export([pubrel/3, pubcomp/3]). , update_expiry_interval/2
-export([unsubscribe/2, unsubscribe/4]). ]).
-export([ subscribe/2
, subscribe/4
, unsubscribe/2
, unsubscribe/4
, publish/3
, puback/2
, puback/3
, pubrec/2
, pubrec/3
, pubrel/3
, pubcomp/3
]).
-export([close/1]). -export([close/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-import(emqx_zone, [get_env/2, get_env/3]). -import(emqx_zone, [get_env/2, get_env/3]).

View File

@ -20,20 +20,28 @@
-include("types.hrl"). -include("types.hrl").
-export([start_link/1]). -export([start_link/1]).
-export([start_session/1, count_sessions/0]).
-export([ start_session/1
, count_sessions/0
]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-type(shutdown() :: brutal_kill | infinity | pos_integer()). -type(shutdown() :: brutal_kill | infinity | pos_integer()).
-record(state, { -record(state,
sessions :: #{pid() => emqx_types:client_id()}, { sessions :: #{pid() => emqx_types:client_id()}
mfargs :: mfa(), , mfargs :: mfa()
shutdown :: shutdown(), , shutdown :: shutdown()
clean_down :: fun() , clean_down :: fun()
}). }).
-define(SUP, ?MODULE). -define(SUP, ?MODULE).
-define(BATCH_EXIT, 100000). -define(BATCH_EXIT, 100000).

View File

@ -27,18 +27,32 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% APIs
-export([start_link/0]). -export([start_link/0]).
-export([subscribe/3, unsubscribe/3]). -export([ subscribe/3
, unsubscribe/3
]).
-export([dispatch/3]). -export([dispatch/3]).
-export([maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, is_ack_required/1]).
-export([ maybe_ack/1
, maybe_nack_dropped/1
, nack_no_connection/1
, is_ack_required/1
]).
%% for testing %% for testing
-export([subscribers/2]). -export([subscribers/2]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(TAB, emqx_shared_subscription). -define(TAB, emqx_shared_subscription).

View File

@ -20,17 +20,30 @@
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
%% APIs
-export([start_link/0]). -export([start_link/0]).
-export([open_session/1, close_session/1]). -export([ open_session/1
-export([resume_session/2]). , close_session/1
-export([discard_session/1, discard_session/2]). , resume_session/2
-export([register_session/1, register_session/2]). , discard_session/1
-export([unregister_session/1, unregister_session/2]). , discard_session/2
-export([get_session_attrs/1, get_session_attrs/2, , register_session/1
set_session_attrs/2, set_session_attrs/3]). , register_session/2
-export([get_session_stats/1, get_session_stats/2, , unregister_session/1
set_session_stats/2, set_session_stats/3]). , unregister_session/2
]).
-export([ get_session_attrs/1
, get_session_attrs/2
, set_session_attrs/2
, set_session_attrs/3
, get_session_stats/1
, get_session_stats/2
, set_session_stats/2
, set_session_stats/3
]).
-export([lookup_session_pids/1]). -export([lookup_session_pids/1]).
%% Internal functions for rpc %% Internal functions for rpc
@ -43,8 +56,13 @@
-export([clean_down/1]). -export([clean_down/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(SM, ?MODULE). -define(SM, ?MODULE).
@ -56,6 +74,10 @@
-define(BATCH_SIZE, 100000). -define(BATCH_SIZE, 100000).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link() -> start_link() ->
gen_server:start_link({local, ?SM}, ?MODULE, [], []). gen_server:start_link({local, ?SM}, ?MODULE, [], []).

View File

@ -19,8 +19,16 @@
-export([start_link/0]). -export([start_link/0]).
-export([trans/2, trans/3]). -export([ trans/2
-export([lock/1, lock/2, unlock/1]). , trans/3
, lock/1
, lock/2
, unlock/1
]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link() -> start_link() ->

View File

@ -21,12 +21,21 @@
-include("types.hrl"). -include("types.hrl").
-export([start_link/0]). -export([start_link/0]).
-export([is_enabled/0]).
-export([register_session/1, lookup_session/1, unregister_session/1]). -export([ is_enabled/0
, register_session/1
, lookup_session/1
, unregister_session/1
]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(REGISTRY, ?MODULE). -define(REGISTRY, ?MODULE).
-define(TAB, emqx_session_registry). -define(TAB, emqx_session_registry).
@ -36,6 +45,10 @@
-type(session_pid() :: pid()). -type(session_pid() :: pid()).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Start the global session manager. %% @doc Start the global session manager.
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link() -> start_link() ->

View File

@ -20,17 +20,32 @@
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-export([start_link/0, start_link/1, stop/0]). %% APIs
-export([ start_link/0
, start_link/1
, stop/0
]).
%% Stats API. %% Stats API.
-export([getstats/0, getstat/1]). -export([ getstats/0
-export([setstat/2, setstat/3]). , getstat/1
-export([statsfun/1, statsfun/2]). , setstat/2
-export([update_interval/2, update_interval/3, cancel_update/1]). , setstat/3
, statsfun/1
, statsfun/2
, update_interval/2
, update_interval/3
, cancel_update/1
]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(update, {name, countdown, interval, func}). -record(update, {name, countdown, interval, func}).
-record(state, {timer, updates :: [#update{}], -record(state, {timer, updates :: [#update{}],

View File

@ -16,7 +16,11 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([start_link/0, start_child/1, start_child/2, stop_child/1]). -export([ start_link/0
, start_child/1
, start_child/2
, stop_child/1
]).
-export([init/1]). -export([init/1]).
@ -26,13 +30,13 @@
-define(SUPERVISOR, ?MODULE). -define(SUPERVISOR, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
-spec(start_child(supervisor:child_spec()) -> startchild_ret()). -spec(start_child(supervisor:child_spec()) -> startchild_ret()).
start_child(ChildSpec) when is_tuple(ChildSpec) -> start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?SUPERVISOR, ChildSpec). supervisor:start_child(?SUPERVISOR, ChildSpec).

View File

@ -20,10 +20,24 @@
-include("logger.hrl"). -include("logger.hrl").
-export([start_link/0]). -export([start_link/0]).
-export([version/0, uptime/0, datetime/0, sysdescr/0, sys_interval/0]).
-export([ version/0
, uptime/0
, datetime/0
, sysdescr/0
, sys_interval/0
]).
-export([info/0]). -export([info/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). %% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-import(emqx_topic, [systop/1]). -import(emqx_topic, [systop/1]).
-import(emqx_misc, [start_timer/2]). -import(emqx_misc, [start_timer/2]).
@ -40,6 +54,10 @@
sysdescr % Broker description sysdescr % Broker description
]). ]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() -> start_link() ->
gen_server:start_link({local, ?SYS}, ?MODULE, [], []). gen_server:start_link({local, ?SYS}, ?MODULE, [], []).

View File

@ -20,11 +20,19 @@
-include("types.hrl"). -include("types.hrl").
-export([start_link/1]). -export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
%% compress unused warning %% compress unused warning
-export([procinfo/1]). -export([procinfo/1]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-type(option() :: {long_gc, false | pos_integer()} -type(option() :: {long_gc, false | pos_integer()}
| {long_schedule, false | pos_integer()} | {long_schedule, false | pos_integer()}
| {large_heap, pos_integer()} | {large_heap, pos_integer()}
@ -33,6 +41,10 @@
-define(SYSMON, ?MODULE). -define(SYSMON, ?MODULE).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Start system monitor %% @doc Start system monitor
-spec(start_link(list(option())) -> startlink_ret()). -spec(start_link(list(option())) -> startlink_ret()).
start_link(Opts) -> start_link(Opts) ->

View File

@ -15,7 +15,10 @@
-module(emqx_tables). -module(emqx_tables).
-export([new/2]). -export([new/2]).
-export([lookup_value/2, lookup_value/3]).
-export([ lookup_value/2
, lookup_value/3
]).
%% Create a named_table ets. %% Create a named_table ets.
-spec(new(atom(), list()) -> ok). -spec(new(atom(), list()) -> ok).

View File

@ -14,7 +14,13 @@
-module(emqx_time). -module(emqx_time).
-export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1, ts_from_ms/1]). -export([ seed/0
, now_secs/0
, now_secs/1
, now_ms/0
, now_ms/1
, ts_from_ms/1
]).
seed() -> seed() ->
rand:seed(exsplus, erlang:timestamp()). rand:seed(exsplus, erlang:timestamp()).

View File

@ -16,17 +16,22 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([match/2]). %% APIs
-export([validate/1, validate/2]). -export([ match/2
-export([levels/1]). , validate/1
-export([triples/1]). , validate/2
-export([tokens/1]). , levels/1
-export([words/1]). , triples/1
-export([wildcard/1]). , tokens/1
-export([join/1, prepend/2]). , words/1
-export([feed_var/3]). , wildcard/1
-export([systop/1]). , join/1
-export([parse/1, parse/2]). , prepend/2
, feed_var/3
, systop/1
, parse/1
, parse/2
]).
-type(group() :: binary()). -type(group() :: binary()).
-type(topic() :: binary()). -type(topic() :: binary()).
@ -38,6 +43,10 @@
-define(MAX_TOPIC_LEN, 4096). -define(MAX_TOPIC_LEN, 4096).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Is wildcard topic? %% @doc Is wildcard topic?
-spec(wildcard(topic() | words()) -> true | false). -spec(wildcard(topic() | words()) -> true | false).
wildcard(Topic) when is_binary(Topic) -> wildcard(Topic) when is_binary(Topic) ->
@ -221,4 +230,3 @@ parse(Topic, Options = #{qos := QoS}) ->
{Topic, Options#{rc => QoS}}; {Topic, Options#{rc => QoS}};
parse(Topic, Options) -> parse(Topic, Options) ->
{Topic, Options}. {Topic, Options}.

View File

@ -18,12 +18,23 @@
-include("emqx.hrl"). -include("emqx.hrl").
%% APIs
-export([start_link/0]). -export([start_link/0]).
-export([trace/2]).
-export([start_trace/3, lookup_traces/0, stop_trace/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ trace/2
code_change/3]). , start_trace/3
, lookup_traces/0
, stop_trace/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {traces}). -record(state, {traces}).
@ -42,6 +53,10 @@
[]}]}, []}]},
msg,"\n"]}}). msg,"\n"]}}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). -spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
start_link() -> start_link() ->
gen_server:start_link({local, ?TRACER}, ?MODULE, [], []). gen_server:start_link({local, ?TRACER}, ?MODULE, [], []).

View File

@ -23,7 +23,12 @@
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% Trie APIs %% Trie APIs
-export([insert/1, match/1, lookup/1, delete/1]). -export([ insert/1
, match/1
, lookup/1
, delete/1
]).
-export([empty/0]). -export([empty/0]).
%% Mnesia tables %% Mnesia tables

View File

@ -18,14 +18,43 @@
-include("types.hrl"). -include("types.hrl").
-export_type([zone/0]). -export_type([zone/0]).
-export_type([pubsub/0, topic/0, subid/0, subopts/0]).
-export_type([client_id/0, username/0, password/0, peername/0, protocol/0]). -export_type([ pubsub/0
-export_type([credentials/0, session/0]). , topic/0
-export_type([subscription/0, subscriber/0, topic_table/0]). , subid/0
-export_type([payload/0, message/0]). , subopts/0
-export_type([delivery/0, deliver_results/0]). ]).
-export_type([ client_id/0
, username/0
, password/0
, peername/0
, protocol/0
]).
-export_type([ credentials/0
, session/0
]).
-export_type([ subscription/0
, subscriber/0
, topic_table/0
]).
-export_type([ payload/0
, message/0
]).
-export_type([ delivery/0
, deliver_results/0
]).
-export_type([route/0]). -export_type([route/0]).
-export_type([alarm/0, plugin/0, command/0]).
-export_type([ alarm/0
, plugin/0
, command/0
]).
-type(zone() :: atom()). -type(zone() :: atom()).
-type(pubsub() :: publish | subscribe). -type(pubsub() :: publish | subscribe).
@ -43,6 +72,7 @@
-type(auth_result() :: success -type(auth_result() :: success
| client_identifier_not_valid | client_identifier_not_valid
| bad_username_or_password | bad_username_or_password
| bad_clientid_or_password
| not_authorized | not_authorized
| server_unavailable | server_unavailable
| server_busy | server_busy
@ -52,7 +82,7 @@
-type(credentials() :: #{client_id := client_id(), -type(credentials() :: #{client_id := client_id(),
username := username(), username := username(),
peername := peername(), peername := peername(),
result := auth_result(), auth_result := auth_result(),
zone => zone(), zone => zone(),
atom() => term() atom() => term()
}). }).
@ -68,3 +98,4 @@
-type(alarm() :: #alarm{}). -type(alarm() :: #alarm{}).
-type(plugin() :: #plugin{}). -type(plugin() :: #plugin{}).
-type(command() :: #command{}). -type(command() :: #command{}).

View File

@ -14,17 +14,36 @@
-module(emqx_vm). -module(emqx_vm).
-export([schedulers/0]). -export([ schedulers/0
-export([microsecs/0]). , scheduler_usage/1
-export([loads/0, get_system_info/0, get_system_info/1, mem_info/0, scheduler_usage/1]). , microsecs/0
-export([get_memory/0]). , get_system_info/0
-export([get_process_list/0, get_process_info/0, get_process_info/1, , get_system_info/1
get_process_gc/0, get_process_gc/1, , get_memory/0
get_process_group_leader_info/1, , mem_info/0
get_process_limit/0]). , loads/0
-export([get_ets_list/0, get_ets_info/0, get_ets_info/1, ]).
get_ets_object/0, get_ets_object/1]).
-export([get_port_types/0, get_port_info/0, get_port_info/1]). -export([ get_process_list/0
, get_process_info/0
, get_process_info/1
, get_process_gc/0
, get_process_gc/1
, get_process_group_leader_info/1
, get_process_limit/0
]).
-export([ get_ets_list/0
, get_ets_info/0
, get_ets_info/1
, get_ets_object/0
, get_ets_object/1
]).
-export([ get_port_types/0
, get_port_info/0
, get_port_info/1
]).
-define(UTIL_ALLOCATORS, [temp_alloc, -define(UTIL_ALLOCATORS, [temp_alloc,
eheap_alloc, eheap_alloc,

View File

@ -16,21 +16,25 @@
-behaviour(gen_server). -behaviour(gen_server).
%% APIs
-export([start_link/1]). -export([start_link/1]).
-export([init/1, -export([ get_check_interval/0
handle_call/3, , set_check_interval/1
handle_cast/2, , get_process_high_watermark/0
handle_info/2, , set_process_high_watermark/1
terminate/2, , get_process_low_watermark/0
code_change/3]). , set_process_low_watermark/1
]).
-export([get_check_interval/0, %% gen_server callbacks
set_check_interval/1, -export([ init/1
get_process_high_watermark/0, , handle_call/3
set_process_high_watermark/1, , handle_cast/2
get_process_low_watermark/0, , handle_info/2
set_process_low_watermark/1]). , terminate/2
, code_change/3
]).
-define(VM_MON, ?MODULE). -define(VM_MON, ?MODULE).

View File

@ -18,18 +18,20 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("logger.hrl"). -include("logger.hrl").
-export([info/1]). -export([ info/1
-export([attrs/1]). , attrs/1
-export([stats/1]). , stats/1
-export([kick/1]). , kick/1
-export([session/1]). , session/1
]).
%% websocket callbacks %% websocket callbacks
-export([init/2]). -export([ init/2
-export([websocket_init/1]). , websocket_init/1
-export([websocket_handle/2]). , websocket_handle/2
-export([websocket_info/2]). , websocket_info/2
-export([terminate/3]). , terminate/3
]).
-record(state, { -record(state, {
request, request,
@ -306,3 +308,4 @@ shutdown(Reason, State) ->
wsock_stats() -> wsock_stats() ->
[{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].

View File

@ -20,20 +20,34 @@
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
%% APIs
-export([start_link/0]). -export([start_link/0]).
-export([get_env/2, get_env/3]).
-export([set_env/3]). -export([ get_env/2
-export([force_reload/0]). , get_env/3
, set_env/3
, force_reload/0
]).
%% for test %% for test
-export([stop/0]). -export([stop/0]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([ init/1
code_change/3]). , handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

View File

@ -24,12 +24,12 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
all() -> all() ->
[ [ message_make
message_make, , message_flag
message_flag, , message_header
message_header, , message_format
message_format, , message_expired
message_expired , message_to_map
]. ].
message_make(_) -> message_make(_) ->
@ -60,7 +60,9 @@ message_header(_) ->
Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
Msg2 = emqx_message:set_header(c, 3, Msg1), Msg2 = emqx_message:set_header(c, 3, Msg1),
?assertEqual(1, emqx_message:get_header(a, Msg2)), ?assertEqual(1, emqx_message:get_header(a, Msg2)),
?assertEqual(4, emqx_message:get_header(d, Msg2, 4)). ?assertEqual(4, emqx_message:get_header(d, Msg2, 4)),
Msg3 = emqx_message:remove_header(a, Msg2),
?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg3)).
message_format(_) -> message_format(_) ->
io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]). io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]).
@ -75,3 +77,17 @@ message_expired(_) ->
timer:sleep(1000), timer:sleep(1000),
Msg2 = emqx_message:update_expiry(Msg1), Msg2 = emqx_message:update_expiry(Msg1),
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
message_to_map(_) ->
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>),
List = [{id, Msg#message.id},
{qos, ?QOS_1},
{from, <<"clientid">>},
{flags, #{dup => false}},
{headers, #{}},
{topic, <<"topic">>},
{payload, <<"payload">>},
{timestamp, Msg#message.timestamp}],
?assertEqual(List, emqx_message:to_list(Msg)),
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).