diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 97e40439d..5d85d5990 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -37,6 +37,7 @@ , on_message_dropped/4 , on_message_delivered/3 , on_message_acked/3 + , on_delivery_dropped/4 ]). -export([ event_info/0 @@ -53,6 +54,7 @@ , 'message.delivered' , 'message.acked' , 'message.dropped' + , 'delivery.dropped' ]). -ifdef(TEST). @@ -136,6 +138,14 @@ on_message_acked(ClientInfo, Message, Env) -> fun() -> eventmsg_acked(ClientInfo, Message) end, Env), {ok, Message}. +on_delivery_dropped(_ClientInfo, Message = #message{flags = #{sys := true}}, + _Reason, #{ignore_sys_message := true}) -> + {ok, Message}; +on_delivery_dropped(ClientInfo, Message, Reason, Env) -> + may_publish_and_apply('delivery.dropped', + fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Env), + {ok, Message}. + %%-------------------------------------------------------------------- %% Event Messages %%-------------------------------------------------------------------- @@ -242,6 +252,32 @@ eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags = publish_received_at => Timestamp }). +eventmsg_delivery_dropped(_ClientInfo = #{ + peerhost := PeerHost, + clientid := ReceiverCId, + username := ReceiverUsername + }, + Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, + headers = Headers, payload = Payload, timestamp = Timestamp}, + Reason) -> + with_basic_columns('delivery.dropped', + #{id => emqx_guid:to_hexstr(Id), + reason => Reason, + from_clientid => ClientId, + from_username => emqx_message:get_header(username, Message, undefined), + clientid => ReceiverCId, + username => ReceiverUsername, + payload => Payload, + peerhost => ntoa(PeerHost), + topic => Topic, + qos => QoS, + flags => Flags, + %% the column 'headers' will be removed in the next major release + headers => printable_maps(Headers), + pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), + publish_received_at => Timestamp + }). + eventmsg_delivered(_ClientInfo = #{ peerhost := PeerHost, clientid := ReceiverCId, @@ -333,6 +369,7 @@ event_info() -> , event_info_message_deliver() , event_info_message_acked() , event_info_message_dropped() + , event_info_delivery_dropped() , event_info_client_connected() , event_info_client_disconnected() , event_info_session_subscribed() @@ -363,10 +400,19 @@ event_info_message_acked() -> event_info_message_dropped() -> event_info_common( 'message.dropped', - {<<"message dropped">>, <<"消息丢弃"/utf8>>}, - {<<"message dropped">>, <<"消息丢弃"/utf8>>}, + {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>}, + {<<"messages are discarded during forwarding, usually because there are no subscribers">>, + <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> ). +event_info_delivery_dropped() -> + event_info_common( + 'delivery.dropped', + {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>}, + {<<"messages are discarded during delivery, i.e. because the message queue is full">>, + <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>}, + <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">> + ). event_info_client_connected() -> event_info_common( 'client.connected', @@ -506,6 +552,23 @@ columns_with_exam('message.dropped') -> , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; +columns_with_exam('delivery.dropped') -> + [ {<<"event">>, 'delivery.dropped'} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"reason">>, queue_full} + , {<<"from_clientid">>, <<"c_emqx_1">>} + , {<<"from_username">>, <<"u_emqx_1">>} + , {<<"clientid">>, <<"c_emqx_2">>} + , {<<"username">>, <<"u_emqx_2">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; columns_with_exam('client.connected') -> [ {<<"event">>, 'client.connected'} , {<<"clientid">>, <<"c_emqx">>} diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 346098add..154516711 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -67,22 +67,22 @@ ]). -export([ publish/3 - , puback/2 + , puback/3 , pubrec/2 , pubrel/2 - , pubcomp/2 + , pubcomp/3 ]). --export([ deliver/2 - , enqueue/2 - , dequeue/1 - , retry/1 +-export([ deliver/3 + , enqueue/3 + , dequeue/2 + , retry/2 , terminate/3 ]). -export([ takeover/1 , resume/2 - , replay/1 + , replay/2 ]). -export([expire/2]). @@ -312,15 +312,15 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- --spec(puback(emqx_types:packet_id(), session()) +-spec(puback(emqx_types:clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}). -puback(PacketId, Session = #session{inflight = Inflight}) -> +puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), - return_with(Msg, dequeue(Session#session{inflight = Inflight1})); + return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -369,14 +369,14 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> %% Client -> Broker: PUBCOMP %%-------------------------------------------------------------------- --spec(pubcomp(emqx_types:packet_id(), session()) +-spec(pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session()) -> {ok, session()} | {ok, replies(), session()} | {error, emqx_types:reason_code()}). -pubcomp(PacketId, Session = #session{inflight = Inflight}) -> +pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {pubrel, _Ts}} -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(Session#session{inflight = Inflight1}); + dequeue(ClientInfo, Session#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -387,27 +387,27 @@ pubcomp(PacketId, Session = #session{inflight = Inflight}) -> %% Dequeue Msgs %%-------------------------------------------------------------------- -dequeue(Session = #session{inflight = Inflight, mqueue = Q}) -> +dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> case emqx_mqueue:is_empty(Q) of true -> {ok, Session}; false -> - {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q), + {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q), deliver(Msgs, [], Session#session{mqueue = Q1}) end. -dequeue(0, Msgs, Q) -> +dequeue(_ClientInfo, 0, Msgs, Q) -> {lists:reverse(Msgs), Q}; -dequeue(Cnt, Msgs, Q) -> +dequeue(ClientInfo, Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of - {empty, _Q} -> dequeue(0, Msgs, Q); + {empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q); {{value, Msg}, Q1} -> case emqx_message:is_expired(Msg) of true -> - ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), ok = inc_delivery_expired_cnt(), - dequeue(Cnt, Msgs, Q1); - false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) + dequeue(ClientInfo, Cnt, Msgs, Q1); + false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) end end. @@ -419,38 +419,38 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1. %% Broker -> Client: Deliver %%-------------------------------------------------------------------- --spec(deliver(list(emqx_types:deliver()), session()) +-spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session()) -> {ok, session()} | {ok, replies(), session()}). -deliver([Deliver], Session) -> %% Optimize +deliver(ClientInfo, [Deliver], Session) -> %% Optimize Enrich = enrich_fun(Session), - deliver_msg(Enrich(Deliver), Session); + deliver_msg(ClientInfo, Enrich(Deliver), Session); -deliver(Delivers, Session) -> +deliver(ClientInfo, Delivers, Session) -> Msgs = lists:map(enrich_fun(Session), Delivers), - deliver(Msgs, [], Session). + deliver(ClientInfo, Msgs, [], Session). -deliver([], Publishes, Session) -> +deliver(_ClientInfo, [], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; -deliver([Msg | More], Acc, Session) -> - case deliver_msg(Msg, Session) of +deliver(ClientInfo, [Msg | More], Acc, Session) -> + case deliver_msg(ClientInfo, Msg, Session) of {ok, Session1} -> deliver(More, Acc, Session1); {ok, [Publish], Session1} -> deliver(More, [Publish|Acc], Session1) end. -deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> +deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> {ok, [{undefined, maybe_ack(Msg)}], Session}; -deliver_msg(Msg = #message{qos = QoS}, Session = +deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = #session{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> Session1 = case maybe_nack(Msg) of true -> Session; - false -> enqueue(Msg, Session) + false -> enqueue(ClientInfo, Msg, Session) end, {ok, Session1}; false -> @@ -459,27 +459,29 @@ deliver_msg(Msg = #message{qos = QoS}, Session = {ok, [Publish], next_pkt_id(Session1)} end. --spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), +-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(), session()) -> session()). -enqueue([Deliver], Session) -> %% Optimize - Enrich = enrich_fun(Session), - enqueue(Enrich(Deliver), Session); - -enqueue(Delivers, Session) when is_list(Delivers) -> +enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) -> Msgs = lists:map(enrich_fun(Session), Delivers), - lists:foldl(fun enqueue/2, Session, Msgs); + lists:foldl(fun(Msg, Session0) -> + enqueue(ClientInfo, Msg, Session0) + end, Session, Msgs); -enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> +enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - (Dropped =/= undefined) andalso log_dropped(Dropped, Session), + (Dropped =/= undefined) andalso log_dropped(ClientInfo, Dropped, Session), Session#session{mqueue = NewQ}. -log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> +log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) -> case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of true -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]), + ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]); false -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]), + ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.queue_full'), ?LOG(warning, "Dropped msg due to mqueue is full: ~s", [emqx_message:format(Msg)]) @@ -543,31 +545,32 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> %% Retry Delivery %%-------------------------------------------------------------------- --spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). -retry(Session = #session{inflight = Inflight}) -> +-spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). +retry(ClientInfo, Session = #session{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), - [], erlang:system_time(millisecond), Session) + [], erlang:system_time(millisecond), Session, ClientInfo) end. -retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) -> +retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) -> {ok, lists:reverse(Acc), Interval, Session}; retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = - #session{retry_interval = Interval, inflight = Inflight}) -> + #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) -> case (Age = age(Now, Ts)) >= Interval of true -> - {Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight), - retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}); + {Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo), + retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo); false -> {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. -retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> +do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo) + when is_record(Msg, message) -> case emqx_message:is_expired(Msg) of true -> - ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), ok = inc_delivery_expired_cnt(), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> @@ -576,7 +579,7 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) - {[{PacketId, Msg1}|Acc], Inflight1} end; -retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> +do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) -> Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), {[{pubrel, PacketId}|Acc], Inflight1}. @@ -619,14 +622,14 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = ok = emqx_metrics:inc('session.resumed'), emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]). --spec(replay(session()) -> {ok, replies(), session()}). -replay(Session = #session{inflight = Inflight}) -> +-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}). +replay(ClientInfo, Session = #session{inflight = Inflight}) -> Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) -> {pubrel, PacketId}; ({PacketId, {Msg, _Ts}}) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)), - case dequeue(Session) of + case dequeue(ClientInfo, Session) of {ok, NSession} -> {ok, Pubs, NSession}; {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession}