diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2530f3a42..855cd030e 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -465,7 +465,13 @@ dequeue(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> {Msgs, Q}; {{value, Msg}, Q1} -> - dequeue(Cnt-1, [Msg|Msgs], Q1) + case emqx_message:is_expired(Msg) of + true -> + ok = emqx_metrics:inc('messages.expired'), + dequeue(Cnt-1, Msgs, Q1); + false -> + dequeue(Cnt-1, [Msg|Msgs], Q1) + end end. batch_n(Inflight) -> diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl new file mode 100644 index 000000000..555da9dd7 --- /dev/null +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -0,0 +1,91 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_msg_expiry_interval_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_message_expiry_interval_1(_) -> + ClientA = message_expiry_interval_init(), + [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]]. + +t_message_expiry_interval_2(_) -> + ClientA = message_expiry_interval_init(), + [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]]. + +message_expiry_interval_init() -> + {ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqx_client:connect(ClientA), + {ok, _} = emqx_client:connect(ClientB), + %% subscribe and disconnect client-b + emqx_client:subscribe(ClientB, <<"t/a">>, 1), + emqx_client:stop(ClientB), + ClientA. + +message_expiry_interval_exipred(ClientA, QoS) -> + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a and waiting for the message expired + emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), + ct:sleep(1000), + + %% resume the session for client-b + {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqx_client:connect(ClientB1), + + %% verify client-b could not receive the publish message + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> + ct:fail(should_have_expired) + after 300 -> + ok + end, + emqx_client:stop(ClientB1). + +message_expiry_interval_not_exipred(ClientA, QoS) -> + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a + emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + + %% wait for 1s and then resume the session for client-b, the message should not expires + %% as Message-Expiry-Interval = 20s + ct:sleep(1000), + {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqx_client:connect(ClientB1), + + %% verify client-b could receive the publish message and the Message-Expiry-Interval is set + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>, + properties := #{'Message-Expiry-Interval' := MsgExpItvl}}} + when MsgExpItvl < 20 -> ok; + {publish, _} = Msg -> + ct:fail({incorrect_publish, Msg}) + after 300 -> + ct:fail(no_publish_received) + end, + emqx_client:stop(ClientB1).