Merge pull request #12417 from emqx/port-msg-expiry-interval-from44
feat: configurable server side message_expiry_interval
This commit is contained in:
commit
db3f285054
|
@ -595,7 +595,7 @@ t_session_gc(Config) ->
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
?assertMatch([ClientId1], list_all_sessions(Node1), sessions),
|
?retry(50, 3, [ClientId1] = list_all_sessions(Node1)),
|
||||||
?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
|
?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -930,7 +930,8 @@ handle_deliver(
|
||||||
Delivers1 = maybe_nack(Delivers),
|
Delivers1 = maybe_nack(Delivers),
|
||||||
Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
|
Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
|
||||||
NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
|
NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
|
||||||
{ok, Channel#channel{session = NSession}};
|
%% we need to update stats here, as the stats_timer is canceled after disconnected
|
||||||
|
{ok, {event, updated}, Channel#channel{session = NSession}};
|
||||||
handle_deliver(Delivers, Channel) ->
|
handle_deliver(Delivers, Channel) ->
|
||||||
Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)),
|
Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)),
|
||||||
do_handle_deliver(Delivers1, Channel).
|
do_handle_deliver(Delivers1, Channel).
|
||||||
|
|
|
@ -65,7 +65,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
is_expired/1,
|
is_expired/2,
|
||||||
update_expiry/1,
|
update_expiry/1,
|
||||||
timestamp_now/0
|
timestamp_now/0
|
||||||
]).
|
]).
|
||||||
|
@ -273,14 +273,20 @@ remove_header(Hdr, Msg = #message{headers = Headers}) ->
|
||||||
false -> Msg
|
false -> Msg
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec is_expired(emqx_types:message()) -> boolean().
|
-spec is_expired(emqx_types:message(), atom()) -> boolean().
|
||||||
is_expired(#message{
|
is_expired(
|
||||||
|
#message{
|
||||||
headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
|
headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
|
||||||
timestamp = CreatedAt
|
timestamp = CreatedAt
|
||||||
}) ->
|
},
|
||||||
|
_
|
||||||
|
) ->
|
||||||
elapsed(CreatedAt) > timer:seconds(Interval);
|
elapsed(CreatedAt) > timer:seconds(Interval);
|
||||||
is_expired(_Msg) ->
|
is_expired(#message{timestamp = CreatedAt}, Zone) ->
|
||||||
false.
|
case emqx_config:get_zone_conf(Zone, [mqtt, message_expiry_interval], infinity) of
|
||||||
|
infinity -> false;
|
||||||
|
Interval -> elapsed(CreatedAt) > Interval
|
||||||
|
end.
|
||||||
|
|
||||||
-spec update_expiry(emqx_types:message()) -> emqx_types:message().
|
-spec update_expiry(emqx_types:message()) -> emqx_types:message().
|
||||||
update_expiry(
|
update_expiry(
|
||||||
|
|
|
@ -3745,6 +3745,15 @@ mqtt_session() ->
|
||||||
importance => ?IMPORTANCE_LOW
|
importance => ?IMPORTANCE_LOW
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
{"message_expiry_interval",
|
||||||
|
sc(
|
||||||
|
hoconsc:union([duration(), infinity]),
|
||||||
|
#{
|
||||||
|
default => infinity,
|
||||||
|
desc => ?DESC(mqtt_message_expiry_interval),
|
||||||
|
importance => ?IMPORTANCE_LOW
|
||||||
|
}
|
||||||
|
)},
|
||||||
{"max_awaiting_rel",
|
{"max_awaiting_rel",
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([non_neg_integer(), infinity]),
|
hoconsc:union([non_neg_integer(), infinity]),
|
||||||
|
|
|
@ -468,12 +468,12 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
|
||||||
|
|
||||||
dequeue(_ClientInfo, 0, Msgs, Q) ->
|
dequeue(_ClientInfo, 0, Msgs, Q) ->
|
||||||
{lists:reverse(Msgs), Q};
|
{lists:reverse(Msgs), Q};
|
||||||
dequeue(ClientInfo, Cnt, Msgs, Q) ->
|
dequeue(ClientInfo = #{zone := Zone}, Cnt, Msgs, Q) ->
|
||||||
case emqx_mqueue:out(Q) of
|
case emqx_mqueue:out(Q) of
|
||||||
{empty, _Q} ->
|
{empty, _Q} ->
|
||||||
dequeue(ClientInfo, 0, Msgs, Q);
|
dequeue(ClientInfo, 0, Msgs, Q);
|
||||||
{{value, Msg}, Q1} ->
|
{{value, Msg}, Q1} ->
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg, Zone) of
|
||||||
true ->
|
true ->
|
||||||
_ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}),
|
_ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}),
|
||||||
dequeue(ClientInfo, Cnt, Msgs, Q1);
|
dequeue(ClientInfo, Cnt, Msgs, Q1);
|
||||||
|
@ -619,14 +619,14 @@ retry_delivery(
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_retry_delivery(
|
do_retry_delivery(
|
||||||
ClientInfo,
|
ClientInfo = #{zone := Zone},
|
||||||
PacketId,
|
PacketId,
|
||||||
#inflight_data{phase = wait_ack, message = Msg} = Data,
|
#inflight_data{phase = wait_ack, message = Msg} = Data,
|
||||||
Now,
|
Now,
|
||||||
Acc,
|
Acc,
|
||||||
Inflight
|
Inflight
|
||||||
) ->
|
) ->
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg, Zone) of
|
||||||
true ->
|
true ->
|
||||||
_ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}),
|
_ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}),
|
||||||
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
||||||
|
|
|
@ -446,6 +446,7 @@ zone_global_defaults() ->
|
||||||
response_information => [],
|
response_information => [],
|
||||||
retain_available => true,
|
retain_available => true,
|
||||||
retry_interval => 30000,
|
retry_interval => 30000,
|
||||||
|
message_expiry_interval => infinity,
|
||||||
server_keepalive => disabled,
|
server_keepalive => disabled,
|
||||||
session_expiry_interval => 7200000,
|
session_expiry_interval => 7200000,
|
||||||
shared_subscription => true,
|
shared_subscription => true,
|
||||||
|
|
|
@ -141,18 +141,50 @@ t_undefined_headers(_) ->
|
||||||
Msg2 = emqx_message:set_header(c, 3, Msg),
|
Msg2 = emqx_message:set_header(c, 3, Msg),
|
||||||
?assertEqual(3, emqx_message:get_header(c, Msg2)).
|
?assertEqual(3, emqx_message:get_header(c, Msg2)).
|
||||||
|
|
||||||
t_is_expired(_) ->
|
t_is_expired_1(_) ->
|
||||||
|
test_msg_expired_property(?MODULE).
|
||||||
|
|
||||||
|
t_is_expired_2(_) ->
|
||||||
|
%% if the 'Message-Expiry-Interval' property is set, the message_expiry_interval should be ignored
|
||||||
|
try
|
||||||
|
emqx_config:put(
|
||||||
|
maps:from_list([{list_to_atom(Root), #{}} || Root <- emqx_zone_schema:roots()])
|
||||||
|
),
|
||||||
|
emqx_config:put_zone_conf(?MODULE, [mqtt, message_expiry_interval], timer:seconds(10)),
|
||||||
|
test_msg_expired_property(?MODULE)
|
||||||
|
after
|
||||||
|
emqx_config:erase_all()
|
||||||
|
end.
|
||||||
|
|
||||||
|
t_is_expired_3(_) ->
|
||||||
|
try
|
||||||
|
emqx_config:put(
|
||||||
|
maps:from_list([{list_to_atom(Root), #{}} || Root <- emqx_zone_schema:roots()])
|
||||||
|
),
|
||||||
|
emqx_config:put_zone_conf(?MODULE, [mqtt, message_expiry_interval], 100),
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
?assertNot(emqx_message:is_expired(Msg)),
|
?assertNot(emqx_message:is_expired(Msg, ?MODULE)),
|
||||||
|
timer:sleep(120),
|
||||||
|
?assert(emqx_message:is_expired(Msg, ?MODULE))
|
||||||
|
after
|
||||||
|
emqx_config:erase_all()
|
||||||
|
end.
|
||||||
|
|
||||||
|
test_msg_expired_property(Zone) ->
|
||||||
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
|
?assertNot(emqx_message:is_expired(Msg, Zone)),
|
||||||
Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg),
|
Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
?assertNot(emqx_message:is_expired(Msg1)),
|
?assertNot(emqx_message:is_expired(Msg1, Zone)),
|
||||||
timer:sleep(600),
|
timer:sleep(600),
|
||||||
?assert(emqx_message:is_expired(Msg1)),
|
?assert(emqx_message:is_expired(Msg1, Zone)).
|
||||||
|
|
||||||
|
t_update_expired(_) ->
|
||||||
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
Msg = emqx_message:update_expiry(Msg),
|
?assertEqual(Msg, emqx_message:update_expiry(Msg)),
|
||||||
Msg2 = emqx_message:update_expiry(Msg1),
|
Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg),
|
||||||
Props = emqx_message:get_header(properties, Msg2),
|
Props = emqx_message:get_header(properties, emqx_message:update_expiry(Msg1)),
|
||||||
?assertEqual(1, maps:get('Message-Expiry-Interval', Props)).
|
?assertEqual(1, maps:get('Message-Expiry-Interval', Props)).
|
||||||
|
|
||||||
% t_to_list(_) ->
|
% t_to_list(_) ->
|
||||||
|
|
|
@ -545,6 +545,7 @@ clientinfo() -> clientinfo(#{}).
|
||||||
clientinfo(Init) ->
|
clientinfo(Init) ->
|
||||||
maps:merge(
|
maps:merge(
|
||||||
#{
|
#{
|
||||||
|
zone => ?MODULE,
|
||||||
clientid => <<"clientid">>,
|
clientid => <<"clientid">>,
|
||||||
username => <<"username">>
|
username => <<"username">>
|
||||||
},
|
},
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Added support for specifying the expiration time of MQTT messages via configuration file.
|
||||||
|
|
||||||
|
See the description of the `message_expiry_interval` configuration in the `mqtt.conf.example` file for more details.
|
|
@ -82,6 +82,21 @@ mqtt {
|
||||||
## Specifies how long the session will expire after the connection is disconnected, only for non-MQTT 5.0 connections
|
## Specifies how long the session will expire after the connection is disconnected, only for non-MQTT 5.0 connections
|
||||||
session_expiry_interval = 2h
|
session_expiry_interval = 2h
|
||||||
|
|
||||||
|
## The expiry interval of MQTT messages.
|
||||||
|
##
|
||||||
|
## For MQTT 5.0 clients, this configuration will only take effect when the
|
||||||
|
## Message-Expiry-Interval property is not set in the message; otherwise, the
|
||||||
|
## value of the Message-Expiry-Interval property will be used.
|
||||||
|
## For MQTT versions older than 5.0, this configuration will always take effect.
|
||||||
|
## Please note that setting message_expiry_interval greater than session_expiry_interval
|
||||||
|
## is meaningless, as all messages will be cleared when the session expires.
|
||||||
|
##
|
||||||
|
## Type:
|
||||||
|
## - infinity :: Never expire
|
||||||
|
## - Time Duration :: The expiry interval
|
||||||
|
## Default: infinity
|
||||||
|
message_expiry_interval = infinity
|
||||||
|
|
||||||
## Maximum queue length. Enqueued messages when persistent client disconnected, or inflight window is full
|
## Maximum queue length. Enqueued messages when persistent client disconnected, or inflight window is full
|
||||||
## Type: infinity | Integer
|
## Type: infinity | Integer
|
||||||
max_mqueue_len = 1000
|
max_mqueue_len = 1000
|
||||||
|
|
|
@ -191,6 +191,12 @@ mqtt_session_expiry_interval.desc:
|
||||||
mqtt_session_expiry_interval.label:
|
mqtt_session_expiry_interval.label:
|
||||||
"""Session Expiry Interval"""
|
"""Session Expiry Interval"""
|
||||||
|
|
||||||
|
mqtt_message_expiry_interval.desc:
|
||||||
|
"""The expiry interval of MQTT messages. For MQTT 5.0 clients, this configuration will only take effect when the Message-Expiry-Interval property is not set in the message; otherwise, the value of the Message-Expiry-Interval property will be used. For MQTT versions older than 5.0, this configuration will always take effect. Please note that setting message_expiry_interval greater than session_expiry_interval is meaningless, as all messages will be cleared when the session expires."""
|
||||||
|
|
||||||
|
mqtt_message_expiry_interval.label:
|
||||||
|
"""Message Expiry Interval"""
|
||||||
|
|
||||||
fields_listener_enabled.desc:
|
fields_listener_enabled.desc:
|
||||||
"""Enable listener."""
|
"""Enable listener."""
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue