diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 07463b0c5..2c0a74fe9 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -365,7 +365,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - case emqx_session:puback(PacketId, Session) of + case emqx_session:puback(ClientInfo, PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), {ok, set_session(NSession, Channel)}; @@ -384,7 +384,7 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - case emqx_session:pubrec(PacketId, Session) of + case emqx_session:pubrec(ClientInfo, PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), NChannel = set_session(NSession, Channel), @@ -399,8 +399,9 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel handle_out(pubrel, {PacketId, RC}, Channel) end; -handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> - case emqx_session:pubrel(PacketId, Session) of +handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo, + session = Session}) -> + case emqx_session:pubrel(ClientInfo, PacketId, Session) of {ok, NSession} -> NChannel = set_session(NSession, Channel), handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); @@ -410,8 +411,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se handle_out(pubcomp, {PacketId, RC}, Channel) end; -handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> - case emqx_session:pubcomp(PacketId, Session) of +handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{ + clientinfo = ClientInfo, session = Session}) -> + case emqx_session:pubcomp(ClientInfo, PacketId, Session) of {ok, NSession} -> {ok, set_session(NSession, Channel)}; {ok, Publishes, NSession} -> @@ -617,8 +619,8 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) -> handle_out(puback, {PacketId, RC}, NChannel); do_publish(PacketId, Msg = #message{qos = ?QOS_2}, - Channel = #channel{session = Session}) -> - case emqx_session:publish(PacketId, Msg, Session) of + Channel = #channel{clientinfo = ClientInfo, session = Session}) -> + case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of {ok, PubRes, NSession} -> RC = puback_reason_code(PubRes), NChannel0 = set_session(NSession, Channel), @@ -779,22 +781,22 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. handle_deliver(Delivers, Channel = #channel{takeover = true, pendings = Pendings, session = Session, - clientinfo = #{clientid := ClientId}}) -> + clientinfo = #{clientid := ClientId} = ClientInfo}) -> %% NOTE: Order is important here. While the takeover is in %% progress, the session cannot enqueue messages, since it already %% passed on the queue to the new connection in the session state. NPendings = lists:append( Pendings, - emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), + emqx_session:ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)), {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, takeover = false, session = Session, - clientinfo = #{clientid := ClientId}}) -> + clientinfo = #{clientid := ClientId} = ClientInfo}) -> Delivers1 = maybe_nack(Delivers), - Delivers2 = emqx_session:ignore_local(Delivers1, ClientId, Session), - NSession = emqx_session:enqueue(Delivers2, Session), + Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session), + NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session), NChannel = set_session(NSession, Channel), %% We consider queued/dropped messages as delivered since they are now in the session state. maybe_mark_as_delivered(Session, Delivers), @@ -802,9 +804,10 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, handle_deliver(Delivers, Channel = #channel{session = Session, takeover = false, - clientinfo = #{clientid := ClientId} + clientinfo = #{clientid := ClientId} = ClientInfo }) -> - case emqx_session:deliver(emqx_session:ignore_local(Delivers, ClientId, Session), Session) of + case emqx_session:deliver(ClientInfo, + emqx_session:ignore_local(ClientInfo, Delivers, ClientId, Session), Session) of {ok, Publishes, NSession} -> NChannel = set_session(NSession, Channel), maybe_mark_as_delivered(NSession, Delivers), @@ -1111,8 +1114,8 @@ handle_timeout(_TRef, retry_delivery, Channel = #channel{conn_state = disconnected}) -> {ok, Channel}; handle_timeout(_TRef, retry_delivery, - Channel = #channel{session = Session}) -> - case emqx_session:retry(Session) of + Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + case emqx_session:retry(ClientInfo, Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, set_session(NSession, Channel))}; {ok, Publishes, Timeout, NSession} -> @@ -1124,8 +1127,8 @@ handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{conn_state = disconnected}) -> {ok, Channel}; handle_timeout(_TRef, expire_awaiting_rel, - Channel = #channel{session = Session}) -> - case emqx_session:expire(awaiting_rel, Session) of + Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + case emqx_session:expire(ClientInfo, awaiting_rel, Session) of {ok, NSession} -> {ok, clean_timer(await_timer, set_session(NSession, Channel))}; {ok, Timeout, NSession} -> @@ -1690,11 +1693,11 @@ maybe_resume_session(#channel{resuming = false}) -> maybe_resume_session(#channel{session = Session, resuming = true, pendings = Pendings, - clientinfo = #{clientid := ClientId}}) -> - {ok, Publishes, Session1} = emqx_session:replay(Session), + clientinfo = #{clientid := ClientId} = ClientInfo}) -> + {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), %% We consider queued/dropped messages as delivered since they are now in the session state. emqx_persistent_session:mark_as_delivered(ClientId, Pendings), - case emqx_session:deliver(Pendings, Session1) of + case emqx_session:deliver(ClientInfo, Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; {ok, More, Session2} -> diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 4fbfa1155..43566d1df 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -124,36 +124,39 @@ -define(ACTIVE_N, 100). --define(INFO_KEYS, [ socktype - , peername - , sockname - , sockstate - ]). +-define(INFO_KEYS, + [ socktype + , peername + , sockname + , sockstate + ]). --define(CONN_STATS, [ recv_pkt - , recv_msg - , 'recv_msg.qos0' - , 'recv_msg.qos1' - , 'recv_msg.qos2' - , 'recv_msg.dropped' - , 'recv_msg.dropped.expired' - , send_pkt - , send_msg - , 'send_msg.qos0' - , 'send_msg.qos1' - , 'send_msg.qos2' - , 'send_msg.dropped' - , 'send_msg.dropped.expired' - , 'send_msg.dropped.queue_full' - , 'send_msg.dropped.too_large' - ]). +-define(CONN_STATS, + [ recv_pkt + , recv_msg + , 'recv_msg.qos0' + , 'recv_msg.qos1' + , 'recv_msg.qos2' + , 'recv_msg.dropped' + , 'recv_msg.dropped.await_pubrel_timeout' + , send_pkt + , send_msg + , 'send_msg.qos0' + , 'send_msg.qos1' + , 'send_msg.qos2' + , 'send_msg.dropped' + , 'send_msg.dropped.expired' + , 'send_msg.dropped.queue_full' + , 'send_msg.dropped.too_large' + ]). --define(SOCK_STATS, [ recv_oct - , recv_cnt - , send_oct - , send_cnt - , send_pend - ]). +-define(SOCK_STATS, + [ recv_oct + , recv_cnt + , send_oct + , send_cnt + , send_pend + ]). -define(ENABLED(X), (X =/= undefined)). diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 70e660f28..ad1bb18ce 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -143,7 +143,7 @@ %% PubSub Metrics {counter, 'messages.publish'}, % Messages Publish {counter, 'messages.dropped'}, % Messages dropped due to no subscribers - {counter, 'messages.dropped.expired'}, % QoS2 Messages expired + {counter, 'messages.dropped.await_pubrel_timeout'}, % QoS2 Messages expired {counter, 'messages.dropped.no_subscribers'}, % Messages dropped {counter, 'messages.forward'}, % Messages forward {counter, 'messages.delayed'}, % Messages delayed @@ -552,7 +552,7 @@ reserved_idx('messages.qos2.received') -> 106; reserved_idx('messages.qos2.sent') -> 107; reserved_idx('messages.publish') -> 108; reserved_idx('messages.dropped') -> 109; -reserved_idx('messages.dropped.expired') -> 110; +reserved_idx('messages.dropped.await_pubrel_timeout') -> 110; reserved_idx('messages.dropped.no_subscribers') -> 111; reserved_idx('messages.forward') -> 112; %%reserved_idx('messages.retained') -> 113; %% keep the index, new metrics can use this diff --git a/apps/emqx/src/emqx_persistent_session.erl b/apps/emqx/src/emqx_persistent_session.erl index e625b8fbd..4d6798834 100644 --- a/apps/emqx/src/emqx_persistent_session.erl +++ b/apps/emqx/src/emqx_persistent_session.erl @@ -266,14 +266,14 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> %% 1. Get pending messages from DB. ?tp(ps_initial_pendings, #{sid => SessionID}), Pendings1 = pending(SessionID), - Pendings2 = emqx_session:ignore_local(Pendings1, ClientID, Session), + Pendings2 = emqx_session:ignore_local(ClientInfo, Pendings1, ClientID, Session), ?tp(ps_got_initial_pendings, #{ sid => SessionID , msgs => Pendings1}), %% 2. Enqueue messages to mimic that the process was alive %% when the messages were delivered. ?tp(ps_persist_pendings, #{sid => SessionID}), - Session1 = emqx_session:enqueue(Pendings2, Session), + Session1 = emqx_session:enqueue(ClientInfo, Pendings2, Session), Session2 = persist(ClientInfo, ConnInfo, Session1), mark_as_delivered(SessionID, Pendings2), ?tp(ps_persist_pendings_msgs, #{ msgs => Pendings2 @@ -294,7 +294,7 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> ?tp(ps_marker_pendings, #{sid => SessionID}), MarkerIDs = [Marker || {_, Marker} <- NodeMarkers], Pendings3 = pending(SessionID, MarkerIDs), - Pendings4 = emqx_session:ignore_local(Pendings3, ClientID, Session), + Pendings4 = emqx_session:ignore_local(ClientInfo, Pendings3, ClientID, Session), ?tp(ps_marker_pendings_msgs, #{ sid => SessionID , msgs => Pendings4}), diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 479f4f1b2..1d888d910 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -66,27 +66,27 @@ , unsubscribe/4 ]). --export([ publish/3 - , puback/2 - , pubrec/2 - , pubrel/2 - , pubcomp/2 +-export([ publish/4 + , puback/3 + , pubrec/3 + , pubrel/3 + , pubcomp/3 ]). --export([ deliver/2 - , enqueue/2 - , dequeue/1 - , ignore_local/3 - , retry/1 +-export([ deliver/3 + , enqueue/3 + , dequeue/2 + , ignore_local/4 + , retry/2 , terminate/3 ]). -export([ takeover/1 , resume/2 - , replay/1 + , replay/2 ]). --export([expire/2]). +-export([expire/3]). %% Export for CT -export([set_field/3]). @@ -147,27 +147,29 @@ -type(replies() :: list(publish() | pubrel())). --define(INFO_KEYS, [id, - is_persistent, - subscriptions, - upgrade_qos, - retry_interval, - await_rel_timeout, - created_at - ]). +-define(INFO_KEYS, + [ id + , is_persistent + , subscriptions + , upgrade_qos + , retry_interval + , await_rel_timeout + , created_at + ]). --define(STATS_KEYS, [subscriptions_cnt, - subscriptions_max, - inflight_cnt, - inflight_max, - mqueue_len, - mqueue_max, - mqueue_dropped, - next_pkt_id, - awaiting_rel_cnt, - awaiting_rel_max, - latency_stats - ]). +-define(STATS_KEYS, + [ subscriptions_cnt + , subscriptions_max + , inflight_cnt + , inflight_max + , mqueue_len + , mqueue_max + , mqueue_dropped + , next_pkt_id + , awaiting_rel_cnt + , awaiting_rel_max + , latency_stats + ]). -define(DEFAULT_BATCH_N, 1000). @@ -277,18 +279,19 @@ stats(Session) -> info(?STATS_KEYS, Session). %% Ignore local messages %%-------------------------------------------------------------------- -ignore_local(Delivers, Subscriber, Session) -> +ignore_local(ClientInfo, Delivers, Subscriber, Session) -> Subs = info(subscriptions, Session), - lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) -> - case maps:find(Topic, Subs) of - {ok, #{nl := 1}} when Subscriber =:= Publisher -> - ok = emqx_metrics:inc('delivery.dropped'), - ok = emqx_metrics:inc('delivery.dropped.no_local'), - true; - _ -> - false - end - end, Delivers). + lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) -> + case maps:find(Topic, Subs) of + {ok, #{nl := 1}} when Subscriber =:= Publisher -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]), + ok = emqx_metrics:inc('delivery.dropped'), + ok = emqx_metrics:inc('delivery.dropped.no_local'), + true; + _ -> + false + end + end, Delivers). %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE @@ -310,7 +313,6 @@ subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts, true -> {error, ?RC_QUOTA_EXCEEDED} end. --compile({inline, [is_subscriptions_full/1]}). is_subscriptions_full(#session{max_subscriptions = infinity}) -> false; is_subscriptions_full(#session{subscriptions = Subs, @@ -340,10 +342,10 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts, %% Client -> Broker: PUBLISH %%-------------------------------------------------------------------- --spec(publish(emqx_types:packet_id(), emqx_types:message(), session()) +-spec(publish(emqx_types:clientinfo(), emqx_types:packet_id(), emqx_types:message(), session()) -> {ok, emqx_types:publish_result(), session()} | {error, emqx_types:reason_code()}). -publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, +publish(_ClientInfo, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, Session = #session{awaiting_rel = AwaitingRel}) -> case is_awaiting_full(Session) of false -> @@ -359,10 +361,9 @@ publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, end; %% Publish QoS0/1 directly -publish(_PacketId, Msg, Session) -> +publish(_ClientInfo, _PacketId, Msg, Session) -> {ok, emqx_broker:publish(Msg), Session}. --compile({inline, [is_awaiting_full/1]}). is_awaiting_full(#session{max_awaiting_rel = infinity}) -> false; is_awaiting_full(#session{awaiting_rel = AwaitingRel, @@ -373,23 +374,22 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- --spec(puback(emqx_types:packet_id(), session()) +-spec(puback(emqx_types:clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}). -puback(PacketId, Session = #session{inflight = Inflight}) -> +puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), Session2 = update_latency(Msg, Session), - return_with(Msg, dequeue(Session2#session{inflight = Inflight1})); + return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. --compile({inline, [return_with/2]}). return_with(Msg, {ok, Session}) -> {ok, Msg, Session}; return_with(Msg, {ok, Publishes, Session}) -> @@ -399,10 +399,10 @@ return_with(Msg, {ok, Publishes, Session}) -> %% Client -> Broker: PUBREC %%-------------------------------------------------------------------- --spec(pubrec(emqx_types:packet_id(), session()) +-spec(pubrec(emqx_types:clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}). -pubrec(PacketId, Session = #session{inflight = Inflight}) -> +pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}), @@ -418,9 +418,9 @@ pubrec(PacketId, Session = #session{inflight = Inflight}) -> %% Client -> Broker: PUBREL %%-------------------------------------------------------------------- --spec(pubrel(emqx_types:packet_id(), session()) +-spec(pubrel(emqx_types:clientinfo(), emqx_types:packet_id(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). -pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> +pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:take(PacketId, AwaitingRel) of {_Ts, AwaitingRel1} -> {ok, Session#session{awaiting_rel = AwaitingRel1}}; @@ -432,15 +432,15 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> %% Client -> Broker: PUBCOMP %%-------------------------------------------------------------------- --spec(pubcomp(emqx_types:packet_id(), session()) +-spec(pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session()) -> {ok, session()} | {ok, replies(), session()} | {error, emqx_types:reason_code()}). -pubcomp(PacketId, Session = #session{inflight = Inflight}) -> +pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) -> Session2 = update_latency(Pubrel, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(Session2#session{inflight = Inflight1}); + dequeue(ClientInfo, Session2#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -451,29 +451,30 @@ pubcomp(PacketId, Session = #session{inflight = Inflight}) -> %% Dequeue Msgs %%-------------------------------------------------------------------- -dequeue(Session = #session{inflight = Inflight, mqueue = Q}) -> +dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> case emqx_mqueue:is_empty(Q) of true -> {ok, Session}; false -> - {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q), - deliver(Msgs, [], Session#session{mqueue = Q1}) + {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q), + do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1}) end. -dequeue(0, Msgs, Q) -> +dequeue(_ClientInfo, 0, Msgs, Q) -> {lists:reverse(Msgs), Q}; -dequeue(Cnt, Msgs, Q) -> +dequeue(ClientInfo, Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of - {empty, _Q} -> dequeue(0, Msgs, Q); + {empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q); {{value, Msg}, Q1} -> case emqx_message:is_expired(Msg) of - true -> ok = inc_expired_cnt(delivery), - dequeue(Cnt, Msgs, Q1); - false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) + true -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), + ok = inc_delivery_expired_cnt(), + dequeue(ClientInfo, Cnt, Msgs, Q1); + false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) end end. --compile({inline, [acc_cnt/2]}). acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt; acc_cnt(_Msg, Cnt) -> Cnt - 1. @@ -481,38 +482,38 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1. %% Broker -> Client: Deliver %%-------------------------------------------------------------------- --spec(deliver(list(emqx_types:deliver()), session()) +-spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session()) -> {ok, session()} | {ok, replies(), session()}). -deliver([Deliver], Session) -> %% Optimize - Enrich = enrich_fun(Session), - deliver_msg(Enrich(Deliver), Session); +deliver(ClientInfo, [Deliver], Session) -> %% Optimize + Msg = enrich_deliver(Deliver, Session), + deliver_msg(ClientInfo, Msg, Session); -deliver(Delivers, Session) -> - Msgs = lists:map(enrich_fun(Session), Delivers), - deliver(Msgs, [], Session). +deliver(ClientInfo, Delivers, Session) -> + Msgs = [enrich_deliver(D, Session) || D <- Delivers], + do_deliver(ClientInfo, Msgs, [], Session). -deliver([], Publishes, Session) -> +do_deliver(_ClientInfo, [], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; -deliver([Msg | More], Acc, Session) -> - case deliver_msg(Msg, Session) of +do_deliver(ClientInfo, [Msg | More], Acc, Session) -> + case deliver_msg(ClientInfo, Msg, Session) of {ok, Session1} -> - deliver(More, Acc, Session1); + do_deliver(ClientInfo, More, Acc, Session1); {ok, [Publish], Session1} -> - deliver(More, [Publish|Acc], Session1) + do_deliver(ClientInfo, More, [Publish|Acc], Session1) end. -deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> +deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> {ok, [{undefined, maybe_ack(Msg)}], Session}; -deliver_msg(Msg = #message{qos = QoS}, Session = +deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = #session{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> Session1 = case maybe_nack(Msg) of true -> Session; - false -> enqueue(Msg, Session) + false -> enqueue(ClientInfo, Msg, Session) end, {ok, Session1}; false -> @@ -521,32 +522,34 @@ deliver_msg(Msg = #message{qos = QoS}, Session = {ok, [Publish], next_pkt_id(Session1)} end. --spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), +-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(), session()) -> session()). -enqueue([Deliver], Session) -> %% Optimize - Enrich = enrich_fun(Session), - enqueue(Enrich(Deliver), Session); +enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) -> + lists:foldl(fun(Deliver, Session0) -> + Msg = enrich_deliver(Deliver, Session), + enqueue(ClientInfo, Msg, Session0) + end, Session, Delivers); -enqueue(Delivers, Session) when is_list(Delivers) -> - Msgs = lists:map(enrich_fun(Session), Delivers), - lists:foldl(fun enqueue/2, Session, Msgs); - -enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> +enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - (Dropped =/= undefined) andalso log_dropped(Dropped, Session), + (Dropped =/= undefined) andalso handle_dropped(ClientInfo, Dropped, Session), Session#session{mqueue = NewQ}. -log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) -> +handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) -> Payload = emqx_message:to_log_map(Msg), #{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q), case (QoS == ?QOS_0) andalso (not StoreQos0) of true -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]), + ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), ok = inc_pd('send_msg.dropped'), ?SLOG(warning, #{msg => "dropped_qos0_msg", queue => QueueInfo, payload => Payload}, #{topic => Topic}); false -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]), + ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.queue_full'), ok = inc_pd('send_msg.dropped'), ok = inc_pd('send_msg.dropped.queue_full'), @@ -555,10 +558,8 @@ log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) -> payload => Payload}, #{topic => Topic}) end. -enrich_fun(Session = #session{subscriptions = Subs}) -> - fun({deliver, Topic, Msg}) -> - enrich_subopts(get_subopts(Topic, Subs), Msg, Session) - end. +enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) -> + enrich_subopts(get_subopts(Topic, Subs), Msg, Session). maybe_ack(Msg) -> case emqx_shared_sub:is_ack_required(Msg) of @@ -613,53 +614,54 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> %% Retry Delivery %%-------------------------------------------------------------------- --spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). -retry(Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> +-spec(retry(emqx_types:clientinfo(), session()) -> + {ok, session()} | {ok, replies(), timeout(), session()}). +retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; false -> Now = erlang:system_time(millisecond), Session2 = check_expire_latency(Now, RetryInterval, Session), - retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), - [], - Now, - Session2) + retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), [], Now, + Session2, ClientInfo) end. -retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) -> +retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) -> {ok, lists:reverse(Acc), Interval, Session}; retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = - #session{retry_interval = Interval, inflight = Inflight}) -> + #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) -> case (Age = age(Now, Ts)) >= Interval of true -> - {Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight), - retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}); + {Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo), + retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo); false -> {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. -retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> +do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) -> + Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), + {[{pubrel, PacketId}|Acc], Inflight1}; + +do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) -> case emqx_message:is_expired(Msg) of true -> - ok = inc_expired_cnt(delivery), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), + ok = inc_delivery_expired_cnt(), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight), {[{PacketId, Msg1}|Acc], Inflight1} - end; - -retry_delivery(PacketId, Pubrel, Now, Acc, Inflight) -> - Inflight1 = emqx_inflight:update(PacketId, {Pubrel, Now}, Inflight), - {[{pubrel, PacketId}|Acc], Inflight1}. + end. %%-------------------------------------------------------------------- %% Expire Awaiting Rel %%-------------------------------------------------------------------- --spec(expire(awaiting_rel, session()) -> {ok, session()} | {ok, timeout(), session()}). -expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) -> +-spec(expire(emqx_types:clientinfo(), awaiting_rel, session()) -> + {ok, session()} | {ok, timeout(), session()}). +expire(_ClientInfo, awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:size(AwaitingRel) of 0 -> {ok, Session}; _ -> expire_awaiting_rel(erlang:system_time(millisecond), Session) @@ -670,7 +672,7 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1), - (ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt), + (ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt), NSession = Session#session{awaiting_rel = AwaitingRel1}, case maps:size(AwaitingRel1) of 0 -> {ok, NSession}; @@ -693,14 +695,14 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = ok = emqx_metrics:inc('session.resumed'), emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]). --spec(replay(session()) -> {ok, replies(), session()}). -replay(Session = #session{inflight = Inflight}) -> +-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}). +replay(ClientInfo, Session = #session{inflight = Inflight}) -> Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) -> {pubrel, PacketId}; ({PacketId, {Msg, _Ts}}) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)), - case dequeue(Session) of + case dequeue(ClientInfo, Session) of {ok, NSession} -> {ok, Pubs, NSession}; {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession} @@ -714,29 +716,26 @@ terminate(ClientInfo, takenover, Session) -> terminate(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). --compile({inline, [run_hook/2]}). run_hook(Name, Args) -> ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). %%-------------------------------------------------------------------- %% Inc message/delivery expired counter %%-------------------------------------------------------------------- +inc_delivery_expired_cnt() -> + inc_delivery_expired_cnt(1). --compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}). - -inc_expired_cnt(K) -> inc_expired_cnt(K, 1). - -inc_expired_cnt(delivery, N) -> +inc_delivery_expired_cnt(N) -> ok = inc_pd('send_msg.dropped', N), ok = inc_pd('send_msg.dropped.expired', N), ok = emqx_metrics:inc('delivery.dropped', N), - emqx_metrics:inc('delivery.dropped.expired', N); + emqx_metrics:inc('delivery.dropped.expired', N). -inc_expired_cnt(message, N) -> +inc_await_pubrel_timeout(N) -> ok = inc_pd('recv_msg.dropped', N), - ok = inc_pd('recv_msg.dropped.expired', N), + ok = inc_pd('recv_msg.dropped.await_pubrel_timeout', N), ok = emqx_metrics:inc('messages.dropped', N), - emqx_metrics:inc('messages.dropped.expired', N). + emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N). inc_pd(Key) -> inc_pd(Key, 1). @@ -747,9 +746,6 @@ inc_pd(Key, Inc) -> %%-------------------------------------------------------------------- %% Next Packet Id %%-------------------------------------------------------------------- - --compile({inline, [next_pkt_id/1]}). - next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) -> Session#session{next_pkt_id = 1}; @@ -788,9 +784,6 @@ get_birth_timestamp(_, _) -> %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- - --compile({inline, [sort_fun/0, batch_n/1, with_ts/1, age/2]}). - sort_fun() -> fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 19bc2b3c3..a0f3a935b 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -115,6 +115,56 @@ listeners_conf() -> ws => #{default => listener_mqtt_ws_conf()} }. +limiter_conf() -> + #{bytes_in => + #{bucket => + #{default => + #{aggregated => + #{capacity => infinity,initial => 0,rate => infinity}, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity}, + zone => default}}, + global => #{burst => 0,rate => infinity}, + zone => #{default => #{burst => 0,rate => infinity}}}, + connection => + #{bucket => + #{default => + #{aggregated => + #{capacity => infinity,initial => 0,rate => infinity}, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity}, + zone => default}}, + global => #{burst => 0,rate => infinity}, + zone => #{default => #{burst => 0,rate => infinity}}}, + message_in => + #{bucket => + #{default => + #{aggregated => + #{capacity => infinity,initial => 0,rate => infinity}, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity}, + zone => default}}, + global => #{burst => 0,rate => infinity}, + zone => #{default => #{burst => 0,rate => infinity}}}, + message_routing => + #{bucket => + #{default => + #{aggregated => + #{capacity => infinity,initial => 0,rate => infinity}, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity}, + zone => default}}, + global => #{burst => 0,rate => infinity}, + zone => #{default => #{burst => 0,rate => infinity}}}}. + stats_conf() -> #{enable => true}. @@ -130,11 +180,11 @@ basic_conf() -> stats => stats_conf(), listeners => listeners_conf(), zones => zone_conf(), - limiter => emqx:get_config([limiter]) + limiter => limiter_conf() }. set_test_listener_confs() -> - Conf = emqx_config:get([]), + Conf = emqx_config:get([], #{}), emqx_config:put(basic_conf()), Conf. @@ -180,10 +230,10 @@ end_per_suite(_Config) -> ]). init_per_testcase(TestCase, Config) -> - NewConf = set_test_listener_confs(), + OldConf = set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), - modify_limiter(TestCase, NewConf), - [{config, NewConf}|Config]. + modify_limiter(TestCase, OldConf), + [{config, OldConf}|Config]. end_per_testcase(_TestCase, Config) -> emqx_config:put(?config(config, Config)), @@ -232,15 +282,16 @@ t_chan_info(_) -> ?assertEqual(clientinfo(), ClientInfo). t_chan_caps(_) -> - #{max_clientid_len := 65535, + ?assertMatch(#{ + max_clientid_len := 65535, max_qos_allowed := 2, max_topic_alias := 65535, - max_topic_levels := 128, + max_topic_levels := Level, retain_available := true, shared_subscription := true, subscription_identifiers := true, wildcard_subscription := true - } = emqx_channel:caps(channel()). + } when is_integer(Level), emqx_channel:caps(channel())). %%-------------------------------------------------------------------- %% Test cases for channel handle_in @@ -377,14 +428,14 @@ t_handle_in_qos2_publish_with_error_return(_) -> t_handle_in_puback_ok(_) -> Msg = emqx_message:make(<<"t">>, <<"payload">>), ok = meck:expect(emqx_session, puback, - fun(_PacketId, Session) -> {ok, Msg, Session} end), + fun(_, _PacketId, Session) -> {ok, Msg, Session} end), Channel = channel(#{conn_state => connected}), {ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel). % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_in_puback_id_in_use(_) -> ok = meck:expect(emqx_session, puback, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end), {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()). @@ -392,7 +443,7 @@ t_handle_in_puback_id_in_use(_) -> t_handle_in_puback_id_not_found(_) -> ok = meck:expect(emqx_session, puback, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()). @@ -430,14 +481,14 @@ t_override_client_receive_maximum(_) -> t_handle_in_pubrec_ok(_) -> Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>), - ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end), + ok = meck:expect(emqx_session, pubrec, fun(_, _, Session) -> {ok, Msg, Session} end), Channel = channel(#{conn_state => connected}), {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel). t_handle_in_pubrec_id_in_use(_) -> ok = meck:expect(emqx_session, pubrec, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end), {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} = @@ -445,34 +496,34 @@ t_handle_in_pubrec_id_in_use(_) -> t_handle_in_pubrec_id_not_found(_) -> ok = meck:expect(emqx_session, pubrec, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubrel_ok(_) -> - ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, pubrel, fun(_, _, Session) -> {ok, Session} end), Channel = channel(#{conn_state => connected}), {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel). t_handle_in_pubrel_not_found_error(_) -> ok = meck:expect(emqx_session, pubrel, - fun(_PacketId, _Session) -> + fun(_, _PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubcomp_ok(_) -> - ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, pubcomp, fun(_, _, Session) -> {ok, Session} end), {ok, _Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()). % ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)). t_handle_in_pubcomp_not_found_error(_) -> ok = meck:expect(emqx_session, pubcomp, - fun(_PacketId, _Session) -> + fun(_, _PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), Channel = channel(#{conn_state => connected}), @@ -795,13 +846,13 @@ t_handle_timeout_keepalive(_) -> t_handle_timeout_retry_delivery(_) -> TRef = make_ref(), - ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, retry, fun(_, Session) -> {ok, Session} end), Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel). t_handle_timeout_expire_awaiting_rel(_) -> TRef = make_ref(), - ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, expire, fun(_, _, Session) -> {ok, Session} end), Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel). diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 7976001a3..9932424d7 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -126,23 +126,23 @@ t_unsubscribe(_) -> t_publish_qos0(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>), - {ok, [], Session} = emqx_session:publish(1, Msg, Session = session()), - {ok, [], Session} = emqx_session:publish(undefined, Msg, Session). + {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()), + {ok, [], Session} = emqx_session:publish(clientinfo(), undefined, Msg, Session). t_publish_qos1(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>), - {ok, [], Session} = emqx_session:publish(1, Msg, Session = session()), - {ok, [], Session} = emqx_session:publish(2, Msg, Session). + {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()), + {ok, [], Session} = emqx_session:publish(clientinfo(), 2, Msg, Session). t_publish_qos2(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), - {ok, [], Session} = emqx_session:publish(1, Msg, session()), + {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, session()), ?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)), - {ok, Session1} = emqx_session:pubrel(1, Session), + {ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session1)), - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, Session1). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, Session1). t_publish_qos2_with_error_return(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), @@ -150,10 +150,10 @@ t_publish_qos2_with_error_return(_) -> awaiting_rel => #{1 => ts(millisecond)} }), Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(1, Msg, Session), - {ok, [], Session1} = emqx_session:publish(2, Msg, Session), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(clientinfo(), 1, Msg, Session), + {ok, [], Session1} = emqx_session:publish(clientinfo(), 2, Msg, Session), ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), - {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1). + {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(clientinfo(), 3, Msg, Session1). t_is_awaiting_full_false(_) -> Session = session(#{max_awaiting_rel => infinity}), @@ -169,7 +169,7 @@ t_puback(_) -> Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>), Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight, mqueue => mqueue()}), - {ok, Msg, Session1} = emqx_session:puback(1, Session), + {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_puback_with_dequeue(_) -> @@ -178,7 +178,7 @@ t_puback_with_dequeue(_) -> Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>), {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})), Session = session(#{inflight => Inflight, mqueue => Q}), - {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session), + {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(0, emqx_session:info(mqueue_len, Session1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). @@ -186,48 +186,48 @@ t_puback_with_dequeue(_) -> t_puback_error_packet_id_in_use(_) -> Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = - emqx_session:puback(1, session(#{inflight => Inflight})). + emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})). t_puback_error_packet_id_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(clientinfo(), 1, session()). t_pubrec(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {ok, Msg, Session1} = emqx_session:pubrec(2, Session), + {ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session), ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). t_pubrec_packet_id_in_use_error(_) -> Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = - emqx_session:pubrec(1, session(#{inflight => Inflight})). + emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})). t_pubrec_packet_id_not_found_error(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(clientinfo(), 1, session()). t_pubrel(_) -> Session = session(#{awaiting_rel => #{1 => ts(millisecond)}}), - {ok, Session1} = emqx_session:pubrel(1, Session), + {ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session), ?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)). t_pubrel_error_packetid_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()). t_pubcomp(_) -> Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {ok, Session1} = emqx_session:pubcomp(1, Session), + {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_pubcomp_error_packetid_in_use(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session). + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(clientinfo(), 1, Session). t_pubcomp_error_packetid_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(clientinfo(), 1, session()). %%-------------------------------------------------------------------- %% Test cases for deliver/retry @@ -235,14 +235,16 @@ t_pubcomp_error_packetid_not_found(_) -> t_dequeue(_) -> Q = mqueue(#{store_qos0 => true}), - {ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})), + {ok, Session} = emqx_session:dequeue(clientinfo(), session(#{mqueue => Q})), Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>), emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>) ], - Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs), + Session1 = lists:foldl(fun(Msg, S) -> + emqx_session:enqueue(clientinfo(), Msg, S) + end, Session, Msgs), {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} = - emqx_session:dequeue(Session1), + emqx_session:dequeue(clientinfo(), Session1), ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)), @@ -257,7 +259,7 @@ t_deliver_qos0(_) -> clientinfo(), <<"t1">>, subopts(), Session), Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]], {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} = - emqx_session:deliver(Deliveries, Session1), + emqx_session:deliver(clientinfo(), Deliveries, Session1), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). @@ -266,38 +268,38 @@ t_deliver_qos1(_) -> {ok, Session} = emqx_session:subscribe( clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], - {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session), + {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), - {ok, Msg1, Session2} = emqx_session:puback(1, Session1), + {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), - {ok, Msg2, Session3} = emqx_session:puback(2, Session2), + {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2), ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). t_deliver_qos2(_) -> ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)], {ok, [{1, Msg1}, {2, Msg2}], Session} = - emqx_session:deliver(Delivers, session()), + emqx_session:deliver(clientinfo(), Delivers, session()), ?assertEqual(2, emqx_session:info(inflight_cnt, Session)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). t_deliver_one_msg(_) -> {ok, [{1, Msg}], Session} = - emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()), + emqx_session:deliver(clientinfo(), [delivery(?QOS_1, <<"t1">>)], session()), ?assertEqual(1, emqx_session:info(inflight_cnt, Session)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg)). t_deliver_when_inflight_is_full(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session = session(#{inflight => emqx_inflight:new(1)}), - {ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session), + {ok, Publishes, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ?assertEqual(1, length(Publishes)), ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(1, emqx_session:info(mqueue_len, Session1)), - {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1), + {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(clientinfo(), 1, Session1), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), @@ -305,8 +307,8 @@ t_deliver_when_inflight_is_full(_) -> t_enqueue(_) -> %% store_qos0 = true - Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()), - Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>), + Session = emqx_session:enqueue(clientinfo(), [delivery(?QOS_0, <<"t0">>)], session()), + Session1 = emqx_session:enqueue(clientinfo(), [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session), ?assertEqual(3, emqx_session:info(mqueue_len, Session1)). @@ -314,11 +316,11 @@ t_retry(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], RetryIntervalMs = 100, %% 0.1s Session = session(#{retry_interval => RetryIntervalMs}), - {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session), + {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ElapseMs = 200, %% 0.2s ok = timer:sleep(ElapseMs), Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], - {ok, Msgs1, 100, Session2} = emqx_session:retry(Session1), + {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). %%-------------------------------------------------------------------- @@ -337,24 +339,24 @@ t_resume(_) -> t_replay(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], - {ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()), + {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, session()), Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), - Session2 = emqx_session:enqueue(Msg, Session1), + Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1), Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs], - {ok, ReplayPubs, Session3} = emqx_session:replay(Session2), + {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2), ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)). t_expire_awaiting_rel(_) -> - {ok, Session} = emqx_session:expire(awaiting_rel, session()), + {ok, Session} = emqx_session:expire(clientinfo(), awaiting_rel, session()), Timeout = emqx_session:info(await_rel_timeout, Session), Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session), - {ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1), + {ok, Timeout, Session2} = emqx_session:expire(clientinfo(), awaiting_rel, Session1), ?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)). t_expire_awaiting_rel_all(_) -> Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}), - {ok, Session1} = emqx_session:expire(awaiting_rel, Session), + {ok, Session1} = emqx_session:expire(clientinfo(), awaiting_rel, Session), ?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)). %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 5dc42cc88..4a0a49b91 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -527,7 +527,7 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), clientinfo = ClientInfo = #{clientid := ClientId}}) -> case ReturnCode of ?SN_RC_ACCEPTED -> - case emqx_session:puback(MsgId, Session) of + case emqx_session:puback(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), {ok, Channel#channel{session = NSession}}; @@ -574,7 +574,7 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId), Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> - case emqx_session:pubrec(MsgId, Session) of + case emqx_session:pubrec(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), NChannel = Channel#channel{session = NSession}, @@ -596,8 +596,8 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId), end; handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId), - Channel = #channel{ctx = Ctx, session = Session}) -> - case emqx_session:pubrel(MsgId, Session) of + Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> + case emqx_session:pubrel(ClientInfo, MsgId, Session) of {ok, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, MsgId, NChannel); @@ -611,8 +611,8 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId), end; handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), - Channel = #channel{ctx = Ctx, session = Session}) -> - case emqx_session:pubcomp(MsgId, Session) of + Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> + case emqx_session:pubcomp(ClientInfo, MsgId, Session) of {ok, NSession} -> {ok, Channel#channel{session = NSession}}; {ok, Publishes, NSession} -> @@ -823,8 +823,8 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_1}, Channel) -> handle_out(puback, {TopicId, MsgId, ?SN_RC_ACCEPTED}, Channel); do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2}, - Channel = #channel{ctx = Ctx, session = Session}) -> - case emqx_session:publish(MsgId, Msg, Session) of + Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> + case emqx_session:publish(ClientInfo, MsgId, Msg, Session) of {ok, _PubRes, NSession} -> NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession} @@ -1012,9 +1012,9 @@ do_unsubscribe(TopicFilters, %%-------------------------------------------------------------------- %% Awake & Asleep -awake(Channel = #channel{session = Session}) -> - {ok, Publishes, Session1} = emqx_session:replay(Session), - {NPublishes, NSession} = case emqx_session:deliver([], Session1) of +awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + {NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of {ok, Session2} -> {Publishes, Session2}; {ok, More, Session2} -> @@ -1109,9 +1109,9 @@ maybe_resume_session(#channel{resuming = false}) -> ignore; maybe_resume_session(#channel{session = Session, resuming = true, - pendings = Pendings}) -> - {ok, Publishes, Session1} = emqx_session:replay(Session), - case emqx_session:deliver(Pendings, Session1) of + pendings = Pendings, clientinfo = ClientInfo}) -> + {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + case emqx_session:deliver(ClientInfo, Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; {ok, More, Session2} -> @@ -1335,10 +1335,10 @@ handle_deliver(Delivers, Channel = #channel{ ctx = Ctx, conn_state = ConnState, session = Session, - clientinfo = #{clientid := ClientId}}) + clientinfo = ClientInfo = #{clientid := ClientId}}) when ConnState =:= disconnected; ConnState =:= asleep -> - NSession = emqx_session:enqueue( + NSession = emqx_session:enqueue(ClientInfo, ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx), Session ), @@ -1359,8 +1359,8 @@ handle_deliver(Delivers, Channel = #channel{ handle_deliver(Delivers, Channel = #channel{ ctx = Ctx, session = Session, - clientinfo = #{clientid := ClientId}}) -> - case emqx_session:deliver( + clientinfo = ClientInfo = #{clientid := ClientId}}) -> + case emqx_session:deliver(ClientInfo, ignore_local(Delivers, ClientId, Session, Ctx), Session ) of @@ -1427,8 +1427,8 @@ handle_timeout(_TRef, retry_delivery, Channel = #channel{conn_state = asleep}) -> {ok, reset_timer(retry_timer, Channel)}; handle_timeout(_TRef, retry_delivery, - Channel = #channel{session = Session}) -> - case emqx_session:retry(Session) of + Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + case emqx_session:retry(ClientInfo, Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; {ok, Publishes, Timeout, NSession} -> @@ -1443,8 +1443,8 @@ handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{conn_state = asleep}) -> {ok, reset_timer(await_timer, Channel)}; handle_timeout(_TRef, expire_awaiting_rel, - Channel = #channel{session = Session}) -> - case emqx_session:expire(awaiting_rel, Session) of + Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + case emqx_session:expire(ClientInfo, awaiting_rel, Session) of {ok, NSession} -> {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; {ok, Timeout, NSession} -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 5cb40d519..5c9747cbf 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -153,9 +153,9 @@ properties(client) -> {'recv_msg.qos2', integer, <<"Number of PUBLISH QoS2 packets received">>}, {'recv_msg.dropped', integer, - <<"Number of dropped PUBLISH packets">>}, - {'recv_msg.dropped.expired', integer, - <<"Number of dropped PUBLISH packets due to expired">>}, + <<"Number of dropped PUBLISH messages">>}, + {'recv_msg.dropped.await_pubrel_timeout', integer, + <<"Number of dropped PUBLISH messages due to waiting PUBREL timeout">>}, {recv_oct, integer, <<"Number of bytes received by EMQ X Broker (the same below)">>}, {recv_pkt, integer, @@ -165,21 +165,21 @@ properties(client) -> {send_cnt, integer, <<"Number of TCP packets sent">>}, {send_msg, integer, - <<"Number of PUBLISH packets sent">>}, + <<"Number of PUBLISH messages sent">>}, {'send_msg.qos0', integer, - <<"Number of PUBLISH QoS0 packets sent">>}, + <<"Number of PUBLISH QoS0 messages sent">>}, {'send_msg.qos1', integer, - <<"Number of PUBLISH QoS1 packets sent">>}, + <<"Number of PUBLISH QoS1 messages sent">>}, {'send_msg.qos2', integer, - <<"Number of PUBLISH QoS2 packets sent">>}, + <<"Number of PUBLISH QoS2 messages sent">>}, {'send_msg.dropped', integer, - <<"Number of dropped PUBLISH packets">>}, + <<"Number of dropped PUBLISH messages">>}, {'send_msg.dropped.expired', integer, - <<"Number of dropped PUBLISH packets due to expired">>}, + <<"Number of dropped PUBLISH messages due to expired">>}, {'send_msg.dropped.queue_full', integer, - <<"Number of dropped PUBLISH packets due to queue full">>}, + <<"Number of dropped PUBLISH messages due to queue full">>}, {'send_msg.dropped.too_large', integer, - <<"Number of dropped PUBLISH packets due to packet length too large">>}, + <<"Number of dropped PUBLISH messages due to packet length too large">>}, {send_oct, integer, <<"Number of bytes sent">>}, {send_pkt, integer, diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index 5ebde4d11..4a58942ad 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -69,7 +69,7 @@ properties() -> {'messages.delayed', integer, <<"Number of delay- published messages stored by EMQ X Broker">>}, {'messages.delivered', integer, <<"Number of messages forwarded to the subscription process internally by EMQ X Broker">>}, {'messages.dropped', integer, <<"Total number of messages dropped by EMQ X Broker before forwarding to the subscription process">>}, - {'messages.dropped.expired', integer, <<"Number of messages dropped due to message expiration when receiving">>}, + {'messages.dropped.await_pubrel_timeout', integer, <<"Number of messages dropped due to waiting PUBREL timeout">>}, {'messages.dropped.no_subscribers', integer, <<"Number of messages dropped due to no subscribers">>}, {'messages.forward', integer, <<"Number of messages forwarded to other nodes">>}, {'messages.publish', integer, <<"Number of messages published in addition to system messages">>}, diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 3d4971f69..81d957591 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -403,7 +403,7 @@ emqx_collect(emqx_messages_dropped, Metrics) -> counter_metric(?C('messages.dropped', Metrics)); emqx_collect(emqx_messages_dropped_expired, Metrics) -> - counter_metric(?C('messages.dropped.expired', Metrics)); + counter_metric(?C('messages.dropped.await_pubrel_timeout', Metrics)); emqx_collect(emqx_messages_dropped_no_subscribers, Metrics) -> counter_metric(?C('messages.dropped.no_subscribers', Metrics)); diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 1dd64582e..a0a33470b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -39,6 +39,7 @@ , on_message_dropped/4 , on_message_delivered/3 , on_message_acked/3 + , on_delivery_dropped/4 , on_bridge_message_received/2 ]). @@ -63,6 +64,7 @@ event_names() -> , 'message.delivered' , 'message.acked' , 'message.dropped' + , 'delivery.dropped' ]. reload() -> @@ -153,6 +155,15 @@ on_message_acked(ClientInfo, Message, Env) -> end, {ok, Message}. +on_delivery_dropped(ClientInfo, Message, Reason, Env) -> + case ignore_sys_message(Message) of + true -> ok; + false -> + apply_event('delivery.dropped', + fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Env) + end, + {ok, Message}. + %%-------------------------------------------------------------------- %% Event Messages %%-------------------------------------------------------------------- @@ -311,6 +322,32 @@ eventmsg_acked(_ClientInfo = #{ publish_received_at => Timestamp }). +eventmsg_delivery_dropped(_ClientInfo = #{ + peerhost := PeerHost, + clientid := ReceiverCId, + username := ReceiverUsername + }, + Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, + headers = Headers, payload = Payload, timestamp = Timestamp}, + Reason) -> + with_basic_columns('delivery.dropped', + #{id => emqx_guid:to_hexstr(Id), + reason => Reason, + from_clientid => ClientId, + from_username => emqx_message:get_header(username, Message, undefined), + clientid => ReceiverCId, + username => ReceiverUsername, + payload => Payload, + peerhost => ntoa(PeerHost), + topic => Topic, + qos => QoS, + flags => Flags, + %% the column 'headers' will be removed in the next major release + headers => printable_maps(Headers), + pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), + publish_received_at => Timestamp + }). + sub_unsub_prop_key('session.subscribed') -> sub_props; sub_unsub_prop_key('session.unsubscribed') -> unsub_props. @@ -345,6 +382,7 @@ event_info() -> , event_info_client_disconnected() , event_info_session_subscribed() , event_info_session_unsubscribed() + , event_info_delivery_dropped() ]. event_info_message_publish() -> @@ -371,10 +409,18 @@ event_info_message_acked() -> event_info_message_dropped() -> event_info_common( 'message.dropped', - {<<"message dropped">>, <<"消息丢弃"/utf8>>}, - {<<"message dropped">>, <<"消息丢弃"/utf8>>}, + {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>}, + {<<"messages are discarded during routing, usually because there are no subscribers">>, <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> ). +event_info_delivery_dropped() -> + event_info_common( + 'delivery.dropped', + {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>}, + {<<"messages are discarded during delivery, i.e. because the message queue is full">>, + <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>}, + <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">> + ). event_info_client_connected() -> event_info_common( 'client.connected', @@ -414,7 +460,8 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> }. test_columns('message.dropped') -> - test_columns('message.publish'); + [ {<<"reason">>, <<"no_subscribers">>} + ] ++ test_columns('message.publish'); test_columns('message.publish') -> [ {<<"clientid">>, <<"c_emqx">>} , {<<"username">>, <<"u_emqx">>} @@ -422,6 +469,9 @@ test_columns('message.publish') -> , {<<"qos">>, 1} , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} ]; +test_columns('delivery.dropped') -> + [ {<<"reason">>, <<"queue_full">>} + ] ++ test_columns('message.delivered'); test_columns('message.acked') -> test_columns('message.delivered'); test_columns('message.delivered') -> @@ -486,6 +536,23 @@ columns_with_exam('message.dropped') -> , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; +columns_with_exam('delivery.dropped') -> + [ {<<"event">>, 'delivery.dropped'} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"reason">>, queue_full} + , {<<"from_clientid">>, <<"c_emqx_1">>} + , {<<"from_username">>, <<"u_emqx_1">>} + , {<<"clientid">>, <<"c_emqx_2">>} + , {<<"username">>, <<"u_emqx_2">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; columns_with_exam('client.connected') -> [ {<<"event">>, 'client.connected'} , {<<"clientid">>, <<"c_emqx">>} @@ -578,6 +645,7 @@ event_name(<<"$events/session_unsubscribed", _/binary>>) -> event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; +event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; @@ -587,6 +655,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; +event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('message.publish') -> <<"$events/message_publish">>. printable_maps(undefined) -> #{}; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 4b8afea40..8f0bc0beb 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -144,6 +144,7 @@ init_per_testcase(t_events, Config) -> "\"$events/message_acked\", " "\"$events/message_delivered\", " "\"$events/message_dropped\", " + "\"$events/delivery_dropped\", " "\"t1\"", {ok, Rule} = emqx_rule_engine:create_rule( #{id => <<"rule:t_events">>, @@ -322,18 +323,20 @@ t_events(_Config) -> ]), ct:pal("====== verify $events/client_connected"), client_connected(Client, Client2), + ct:pal("====== verify $events/message_dropped"), + message_dropped(Client), ct:pal("====== verify $events/session_subscribed"), session_subscribed(Client2), ct:pal("====== verify t1"), message_publish(Client), + ct:pal("====== verify $events/delivery_dropped"), + delivery_dropped(Client), ct:pal("====== verify $events/message_delivered"), message_delivered(Client), ct:pal("====== verify $events/message_acked"), message_acked(Client), ct:pal("====== verify $events/session_unsubscribed"), session_unsubscribed(Client2), - ct:pal("====== verify $events/message_dropped"), - message_dropped(Client), ct:pal("====== verify $events/client_disconnected"), client_disconnected(Client, Client2), ok. @@ -365,6 +368,15 @@ session_unsubscribed(Client2) -> message_delivered(_Client) -> verify_event('message.delivered'), ok. +delivery_dropped(Client) -> + %% subscribe "t1" and then publish to "t1", the message will not be received by itself + %% because we have set the subscribe flag 'nl' = true + {ok, _, _} = emqtt:subscribe(Client, #{}, <<"t1">>, [{nl, true}, {qos, 1}]), + ct:sleep(50), + message_publish(Client), + ct:pal("--- current emqx hooks: ~p", [ets:tab2list(emqx_hooks)]), + verify_event('delivery.dropped'), + ok. message_dropped(Client) -> message_publish(Client), verify_event('message.dropped'), @@ -1490,6 +1502,45 @@ verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed' maps:get(PropKey, Fields)), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000); +verify_event_fields('delivery.dropped', Fields) -> + #{event := 'delivery.dropped', + id := ID, + metadata := #{rule_id := RuleId}, + reason := Reason, + clientid := ClientId, + username := Username, + from_clientid := FromClientId, + from_username := FromUsername, + node := Node, + payload := Payload, + peerhost := PeerHost, + pub_props := Properties, + publish_received_at := EventAt, + qos := QoS, + flags := Flags, + timestamp := Timestamp, + topic := Topic} = Fields, + Now = erlang:system_time(millisecond), + TimestampElapse = Now - Timestamp, + RcvdAtElapse = Now - EventAt, + ?assert(is_binary(ID)), + ?assertEqual(<<"rule:t_events">>, RuleId), + ?assertEqual(no_local, Reason), + ?assertEqual(node(), Node), + ?assertEqual(<<"c_event">>, ClientId), + ?assertEqual(<<"u_event">>, Username), + ?assertEqual(<<"c_event">>, FromClientId), + ?assertEqual(<<"u_event">>, FromUsername), + ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload), + verify_ipaddr(PeerHost), + ?assertEqual(<<"t1">>, Topic), + ?assertEqual(1, QoS), + ?assert(is_map(Flags)), + ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties), + ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000), + ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000), + ?assert(EventAt =< Timestamp); + verify_event_fields('message.dropped', Fields) -> #{id := ID, reason := Reason,