221 lines
6.4 KiB
Erlang
221 lines
6.4 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_retainer_mqtt_v5_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
%% Returns the list of test cases for this suite by delegating to
%% emqx_common_test_helpers:all/1 with this module's name.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
|
|
|
|
%% Suite-level setup.
%% Calls emqx_retainer_SUITE:load_conf/0 (presumably prepares the retainer
%% configuration -- confirm in that module), then starts the emqx_retainer
%% application through the common test helpers.
%% Returns Config unchanged.
init_per_suite(Config) ->
    emqx_retainer_SUITE:load_conf(),
    AppsUnderTest = [emqx_retainer],
    emqx_common_test_helpers:start_apps(AppsUnderTest),
    Config.
|
|
|
|
%% Suite-level teardown: stops the emqx_retainer application that
%% init_per_suite/1 started. The suite configuration is not used.
end_per_suite(_Config) ->
    AppsUnderTest = [emqx_retainer],
    emqx_common_test_helpers:stop_apps(AppsUnderTest).
|
|
|
|
%% Looks up Key in the key/value list returned by emqtt:info/1 for Client.
%% Returns the associated value, or `undefined' when Key is not present.
client_info(Key, Client) ->
    InfoMap = maps:from_list(emqtt:info(Client)),
    maps:get(Key, InfoMap, undefined).
|
|
|
|
%% Collects up to Count `{publish, Msg}' messages from the calling process'
%% mailbox, starting from an empty accumulator. See receive_messages/2 for the
%% collection rules.
receive_messages(Count) ->
    NoneYet = [],
    receive_messages(Count, NoneYet).
|
|
|
|
%% Collects `{publish, Msg}' messages from the calling process' mailbox.
%%
%% Remaining - how many more publish messages to wait for.
%% Collected - messages gathered so far, newest first.
%%
%% Any message that is not `{publish, _}' is consumed and discarded.
%% Returns as soon as Remaining reaches 0, or when no message at all arrives
%% within 300 ms. The result is in reverse arrival order (newest first).
receive_messages(0, Collected) ->
    Collected;
receive_messages(Remaining, Collected) ->
    receive
        Incoming ->
            case Incoming of
                {publish, Packet} ->
                    receive_messages(Remaining - 1, [Packet | Collected]);
                _ ->
                    receive_messages(Remaining, Collected)
            end
    after 300 ->
        Collected
    end.
