refactor(session): pass ClientInfo as first params to APIs of emqx_session

This commit is contained in:
Shawn 2022-02-08 11:07:03 +08:00
parent b2f027bcf7
commit 06168f7080
4 changed files with 145 additions and 154 deletions

View File

@ -365,7 +365,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
= #channel{clientinfo = ClientInfo, session = Session}) ->
case emqx_session:puback(PacketId, Session) of
case emqx_session:puback(ClientInfo, PacketId, Session) of
{ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties),
{ok, set_session(NSession, Channel)};
@ -384,7 +384,7 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
= #channel{clientinfo = ClientInfo, session = Session}) ->
case emqx_session:pubrec(PacketId, Session) of
case emqx_session:pubrec(ClientInfo, PacketId, Session) of
{ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties),
NChannel = set_session(NSession, Channel),
@ -399,8 +399,9 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
handle_out(pubrel, {PacketId, RC}, Channel)
end;
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
case emqx_session:pubrel(PacketId, Session) of
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo,
session = Session}) ->
case emqx_session:pubrel(ClientInfo, PacketId, Session) of
{ok, NSession} ->
NChannel = set_session(NSession, Channel),
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
@ -410,8 +411,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
handle_out(pubcomp, {PacketId, RC}, Channel)
end;
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
case emqx_session:pubcomp(PacketId, Session) of
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{
clientinfo = ClientInfo, session = Session}) ->
case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
{ok, NSession} ->
{ok, set_session(NSession, Channel)};
{ok, Publishes, NSession} ->
@ -617,8 +619,8 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
handle_out(puback, {PacketId, RC}, NChannel);
do_publish(PacketId, Msg = #message{qos = ?QOS_2},
Channel = #channel{session = Session}) ->
case emqx_session:publish(PacketId, Msg, Session) of
Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
{ok, PubRes, NSession} ->
RC = puback_reason_code(PubRes),
NChannel0 = set_session(NSession, Channel),
@ -779,22 +781,22 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
handle_deliver(Delivers, Channel = #channel{takeover = true,
pendings = Pendings,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
%% NOTE: Order is important here. While the takeover is in
%% progress, the session cannot enqueue messages, since it already
%% passed on the queue to the new connection in the session state.
NPendings = lists:append(
Pendings,
emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
emqx_session:ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
takeover = false,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
Delivers1 = maybe_nack(Delivers),
Delivers2 = emqx_session:ignore_local(Delivers1, ClientId, Session),
NSession = emqx_session:enqueue(Delivers2, Session),
Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session),
NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session),
NChannel = set_session(NSession, Channel),
%% We consider queued/dropped messages as delivered since they are now in the session state.
maybe_mark_as_delivered(Session, Delivers),
@ -802,9 +804,10 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
handle_deliver(Delivers, Channel = #channel{session = Session,
takeover = false,
clientinfo = #{clientid := ClientId}
clientinfo = #{clientid := ClientId} = ClientInfo
}) ->
case emqx_session:deliver(emqx_session:ignore_local(Delivers, ClientId, Session), Session) of
case emqx_session:deliver(ClientInfo,
emqx_session:ignore_local(ClientInfo, Delivers, ClientId, Session), Session) of
{ok, Publishes, NSession} ->
NChannel = set_session(NSession, Channel),
maybe_mark_as_delivered(NSession, Delivers),
@ -1111,8 +1114,8 @@ handle_timeout(_TRef, retry_delivery,
Channel = #channel{conn_state = disconnected}) ->
{ok, Channel};
handle_timeout(_TRef, retry_delivery,
Channel = #channel{session = Session}) ->
case emqx_session:retry(Session) of
Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
case emqx_session:retry(ClientInfo, Session) of
{ok, NSession} ->
{ok, clean_timer(retry_timer, set_session(NSession, Channel))};
{ok, Publishes, Timeout, NSession} ->
@ -1124,8 +1127,8 @@ handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{conn_state = disconnected}) ->
{ok, Channel};
handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{session = Session}) ->
case emqx_session:expire(awaiting_rel, Session) of
Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
{ok, NSession} ->
{ok, clean_timer(await_timer, set_session(NSession, Channel))};
{ok, Timeout, NSession} ->
@ -1690,11 +1693,11 @@ maybe_resume_session(#channel{resuming = false}) ->
maybe_resume_session(#channel{session = Session,
resuming = true,
pendings = Pendings,
clientinfo = #{clientid := ClientId}}) ->
{ok, Publishes, Session1} = emqx_session:replay(Session),
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
%% We consider queued/dropped messages as delivered since they are now in the session state.
emqx_persistent_session:mark_as_delivered(ClientId, Pendings),
case emqx_session:deliver(Pendings, Session1) of
case emqx_session:deliver(ClientInfo, Pendings, Session1) of
{ok, Session2} ->
{ok, Publishes, Session2};
{ok, More, Session2} ->

View File

@ -135,14 +135,13 @@
, 'recv_msg.qos0'
, 'recv_msg.qos1'
, 'recv_msg.qos2'
, 'recv_msg.dropped'
, 'recv_msg.dropped.expired'
, send_pkt
, send_msg
, 'send_msg.qos0'
, 'send_msg.qos1'
, 'send_msg.qos2'
, 'send_msg.dropped'
, 'send_msg.dropped.await_pubrel_timeout'
, 'send_msg.dropped.expired'
, 'send_msg.dropped.queue_full'
, 'send_msg.dropped.too_large'

View File

@ -66,27 +66,27 @@
, unsubscribe/4
]).
-export([ publish/3
, puback/2
, pubrec/2
, pubrel/2
, pubcomp/2
-export([ publish/4
, puback/3
, pubrec/3
, pubrel/3
, pubcomp/3
]).
-export([ deliver/2
, enqueue/2
, dequeue/1
, ignore_local/3
, retry/1
-export([ deliver/3
, enqueue/3
, dequeue/2
, ignore_local/4
, retry/2
, terminate/3
]).
-export([ takeover/1
, resume/2
, replay/1
, replay/2
]).
-export([expire/2]).
-export([expire/3]).
%% Export for CT
-export([set_field/3]).
@ -277,18 +277,19 @@ stats(Session) -> info(?STATS_KEYS, Session).
%% Ignore local messages
%%--------------------------------------------------------------------
ignore_local(Delivers, Subscriber, Session) ->
ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
Subs = info(subscriptions, Session),
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
case maps:find(Topic, Subs) of
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.no_local'),
true;
_ ->
false
end
end, Delivers).
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
case maps:find(Topic, Subs) of
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.no_local'),
true;
_ ->
false
end
end, Delivers).
%%--------------------------------------------------------------------
%% Client -> Broker: SUBSCRIBE
@ -310,7 +311,6 @@ subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
true -> {error, ?RC_QUOTA_EXCEEDED}
end.
-compile({inline, [is_subscriptions_full/1]}).
is_subscriptions_full(#session{max_subscriptions = infinity}) ->
false;
is_subscriptions_full(#session{subscriptions = Subs,
@ -340,10 +340,10 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts,
%% Client -> Broker: PUBLISH
%%--------------------------------------------------------------------
-spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
-spec(publish(emqx_types:clientinfo(), emqx_types:packet_id(), emqx_types:message(), session())
-> {ok, emqx_types:publish_result(), session()}
| {error, emqx_types:reason_code()}).
publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
publish(_ClientInfo, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
Session = #session{awaiting_rel = AwaitingRel}) ->
case is_awaiting_full(Session) of
false ->
@ -359,10 +359,9 @@ publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
end;
%% Publish QoS0/1 directly
publish(_PacketId, Msg, Session) ->
publish(_ClientInfo, _PacketId, Msg, Session) ->
{ok, emqx_broker:publish(Msg), Session}.
-compile({inline, [is_awaiting_full/1]}).
is_awaiting_full(#session{max_awaiting_rel = infinity}) ->
false;
is_awaiting_full(#session{awaiting_rel = AwaitingRel,
@ -373,23 +372,22 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
%% Client -> Broker: PUBACK
%%--------------------------------------------------------------------
-spec(puback(emqx_types:packet_id(), session())
-spec(puback(emqx_types:clientinfo(), emqx_types:packet_id(), session())
-> {ok, emqx_types:message(), session()}
| {ok, emqx_types:message(), replies(), session()}
| {error, emqx_types:reason_code()}).
puback(PacketId, Session = #session{inflight = Inflight}) ->
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
Session2 = update_latency(Msg, Session),
return_with(Msg, dequeue(Session2#session{inflight = Inflight1}));
return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1}));
{value, {_Pubrel, _Ts}} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end.
-compile({inline, [return_with/2]}).
return_with(Msg, {ok, Session}) ->
{ok, Msg, Session};
return_with(Msg, {ok, Publishes, Session}) ->
@ -399,10 +397,10 @@ return_with(Msg, {ok, Publishes, Session}) ->
%% Client -> Broker: PUBREC
%%--------------------------------------------------------------------
-spec(pubrec(emqx_types:packet_id(), session())
-spec(pubrec(emqx_types:clientinfo(), emqx_types:packet_id(), session())
-> {ok, emqx_types:message(), session()}
| {error, emqx_types:reason_code()}).
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}),
@ -418,9 +416,9 @@ pubrec(PacketId, Session = #session{inflight = Inflight}) ->
%% Client -> Broker: PUBREL
%%--------------------------------------------------------------------
-spec(pubrel(emqx_types:packet_id(), session())
-spec(pubrel(emqx_types:clientinfo(), emqx_types:packet_id(), session())
-> {ok, session()} | {error, emqx_types:reason_code()}).
pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
case maps:take(PacketId, AwaitingRel) of
{_Ts, AwaitingRel1} ->
{ok, Session#session{awaiting_rel = AwaitingRel1}};
@ -432,15 +430,15 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
%% Client -> Broker: PUBCOMP
%%--------------------------------------------------------------------
-spec(pubcomp(emqx_types:packet_id(), session())
-spec(pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session())
-> {ok, session()} | {ok, replies(), session()}
| {error, emqx_types:reason_code()}).
pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) ->
Session2 = update_latency(Pubrel, Session),
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
dequeue(Session2#session{inflight = Inflight1});
dequeue(ClientInfo, Session2#session{inflight = Inflight1});
{value, _Other} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -451,29 +449,30 @@ pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
%% Dequeue Msgs
%%--------------------------------------------------------------------
dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
case emqx_mqueue:is_empty(Q) of
true -> {ok, Session};
false ->
{Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
{Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
deliver(Msgs, [], Session#session{mqueue = Q1})
end.
dequeue(0, Msgs, Q) ->
dequeue(_ClientInfo, 0, Msgs, Q) ->
{lists:reverse(Msgs), Q};
dequeue(Cnt, Msgs, Q) ->
dequeue(ClientInfo, Cnt, Msgs, Q) ->
case emqx_mqueue:out(Q) of
{empty, _Q} -> dequeue(0, Msgs, Q);
{empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q);
{{value, Msg}, Q1} ->
case emqx_message:is_expired(Msg) of
true -> ok = inc_expired_cnt(delivery),
dequeue(Cnt, Msgs, Q1);
false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
true ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
ok = inc_delivery_expired_cnt(),
dequeue(ClientInfo, Cnt, Msgs, Q1);
false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
end
end.
-compile({inline, [acc_cnt/2]}).
acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
acc_cnt(_Msg, Cnt) -> Cnt - 1.
@ -481,38 +480,38 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1.
%% Broker -> Client: Deliver
%%--------------------------------------------------------------------
-spec(deliver(list(emqx_types:deliver()), session())
-spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session())
-> {ok, session()} | {ok, replies(), session()}).
deliver([Deliver], Session) -> %% Optimize
Enrich = enrich_fun(Session),
deliver_msg(Enrich(Deliver), Session);
deliver(ClientInfo, [Deliver], Session) -> %% Optimize
Msg = enrich_deliver(Deliver, Session),
deliver_msg(ClientInfo, Msg, Session);
deliver(Delivers, Session) ->
Msgs = lists:map(enrich_fun(Session), Delivers),
deliver(Msgs, [], Session).
deliver(ClientInfo, Delivers, Session) ->
Msgs = [enrich_deliver(D, Session) || D <- Delivers],
do_deliver(ClientInfo, Msgs, [], Session).
deliver([], Publishes, Session) ->
do_deliver(_ClientInfo, [], Publishes, Session) ->
{ok, lists:reverse(Publishes), Session};
deliver([Msg | More], Acc, Session) ->
case deliver_msg(Msg, Session) of
do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
case deliver_msg(ClientInfo, Msg, Session) of
{ok, Session1} ->
deliver(More, Acc, Session1);
do_deliver(ClientInfo, More, Acc, Session1);
{ok, [Publish], Session1} ->
deliver(More, [Publish|Acc], Session1)
do_deliver(ClientInfo, More, [Publish|Acc], Session1)
end.
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
{ok, [{undefined, maybe_ack(Msg)}], Session};
deliver_msg(Msg = #message{qos = QoS}, Session =
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
#session{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case emqx_inflight:is_full(Inflight) of
true ->
Session1 = case maybe_nack(Msg) of
true -> Session;
false -> enqueue(Msg, Session)
false -> enqueue(ClientInfo, Msg, Session)
end,
{ok, Session1};
false ->
@ -521,32 +520,34 @@ deliver_msg(Msg = #message{qos = QoS}, Session =
{ok, [Publish], next_pkt_id(Session1)}
end.
-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(),
-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(),
session()) -> session()).
enqueue([Deliver], Session) -> %% Optimize
Enrich = enrich_fun(Session),
enqueue(Enrich(Deliver), Session);
enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) ->
lists:foldl(fun(Deliver, Session0) ->
Msg = enrich_deliver(Deliver, Session),
enqueue(ClientInfo, Msg, Session0)
end, Session, Delivers);
enqueue(Delivers, Session) when is_list(Delivers) ->
Msgs = lists:map(enrich_fun(Session), Delivers),
lists:foldl(fun enqueue/2, Session, Msgs);
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) ->
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
(Dropped =/= undefined) andalso log_dropped(Dropped, Session),
(Dropped =/= undefined) andalso handle_dropped(ClientInfo, Dropped, Session),
Session#session{mqueue = NewQ}.
log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
Payload = emqx_message:to_log_map(Msg),
#{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q),
case (QoS == ?QOS_0) andalso (not StoreQos0) of
true ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
ok = inc_pd('send_msg.dropped'),
?SLOG(warning, #{msg => "dropped_qos0_msg",
queue => QueueInfo,
payload => Payload}, #{topic => Topic});
false ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
ok = inc_pd('send_msg.dropped'),
ok = inc_pd('send_msg.dropped.queue_full'),
@ -555,10 +556,8 @@ log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
payload => Payload}, #{topic => Topic})
end.
enrich_fun(Session = #session{subscriptions = Subs}) ->
fun({deliver, Topic, Msg}) ->
enrich_subopts(get_subopts(Topic, Subs), Msg, Session)
end.
enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) ->
enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
maybe_ack(Msg) ->
case emqx_shared_sub:is_ack_required(Msg) of
@ -613,53 +612,54 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
%% Retry Delivery
%%--------------------------------------------------------------------
-spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
retry(Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
-spec(retry(emqx_types:clientinfo(), session()) ->
{ok, session()} | {ok, replies(), timeout(), session()}).
retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
case emqx_inflight:is_empty(Inflight) of
true -> {ok, Session};
false ->
Now = erlang:system_time(millisecond),
Session2 = check_expire_latency(Now, RetryInterval, Session),
retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
[],
Now,
Session2)
retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), [], Now,
Session2, ClientInfo)
end.
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
{ok, lists:reverse(Acc), Interval, Session};
retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
#session{retry_interval = Interval, inflight = Inflight}) ->
#session{retry_interval = Interval, inflight = Inflight}, ClientInfo) ->
case (Age = age(Now, Ts)) >= Interval of
true ->
{Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight),
retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1});
{Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo),
retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo);
false ->
{ok, lists:reverse(Acc), Interval - max(0, Age), Session}
end.
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) ->
Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
{[{pubrel, PacketId}|Acc], Inflight1};
do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) ->
case emqx_message:is_expired(Msg) of
true ->
ok = inc_expired_cnt(delivery),
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
ok = inc_delivery_expired_cnt(),
{Acc, emqx_inflight:delete(PacketId, Inflight)};
false ->
Msg1 = emqx_message:set_flag(dup, true, Msg),
Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight),
{[{PacketId, Msg1}|Acc], Inflight1}
end;
retry_delivery(PacketId, Pubrel, Now, Acc, Inflight) ->
Inflight1 = emqx_inflight:update(PacketId, {Pubrel, Now}, Inflight),
{[{pubrel, PacketId}|Acc], Inflight1}.
end.
%%--------------------------------------------------------------------
%% Expire Awaiting Rel
%%--------------------------------------------------------------------
-spec(expire(awaiting_rel, session()) -> {ok, session()} | {ok, timeout(), session()}).
expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
-spec(expire(emqx_types:clientinfo(), awaiting_rel, session()) ->
{ok, session()} | {ok, timeout(), session()}).
expire(_ClientInfo, awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
case maps:size(AwaitingRel) of
0 -> {ok, Session};
_ -> expire_awaiting_rel(erlang:system_time(millisecond), Session)
@ -670,7 +670,7 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
(ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt),
(ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt),
NSession = Session#session{awaiting_rel = AwaitingRel1},
case maps:size(AwaitingRel1) of
0 -> {ok, NSession};
@ -693,14 +693,14 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
ok = emqx_metrics:inc('session.resumed'),
emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
-spec(replay(session()) -> {ok, replies(), session()}).
replay(Session = #session{inflight = Inflight}) ->
-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}).
replay(ClientInfo, Session = #session{inflight = Inflight}) ->
Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) ->
{pubrel, PacketId};
({PacketId, {Msg, _Ts}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}
end, emqx_inflight:to_list(Inflight)),
case dequeue(Session) of
case dequeue(ClientInfo, Session) of
{ok, NSession} -> {ok, Pubs, NSession};
{ok, More, NSession} ->
{ok, lists:append(Pubs, More), NSession}
@ -714,29 +714,26 @@ terminate(ClientInfo, takenover, Session) ->
terminate(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
-compile({inline, [run_hook/2]}).
run_hook(Name, Args) ->
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
%%--------------------------------------------------------------------
%% Inc message/delivery expired counter
%%--------------------------------------------------------------------
inc_delivery_expired_cnt() ->
inc_delivery_expired_cnt(1).
-compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}).
inc_expired_cnt(K) -> inc_expired_cnt(K, 1).
inc_expired_cnt(delivery, N) ->
inc_delivery_expired_cnt(N) ->
ok = inc_pd('send_msg.dropped', N),
ok = inc_pd('send_msg.dropped.expired', N),
ok = emqx_metrics:inc('delivery.dropped', N),
emqx_metrics:inc('delivery.dropped.expired', N);
emqx_metrics:inc('delivery.dropped.expired', N).
inc_expired_cnt(message, N) ->
ok = inc_pd('recv_msg.dropped', N),
ok = inc_pd('recv_msg.dropped.expired', N),
inc_await_pubrel_timeout(N) ->
ok = inc_pd('send_msg.dropped', N),
ok = inc_pd('send_msg.dropped.await_pubrel_timeout', N),
ok = emqx_metrics:inc('messages.dropped', N),
emqx_metrics:inc('messages.dropped.expired', N).
emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
inc_pd(Key) ->
inc_pd(Key, 1).
@ -747,9 +744,6 @@ inc_pd(Key, Inc) ->
%%--------------------------------------------------------------------
%% Next Packet Id
%%--------------------------------------------------------------------
-compile({inline, [next_pkt_id/1]}).
next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) ->
Session#session{next_pkt_id = 1};
@ -788,9 +782,6 @@ get_birth_timestamp(_, _) ->
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
-compile({inline, [sort_fun/0, batch_n/1, with_ts/1, age/2]}).
sort_fun() ->
fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end.

