Fix Message-Expiry-Interval not working
This commit is contained in:
parent
b2ddcb26e2
commit
fa1adf5cfb
|
@ -465,7 +465,13 @@ dequeue(Cnt, Msgs, Q) ->
|
|||
case emqx_mqueue:out(Q) of
|
||||
{empty, _Q} -> {Msgs, Q};
|
||||
{{value, Msg}, Q1} ->
|
||||
case emqx_message:is_expired(Msg) of
|
||||
true ->
|
||||
ok = emqx_metrics:inc('messages.expired'),
|
||||
dequeue(Cnt-1, Msgs, Q1);
|
||||
false ->
|
||||
dequeue(Cnt-1, [Msg|Msgs], Q1)
|
||||
end
|
||||
end.
|
||||
|
||||
batch_n(Inflight) ->
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_msg_expiry_interval_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
t_message_expiry_interval_1(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||
|
||||
t_message_expiry_interval_2(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||
|
||||
message_expiry_interval_init() ->
|
||||
{ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqx_client:connect(ClientA),
|
||||
{ok, _} = emqx_client:connect(ClientB),
|
||||
%% subscribe and disconnect client-b
|
||||
emqx_client:subscribe(ClientB, <<"t/a">>, 1),
|
||||
emqx_client:stop(ClientB),
|
||||
ClientA.
|
||||
|
||||
message_expiry_interval_exipred(ClientA, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a and waiting for the message expired
|
||||
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:sleep(1000),
|
||||
|
||||
%% resume the session for client-b
|
||||
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqx_client:connect(ClientB1),
|
||||
|
||||
%% verify client-b could not receive the publish message
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
|
||||
ct:fail(should_have_expired)
|
||||
after 300 ->
|
||||
ok
|
||||
end,
|
||||
emqx_client:stop(ClientB1).
|
||||
|
||||
message_expiry_interval_not_exipred(ClientA, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a
|
||||
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
|
||||
%% wait for 1s and then resume the session for client-b, the message should not expires
|
||||
%% as Message-Expiry-Interval = 20s
|
||||
ct:sleep(1000),
|
||||
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqx_client:connect(ClientB1),
|
||||
|
||||
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
|
||||
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
|
||||
when MsgExpItvl < 20 -> ok;
|
||||
{publish, _} = Msg ->
|
||||
ct:fail({incorrect_publish, Msg})
|
||||
after 300 ->
|
||||
ct:fail(no_publish_received)
|
||||
end,
|
||||
emqx_client:stop(ClientB1).
|
Loading…
Reference in New Issue