From bad855bdd93b2e2547947bce25ef5442fb5a9e62 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 16 Feb 2017 11:46:06 +0800 Subject: [PATCH] Improve the session design, support tune_qos, enable_stats --- src/emqttd_session.erl | 1011 ++++++++++++++++++++++------------------ 1 file changed, 561 insertions(+), 450 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index a73503479..31efaab55 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,47 +14,60 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Session for persistent MQTT client. %% -%% Session State in the broker consists of: +%% @doc MQTT Session %% -%% 1. The Client’s subscriptions. +%% A stateful interaction between a Client and a Server. Some Sessions +%% last only as long as the Network Connection, others can span multiple +%% consecutive Network Connections between a Client and a Server. %% -%% 2. inflight qos1/2 messages sent to the client but unacked, QoS 1 and QoS 2 -%% messages which have been sent to the Client, but have not been completely -%% acknowledged. +%% The Session state in the Server consists of: %% -%% 3. inflight qos2 messages received from client and waiting for pubrel. QoS 2 -%% messages which have been received from the Client, but have not been -%% completely acknowledged. +%% The existence of a Session, even if the rest of the Session state is empty. %% -%% 4. all qos1, qos2 messages published to when client is disconnected. -%% QoS 1 and QoS 2 messages pending transmission to the Client. +%% The Client’s subscriptions. %% -%% 5. Optionally, QoS 0 messages pending transmission to the Client. +%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not +%% been completely acknowledged. %% -%% State of Message: newcome, inflight, pending +%% QoS 1 and QoS 2 messages pending transmission to the Client. +%% +%% QoS 2 messages which have been received from the Client, but have not +%% been completely acknowledged. +%% +%% Optionally, QoS 0 messages pending transmission to the Client. +%% +%% If the session is currently disconnected, the time at which the Session state +%% will be deleted. %% %% @end +%% -module(emqttd_session). +-behaviour(gen_server2). + +-author("Feng Lee "). + -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). -include("emqttd_internal.hrl"). --behaviour(gen_server2). +-import(emqttd_misc, [start_timer/2]). -import(proplists, [get_value/2, get_value/3]). %% Session API --export([start_link/3, resume/3, info/1, destroy/2]). +-export([start_link/3, resume/3, destroy/2]). -%% PubSub APIs --export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2, - subscribe/2, subscribe/3, unsubscribe/2]). +%% Management and Monitor API +-export([state/1, info/1, stats/1]). + +%% PubSub API +-export([subscribe/2, subscribe/3, publish/2, puback/2, pubrec/2, + pubrel/2, pubcomp/2, unsubscribe/2]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -63,150 +76,199 @@ %% gen_server2 Message Priorities -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). --record(session, { +-record(state, + { + %% Clean Session Flag + clean_sess = false :: boolean(), - %% Clean Session Flag - clean_sess = true, + %% Client Binding: local | remote + binding = local :: local | remote, - %% ClientId: Identifier of Session - client_id :: binary(), + %% ClientId: Identifier of Session + client_id :: binary(), - %% Client Pid bind with session - client_pid :: pid(), + %% Username + username :: binary() | undefined, - %% Old Client Pid that has been kickout - old_client_pid :: pid(), + %% Client Pid binding with session + client_pid :: pid(), - %% Username - username :: binary() | undefined, + %% Old Client Pid that has been kickout + old_client_pid :: pid(), - %% Last packet id of the session - packet_id = 1, - - %% Client’s subscriptions. - subscriptions :: map(), + %% Next message id of the session + next_msg_id = 1 :: mqtt_packet_id(), - %% Inflight qos1, qos2 messages sent to the client but unacked, - %% QoS 1 and QoS 2 messages which have been sent to the Client, - %% but have not been completely acknowledged. - %% Client <- Broker - inflight_queue :: list(), + max_subscriptions :: non_neg_integer(), - max_inflight = 0, + %% Client’s subscriptions. + subscriptions :: map(), - %% All qos1, qos2 messages published to when client is disconnected. - %% QoS 1 and QoS 2 messages pending transmission to the Client. - %% - %% Optionally, QoS 0 messages pending transmission to the Client. - message_queue :: emqttd_mqueue:mqueue(), + %% Upgrade Qos? + upgrade_qos = false :: boolean(), - %% Inflight qos2 messages received from client and waiting for pubrel. - %% QoS 2 messages which have been received from the Client, - %% but have not been completely acknowledged. - %% Client -> Broker - awaiting_rel :: map(), + %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. + inflight :: emqttd_inflight:inflight(), - %% Awaiting PUBREL timeout - await_rel_timeout = 8, + %% Max Inflight Size + max_inflight = 32 :: non_neg_integer(), - %% Max Packets that Awaiting PUBREL - max_awaiting_rel = 100, + %% Retry interval for redelivering QoS1/2 messages + retry_interval = 20000 :: pos_integer(), - %% Awaiting timers for ack, rel. - awaiting_ack :: map(), + %% Retry Timer + retry_timer :: reference(), - %% Retry interval for redelivering QoS1/2 messages - retry_interval = 20, + %% All QoS1, QoS2 messages published to when client is disconnected. + %% QoS 1 and QoS 2 messages pending transmission to the Client. + %% + %% Optionally, QoS 0 messages pending transmission to the Client. + mqueue :: emqttd_mqueue:mqueue(), - %% Awaiting for PUBCOMP - awaiting_comp :: map(), + %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. + awaiting_rel :: map(), - %% session expired after 48 hours - expired_after = 172800, + %% Awaiting PUBREL timeout + await_rel_timeout = 20000 :: pos_integer(), - expired_timer, + %% Max Packets that Awaiting PUBREL + max_awaiting_rel = 100 :: non_neg_integer(), - collect_interval, + %% Awaiting PUBREL timer + await_rel_timer :: reference(), - collect_timer, - - timestamp}). + %% Session Expiry Interval + expiry_interval = 7200000 :: pos_integer(), --define(PUBSUB_TIMEOUT, 60000). + %% Expired Timer + expiry_timer :: reference(), + + %% Enable Stats + enable_stats :: false | pos_integer(), + + %% Stats Timer + stats_timer :: reference(), + + created_at :: erlang:timestamp() + }). + +-define(TIMEOUT, 60000). + +-define(INFO_KEYS, [clean_sess, client_id, username, client_pid, binding, created_at]). + +-define(STATE_KEYS, [clean_sess, client_id, username, binding, client_pid, old_client_pid, + next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight, + max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, + await_rel_timeout, expiry_interval, enable_stats, created_at]). -define(LOG(Level, Format, Args, State), - lager:Level([{client, State#session.client_id}], - "Session(~s): " ++ Format, [State#session.client_id | Args])). + lager:Level([{client, State#state.client_id}], + "Session(~s): " ++ Format, [State#state.client_id | Args])). -%% @doc Start a session. +%% @doc Start a Session -spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}). start_link(CleanSess, {ClientId, Username}, ClientPid) -> gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []). -%% @doc Resume a session. --spec(resume(pid(), mqtt_client_id(), pid()) -> ok). -resume(SessPid, ClientId, ClientPid) -> - gen_server2:cast(SessPid, {resume, ClientId, ClientPid}). - -%% @doc Session Info. -info(SessPid) -> - gen_server2:call(SessPid, info). - -%% @doc Destroy a session. --spec(destroy(pid(), mqtt_client_id()) -> ok). -destroy(SessPid, ClientId) -> - gen_server2:cast(SessPid, {destroy, ClientId}). - %%-------------------------------------------------------------------- -%% PubSub +%% PubSub API %%-------------------------------------------------------------------- -%% @doc Subscribe Topics +%% @doc Subscribe topics -spec(subscribe(pid(), [{binary(), [emqttd_topic:option()]}]) -> ok). -subscribe(SessPid, TopicTable) -> - gen_server2:cast(SessPid, {subscribe, TopicTable, fun(_) -> ok end}). +subscribe(Session, TopicTable) ->%%TODO: the ack function??... + gen_server2:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}). --spec(subscribe(pid(), mqtt_pktid(), [{binary(), [emqttd_topic:option()]}]) -> ok). -subscribe(SessPid, PktId, TopicTable) -> +-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqttd_topic:option()]}]) -> ok). +subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??... From = self(), - AckFun = fun(GrantedQos) -> From ! {suback, PktId, GrantedQos} end, - gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}). + AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end, + gen_server2:cast(Session, {subscribe, From, TopicTable, AckFun}). -%% @doc Publish message +%% @doc Publish Message -spec(publish(pid(), mqtt_message()) -> ok | {error, any()}). -publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) -> - %% publish qos0 directly - emqttd:publish(Msg), ok; +publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> + %% Publish QoS0 Directly + emqttd_server:publish(Msg), ok; -publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) -> - %% publish qos1 directly, and client will puback automatically - emqttd:publish(Msg), ok; +publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) -> + %% Publish QoS1 message directly for client will PubAck automatically + emqttd_server:publish(Msg), ok; -publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) -> - %% publish qos2 by session - gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT). +publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> + %% Publish QoS2 to Session + gen_server2:call(Session, {publish, Msg}, ?TIMEOUT). -%% @doc PubAck message +%% @doc PubAck Message -spec(puback(pid(), mqtt_packet_id()) -> ok). -puback(SessPid, PktId) -> - gen_server2:cast(SessPid, {puback, PktId}). +puback(Session, PacketId) -> + gen_server2:cast(Session, {puback, PacketId}). -spec(pubrec(pid(), mqtt_packet_id()) -> ok). -pubrec(SessPid, PktId) -> - gen_server2:cast(SessPid, {pubrec, PktId}). +pubrec(Session, PacketId) -> + gen_server2:cast(Session, {pubrec, PacketId}). -spec(pubrel(pid(), mqtt_packet_id()) -> ok). -pubrel(SessPid, PktId) -> - gen_server2:cast(SessPid, {pubrel, PktId}). +pubrel(Session, PacketId) -> + gen_server2:cast(Session, {pubrel, PacketId}). -spec(pubcomp(pid(), mqtt_packet_id()) -> ok). -pubcomp(SessPid, PktId) -> - gen_server2:cast(SessPid, {pubcomp, PktId}). +pubcomp(Session, PacketId) -> + gen_server2:cast(Session, {pubcomp, PacketId}). -%% @doc Unsubscribe Topics +%% @doc Unsubscribe the topics -spec(unsubscribe(pid(), [{binary(), [emqttd_topic:option()]}]) -> ok). -unsubscribe(SessPid, TopicTable) -> - gen_server2:cast(SessPid, {unsubscribe, TopicTable}). +unsubscribe(Session, TopicTable) -> + gen_server2:cast(Session, {unsubscribe, self(), TopicTable}). + +%% @doc Resume the session +-spec(resume(pid(), mqtt_client_id(), pid()) -> ok). +resume(Session, ClientId, ClientPid) -> + gen_server2:cast(Session, {resume, ClientId, ClientPid}). + +%% @doc Get session state +state(Session) when is_pid(Session) -> + gen_server2:call(Session, state). + +%% @doc Get session info +-spec(info(pid() | #state{}) -> list(tuple())). +info(Session) when is_pid(Session) -> + gen_server2:call(Session, info); + +info(State) when is_record(State, state) -> + ?record_to_proplist(state, State, ?INFO_KEYS). + +-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). +stats(Session) when is_pid(Session) -> + gen_server2:call(Session, stats); + +stats(#state{max_subscriptions = MaxSubscriptions, + subscriptions = Subscriptions, + inflight = Inflight, + max_inflight = MaxInflight, + mqueue = MQueue, + max_awaiting_rel = MaxAwaitingRel, + awaiting_rel = AwaitingRel}) -> + lists:append(emqttd_misc:proc_stats(), + [{max_subscriptions, MaxSubscriptions}, + {subscriptions, maps:size(Subscriptions)}, + {max_inflight, MaxInflight}, + {inflight_len, Inflight:size()}, + {max_mqueue, case emqttd_mqueue:max_len(MQueue) of + infinity -> 0; + Len -> Len + end}, + {mqueue_len, emqttd_mqueue:len(MQueue)}, + {mqueue_dropped, emqttd_mqueue:dropped(MQueue)}, + {max_awaiting_rel, MaxAwaitingRel}, + {awaiting_rel_len, maps:size(AwaitingRel)}, + {deliver_msg, get(deliver_msg)}, + {enqueue_msg, get(enqueue_msg)}]). + +%% @doc Destroy the session +-spec(destroy(pid(), mqtt_client_id()) -> ok). +destroy(Session, ClientId) -> + gen_server2:cast(Session, {destroy, ClientId}). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -215,36 +277,43 @@ unsubscribe(SessPid, TopicTable) -> init([CleanSess, {ClientId, Username}, ClientPid]) -> process_flag(trap_exit, true), true = link(ClientPid), + init_stats([deliver_msg, enqueue_msg]), + {ok, Env} = emqttd:env(session), {ok, QEnv} = emqttd:env(queue), - {ok, SessEnv} = emqttd:env(session), - Session = #session{ - clean_sess = CleanSess, - client_id = ClientId, - client_pid = ClientPid, - username = Username, - subscriptions = #{}, - inflight_queue = [], - max_inflight = get_value(max_inflight, SessEnv, 0), - message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), - awaiting_rel = #{}, - awaiting_ack = #{}, - awaiting_comp = #{}, - retry_interval = get_value(retry_interval, SessEnv), - await_rel_timeout = get_value(await_rel_timeout, SessEnv), - max_awaiting_rel = get_value(max_awaiting_rel, SessEnv), - expired_after = get_value(expired_after, SessEnv), - collect_interval = get_value(collect_interval, SessEnv, 0), - timestamp = os:timestamp()}, - emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)), - emqttd:run_hooks('session.created', [ClientId, Username]), - %% Start statistics - {ok, start_collector(Session), hibernate}. + MaxInflight = get_value(max_inflight, Env, 0), + EnableStats = get_value(enable_stats, Env, false), + MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), + State = #state{clean_sess = CleanSess, + binding = binding(ClientPid), + client_id = ClientId, + client_pid = ClientPid, + username = Username, + subscriptions = #{}, + max_subscriptions = get_value(max_subscriptions, Env, 0), + upgrade_qos = get_value(upgrade_qos, Env, false), + max_inflight = MaxInflight, + inflight = emqttd_inflight:new(MaxInflight), + mqueue = MQueue, + retry_interval = get_value(retry_interval, Env), + awaiting_rel = #{}, + await_rel_timeout = get_value(await_rel_timeout, Env), + max_awaiting_rel = get_value(max_awaiting_rel, Env), + expiry_interval = get_value(expiry_interval, Env), + enable_stats = EnableStats, + created_at = os:timestamp()}, + emqttd_stats:set_session_stats(ClientId, stats(State)), + emqttd_sm:register_session(ClientId, CleanSess, info(State)), + emqttd_hooks:run('session.created', [ClientId, Username]), + {ok, State, hibernate, {backoff, 1000, 1000, 5000}, ?MODULE}. + +init_stats(Keys) -> + lists:foreach(fun(K) -> put(K, 0) end, Keys). + +binding(ClientPid) -> + case node(ClientPid) =:= node() of true -> local; false -> remote end. prioritise_call(Msg, _From, _Len, _State) -> - case Msg of - info -> 10; - _ -> 0 - end. + case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. prioritise_cast(Msg, _Len, _State) -> case Msg of @@ -262,279 +331,249 @@ prioritise_cast(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of {'EXIT', _, _} -> 10; - expired -> 10; {timeout, _, _} -> 5; - collect_info -> 2; {dispatch, _, _} -> 1; _ -> 0 end. -handle_call(info, _From, State) -> - {reply, sess_info(State), State, hibernate}; - -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, - _From, Session = #session{awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> - case check_awaiting_rel(Session) of - true -> - TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), - AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), - {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, + _From, State = #state{awaiting_rel = AwaitingRel, + await_rel_timer = Timer, + await_rel_timeout = Timeout}) -> + case is_awaiting_full(State) of false -> - ?LOG(critical, "Dropped Qos2 message for too many awaiting_rel: ~p", [Msg], Session), - {reply, {error, dropped}, Session, hibernate} + State1 = case Timer == undefined of + true -> State#state{await_rel_timer = start_timer(Timeout, check_awaiting_rel)}; + false -> State + end, + reply(ok, State1#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}); + true -> + ?LOG(warning, "Dropped Qos2 Message for too many awaiting_rel: ~p", [Msg], State), + emqttd_metrics:inc('messages/qos2/dropped'), + reply({error, dropped}, State) end; +handle_call(info, _From, State) -> + reply(info(State), State); + +handle_call(stats, _From, State) -> + reply(stats(State), State); + +handle_call(state, _From, State) -> + reply(?record_to_proplist(state, State, ?STATE_KEYS), State); + handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({subscribe, TopicTable, AckFun}, Session = #session{client_id = ClientId, - username = Username, - subscriptions = Subscriptions}) -> - ?LOG(info, "Subscribe ~p", [TopicTable], Session), +handle_cast({subscribe, _From, TopicTable, AckFun}, + State = #state{client_id = ClientId, + username = Username, + subscriptions = Subscriptions}) -> + ?LOG(info, "Subscribe ~p", [TopicTable], State), {GrantedQos, Subscriptions1} = lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) -> NewQos = proplists:get_value(qos, Opts), SubMap1 = case maps:find(Topic, SubMap) of {ok, NewQos} -> - ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], Session), + ?LOG(warning, "Duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], State), SubMap; {ok, OldQos} -> emqttd:setqos(Topic, ClientId, NewQos), - ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", - [Topic, OldQos, NewQos], Session), + ?LOG(warning, "Duplicated subscribe ~s, old_qos=~w, new_qos=~w", + [Topic, OldQos, NewQos], State), maps:put(Topic, NewQos, SubMap); error -> emqttd:subscribe(Topic, ClientId, Opts), - emqttd:run_hooks('session.subscribed', [ClientId, Username], {Topic, Opts}), + emqttd_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}), maps:put(Topic, NewQos, SubMap) end, {[NewQos|QosAcc], SubMap1} end, {[], Subscriptions}, TopicTable), AckFun(lists:reverse(GrantedQos)), - hibernate(Session#session{subscriptions = Subscriptions1}); + noreply(emit_stats(State#state{subscriptions = Subscriptions1})); -handle_cast({unsubscribe, TopicTable}, Session = #session{client_id = ClientId, - username = Username, - subscriptions = Subscriptions}) -> - ?LOG(info, "unsubscribe ~p", [TopicTable], Session), +handle_cast({unsubscribe, _From, TopicTable}, + State = #state{client_id = ClientId, + username = Username, + subscriptions = Subscriptions}) -> + ?LOG(info, "Unsubscribe ~p", [TopicTable], State), Subscriptions1 = lists:foldl(fun({Topic, Opts}, SubMap) -> case maps:find(Topic, SubMap) of {ok, _Qos} -> emqttd:unsubscribe(Topic, ClientId), - emqttd:run_hooks('session.unsubscribed', [ClientId, Username], {Topic, Opts}), + emqttd_hooks:run('session.unsubscribed', [ClientId, Username], {Topic, Opts}), maps:remove(Topic, SubMap); error -> SubMap end end, Subscriptions, TopicTable), - hibernate(Session#session{subscriptions = Subscriptions1}); + noreply(emit_stats(State#state{subscriptions = Subscriptions1})); -handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId, client_pid = undefined}) -> - ?LOG(warning, "destroyed", [], Session), - shutdown(destroy, Session); +%% PUBACK: +handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) -> + case Inflight:contain(PacketId) of + true -> + noreply(dequeue(acked(puback, PacketId, State))); + false -> + ?LOG(warning, "The PUBACK ~p is not inflight: ~p", + [PacketId, Inflight:window()], State), + emqttd_metrics:inc('packets/puback/missed'), + noreply(State) + end; -handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId, client_pid = OldClientPid}) -> - ?LOG(warning, "kickout ~p", [OldClientPid], Session), - shutdown(conflict, Session); +%% PUBREC: +handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) -> + case Inflight:contain(PacketId) of + true -> + noreply(acked(pubrec, PacketId, State)); + false -> + ?LOG(warning, "The PUBREC ~p is not inflight: ~p", + [PacketId, Inflight:window()], State), + emqttd_metrics:inc('packets/pubrec/missed'), + noreply(State) + end; -handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId, - client_pid = OldClientPid, - clean_sess = CleanSess, - inflight_queue = InflightQ, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - expired_timer = ETimer} = Session) -> +%% PUBREL: +handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) -> + case maps:take(PacketId, AwaitingRel) of + {Msg, AwaitingRel1} -> + spawn(emqttd_server, publish, [Msg]),%%:) + noreply(State#state{awaiting_rel = AwaitingRel1}); + error -> + ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), + emqttd_metrics:inc('packets/pubrel/missed'), + noreply(State) + end; - ?LOG(info, "resumed by ~p", [ClientPid], Session), +%% PUBCOMP: +handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) -> + case Inflight:contain(PacketId) of + true -> + noreply(dequeue(acked(pubcomp, PacketId, State))); + false -> + ?LOG(warning, "The PUBCOMP ~p is not inflight: ~p", + [PacketId, Inflight:window()], State), + emqttd_metrics:inc('packets/pubcomp/missed'), + noreply(State) + end; - %% Cancel expired timer - cancel_timer(ETimer), +%% RESUME: +handle_cast({resume, ClientId, ClientPid}, + State = #state{client_id = ClientId, + client_pid = OldClientPid, + clean_sess = CleanSess, + retry_timer = RetryTimer, + await_rel_timer = AwaitTimer, + stats_timer = StatsTimer, + expiry_timer = ExpireTimer}) -> + + ?LOG(info, "Resumed by ~p", [ClientPid], State), + + %% Cancel Timers + lists:foreach(fun emqttd_misc:cancel_timer/1, + [RetryTimer, AwaitTimer, StatsTimer, ExpireTimer]), case kick(ClientId, OldClientPid, ClientPid) of - ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], Session); + ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State); ignore -> ok end, true = link(ClientPid), - %% Redeliver PUBREL - [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], + State1 = State#state{client_pid = ClientPid, + binding = binding(ClientPid), + old_client_pid = OldClientPid, + clean_sess = false, + retry_timer = undefined, + awaiting_rel = #{}, + await_rel_timer = undefined, + expiry_timer = undefined}, - %% Clear awaiting_ack timers - [cancel_timer(TRef) || TRef <- maps:values(AwaitingAck)], - - %% Clear awaiting_comp timers - [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], - - Session1 = Session#session{client_pid = ClientPid, - old_client_pid = OldClientPid, - clean_sess = false, - awaiting_ack = #{}, - awaiting_comp = #{}, - expired_timer = undefined}, - - %% CleanSess: true -> false? + %% Clean Session: true -> false? if - CleanSess =:= true -> - ?LOG(warning, "CleanSess changed to false.", [], Session), - emqttd_sm:reg_session(ClientId, false, sess_info(Session1)); + CleanSess =:= true -> + ?LOG(error, "CleanSess changed to false.", [], State1), + emqttd_sm:register_session(ClientId, false, info(State1)); CleanSess =:= false -> ok end, - %% Redeliver inflight messages - Session2 = - lists:foldl(fun({_Id, Msg}, Sess) -> - redeliver(Msg, Sess) - end, Session1, lists:reverse(InflightQ)), + %% Replay delivery and Dequeue pending messages + noreply(emit_stats(dequeue(retry_delivery(true, State1)))); - %% Dequeue pending messages - hibernate(dequeue(Session2)); +handle_cast({destroy, ClientId}, State = #state{client_id = ClientId, + client_pid = undefined}) -> + ?LOG(warning, "Destroyed", [], State), + shutdown(destroy, State); -%% PUBACK -handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) -> - case maps:find(PktId, AwaitingAck) of - {ok, TRef} -> - cancel_timer(TRef), - hibernate(dequeue(acked(PktId, Session))); - error -> - ?LOG(warning, "Cannot find PUBACK: ~p", [PktId], Session), - hibernate(Session) - end; - -%% PUBREC -handle_cast({pubrec, PktId}, Session = #session{awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - await_rel_timeout = Timeout}) -> - case maps:find(PktId, AwaitingAck) of - {ok, TRef} -> - cancel_timer(TRef), - TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), - AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp), - Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), - hibernate(dequeue(Session1)); - error -> - ?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session), - hibernate(Session) - end; - -%% PUBREL -handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> - case maps:find(PktId, AwaitingRel) of - {ok, {Msg, TRef}} -> - cancel_timer(TRef), - emqttd:publish(Msg), - hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); - error -> - ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session), - hibernate(Session) - end; - -%% PUBCOMP -handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) -> - case maps:find(PktId, AwaitingComp) of - {ok, TRef} -> - cancel_timer(TRef), - hibernate(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); - error -> - ?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session), - hibernate(Session) - end; +handle_cast({destroy, ClientId}, State = #state{client_id = ClientId, + client_pid = OldClientPid}) -> + ?LOG(warning, "kickout ~p", [OldClientPid], State), + shutdown(conflict, State); handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). %% Dispatch Message -handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions}) - when is_record(Msg, mqtt_message) -> - dispatch(tune_qos(Topic, Msg, Subscriptions), Session); +handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> + noreply(dispatch(tune_qos(Topic, Msg, State), State)); -handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, - awaiting_ack = AwaitingAck}) -> - %% just remove awaiting - hibernate(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); +%% Do nothing if the client has been disconnected. +handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> + hibernate(emit_stats(State#state{retry_timer = undefined})); -handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, - awaiting_ack = AwaitingAck}) -> - case maps:find(PktId, AwaitingAck) of - {ok, _TRef} -> - case lists:keyfind(PktId, 1, InflightQ) of - {_, Msg} -> - hibernate(redeliver(Msg, Session)); - false -> - ?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session), - hibernate(dequeue(Session)) - end; - error -> - ?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session), - hibernate(Session) - end; +handle_info({timeout, _Timer, retry_delivery}, State) -> + noreply(emit_stats(retry_delivery(false, State#state{retry_timer = undefined}))); -handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> - case maps:find(PktId, AwaitingRel) of - {ok, {_Msg, _TRef}} -> - ?LOG(warning, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session), - hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); - error -> - ?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session), - hibernate(Session) - end; +handle_info({timeout, _Timer, check_awaiting_rel}, State) -> + noreply(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); -handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) -> - case maps:find(PktId, Awaiting) of - {ok, _TRef} -> - ?LOG(warning, "Awaiting PUBCOMP Timout: ~p", [PktId], Session), - hibernate(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); - error -> - ?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session), - hibernate(Session) - end; +handle_info({timeout, _Timer, emit_stats}, State) -> + hibernate(maybe_enable_stats(emit_stats(State))); -handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> - emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)), - hibernate(start_collector(Session)); +handle_info({timeout, _Timer, expired}, State) -> + ?LOG(info, "Expired, shutdown now.", [], State), + shutdown(expired, State); -handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, - client_pid = ClientPid}) -> - {stop, normal, Session}; +handle_info({'EXIT', ClientPid, _Reason}, + State = #state{clean_sess = true, client_pid = ClientPid}) -> + {stop, normal, State}; -handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, - client_pid = ClientPid, - expired_after = Expires}) -> - ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session), - TRef = timer(Expires, expired), - hibernate(Session#session{client_pid = undefined, expired_timer = TRef}); +handle_info({'EXIT', ClientPid, Reason}, + State = #state{clean_sess = false, + client_pid = ClientPid, + expiry_interval = Interval}) -> + ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), + ExpireTimer = start_timer(Interval, expired), + State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, + hibernate(maybe_enable_stats(emit_stats(State1))); -handle_info({'EXIT', Pid, _Reason}, Session = #session{old_client_pid = Pid}) -> +handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) -> %%ignore - hibernate(Session); + hibernate(State); -handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) -> +handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) -> ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", - [ClientPid, Pid, Reason], Session), - hibernate(Session); - -handle_info(expired, Session) -> - ?LOG(info, "expired, shutdown now.", [], Session), - shutdown(expired, Session); + [ClientPid, Pid, Reason], State), + hibernate(State); handle_info(Info, Session) -> ?UNEXPECTED_INFO(Info, Session). -terminate(Reason, #session{client_id = ClientId, username = Username}) -> - emqttd:run_hooks('session.terminated', [ClientId, Username, Reason]), - emqttd:subscriber_down(ClientId), - emqttd_sm:unreg_session(ClientId). +terminate(Reason, #state{client_id = ClientId, username = Username}) -> + emqttd_stats:del_session_stats(ClientId), + emqttd_hooks:run('session.terminated', [ClientId, Username, Reason]), + emqttd_server:subscriber_down(ClientId), + emqttd_sm:unregister_session(ClientId). code_change(_OldVsn, Session, _Extra) -> {ok, Session}. %%-------------------------------------------------------------------- -%% Kick old client out +%% Kick old client %%-------------------------------------------------------------------- kick(_ClientId, undefined, _Pid) -> ignore; @@ -546,130 +585,228 @@ kick(ClientId, OldPid, Pid) -> %% Clean noproc receive {'EXIT', OldPid, _} -> ok after 0 -> ok end. +%%-------------------------------------------------------------------- +%% Replay or Retry Delivery +%%-------------------------------------------------------------------- + +%% Redeliver at once if Force is true + +retry_delivery(Force, State = #state{inflight = Inflight}) -> + case Inflight:is_empty() of + true -> State; + false -> Msgs = lists:sort(sortfun(inflight), Inflight:values()), + retry_delivery(Force, Msgs, os:timestamp(), State) + end. + +retry_delivery(_Force, [], _Now, State = #state{retry_interval = Interval}) -> + State#state{retry_timer = start_timer(Interval, retry_delivery)}; + +retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now, + State = #state{inflight = Inflight, + retry_interval = Interval}) -> + Diff = timer:now_diff(Now, Ts) div 1000, %% micro -> ms + if + Force orelse (Diff >= Interval) -> + case {Type, Msg} of + {publish, Msg = #mqtt_message{pktid = PacketId}} -> + redeliver(Msg, State), + Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}), + retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); + {pubrel, PacketId} -> %% remove 'pubrel' directly? + retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight:delete(PacketId)}) + end; + true -> + State#state{retry_timer = start_timer(Interval - Diff, retry_delivery)} + end. + +%%-------------------------------------------------------------------- +%% Expire Awaiting Rel +%%-------------------------------------------------------------------- + +expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> + case maps:size(AwaitingRel) of + 0 -> State; + _ -> Msgs = lists:sort(sortfun(awaiting_rel), maps:to_list(AwaitingRel)), + expire_awaiting_rel(Msgs, os:timestamp(), State) + end. + +expire_awaiting_rel([], _Now, State) -> + State#state{await_rel_timer = undefined}; + +expire_awaiting_rel([{PacketId, #mqtt_message{timestamp = TS}} | Msgs], + Now, State = #state{awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> + case (timer:now_diff(Now, TS) div 1000) of + Diff when Diff >= Timeout -> + expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); + Diff -> + State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)} + end. + +%%-------------------------------------------------------------------- +%% Sort Inflight, AwaitingRel +%%-------------------------------------------------------------------- + +sortfun(inflight) -> + fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end; + +sortfun(awaiting_rel) -> + fun({_, #mqtt_message{timestamp = Ts1}}, + {_, #mqtt_message{timestamp = Ts2}}) -> + Ts1 < Ts2 + end. + +%%-------------------------------------------------------------------- +%% Check awaiting rel +%%-------------------------------------------------------------------- + +is_awaiting_full(#state{max_awaiting_rel = 0}) -> + false; +is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) -> + maps:size(AwaitingRel) >= MaxLen. + %%-------------------------------------------------------------------- %% Dispatch Messages %%-------------------------------------------------------------------- -%% Queue message if client disconnected -dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) -> - hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); +%% Enqueue message if the client has been disconnected +dispatch(Msg, State = #state{client_pid = undefined}) -> + enqueue_msg(Msg, State); %% Deliver qos0 message directly to client -dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) -> - ClientPid ! {deliver, Msg}, - hibernate(Session); +dispatch(Msg = #mqtt_message{qos = ?QOS0}, State) -> + deliver(Msg, State), State; -dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ}) +dispatch(Msg = #mqtt_message{qos = QoS}, + State = #state{next_msg_id = MsgId, inflight = Inflight}) when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> - case check_inflight(Session) of + case Inflight:is_full() of true -> - noreply(deliver(Msg, Session)); + enqueue_msg(Msg, State); false -> - hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) + Msg1 = Msg#mqtt_message{pktid = MsgId}, + deliver(Msg1, State), + await(Msg1, next_msg_id(State)) end. -tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, SubMap) -> +enqueue_msg(Msg, State = #state{mqueue = Q}) -> + inc(enqueue_msg), State#state{mqueue = emqttd_mqueue:in(Msg, Q)}. + +%%-------------------------------------------------------------------- +%% Deliver +%%-------------------------------------------------------------------- + +redeliver(Msg = #mqtt_message{qos = QoS}, State) -> + deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State). + +deliver(Msg, #state{client_pid = Pid}) -> + inc(deliver_msg), Pid ! {deliver, Msg}. + +%%-------------------------------------------------------------------- +%% Awaiting ACK for QoS1/QoS2 Messages +%%-------------------------------------------------------------------- + +await(Msg = #mqtt_message{pktid = PacketId}, + State = #state{inflight = Inflight, + retry_timer = RetryTimer, + retry_interval = Interval}) -> + %% Start retry timer if the Inflight is still empty + State1 = ?IF(RetryTimer == undefined, State#state{retry_timer = start_timer(Interval, retry_delivery)}, State), + State1#state{inflight = Inflight:insert(PacketId, {publish, Msg, os:timestamp()})}. + +acked(puback, PacketId, State = #state{client_id = ClientId, + username = Username, + inflight = Inflight}) -> + {publish, Msg, _Ts} = Inflight:lookup(PacketId), + emqttd_hooks:run('message.acked', [ClientId, Username], Msg), + State#state{inflight = Inflight:delete(PacketId)}; + +acked(pubrec, PacketId, State = #state{client_id = ClientId, + username = Username, + inflight = Inflight}) -> + {publish, Msg, _Ts} = Inflight:lookup(PacketId), + emqttd_hooks:run('message.acked', [ClientId, Username], Msg), + State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; + +acked(pubcomp, PacketId, State = #state{inflight = Inflight}) -> + State#state{inflight = Inflight:delete(PacketId)}. + +%%-------------------------------------------------------------------- +%% Dequeue +%%-------------------------------------------------------------------- + +%% Do nothing if client is disconnected +dequeue(State = #state{client_pid = undefined}) -> + State; + +dequeue(State = #state{inflight = Inflight}) -> + case Inflight:is_full() of + true -> State; + false -> dequeue2(State) + end. + +dequeue2(State = #state{mqueue = Q}) -> + case emqttd_mqueue:out(Q) of + {empty, _Q} -> + State; + {{value, Msg}, Q1} -> + %% Dequeue more + dequeue(dispatch(Msg, State#state{mqueue = Q1})) + end. + +%%-------------------------------------------------------------------- +%% Tune QoS +%%-------------------------------------------------------------------- + +tune_qos(Topic, Msg = #mqtt_message{qos = PubQoS}, + #state{subscriptions = SubMap, upgrade_qos = UpgradeQoS}) -> case maps:find(Topic, SubMap) of - {ok, SubQos} when PubQos > SubQos -> - Msg#mqtt_message{qos = SubQos}; - {ok, _SubQos} -> + {ok, SubQoS} when UpgradeQoS andalso (SubQoS > PubQoS) -> + Msg#mqtt_message{qos = SubQoS}; + {ok, SubQoS} when (not UpgradeQoS) andalso (SubQoS < PubQoS) -> + Msg#mqtt_message{qos = SubQoS}; + {ok, _} -> Msg; error -> Msg end. %%-------------------------------------------------------------------- -%% Check inflight and awaiting_rel +%% Next Msg Id %%-------------------------------------------------------------------- -check_inflight(#session{max_inflight = 0}) -> - true; -check_inflight(#session{max_inflight = Max, inflight_queue = Q}) -> - Max > length(Q). +next_msg_id(State = #state{next_msg_id = 16#FFFF}) -> + State#state{next_msg_id = 1}; -check_awaiting_rel(#session{max_awaiting_rel = 0}) -> - true; -check_awaiting_rel(#session{awaiting_rel = AwaitingRel, - max_awaiting_rel = MaxLen}) -> - maps:size(AwaitingRel) < MaxLen. +next_msg_id(State = #state{next_msg_id = Id}) -> + State#state{next_msg_id = Id + 1}. %%-------------------------------------------------------------------- -%% Dequeue and Deliver +%% Emit session stats %%-------------------------------------------------------------------- -dequeue(Session = #session{client_pid = undefined}) -> - %% do nothing if client is disconnected - Session; +maybe_enable_stats(State = #state{enable_stats = false}) -> + State; +maybe_enable_stats(State = #state{client_pid = Pid}) when is_pid(Pid) -> + State; +maybe_enable_stats(State = #state{enable_stats = Interval}) -> + StatsTimer = start_timer(Interval, emit_stats), + State#state{stats_timer = StatsTimer}. -dequeue(Session) -> - case check_inflight(Session) of - true -> dequeue2(Session); - false -> Session - end. - -dequeue2(Session = #session{message_queue = Q}) -> - case emqttd_mqueue:out(Q) of - {empty, _Q} -> - Session; - {{value, Msg}, Q1} -> - %% dequeue more - dequeue(deliver(Msg, Session#session{message_queue = Q1})) - end. - -deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> - ClientPid ! {deliver, Msg}, Session; - -deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id = PktId, - client_pid = ClientPid, - inflight_queue = InflightQ}) - when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> - Msg1 = Msg#mqtt_message{pktid = PktId, dup = false}, - ClientPid ! {deliver, Msg1}, - await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})). - -redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) -> - deliver(Msg, Session); - -redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid}) - when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> - ClientPid ! {deliver, Msg#mqtt_message{dup = true}}, - await(Msg, Session). +emit_stats(State = #state{enable_stats = false}) -> + State; +emit_stats(State = #state{client_id = ClientId}) -> + emqttd_stats:set_session_stats(ClientId, stats(State)), + State. %%-------------------------------------------------------------------- -%% Awaiting ack for qos1, qos2 message -%%------------------------------------------------------------------------------ -await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting, - retry_interval = Timeout}) -> - TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), - Awaiting1 = maps:put(PktId, TRef, Awaiting), - Session#session{awaiting_ack = Awaiting1}. +%% Helper functions +%%-------------------------------------------------------------------- -acked(PktId, Session = #session{client_id = ClientId, - username = Username, - inflight_queue = InflightQ, - awaiting_ack = Awaiting}) -> - case lists:keyfind(PktId, 1, InflightQ) of - {_, Msg} -> - emqttd:run_hooks('message.acked', [ClientId, Username], Msg); - false -> - ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session) - end, - Session#session{awaiting_ack = maps:remove(PktId, Awaiting), - inflight_queue = lists:keydelete(PktId, 1, InflightQ)}. +inc(Key) -> put(Key, get(Key) + 1). -next_packet_id(Session = #session{packet_id = 16#ffff}) -> - Session#session{packet_id = 1}; - -next_packet_id(Session = #session{packet_id = Id}) -> - Session#session{packet_id = Id + 1}. - -timer(TimeoutSec, TimeoutMsg) -> - erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg). - -cancel_timer(undefined) -> - undefined; -cancel_timer(Ref) -> - catch erlang:cancel_timer(Ref). +reply(Reply, State) -> + {reply, Reply, State}. noreply(State) -> {noreply, State}. @@ -680,29 +817,3 @@ hibernate(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. -start_collector(Session = #session{collect_interval = 0}) -> - Session; - -start_collector(Session = #session{collect_interval = Interval}) -> - TRef = erlang:send_after(timer:seconds(Interval), self(), collect_info), - Session#session{collect_timer = TRef}. - -sess_info(#session{clean_sess = CleanSess, - inflight_queue = InflightQueue, - max_inflight = MaxInflight, - message_queue = MessageQueue, - awaiting_rel = AwaitingRel, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - timestamp = CreatedAt}) -> - Stats = emqttd_mqueue:stats(MessageQueue), - [{clean_sess, CleanSess}, - {max_inflight, MaxInflight}, - {inflight_queue, length(InflightQueue)}, - {message_queue, get_value(len, Stats)}, - {message_dropped,get_value(dropped, Stats)}, - {awaiting_rel, maps:size(AwaitingRel)}, - {awaiting_ack, maps:size(AwaitingAck)}, - {awaiting_comp, maps:size(AwaitingComp)}, - {created_at, CreatedAt}]. -