diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index c5299d638..16d35585e 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -20,7 +20,9 @@ -include("emqx_mqtt.hrl"). -export([start_link/3]). --export([info/1, stats/1, kick/1]). +-export([info/1, attrs/1]). +-export([stats/1]). +-export([kick/1]). -export([session/1]). %% gen_server callbacks @@ -49,7 +51,7 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(LOG(Level, Format, Args, State), - emqx_logger:Level("Client(~s): " ++ Format, + emqx_logger:Level("MQTT(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). start_link(Transport, Socket, Options) -> @@ -59,17 +61,58 @@ start_link(Transport, Socket, Options) -> %% API %%------------------------------------------------------------------------------ -info(CPid) -> - call(CPid, info). +%% for debug +info(CPid) when is_pid(CPid) -> + call(CPid, info); -stats(CPid) -> - call(CPid, stats). +info(#state{transport = Transport, + socket = Socket, + peername = Peername, + sockname = Sockname, + conn_state = ConnState, + await_recv = AwaitRecv, + rate_limit = RateLimit, + publish_limit = PubLimit, + proto_state = ProtoState}) -> + ConnInfo = [{socktype, Transport:type(Socket)}, + {peername, Peername}, + {sockname, Sockname}, + {conn_state, ConnState}, + {await_recv, AwaitRecv}, + {rate_limit, esockd_rate_limit:info(RateLimit)}, + {publish_limit, esockd_rate_limit:info(PubLimit)}], + ProtoInfo = emqx_protocol:info(ProtoState), + lists:usort(lists:append(ConnInfo, ProtoInfo)). -kick(CPid) -> - call(CPid, kick). +%% for dashboard +attrs(CPid) when is_pid(CPid) -> + call(CPid, attrs); -session(CPid) -> - call(CPid, session). +attrs(#state{peername = Peername, + sockname = Sockname, + proto_state = ProtoState}) -> + SockAttrs = [{peername, Peername}, + {sockname, Sockname}], + ProtoAttrs = emqx_protocol:attrs(ProtoState), + lists:usort(lists:append(SockAttrs, ProtoAttrs)). + +%% Conn stats +stats(CPid) when is_pid(CPid) -> + call(CPid, stats); + +stats(#state{transport = Transport, + socket = Socket, + proto_state = ProtoState}) -> + lists:append([emqx_misc:proc_stats(), + emqx_protocol:stats(ProtoState), + case Transport:getstat(Socket, ?SOCK_STATS) of + {ok, Ss} -> Ss; + {error, _} -> [] + end]). + +kick(CPid) -> call(CPid, kick). + +session(CPid) -> call(CPid, session). call(CPid, Req) -> gen_server:call(CPid, Req, infinity). @@ -131,38 +174,17 @@ send_fun(Transport, Socket, Peername) -> end end. -handle_call(info, _From, State = #state{transport = Transport, - socket = Socket, - peername = Peername, - sockname = Sockname, - conn_state = ConnState, - await_recv = AwaitRecv, - rate_limit = RateLimit, - publish_limit = PubLimit, - proto_state = ProtoState}) -> - ConnInfo = [{socktype, Transport:type(Socket)}, - {peername, Peername}, - {sockname, Sockname}, - {conn_state, ConnState}, - {await_recv, AwaitRecv}, - {rate_limit, esockd_rate_limit:info(RateLimit)}, - {publish_limit, esockd_rate_limit:info(PubLimit)}], - ProtoInfo = emqx_protocol:info(ProtoState), - {reply, lists:usort(lists:append([ConnInfo, ProtoInfo])), State}; +handle_call(info, _From, State) -> + {reply, info(State), State}; -handle_call(stats, _From, State = #state{transport = Transport, - socket = Socket, - proto_state = ProtoState}) -> - ProcStats = emqx_misc:proc_stats(), - ProtoStats = emqx_protocol:stats(ProtoState), - SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of - {ok, Ss} -> Ss; - {error, _} -> [] - end, - {reply, lists:append([ProcStats, ProtoStats, SockStats]), State}; +handle_call(attrs, _From, State) -> + {reply, attrs(State), State}; + +handle_call(stats, _From, State) -> + {reply, stats(State), State}; handle_call(kick, _From, State) -> - {stop, {shutdown, kick}, ok, State}; + {stop, {shutdown, kicked}, ok, State}; handle_call(session, _From, State = #state{proto_state = ProtoState}) -> {reply, emqx_protocol:session(ProtoState), State}; @@ -186,8 +208,7 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> end; handle_info(emit_stats, State = #state{proto_state = ProtoState}) -> - Stats = element(2, handle_call(stats, undefined, State)), - emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats), + emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info(timeout, State) -> diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 56dac9110..c1cefd893 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -47,7 +47,7 @@ banned(ClientId) -> %%-------------------------------------------------------------------- init([]) -> - _ = ets:new(banned, [public, ordered_set, named_table]), + %% ets:new(banned, [public, ordered_set, named_table]), {ok, #state{}}. handle_call(_Request, _From, State) -> diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 6b1d43207..5a32b43c5 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -19,8 +19,6 @@ %% Memory: (10, 100, 1000) %% -%%-record - -export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2, maybe_force_gc/3]). diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index b0da175c6..48edac2c4 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -28,8 +28,8 @@ load(Topics) -> emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]). -on_session_created(#{client_id := ClientId}, SessInfo, Topics) -> - Username = proplists:get_value(username, SessInfo), +on_session_created(#{client_id := ClientId}, SessAttrs, Topics) -> + Username = proplists:get_value(username, SessAttrs), Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 81d882007..01971258c 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -17,7 +17,11 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([init/2, info/1, caps/1, stats/1]). +-export([init/2]). +-export([info/1]). +-export([attrs/1]). +-export([caps/1]). +-export([stats/1]). -export([client_id/1]). -export([credentials/1]). -export([parser/1]). @@ -28,22 +32,6 @@ -export([send/2]). -export([shutdown/2]). - -%%-record(mqtt_client, { -%% client_id :: binary() | undefined, -%% client_pid :: pid(), -%% username :: binary() | undefined, -%% peername :: {inet:ip_address(), inet:port_number()}, -%% clean_start :: boolean(), -%% proto_ver :: emqx_mqtt_types:version(), -%% keepalive = 0 :: non_neg_integer(), -%% will_topic :: undefined | binary(), -%% mountpoint :: undefined | binary(), -%% connected_at :: erlang:timestamp(), -%% attributes :: map() -%% }). - - -record(pstate, { zone, sendfun, @@ -61,6 +49,7 @@ clean_start, topic_aliases, packet_size, + will_topic, will_msg, keepalive, mountpoint, @@ -81,7 +70,7 @@ -endif. -define(LOG(Level, Format, Args, PState), - emqx_logger:Level([{client, PState#pstate.client_id}], "Client(~s@~s): " ++ Format, + emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format, [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])). %%------------------------------------------------------------------------------ @@ -127,33 +116,46 @@ set_username(_Username, PState) -> %% API %%------------------------------------------------------------------------------ -info(#pstate{zone = Zone, - client_id = ClientId, - username = Username, - peername = Peername, - proto_ver = ProtoVer, - proto_name = ProtoName, - clean_start = CleanStart, - conn_props = ConnProps, - keepalive = Keepalive, - mountpoint = Mountpoint, - is_super = IsSuper, - is_bridge = IsBridge, - connected = Connected, - connected_at = ConnectedAt}) -> +info(PState = #pstate{conn_props = ConnProps, + ack_props = AclProps, + session = Session, + topic_aliases = Aliases, + will_msg = WillMsg, + enable_acl = EnableAcl}) -> + attrs(PState) ++ [{conn_props, ConnProps}, + {ack_props, AclProps}, + {session, Session}, + {topic_aliases, Aliases}, + {will_msg, WillMsg}, + {enable_acl, EnableAcl}]. + +attrs(#pstate{zone = Zone, + client_id = ClientId, + username = Username, + peername = Peername, + peercert = Peercert, + clean_start = CleanStart, + proto_ver = ProtoVer, + proto_name = ProtoName, + keepalive = Keepalive, + will_topic = WillTopic, + mountpoint = Mountpoint, + is_super = IsSuper, + is_bridge = IsBridge, + connected_at = ConnectedAt}) -> [{zone, Zone}, {client_id, ClientId}, {username, Username}, {peername, Peername}, + {peercert, Peercert}, {proto_ver, ProtoVer}, {proto_name, ProtoName}, - {conn_props, ConnProps}, {clean_start, CleanStart}, {keepalive, Keepalive}, + {will_topic, WillTopic}, {mountpoint, Mountpoint}, {is_super, IsSuper}, {is_bridge, IsBridge}, - {connected, Connected}, {connected_at, ConnectedAt}]. caps(#pstate{zone = Zone}) -> @@ -254,6 +256,7 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, properties = ConnProps, + will_topic = WillTopic, client_id = ClientId, username = Username, password = Password} = Connect), PState) -> @@ -269,9 +272,9 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, conn_props = ConnProps, + will_topic = WillTopic, will_msg = WillMsg, is_bridge = IsBridge, - connected = true, connected_at = os:timestamp()}), connack( @@ -284,8 +287,8 @@ process_packet(?CONNECT_PACKET( %% Open session case try_open_session(PState3) of {ok, SPid, SP} -> - PState4 = PState3#pstate{session = SPid}, - ok = emqx_cm:register_connection(client_id(PState4), info(PState4)), + PState4 = PState3#pstate{session = SPid, connected = true}, + ok = emqx_cm:register_connection(client_id(PState4), attrs(PState4)), %% Start keepalive start_keepalive(Keepalive, PState4), %% Success @@ -521,7 +524,8 @@ try_open_session(#pstate{zone = Zone, username => Username, clean_start => CleanStart, conn_props => ConnProps}) of - {ok, SPid} -> {ok, SPid, false}; + {ok, SPid} -> + {ok, SPid, false}; Other -> Other end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 840be8156..bbe78e02b 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -44,7 +44,8 @@ -include("emqx_mqtt.hrl"). -export([start_link/1]). --export([info/1, stats/1]). +-export([info/1, attrs/1]). +-export([stats/1]). -export([resume/2, discard/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). @@ -94,8 +95,8 @@ %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. inflight :: emqx_inflight:inflight(), - %% Max Inflight Size - max_inflight = 32 :: non_neg_integer(), + %% Max Inflight Size. DEPRECATED: Get from inflight + %% max_inflight = 32 :: non_neg_integer(), %% Retry interval for redelivering QoS1/2 messages retry_interval = 20000 :: timeout(), @@ -145,11 +146,6 @@ -define(TIMEOUT, 60000). --define(INFO_KEYS, [clean_start, client_id, username, binding, conn_pid, old_conn_pid, - next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight, - max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, - await_rel_timeout, expiry_interval, enable_stats, created_at]). - -define(LOG(Level, Format, Args, State), emqx_logger:Level([{client, State#state.client_id}], "Session(~s): " ++ Format, [State#state.client_id | Args])). @@ -160,6 +156,77 @@ start_link(SessAttrs) -> IdleTimeout = maps:get(idle_timeout, SessAttrs, 30000), gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, IdleTimeout}]). +%% @doc Get session info +-spec(info(pid() | #state{}) -> list({atom(), term()})). +info(SPid) when is_pid(SPid) -> + gen_server:call(SPid, info, infinity); + +info(State = #state{conn_pid = ConnPid, + next_pkt_id = PktId, + max_subscriptions = MaxSubscriptions, + subscriptions = Subscriptions, + upgrade_qos = UpgradeQoS, + inflight = Inflight, + retry_interval = RetryInterval, + mqueue = MQueue, + awaiting_rel = AwaitingRel, + max_awaiting_rel = MaxAwaitingRel, + await_rel_timeout = AwaitRelTimeout}) -> + attrs(State) ++ [{conn_pid, ConnPid}, + {next_pkt_id, PktId}, + {max_subscriptions, MaxSubscriptions}, + {subscriptions, Subscriptions}, + {upgrade_qos, UpgradeQoS}, + {inflight, Inflight}, + {retry_interval, RetryInterval}, + {mqueue_len, MQueue}, + {awaiting_rel, AwaitingRel}, + {max_awaiting_rel, MaxAwaitingRel}, + {await_rel_timeout, AwaitRelTimeout}]. + +%% @doc Get session attrs +-spec(attrs(pid() | #state{}) -> list({atom(), term()})). +attrs(SPid) when is_pid(SPid) -> + gen_server:call(SPid, attrs, infinity); + +attrs(#state{clean_start = CleanStart, + binding = Binding, + client_id = ClientId, + username = Username, + expiry_interval = ExpiryInterval, + created_at = CreatedAt}) -> + [{clean_start, CleanStart}, + {binding, Binding}, + {client_id, ClientId}, + {username, Username}, + {expiry_interval, ExpiryInterval div 1000}, + {created_at, CreatedAt}]. + +-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). +stats(SPid) when is_pid(SPid) -> + gen_server:call(SPid, stats, infinity); + +stats(#state{max_subscriptions = MaxSubscriptions, + subscriptions = Subscriptions, + inflight = Inflight, + mqueue = MQueue, + max_awaiting_rel = MaxAwaitingRel, + awaiting_rel = AwaitingRel, + deliver_stats = DeliverMsg, + enqueue_stats = EnqueueMsg}) -> + lists:append(emqx_misc:proc_stats(), + [{max_subscriptions, MaxSubscriptions}, + {subscriptions_num, maps:size(Subscriptions)}, + {max_inflight, emqx_inflight:max_size(Inflight)}, + {inflight_len, emqx_inflight:size(Inflight)}, + {max_mqueue, emqx_mqueue:max_len(MQueue)}, + {mqueue_len, emqx_mqueue:len(MQueue)}, + {mqueue_dropped, emqx_mqueue:dropped(MQueue)}, + {max_awaiting_rel, MaxAwaitingRel}, + {awaiting_rel_len, maps:size(AwaitingRel)}, + {deliver_msg, DeliverMsg}, + {enqueue_msg, EnqueueMsg}]). + %%------------------------------------------------------------------------------ %% PubSub API %%------------------------------------------------------------------------------ @@ -229,70 +296,6 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) -> resume(SPid, ConnPid) -> gen_server:cast(SPid, {resume, ConnPid}). -%% @doc Get session info --spec(info(pid() | #state{}) -> list(tuple())). -info(SPid) when is_pid(SPid) -> - gen_server:call(SPid, info); - -info(#state{clean_start = CleanStart, - binding = Binding, - client_id = ClientId, - username = Username, - max_subscriptions = MaxSubscriptions, - subscriptions = Subscriptions, - upgrade_qos = UpgradeQoS, - inflight = Inflight, - max_inflight = MaxInflight, - retry_interval = RetryInterval, - mqueue = MQueue, - awaiting_rel = AwaitingRel, - max_awaiting_rel = MaxAwaitingRel, - await_rel_timeout = AwaitRelTimeout, - expiry_interval = ExpiryInterval, - created_at = CreatedAt}) -> - [{clean_start, CleanStart}, - {binding, Binding}, - {client_id, ClientId}, - {username, Username}, - {max_subscriptions, MaxSubscriptions}, - {subscriptions, maps:size(Subscriptions)}, - {upgrade_qos, UpgradeQoS}, - {inflight, emqx_inflight:size(Inflight)}, - {max_inflight, MaxInflight}, - {retry_interval, RetryInterval}, - {mqueue_len, emqx_mqueue:len(MQueue)}, - {awaiting_rel, maps:size(AwaitingRel)}, - {max_awaiting_rel, MaxAwaitingRel}, - {await_rel_timeout, AwaitRelTimeout}, - {expiry_interval, ExpiryInterval}, - {created_at, CreatedAt}]. - --spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). -stats(SPid) when is_pid(SPid) -> - gen_server:call(SPid, stats, infinity); - -stats(#state{max_subscriptions = MaxSubscriptions, - subscriptions = Subscriptions, - inflight = Inflight, - max_inflight = MaxInflight, - mqueue = MQueue, - max_awaiting_rel = MaxAwaitingRel, - awaiting_rel = AwaitingRel, - deliver_stats = DeliverMsg, - enqueue_stats = EnqueueMsg}) -> - lists:append(emqx_misc:proc_stats(), - [{max_subscriptions, MaxSubscriptions}, - {subscriptions, maps:size(Subscriptions)}, - {max_inflight, MaxInflight}, - {inflight_len, emqx_inflight:size(Inflight)}, - {max_mqueue, emqx_mqueue:max_len(MQueue)}, - {mqueue_len, emqx_mqueue:len(MQueue)}, - {mqueue_dropped, emqx_mqueue:dropped(MQueue)}, - {max_awaiting_rel, MaxAwaitingRel}, - {awaiting_rel_len, maps:size(AwaitingRel)}, - {deliver_msg, DeliverMsg}, - {enqueue_msg, EnqueueMsg}]). - %% @doc Discard the session -spec(discard(pid(), emqx_types:client_id()) -> ok). discard(SPid, ClientId) -> @@ -311,7 +314,7 @@ init(#{zone := Zone, username := Username, conn_pid := ConnPid, clean_start := CleanStart, - conn_props := _ConnProps}) -> + conn_props := ConnProps}) -> process_flag(trap_exit, true), true = link(ConnPid), MaxInflight = get_env(Zone, max_inflight), @@ -323,21 +326,26 @@ init(#{zone := Zone, subscriptions = #{}, max_subscriptions = get_env(Zone, max_subscriptions, 0), upgrade_qos = get_env(Zone, upgrade_qos, false), - max_inflight = MaxInflight, inflight = emqx_inflight:new(MaxInflight), mqueue = init_mqueue(Zone, ClientId), retry_interval = get_env(Zone, retry_interval, 0), awaiting_rel = #{}, await_rel_timeout = get_env(Zone, await_rel_timeout), max_awaiting_rel = get_env(Zone, max_awaiting_rel), - expiry_interval = get_env(Zone, session_expiry_interval), + expiry_interval = expire_interval(Zone, ConnProps), enable_stats = get_env(Zone, enable_stats, true), - deliver_stats = 0, - enqueue_stats = 0, + deliver_stats = 0, + enqueue_stats = 0, created_at = os:timestamp()}, - emqx_sm:register_session(ClientId, info(State)), + emqx_sm:register_session(ClientId, [{zone, Zone} | attrs(State)]), + emqx_sm:set_session_stats(ClientId, stats(State)), emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), - {ok, ensure_stats_timer(State), hibernate}. + {ok, State}. + +expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) -> + I * 1000; +expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1 + get_env(Zone, session_expiry_interval, 0). init_mqueue(Zone, ClientId) -> emqx_mqueue:new(ClientId, #{type => simple, @@ -399,6 +407,9 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, handle_call(info, _From, State) -> reply(info(State), State); +handle_call(attrs, _From, State) -> + reply(attrs(State), State); + handle_call(stats, _From, State) -> reply(stats(State), State); @@ -501,7 +512,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, %% Clean Session: true -> false? CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)), - emqx_hooks:run('session.resumed', [#{client_id => ClientId}, info(State)]), + emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), %% Replay delivery and Dequeue pending messages {noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))}; @@ -541,20 +552,18 @@ handle_info({timeout, _Timer, expired}, State) -> ?LOG(info, "Expired, shutdown now.", [], State), shutdown(expired, State); -handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start= true, conn_pid = ConnPid}) -> - {stop, normal, State}; +handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) -> + {stop, Reason, State}; -handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = false, - conn_pid = ConnPid, - expiry_interval = Interval}) -> - ?LOG(info, "Connection ~p EXIT for ~p", [ConnPid, Reason], State), - ExpireTimer = emqx_misc:start_timer(Interval, expired), - State1 = State#state{conn_pid = undefined, expiry_timer = ExpireTimer}, - {noreply, State1}; +handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) -> + {stop, Reason, State}; -handle_info({'EXIT', Pid, _Reason}, State = #state{old_conn_pid = Pid}) -> +handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) -> + {noreply, ensure_expire_timer(State#state{conn_pid = undefined})}; + +handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> %% ignore - {noreply, State, hibernate}; + {noreply, State#state{old_conn_pid = undefined}, hibernate}; handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> ?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", @@ -571,6 +580,7 @@ handle_info(Info, State) -> terminate(Reason, #state{client_id = ClientId}) -> emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), + %%TODO: notify conn_pid to shutdown? emqx_sm:unregister_session(ClientId). code_change(_OldVsn, State, _Extra) -> @@ -819,6 +829,11 @@ ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_tim ensure_await_rel_timer(State) -> State. +ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 -> + State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)}; +ensure_expire_timer(State) -> + State. + %%------------------------------------------------------------------------------ %% Reset Dup @@ -837,8 +852,7 @@ next_pkt_id(State = #state{next_pkt_id = Id}) -> %%------------------------------------------------------------------------------ %% Ensure stats timer -ensure_stats_timer(State = #state{enable_stats = true, - stats_timer = undefined}) -> +ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined}) -> State#state{stats_timer = erlang:send_after(30000, self(), emit_stats)}; ensure_stats_timer(State) -> State. @@ -857,5 +871,5 @@ reply(Reply, State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. -%%TODO: maybe_gc(State) -> State. +%% TODO: maybe_gc(State) -> State.