diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index f4661a85e..86c3b840c 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -930,7 +930,8 @@ handle_deliver( Delivers1 = maybe_nack(Delivers), Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session), NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), - {ok, Channel#channel{session = NSession}}; + %% we need to update stats here, as the stats_timer is canceled after disconnected + {ok, {event, updated}, Channel#channel{session = NSession}}; handle_deliver(Delivers, Channel) -> Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)), do_handle_deliver(Delivers1, Channel). diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index 6b684c199..0628908d1 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -65,7 +65,7 @@ ]). -export([ - is_expired/1, + is_expired/2, update_expiry/1, timestamp_now/0 ]). @@ -273,14 +273,20 @@ remove_header(Hdr, Msg = #message{headers = Headers}) -> false -> Msg end. --spec is_expired(emqx_types:message()) -> boolean(). -is_expired(#message{ - headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, - timestamp = CreatedAt -}) -> +-spec is_expired(emqx_types:message(), atom()) -> boolean(). +is_expired( + #message{ + headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, + timestamp = CreatedAt + }, + _ +) -> elapsed(CreatedAt) > timer:seconds(Interval); -is_expired(_Msg) -> - false. +is_expired(#message{timestamp = CreatedAt}, Zone) -> + case emqx_config:get_zone_conf(Zone, [mqtt, message_expiry_interval], infinity) of + infinity -> false; + Interval -> elapsed(CreatedAt) > Interval + end. -spec update_expiry(emqx_types:message()) -> emqx_types:message(). update_expiry( diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index e26475855..d9171f711 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -3745,6 +3745,15 @@ mqtt_session() -> importance => ?IMPORTANCE_LOW } )}, + {"message_expiry_interval", + sc( + hoconsc:union([duration(), infinity]), + #{ + default => infinity, + desc => ?DESC(mqtt_message_expiry_interval), + importance => ?IMPORTANCE_LOW + } + )}, {"max_awaiting_rel", sc( hoconsc:union([non_neg_integer(), infinity]), diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index c8affdaea..e8c7a7d18 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -468,12 +468,12 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> dequeue(_ClientInfo, 0, Msgs, Q) -> {lists:reverse(Msgs), Q}; -dequeue(ClientInfo, Cnt, Msgs, Q) -> +dequeue(ClientInfo = #{zone := Zone}, Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q); {{value, Msg}, Q1} -> - case emqx_message:is_expired(Msg) of + case emqx_message:is_expired(Msg, Zone) of true -> _ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}), dequeue(ClientInfo, Cnt, Msgs, Q1); @@ -619,14 +619,14 @@ retry_delivery( end. do_retry_delivery( - ClientInfo, + ClientInfo = #{zone := Zone}, PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, Now, Acc, Inflight ) -> - case emqx_message:is_expired(Msg) of + case emqx_message:is_expired(Msg, Zone) of true -> _ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}), {Acc, emqx_inflight:delete(PacketId, Inflight)}; diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 6a49507a6..4ddebd278 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -446,6 +446,7 @@ zone_global_defaults() -> response_information => [], retain_available => true, retry_interval => 30000, + message_expiry_interval => infinity, server_keepalive => disabled, session_expiry_interval => 7200000, shared_subscription => true, diff --git a/apps/emqx/test/emqx_message_SUITE.erl b/apps/emqx/test/emqx_message_SUITE.erl index 2e4164652..c97c0d899 100644 --- a/apps/emqx/test/emqx_message_SUITE.erl +++ b/apps/emqx/test/emqx_message_SUITE.erl @@ -143,12 +143,12 @@ t_undefined_headers(_) -> t_is_expired(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - ?assertNot(emqx_message:is_expired(Msg)), + ?assertNot(emqx_message:is_expired(Msg, ?MODULE)), Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg), timer:sleep(500), - ?assertNot(emqx_message:is_expired(Msg1)), + ?assertNot(emqx_message:is_expired(Msg1, ?MODULE)), timer:sleep(600), - ?assert(emqx_message:is_expired(Msg1)), + ?assert(emqx_message:is_expired(Msg1, ?MODULE)), timer:sleep(1000), Msg = emqx_message:update_expiry(Msg), Msg2 = emqx_message:update_expiry(Msg1), diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index 20d622941..a539dde9a 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -545,6 +545,7 @@ clientinfo() -> clientinfo(#{}). clientinfo(Init) -> maps:merge( #{ + zone => ?MODULE, clientid => <<"clientid">>, username => <<"username">> }, diff --git a/rel/config/examples/mqtt.conf.example b/rel/config/examples/mqtt.conf.example index 64c73524c..84fdf7783 100644 --- a/rel/config/examples/mqtt.conf.example +++ b/rel/config/examples/mqtt.conf.example @@ -82,6 +82,21 @@ mqtt { ## Specifies how long the session will expire after the connection is disconnected, only for non-MQTT 5.0 connections session_expiry_interval = 2h + ## The expiry interval of MQTT messages. + ## + ## For MQTT 5.0 clients, this configuration will only take effect when the + ## Message-Expiry-Interval property is not set in the message; otherwise, the + ## value of the Message-Expiry-Interval property will be used. + ## For MQTT versions older than 5.0, this configuration will always take effect. + ## Please note that setting message_expiry_interval greater than session_expiry_interval + ## is meaningless, as all messages will be cleared when the session expires. + ## + ## Type: + ## - infinity :: Never expire + ## - Time Duration :: The expiry interval + ## Default: infinity + message_expiry_interval = infinity + ## Maximum queue length. Enqueued messages when persistent client disconnected, or inflight window is full ## Type: infinity | Integer max_mqueue_len = 1000 diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 5ff4e063e..40da8d75e 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -191,6 +191,12 @@ mqtt_session_expiry_interval.desc: mqtt_session_expiry_interval.label: """Session Expiry Interval""" +mqtt_message_expiry_interval.desc: +"""The expiry interval of MQTT messages. For MQTT 5.0 clients, this configuration will only take effect when the Message-Expiry-Interval property is not set in the message; otherwise, the value of the Message-Expiry-Interval property will be used. For MQTT versions older than 5.0, this configuration will always take effect. Please note that setting message_expiry_interval greater than session_expiry_interval is meaningless, as all messages will be cleared when the session expires.""" + +mqtt_message_expiry_interval.label: +"""Message Expiry Interval""" + fields_listener_enabled.desc: """Enable listener."""