diff --git a/Makefile b/Makefile index 03bbef018..68d383940 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence + emqx_hooks emqx_batch emqx_sequence emqx_pmon CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/etc/emqx.conf b/etc/emqx.conf index 45368a066..09ce1b526 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -645,6 +645,11 @@ listener.tcp.external.max_connections = 1024000 ## Value: Number listener.tcp.external.max_conn_rate = 1000 +## Specify the {active, N} option for the external MQTT/TCP Socket. +## +## Value: Number +listener.tcp.external.active_n = 100 + ## Zone of the external MQTT/TCP listener belonged to. ## ## See: zone.$name.* @@ -781,6 +786,11 @@ listener.tcp.internal.max_connections = 10240000 ## Value: Number listener.tcp.internal.max_conn_rate = 1000 +## Specify the {active, N} option for the internal MQTT/TCP Socket. +## +## Value: Number +listener.tcp.internal.active_n = 1000 + ## Zone of the internal MQTT/TCP listener belonged to. ## ## Value: String @@ -888,6 +898,11 @@ listener.ssl.external.max_connections = 102400 ## Value: Number listener.ssl.external.max_conn_rate = 500 +## Specify the {active, N} option for the internal MQTT/SSL Socket. +## +## Value: Number +listener.ssl.external.active_n = 100 + ## Zone of the external MQTT/SSL listener belonged to. ## ## Value: String diff --git a/priv/emqx.schema b/priv/emqx.schema index 3bb9f34f8..7c37383c4 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -772,6 +772,11 @@ end}. {datatype, integer} ]}. +{mapping, "listener.tcp.$name.active_n", "emqx.listeners", [ + {default, 100}, + {datatype, integer} +]}. + {mapping, "listener.tcp.$name.zone", "emqx.listeners", [ {datatype, string} ]}. @@ -867,6 +872,11 @@ end}. {datatype, integer} ]}. +{mapping, "listener.ssl.$name.active_n", "emqx.listeners", [ + {default, 100}, + {datatype, integer} +]}. + {mapping, "listener.ssl.$name.zone", "emqx.listeners", [ {datatype, string} ]}. @@ -1283,6 +1293,7 @@ end}. {mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, + {active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, {rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))}, diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 49a698384..af580b48e 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -26,17 +26,16 @@ -export([start_link/0]). -export([check/1]). --export([add/1, del/1]). +-export([add/1, delete/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(TAB, ?MODULE). --define(SERVER, ?MODULE). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Mnesia bootstrap -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ @@ -52,7 +51,7 @@ mnesia(copy) -> %% @doc Start the banned server. -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -spec(check(emqx_types:credentials()) -> boolean()). check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -> @@ -64,25 +63,25 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) - add(Banned) when is_record(Banned, banned) -> mnesia:dirty_write(?TAB, Banned). --spec(del({client_id, emqx_types:client_id()} | - {username, emqx_types:username()} | - {peername, emqx_types:peername()}) -> ok). -del(Key) -> +-spec(delete({client_id, emqx_types:client_id()} + | {username, emqx_types:username()} + | {peername, emqx_types:peername()}) -> ok). +delete(Key) -> mnesia:dirty_delete(?TAB, Key). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([]) -> {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - emqx_logger:error("[BANNED] unexpected call: ~p", [Req]), + emqx_logger:error("[Banned] unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - emqx_logger:error("[BANNED] unexpected msg: ~p", [Msg]), + emqx_logger:error("[Banned] unexpected msg: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> @@ -90,7 +89,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> - emqx_logger:error("[BANNED] unexpected info: ~p", [Info]), + emqx_logger:error("[Banned] unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{expiry_timer := TRef}) -> @@ -99,21 +98,22 @@ terminate(_Reason, #{expiry_timer := TRef}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -ifdef(TEST). ensure_expiry_timer(State) -> State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}. -else. ensure_expiry_timer(State) -> - State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}. + State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}. -endif. expire_banned_items(Now) -> - mnesia:foldl(fun - (B = #banned{until = Until}, _Acc) when Until < Now -> - mnesia:delete_object(?TAB, B, sticky_write); - (_, _Acc) -> ok - end, ok, ?TAB). + mnesia:foldl( + fun(B = #banned{until = Until}, _Acc) when Until < Now -> + mnesia:delete_object(?TAB, B, sticky_write); + (_, _Acc) -> ok + end, ok, ?TAB). + diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 60fb5fd8f..6d1ee98cb 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -371,7 +371,7 @@ cast(Broker, Msg) -> %% Pick a broker pick(Topic) -> - gproc_pool:pick_worker(emqx_broker_pool, Topic). + gproc_pool:pick_worker(broker_pool, Topic). %%------------------------------------------------------------------------------ %% gen_server callbacks diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index 7d514e31d..c8e2b5eab 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -31,6 +31,8 @@ -define(SUBSEQ, emqx_subseq). -define(SHARD, 1024). +-define(BATCH_SIZE, 10000). + -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?HELPER}, ?MODULE, [], []). @@ -106,14 +108,12 @@ handle_cast(Msg, State) -> emqx_logger:error("[BrokerHelper] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #{pmon := PMon}) -> - case ets:lookup(?SUBMON, SubPid) of - [{_, SubId}] -> - ok = emqx_pool:async_submit(fun subscriber_down/2, [SubPid, SubId]); - [] -> - emqx_logger:error("[BrokerHelper] unexpected DOWN: ~p, reason: ~p", [SubPid, Reason]) - end, - {noreply, State#{pmon := emqx_pmon:erase(SubPid, PMon)}}; +handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) -> + SubPids = [SubPid | emqx_misc:drain_down(?BATCH_SIZE)], + ok = emqx_pool:async_submit( + fun lists:foreach/2, [fun clean_down/1, SubPids]), + {_, PMon1} = emqx_pmon:erase_all(SubPids, PMon), + {noreply, State#{pmon := PMon1}}; handle_info(Info, State) -> emqx_logger:error("[BrokerHelper] unexpected info: ~p", [Info]), @@ -126,8 +126,17 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -subscriber_down(SubPid, SubId) -> - true = ets:delete(?SUBMON, SubPid), - true = (SubId =:= undefined) orelse ets:delete_object(?SUBID, {SubId, SubPid}), - emqx_broker:subscriber_down(SubPid). +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +clean_down(SubPid) -> + case ets:lookup(?SUBMON, SubPid) of + [{_, SubId}] -> + true = ets:delete(?SUBMON, SubPid), + true = (SubId =:= undefined) + orelse ets:delete_object(?SUBID, {SubId, SubPid}), + emqx_broker:subscriber_down(SubPid); + [] -> ok + end. diff --git a/src/emqx_broker_sup.erl b/src/emqx_broker_sup.erl index f60a2a1b2..5b1c0a0e7 100644 --- a/src/emqx_broker_sup.erl +++ b/src/emqx_broker_sup.erl @@ -30,9 +30,9 @@ start_link() -> init([]) -> %% Broker pool PoolSize = emqx_vm:schedulers() * 2, - BrokerPool = emqx_pool_sup:spec(broker_pool, - [emqx_broker_pool, hash, PoolSize, + BrokerPool = emqx_pool_sup:spec([broker_pool, hash, PoolSize, {emqx_broker, start_link, []}]), + %% Shared subscription SharedSub = #{id => shared_sub, start => {emqx_shared_sub, start_link, []}, diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 0d2ecf5eb..62cb417de 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -20,74 +20,57 @@ -export([start_link/0]). --export([lookup_connection/1]). -export([register_connection/1, register_connection/2]). --export([unregister_connection/1]). --export([get_conn_attrs/1, set_conn_attrs/2]). --export([get_conn_stats/1, set_conn_stats/2]). +-export([unregister_connection/1, unregister_connection/2]). +-export([get_conn_attrs/1, get_conn_attrs/2]). +-export([set_conn_attrs/2, set_conn_attrs/3]). +-export([get_conn_stats/1, get_conn_stats/2]). +-export([set_conn_stats/2, set_conn_stats/3]). -export([lookup_conn_pid/1]). +%% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% internal export --export([update_conn_stats/0]). +-export([stats_fun/0]). -define(CM, ?MODULE). -%% ETS Tables. +%% ETS tables for connection management. -define(CONN_TAB, emqx_conn). -define(CONN_ATTRS_TAB, emqx_conn_attrs). -define(CONN_STATS_TAB, emqx_conn_stats). +-define(BATCH_SIZE, 10000). + %% @doc Start the connection manager. -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?CM}, ?MODULE, [], []). -%% @doc Lookup a connection. --spec(lookup_connection(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})). -lookup_connection(ClientId) when is_binary(ClientId) -> - ets:lookup(?CONN_TAB, ClientId). +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ %% @doc Register a connection. --spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). +-spec(register_connection(emqx_types:client_id()) -> ok). register_connection(ClientId) when is_binary(ClientId) -> - register_connection({ClientId, self()}); + register_connection(ClientId, self()). -register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> - true = ets:insert(?CONN_TAB, Conn), +-spec(register_connection(emqx_types:client_id(), pid()) -> ok). +register_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) -> + true = ets:insert(?CONN_TAB, {ClientId, ConnPid}), notify({registered, ClientId, ConnPid}). --spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok). -register_connection(ClientId, Attrs) when is_binary(ClientId) -> - register_connection({ClientId, self()}, Attrs); -register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) -> - set_conn_attrs(Conn, Attrs), - register_connection(Conn). - -%% @doc Get conn attrs --spec(get_conn_attrs({emqx_types:client_id(), pid()}) -> list()). -get_conn_attrs(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> - try - ets:lookup_element(?CONN_ATTRS_TAB, Conn, 2) - catch - error:badarg -> [] - end. - -%% @doc Set conn attrs -set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) -> - set_conn_attrs({ClientId, self()}, Attrs); -set_conn_attrs(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) -> - ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}). - -%% @doc Unregister a conn. --spec(unregister_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). +%% @doc Unregister a connection. +-spec(unregister_connection(emqx_types:client_id()) -> ok). unregister_connection(ClientId) when is_binary(ClientId) -> - unregister_connection({ClientId, self()}); + unregister_connection(ClientId, self()). -unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> - do_unregister_connection(Conn), +-spec(unregister_connection(emqx_types:client_id(), pid()) -> ok). +unregister_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) -> + true = do_unregister_connection({ClientId, ConnPid}), notify({unregistered, ConnPid}). do_unregister_connection(Conn) -> @@ -95,30 +78,52 @@ do_unregister_connection(Conn) -> true = ets:delete(?CONN_ATTRS_TAB, Conn), true = ets:delete_object(?CONN_TAB, Conn). -%% @doc Lookup connection pid --spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined). -lookup_conn_pid(ClientId) when is_binary(ClientId) -> - case ets:lookup(?CONN_TAB, ClientId) of - [] -> undefined; - [{_, Pid}] -> Pid - end. +%% @doc Get conn attrs +-spec(get_conn_attrs(emqx_types:client_id()) -> list()). +get_conn_attrs(ClientId) when is_binary(ClientId) -> + ConnPid = lookup_conn_pid(ClientId), + get_conn_attrs(ClientId, ConnPid). + +-spec(get_conn_attrs(emqx_types:client_id(), pid()) -> list()). +get_conn_attrs(ClientId, ConnPid) when is_binary(ClientId) -> + emqx_tables:lookup_value(?CONN_ATTRS_TAB, {ClientId, ConnPid}, []). + +%% @doc Set conn attrs +-spec(set_conn_attrs(emqx_types:client_id(), list()) -> true). +set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) -> + set_conn_attrs(ClientId, self(), Attrs). + +-spec(set_conn_attrs(emqx_types:client_id(), pid(), list()) -> true). +set_conn_attrs(ClientId, ConnPid, Attrs) when is_binary(ClientId), is_pid(ConnPid) -> + Conn = {ClientId, ConnPid}, + ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}). %% @doc Get conn stats --spec(get_conn_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())). -get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> - try ets:lookup_element(?CONN_STATS_TAB, Conn, 2) - catch - error:badarg -> [] - end. +-spec(get_conn_stats(emqx_types:client_id()) -> list(emqx_stats:stats())). +get_conn_stats(ClientId) when is_binary(ClientId) -> + ConnPid = lookup_conn_pid(ClientId), + get_conn_stats(ClientId, ConnPid). + +-spec(get_conn_stats(emqx_types:client_id(), pid()) -> list(emqx_stats:stats())). +get_conn_stats(ClientId, ConnPid) when is_binary(ClientId) -> + Conn = {ClientId, ConnPid}, + emqx_tables:lookup_value(?CONN_STATS_TAB, Conn, []). %% @doc Set conn stats. --spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> boolean()). +-spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> true). set_conn_stats(ClientId, Stats) when is_binary(ClientId) -> - set_conn_stats({ClientId, self()}, Stats); + set_conn_stats(ClientId, self(), Stats). -set_conn_stats(Conn = {ClientId, ConnPid}, Stats) when is_binary(ClientId), is_pid(ConnPid) -> +-spec(set_conn_stats(emqx_types:client_id(), pid(), list(emqx_stats:stats())) -> true). +set_conn_stats(ClientId, ConnPid, Stats) when is_binary(ClientId), is_pid(ConnPid) -> + Conn = {ClientId, ConnPid}, ets:insert(?CONN_STATS_TAB, {Conn, Stats}). +%% @doc Lookup connection pid. +-spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined). +lookup_conn_pid(ClientId) when is_binary(ClientId) -> + emqx_tables:lookup_value(?CONN_TAB, ClientId). + notify(Msg) -> gen_server:cast(?CM, {notify, Msg}). @@ -131,7 +136,7 @@ init([]) -> ok = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]), ok = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts), ok = emqx_tables:new(?CONN_STATS_TAB, TabOpts), - ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0), + ok = emqx_stats:update_interval(conn_stats, fun ?MODULE:stats_fun/0), {ok, #{conn_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> @@ -148,26 +153,19 @@ handle_cast(Msg, State) -> emqx_logger:error("[CM] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, ConnPid, _Reason}, State = #{conn_pmon := PMon}) -> - case emqx_pmon:find(ConnPid, PMon) of - undefined -> - {noreply, State}; - ClientId -> - Conn = {ClientId, ConnPid}, - case ets:member(?CONN_ATTRS_TAB, Conn) of - true -> - ok = emqx_pool:async_submit(fun do_unregister_connection/1, [Conn]); - false -> ok - end, - {noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}} - end; +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) -> + ConnPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], + {Items, PMon1} = emqx_pmon:erase_all(ConnPids, PMon), + ok = emqx_pool:async_submit( + fun lists:foreach/2, [fun clean_down/1, Items]), + {noreply, State#{conn_pmon := PMon1}}; handle_info(Info, State) -> emqx_logger:error("[CM] unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> - emqx_stats:cancel_update(cm_stats). + emqx_stats:cancel_update(conn_stats). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -176,7 +174,16 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -update_conn_stats() -> +clean_down({Pid, ClientId}) -> + Conn = {ClientId, Pid}, + case ets:member(?CONN_TAB, ClientId) + orelse ets:member(?CONN_ATTRS_TAB, Conn) of + true -> + do_unregister_connection(Conn); + false -> false + end. + +stats_fun() -> case ets:info(?CONN_TAB, size) of undefined -> ok; Size -> emqx_stats:setstat('connections/count', 'connections/max', Size) diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 000e79336..19dd9cb50 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -25,17 +25,17 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Banned = #{id => banned, - start => {emqx_banned, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_banned]}, - Manager = #{id => manager, - start => {emqx_cm, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_cm]}, + Banned = #{id => banned, + start => {emqx_banned, start_link, []}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [emqx_banned]}, + Manager = #{id => manager, + start => {emqx_cm, start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => [emqx_cm]}, {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 3d0ab0df7..e1d4011fc 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -38,7 +38,7 @@ peername, sockname, conn_state, - await_recv, + active_n, proto_state, parser_state, keepalive, @@ -51,6 +51,7 @@ idle_timeout }). +-define(DEFAULT_ACTIVE_N, 100). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). start_link(Transport, Socket, Options) -> @@ -69,7 +70,7 @@ info(#state{transport = Transport, peername = Peername, sockname = Sockname, conn_state = ConnState, - await_recv = AwaitRecv, + active_n = ActiveN, rate_limit = RateLimit, publish_limit = PubLimit, proto_state = ProtoState}) -> @@ -77,7 +78,7 @@ info(#state{transport = Transport, {peername, Peername}, {sockname, Sockname}, {conn_state, ConnState}, - {await_recv, AwaitRecv}, + {active_n, ActiveN}, {rate_limit, esockd_rate_limit:info(RateLimit)}, {publish_limit, esockd_rate_limit:info(PubLimit)}], ProtoInfo = emqx_protocol:info(ProtoState), @@ -87,8 +88,8 @@ info(#state{transport = Transport, attrs(CPid) when is_pid(CPid) -> call(CPid, attrs); -attrs(#state{peername = Peername, - sockname = Sockname, +attrs(#state{peername = Peername, + sockname = Sockname, proto_state = ProtoState}) -> SockAttrs = [{peername, Peername}, {sockname, Sockname}], @@ -129,6 +130,7 @@ init([Transport, RawSocket, Options]) -> Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), RateLimit = init_limiter(proplists:get_value(rate_limit, Options)), PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)), + ActiveN = proplists:get_value(active_n, Options, ?DEFAULT_ACTIVE_N), EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), SendFun = send_fun(Transport, Socket), @@ -140,8 +142,8 @@ init([Transport, RawSocket, Options]) -> State = run_socket(#state{transport = Transport, socket = Socket, peername = Peername, - await_recv = false, conn_state = running, + active_n = ActiveN, rate_limit = RateLimit, publish_limit = PubLimit, proto_state = ProtoState, @@ -243,19 +245,26 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); -handle_info(activate_sock, State) -> - {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})}; - -handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> +handle_info({tcp, _Sock, Data}, State) -> ?LOG(debug, "RECV ~p", [Data]), Size = iolist_size(Data), emqx_metrics:trans(inc, 'bytes/received', Size), Incoming = #{bytes => Size, packets => 0}, - handle_packet(Data, State#state{await_recv = false, incoming = Incoming}); + handle_packet(Data, State#state{incoming = Incoming}); -handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> +%% Rate limit here, cool:) +handle_info({tcp_passive, _Sock}, State) -> + {noreply, ensure_rate_limit(State)}; + +handle_info({tcp_error, _Sock, Reason}, State) -> shutdown(Reason, State); +handle_info({tcp_closed, _Sock}, State) -> + shutdown(closed, State); + +handle_info(activate_sock, State) -> + {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})}; + handle_info({inet_reply, _Sock, ok}, State) -> {noreply, State}; @@ -314,16 +323,17 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Receive and parse data -handle_packet(<<>>, State0) -> - State = ensure_stats_timer(ensure_rate_limit(State0)), - ok = maybe_gc(State, incoming), - {noreply, State}; +handle_packet(<<>>, State) -> + NState = ensure_stats_timer(State), + ok = maybe_gc(NState, incoming), + {noreply, NState}; + handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout}; + {noreply, State#state{parser_state = NewParserState}, IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of @@ -352,6 +362,7 @@ reset_parser(State = #state{proto_state = ProtoState}) -> inc_publish_cnt(Type, State = #state{incoming = Incoming = #{packets := Cnt}}) when Type == ?PUBLISH; Type == ?SUBSCRIBE -> State#state{incoming = Incoming#{packets := Cnt + 1}}; + inc_publish_cnt(_Type, State) -> State. @@ -379,11 +390,11 @@ ensure_rate_limit([{Rl, Pos, Num}|Limiters], State) -> run_socket(State = #state{conn_state = blocked}) -> State; -run_socket(State = #state{await_recv = true}) -> - State; -run_socket(State = #state{transport = Transport, socket = Socket}) -> - Transport:async_recv(Socket, 0, infinity), - State#state{await_recv = true}. +run_socket(State = #state{transport = Transport, + socket = Socket, + active_n = ActiveN}) -> + Transport:setopts(Socket, [{active, ActiveN}]), + State. %%------------------------------------------------------------------------------ %% Ensure stats timer diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index cf4a555ca..38bc1c2b0 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -19,6 +19,8 @@ -export([init_proc_mng_policy/1, conn_proc_mng_policy/1]). +-export([drain_down/1]). + %% @doc Merge options -spec(merge_opts(list(), list()) -> list()). merge_opts(Defaults, Options) -> @@ -108,3 +110,19 @@ is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED. proc_info(Key) -> {Key, Value} = erlang:process_info(self(), Key), Value. + +-spec(drain_down(pos_integer()) -> list(pid())). +drain_down(Cnt) when Cnt > 0 -> + drain_down(Cnt, []). + +drain_down(0, Acc) -> + lists:reverse(Acc); + +drain_down(Cnt, Acc) -> + receive + {'DOWN', _MRef, process, Pid, _Reason} -> + drain_down(Cnt - 1, [Pid|Acc]) + after 0 -> + lists:reverse(Acc) + end. + diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index 9b874041e..a00212ed5 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -14,24 +14,27 @@ -module(emqx_pmon). +-compile({no_auto_import, [monitor/3]}). + -export([new/0]). -export([monitor/2, monitor/3]). -export([demonitor/2]). -export([find/2]). --export([erase/2]). - --compile({no_auto_import,[monitor/3]}). +-export([erase/2, erase_all/2]). +-export([count/1]). -type(pmon() :: {?MODULE, map()}). -export_type([pmon/0]). -spec(new() -> pmon()). -new() -> {?MODULE, maps:new()}. +new() -> + {?MODULE, maps:new()}. -spec(monitor(pid(), pmon()) -> pmon()). monitor(Pid, PM) -> - monitor(Pid, undefined, PM). + ?MODULE:monitor(Pid, undefined, PM). +-spec(monitor(pid(), term(), pmon()) -> pmon()). monitor(Pid, Val, {?MODULE, PM}) -> {?MODULE, case maps:is_key(Pid, PM) of true -> PM; @@ -43,21 +46,36 @@ monitor(Pid, Val, {?MODULE, PM}) -> demonitor(Pid, {?MODULE, PM}) -> {?MODULE, case maps:find(Pid, PM) of {ok, {Ref, _Val}} -> - %% Don't flush - _ = erlang:demonitor(Ref), + %% flush + _ = erlang:demonitor(Ref, [flush]), maps:remove(Pid, PM); error -> PM end}. --spec(find(pid(), pmon()) -> undefined | term()). +-spec(find(pid(), pmon()) -> error | {ok, term()}). find(Pid, {?MODULE, PM}) -> case maps:find(Pid, PM) of {ok, {_Ref, Val}} -> - Val; - error -> undefined + {ok, Val}; + error -> error end. -spec(erase(pid(), pmon()) -> pmon()). erase(Pid, {?MODULE, PM}) -> {?MODULE, maps:remove(Pid, PM)}. +-spec(erase_all([pid()], pmon()) -> {[{pid(), term()}], pmon()}). +erase_all(Pids, PMon0) -> + lists:foldl( + fun(Pid, {Acc, PMon}) -> + case find(Pid, PMon) of + {ok, Val} -> + {[{Pid, Val}|Acc], erase(Pid, PMon)}; + error -> {Acc, PMon} + end + end, {[], PMon0}, Pids). + +-spec(count(pmon()) -> non_neg_integer()). +count({?MODULE, PM}) -> + maps:size(PM). + diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index e11fa01f2..eb81233f9 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -39,8 +39,6 @@ start_link(Pool, Type, MFA) -> -spec(start_link(atom() | tuple(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, term()}). -start_link(Pool, Type, Size, MFA) when is_atom(Pool) -> - supervisor:start_link({local, Pool}, ?MODULE, [Pool, Type, Size, MFA]); start_link(Pool, Type, Size, MFA) -> supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 3052191a1..dad40e190 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -320,7 +320,8 @@ process_packet(?CONNECT_PACKET( case try_open_session(PState3) of {ok, SPid, SP} -> PState4 = PState3#pstate{session = SPid, connected = true}, - ok = emqx_cm:register_connection(client_id(PState4), attrs(PState4)), + ok = emqx_cm:register_connection(client_id(PState4)), + true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), %% Start keepalive start_keepalive(Keepalive, PState4), %% Success @@ -497,18 +498,18 @@ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), puback(?QOS_0, _PacketId, _Result, PState) -> {ok, PState}; +puback(?QOS_1, PacketId, [], PState) -> + deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); +puback(?QOS_1, PacketId, [_|_], PState) -> %%TODO: check the dispatch? + deliver({puback, PacketId, ?RC_SUCCESS}, PState); puback(?QOS_1, PacketId, {error, ReasonCode}, PState) -> deliver({puback, PacketId, ReasonCode}, PState); -puback(?QOS_1, PacketId, {ok, []}, PState) -> - deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); -puback(?QOS_1, PacketId, {ok, _}, PState) -> - deliver({puback, PacketId, ?RC_SUCCESS}, PState); -puback(?QOS_2, PacketId, {error, ReasonCode}, PState) -> - deliver({pubrec, PacketId, ReasonCode}, PState); -puback(?QOS_2, PacketId, {ok, []}, PState) -> +puback(?QOS_2, PacketId, [], PState) -> deliver({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); -puback(?QOS_2, PacketId, {ok, _}, PState) -> - deliver({pubrec, PacketId, ?RC_SUCCESS}, PState). +puback(?QOS_2, PacketId, [_|_], PState) -> %%TODO: check the dispatch? + deliver({pubrec, PacketId, ?RC_SUCCESS}, PState); +puback(?QOS_2, PacketId, {error, ReasonCode}, PState) -> + deliver({pubrec, PacketId, ReasonCode}, PState). %%------------------------------------------------------------------------------ %% Deliver Packet -> Client diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 0951c82d6..0463526d6 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -148,7 +148,7 @@ call(Router, Msg) -> gen_server:call(Router, Msg, infinity). pick(Topic) -> - gproc_pool:pick_worker(emqx_router_pool, Topic). + gproc_pool:pick_worker(router_pool, Topic). %%------------------------------------------------------------------------------ %% gen_server callbacks diff --git a/src/emqx_router_sup.erl b/src/emqx_router_sup.erl index c0317fc78..945d7910f 100644 --- a/src/emqx_router_sup.erl +++ b/src/emqx_router_sup.erl @@ -17,6 +17,7 @@ -behaviour(supervisor). -export([start_link/0]). + -export([init/1]). start_link() -> @@ -32,8 +33,7 @@ init([]) -> modules => [emqx_router_helper]}, %% Router pool - RouterPool = emqx_pool_sup:spec(router_pool, - [emqx_router_pool, hash, emqx_vm:schedulers(), + RouterPool = emqx_pool_sup:spec([router_pool, hash, {emqx_router, start_link, []}]), {ok, {{one_for_all, 0, 1}, [Helper, RouterPool]}}. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index f0190dc14..f41a25fc5 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -259,7 +259,7 @@ subscribe(SPid, PacketId, Properties, TopicFilters) -> %% @doc Called by connection processes when publishing messages -spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message()) - -> {ok, emqx_types:deliver_results()}). + -> emqx_types:deliver_results() | {error, term()}). publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message directly emqx_broker:publish(Msg); @@ -370,7 +370,8 @@ init([Parent, #{zone := Zone, topic_alias_maximum = TopicAliasMaximum, will_msg = WillMsg }, - ok = emqx_sm:register_session(ClientId, attrs(State)), + ok = emqx_sm:register_session(ClientId, self()), + true = emqx_sm:set_session_attrs(ClientId, attrs(State)), true = emqx_sm:set_session_stats(ClientId, stats(State)), emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index fd6a44231..281893fb0 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -21,12 +21,15 @@ -export([start_link/0]). -export([open_session/1, close_session/1]). --export([lookup_session/1, lookup_session_pid/1]). -export([resume_session/2]). -export([discard_session/1, discard_session/2]). --export([register_session/2, unregister_session/1]). --export([get_session_attrs/1, set_session_attrs/2]). --export([get_session_stats/1, set_session_stats/2]). +-export([register_session/1, register_session/2]). +-export([unregister_session/1, unregister_session/2]). +-export([get_session_attrs/1, get_session_attrs/2, + set_session_attrs/2, set_session_attrs/3]). +-export([get_session_stats/1, get_session_stats/2, + set_session_stats/2, set_session_stats/3]). +-export([lookup_session_pids/1]). %% Internal functions for rpc -export([dispatch/3]). @@ -46,6 +49,8 @@ -define(SESSION_ATTRS_TAB, emqx_session_attrs). -define(SESSION_STATS_TAB, emqx_session_stats). +-define(BATCH_SIZE, 10000). + -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SM}, ?MODULE, [], []). @@ -62,8 +67,8 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) -> ResumeStart = fun(_) -> case resume_session(ClientId, SessAttrs) of - {ok, SPid} -> - {ok, SPid, true}; + {ok, SessPid} -> + {ok, SessPid, true}; {error, not_found} -> emqx_session:start_link(SessAttrs) end @@ -75,76 +80,68 @@ open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) -> discard_session(ClientId) when is_binary(ClientId) -> discard_session(ClientId, self()). +-spec(discard_session(emqx_types:client_id(), pid()) -> ok). discard_session(ClientId, ConnPid) when is_binary(ClientId) -> lists:foreach( - fun({_ClientId, SPid}) -> - case catch emqx_session:discard(SPid, ConnPid) of - {Err, Reason} when Err =:= 'EXIT'; Err =:= error -> - emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]); - ok -> ok - end - end, lookup_session(ClientId)). + fun(SessPid) -> + try emqx_session:discard(SessPid, ConnPid) + catch + _:Error:_Stk -> + emqx_logger:error("[SM] Failed to discard ~p: ~p", [SessPid, Error]) + end + end, lookup_session_pids(ClientId)). %% @doc Try to resume a session. -spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}). resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) -> - case lookup_session(ClientId) of + case lookup_session_pids(ClientId) of [] -> {error, not_found}; - [{_ClientId, SPid}] -> - ok = emqx_session:resume(SPid, SessAttrs), - {ok, SPid}; - Sessions -> - [{_, SPid}|StaleSessions] = lists:reverse(Sessions), - emqx_logger:error("[SM] More than one session found: ~p", [Sessions]), - lists:foreach(fun({_, StalePid}) -> + [SessPid] -> + ok = emqx_session:resume(SessPid, SessAttrs), + {ok, SessPid}; + SessPids -> + [SessPid|StalePids] = lists:reverse(SessPids), + emqx_logger:error("[SM] More than one session found: ~p", [SessPids]), + lists:foreach(fun(StalePid) -> catch emqx_session:discard(StalePid, ConnPid) - end, StaleSessions), - ok = emqx_session:resume(SPid, SessAttrs), - {ok, SPid} + end, StalePids), + ok = emqx_session:resume(SessPid, SessAttrs), + {ok, SessPid} end. %% @doc Close a session. --spec(close_session({emqx_types:client_id(), pid()} | pid()) -> ok). -close_session({_ClientId, SPid}) -> - emqx_session:close(SPid); -close_session(SPid) when is_pid(SPid) -> - emqx_session:close(SPid). +-spec(close_session(emqx_types:client_id() | pid()) -> ok). +close_session(ClientId) when is_binary(ClientId) -> + case lookup_session_pids(ClientId) of + [] -> ok; + [SessPid] -> close_session(SessPid); + SessPids -> lists:foreach(fun close_session/1, SessPids) + end; -%% @doc Register a session with attributes. --spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}, - list(emqx_session:attr())) -> ok). -register_session(ClientId, SessAttrs) when is_binary(ClientId) -> - register_session({ClientId, self()}, SessAttrs); +close_session(SessPid) when is_pid(SessPid) -> + emqx_session:close(SessPid). -register_session(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) -> +%% @doc Register a session. +-spec(register_session(emqx_types:client_id()) -> ok). +register_session(ClientId) when is_binary(ClientId) -> + register_session(ClientId, self()). + +-spec(register_session(emqx_types:client_id(), pid()) -> ok). +register_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) -> + Session = {ClientId, SessPid}, true = ets:insert(?SESSION_TAB, Session), - true = ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}), - true = proplists:get_value(clean_start, SessAttrs, true) - orelse ets:insert(?SESSION_P_TAB, Session), ok = emqx_sm_registry:register_session(Session), - notify({registered, ClientId, SPid}). - -%% @doc Get session attrs --spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())). -get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> - emqx_tables:lookup_value(?SESSION_ATTRS_TAB, Session, []). - -%% @doc Set session attrs --spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()}, - list(emqx_session:attr())) -> true). -set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) -> - set_session_attrs({ClientId, self()}, SessAttrs); -set_session_attrs(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) -> - ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}). + notify({registered, ClientId, SessPid}). %% @doc Unregister a session --spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). +-spec(unregister_session(emqx_types:client_id()) -> ok). unregister_session(ClientId) when is_binary(ClientId) -> - unregister_session({ClientId, self()}); + unregister_session(ClientId, self()). -unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> - ok = do_unregister_session(Session), - notify({unregistered, ClientId, SPid}). +-spec(unregister_session(emqx_types:client_id(), pid()) -> ok). +unregister_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) -> + ok = do_unregister_session({ClientId, SessPid}), + notify({unregistered, SessPid}). %% @private do_unregister_session(Session) -> @@ -154,42 +151,69 @@ do_unregister_session(Session) -> true = ets:delete_object(?SESSION_TAB, Session), emqx_sm_registry:unregister_session(Session). +%% @doc Get session attrs +-spec(get_session_attrs(emqx_types:client_id()) -> list(emqx_session:attr())). +get_session_attrs(ClientId) when is_binary(ClientId) -> + case lookup_session_pids(ClientId) of + [] -> []; + [SessPid|_] -> get_session_attrs(ClientId, SessPid) + end. + +-spec(get_session_attrs(emqx_types:client_id(), pid()) -> list(emqx_session:attr())). +get_session_attrs(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) -> + emqx_tables:lookup_value(?SESSION_ATTRS_TAB, {ClientId, SessPid}, []). + +%% @doc Set session attrs +-spec(set_session_attrs(emqx_types:client_id(), list(emqx_session:attr())) -> true). +set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) -> + set_session_attrs(ClientId, self(), SessAttrs). + +-spec(set_session_attrs(emqx_types:client_id(), pid(), list(emqx_session:attr())) -> true). +set_session_attrs(ClientId, SessPid, SessAttrs) when is_binary(ClientId), is_pid(SessPid) -> + Session = {ClientId, SessPid}, + true = ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}), + proplists:get_value(clean_start, SessAttrs, true) orelse ets:insert(?SESSION_P_TAB, Session). + %% @doc Get session stats --spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())). -get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> - emqx_tables:lookup_value(?SESSION_STATS_TAB, Session, []). +-spec(get_session_stats(emqx_types:client_id()) -> list(emqx_stats:stats())). +get_session_stats(ClientId) when is_binary(ClientId) -> + case lookup_session_pids(ClientId) of + [] -> []; + [SessPid|_] -> + get_session_stats(ClientId, SessPid) + end. + +-spec(get_session_stats(emqx_types:client_id(), pid()) -> list(emqx_stats:stats())). +get_session_stats(ClientId, SessPid) when is_binary(ClientId) -> + emqx_tables:lookup_value(?SESSION_STATS_TAB, {ClientId, SessPid}, []). %% @doc Set session stats --spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()}, - emqx_stats:stats()) -> true). +-spec(set_session_stats(emqx_types:client_id(), emqx_stats:stats()) -> true). set_session_stats(ClientId, Stats) when is_binary(ClientId) -> - set_session_stats({ClientId, self()}, Stats); -set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), is_pid(SPid) -> - ets:insert(?SESSION_STATS_TAB, {Session, Stats}). + set_session_stats(ClientId, self(), Stats). -%% @doc Lookup a session from registry --spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})). -lookup_session(ClientId) -> +-spec(set_session_stats(emqx_types:client_id(), pid(), emqx_stats:stats()) -> true). +set_session_stats(ClientId, SessPid, Stats) when is_binary(ClientId), is_pid(SessPid) -> + ets:insert(?SESSION_STATS_TAB, {{ClientId, SessPid}, Stats}). + +%% @doc Lookup session pid. +-spec(lookup_session_pids(emqx_types:client_id()) -> list(pid())). +lookup_session_pids(ClientId) -> case emqx_sm_registry:is_enabled() of true -> emqx_sm_registry:lookup_session(ClientId); - false -> ets:lookup(?SESSION_TAB, ClientId) + false -> ets:lookup(?SESSION_TAB, ClientId, []) end. %% @doc Dispatch a message to the session. -spec(dispatch(emqx_types:client_id(), emqx_topic:topic(), emqx_types:message()) -> any()). dispatch(ClientId, Topic, Msg) -> - case lookup_session_pid(ClientId) of - Pid when is_pid(Pid) -> - Pid ! {dispatch, Topic, Msg}; - undefined -> + case lookup_session_pids(ClientId) of + [SessPid|_] when is_pid(SessPid) -> + SessPid ! {dispatch, Topic, Msg}; + [] -> emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) end. -%% @doc Lookup session pid. --spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined). -lookup_session_pid(ClientId) -> - emqx_tables:lookup_value(?SESSION_TAB, ClientId). - notify(Event) -> gen_server:cast(?SM, {notify, Event}). @@ -203,43 +227,36 @@ init([]) -> ok = emqx_tables:new(?SESSION_P_TAB, TabOpts), ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), - ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0), + ok = emqx_stats:update_interval(sess_stats, fun ?MODULE:stats_fun/0), {ok, #{sess_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> emqx_logger:error("[SM] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({notify, {registered, ClientId, SPid}}, State = #{sess_pmon := PMon}) -> - {noreply, State#{sess_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}}; +handle_cast({notify, {registered, ClientId, SessPid}}, State = #{sess_pmon := PMon}) -> + {noreply, State#{sess_pmon := emqx_pmon:monitor(SessPid, ClientId, PMon)}}; -handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{sess_pmon := PMon}) -> - {noreply, State#{sess_pmon := emqx_pmon:demonitor(SPid, PMon)}}; +handle_cast({notify, {unregistered, SessPid}}, State = #{sess_pmon := PMon}) -> + {noreply, State#{sess_pmon := emqx_pmon:demonitor(SessPid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[SM] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{sess_pmon := PMon}) -> - case emqx_pmon:find(DownPid, PMon) of - undefined -> - {noreply, State}; - ClientId -> - Session = {ClientId, DownPid}, - case ets:member(?SESSION_ATTRS_TAB, Session) of - true -> - ok = emqx_pool:async_submit(fun do_unregister_session/1, [Session]); - false -> ok - end, - {noreply, State#{sess_pmon := emqx_pmon:erase(DownPid, PMon)}} - end; +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{sess_pmon := PMon}) -> + SessPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], + {Items, PMon1} = emqx_pmon:erase_all(SessPids, PMon), + ok = emqx_pool:async_submit( + fun lists:foreach/2, [fun clean_down/1, Items]), + {noreply, State#{sess_pmon := PMon1}}; handle_info(Info, State) -> emqx_logger:error("[SM] unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> - emqx_stats:cancel_update(sm_stats). + emqx_stats:cancel_update(sess_stats). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -248,6 +265,15 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ +clean_down({SessPid, ClientId}) -> + Session = {ClientId, SessPid}, + case ets:member(?SESSION_TAB, ClientId) + orelse ets:member(?SESSION_ATTRS_TAB, Session) of + true -> + do_unregister_session(Session); + false -> ok + end. + stats_fun() -> safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'), safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max'). diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index 1d0df61bf..a7f44d771 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -43,10 +43,9 @@ start_link() -> is_enabled() -> emqx_config:get_env(enable_session_registry, true). --spec(lookup_session(emqx_types:client_id()) - -> list({emqx_types:client_id(), session_pid()})). +-spec(lookup_session(emqx_types:client_id()) -> list(session_pid())). lookup_session(ClientId) -> - [{ClientId, SessPid} || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)]. + [SessPid || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)]. -spec(register_session({emqx_types:client_id(), session_pid()}) -> ok). register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) -> diff --git a/test/emqx_banned_SUITE.erl b/test/emqx_banned_SUITE.erl index 9d4c85134..7e7434e61 100644 --- a/test/emqx_banned_SUITE.erl +++ b/test/emqx_banned_SUITE.erl @@ -18,9 +18,7 @@ -compile(nowarn_export_all). -include("emqx.hrl"). - -include("emqx_mqtt.hrl"). - -include_lib("eunit/include/eunit.hrl"). all() -> [t_banned_all]. @@ -29,18 +27,27 @@ t_banned_all(_) -> emqx_ct_broker_helpers:run_setup_steps(), emqx_banned:start_link(), TimeNow = erlang:system_time(second), - Banned = #banned{who = {client_id, <<"TestClient">>}, + Banned = #banned{who = {client_id, <<"TestClient">>}, reason = <<"test">>, - by = <<"banned suite">>, - desc = <<"test">>, - until = TimeNow + 1}, + by = <<"banned suite">>, + desc = <<"test">>, + until = TimeNow + 1}, ok = emqx_banned:add(Banned), % here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed - ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), + ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, + username => undefined, + peername => {undefined, undefined}})), timer:sleep(2500), - ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), + ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, + username => undefined, + peername => {undefined, undefined}})), ok = emqx_banned:add(Banned), - ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), - emqx_banned:del({client_id, <<"TestClient">>}), - ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), + ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, + username => undefined, + peername => {undefined, undefined}})), + emqx_banned:delete({client_id, <<"TestClient">>}), + ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, + username => undefined, + peername => {undefined, undefined}})), emqx_ct_broker_helpers:run_teardown_steps(). + diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 5e29e075e..b720849f6 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -24,14 +24,16 @@ all() -> [t_register_unregister_connection]. t_register_unregister_connection(_) -> {ok, _} = emqx_cm_sup:start_link(), Pid = self(), - emqx_cm:register_connection(<<"conn1">>), - emqx_cm:register_connection({<<"conn2">>, Pid}, [{port, 8080}, {ip, "192.168.0.1"}]), + ok = emqx_cm:register_connection(<<"conn1">>), + ok emqx_cm:register_connection(<<"conn2">>, Pid), + true = emqx_cm:set_conn_attrs(<<"conn1">>, [{port, 8080}, {ip, "192.168.0.1"}]), + true = emqx_cm:set_conn_attrs(<<"conn2">>, Pid, [{port, 8080}, {ip, "192.168.0.1"}]), timer:sleep(2000), - [{<<"conn1">>, Pid}] = emqx_cm:lookup_connection(<<"conn1">>), - [{<<"conn2">>, Pid}] = emqx_cm:lookup_connection(<<"conn2">>), - Pid = emqx_cm:lookup_conn_pid(<<"conn1">>), - emqx_cm:unregister_connection(<<"conn1">>), - [] = emqx_cm:lookup_connection(<<"conn1">>), - [{port, 8080}, {ip, "192.168.0.1"}] = emqx_cm:get_conn_attrs({<<"conn2">>, Pid}), - emqx_cm:set_conn_stats(<<"conn2">>, [[{count, 1}, {max, 2}]]), - [[{count, 1}, {max, 2}]] = emqx_cm:get_conn_stats({<<"conn2">>, Pid}). + ?assertEqual(Pid, emqx_cm:lookup_conn_pid(<<"conn1">>)), + ?assertEqual(Pid, emqx_cm:lookup_conn_pid(<<"conn2">>)), + ok = emqx_cm:unregister_connection(<<"conn1">>), + ?assertEqual(undefined, emqx_cm:lookup_conn_pid(<<"conn1">>)), + ?assertEqual([{port, 8080}, {ip, "192.168.0.1"}], emqx_cm:get_conn_attrs({<<"conn2">>, Pid})), + true = emqx_cm:set_conn_stats(<<"conn2">>, [{count, 1}, {max, 2}]), + ?assertEqual([{count, 1}, {max, 2}], emqx_cm:get_conn_stats({<<"conn2">>, Pid})). + diff --git a/test/emqx_pmon_SUITE.erl b/test/emqx_pmon_SUITE.erl new file mode 100644 index 000000000..67e8cf4d7 --- /dev/null +++ b/test/emqx_pmon_SUITE.erl @@ -0,0 +1,48 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_pmon_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [t_monitor, t_find, t_erase]. + +t_monitor(_) -> + PMon = emqx_pmon:new(), + PMon1 = emqx_pmon:monitor(self(), PMon), + ?assertEqual(1, emqx_pmon:count(PMon1)), + PMon2 = emqx_pmon:demonitor(self(), PMon1), + ?assertEqual(0, emqx_pmon:count(PMon2)). + +t_find(_) -> + PMon = emqx_pmon:new(), + PMon1 = emqx_pmon:monitor(self(), val, PMon), + ?assertEqual(1, emqx_pmon:count(PMon1)), + ?assertEqual({ok, val}, emqx_pmon:find(self(), PMon1)), + PMon2 = emqx_pmon:erase(self(), PMon1), + ?assertEqual(error, emqx_pmon:find(self(), PMon2)). + +t_erase(_) -> + PMon = emqx_pmon:new(), + PMon1 = emqx_pmon:monitor(self(), val, PMon), + PMon2 = emqx_pmon:erase(self(), PMon1), + ?assertEqual(0, emqx_pmon:count(PMon2)), + {Items, PMon3} = emqx_pmon:erase_all([self()], PMon1), + ?assertEqual([{self(), val}], Items), + ?assertEqual(0, emqx_pmon:count(PMon3)). + diff --git a/test/emqx_sequence_SUITE.erl b/test/emqx_sequence_SUITE.erl index 1ac0ea308..ab408b8e0 100644 --- a/test/emqx_sequence_SUITE.erl +++ b/test/emqx_sequence_SUITE.erl @@ -34,5 +34,5 @@ sequence_generate(_) -> ?assertEqual(0, reclaim(seqtab, key)), ?assertEqual(false, ets:member(seqtab, key)), ?assertEqual(1, nextval(seqtab, key)), - ?assert(emqx_sequence:delete(seqtab). + ?assert(emqx_sequence:delete(seqtab)). diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 3aed3090e..008d4b6e6 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -34,16 +34,14 @@ t_open_close_session(_) -> topic_alias_maximum => 0, will_msg => undefined}, {ok, SPid} = emqx_sm:open_session(Attrs), - [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), - SPid = emqx_sm:lookup_session_pid(<<"client">>), + ?assertEqual([SPid], emqx_sm:lookup_session_pids(<<"client">>)), {ok, NewConnPid} = emqx_mock_client:start_link(<<"client">>), {ok, SPid, true} = emqx_sm:open_session(Attrs#{clean_start => false, conn_pid => NewConnPid}), - [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), - SAttrs = emqx_sm:get_session_attrs({<<"client">>, SPid}), - <<"client">> = proplists:get_value(client_id, SAttrs), - Session = {<<"client">>, SPid}, - emqx_sm:set_session_stats(Session, {open, true}), - {open, true} = emqx_sm:get_session_stats(Session), + ?assertEqual([SPid], emqx_sm:lookup_session_pids(<<"client">>)), + SAttrs = emqx_sm:get_session_attrs(<<"client">>, SPid), + ?assertEqual(<<"client">>, proplists:get_value(client_id, SAttrs)), + emqx_sm:set_session_stats(<<"client">>, SPid, [{inflight, 10}]), + ?assertEqual([{inflight, 10}], emqx_sm:get_session_stats(<<"client">>, SPid)), ok = emqx_sm:close_session(SPid), - [] = emqx_sm:lookup_session(<<"client">>), + ?assertEqual([], emqx_sm:lookup_session_pids(<<"client">>)), emqx_ct_broker_helpers:run_teardown_steps().