feat(rule): add new event 'delivery_dropped'

This commit is contained in:
Shawn 2022-01-12 20:10:09 +08:00
parent 9434c0fa6c
commit 217acc0154
2 changed files with 123 additions and 57 deletions

View File

@ -37,6 +37,7 @@
, on_message_dropped/4
, on_message_delivered/3
, on_message_acked/3
, on_delivery_dropped/4
]).
-export([ event_info/0
@ -53,6 +54,7 @@
, 'message.delivered'
, 'message.acked'
, 'message.dropped'
, 'delivery.dropped'
]).
-ifdef(TEST).
@ -136,6 +138,14 @@ on_message_acked(ClientInfo, Message, Env) ->
fun() -> eventmsg_acked(ClientInfo, Message) end, Env),
{ok, Message}.
on_delivery_dropped(_ClientInfo, Message = #message{flags = #{sys := true}},
_Reason, #{ignore_sys_message := true}) ->
{ok, Message};
on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
may_publish_and_apply('delivery.dropped',
fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Env),
{ok, Message}.
%%--------------------------------------------------------------------
%% Event Messages
%%--------------------------------------------------------------------
@ -242,6 +252,32 @@ eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags =
publish_received_at => Timestamp
}).
eventmsg_delivery_dropped(_ClientInfo = #{
peerhost := PeerHost,
clientid := ReceiverCId,
username := ReceiverUsername
},
Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic,
headers = Headers, payload = Payload, timestamp = Timestamp},
Reason) ->
with_basic_columns('delivery.dropped',
#{id => emqx_guid:to_hexstr(Id),
reason => Reason,
from_clientid => ClientId,
from_username => emqx_message:get_header(username, Message, undefined),
clientid => ReceiverCId,
username => ReceiverUsername,
payload => Payload,
peerhost => ntoa(PeerHost),
topic => Topic,
qos => QoS,
flags => Flags,
%% the column 'headers' will be removed in the next major release
headers => printable_maps(Headers),
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
}).
eventmsg_delivered(_ClientInfo = #{
peerhost := PeerHost,
clientid := ReceiverCId,
@ -333,6 +369,7 @@ event_info() ->
, event_info_message_deliver()
, event_info_message_acked()
, event_info_message_dropped()
, event_info_delivery_dropped()
, event_info_client_connected()
, event_info_client_disconnected()
, event_info_session_subscribed()
@ -363,10 +400,19 @@ event_info_message_acked() ->
event_info_message_dropped() ->
event_info_common(
'message.dropped',
{<<"message dropped">>, <<"消息丢弃"/utf8>>},
{<<"message dropped">>, <<"消息丢弃"/utf8>>},
{<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>},
{<<"messages are discarded during forwarding, usually because there are no subscribers">>,
<<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
<<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
).
event_info_delivery_dropped() ->
event_info_common(
'delivery.dropped',
{<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>},
{<<"messages are discarded during delivery, i.e. because the message queue is full">>,
<<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>},
<<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">>
).
event_info_client_connected() ->
event_info_common(
'client.connected',
@ -506,6 +552,23 @@ columns_with_exam('message.dropped') ->
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('delivery.dropped') ->
[ {<<"event">>, 'delivery.dropped'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"reason">>, queue_full}
, {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('client.connected') ->
[ {<<"event">>, 'client.connected'}
, {<<"clientid">>, <<"c_emqx">>}

View File

@ -67,22 +67,22 @@
]).
-export([ publish/3
, puback/2
, puback/3
, pubrec/2
, pubrel/2
, pubcomp/2
, pubcomp/3
]).
-export([ deliver/2
, enqueue/2
, dequeue/1
, retry/1
-export([ deliver/3
, enqueue/3
, dequeue/2
, retry/2
, terminate/3
]).
-export([ takeover/1
, resume/2
, replay/1
, replay/2
]).
-export([expire/2]).
@ -312,15 +312,15 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
%% Client -> Broker: PUBACK
%%--------------------------------------------------------------------
-spec(puback(emqx_types:packet_id(), session())
-spec(puback(emqx_types:clientinfo(), emqx_types:packet_id(), session())
-> {ok, emqx_types:message(), session()}
| {ok, emqx_types:message(), replies(), session()}
| {error, emqx_types:reason_code()}).
puback(PacketId, Session = #session{inflight = Inflight}) ->
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
{value, {_Pubrel, _Ts}} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -369,14 +369,14 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
%% Client -> Broker: PUBCOMP
%%--------------------------------------------------------------------
-spec(pubcomp(emqx_types:packet_id(), session())
-spec(pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session())
-> {ok, session()} | {ok, replies(), session()}
| {error, emqx_types:reason_code()}).
pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {pubrel, _Ts}} ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
dequeue(Session#session{inflight = Inflight1});
dequeue(ClientInfo, Session#session{inflight = Inflight1});
{value, _Other} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -387,27 +387,27 @@ pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
%% Dequeue Msgs
%%--------------------------------------------------------------------
dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
case emqx_mqueue:is_empty(Q) of
true -> {ok, Session};
false ->
{Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
{Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
deliver(Msgs, [], Session#session{mqueue = Q1})
end.
dequeue(0, Msgs, Q) ->
dequeue(_ClientInfo, 0, Msgs, Q) ->
{lists:reverse(Msgs), Q};
dequeue(Cnt, Msgs, Q) ->
dequeue(ClientInfo, Cnt, Msgs, Q) ->
case emqx_mqueue:out(Q) of
{empty, _Q} -> dequeue(0, Msgs, Q);
{empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q);
{{value, Msg}, Q1} ->
case emqx_message:is_expired(Msg) of
true ->
ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]),
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
ok = inc_delivery_expired_cnt(),
dequeue(Cnt, Msgs, Q1);
false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
dequeue(ClientInfo, Cnt, Msgs, Q1);
false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
end
end.
@ -419,38 +419,38 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1.
%% Broker -> Client: Deliver
%%--------------------------------------------------------------------
-spec(deliver(list(emqx_types:deliver()), session())
-spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session())
-> {ok, session()} | {ok, replies(), session()}).
deliver([Deliver], Session) -> %% Optimize
deliver(ClientInfo, [Deliver], Session) -> %% Optimize
Enrich = enrich_fun(Session),
deliver_msg(Enrich(Deliver), Session);
deliver_msg(ClientInfo, Enrich(Deliver), Session);
deliver(Delivers, Session) ->
deliver(ClientInfo, Delivers, Session) ->
Msgs = lists:map(enrich_fun(Session), Delivers),
deliver(Msgs, [], Session).
deliver(ClientInfo, Msgs, [], Session).
deliver([], Publishes, Session) ->
deliver(_ClientInfo, [], Publishes, Session) ->
{ok, lists:reverse(Publishes), Session};
deliver([Msg | More], Acc, Session) ->
case deliver_msg(Msg, Session) of
deliver(ClientInfo, [Msg | More], Acc, Session) ->
case deliver_msg(ClientInfo, Msg, Session) of
{ok, Session1} ->
deliver(More, Acc, Session1);
{ok, [Publish], Session1} ->
deliver(More, [Publish|Acc], Session1)
end.
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
{ok, [{undefined, maybe_ack(Msg)}], Session};
deliver_msg(Msg = #message{qos = QoS}, Session =
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
#session{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case emqx_inflight:is_full(Inflight) of
true ->
Session1 = case maybe_nack(Msg) of
true -> Session;
false -> enqueue(Msg, Session)
false -> enqueue(ClientInfo, Msg, Session)
end,
{ok, Session1};
false ->
@ -459,27 +459,29 @@ deliver_msg(Msg = #message{qos = QoS}, Session =
{ok, [Publish], next_pkt_id(Session1)}
end.
-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(),
-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(),
session()) -> session()).
enqueue([Deliver], Session) -> %% Optimize
Enrich = enrich_fun(Session),
enqueue(Enrich(Deliver), Session);
enqueue(Delivers, Session) when is_list(Delivers) ->
enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) ->
Msgs = lists:map(enrich_fun(Session), Delivers),
lists:foldl(fun enqueue/2, Session, Msgs);
lists:foldl(fun(Msg, Session0) ->
enqueue(ClientInfo, Msg, Session0)
end, Session, Msgs);
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) ->
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
(Dropped =/= undefined) andalso log_dropped(Dropped, Session),
(Dropped =/= undefined) andalso log_dropped(ClientInfo, Dropped, Session),
Session#session{mqueue = NewQ}.
log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
true ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
false ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
[emqx_message:format(Msg)])
@ -543,31 +545,32 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
%% Retry Delivery
%%--------------------------------------------------------------------
-spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
retry(Session = #session{inflight = Inflight}) ->
-spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
retry(ClientInfo, Session = #session{inflight = Inflight}) ->
case emqx_inflight:is_empty(Inflight) of
true -> {ok, Session};
false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
[], erlang:system_time(millisecond), Session)
[], erlang:system_time(millisecond), Session, ClientInfo)
end.
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
{ok, lists:reverse(Acc), Interval, Session};
retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
#session{retry_interval = Interval, inflight = Inflight}) ->
#session{retry_interval = Interval, inflight = Inflight}, ClientInfo) ->
case (Age = age(Now, Ts)) >= Interval of
true ->
{Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight),
retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1});
{Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo),
retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo);
false ->
{ok, lists:reverse(Acc), Interval - max(0, Age), Session}
end.
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo)
when is_record(Msg, message) ->
case emqx_message:is_expired(Msg) of
true ->
ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]),
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
ok = inc_delivery_expired_cnt(),
{Acc, emqx_inflight:delete(PacketId, Inflight)};
false ->
@ -576,7 +579,7 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -
{[{PacketId, Msg1}|Acc], Inflight1}
end;
retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) ->
Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
{[{pubrel, PacketId}|Acc], Inflight1}.
@ -619,14 +622,14 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
ok = emqx_metrics:inc('session.resumed'),
emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
-spec(replay(session()) -> {ok, replies(), session()}).
replay(Session = #session{inflight = Inflight}) ->
-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}).
replay(ClientInfo, Session = #session{inflight = Inflight}) ->
Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
{pubrel, PacketId};
({PacketId, {Msg, _Ts}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}
end, emqx_inflight:to_list(Inflight)),
case dequeue(Session) of
case dequeue(ClientInfo, Session) of
{ok, NSession} -> {ok, Pubs, NSession};
{ok, More, NSession} ->
{ok, lists:append(Pubs, More), NSession}