From 9434c0fa6cd219371ebb79a4edceea05ed8f1189 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 12 Jan 2022 18:34:20 +0800 Subject: [PATCH 01/14] feat(hook): new emqx hook 'delivery.dropped' --- src/emqx_channel.erl | 3 ++- src/emqx_metrics.erl | 5 +++-- src/emqx_session.erl | 25 +++++++++++++------------ 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2774ac264..c8aefb4aa 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -750,9 +750,10 @@ handle_deliver(Delivers, Channel = #channel{session = Session, ignore_local(Delivers, Subscriber, Session) -> Subs = emqx_session:info(subscriptions, Session), - lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) -> + lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) -> case maps:find(Topic, Subs) of {ok, #{nl := 1}} when Subscriber =:= Publisher -> + ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, no_local]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.no_local'), true; diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index c3ce14d83..6cd4d05b1 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -146,7 +146,7 @@ %% PubSub Metrics {counter, 'messages.publish'}, % Messages Publish {counter, 'messages.dropped'}, % Messages dropped due to no subscribers - {counter, 'messages.dropped.expired'}, % QoS2 Messages expired + {counter, 'messages.dropped.await_pubrel_timeout'}, % QoS2 await PUBREL timeout {counter, 'messages.dropped.no_subscribers'}, % Messages dropped {counter, 'messages.forward'}, % Messages forward {counter, 'messages.retained'}, % Messages retained @@ -542,7 +542,8 @@ reserved_idx('messages.qos2.received') -> 106; reserved_idx('messages.qos2.sent') -> 107; reserved_idx('messages.publish') -> 108; reserved_idx('messages.dropped') -> 109; -reserved_idx('messages.dropped.expired') -> 110; +reserved_idx('messages.dropped.expired') -> 110; %% To be removed in 5.0 +reserved_idx('messages.dropped.await_pubrel_timeout') -> 110; reserved_idx('messages.dropped.no_subscribers') -> 111; reserved_idx('messages.forward') -> 112; reserved_idx('messages.retained') -> 113; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9463345d4..346098add 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -403,8 +403,10 @@ dequeue(Cnt, Msgs, Q) -> {empty, _Q} -> dequeue(0, Msgs, Q); {{value, Msg}, Q1} -> case emqx_message:is_expired(Msg) of - true -> ok = inc_expired_cnt(delivery), - dequeue(Cnt, Msgs, Q1); + true -> + ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]), + ok = inc_delivery_expired_cnt(), + dequeue(Cnt, Msgs, Q1); false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) end end. @@ -565,7 +567,8 @@ retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> case emqx_message:is_expired(Msg) of true -> - ok = inc_expired_cnt(delivery), + ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]), + ok = inc_delivery_expired_cnt(), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), @@ -593,7 +596,7 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1), - (ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt), + (ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt), NSession = Session#session{awaiting_rel = AwaitingRel1}, case maps:size(AwaitingRel1) of 0 -> {ok, NSession}; @@ -644,18 +647,16 @@ run_hook(Name, Args) -> %%-------------------------------------------------------------------- %% Inc message/delivery expired counter %%-------------------------------------------------------------------- +inc_delivery_expired_cnt() -> + inc_delivery_expired_cnt(1). --compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}). - -inc_expired_cnt(K) -> inc_expired_cnt(K, 1). - -inc_expired_cnt(delivery, N) -> +inc_delivery_expired_cnt(N) -> ok = emqx_metrics:inc('delivery.dropped', N), - emqx_metrics:inc('delivery.dropped.expired', N); + emqx_metrics:inc('delivery.dropped.expired', N). -inc_expired_cnt(message, N) -> +inc_await_pubrel_timeout(N) -> ok = emqx_metrics:inc('messages.dropped', N), - emqx_metrics:inc('messages.dropped.expired', N). + emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N). %%-------------------------------------------------------------------- %% Next Packet Id From 6de114f82234c634db78d80d52143784537d95eb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 12 Jan 2022 18:49:01 +0800 Subject: [PATCH 02/14] chore: rename CHANGES.md to CHANGES-4.3.md --- CHANGES.md => CHANGES-4.3.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename CHANGES.md => CHANGES-4.3.md (100%) diff --git a/CHANGES.md b/CHANGES-4.3.md similarity index 100% rename from CHANGES.md rename to CHANGES-4.3.md From d7b0e753da286361ebad6e07a48aea8fb6007176 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 12 Jan 2022 11:57:02 +0100 Subject: [PATCH 03/14] ci: skip dialyzer race_condition checks backported from 5.0 race_condition check is very RAM demanding --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 25bd95089..fb60981b2 100644 --- a/rebar.config +++ b/rebar.config @@ -17,7 +17,7 @@ deprecated_function_calls,warnings_as_errors,deprecated_functions]}. {dialyzer, [ - {warnings, [unmatched_returns, error_handling, race_conditions]}, + {warnings, [unmatched_returns, error_handling]}, {plt_location, "."}, {plt_prefix, "emqx_dialyzer"}, {plt_apps, all_apps}, From 217acc0154f6fe282dd4ee0d1fee37329ae3667d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 12 Jan 2022 20:10:09 +0800 Subject: [PATCH 04/14] feat(rule): add new event 'delivery_dropped' --- .../emqx_rule_engine/src/emqx_rule_events.erl | 67 ++++++++++- src/emqx_session.erl | 113 +++++++++--------- 2 files changed, 123 insertions(+), 57 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 97e40439d..5d85d5990 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -37,6 +37,7 @@ , on_message_dropped/4 , on_message_delivered/3 , on_message_acked/3 + , on_delivery_dropped/4 ]). -export([ event_info/0 @@ -53,6 +54,7 @@ , 'message.delivered' , 'message.acked' , 'message.dropped' + , 'delivery.dropped' ]). -ifdef(TEST). @@ -136,6 +138,14 @@ on_message_acked(ClientInfo, Message, Env) -> fun() -> eventmsg_acked(ClientInfo, Message) end, Env), {ok, Message}. +on_delivery_dropped(_ClientInfo, Message = #message{flags = #{sys := true}}, + _Reason, #{ignore_sys_message := true}) -> + {ok, Message}; +on_delivery_dropped(ClientInfo, Message, Reason, Env) -> + may_publish_and_apply('delivery.dropped', + fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Env), + {ok, Message}. + %%-------------------------------------------------------------------- %% Event Messages %%-------------------------------------------------------------------- @@ -242,6 +252,32 @@ eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags = publish_received_at => Timestamp }). +eventmsg_delivery_dropped(_ClientInfo = #{ + peerhost := PeerHost, + clientid := ReceiverCId, + username := ReceiverUsername + }, + Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, + headers = Headers, payload = Payload, timestamp = Timestamp}, + Reason) -> + with_basic_columns('delivery.dropped', + #{id => emqx_guid:to_hexstr(Id), + reason => Reason, + from_clientid => ClientId, + from_username => emqx_message:get_header(username, Message, undefined), + clientid => ReceiverCId, + username => ReceiverUsername, + payload => Payload, + peerhost => ntoa(PeerHost), + topic => Topic, + qos => QoS, + flags => Flags, + %% the column 'headers' will be removed in the next major release + headers => printable_maps(Headers), + pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), + publish_received_at => Timestamp + }). + eventmsg_delivered(_ClientInfo = #{ peerhost := PeerHost, clientid := ReceiverCId, @@ -333,6 +369,7 @@ event_info() -> , event_info_message_deliver() , event_info_message_acked() , event_info_message_dropped() + , event_info_delivery_dropped() , event_info_client_connected() , event_info_client_disconnected() , event_info_session_subscribed() @@ -363,10 +400,19 @@ event_info_message_acked() -> event_info_message_dropped() -> event_info_common( 'message.dropped', - {<<"message dropped">>, <<"消息丢弃"/utf8>>}, - {<<"message dropped">>, <<"消息丢弃"/utf8>>}, + {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>}, + {<<"messages are discarded during forwarding, usually because there are no subscribers">>, + <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> ). +event_info_delivery_dropped() -> + event_info_common( + 'delivery.dropped', + {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>}, + {<<"messages are discarded during delivery, i.e. because the message queue is full">>, + <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>}, + <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">> + ). event_info_client_connected() -> event_info_common( 'client.connected', @@ -506,6 +552,23 @@ columns_with_exam('message.dropped') -> , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; +columns_with_exam('delivery.dropped') -> + [ {<<"event">>, 'delivery.dropped'} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"reason">>, queue_full} + , {<<"from_clientid">>, <<"c_emqx_1">>} + , {<<"from_username">>, <<"u_emqx_1">>} + , {<<"clientid">>, <<"c_emqx_2">>} + , {<<"username">>, <<"u_emqx_2">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; columns_with_exam('client.connected') -> [ {<<"event">>, 'client.connected'} , {<<"clientid">>, <<"c_emqx">>} diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 346098add..154516711 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -67,22 +67,22 @@ ]). -export([ publish/3 - , puback/2 + , puback/3 , pubrec/2 , pubrel/2 - , pubcomp/2 + , pubcomp/3 ]). --export([ deliver/2 - , enqueue/2 - , dequeue/1 - , retry/1 +-export([ deliver/3 + , enqueue/3 + , dequeue/2 + , retry/2 , terminate/3 ]). -export([ takeover/1 , resume/2 - , replay/1 + , replay/2 ]). -export([expire/2]). @@ -312,15 +312,15 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- --spec(puback(emqx_types:packet_id(), session()) +-spec(puback(emqx_types:clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}). -puback(PacketId, Session = #session{inflight = Inflight}) -> +puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), - return_with(Msg, dequeue(Session#session{inflight = Inflight1})); + return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -369,14 +369,14 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> %% Client -> Broker: PUBCOMP %%-------------------------------------------------------------------- --spec(pubcomp(emqx_types:packet_id(), session()) +-spec(pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session()) -> {ok, session()} | {ok, replies(), session()} | {error, emqx_types:reason_code()}). -pubcomp(PacketId, Session = #session{inflight = Inflight}) -> +pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {pubrel, _Ts}} -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(Session#session{inflight = Inflight1}); + dequeue(ClientInfo, Session#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -387,27 +387,27 @@ pubcomp(PacketId, Session = #session{inflight = Inflight}) -> %% Dequeue Msgs %%-------------------------------------------------------------------- -dequeue(Session = #session{inflight = Inflight, mqueue = Q}) -> +dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> case emqx_mqueue:is_empty(Q) of true -> {ok, Session}; false -> - {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q), + {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q), deliver(Msgs, [], Session#session{mqueue = Q1}) end. -dequeue(0, Msgs, Q) -> +dequeue(_ClientInfo, 0, Msgs, Q) -> {lists:reverse(Msgs), Q}; -dequeue(Cnt, Msgs, Q) -> +dequeue(ClientInfo, Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of - {empty, _Q} -> dequeue(0, Msgs, Q); + {empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q); {{value, Msg}, Q1} -> case emqx_message:is_expired(Msg) of true -> - ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), ok = inc_delivery_expired_cnt(), - dequeue(Cnt, Msgs, Q1); - false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) + dequeue(ClientInfo, Cnt, Msgs, Q1); + false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) end end. @@ -419,38 +419,38 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1. %% Broker -> Client: Deliver %%-------------------------------------------------------------------- --spec(deliver(list(emqx_types:deliver()), session()) +-spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session()) -> {ok, session()} | {ok, replies(), session()}). -deliver([Deliver], Session) -> %% Optimize +deliver(ClientInfo, [Deliver], Session) -> %% Optimize Enrich = enrich_fun(Session), - deliver_msg(Enrich(Deliver), Session); + deliver_msg(ClientInfo, Enrich(Deliver), Session); -deliver(Delivers, Session) -> +deliver(ClientInfo, Delivers, Session) -> Msgs = lists:map(enrich_fun(Session), Delivers), - deliver(Msgs, [], Session). + deliver(ClientInfo, Msgs, [], Session). -deliver([], Publishes, Session) -> +deliver(_ClientInfo, [], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; -deliver([Msg | More], Acc, Session) -> - case deliver_msg(Msg, Session) of +deliver(ClientInfo, [Msg | More], Acc, Session) -> + case deliver_msg(ClientInfo, Msg, Session) of {ok, Session1} -> deliver(More, Acc, Session1); {ok, [Publish], Session1} -> deliver(More, [Publish|Acc], Session1) end. -deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> +deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> {ok, [{undefined, maybe_ack(Msg)}], Session}; -deliver_msg(Msg = #message{qos = QoS}, Session = +deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = #session{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> Session1 = case maybe_nack(Msg) of true -> Session; - false -> enqueue(Msg, Session) + false -> enqueue(ClientInfo, Msg, Session) end, {ok, Session1}; false -> @@ -459,27 +459,29 @@ deliver_msg(Msg = #message{qos = QoS}, Session = {ok, [Publish], next_pkt_id(Session1)} end. --spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), +-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(), session()) -> session()). -enqueue([Deliver], Session) -> %% Optimize - Enrich = enrich_fun(Session), - enqueue(Enrich(Deliver), Session); - -enqueue(Delivers, Session) when is_list(Delivers) -> +enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) -> Msgs = lists:map(enrich_fun(Session), Delivers), - lists:foldl(fun enqueue/2, Session, Msgs); + lists:foldl(fun(Msg, Session0) -> + enqueue(ClientInfo, Msg, Session0) + end, Session, Msgs); -enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> +enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - (Dropped =/= undefined) andalso log_dropped(Dropped, Session), + (Dropped =/= undefined) andalso log_dropped(ClientInfo, Dropped, Session), Session#session{mqueue = NewQ}. -log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> +log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) -> case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of true -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]), + ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]); false -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]), + ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.queue_full'), ?LOG(warning, "Dropped msg due to mqueue is full: ~s", [emqx_message:format(Msg)]) @@ -543,31 +545,32 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> %% Retry Delivery %%-------------------------------------------------------------------- --spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). -retry(Session = #session{inflight = Inflight}) -> +-spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). +retry(ClientInfo, Session = #session{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), - [], erlang:system_time(millisecond), Session) + [], erlang:system_time(millisecond), Session, ClientInfo) end. -retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) -> +retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) -> {ok, lists:reverse(Acc), Interval, Session}; retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = - #session{retry_interval = Interval, inflight = Inflight}) -> + #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) -> case (Age = age(Now, Ts)) >= Interval of true -> - {Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight), - retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}); + {Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo), + retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo); false -> {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. -retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> +do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo) + when is_record(Msg, message) -> case emqx_message:is_expired(Msg) of true -> - ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), ok = inc_delivery_expired_cnt(), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> @@ -576,7 +579,7 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) - {[{PacketId, Msg1}|Acc], Inflight1} end; -retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> +do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) -> Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), {[{pubrel, PacketId}|Acc], Inflight1}. @@ -619,14 +622,14 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = ok = emqx_metrics:inc('session.resumed'), emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]). --spec(replay(session()) -> {ok, replies(), session()}). -replay(Session = #session{inflight = Inflight}) -> +-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}). +replay(ClientInfo, Session = #session{inflight = Inflight}) -> Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) -> {pubrel, PacketId}; ({PacketId, {Msg, _Ts}}) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)), - case dequeue(Session) of + case dequeue(ClientInfo, Session) of {ok, NSession} -> {ok, Pubs, NSession}; {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession} From cfc3c076bb46d8e22a79302c62ebe4ce49499694 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 12 Jan 2022 21:50:50 +0100 Subject: [PATCH 05/14] build: fix package version regexp match pattern --- build | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build b/build index 46cd60bc9..684057bf1 100755 --- a/build +++ b/build @@ -65,7 +65,7 @@ make_relup() { if [ -d "$releases_dir" ]; then while read -r zip; do local base_vsn - base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-e]{8})?")" + base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-f]{8})?")" if [ ! -d "$releases_dir/$base_vsn" ]; then local tmp_dir tmp_dir="$(mktemp -d -t emqx.XXXXXXX)" From 9f7f5070b24a61248dd45e17b4e8edde7b26e492 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 13 Jan 2022 11:30:11 +0800 Subject: [PATCH 06/14] fix(channel): update the calls to emqx_session APIs --- .../emqx_rule_engine/src/emqx_rule_events.erl | 8 ++- .../src/emqx_rule_runtime.erl | 2 +- apps/emqx_sn/src/emqx_sn_gateway.erl | 6 +- src/emqx_channel.erl | 56 +++++++++++-------- 4 files changed, 44 insertions(+), 28 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 5d85d5990..fb7649e97 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -452,7 +452,8 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> }. test_columns('message.dropped') -> - test_columns('message.publish'); + [ {<<"reason">>, <<"no_subscribers">>} + ] ++ test_columns('message.publish'); test_columns('message.publish') -> [ {<<"clientid">>, <<"c_emqx">>} , {<<"username">>, <<"u_emqx">>} @@ -471,6 +472,9 @@ test_columns('message.delivered') -> , {<<"qos">>, 1} , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} ]; +test_columns('delivery.dropped') -> + [ {<<"reason">>, <<"queue_full">>} + ] ++ test_columns('message.delivered'); test_columns('client.connected') -> [ {<<"clientid">>, <<"c_emqx">>} , {<<"username">>, <<"u_emqx">>} @@ -657,6 +661,7 @@ event_name(<<"$events/session_unsubscribed", _/binary>>) -> event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; +event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; @@ -666,6 +671,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; +event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('message.publish') -> <<"$events/message_publish">>. printable_maps(undefined) -> #{}; diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 6c2482075..35121c046 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -101,7 +101,7 @@ do_apply_rule(#rule{id = RuleId, {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> Collection2 = filter_collection(Input, InCase, DoEach, Collection), - case Collection2 of + case Collection2 of [] -> emqx_rule_metrics:inc_rules_no_result(RuleId); _ -> emqx_rule_metrics:inc_rules_passed(RuleId) end, diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 1bccf0c1a..442aa1db8 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -408,7 +408,8 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)}, inc_ping_counter(), case ClientIdPing of ClientId -> - case emqx_session:replay(emqx_channel:get_session(Channel)) of + case emqx_session:replay(emqx_channel:info(clientinfo, Channel), + emqx_channel:get_session(Channel)) of {ok, [], Session0} -> State0 = send_message(?SN_PINGRESP_MSG(), State), {keep_state, State0#state{ @@ -521,7 +522,8 @@ handle_event(info, {deliver, _Topic, Msg}, asleep, % section 6.14, Support of sleeping clients ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p", [Msg, Pendings]), - Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)), + Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), + Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c8aefb4aa..20257911b 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -342,7 +342,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - case emqx_session:puback(PacketId, Session) of + case emqx_session:puback(ClientInfo, PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), {ok, Channel#channel{session = NSession}}; @@ -387,8 +387,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se handle_out(pubcomp, {PacketId, RC}, Channel) end; -handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> - case emqx_session:pubcomp(PacketId, Session) of +handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{ + clientinfo = ClientInfo, session = Session}) -> + case emqx_session:pubcomp(ClientInfo, PacketId, Session) of {ok, NSession} -> {ok, Channel#channel{session = NSession}}; {ok, Publishes, NSession} -> @@ -720,27 +721,33 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. -spec(handle_deliver(list(emqx_types:deliver()), channel()) -> {ok, channel()} | {ok, replies(), channel()}). -handle_deliver(Delivers, Channel = #channel{takeover = true, - pendings = Pendings, - session = Session, - clientinfo = #{clientid := ClientId}}) -> +handle_deliver(Delivers, Channel = #channel{ + takeover = true, + pendings = Pendings, + session = Session, + clientinfo = #{clientid := ClientId} = ClientInfo}) -> %% NOTE: Order is important here. While the takeover is in %% progress, the session cannot enqueue messages, since it already %% passed on the queue to the new connection in the session state. - NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)), + NPendings = lists:append(Pendings, + ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)), {ok, Channel#channel{pendings = NPendings}}; -handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, - takeover = false, - session = Session, - clientinfo = #{clientid := ClientId}}) -> - NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session), +handle_deliver(Delivers, Channel = #channel{ + conn_state = disconnected, + takeover = false, + session = Session, + clientinfo = #{clientid := ClientId} = ClientInfo}) -> + NSession = emqx_session:enqueue(ClientInfo, + ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session), {ok, Channel#channel{session = NSession}}; -handle_deliver(Delivers, Channel = #channel{session = Session, - takeover = false, - clientinfo = #{clientid := ClientId}}) -> - case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of +handle_deliver(Delivers, Channel = #channel{ + session = Session, + takeover = false, + clientinfo = #{clientid := ClientId} = ClientInfo}) -> + case emqx_session:deliver(ClientInfo, + ignore_local(ClientInfo, Delivers, ClientId, Session), Session) of {ok, Publishes, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); @@ -748,12 +755,12 @@ handle_deliver(Delivers, Channel = #channel{session = Session, {ok, Channel#channel{session = NSession}} end. -ignore_local(Delivers, Subscriber, Session) -> +ignore_local(ClientInfo, Delivers, Subscriber, Session) -> Subs = emqx_session:info(subscriptions, Session), lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) -> case maps:find(Topic, Subs) of {ok, #{nl := 1}} when Subscriber =:= Publisher -> - ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, no_local]), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.no_local'), true; @@ -1026,8 +1033,8 @@ handle_timeout(_TRef, retry_delivery, Channel = #channel{conn_state = disconnected}) -> {ok, Channel}; handle_timeout(_TRef, retry_delivery, - Channel = #channel{session = Session}) -> - case emqx_session:retry(Session) of + Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + case emqx_session:retry(ClientInfo, Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; {ok, Publishes, Timeout, NSession} -> @@ -1589,9 +1596,10 @@ maybe_resume_session(#channel{resuming = false}) -> ignore; maybe_resume_session(#channel{session = Session, resuming = true, - pendings = Pendings}) -> - {ok, Publishes, Session1} = emqx_session:replay(Session), - case emqx_session:deliver(Pendings, Session1) of + pendings = Pendings, + clientinfo = ClientInfo}) -> + {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + case emqx_session:deliver(ClientInfo, Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; {ok, More, Session2} -> From a6408cee4fa14655c7476ad33e59857d0c6d7a5f Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 13 Jan 2022 15:28:18 +0800 Subject: [PATCH 07/14] fix(session): update testcases for emqx_session --- src/emqx_session.erl | 26 +++++++++---------- test/emqx_session_SUITE.erl | 52 +++++++++++++++++++------------------ 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 154516711..82b4c8168 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -392,7 +392,7 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> true -> {ok, Session}; false -> {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q), - deliver(Msgs, [], Session#session{mqueue = Q1}) + do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1}) end. dequeue(_ClientInfo, 0, Msgs, Q) -> @@ -422,22 +422,22 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1. -spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session()) -> {ok, session()} | {ok, replies(), session()}). deliver(ClientInfo, [Deliver], Session) -> %% Optimize - Enrich = enrich_fun(Session), - deliver_msg(ClientInfo, Enrich(Deliver), Session); + Msg = enrich_delivers(Deliver, Session), + deliver_msg(ClientInfo, Msg, Session); deliver(ClientInfo, Delivers, Session) -> - Msgs = lists:map(enrich_fun(Session), Delivers), - deliver(ClientInfo, Msgs, [], Session). + Msgs = [enrich_delivers(D, Session) || D <- Delivers], + do_deliver(ClientInfo, Msgs, [], Session). -deliver(_ClientInfo, [], Publishes, Session) -> +do_deliver(_ClientInfo, [], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; -deliver(ClientInfo, [Msg | More], Acc, Session) -> +do_deliver(ClientInfo, [Msg | More], Acc, Session) -> case deliver_msg(ClientInfo, Msg, Session) of {ok, Session1} -> - deliver(More, Acc, Session1); + do_deliver(ClientInfo, More, Acc, Session1); {ok, [Publish], Session1} -> - deliver(More, [Publish|Acc], Session1) + do_deliver(ClientInfo, More, [Publish|Acc], Session1) end. deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> @@ -462,7 +462,7 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = -spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(), session()) -> session()). enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) -> - Msgs = lists:map(enrich_fun(Session), Delivers), + Msgs = [enrich_delivers(D, Session) || D <- Delivers], lists:foldl(fun(Msg, Session0) -> enqueue(ClientInfo, Msg, Session0) end, Session, Msgs); @@ -487,10 +487,8 @@ log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) -> [emqx_message:format(Msg)]) end. -enrich_fun(Session = #session{subscriptions = Subs}) -> - fun({deliver, Topic, Msg}) -> - enrich_subopts(get_subopts(Topic, Subs), Msg, Session) - end. +enrich_delivers({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) -> + enrich_subopts(get_subopts(Topic, Subs), Msg, Session). maybe_ack(Msg) -> case emqx_shared_sub:is_ack_required(Msg) of diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index cb7c10cae..5dabeca26 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -165,7 +165,7 @@ t_puback(_) -> Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>), Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight, mqueue => mqueue()}), - {ok, Msg, Session1} = emqx_session:puback(1, Session), + {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_puback_with_dequeue(_) -> @@ -174,7 +174,7 @@ t_puback_with_dequeue(_) -> Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>), {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})), Session = session(#{inflight => Inflight, mqueue => Q}), - {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session), + {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(0, emqx_session:info(mqueue_len, Session1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). @@ -182,10 +182,10 @@ t_puback_with_dequeue(_) -> t_puback_error_packet_id_in_use(_) -> Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = - emqx_session:puback(1, session(#{inflight => Inflight})). + emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})). t_puback_error_packet_id_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(clientinfo(), 1, session()). t_pubrec(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), @@ -213,17 +213,17 @@ t_pubrel_error_packetid_not_found(_) -> t_pubcomp(_) -> Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {ok, Session1} = emqx_session:pubcomp(1, Session), + {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_pubcomp_error_packetid_in_use(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session). + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(clientinfo(), 1, Session). t_pubcomp_error_packetid_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(clientinfo(), 1, session()). %%-------------------------------------------------------------------- %% Test cases for deliver/retry @@ -231,14 +231,16 @@ t_pubcomp_error_packetid_not_found(_) -> t_dequeue(_) -> Q = mqueue(#{store_qos0 => true}), - {ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})), + {ok, Session} = emqx_session:dequeue(clientinfo(), session(#{mqueue => Q})), Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>), emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>) ], - Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs), + Session1 = lists:foldl(fun(Msg, Session0) -> + emqx_session:enqueue(clientinfo(), Msg, Session0) + end, Session, Msgs), {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} = - emqx_session:dequeue(Session1), + emqx_session:dequeue(clientinfo(), Session1), ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)), @@ -253,7 +255,7 @@ t_deliver_qos0(_) -> clientinfo(), <<"t1">>, subopts(), Session), Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]], {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} = - emqx_session:deliver(Deliveries, Session1), + emqx_session:deliver(clientinfo(), Deliveries, Session1), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). @@ -262,38 +264,38 @@ t_deliver_qos1(_) -> {ok, Session} = emqx_session:subscribe( clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], - {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session), + {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), - {ok, Msg1, Session2} = emqx_session:puback(1, Session1), + {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), - {ok, Msg2, Session3} = emqx_session:puback(2, Session2), + {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2), ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). t_deliver_qos2(_) -> ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)], {ok, [{1, Msg1}, {2, Msg2}], Session} = - emqx_session:deliver(Delivers, session()), + emqx_session:deliver(clientinfo(), Delivers, session()), ?assertEqual(2, emqx_session:info(inflight_cnt, Session)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). t_deliver_one_msg(_) -> {ok, [{1, Msg}], Session} = - emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()), + emqx_session:deliver(clientinfo(), [delivery(?QOS_1, <<"t1">>)], session()), ?assertEqual(1, emqx_session:info(inflight_cnt, Session)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg)). t_deliver_when_inflight_is_full(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session = session(#{inflight => emqx_inflight:new(1)}), - {ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session), + {ok, Publishes, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ?assertEqual(1, length(Publishes)), ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(1, emqx_session:info(mqueue_len, Session1)), - {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1), + {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(clientinfo(), 1, Session1), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), @@ -301,18 +303,18 @@ t_deliver_when_inflight_is_full(_) -> t_enqueue(_) -> %% store_qos0 = true - Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()), - Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>), + Session = emqx_session:enqueue(clientinfo(), [delivery(?QOS_0, <<"t0">>)], session()), + Session1 = emqx_session:enqueue(clientinfo(), [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session), ?assertEqual(3, emqx_session:info(mqueue_len, Session1)). t_retry(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session = session(#{retry_interval => 100}), - {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session), + {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ok = timer:sleep(200), Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], - {ok, Msgs1, 100, Session2} = emqx_session:retry(Session1), + {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). %%-------------------------------------------------------------------- @@ -331,11 +333,11 @@ t_resume(_) -> t_replay(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], - {ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()), + {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, session()), Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), - Session2 = emqx_session:enqueue(Msg, Session1), + Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1), Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs], - {ok, ReplayPubs, Session3} = emqx_session:replay(Session2), + {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2), ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)). From a63799742f6887dd2b4ab16ff1777c59c630530c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 13 Jan 2022 16:10:03 +0800 Subject: [PATCH 08/14] fix(channel): update testcases for emqx_channel --- test/emqx_channel_SUITE.erl | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 97b77ca88..a438fc79a 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -80,15 +80,16 @@ t_chan_info(_) -> ?assertEqual(clientinfo(), ClientInfo). t_chan_caps(_) -> - #{max_clientid_len := 65535, - max_qos_allowed := 2, - max_topic_alias := 65535, - max_topic_levels := 128, - retain_available := true, - shared_subscription := true, - subscription_identifiers := true, - wildcard_subscription := true - } = emqx_channel:caps(channel()). + ?assertMatch(#{ + max_clientid_len := 65535, + max_qos_allowed := 2, + max_topic_alias := 65535, + max_topic_levels := Level, + retain_available := true, + shared_subscription := true, + subscription_identifiers := true, + wildcard_subscription := true + } when is_integer(Level), emqx_channel:caps(channel())). %%-------------------------------------------------------------------- %% Test cases for channel handle_in @@ -216,14 +217,14 @@ t_handle_in_qos2_publish_with_error_return(_) -> t_handle_in_puback_ok(_) -> Msg = emqx_message:make(<<"t">>, <<"payload">>), ok = meck:expect(emqx_session, puback, - fun(_PacketId, Session) -> {ok, Msg, Session} end), + fun(_, _PacketId, Session) -> {ok, Msg, Session} end), Channel = channel(#{conn_state => connected}), {ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel). % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_in_puback_id_in_use(_) -> ok = meck:expect(emqx_session, puback, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end), {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()). @@ -231,7 +232,7 @@ t_handle_in_puback_id_in_use(_) -> t_handle_in_puback_id_not_found(_) -> ok = meck:expect(emqx_session, puback, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()). @@ -305,13 +306,13 @@ t_handle_in_pubrel_not_found_error(_) -> emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubcomp_ok(_) -> - ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, pubcomp, fun(_, _, Session) -> {ok, Session} end), {ok, _Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()). % ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)). t_handle_in_pubcomp_not_found_error(_) -> ok = meck:expect(emqx_session, pubcomp, - fun(_PacketId, _Session) -> + fun(_, _PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), Channel = channel(#{conn_state => connected}), @@ -633,7 +634,7 @@ t_handle_timeout_keepalive(_) -> t_handle_timeout_retry_delivery(_) -> TRef = make_ref(), - ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, retry, fun(_, Session) -> {ok, Session} end), Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel). From c6a571c20770a46dcfb7a51c1e2f09176e362bcc Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 13 Jan 2022 17:41:16 +0800 Subject: [PATCH 09/14] chore(alarm): support license alarm type --- src/emqx_alarm.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 62ce1af8b..53cd9561d 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -375,6 +375,8 @@ normalize_message(high_cpu_usage, #{usage := Usage}) -> list_to_binary(io_lib:format("~p% cpu usage", [Usage])); normalize_message(too_many_processes, #{usage := Usage}) -> list_to_binary(io_lib:format("~p% process usage", [Usage])); +normalize_message(license_usage, #{high_watermark := High}) -> + iolist_to_binary(["License: the number of connections exceeds ", High, "%"]); normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> From adbb067a91d9959f53398624bca764edd1fb49e8 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 12 Jan 2022 11:57:02 +0100 Subject: [PATCH 10/14] ci: skip dialyzer race_condition checks backported from 5.0 race_condition check is very RAM demanding --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 25bd95089..fb60981b2 100644 --- a/rebar.config +++ b/rebar.config @@ -17,7 +17,7 @@ deprecated_function_calls,warnings_as_errors,deprecated_functions]}. {dialyzer, [ - {warnings, [unmatched_returns, error_handling, race_conditions]}, + {warnings, [unmatched_returns, error_handling]}, {plt_location, "."}, {plt_prefix, "emqx_dialyzer"}, {plt_apps, all_apps}, From a46ea363f676bd10c9d97f080e593671aba0dddb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 12 Jan 2022 18:49:01 +0800 Subject: [PATCH 11/14] chore: rename CHANGES.md to CHANGES-4.3.md --- CHANGES.md => CHANGES-4.3.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename CHANGES.md => CHANGES-4.3.md (100%) diff --git a/CHANGES.md b/CHANGES-4.3.md similarity index 100% rename from CHANGES.md rename to CHANGES-4.3.md From dac0e824c87df435308afc488cedf5c8f5f235e9 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 12 Jan 2022 21:50:50 +0100 Subject: [PATCH 12/14] build: fix package version regexp match pattern --- build | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build b/build index 46cd60bc9..684057bf1 100755 --- a/build +++ b/build @@ -65,7 +65,7 @@ make_relup() { if [ -d "$releases_dir" ]; then while read -r zip; do local base_vsn - base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-e]{8})?")" + base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-f]{8})?")" if [ ! -d "$releases_dir/$base_vsn" ]; then local tmp_dir tmp_dir="$(mktemp -d -t emqx.XXXXXXX)" From 4abcab8d52dd4844b050243f18a01f45589984a3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 13 Jan 2022 18:37:12 +0800 Subject: [PATCH 13/14] fix(appup): update appup files for emqx,emqx_sn,rule_engine --- .../src/emqx_rule_engine.appup.src | 14 +++++ apps/emqx_sn/src/emqx_sn.app.src | 2 +- apps/emqx_sn/src/emqx_sn.appup.src | 40 +++++++++---- src/emqx.appup.src | 56 ++++++++++++++++++- 4 files changed, 97 insertions(+), 15 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index f329f4b22..be586e4ea 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -3,23 +3,27 @@ [ {"4.3.6", [ {update, emqx_rule_metrics, {advanced, ["4.3.6"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.5", [ {update, emqx_rule_metrics, {advanced, ["4.3.5"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} ]}, {"4.3.4", [ {update, emqx_rule_metrics, {advanced, ["4.3.4"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} ]}, {"4.3.3", [ {update, emqx_rule_metrics, {advanced, ["4.3.3"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} @@ -27,6 +31,7 @@ ]}, {"4.3.2", [ {update, emqx_rule_metrics, {advanced, ["4.3.2"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} @@ -35,6 +40,7 @@ ]}, {"4.3.1", [ {update, emqx_rule_metrics, {advanced, ["4.3.1"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} @@ -43,6 +49,7 @@ ]}, {"4.3.0", [ {update, emqx_rule_metrics, {advanced, ["4.3.0"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} @@ -55,23 +62,27 @@ [ {"4.3.6", [ {update, emqx_rule_metrics, {advanced, ["4.3.6"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.5", [ {update, emqx_rule_metrics, {advanced, ["4.3.5"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} ]}, {"4.3.4", [ {update, emqx_rule_metrics, {advanced, ["4.3.4"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} ]}, {"4.3.3", [ {update, emqx_rule_metrics, {advanced, ["4.3.3"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} @@ -79,6 +90,7 @@ ]}, {"4.3.2", [ {update, emqx_rule_metrics, {advanced, ["4.3.2"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} @@ -87,6 +99,7 @@ ]}, {"4.3.1", [ {update, emqx_rule_metrics, {advanced, ["4.3.1"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} @@ -95,6 +108,7 @@ ]}, {"4.3.0", [ {update, emqx_rule_metrics, {advanced, ["4.3.0"]}} + , {load_module,emqx_rule_events,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index a36166eae..ef0d24bf9 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.4"}, % strict semver, bump manually! + {vsn, "4.3.5"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 22d2cd606..749a72956 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,14 +1,30 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.3",[{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]}, - {"4.3.2", - [{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[0-1]">>, - [{restart_application,emqx_sn}]}], - [{"4.3.3",[{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]}, - {"4.3.2", - [{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[0-1]">>, - [{restart_application,emqx_sn}]}]}. + [ + {"4.3.4",[ + {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + ]}, + {"4.3.3",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + ]}, + {"4.3.2", [ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + ]}, + {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} + ], + [ + {"4.3.4",[ + {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + ]}, + {"4.3.3",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + ]}, + {"4.3.2", [ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + ]}, + {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} + ]}. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index c4885b7b1..e06c12e2a 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,16 +1,23 @@ %% -*- mode: erlang -*- {VSN, [{"4.3.12", - [{load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, + [ {load_module,emqx_channel,brutal_purge,soft_purge,[]} + , {load_module,emqx_metrics,brutal_purge,soft_purge,[]} + , {load_module,emqx_session,brutal_purge,soft_purge,[]} + ]}, {"4.3.11", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, @@ -23,6 +30,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -36,6 +45,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -49,6 +60,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -64,6 +77,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -80,6 +95,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -97,6 +114,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -115,6 +134,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -138,6 +159,8 @@ {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, @@ -161,6 +184,8 @@ {load_module,emqx_congestion,brutal_purge,soft_purge,[]}, {load_module,emqx_node_dump,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, @@ -188,6 +213,7 @@ {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_node_dump,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -199,16 +225,23 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.12", - [{load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, + [ {load_module,emqx_channel,brutal_purge,soft_purge,[]} + , {load_module,emqx_metrics,brutal_purge,soft_purge,[]} + , {load_module,emqx_session,brutal_purge,soft_purge,[]} + ]}, {"4.3.11", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, @@ -221,6 +254,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -234,6 +269,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -247,6 +284,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -262,6 +301,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -278,6 +319,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, @@ -295,6 +338,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, @@ -313,6 +358,8 @@ {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -338,6 +385,8 @@ {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, @@ -361,6 +410,8 @@ {load_module,emqx_congestion,brutal_purge,soft_purge,[]}, {load_module,emqx_node_dump,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, @@ -384,6 +435,7 @@ {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_node_dump,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, From 7c0d70cfde9ac20191e17213b3d779c3b2a38fc8 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 14 Jan 2022 00:31:56 +0800 Subject: [PATCH 14/14] feat(license): license expriy early alarm. --- src/emqx_alarm.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 53cd9561d..de1fbadb8 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -375,8 +375,10 @@ normalize_message(high_cpu_usage, #{usage := Usage}) -> list_to_binary(io_lib:format("~p% cpu usage", [Usage])); normalize_message(too_many_processes, #{usage := Usage}) -> list_to_binary(io_lib:format("~p% process usage", [Usage])); -normalize_message(license_usage, #{high_watermark := High}) -> +normalize_message(license_quota, #{high_watermark := High}) -> iolist_to_binary(["License: the number of connections exceeds ", High, "%"]); +normalize_message(license_expiry, #{expiry_at := ExpiryAt}) -> + iolist_to_binary(["License will be expired at ", ExpiryAt]); normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->