From bfb23ff0b26192ff3fcffdca3637aea44b9db23c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 8 Apr 2018 15:16:05 +0800 Subject: [PATCH] Improve the design of MQTT session management --- include/emqx.hrl | 4 +- src/emqx_acl_mod.erl | 2 +- src/emqx_alarm.erl | 2 +- src/emqx_auth_mod.erl | 2 +- src/emqx_banned.erl | 2 +- src/emqx_base62.erl | 2 +- src/emqx_boot.erl | 2 +- src/emqx_bridge.erl | 2 +- src/emqx_broker.erl | 6 +- src/emqx_broker_sup.erl | 6 +- src/emqx_config.erl | 5 + src/emqx_locker.erl | 41 ---- src/emqx_pmon.erl | 26 ++- src/emqx_router_helper.erl | 2 +- src/emqx_router_sup.erl | 2 +- src/emqx_serializer.erl | 2 +- src/emqx_session.erl | 198 +++++++++--------- ..._shared_pubsub.erl => emqx_shared_sub.erl} | 38 ++-- src/emqx_sm.erl | 190 +++++++++-------- src/emqx_sm_registry.erl | 109 ++++++++++ src/emqx_sm_stats.erl | 72 +++++++ src/emqx_sm_sup.erl | 19 +- src/emqx_stats.erl | 19 +- src/emqx_sup.erl | 1 - 24 files changed, 446 insertions(+), 308 deletions(-) delete mode 100644 src/emqx_locker.erl rename src/{emqx_shared_pubsub.erl => emqx_shared_sub.erl} (88%) create mode 100644 src/emqx_sm_registry.erl create mode 100644 src/emqx_sm_stats.erl diff --git a/include/emqx.hrl b/include/emqx.hrl index 0639ae19d..a0b96c1c7 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -96,8 +96,8 @@ -type(client() :: #client{}). -record(session, - { client_id :: client_id(), - pid :: pid() + { sid :: client_id(), + pid :: pid() }). -type(session() :: #session{}). diff --git a/src/emqx_acl_mod.erl b/src/emqx_acl_mod.erl index e01c0ed95..7c0866e61 100644 --- a/src/emqx_acl_mod.erl +++ b/src/emqx_acl_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 83e957047..0f639af3b 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_auth_mod.erl b/src/emqx_auth_mod.erl index 2cf222d65..1a2fd72c9 100644 --- a/src/emqx_auth_mod.erl +++ b/src/emqx_auth_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 66cdf87b5..85cfd8b6c 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_base62.erl b/src/emqx_base62.erl index a997912eb..3d0d01969 100644 --- a/src/emqx_base62.erl +++ b/src/emqx_base62.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_boot.erl b/src/emqx_boot.erl index 26348b271..dadc4c4cd 100644 --- a/src/emqx_boot.erl +++ b/src/emqx_boot.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 5c20538f3..bdd077106 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 067e2c6da..73b557eac 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -119,7 +119,7 @@ route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) -> forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]}); route([{To, Group}], Delivery) when is_binary(Group) -> - emqx_shared_pubsub:dispatch(Group, To, Delivery); + emqx_shared_sub:dispatch(Group, To, Delivery); route(Routes, Delivery) -> lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes). @@ -248,7 +248,7 @@ handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) -> [] -> Group = proplists:get_value(share, Options), true = do_subscribe(Group, Topic, Subscriber, Options), - emqx_shared_pubsub:subscribe(Group, Topic, subpid(Subscriber)), + emqx_shared_sub:subscribe(Group, Topic, subpid(Subscriber)), emqx_router:add_route(From, Topic, dest(Options)), {noreply, monitor_subscriber(Subscriber, State)}; [_] -> @@ -261,7 +261,7 @@ handle_cast({From, {unsubscribe, Topic, Subscriber}}, State) -> [{_, Options}] -> Group = proplists:get_value(share, Options), true = do_unsubscribe(Group, Topic, Subscriber), - emqx_shared_pubsub:unsubscribe(Group, Topic, subpid(Subscriber)), + emqx_shared_sub:unsubscribe(Group, Topic, subpid(Subscriber)), case ets:member(subscriber, Topic) of false -> emqx_router:del_route(From, Topic, dest(Options)); true -> gen_server:reply(From, ok) diff --git a/src/emqx_broker_sup.erl b/src/emqx_broker_sup.erl index 96c004b49..d962526eb 100644 --- a/src/emqx_broker_sup.erl +++ b/src/emqx_broker_sup.erl @@ -35,9 +35,9 @@ init([]) -> %% Create the pubsub tables create_tabs(), - %% Shared pubsub - Shared = {shared_pubsub, {emqx_shared_pubsub, start_link, []}, - permanent, 5000, worker, [emqx_shared_pubsub]}, + %% Shared subscription + Shared = {shared_sub, {emqx_shared_sub, start_link, []}, + permanent, 5000, worker, [emqx_shared_sub]}, %% Broker helper Helper = {broker_helper, {emqx_broker_helper, start_link, [stats_fun()]}, diff --git a/src/emqx_config.erl b/src/emqx_config.erl index 15a3a014a..954539e16 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -29,6 +29,11 @@ -type(env() :: {atom(), term()}). +-define(APP, emqx). + +get_env(Key) -> + application:get_env(?APP, Key). + %% @doc Read the configuration of an application. -spec(read(atom()) -> {ok, list(env())} | {error, term()}). read(App) -> diff --git a/src/emqx_locker.erl b/src/emqx_locker.erl deleted file mode 100644 index 196a3aa69..000000000 --- a/src/emqx_locker.erl +++ /dev/null @@ -1,41 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_locker). - --export([start_link/0]). - -%% Lock/Unlock API based on canal-lock. --export([lock/1, unlock/1]). - -%% @doc Starts the lock server --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -start_link() -> - canal_lock:start_link(?MODULE, 1). - -%% @doc Lock a Key --spec(lock(binary()) -> boolean()). -lock(Key) -> - case canal_lock:acquire(?MODULE, Key, 1, 1) of - {acquired, 1} -> true; - full -> false - end. - -%% @doc Unlock a Key --spec(unlock(binary()) -> ok). -unlock(Key) -> - canal_lock:release(?MODULE, Key, 1, 1). - diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index cd66414fc..774b3d8ec 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,7 +16,9 @@ -module(emqx_pmon). --export([new/0, monitor/2, demonitor/2, erase/2]). +-export([new/0, monitor/2, monitor/3, demonitor/2, find/2, erase/2]). + +-compile({no_auto_import,[monitor/3]}). -type(pmon() :: {?MODULE, map()}). @@ -26,25 +28,35 @@ new() -> {?MODULE, [maps:new()]}. -spec(monitor(pid(), pmon()) -> pmon()). -monitor(Pid, PM = {?MODULE, [M]}) -> +monitor(Pid, PM) -> + monitor(Pid, undefined, PM). + +monitor(Pid, Val, PM = {?MODULE, [M]}) -> case maps:is_key(Pid, M) of - true -> - PM; + true -> PM; false -> Ref = erlang:monitor(process, Pid), - {?MODULE, [maps:put(Pid, Ref, M)]} + {?MODULE, [maps:put(Pid, {Ref, Val}, M)]} end. -spec(demonitor(pid(), pmon()) -> pmon()). demonitor(Pid, PM = {?MODULE, [M]}) -> case maps:find(Pid, M) of - {ok, Ref} -> + {ok, {Ref, _Val}} -> erlang:demonitor(Ref, [flush]), {?MODULE, [maps:remove(Pid, M)]}; error -> PM end. +-spec(find(pid(), pmon()) -> undefined | term()). +find(Pid, {?MODULE, [M]}) -> + case maps:find(Pid, M) of + {ok, {_Ref, Val}} -> + Val; + error -> undefined + end. + -spec(erase(pid(), pmon()) -> pmon()). erase(Pid, {?MODULE, [M]}) -> {?MODULE, [maps:remove(Pid, M)]}. diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 97ba1811f..f8c41efc0 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_router_sup.erl b/src/emqx_router_sup.erl index bb3c59e3e..2fa5c21e1 100644 --- a/src/emqx_router_sup.erl +++ b/src/emqx_router_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_serializer.erl b/src/emqx_serializer.erl index eb1cec4db..82c888665 100644 --- a/src/emqx_serializer.erl +++ b/src/emqx_serializer.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d1b691d32..19147c242 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -67,82 +67,81 @@ %% If the session is currently disconnected, the time at which the Session state %% will be deleted. -record(state, - { - %% Clean Start Flag - clean_start = false :: boolean(), + { %% Clean Start Flag + clean_start = false :: boolean(), - %% Client Binding: local | remote - binding = local :: local | remote, + %% Client Binding: local | remote + binding = local :: local | remote, - %% ClientId: Identifier of Session - client_id :: binary(), + %% ClientId: Identifier of Session + client_id :: binary(), - %% Username - username :: binary() | undefined, + %% Username + username :: binary() | undefined, - %% Client Pid binding with session - client_pid :: pid(), + %% Client Pid binding with session + client_pid :: pid(), - %% Old Client Pid that has been kickout - old_client_pid :: pid(), + %% Old Client Pid that has been kickout + old_client_pid :: pid(), - %% Next message id of the session - next_msg_id = 1 :: mqtt_packet_id(), + %% Next message id of the session + next_msg_id = 1 :: mqtt_packet_id(), - max_subscriptions :: non_neg_integer(), + max_subscriptions :: non_neg_integer(), - %% Client’s subscriptions. - subscriptions :: map(), + %% Client’s subscriptions. + subscriptions :: map(), - %% Upgrade Qos? - upgrade_qos = false :: boolean(), + %% Upgrade Qos? + upgrade_qos = false :: boolean(), - %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. - inflight :: emqx_inflight:inflight(), + %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. + inflight :: emqx_inflight:inflight(), - %% Max Inflight Size - max_inflight = 32 :: non_neg_integer(), + %% Max Inflight Size + max_inflight = 32 :: non_neg_integer(), - %% Retry interval for redelivering QoS1/2 messages - retry_interval = 20000 :: timeout(), + %% Retry interval for redelivering QoS1/2 messages + retry_interval = 20000 :: timeout(), - %% Retry Timer - retry_timer :: reference() | undefined, + %% Retry Timer + retry_timer :: reference() | undefined, - %% All QoS1, QoS2 messages published to when client is disconnected. - %% QoS 1 and QoS 2 messages pending transmission to the Client. - %% - %% Optionally, QoS 0 messages pending transmission to the Client. - mqueue :: ?MQueue:mqueue(), + %% All QoS1, QoS2 messages published to when client is disconnected. + %% QoS 1 and QoS 2 messages pending transmission to the Client. + %% + %% Optionally, QoS 0 messages pending transmission to the Client. + mqueue :: ?MQueue:mqueue(), - %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. - awaiting_rel :: map(), + %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. + awaiting_rel :: map(), - %% Max Packets that Awaiting PUBREL - max_awaiting_rel = 100 :: non_neg_integer(), + %% Max Packets that Awaiting PUBREL + max_awaiting_rel = 100 :: non_neg_integer(), - %% Awaiting PUBREL timeout - await_rel_timeout = 20000 :: timeout(), + %% Awaiting PUBREL timeout + await_rel_timeout = 20000 :: timeout(), - %% Awaiting PUBREL timer - await_rel_timer :: reference() | undefined, + %% Awaiting PUBREL timer + await_rel_timer :: reference() | undefined, - %% Session Expiry Interval - expiry_interval = 7200000 :: timeout(), + %% Session Expiry Interval + expiry_interval = 7200000 :: timeout(), - %% Expired Timer - expiry_timer :: reference() | undefined, + %% Expired Timer + expiry_timer :: reference() | undefined, - %% Enable Stats - enable_stats :: boolean(), + %% Enable Stats + enable_stats :: boolean(), - %% Force GC Count - force_gc_count :: undefined | integer(), + %% Force GC reductions + reductions = 0 :: non_neg_integer(), - %% Ignore loop deliver? - ignore_loop_deliver = false :: boolean(), + %% Ignore loop deliver? + ignore_loop_deliver = false :: boolean(), - created_at :: erlang:timestamp() + created_at :: erlang:timestamp() }). -define(TIMEOUT, 60000). @@ -161,8 +160,8 @@ %% @doc Start a Session -spec(start_link(map()) -> {ok, pid()} | {error, term()}). -start_link(ClientAttrs) -> - gen_server:start_link(?MODULE, ClientAttrs, [{hibernate_after, 10000}]). +start_link(Attrs) -> + gen_server:start_link(?MODULE, Attrs, [{hibernate_after, 10000}]). %%-------------------------------------------------------------------- %% PubSub API @@ -170,71 +169,71 @@ start_link(ClientAttrs) -> %% @doc Subscribe topics -spec(subscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok). -subscribe(Session, TopicTable) -> %%TODO: the ack function??... - gen_server:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}). +subscribe(SessionPid, TopicTable) -> %%TODO: the ack function??... + gen_server:cast(SessionPid, {subscribe, self(), TopicTable, fun(_) -> ok end}). -spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqx_topic:option()]}]) -> ok). -subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??... +subscribe(SessionPid, PacketId, TopicTable) -> %%TODO: the ack function??... From = self(), AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end, - gen_server:cast(Session, {subscribe, From, TopicTable, AckFun}). + gen_server:cast(SessionPid, {subscribe, From, TopicTable, AckFun}). %% @doc Publish Message -spec(publish(pid(), message()) -> ok | {error, term()}). -publish(_Session, Msg = #message{qos = ?QOS_0}) -> +publish(_SessionPid, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 Directly emqx_broker:publish(Msg), ok; -publish(_Session, Msg = #message{qos = ?QOS_1}) -> +publish(_SessionPid, Msg = #message{qos = ?QOS_1}) -> %% Publish QoS1 message directly for client will PubAck automatically emqx_broker:publish(Msg), ok; -publish(Session, Msg = #message{qos = ?QOS_2}) -> +publish(SessionPid, Msg = #message{qos = ?QOS_2}) -> %% Publish QoS2 to Session - gen_server:call(Session, {publish, Msg}, ?TIMEOUT). + gen_server:call(SessionPid, {publish, Msg}, ?TIMEOUT). %% @doc PubAck Message -spec(puback(pid(), mqtt_packet_id()) -> ok). -puback(Session, PacketId) -> - gen_server:cast(Session, {puback, PacketId}). +puback(SessionPid, PacketId) -> + gen_server:cast(SessionPid, {puback, PacketId}). -spec(pubrec(pid(), mqtt_packet_id()) -> ok). -pubrec(Session, PacketId) -> - gen_server:cast(Session, {pubrec, PacketId}). +pubrec(SessionPid, PacketId) -> + gen_server:cast(SessionPid, {pubrec, PacketId}). -spec(pubrel(pid(), mqtt_packet_id()) -> ok). -pubrel(Session, PacketId) -> - gen_server:cast(Session, {pubrel, PacketId}). +pubrel(SessionPid, PacketId) -> + gen_server:cast(SessionPid, {pubrel, PacketId}). -spec(pubcomp(pid(), mqtt_packet_id()) -> ok). -pubcomp(Session, PacketId) -> - gen_server:cast(Session, {pubcomp, PacketId}). +pubcomp(SessionPid, PacketId) -> + gen_server:cast(SessionPid, {pubcomp, PacketId}). %% @doc Unsubscribe the topics -spec(unsubscribe(pid(), [{binary(), [suboption()]}]) -> ok). -unsubscribe(Session, TopicTable) -> - gen_server:cast(Session, {unsubscribe, self(), TopicTable}). +unsubscribe(SessionPid, TopicTable) -> + gen_server:cast(SessionPid, {unsubscribe, self(), TopicTable}). %% @doc Resume the session -spec(resume(pid(), client_id(), pid()) -> ok). -resume(Session, ClientId, ClientPid) -> - gen_server:cast(Session, {resume, ClientId, ClientPid}). +resume(SessionPid, ClientId, ClientPid) -> + gen_server:cast(SessionPid, {resume, ClientId, ClientPid}). %% @doc Get session state -state(Session) when is_pid(Session) -> - gen_server:call(Session, state). +state(SessionPid) when is_pid(SessionPid) -> + gen_server:call(SessionPid, state). %% @doc Get session info -spec(info(pid() | #state{}) -> list(tuple())). -info(Session) when is_pid(Session) -> - gen_server:call(Session, info); +info(SessionPid) when is_pid(SessionPid) -> + gen_server:call(SessionPid, info); info(State) when is_record(State, state) -> ?record_to_proplist(state, State, ?INFO_KEYS). -spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). -stats(Session) when is_pid(Session) -> - gen_server:call(Session, stats); +stats(SessionPid) when is_pid(SessionPid) -> + gen_server:call(SessionPid, stats); stats(#state{max_subscriptions = MaxSubscriptions, subscriptions = Subscriptions, @@ -258,8 +257,8 @@ stats(#state{max_subscriptions = MaxSubscriptions, %% @doc Discard the session -spec(discard(pid(), client_id()) -> ok). -discard(Session, ClientId) -> - gen_server:cast(Session, {discard, ClientId}). +discard(SessionPid, ClientId) -> + gen_server:call(SessionPid, {discard, ClientId}). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -276,7 +275,6 @@ init(#{clean_start := CleanStart, {ok, QEnv} = emqx:env(mqueue), MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), - ForceGcCount = emqx_gc:conn_max_gc_count(), IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), MQueue = ?MQueue:new(ClientId, QEnv, emqx_alarm:alarm_fun()), State = #state{clean_start = CleanStart, @@ -296,10 +294,9 @@ init(#{clean_start := CleanStart, max_awaiting_rel = get_value(max_awaiting_rel, Env), expiry_interval = get_value(expiry_interval, Env), enable_stats = EnableStats, - force_gc_count = ForceGcCount, ignore_loop_deliver = IgnoreLoopDeliver, created_at = os:timestamp()}, - emqx_sm:register_session(ClientId, self()), + emqx_sm:register_session(#session{sid = ClientId, pid = self()}, info(State)), emqx_hooks:run('session.created', [ClientId, Username]), io:format("Session started: ~p~n", [self()]), {ok, emit_stats(State), hibernate}. @@ -310,8 +307,13 @@ init_stats(Keys) -> binding(ClientPid) -> case node(ClientPid) =:= node() of true -> local; false -> remote end. -handle_pre_hibernate(State) -> - {hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}. +handle_call({discard, ClientPid}, _From, State = #state{client_pid = undefined}) -> + ?LOG(warning, "Discarded by ~p", [ClientPid], State), + {stop, {shutdown, discard}, ok, State}; + +handle_call({discard, ClientPid}, _From, State = #state{client_pid = OldClientPid}) -> + ?LOG(warning, " ~p kickout ~p", [ClientPid, OldClientPid], State), + {stop, {shutdown, conflict}, ok, State}; handle_call({publish, Msg = #message{qos = ?QOS_2, headers = #{packet_id := PacketId}}}, _From, State = #state{awaiting_rel = AwaitingRel, @@ -498,16 +500,6 @@ handle_cast({resume, ClientId, ClientPid}, %% Replay delivery and Dequeue pending messages {noreply, emit_stats(dequeue(retry_delivery(true, State1)))}; -handle_cast({discard, ClientId}, - State = #state{client_id = ClientId, client_pid = undefined}) -> - ?LOG(warning, "Destroyed", [], State), - shutdown(discard, State); - -handle_cast({discard, ClientId}, - State = #state{client_id = ClientId, client_pid = OldClientPid}) -> - ?LOG(warning, "kickout ~p", [OldClientPid], State), - shutdown(conflict, State); - handle_cast(Msg, State) -> lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]), {noreply, State}. @@ -563,10 +555,9 @@ handle_info(Info, State) -> {noreply, State}. terminate(Reason, #state{client_id = ClientId, username = Username}) -> - %% Move to emqx_sm to avoid race condition - %% emqx_stats:del_session_stats(ClientId), + emqx_hooks:run('session.terminated', [ClientId, Username, Reason]), - emqx_sm:unregister_session(ClientId). + emqx_sm:unregister_session(#session{sid = ClientId, pid = self()}). code_change(_OldVsn, Session, _Extra) -> {ok, Session}. @@ -574,6 +565,7 @@ code_change(_OldVsn, Session, _Extra) -> %%-------------------------------------------------------------------- %% Kickout old client %%-------------------------------------------------------------------- + kick(_ClientId, undefined, _Pid) -> ignore; kick(_ClientId, Pid, Pid) -> @@ -820,7 +812,8 @@ next_msg_id(State = #state{next_msg_id = Id}) -> emit_stats(State = #state{enable_stats = false}) -> State; emit_stats(State = #state{client_id = ClientId}) -> - emqx_stats:set_session_stats(ClientId, stats(State)), + Session = #session{sid = ClientId, pid = self()}, + emqx_sm_stats:set_session_stats(Session, stats(State)), State. inc_stats(Key) -> put(Key, get(Key) + 1). @@ -836,5 +829,6 @@ shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. gc(State) -> - emqx_gc:maybe_force_gc(#state.force_gc_count, State). + State. + %%emqx_gc:maybe_force_gc(#state.force_gc_count, State). diff --git a/src/emqx_shared_pubsub.erl b/src/emqx_shared_sub.erl similarity index 88% rename from src/emqx_shared_pubsub.erl rename to src/emqx_shared_sub.erl index bbc4d1930..6d5efbd51 100644 --- a/src/emqx_shared_pubsub.erl +++ b/src/emqx_shared_sub.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,18 +14,12 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_shared_pubsub). +-module(emqx_shared_sub). -behaviour(gen_server). -include("emqx.hrl"). -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - %% API -export([start_link/0]). @@ -41,32 +35,24 @@ -define(SERVER, ?MODULE). --define(TABLE, shared_subscription). +-define(TAB, shared_subscription). -record(state, {pmon}). -record(shared_subscription, {group, topic, subpid}). -%%-------------------------------------------------------------------- -%% Mnesia bootstrap -%%-------------------------------------------------------------------- - -mnesia(boot) -> - ok = ekka_mnesia:create_table(?TABLE, [ - {type, bag}, - {ram_copies, [node()]}, - {record_name, shared_subscription}, - {attributes, record_info(fields, shared_subscription)}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TABLE). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -spec(start_link() -> {ok, pid()} | ignore | {error, any()}). start_link() -> + ok = ekka_mnesia:create_table(?TAB, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, shared_subscription}, + {attributes, record_info(fields, shared_subscription)}]), + ok = ekka_mnesia:copy_table(?TAB), gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec(strategy() -> random | hash). @@ -113,14 +99,14 @@ subscribers(Group, Topic) -> init([]) -> {atomic, PMon} = mnesia:transaction(fun init_monitors/0), - mnesia:subscribe({table, ?TABLE, simple}), + mnesia:subscribe({table, ?TAB, simple}), {ok, #state{pmon = PMon}}. init_monitors() -> mnesia:foldl( fun(#shared_subscription{subpid = SubPid}, Mon) -> Mon:monitor(SubPid) - end, emqx_pmon:new(), ?TABLE). + end, emqx_pmon:new(), ?TAB). handle_call(Req, _From, State) -> emqx_log:error("[Shared] Unexpected request: ~p", [Req]), @@ -156,7 +142,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, _State) -> - mnesia:unsubscribe({table, ?TABLE, simple}). + mnesia:unsubscribe({table, ?TAB, simple}). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index f4ea1468c..56587501e 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -20,29 +20,34 @@ -include("emqx.hrl"). --export([start_link/1]). +-export([start_link/0]). -export([open_session/1, lookup_session/1, close_session/1]). --export([resume_session/1, discard_session/1]). --export([register_session/1, register_session/2]). --export([unregister_session/1, unregister_session/2]). +-export([resume_session/1, resume_session/2, discard_session/1, discard_session/2]). +-export([register_session/2, unregister_session/1]). %% Internal functions for rpc --export([lookup/1, dispatch/3]). +-export([dispatch/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {stats, pids = #{}}). +-record(state, {pmon}). --spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}). -start_link(StatsFun) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). +-define(SM, ?MODULE). + +-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% Open Session +%%-------------------------------------------------------------------- open_session(Attrs = #{clean_start := true, client_id := ClientId, client_pid := ClientPid}) -> CleanStart = fun(_) -> - discard_session(ClientId, ClientPid), + ok = discard_session(ClientId, ClientPid), emqx_session_sup:start_session(Attrs) end, emqx_sm_locker:trans(ClientId, CleanStart); @@ -61,13 +66,26 @@ open_session(Attrs = #{clean_start := false, end, emqx_sm_locker:trans(ClientId, ResumeStart). -discard_session(ClientId) -> +%%-------------------------------------------------------------------- +%% Discard Session +%%-------------------------------------------------------------------- + +discard_session(ClientId) when is_binary(ClientId) -> discard_session(ClientId, self()). -discard_session(ClientId, ClientPid) -> - lists:foreach(fun({_, SessionPid}) -> - catch emqx_session:discard(SessionPid, ClientPid) - end, lookup_session(ClientId)). +discard_session(ClientId, ClientPid) when is_binary(ClientId) -> + lists:foreach( + fun(#session{pid = SessionPid}) -> + case catch emqx_session:discard(SessionPid, ClientPid) of + {'EXIT', Error} -> + emqx_log:error("[SM] Failed to discard ~p: ~p", [SessionPid, Error]); + ok -> ok + end + end, lookup_session(ClientId)). + +%%-------------------------------------------------------------------- +%% Resume Session +%%-------------------------------------------------------------------- resume_session(ClientId) -> resume_session(ClientId, self()). @@ -75,99 +93,106 @@ resume_session(ClientId) -> resume_session(ClientId, ClientPid) -> case lookup_session(ClientId) of [] -> {error, not_found}; - [{_, SessionPid}] -> + [#session{pid = SessionPid}] -> ok = emqx_session:resume(SessionPid, ClientPid), {ok, SessionPid}; - [{_, SessionPid}|_More] = Sessions -> + Sessions -> + [#session{pid = SessionPid}|StaleSessions] = lists:reverse(Sessions), emqx_log:error("[SM] More than one session found: ~p", [Sessions]), + lists:foreach(fun(#session{pid = Pid}) -> + catch emqx_session:discard(Pid, ClientPid) + end, StaleSessions), ok = emqx_session:resume(SessionPid, ClientPid), {ok, SessionPid} end. +%%-------------------------------------------------------------------- +%% Close a session +%%-------------------------------------------------------------------- + +close_session(#session{pid = SessionPid}) -> + emqx_session:close(SessionPid). + +%%-------------------------------------------------------------------- +%% Create/Delete a session +%%-------------------------------------------------------------------- + +register_session(Session, Attrs) when is_record(Session, session) -> + ets:insert(session, Session), + ets:insert(session_attrs, {Session, Attrs}), + emqx_sm_registry:register_session(Session), + gen_server:cast(?MODULE, {registered, Session}). + +unregister_session(Session) when is_record(Session, session) -> + emqx_sm_registry:unregister_session(Session), + emqx_sm_stats:del_session_stats(Session), + ets:delete(session_attrs, Session), + ets:delete_object(session, Session), + gen_server:cast(?MODULE, {unregistered, Session}). + +%%-------------------------------------------------------------------- +%% Lookup a session from registry +%%-------------------------------------------------------------------- + lookup_session(ClientId) -> - {ResL, _} = multicall(?MODULE, lookup, [ClientId]), - lists:append(ResL). + emqx_sm_registry:lookup_session(ClientId). -close_session(ClientId) -> - lists:foreach(fun(#session{pid = SessionPid}) -> - emqx_session:close(SessionPid) - end, lookup_session(ClientId)). - -register_session(ClientId) -> - register_session(ClientId, self()). - -register_session(ClientId, SessionPid) -> - ets:insert(session, {ClientId, SessionPid}). - -unregister_session(ClientId) -> - unregister_session(ClientId, self()). - -unregister_session(ClientId, SessionPid) -> - case ets:lookup(session, ClientId) of - [Session = {ClientId, SessionPid}] -> - ets:delete(session_attrs, Session), - ets:delete(session_stats, Session), - ets:delete_object(session, Session); - _ -> - false - end. +%%-------------------------------------------------------------------- +%% Dispatch by client Id +%%-------------------------------------------------------------------- dispatch(ClientId, Topic, Msg) -> - case lookup(ClientId) of - [{_, Pid}] -> + case lookup_session_pid(ClientId) of + Pid when is_pid(Pid) -> Pid ! {dispatch, Topic, Msg}; - [] -> + undefined -> emqx_hooks:run('message.dropped', [ClientId, Msg]) end. -lookup(ClientId) -> - ets:lookup(session, ClientId). - -multicall(Mod, Fun, Args) -> - multicall(ekka:nodelist(up), Mod, Fun, Args). - -multicall([Node], Mod, Fun, Args) when Node == node() -> - Res = erlang:apply(Mod, Fun, Args), [Res]; - -multicall(Nodes, Mod, Fun, Args) -> - {ResL, _} = emqx_rpc:multicall(Nodes, Mod, Fun, Args), - ResL. +lookup_session_pid(ClientId) -> + try ets:lookup_element(session, ClientId, #session.pid) + catch error:badarg -> + undefined + end. %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([StatsFun]) -> - {ok, sched_stats(StatsFun, #state{pids = #{}})}. - -sched_stats(Fun, State) -> - {ok, TRef} = timer:send_interval(timer:seconds(1), stats), - State#state{stats = #{func => Fun, timer => TRef}}. +init([]) -> + _ = emqx_tables:create(session, [public, set, {keypos, 2}, + {read_concurrency, true}, + {write_concurrency, true}]), + _ = emqx_tables:create(session_attrs, [public, set, + {write_concurrency, true}]), + {ok, #state{pmon = emqx_pmon:new()}}. handle_call(Req, _From, State) -> emqx_log:error("[SM] Unexpected request: ~p", [Req]), {reply, ignore, State}. -handle_cast({registered, ClientId, SessionPid}, - State = #state{pids = Pids}) -> - _ = erlang:monitor(process, SessionPid), - {noreply, State#state{pids = maps:put(SessionPid, ClientId, Pids)}}; +handle_cast({registered, #session{sid = ClientId, pid = SessionPid}}, + State = #state{pmon = PMon}) -> + {noreply, State#state{pmon = PMon:monitor(SessionPid, ClientId)}}; + +handle_cast({unregistered, #session{sid = _ClientId, pid = SessionPid}}, + State = #state{pmon = PMon}) -> + {noreply, State#state{pmon = PMon:erase(SessionPid)}}; handle_cast(Msg, State) -> emqx_log:error("[SM] Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info(stats, State) -> - {noreply, setstats(State), hibernate}; - handle_info({'DOWN', _MRef, process, DownPid, _Reason}, - State = #state{pids = Pids}) -> - case maps:find(DownPid, Pids) of + State = #state{pmon = PMon}) -> + case PMon:find(DownPid) of {ok, ClientId} -> - unregister_session(ClientId, DownPid), - {noreply, State#state{pids = maps:remove(DownPid, Pids)}}; - error -> - emqx_log:error("[SM] Session ~p not found", [DownPid]), + case ets:lookup(session, ClientId) of + [] -> ok; + _ -> unregister_session(#session{sid = ClientId, pid = DownPid}) + end, + {noreply, State}; + undefined -> {noreply, State} end; @@ -175,16 +200,9 @@ handle_info(Info, State) -> emqx_log:error("[SM] Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, _State = #state{stats = #{timer := TRef}}) -> - timer:cancel(TRef). +terminate(_Reason, _State) -> + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -setstats(State = #state{stats = #{func := Fun}}) -> - Fun(ets:info(session, size)), State. - diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl new file mode 100644 index 000000000..e718ee3e7 --- /dev/null +++ b/src/emqx_sm_registry.erl @@ -0,0 +1,109 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_sm_registry). + +-behaviour(gen_server). + +-include("emqx.hrl"). + +%% API +-export([start_link/0]). + +-export([register_session/1, lookup_session/1, unregister_session/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-define(TAB, session_registry). + +-define(LOCK, {?MODULE, cleanup_sessions}). + +-record(state, {}). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +start_link() -> + ok = ekka_mnesia:create_table(?TAB, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, session}, + {attributes, record_info(fields, session)}]), + ok = ekka_mnesia:copy_table(?TAB), + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +-spec(lookup_session(client_id()) -> list(session())). +lookup_session(ClientId) -> + mnesia:dirty_read(?TAB, ClientId). + +-spec(register_session(session()) -> ok). +register_session(Session) when is_record(Session, session) -> + mnesia:dirty_write(?TAB, Session). + +-spec(unregister_session(session()) -> ok). +unregister_session(Session) when is_record(Session, session) -> + mnesia:dirty_delete_object(?TAB, Session). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + ekka:monitor(membership), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({membership, {mnesia, down, Node}}, State) -> + global:trans({?LOCK, self()}, + fun() -> + mnesia:transaction(fun cleanup_sessions/1, [Node]) + end), + {noreply, State}; + +handle_info({membership, _Event}, State) -> + {noreply, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +cleanup_sessions(Node) -> + Pat = [{#session{pid = '$1', _ = '_'}, + [{'==', {node, '$1'}, Node}], ['$_']}], + lists:foreach(fun(Session) -> + mnesia:delete_object(?TAB, Session) + end, mnesia:select(?TAB, Pat)). + diff --git a/src/emqx_sm_stats.erl b/src/emqx_sm_stats.erl new file mode 100644 index 000000000..4d7766f45 --- /dev/null +++ b/src/emqx_sm_stats.erl @@ -0,0 +1,72 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_sm_stats). + +-behaviour(gen_statem). + +-include("emqx.hrl"). + +%% API +-export([start_link/0]). + +-export([set_session_stats/2, get_session_stats/1, del_session_stats/1]). + +%% gen_statem callbacks +-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). + +-define(TAB, session_stats). + +-record(state, {statsfun}). + +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec(set_session_stats(session(), emqx_stats:stats()) -> true). +set_session_stats(Session, Stats) when is_record(Session, session) -> + ets:insert(?TAB, {Session, [{'$ts', emqx_time:now_secs()}|Stats]}). + +-spec(get_session_stats(session()) -> emqx_stats:stats()). +get_session_stats(Session) -> + case ets:lookup(?TAB, Session) of + [{_, Stats}] -> Stats; + [] -> [] + end. + +-spec(del_session_stats(session()) -> true). +del_session_stats(Session) -> + ets:delete(?TAB, Session). + +init([]) -> + _ = emqx_tables:create(?TAB, [public, {write_concurrency, true}]), + StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'), + {ok, idle, #state{statsfun = StatsFun}, timer:seconds(1)}. + +callback_mode() -> handle_event_function. + +handle_event(timeout, _Timeout, idle, State = #state{statsfun = StatsFun}) -> + case ets:info(session, size) of + undefined -> ok; + Size -> StatsFun(Size) + end, + {next_state, idle, State, timer:seconds(1)}. + +terminate(_Reason, _StateName, _State) -> + ok. + +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. + diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl index 51a495f58..f65dead14 100644 --- a/src/emqx_sm_sup.erl +++ b/src/emqx_sm_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -26,15 +26,12 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - lists:foreach(fun create_tab/1, [session, session_stats, session_attrs]), + Childs = [child(M) || M <- [emqx_sm_locker, + emqx_sm_registry, + emqx_sm_stats, + emqx_sm]], + {ok, {{one_for_all, 10, 3600}, Childs}}. - StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'), - - SM = {emqx_sm, {emqx_sm, start_link, [StatsFun]}, - permanent, 5000, worker, [emqx_sm]}, - - {ok, {{one_for_all, 10, 3600}, [SM]}}. - -create_tab(Tab) -> - emqx_tables:create(Tab, [public, ordered_set, named_table, {write_concurrency, true}]). +child(M) -> + {M, {M, start_link, []}, permanent, 5000, worker, [M]}. diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index d93c1b969..83048a001 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -26,8 +26,7 @@ -export([all/0]). %% Client and Session Stats --export([set_client_stats/2, get_client_stats/1, del_client_stats/1, - set_session_stats/2, get_session_stats/1, del_session_stats/1]). +-export([set_client_stats/2, get_client_stats/1, del_client_stats/1]). %% Statistics API. -export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstat/3]). @@ -40,6 +39,8 @@ -type(stats() :: list({atom(), non_neg_integer()})). +-export_type([stats/0]). + -define(STATS_TAB, mqtt_stats). -define(CLIENT_STATS_TAB, mqtt_client_stats). -define(SESSION_STATS_TAB, mqtt_session_stats). @@ -101,20 +102,6 @@ get_client_stats(ClientId) -> del_client_stats(ClientId) -> ets:delete(?CLIENT_STATS_TAB, ClientId). --spec(set_session_stats(binary(), stats()) -> true). -set_session_stats(ClientId, Stats) -> - ets:insert(?SESSION_STATS_TAB, {ClientId, [{'$ts', emqx_time:now_secs()}|Stats]}). - --spec(get_session_stats(binary()) -> stats()). -get_session_stats(ClientId) -> - case ets:lookup(?SESSION_STATS_TAB, ClientId) of - [{_, Stats}] -> Stats; - [] -> [] - end. - --spec(del_session_stats(binary()) -> true). -del_session_stats(ClientId) -> - ets:delete(?SESSION_STATS_TAB, ClientId). all() -> ets:tab2list(?STATS_TAB). diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index d06a8df79..ec05316c7 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -57,7 +57,6 @@ init([]) -> {ok, {{one_for_all, 10, 3600}, [?CHILD(emqx_ctl, worker), ?CHILD(emqx_hooks, worker), - ?CHILD(emqx_locker, worker), ?CHILD(emqx_stats, worker), ?CHILD(emqx_metrics, worker), ?CHILD(emqx_sys, worker),