|
|
|
|
%% Waits for a `{disconnected, ReasonCode, _}' message in the calling process'
%% mailbox and returns its ReasonCode.
%%
%% Every other message received while waiting is consumed and discarded.
%% Raises error("no disconnect packet") if no message arrives within 100 ms.
receive_disconnect_reasoncode() ->
    receive
        Incoming ->
            case Incoming of
                {disconnected, Code, _Props} ->
                    Code;
                _ ->
                    receive_disconnect_reasoncode()
            end
    after 100 ->
        error("no disconnect packet")
    end.
|
|
|
|
%% Publishes an empty, retained, QoS 2 payload to Topic using a short-lived
%% client started with clean_start, then disconnects that client.
%% The test cases call this during cleanup; an empty retained payload is what
%% t_publish_retain_message/1 uses to clear a retained message.
clean_retained(Topic) ->
    {ok, Cleaner} = emqtt:start_link([{clean_start, true}]),
    {ok, _} = emqtt:connect(Cleaner),
    EmptyPayload = <<"">>,
    PublishOpts = [{qos, qos2}, {retain, true}],
    {ok, _} = emqtt:publish(Cleaner, Topic, #{}, EmptyPayload, PublishOpts),
    ok = emqtt:disconnect(Cleaner).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Publish
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Checks retained-message replacement and removal on a single topic:
%%  1. publish two retained messages and one non-retained message;
%%  2. subscribe and expect exactly one message, carrying the payload of the
%%     second retained publish;
%%  3. unsubscribe, publish an empty retained payload, re-subscribe and expect
%%     no message.
t_publish_retain_message(_) ->
    Topic = <<"Topic/A">>,

    {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
    {ok, _} = emqtt:connect(Client1),

    %% All publishes in this case are QoS 2 on the same topic; only the
    %% payload and the retain flag vary.
    Publish = fun(Payload, Retain) ->
        {ok, _} = emqtt:publish(
            Client1,
            Topic,
            #{},
            Payload,
            [{qos, 2}, {retain, Retain}]
        )
    end,

    Publish(<<"retained message">>, true),
    Publish(<<"new retained message">>, true),
    Publish(<<"not retained message">>, false),
    {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),

    %% Ask for up to 3 messages so that any unexpected extra delivery makes
    %% the single-element match below fail.
    [Msg] = receive_messages(3),
    %% [MQTT-3.3.1-5] [MQTT-3.3.1-8]
    ?assertEqual(<<"new retained message">>, maps:get(payload, Msg)),

    {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic),
    Publish(<<"">>, true),
    {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),

    %% [MQTT-3.3.1-6] [MQTT-3.3.1-7]
    ?assertEqual(0, length(receive_messages(1))),

    ok = emqtt:disconnect(Client1).
|
|
|
|
%% Checks Message-Expiry-Interval handling for retained messages.
%%
%% Four retained messages are published: topic/A and topic/B with an expiry
%% interval of 1 second, topic/C and topic/D with 10 seconds. After sleeping
%% 1.5 seconds the client subscribes to `topic/+' and expects:
%%  - only the two 10-second messages to be delivered [MQTT-3.3.2-5];
%%  - both delivered messages to carry an expiry interval lower than the 10
%%    seconds they were published with [MQTT-3.3.2-6].
%%
%% topic/C and topic/D are cleared explicitly at the end because they have not
%% expired yet when the test case finishes.
t_publish_message_expiry_interval(_) ->
    {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
    {ok, _} = emqtt:connect(Client1),

    %% {Topic, Message-Expiry-Interval in seconds, QoS}
    Retained = [
        {<<"topic/A">>, 1, 1},
        {<<"topic/B">>, 1, 2},
        {<<"topic/C">>, 10, 1},
        {<<"topic/D">>, 10, 2}
    ],
    lists:foreach(
        fun({Topic, Expiry, QoS}) ->
            {ok, _} = emqtt:publish(
                Client1,
                Topic,
                #{'Message-Expiry-Interval' => Expiry},
                <<"retained message">>,
                [{qos, QoS}, {retain, true}]
            )
        end,
        Retained
    ),

    %% Wait long enough for the 1-second messages to expire.
    timer:sleep(1500),
    {ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2),
    Msgs = receive_messages(6),
    %% [MQTT-3.3.2-5]
    ?assertEqual(2, length(Msgs)),

    %% Keep only the messages whose expiry interval was reduced. The original
    %% code used lists:map/2 here, which yields one boolean per message and
    %% therefore always has the same length as Msgs, so the assertion below
    %% could never fail on its own.
    Decremented = lists:filter(
        fun(Msg) ->
            MessageExpiryInterval = maps:get(
                'Message-Expiry-Interval',
                maps:get(properties, Msg)
            ),
            MessageExpiryInterval < 10
        end,
        Msgs
    ),
    %% [MQTT-3.3.2-6]
    ?assertEqual(2, length(Decremented)),

    ok = emqtt:disconnect(Client1),
    clean_retained(<<"topic/C">>),
    clean_retained(<<"topic/D">>).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Subscribe
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Checks how the Retain Handling (`rh') subscription option affects delivery
%% of retained messages.
%%
%% Three retained messages are published on topic/A, topic/B and topic/C (QoS
%% 0, 1 and 2). The client then subscribes to `topic/+' five times in a row
%% with different `rh' values and, after each subscribe, checks how many
%% messages arrive.
t_subscribe_retain_handing(_) ->
    {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
    {ok, _} = emqtt:connect(Client1),

    Payload = <<"retained message">>,
    %% The QoS 0 publish returns a bare `ok'; the QoS 1 and 2 publishes
    %% return `{ok, _}'.
    ok = emqtt:publish(Client1, <<"topic/A">>, #{}, Payload, [{qos, 0}, {retain, true}]),
    {ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{}, Payload, [{qos, 1}, {retain, true}]),
    {ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{}, Payload, [{qos, 2}, {retain, true}]),

    timer:sleep(200),

    %% {Retain Handling value, expected number of delivered messages},
    %% executed strictly in this order on the same client.
    Steps = [
        %% [MQTT-3.3.1-10]
        {1, 3},
        %% [MQTT-3.3.1-11]
        {2, 0},
        %% [MQTT-3.3.1-9]
        {0, 3},
        %% [MQTT-3.3.1-10]
        {1, 0},
        %% [MQTT-3.8.4-4]
        {0, 3}
    ],
    lists:foreach(
        fun({RetainHandling, ExpectedCount}) ->
            {ok, _, [2]} = emqtt:subscribe(
                Client1,
                #{},
                [{<<"topic/+">>, [{rh, RetainHandling}, {qos, 2}]}]
            ),
            ?assertEqual(ExpectedCount, length(receive_messages(3)))
        end,
        Steps
    ),

    ok = emqtt:disconnect(Client1),
    lists:foreach(
        fun clean_retained/1,
        [<<"topic/A">>, <<"topic/B">>, <<"topic/C">>]
    ).
|