diff --git a/include/emqx.hrl b/include/emqx.hrl index fba079b91..e52df41f6 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -ifndef(EMQ_X_HRL). -define(EMQ_X_HRL, true). @@ -19,10 +21,6 @@ %% Banner %%-------------------------------------------------------------------- --define(COPYRIGHT, "Copyright (c) 2013-2019 EMQ Technologies Co., Ltd"). - --define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0"). - -define(PROTOCOL_VERSION, "MQTT/5.0"). -define(ERTS_MINIMUM_REQUIRED, "10.0"). @@ -47,8 +45,6 @@ %% Message and Delivery %%-------------------------------------------------------------------- --record(session, {sid, pid}). - -record(subscription, {topic, subid, subopts}). %% See 'Application Message' in MQTT Version 5.0 @@ -72,9 +68,12 @@ }). -record(delivery, { - sender :: pid(), %% Sender of the delivery - message :: #message{}, %% The message delivered - results :: list() %% Dispatches of the message + %% Sender of the delivery + sender :: pid(), + %% The message delivered + message :: #message{}, + %% Dispatches of the message + results :: list() }). %%-------------------------------------------------------------------- @@ -151,6 +150,7 @@ %%-------------------------------------------------------------------- %% Banned %%-------------------------------------------------------------------- + -type(banned_who() :: {client_id, binary()} | {username, binary()} | {ip_address, inet:ip_address()}). diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 126562401..7d1b2c773 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_banned). @@ -42,14 +44,14 @@ , code_change/3 ]). --define(TAB, ?MODULE). +-define(BANNED_TAB, ?MODULE). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Mnesia bootstrap -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?TAB, [ + ok = ekka_mnesia:create_table(?BANNED_TAB, [ {type, set}, {disc_copies, [node()]}, {record_name, banned}, @@ -57,7 +59,7 @@ mnesia(boot) -> {storage_properties, [{ets, [{read_concurrency, true}]}]}]); mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB). + ok = ekka_mnesia:copy_table(?BANNED_TAB). %% @doc Start the banned server. -spec(start_link() -> startlink_ret()). @@ -66,41 +68,42 @@ start_link() -> -spec(check(emqx_types:credentials()) -> boolean()). check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -> - ets:member(?TAB, {client_id, ClientId}) - orelse ets:member(?TAB, {username, Username}) - orelse ets:member(?TAB, {ipaddr, IPAddr}). + ets:member(?BANNED_TAB, {client_id, ClientId}) + orelse ets:member(?BANNED_TAB, {username, Username}) + orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}). -spec(add(emqx_types:banned()) -> ok). add(Banned) when is_record(Banned, banned) -> - mnesia:dirty_write(?TAB, Banned). + mnesia:dirty_write(?BANNED_TAB, Banned). -spec(delete({client_id, emqx_types:client_id()} | {username, emqx_types:username()} | {peername, emqx_types:peername()}) -> ok). delete(Key) -> - mnesia:dirty_delete(?TAB, Key). + mnesia:dirty_delete(?BANNED_TAB, Key). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([]) -> {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - ?LOG(error, "[Banned] unexpected call: ~p", [Req]), + ?LOG(error, "[Banned] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Banned] unexpected msg: ~p", [Msg]), + ?LOG(error, "[Banned] Unexpected msg: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]), + mnesia:async_dirty(fun expire_banned_items/1, + [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> - ?LOG(error, "[Banned] unexpected info: ~p", [Info]), + ?LOG(error, "[Banned] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{expiry_timer := TRef}) -> @@ -109,9 +112,9 @@ terminate(_Reason, #{expiry_timer := TRef}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -ifdef(TEST). ensure_expiry_timer(State) -> @@ -124,6 +127,7 @@ ensure_expiry_timer(State) -> expire_banned_items(Now) -> mnesia:foldl( fun(B = #banned{until = Until}, _Acc) when Until < Now -> - mnesia:delete_object(?TAB, B, sticky_write); + mnesia:delete_object(?BANNED_TAB, B, sticky_write); (_, _Acc) -> ok - end, ok, ?TAB). + end, ok, ?BANNED_TAB). + diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 7e7b7f4c1..89c65f45f 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -30,7 +30,10 @@ , stats/1 ]). --export([kick/1]). +-export([ kick/1 + , discard/1 + , takeover/1 + ]). -export([session/1]). @@ -135,6 +138,12 @@ stats(#state{transport = Transport, kick(CPid) -> call(CPid, kick). +discard(CPid) -> + call(CPid, discard). + +takeover(CPid) -> + call(CPid, takeover). + session(CPid) -> call(CPid, session). @@ -284,6 +293,10 @@ handle({call, From}, kick, State) -> ok = gen_statem:reply(From, ok), shutdown(kicked, State); +handle({call, From}, discard, State) -> + ok = gen_statem:reply(From, ok), + shutdown(discard, State); + handle({call, From}, session, State = #state{proto_state = ProtoState}) -> reply(From, emqx_protocol:session(ProtoState), State); diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index d78e99465..d4982f80b 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,7 +12,9 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- +%% Channel Manager -module(emqx_cm). -behaviour(gen_server). @@ -22,25 +25,39 @@ -export([start_link/0]). --export([ register_connection/1 - , register_connection/2 - , unregister_connection/1 - , unregister_connection/2 +-export([ register_channel/1 + , unregister_channel/1 + , unregister_channel/2 ]). -export([ get_conn_attrs/1 , get_conn_attrs/2 , set_conn_attrs/2 - , set_conn_attrs/3 ]). -export([ get_conn_stats/1 , get_conn_stats/2 , set_conn_stats/2 - , set_conn_stats/3 ]). --export([lookup_conn_pid/1]). +-export([ open_session/1 + , discard_session/1 + , resume_session/1 + ]). + +-export([ get_session_attrs/1 + , get_session_attrs/2 + , set_session_attrs/2 + ]). + +-export([ get_session_stats/1 + , get_session_stats/2 + , set_session_stats/2 + ]). + +-export([ lookup_channels/1 + , lookup_channels/2 + ]). %% gen_server callbacks -export([ init/1 @@ -51,159 +68,350 @@ , code_change/3 ]). -%% internal export +%% Internal export -export([stats_fun/0]). --define(CM, ?MODULE). +-type(chan_pid() :: pid()). -%% ETS tables for connection management. --define(CONN_TAB, emqx_conn). --define(CONN_ATTRS_TAB, emqx_conn_attrs). --define(CONN_STATS_TAB, emqx_conn_stats). +-opaque(attrs() :: #{atom() => term()}). +-opaque(stats() :: #{atom() => integer()}). + +-export_type([attrs/0, stats/0]). + +%% Tables for channel management. +-define(CHAN_TAB, emqx_channel). + +-define(CONN_TAB, emqx_connection). + +-define(SESSION_TAB, emqx_session). + +-define(SESSION_P_TAB, emqx_session_p). + +%% Chan stats +-define(CHAN_STATS, + [{?CHAN_TAB, 'channels.count', 'channels.max'}, + {?CONN_TAB, 'connections.count', 'connections.max'}, + {?SESSION_TAB, 'sessions.count', 'sessions.max'}, + {?SESSION_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'} + ]). + +%% Batch drain -define(BATCH_SIZE, 100000). -%% @doc Start the connection manager. +%% Server name +-define(CM, ?MODULE). + +%% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). start_link() -> gen_server:start_link({local, ?CM}, ?MODULE, [], []). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% API -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -%% @doc Register a connection. --spec(register_connection(emqx_types:client_id()) -> ok). -register_connection(ClientId) when is_binary(ClientId) -> - register_connection(ClientId, self()). +%% @doc Register a channel. +-spec(register_channel(emqx_types:client_id()) -> ok). +register_channel(ClientId) when is_binary(ClientId) -> + register_channel(ClientId, self()). --spec(register_connection(emqx_types:client_id(), pid()) -> ok). -register_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) -> - true = ets:insert(?CONN_TAB, {ClientId, ConnPid}), - notify({registered, ClientId, ConnPid}). +-spec(register_channel(emqx_types:client_id(), chan_pid()) -> ok). +register_channel(ClientId, ChanPid) -> + Chan = {ClientId, ChanPid}, + true = ets:insert(?CHAN_TAB, Chan), + ok = emqx_cm_registry:register_channel(Chan), + cast({registered, Chan}). -%% @doc Unregister a connection. --spec(unregister_connection(emqx_types:client_id()) -> ok). -unregister_connection(ClientId) when is_binary(ClientId) -> - unregister_connection(ClientId, self()). +%% @doc Unregister a channel. +-spec(unregister_channel(emqx_types:client_id()) -> ok). +unregister_channel(ClientId) when is_binary(ClientId) -> + unregister_channel(ClientId, self()). --spec(unregister_connection(emqx_types:client_id(), pid()) -> ok). -unregister_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) -> - true = do_unregister_connection({ClientId, ConnPid}), - notify({unregistered, ConnPid}). +-spec(unregister_channel(emqx_types:client_id(), chan_pid()) -> ok). +unregister_channel(ClientId, ChanPid) -> + Chan = {ClientId, ChanPid}, + true = do_unregister_channel(Chan), + cast({unregistered, Chan}). -do_unregister_connection(Conn) -> - true = ets:delete(?CONN_STATS_TAB, Conn), - true = ets:delete(?CONN_ATTRS_TAB, Conn), - true = ets:delete_object(?CONN_TAB, Conn). +%% @private +do_unregister_channel(Chan) -> + ok = emqx_cm_registry:unregister_channel(Chan), + true = ets:delete_object(?SESSION_P_TAB, Chan), + true = ets:delete(?SESSION_TAB, Chan), + true = ets:delete(?CONN_TAB, Chan), + ets:delete_object(?CHAN_TAB, Chan). -%% @doc Get conn attrs --spec(get_conn_attrs(emqx_types:client_id()) -> list()). -get_conn_attrs(ClientId) when is_binary(ClientId) -> - ConnPid = lookup_conn_pid(ClientId), - get_conn_attrs(ClientId, ConnPid). +%% @doc Get conn attrs. +-spec(get_conn_attrs(emqx_types:client_id()) -> maybe(attrs())). +get_conn_attrs(ClientId) -> + with_channel(ClientId, fun(ChanPid) -> + get_conn_attrs(ClientId, ChanPid) + end). --spec(get_conn_attrs(emqx_types:client_id(), pid()) -> list()). -get_conn_attrs(ClientId, ConnPid) when is_binary(ClientId) -> - emqx_tables:lookup_value(?CONN_ATTRS_TAB, {ClientId, ConnPid}, []). +-spec(get_conn_attrs(emqx_types:client_id(), chan_pid()) -> maybe(attrs())). +get_conn_attrs(ClientId, ChanPid) when node(ChanPid) == node() -> + Chan = {ClientId, ChanPid}, + try ets:lookup_element(?CONN_TAB, Chan, 2) of + Attrs -> Attrs + catch + error:badarg -> undefined + end; +get_conn_attrs(ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_conn_attrs, [ClientId, ChanPid]). -%% @doc Set conn attrs --spec(set_conn_attrs(emqx_types:client_id(), list()) -> true). -set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) -> - set_conn_attrs(ClientId, self(), Attrs). +%% @doc Set conn attrs. +-spec(set_conn_attrs(emqx_types:client_id(), attrs()) -> ok). +set_conn_attrs(ClientId, Attrs) when is_map(Attrs) -> + Chan = {ClientId, self()}, + case ets:update_element(?CONN_TAB, Chan, {2, Attrs}) of + true -> ok; + false -> true = ets:insert(?CONN_TAB, {Chan, Attrs, #{}}), + ok + end. --spec(set_conn_attrs(emqx_types:client_id(), pid(), list()) -> true). -set_conn_attrs(ClientId, ConnPid, Attrs) when is_binary(ClientId), is_pid(ConnPid) -> - Conn = {ClientId, ConnPid}, - ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}). +%% @doc Get conn stats. +-spec(get_conn_stats(emqx_types:client_id()) -> maybe(stats())). +get_conn_stats(ClientId) -> + with_channel(ClientId, fun(ChanPid) -> + get_conn_stats(ClientId, ChanPid) + end). -%% @doc Get conn stats --spec(get_conn_stats(emqx_types:client_id()) -> list(emqx_stats:stats())). -get_conn_stats(ClientId) when is_binary(ClientId) -> - ConnPid = lookup_conn_pid(ClientId), - get_conn_stats(ClientId, ConnPid). - --spec(get_conn_stats(emqx_types:client_id(), pid()) -> list(emqx_stats:stats())). -get_conn_stats(ClientId, ConnPid) when is_binary(ClientId) -> - Conn = {ClientId, ConnPid}, - emqx_tables:lookup_value(?CONN_STATS_TAB, Conn, []). +-spec(get_conn_stats(emqx_types:client_id(), chan_pid()) -> maybe(stats())). +get_conn_stats(ClientId, ChanPid) when node(ChanPid) == node() -> + Chan = {ClientId, ChanPid}, + try ets:lookup_element(?CONN_TAB, Chan, 3) of + Stats -> Stats + catch + error:badarg -> undefined + end; +get_conn_stats(ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_conn_stats, [ClientId, ChanPid]). %% @doc Set conn stats. --spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> true). +-spec(set_conn_stats(emqx_types:client_id(), stats()) -> ok). set_conn_stats(ClientId, Stats) when is_binary(ClientId) -> set_conn_stats(ClientId, self(), Stats). --spec(set_conn_stats(emqx_types:client_id(), pid(), list(emqx_stats:stats())) -> true). -set_conn_stats(ClientId, ConnPid, Stats) when is_binary(ClientId), is_pid(ConnPid) -> - Conn = {ClientId, ConnPid}, - ets:insert(?CONN_STATS_TAB, {Conn, Stats}). +-spec(set_conn_stats(emqx_types:client_id(), chan_pid(), stats()) -> ok). +set_conn_stats(ClientId, ChanPid, Stats) -> + Chan = {ClientId, ChanPid}, + _ = ets:update_element(?CONN_TAB, Chan, {3, Stats}), + ok. -%% @doc Lookup connection pid. --spec(lookup_conn_pid(emqx_types:client_id()) -> maybe(pid())). -lookup_conn_pid(ClientId) when is_binary(ClientId) -> - emqx_tables:lookup_value(?CONN_TAB, ClientId). +%% @doc Open a session. +-spec(open_session(map()) -> {ok, emqx_session:session()} + | {error, Reason :: term()}). +open_session(Attrs = #{clean_start := true, + client_id := ClientId}) -> + CleanStart = fun(_) -> + ok = discard_session(ClientId), + {ok, emqx_session:new(Attrs)} + end, + emqx_cm_locker:trans(ClientId, CleanStart); -notify(Msg) -> - gen_server:cast(?CM, {notify, Msg}). +open_session(Attrs = #{clean_start := false, + client_id := ClientId}) -> + ResumeStart = fun(_) -> + case resume_session(ClientId) of + {ok, Session} -> + {ok, Session, true}; + {error, not_found} -> + {ok, emqx_session:new(Attrs)} + end + end, + emqx_cm_locker:trans(ClientId, ResumeStart). -%%----------------------------------------------------------------------------- +%% @doc Try to resume a session. +-spec(resume_session(emqx_types:client_id()) + -> {ok, emqx_session:session()} | {error, Reason :: term()}). +resume_session(ClientId) -> + case lookup_channels(ClientId) of + [] -> {error, not_found}; + [ChanPid] -> + emqx_channel:resume(ChanPid); + ChanPids -> + [ChanPid|StalePids] = lists:reverse(ChanPids), + ?LOG(error, "[SM] More than one channel found: ~p", [ChanPids]), + lists:foreach(fun(StalePid) -> + catch emqx_channel:discard(StalePid) + end, StalePids), + emqx_channel:resume(ChanPid) + end. + +%% @doc Discard all the sessions identified by the ClientId. +-spec(discard_session(emqx_types:client_id()) -> ok). +discard_session(ClientId) when is_binary(ClientId) -> + case lookup_channels(ClientId) of + [] -> ok; + ChanPids -> + lists:foreach( + fun(ChanPid) -> + try emqx_channel:discard(ChanPid) + catch + _:Error:_Stk -> + ?LOG(warning, "[SM] Failed to discard ~p: ~p", [ChanPid, Error]) + end + end, ChanPids) + end. + +%% @doc Get session attrs. +-spec(get_session_attrs(emqx_types:client_id()) -> attrs()). +get_session_attrs(ClientId) -> + with_channel(ClientId, fun(ChanPid) -> + get_session_attrs(ClientId, ChanPid) + end). + +-spec(get_session_attrs(emqx_types:client_id(), chan_pid()) -> maybe(attrs())). +get_session_attrs(ClientId, ChanPid) when node(ChanPid) == node() -> + Chan = {ClientId, ChanPid}, + try ets:lookup_element(?SESSION_TAB, Chan, 2) of + Attrs -> Attrs + catch + error:badarg -> undefined + end; +get_session_attrs(ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_session_attrs, [ClientId, ChanPid]). + +%% @doc Set session attrs. +-spec(set_session_attrs(emqx_types:client_id(), attrs()) -> ok). +set_session_attrs(ClientId, Attrs) when is_binary(ClientId) -> + Chan = {ClientId, self()}, + case ets:update_element(?SESSION_TAB, Chan, {2, Attrs}) of + true -> ok; + false -> + true = ets:insert(?SESSION_TAB, {Chan, Attrs, #{}}), + is_clean_start(Attrs) orelse ets:insert(?SESSION_P_TAB, Chan), + ok + end. + +%% @doc Is clean start? +is_clean_start(#{clean_start := false}) -> false; +is_clean_start(_Attrs) -> true. + +%% @doc Get session stats. +-spec(get_session_stats(emqx_types:client_id()) -> stats()). +get_session_stats(ClientId) -> + with_channel(ClientId, fun(ChanPid) -> + get_session_stats(ClientId, ChanPid) + end). + +-spec(get_session_stats(emqx_types:client_id(), chan_pid()) -> maybe(stats())). +get_session_stats(ClientId, ChanPid) when node(ChanPid) == node() -> + Chan = {ClientId, ChanPid}, + try ets:lookup_element(?SESSION_TAB, Chan, 3) of + Stats -> Stats + catch + error:badarg -> undefined + end; +get_session_stats(ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_session_stats, [ClientId, ChanPid]). + +%% @doc Set session stats. +-spec(set_session_stats(emqx_types:client_id(), stats()) -> ok). +set_session_stats(ClientId, Stats) when is_binary(ClientId) -> + set_session_stats(ClientId, self(), Stats). + +-spec(set_session_stats(emqx_types:client_id(), chan_pid(), stats()) -> ok). +set_session_stats(ClientId, ChanPid, Stats) -> + Chan = {ClientId, ChanPid}, + _ = ets:update_element(?SESSION_TAB, Chan, {3, Stats}), + ok. + +with_channel(ClientId, Fun) -> + case lookup_channels(ClientId) of + [] -> undefined; + [Pid] -> Fun(Pid); + Pids -> Fun(lists:last(Pids)) + end. + +%% @doc Lookup channels. +-spec(lookup_channels(emqx_types:client_id()) -> list(chan_pid())). +lookup_channels(ClientId) -> + lookup_channels(global, ClientId). + +%% @doc Lookup local or global channels. +-spec(lookup_channels(local | global, emqx_types:client_id()) -> list(chan_pid())). +lookup_channels(global, ClientId) -> + case emqx_cm_registry:is_enabled() of + true -> + emqx_cm_registry:lookup_channels(ClientId); + false -> + lookup_channels(local, ClientId) + end; + +lookup_channels(local, ClientId) -> + [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)]. + +%% @private +rpc_call(Node, Fun, Args) -> + case rpc:call(Node, ?MODULE, Fun, Args) of + {badrpc, Reason} -> error(Reason); + Res -> Res + end. + +%% @private +cast(Msg) -> gen_server:cast(?CM, Msg). + +%%-------------------------------------------------------------------- %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%-------------------------------------------------------------------- init([]) -> - TabOpts = [public, set, {write_concurrency, true}], - ok = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]), - ok = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts), - ok = emqx_tables:new(?CONN_STATS_TAB, TabOpts), - ok = emqx_stats:update_interval(conn_stats, fun ?MODULE:stats_fun/0), - {ok, #{conn_pmon => emqx_pmon:new()}}. + TabOpts = [public, {write_concurrency, true}], + ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]), + ok = emqx_tables:new(?CONN_TAB, [set, compressed | TabOpts]), + ok = emqx_tables:new(?SESSION_TAB, [set, compressed | TabOpts]), + ok = emqx_tables:new(?SESSION_P_TAB, [bag | TabOpts]), + ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), + {ok, #{chan_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> ?LOG(error, "[CM] Unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) -> - {noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}}; +handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> + PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon), + {noreply, State#{chan_pmon := PMon1}}; -handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) -> - {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}}; +handle_cast({unregistered, {_ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> + PMon1 = emqx_pmon:demonitor(ChanPid, PMon), + {noreply, State#{chan_pmon := PMon1}}; handle_cast(Msg, State) -> ?LOG(error, "[CM] Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) -> - ConnPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], - {Items, PMon1} = emqx_pmon:erase_all(ConnPids, PMon), - ok = emqx_pool:async_submit( - fun lists:foreach/2, [fun clean_down/1, Items]), - {noreply, State#{conn_pmon := PMon1}}; +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> + ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], + {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), + ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), + {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> ?LOG(error, "[CM] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> - emqx_stats:cancel_update(conn_stats). + emqx_stats:cancel_update(chan_stats). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -clean_down({Pid, ClientId}) -> - Conn = {ClientId, Pid}, - case ets:member(?CONN_TAB, ClientId) - orelse ets:member(?CONN_ATTRS_TAB, Conn) of - true -> - do_unregister_connection(Conn); - false -> false - end. +clean_down({ChanPid, ClientId}) -> + Chan = {ClientId, ChanPid}, + do_unregister_channel(Chan). stats_fun() -> - case ets:info(?CONN_TAB, size) of + lists:foreach(fun update_stats/1, ?CHAN_STATS). + +update_stats({Tab, Stat, MaxStat}) -> + case ets:info(Tab, size) of undefined -> ok; - Size -> emqx_stats:setstat('connections.count', 'connections.max', Size) + Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. + diff --git a/src/emqx_cm_locker.erl b/src/emqx_cm_locker.erl new file mode 100644 index 000000000..c5cd9c2f6 --- /dev/null +++ b/src/emqx_cm_locker.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cm_locker). + +-include("emqx.hrl"). +-include("types.hrl"). + +-export([start_link/0]). + +-export([ trans/2 + , trans/3 + , lock/1 + , lock/2 + , unlock/1 + ]). + +-spec(start_link() -> startlink_ret()). +start_link() -> + ekka_locker:start_link(?MODULE). + +-spec(trans(emqx_types:client_id(), fun(([node()]) -> any())) -> any()). +trans(ClientId, Fun) -> + trans(ClientId, Fun, undefined). + +-spec(trans(maybe(emqx_types:client_id()), + fun(([node()])-> any()), ekka_locker:piggyback()) -> any()). +trans(undefined, Fun, _Piggyback) -> + Fun([]); +trans(ClientId, Fun, Piggyback) -> + case lock(ClientId, Piggyback) of + {true, Nodes} -> + try Fun(Nodes) after unlock(ClientId) end; + {false, _Nodes} -> + {error, client_id_unavailable} + end. + +-spec(lock(emqx_types:client_id()) -> ekka_locker:lock_result()). +lock(ClientId) -> + ekka_locker:acquire(?MODULE, ClientId, strategy()). + +-spec(lock(emqx_types:client_id(), ekka_locker:piggyback()) -> ekka_locker:lock_result()). +lock(ClientId, Piggyback) -> + ekka_locker:acquire(?MODULE, ClientId, strategy(), Piggyback). + +-spec(unlock(emqx_types:client_id()) -> {boolean(), [node()]}). +unlock(ClientId) -> + ekka_locker:release(?MODULE, ClientId, strategy()). + +-spec(strategy() -> local | one | quorum | all). +strategy() -> + emqx_config:get_env(session_locking_strategy, quorum). + diff --git a/src/emqx_cm_registry.erl b/src/emqx_cm_registry.erl index ccacc9907..ffdb6661a 100644 --- a/src/emqx_cm_registry.erl +++ b/src/emqx_cm_registry.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,8 +12,10 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- --module(emqx_sm_registry). +%% Global Channel Registry +-module(emqx_cm_registry). -behaviour(gen_server). @@ -22,12 +25,14 @@ -export([start_link/0]). --export([ is_enabled/0 - , register_session/1 - , lookup_session/1 - , unregister_session/1 +-export([is_enabled/0]). + +-export([ register_channel/1 + , unregister_channel/1 ]). +-export([lookup_channels/1]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -38,57 +43,67 @@ ]). -define(REGISTRY, ?MODULE). --define(TAB, emqx_session_registry). --define(LOCK, {?MODULE, cleanup_sessions}). +-define(TAB, emqx_channel_registry). +-define(LOCK, {?MODULE, cleanup_down}). --record(global_session, {sid, pid}). +-record(channel, {chid, pid}). --type(session_pid() :: pid()). - -%%------------------------------------------------------------------------------ -%% APIs -%%------------------------------------------------------------------------------ - -%% @doc Start the global session manager. +%% @doc Start the global channel registry. -spec(start_link() -> startlink_ret()). start_link() -> gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +%% @doc Is the global registry enabled? -spec(is_enabled() -> boolean()). is_enabled() -> - emqx_config:get_env(enable_session_registry, true). + emqx_config:get_env(enable_channel_registry, true). --spec(lookup_session(emqx_types:client_id()) -> list(session_pid())). -lookup_session(ClientId) -> - [SessPid || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)]. +%% @doc Register a global channel. +-spec(register_channel(emqx_types:client_id() + | {emqx_types:client_id(), pid()}) -> ok). +register_channel(ClientId) when is_binary(ClientId) -> + register_channel({ClientId, self()}); --spec(register_session({emqx_types:client_id(), session_pid()}) -> ok). -register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) -> +register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mnesia:dirty_write(?TAB, record(ClientId, SessPid)); + true -> mnesia:dirty_write(?TAB, record(ClientId, ChanPid)); false -> ok end. --spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok). -unregister_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) -> +%% @doc Unregister a global channel. +-spec(unregister_channel(emqx_types:client_id() + | {emqx_types:client_id(), pid()}) -> ok). +unregister_channel(ClientId) when is_binary(ClientId) -> + unregister_channel({ClientId, self()}); + +unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mnesia:dirty_delete_object(?TAB, record(ClientId, SessPid)); + true -> mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid)); false -> ok end. -record(ClientId, SessPid) -> - #global_session{sid = ClientId, pid = SessPid}. +%% @doc Lookup the global channels. +-spec(lookup_channels(emqx_types:client_id()) -> list(pid())). +lookup_channels(ClientId) -> + [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?TAB, ClientId)]. -%%------------------------------------------------------------------------------ +record(ClientId, ChanPid) -> + #channel{chid = ClientId, pid = ChanPid}. + +%%-------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([]) -> ok = ekka_mnesia:create_table(?TAB, [ {type, bag}, {ram_copies, [node()]}, - {record_name, global_session}, - {attributes, record_info(fields, global_session)}, + {record_name, channel}, + {attributes, record_info(fields, channel)}, {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]), ok = ekka_mnesia:copy_table(?TAB), @@ -106,7 +121,7 @@ handle_cast(Msg, State) -> handle_info({membership, {mnesia, down, Node}}, State) -> global:trans({?LOCK, self()}, fun() -> - mnesia:transaction(fun cleanup_sessions/1, [Node]) + mnesia:transaction(fun cleanup_channels/1, [Node]) end), {noreply, State}; @@ -123,14 +138,14 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -cleanup_sessions(Node) -> - Pat = [{#global_session{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], - lists:foreach(fun delete_session/1, mnesia:select(?TAB, Pat, write)). +cleanup_channels(Node) -> + Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], + lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)). -delete_session(Session) -> - mnesia:delete_object(?TAB, Session, write). +delete_channel(Chan) -> + mnesia:delete_object(?TAB, Chan, write). diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 65702b26f..6cbf8d432 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,8 +12,9 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- --module(emqx_sm_sup). +-module(emqx_cm_sup). -behaviour(supervisor). @@ -24,41 +26,45 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - %% Session locker + Banned = #{id => banned, + start => {emqx_banned, start_link, []}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [emqx_banned]}, + Flapping = #{id => flapping, + start => {emqx_flapping, start_link, []}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [emqx_flapping]}, + %% Channel locker Locker = #{id => locker, - start => {emqx_sm_locker, start_link, []}, + start => {emqx_cm_locker, start_link, []}, restart => permanent, shutdown => 5000, type => worker, - modules => [emqx_sm_locker] + modules => [emqx_cm_locker] }, - %% Session registry + %% Channel registry Registry = #{id => registry, - start => {emqx_sm_registry, start_link, []}, + start => {emqx_cm_registry, start_link, []}, restart => permanent, shutdown => 5000, type => worker, - modules => [emqx_sm_registry] + modules => [emqx_cm_registry] }, - %% Session Manager + %% Channel Manager Manager = #{id => manager, - start => {emqx_sm, start_link, []}, + start => {emqx_cm, start_link, []}, restart => permanent, shutdown => 5000, type => worker, - modules => [emqx_sm] + modules => [emqx_cm] }, - %% Session Sup - SessSpec = #{start => {emqx_session, start_link, []}, - shutdown => brutal_kill, - clean_down => fun emqx_sm:clean_down/1 + SupFlags = #{strategy => one_for_one, + intensity => 100, + period => 10 }, - SessionSup = #{id => session_sup, - start => {emqx_session_sup, start_link, [SessSpec ]}, - restart => transient, - shutdown => infinity, - type => supervisor, - modules => [emqx_session_sup] - }, - {ok, {{rest_for_one, 10, 3600}, [Locker, Registry, Manager, SessionSup]}}. + {ok, {SupFlags, [Banned, Flapping, Locker, Registry, Manager]}}. diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 099bf3910..94e1bcc78 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,9 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This module is used to garbage clean the flapping records. -module(emqx_flapping). @@ -19,31 +23,29 @@ -behaviour(gen_statem). --export([start_link/1]). - -%% This module is used to garbage clean the flapping records +-export([start_link/0]). %% gen_statem callbacks --export([ terminate/3 - , code_change/4 - , init/1 +-export([ init/1 , initialized/3 , callback_mode/0 + , terminate/3 + , code_change/4 ]). -define(FLAPPING_TAB, ?MODULE). -export([check/3]). --record(flapping, - { client_id :: binary() - , check_count :: integer() - , timestamp :: integer() - }). +-record(flapping, { + client_id :: binary(), + check_count :: integer(), + timestamp :: integer() + }). -type(flapping_record() :: #flapping{}). --type(flapping_state() :: flapping | ok). +-type(flapping_state() :: flapping | ok). %% @doc This function is used to initialize flapping records %% the expiry time unit is minutes. @@ -96,18 +98,20 @@ check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval}, %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- --spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()). -start_link(TimerInterval) -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). -init([TimerInterval]) -> +-spec(start_link() -> startlink_ret()). +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + Interval = emqx_config:get_env(flapping_clean_interval, 3600000), TabOpts = [ public , set , {keypos, 2} , {write_concurrency, true} , {read_concurrency, true}], ok = emqx_tables:new(?FLAPPING_TAB, TabOpts), - {ok, initialized, #{timer_interval => TimerInterval}}. + {ok, initialized, #{timer_interval => Interval}}. callback_mode() -> [state_functions, state_enter]. @@ -134,3 +138,4 @@ clean_expired_records() -> NowTime = emqx_time:now_secs(), MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}], ets:select_delete(?FLAPPING_TAB, MatchSpec). + diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 6d448d2f3..c789ae482 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_hooks). @@ -19,7 +21,9 @@ -include("logger.hrl"). -include("types.hrl"). --export([start_link/0, stop/0]). +-export([ start_link/0 + , stop/0 + ]). %% Hooks API -export([ add/2 @@ -52,11 +56,16 @@ -type(action() :: function() | mfa()). -type(filter() :: function() | mfa()). --record(callback, {action :: action(), - filter :: filter(), - priority :: integer()}). +-record(callback, { + action :: action(), + filter :: filter(), + priority :: integer() + }). --record(hook, {name :: hookpoint(), callbacks :: list(#callback{})}). +-record(hook, { + name :: hookpoint(), + callbacks :: list(#callback{}) + }). -export_type([hookpoint/0, action/0, filter/0]). @@ -65,15 +74,16 @@ -spec(start_link() -> startlink_ret()). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{hibernate_after, 1000}]). + gen_server:start_link({local, ?SERVER}, + ?MODULE, [], [{hibernate_after, 1000}]). -spec(stop() -> ok). stop() -> gen_server:stop(?SERVER, normal, infinity). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Hooks API -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% @doc Register a callback -spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)). @@ -111,7 +121,6 @@ run(HookPoint, Args) -> run_fold(HookPoint, Args, Acc) -> do_run_fold(lookup(HookPoint), Args, Acc). - do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) -> case filter_passed(Filter, Args) andalso execute(Action, Args) of %% stop the hook chain and return @@ -163,12 +172,12 @@ lookup(HookPoint) -> [] -> [] end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([]) -> - ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}, protected]), + ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), {ok, #{}}. handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> diff --git a/src/emqx_keepalive.erl b/src/emqx_keepalive.erl index ebd5e18d4..36cb6b335 100644 --- a/src/emqx_keepalive.erl +++ b/src/emqx_keepalive.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_keepalive). @@ -20,15 +22,22 @@ , cancel/1 ]). --record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}). +-record(keepalive, { + statfun, + statval, + tsec, + tmsg, + tref, + repeat = 0 + }). -opaque(keepalive() :: #keepalive{}). -export_type([keepalive/0]). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% APIs -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% @doc Start a keepalive -spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). @@ -79,3 +88,4 @@ cancel(_) -> timer(Secs, Msg) -> erlang:send_after(timer:seconds(Secs), self(), Msg). + diff --git a/src/emqx_logger_formatter.erl b/src/emqx_logger_formatter.erl index 92c194883..6cfe1ca58 100644 --- a/src/emqx_logger_formatter.erl +++ b/src/emqx_logger_formatter.erl @@ -34,18 +34,22 @@ -define(IS_STRING(String), (is_list(String) orelse is_binary(String))). -%%%----------------------------------------------------------------- -%%% Types --type config() :: #{chars_limit => pos_integer() | unlimited, - depth => pos_integer() | unlimited, - max_size => pos_integer() | unlimited, - report_cb => logger:report_cb(), - quit => template()}. --type template() :: [metakey() | {metakey(),template(),template()} | string()]. --type metakey() :: atom() | [atom()]. +%%-------------------------------------------------------------------- +%% Types + +-type(config() :: #{chars_limit => pos_integer() | unlimited, + depth => pos_integer() | unlimited, + max_size => pos_integer() | unlimited, + report_cb => logger:report_cb(), + quit => template()}). + +-type(template() :: [metakey() | {metakey(),template(),template()} | string()]). + +-type(metakey() :: atom() | [atom()]). + +%%-------------------------------------------------------------------- +%% API -%%%----------------------------------------------------------------- -%%% API -spec format(LogEvent,Config) -> unicode:chardata() when LogEvent :: logger:log_event(), Config :: config(). diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index f9d74dc81..38ad29b86 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_modules). @@ -20,20 +22,25 @@ , unload/0 ]). +%% @doc Load all the extended modules. -spec(load() -> ok). load() -> ok = emqx_mod_acl_internal:load([]), - lists:foreach( - fun({Mod, Env}) -> - ok = Mod:load(Env), - ?LOG(info, "[Modules] Load ~s module successfully.", [Mod]) - end, emqx_config:get_env(modules, [])). + lists:foreach(fun load/1, modules()). +load({Mod, Env}) -> + ok = Mod:load(Env), + ?LOG(info, "[Modules] Load ~s module successfully.", [Mod]). + +modules() -> + emqx_config:get_env(modules, []). + +%% @doc Unload all the extended modules. -spec(unload() -> ok). unload() -> ok = emqx_mod_acl_internal:unload([]), - lists:foreach( - fun({Mod, Env}) -> - Mod:unload(Env) end, - emqx_config:get_env(modules, [])). + lists:foreach(fun unload/1, modules()). + +unload({Mod, Env}) -> + Mod:unload(Env).