From 12da3c0986961ea9f04341d7003e4c2f4c0352f9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 29 Jan 2024 15:33:23 +0800 Subject: [PATCH 1/4] feat: configurable server side message_expiry_interval --- apps/emqx/src/emqx_channel.erl | 3 ++- apps/emqx/src/emqx_message.erl | 22 ++++++++++++++-------- apps/emqx/src/emqx_schema.erl | 9 +++++++++ apps/emqx/src/emqx_session_mem.erl | 8 ++++---- apps/emqx/test/emqx_config_SUITE.erl | 1 + apps/emqx/test/emqx_message_SUITE.erl | 6 +++--- apps/emqx/test/emqx_session_mem_SUITE.erl | 1 + rel/config/examples/mqtt.conf.example | 15 +++++++++++++++ rel/i18n/emqx_schema.hocon | 6 ++++++ 9 files changed, 55 insertions(+), 16 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index f4661a85e..86c3b840c 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -930,7 +930,8 @@ handle_deliver( Delivers1 = maybe_nack(Delivers), Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session), NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), - {ok, Channel#channel{session = NSession}}; + %% we need to update stats here, as the stats_timer is canceled after disconnected + {ok, {event, updated}, Channel#channel{session = NSession}}; handle_deliver(Delivers, Channel) -> Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)), do_handle_deliver(Delivers1, Channel). diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index 6b684c199..0628908d1 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -65,7 +65,7 @@ ]). -export([ - is_expired/1, + is_expired/2, update_expiry/1, timestamp_now/0 ]). @@ -273,14 +273,20 @@ remove_header(Hdr, Msg = #message{headers = Headers}) -> false -> Msg end. --spec is_expired(emqx_types:message()) -> boolean(). -is_expired(#message{ - headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, - timestamp = CreatedAt -}) -> +-spec is_expired(emqx_types:message(), atom()) -> boolean(). +is_expired( + #message{ + headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, + timestamp = CreatedAt + }, + _ +) -> elapsed(CreatedAt) > timer:seconds(Interval); -is_expired(_Msg) -> - false. +is_expired(#message{timestamp = CreatedAt}, Zone) -> + case emqx_config:get_zone_conf(Zone, [mqtt, message_expiry_interval], infinity) of + infinity -> false; + Interval -> elapsed(CreatedAt) > Interval + end. -spec update_expiry(emqx_types:message()) -> emqx_types:message(). update_expiry( diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index e26475855..d9171f711 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -3745,6 +3745,15 @@ mqtt_session() -> importance => ?IMPORTANCE_LOW } )}, + {"message_expiry_interval", + sc( + hoconsc:union([duration(), infinity]), + #{ + default => infinity, + desc => ?DESC(mqtt_message_expiry_interval), + importance => ?IMPORTANCE_LOW + } + )}, {"max_awaiting_rel", sc( hoconsc:union([non_neg_integer(), infinity]), diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index c8affdaea..e8c7a7d18 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -468,12 +468,12 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> dequeue(_ClientInfo, 0, Msgs, Q) -> {lists:reverse(Msgs), Q}; -dequeue(ClientInfo, Cnt, Msgs, Q) -> +dequeue(ClientInfo = #{zone := Zone}, Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q); {{value, Msg}, Q1} -> - case emqx_message:is_expired(Msg) of + case emqx_message:is_expired(Msg, Zone) of true -> _ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}), dequeue(ClientInfo, Cnt, Msgs, Q1); @@ -619,14 +619,14 @@ retry_delivery( end. do_retry_delivery( - ClientInfo, + ClientInfo = #{zone := Zone}, PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, Now, Acc, Inflight ) -> - case emqx_message:is_expired(Msg) of + case emqx_message:is_expired(Msg, Zone) of true -> _ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}), {Acc, emqx_inflight:delete(PacketId, Inflight)}; diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 6a49507a6..4ddebd278 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -446,6 +446,7 @@ zone_global_defaults() -> response_information => [], retain_available => true, retry_interval => 30000, + message_expiry_interval => infinity, server_keepalive => disabled, session_expiry_interval => 7200000, shared_subscription => true, diff --git a/apps/emqx/test/emqx_message_SUITE.erl b/apps/emqx/test/emqx_message_SUITE.erl index 2e4164652..c97c0d899 100644 --- a/apps/emqx/test/emqx_message_SUITE.erl +++ b/apps/emqx/test/emqx_message_SUITE.erl @@ -143,12 +143,12 @@ t_undefined_headers(_) -> t_is_expired(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - ?assertNot(emqx_message:is_expired(Msg)), + ?assertNot(emqx_message:is_expired(Msg, ?MODULE)), Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg), timer:sleep(500), - ?assertNot(emqx_message:is_expired(Msg1)), + ?assertNot(emqx_message:is_expired(Msg1, ?MODULE)), timer:sleep(600), - ?assert(emqx_message:is_expired(Msg1)), + ?assert(emqx_message:is_expired(Msg1, ?MODULE)), timer:sleep(1000), Msg = emqx_message:update_expiry(Msg), Msg2 = emqx_message:update_expiry(Msg1), diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index 20d622941..a539dde9a 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -545,6 +545,7 @@ clientinfo() -> clientinfo(#{}). clientinfo(Init) -> maps:merge( #{ + zone => ?MODULE, clientid => <<"clientid">>, username => <<"username">> }, diff --git a/rel/config/examples/mqtt.conf.example b/rel/config/examples/mqtt.conf.example index 64c73524c..84fdf7783 100644 --- a/rel/config/examples/mqtt.conf.example +++ b/rel/config/examples/mqtt.conf.example @@ -82,6 +82,21 @@ mqtt { ## Specifies how long the session will expire after the connection is disconnected, only for non-MQTT 5.0 connections session_expiry_interval = 2h + ## The expiry interval of MQTT messages. + ## + ## For MQTT 5.0 clients, this configuration will only take effect when the + ## Message-Expiry-Interval property is not set in the message; otherwise, the + ## value of the Message-Expiry-Interval property will be used. + ## For MQTT versions older than 5.0, this configuration will always take effect. + ## Please note that setting message_expiry_interval greater than session_expiry_interval + ## is meaningless, as all messages will be cleared when the session expires. + ## + ## Type: + ## - infinity :: Never expire + ## - Time Duration :: The expiry interval + ## Default: infinity + message_expiry_interval = infinity + ## Maximum queue length. Enqueued messages when persistent client disconnected, or inflight window is full ## Type: infinity | Integer max_mqueue_len = 1000 diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 5ff4e063e..40da8d75e 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -191,6 +191,12 @@ mqtt_session_expiry_interval.desc: mqtt_session_expiry_interval.label: """Session Expiry Interval""" +mqtt_message_expiry_interval.desc: +"""The expiry interval of MQTT messages. For MQTT 5.0 clients, this configuration will only take effect when the Message-Expiry-Interval property is not set in the message; otherwise, the value of the Message-Expiry-Interval property will be used. For MQTT versions older than 5.0, this configuration will always take effect. Please note that setting message_expiry_interval greater than session_expiry_interval is meaningless, as all messages will be cleared when the session expires.""" + +mqtt_message_expiry_interval.label: +"""Message Expiry Interval""" + fields_listener_enabled.desc: """Enable listener.""" From 9f22c2c4555a502a34d7c7fe8871606d825a95f3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 29 Jan 2024 18:18:18 +0800 Subject: [PATCH 2/4] ci: add some sleep and retry to emqx_persistent_session_ds_SUITE --- apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 40ffe7f32..96236c0ae 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -595,7 +595,7 @@ t_session_gc(Config) -> } ) ), - ?assertMatch([ClientId1], list_all_sessions(Node1), sessions), + ?retry(50, 3, [ClientId1] = list_all_sessions(Node1)), ?assertMatch([_], list_all_subscriptions(Node1), subscriptions), ok end, From 206af96a33c7071c49e02d99fe6c8a238b27de95 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Jan 2024 15:45:50 +0800 Subject: [PATCH 3/4] ci: update testcases for message-expiry-interval --- apps/emqx/test/emqx_message_SUITE.erl | 46 +++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/apps/emqx/test/emqx_message_SUITE.erl b/apps/emqx/test/emqx_message_SUITE.erl index c97c0d899..7bf6c9a7e 100644 --- a/apps/emqx/test/emqx_message_SUITE.erl +++ b/apps/emqx/test/emqx_message_SUITE.erl @@ -141,18 +141,50 @@ t_undefined_headers(_) -> Msg2 = emqx_message:set_header(c, 3, Msg), ?assertEqual(3, emqx_message:get_header(c, Msg2)). -t_is_expired(_) -> +t_is_expired_1(_) -> + test_msg_expired_property(?MODULE). + +t_is_expired_2(_) -> + %% if the 'Message-Expiry-Interval' property is set, the message_expiry_interval should be ignored + try + emqx_config:put( + maps:from_list([{list_to_atom(Root), #{}} || Root <- emqx_zone_schema:roots()]) + ), + emqx_config:put_zone_conf(?MODULE, [mqtt, message_expiry_interval], timer:seconds(10)), + test_msg_expired_property(?MODULE) + after + emqx_config:erase_all() + end. + +t_is_expired_3(_) -> + try + emqx_config:put( + maps:from_list([{list_to_atom(Root), #{}} || Root <- emqx_zone_schema:roots()]) + ), + emqx_config:put_zone_conf(?MODULE, [mqtt, message_expiry_interval], 100), + Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + ?assertNot(emqx_message:is_expired(Msg, ?MODULE)), + timer:sleep(120), + ?assert(emqx_message:is_expired(Msg, ?MODULE)) + after + emqx_config:erase_all() + end. + +test_msg_expired_property(Zone) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - ?assertNot(emqx_message:is_expired(Msg, ?MODULE)), + ?assertNot(emqx_message:is_expired(Msg, Zone)), Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg), timer:sleep(500), - ?assertNot(emqx_message:is_expired(Msg1, ?MODULE)), + ?assertNot(emqx_message:is_expired(Msg1, Zone)), timer:sleep(600), - ?assert(emqx_message:is_expired(Msg1, ?MODULE)), + ?assert(emqx_message:is_expired(Msg1, Zone)). + +t_update_expired(_) -> + Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), timer:sleep(1000), - Msg = emqx_message:update_expiry(Msg), - Msg2 = emqx_message:update_expiry(Msg1), - Props = emqx_message:get_header(properties, Msg2), + ?assertEqual(Msg, emqx_message:update_expiry(Msg)), + Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg), + Props = emqx_message:get_header(properties, emqx_message:update_expiry(Msg1)), ?assertEqual(1, maps:get('Message-Expiry-Interval', Props)). % t_to_list(_) -> From 30508d833c22a3bafe97af7f1bb9d0ed3bafb4bd Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Jan 2024 15:46:55 +0800 Subject: [PATCH 4/4] chore: add change logs for message-expiry-interval --- changes/ee/feat-12417.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changes/ee/feat-12417.md diff --git a/changes/ee/feat-12417.md b/changes/ee/feat-12417.md new file mode 100644 index 000000000..6847c6925 --- /dev/null +++ b/changes/ee/feat-12417.md @@ -0,0 +1,3 @@ +Added support for specifying the expiration time of MQTT messages via configuration file. + +See the description of the `message_expiry_interval` configuration in the `mqtt.conf.example` file for more details.