%%-------------------------------------------------------------------- %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- %%-------------------------------------------------------------------- %% @doc %% A stateful interaction between a Client and a Server. Some Sessions %% last only as long as the Network Connection, others can span multiple %% consecutive Network Connections between a Client and a Server. %% %% The Session State in the Server consists of: %% %% The existence of a Session, even if the rest of the Session State is empty. %% %% The Clients subscriptions, including any Subscription Identifiers. %% %% QoS 1 and QoS 2 messages which have been sent to the Client, but have not %% been completely acknowledged. %% %% QoS 1 and QoS 2 messages pending transmission to the Client and OPTIONALLY %% QoS 0 messages pending transmission to the Client. %% %% QoS 2 messages which have been received from the Client, but have not been %% completely acknowledged.The Will Message and the Will Delay Interval %% %% If the Session is currently not connected, the time at which the Session %% will end and Session State will be discarded. %% @end %%-------------------------------------------------------------------- %% MQTT Session -module(emqx_session). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). -logger_header("[Session]"). -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). -endif. -export([init/2]). -export([ info/1 , info/2 , stats/1 ]). -export([ subscribe/4 , unsubscribe/4 ]). -export([ publish/3 , puback/2 , pubrec/2 , pubrel/2 , pubcomp/2 ]). -export([ deliver/2 , enqueue/2 , dequeue/1 , retry/1 , terminate/3 ]). -export([ takeover/1 , resume/2 , replay/1 ]). -export([expire/2]). %% Export for CT -export([set_field/3]). -export_type([session/0]). -import(emqx_zone, [get_env/3]). -record(session, { %% Client’s Subscriptions. subscriptions :: map(), %% Max subscriptions allowed max_subscriptions :: non_neg_integer(), %% Upgrade QoS? upgrade_qos :: boolean(), %% Client <- Broker: QoS1/2 messages sent to the client but %% have not been unacked. inflight :: emqx_inflight:inflight(), %% All QoS1/2 messages published to when client is disconnected, %% or QoS1/2 messages pending transmission to the Client. %% %% Optionally, QoS0 messages pending transmission to the Client. mqueue :: emqx_mqueue:mqueue(), %% Next packet id of the session next_pkt_id = 1 :: emqx_types:packet_id(), %% Retry interval for redelivering QoS1/2 messages (Unit: millsecond) retry_interval :: timeout(), %% Client -> Broker: QoS2 messages received from the client, but %% have not been completely acknowledged awaiting_rel :: map(), %% Maximum number of awaiting QoS2 messages allowed max_awaiting_rel :: non_neg_integer(), %% Awaiting PUBREL Timeout (Unit: millsecond) await_rel_timeout :: timeout(), %% Created at created_at :: pos_integer() }). -type(session() :: #session{}). -type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}). -type(pubrel() :: {pubrel, emqx_types:packet_id()}). -type(replies() :: list(publish() | pubrel())). -define(INFO_KEYS, [subscriptions, upgrade_qos, retry_interval, await_rel_timeout, created_at ]). -define(STATS_KEYS, [subscriptions_cnt, subscriptions_max, inflight_cnt, inflight_max, mqueue_len, mqueue_max, mqueue_dropped, next_pkt_id, awaiting_rel_cnt, awaiting_rel_max ]). -define(DEFAULT_BATCH_N, 1000). %%-------------------------------------------------------------------- %% Init a Session %%-------------------------------------------------------------------- -spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()). init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> #session{max_subscriptions = get_env(Zone, max_subscriptions, 0), subscriptions = #{}, upgrade_qos = get_env(Zone, upgrade_qos, false), inflight = emqx_inflight:new(MaxInflight), mqueue = init_mqueue(Zone), next_pkt_id = 1, retry_interval = timer:seconds(get_env(Zone, retry_interval, 0)), awaiting_rel = #{}, max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)), created_at = erlang:system_time(millisecond) }. %% @private init mq init_mqueue(Zone) -> emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000), store_qos0 => get_env(Zone, mqueue_store_qos0, true), priorities => get_env(Zone, mqueue_priorities, none), default_priority => get_env(Zone, mqueue_default_priority, lowest) }). %%-------------------------------------------------------------------- %% Info, Stats %%-------------------------------------------------------------------- %% @doc Get infos of the session. -spec(info(session()) -> emqx_types:infos()). info(Session) -> maps:from_list(info(?INFO_KEYS, Session)). info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(subscriptions, #session{subscriptions = Subs}) -> Subs; info(subscriptions_cnt, #session{subscriptions = Subs}) -> maps:size(Subs); info(subscriptions_max, #session{max_subscriptions = MaxSubs}) -> MaxSubs; info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) -> UpgradeQoS; info(inflight, #session{inflight = Inflight}) -> Inflight; info(inflight_cnt, #session{inflight = Inflight}) -> emqx_inflight:size(Inflight); info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); info(retry_interval, #session{retry_interval = Interval}) -> Interval div 1000; info(mqueue, #session{mqueue = MQueue}) -> MQueue; info(mqueue_len, #session{mqueue = MQueue}) -> emqx_mqueue:len(MQueue); info(mqueue_max, #session{mqueue = MQueue}) -> emqx_mqueue:max_len(MQueue); info(mqueue_dropped, #session{mqueue = MQueue}) -> emqx_mqueue:dropped(MQueue); info(next_pkt_id, #session{next_pkt_id = PacketId}) -> PacketId; info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) -> AwaitingRel; info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) -> maps:size(AwaitingRel); info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> Max; info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout div 1000; info(created_at, #session{created_at = CreatedAt}) -> CreatedAt. %% @doc Get stats of the session. -spec(stats(session()) -> emqx_types:stats()). stats(Session) -> info(?STATS_KEYS, Session). %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE %%-------------------------------------------------------------------- -spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) -> IsNew = not maps:is_key(TopicFilter, Subs), case IsNew andalso is_subscriptions_full(Session) of false -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts), ok = emqx_hooks:run('session.subscribed', [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]), {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}}; true -> {error, ?RC_QUOTA_EXCEEDED} end. -compile({inline, [is_subscriptions_full/1]}). is_subscriptions_full(#session{max_subscriptions = 0}) -> false; is_subscriptions_full(#session{subscriptions = Subs, max_subscriptions = MaxLimit}) -> maps:size(Subs) >= MaxLimit. %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE %%-------------------------------------------------------------------- -spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions = Subs}) -> case maps:find(TopicFilter, Subs) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}}; error -> {error, ?RC_NO_SUBSCRIPTION_EXISTED} end. %%-------------------------------------------------------------------- %% Client -> Broker: PUBLISH %%-------------------------------------------------------------------- -spec(publish(emqx_types:packet_id(), emqx_types:message(), session()) -> {ok, emqx_types:publish_result(), session()} | {error, emqx_types:reason_code()}). publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, Session = #session{awaiting_rel = AwaitingRel}) -> case is_awaiting_full(Session) of false -> case maps:is_key(PacketId, AwaitingRel) of false -> Results = emqx_broker:publish(Msg), AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel), {ok, Results, Session#session{awaiting_rel = AwaitingRel1}}; true -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end; true -> {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} end; %% Publish QoS0/1 directly publish(_PacketId, Msg, Session) -> {ok, emqx_broker:publish(Msg), Session}. -compile({inline, [is_awaiting_full/1]}). is_awaiting_full(#session{max_awaiting_rel = 0}) -> false; is_awaiting_full(#session{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLimit}) -> maps:size(AwaitingRel) >= MaxLimit. %%-------------------------------------------------------------------- %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- -spec(puback(emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}). puback(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), return_with(Msg, dequeue(Session#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. -compile({inline, [return_with/2]}). return_with(Msg, {ok, Session}) -> {ok, Msg, Session}; return_with(Msg, {ok, Publishes, Session}) -> {ok, Msg, Publishes, Session}. %%-------------------------------------------------------------------- %% Client -> Broker: PUBREC %%-------------------------------------------------------------------- -spec(pubrec(emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}). pubrec(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. %%-------------------------------------------------------------------- %% Client -> Broker: PUBREL %%-------------------------------------------------------------------- -spec(pubrel(emqx_types:packet_id(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:take(PacketId, AwaitingRel) of {_Ts, AwaitingRel1} -> {ok, Session#session{awaiting_rel = AwaitingRel1}}; error -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. %%-------------------------------------------------------------------- %% Client -> Broker: PUBCOMP %%-------------------------------------------------------------------- -spec(pubcomp(emqx_types:packet_id(), session()) -> {ok, session()} | {ok, replies(), session()} | {error, emqx_types:reason_code()}). pubcomp(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {pubrel, _Ts}} -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), dequeue(Session#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. %%-------------------------------------------------------------------- %% Dequeue Msgs %%-------------------------------------------------------------------- dequeue(Session = #session{inflight = Inflight, mqueue = Q}) -> case emqx_mqueue:is_empty(Q) of true -> {ok, Session}; false -> {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q), deliver(Msgs, [], Session#session{mqueue = Q1}) end. dequeue(0, Msgs, Q) -> {lists:reverse(Msgs), Q}; dequeue(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> dequeue(0, Msgs, Q); {{value, Msg}, Q1} -> case emqx_message:is_expired(Msg) of true -> ok = inc_expired_cnt(delivery), dequeue(Cnt, Msgs, Q1); false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) end end. -compile({inline, [acc_cnt/2]}). acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt; acc_cnt(_Msg, Cnt) -> Cnt - 1. %%-------------------------------------------------------------------- %% Broker -> Client: Deliver %%-------------------------------------------------------------------- -spec(deliver(list(emqx_types:deliver()), session()) -> {ok, session()} | {ok, replies(), session()}). deliver([Deliver], Session) -> %% Optimize Enrich = enrich_fun(Session), deliver_msg(Enrich(Deliver), Session); deliver(Delivers, Session) -> Msgs = lists:map(enrich_fun(Session), Delivers), deliver(Msgs, [], Session). deliver([], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; deliver([Msg | More], Acc, Session) -> case deliver_msg(Msg, Session) of {ok, Session1} -> deliver(More, Acc, Session1); {ok, [Publish], Session1} -> deliver(More, [Publish|Acc], Session1) end. deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(Msg = #message{qos = QoS}, Session = #session{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> Session1 = case maybe_nack(Msg) of true -> Session; false -> enqueue(Msg, Session) end, {ok, Session1}; false -> Publish = {PacketId, maybe_ack(Msg)}, Session1 = await(PacketId, Msg, Session), {ok, [Publish], next_pkt_id(Session1)} end. -spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), session()) -> session()). enqueue([Deliver], Session) -> %% Optimize Enrich = enrich_fun(Session), enqueue(Enrich(Deliver), Session); enqueue(Delivers, Session) when is_list(Delivers) -> Msgs = lists:map(enrich_fun(Session), Delivers), lists:foldl(fun enqueue/2, Session, Msgs); enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), (Dropped =/= undefined) andalso log_dropped(Dropped, Session), Session#session{mqueue = NewQ}. log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of true -> ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]); false -> ok = emqx_metrics:inc('delivery.dropped.queue_full'), ?LOG(warning, "Dropped msg due to mqueue is full: ~s", [emqx_message:format(Msg)]) end. enrich_fun(Session = #session{subscriptions = Subs}) -> fun({deliver, Topic, Msg}) -> enrich_subopts(get_subopts(Topic, Subs), Msg, Session) end. maybe_ack(Msg) -> case emqx_shared_sub:is_ack_required(Msg) of true -> emqx_shared_sub:maybe_ack(Msg); false -> Msg end. maybe_nack(Msg) -> emqx_shared_sub:is_ack_required(Msg) andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)). get_subopts(Topic, SubMap) -> case maps:find(Topic, SubMap) of {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> [{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}]; {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> [{nl, Nl}, {qos, QoS}, {rap, Rap}]; error -> [] end. enrich_subopts([], Msg, _Session) -> Msg; enrich_subopts([{nl, 1}|Opts], Msg, Session) -> enrich_subopts(Opts, emqx_message:set_flag(nl, Msg), Session); enrich_subopts([{nl, 0}|Opts], Msg, Session) -> enrich_subopts(Opts, Msg, Session); enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos = true}) -> enrich_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session); enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos = false}) -> enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); enrich_subopts([{rap, 1}|Opts], Msg, Session) -> enrich_subopts(Opts, Msg, Session); enrich_subopts([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> enrich_subopts(Opts, Msg, Session); enrich_subopts([{rap, 0}|Opts], Msg, Session) -> enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session); enrich_subopts([{subid, SubId}|Opts], Msg, Session) -> Props = emqx_message:get_header(properties, Msg, #{}), Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). %%-------------------------------------------------------------------- %% Awaiting ACK for QoS1/QoS2 Messages %%-------------------------------------------------------------------- await(PacketId, Msg, Session = #session{inflight = Inflight}) -> Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight), Session#session{inflight = Inflight1}. %%-------------------------------------------------------------------- %% Retry Delivery %%-------------------------------------------------------------------- -spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). retry(Session = #session{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), [], erlang:system_time(millisecond), Session) end. retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) -> {ok, lists:reverse(Acc), Interval, Session}; retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = #session{retry_interval = Interval, inflight = Inflight}) -> case (Age = age(Now, Ts)) >= Interval of true -> {Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight), retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}); false -> {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> case emqx_message:is_expired(Msg) of true -> ok = inc_expired_cnt(delivery), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight), {[{PacketId, Msg1}|Acc], Inflight1} end; retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), {[{pubrel, PacketId}|Acc], Inflight1}. %%-------------------------------------------------------------------- %% Expire Awaiting Rel %%-------------------------------------------------------------------- -spec(expire(awaiting_rel, session()) -> {ok, session()} | {ok, timeout(), session()}). expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:size(AwaitingRel) of 0 -> {ok, Session}; _ -> expire_awaiting_rel(erlang:system_time(millisecond), Session) end. expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1), (ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt), NSession = Session#session{awaiting_rel = AwaitingRel1}, case maps:size(AwaitingRel1) of 0 -> {ok, NSession}; _ -> {ok, Timeout, NSession} end. %%-------------------------------------------------------------------- %% Takeover, Resume and Replay %%-------------------------------------------------------------------- -spec(takeover(session()) -> ok). takeover(#session{subscriptions = Subs}) -> lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)). -spec(resume(emqx_types:clientinfo(), session()) -> ok). resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = Subs}) -> lists:foreach(fun({TopicFilter, SubOpts}) -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) end, maps:to_list(Subs)), ok = emqx_metrics:inc('session.resumed'), emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]). -spec(replay(session()) -> {ok, replies(), session()}). replay(Session = #session{inflight = Inflight}) -> Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) -> {pubrel, PacketId}; ({PacketId, {Msg, _Ts}}) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)), case dequeue(Session) of {ok, NSession} -> {ok, Pubs, NSession}; {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession} end. -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). terminate(ClientInfo, discarded, Session) -> run_hook('session.discarded', [ClientInfo, info(Session)]); terminate(ClientInfo, takeovered, Session) -> run_hook('session.takeovered', [ClientInfo, info(Session)]); terminate(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). -compile({inline, [run_hook/2]}). run_hook(Name, Args) -> ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). %%-------------------------------------------------------------------- %% Inc message/delivery expired counter %%-------------------------------------------------------------------- -compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}). inc_expired_cnt(K) -> inc_expired_cnt(K, 1). inc_expired_cnt(delivery, N) -> ok = emqx_metrics:inc('delivery.dropped', N), emqx_metrics:inc('delivery.dropped.expired', N); inc_expired_cnt(message, N) -> ok = emqx_metrics:inc('messages.dropped', N), emqx_metrics:inc('messages.dropped.expired', N). %%-------------------------------------------------------------------- %% Next Packet Id %%-------------------------------------------------------------------- -compile({inline, [next_pkt_id/1]}). next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) -> Session#session{next_pkt_id = 1}; next_pkt_id(Session = #session{next_pkt_id = Id}) -> Session#session{next_pkt_id = Id + 1}. %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- -compile({inline, [sort_fun/0, batch_n/1, with_ts/1, age/2]}). sort_fun() -> fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end. batch_n(Inflight) -> case emqx_inflight:max_size(Inflight) of 0 -> ?DEFAULT_BATCH_N; Sz -> Sz - emqx_inflight:size(Inflight) end. with_ts(Msg) -> {Msg, erlang:system_time(millisecond)}. age(Now, Ts) -> Now - Ts. %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos+1, Session, Value).