View File

@ -152,10 +152,6 @@ properties(client) ->
<<"Number of PUBLISH QoS1 packets received">>},
{'recv_msg.qos2', integer,
<<"Number of PUBLISH QoS2 packets received">>},
{'recv_msg.dropped', integer,
<<"Number of dropped PUBLISH packets">>},
{'recv_msg.dropped.expired', integer,
<<"Number of dropped PUBLISH packets due to expired">>},
{recv_oct, integer,
<<"Number of bytes received by EMQ X Broker (the same below)">>},
{recv_pkt, integer,
@ -165,21 +161,23 @@ properties(client) ->
{send_cnt, integer,
<<"Number of TCP packets sent">>},
{send_msg, integer,
<<"Number of PUBLISH packets sent">>},
<<"Number of PUBLISH messages sent">>},
{'send_msg.qos0', integer,
<<"Number of PUBLISH QoS0 packets sent">>},
<<"Number of PUBLISH QoS0 messages sent">>},
{'send_msg.qos1', integer,
<<"Number of PUBLISH QoS1 packets sent">>},
<<"Number of PUBLISH QoS1 messages sent">>},
{'send_msg.qos2', integer,
<<"Number of PUBLISH QoS2 packets sent">>},
<<"Number of PUBLISH QoS2 messages sent">>},
{'send_msg.dropped', integer,
<<"Number of dropped PUBLISH packets">>},
<<"Number of dropped PUBLISH messages">>},
{'send_msg.dropped.await_pubrel_timeout', integer,
<<"Number of dropped PUBLISH packets due to waiting PUBREL timeout">>},
{'send_msg.dropped.expired', integer,
<<"Number of dropped PUBLISH packets due to expired">>},
<<"Number of dropped PUBLISH messages due to expired">>},
{'send_msg.dropped.queue_full', integer,
<<"Number of dropped PUBLISH packets due to queue full">>},
<<"Number of dropped PUBLISH messages due to queue full">>},
{'send_msg.dropped.too_large', integer,
<<"Number of dropped PUBLISH packets due to packet length too large">>},
<<"Number of dropped PUBLISH messages due to packet length too large">>},
{send_oct, integer,
<<"Number of bytes sent">>},
{send_pkt, integer,