Fix test cases
This commit is contained in:
parent
fc553b8cee
commit
1667cbd359
|
@ -542,9 +542,6 @@ handle_out({deliver, Delivers}, Channel = #channel{session = Session}) ->
|
||||||
{ok, Channel#channel{session = NSession}}
|
{ok, Channel#channel{session = NSession}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_out({publish, [Publish]}, Channel) ->
|
|
||||||
handle_out(Publish, Channel);
|
|
||||||
|
|
||||||
handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
|
handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
|
||||||
Packets = lists:foldl(
|
Packets = lists:foldl(
|
||||||
fun(Publish, Acc) ->
|
fun(Publish, Acc) ->
|
||||||
|
|
|
@ -110,6 +110,10 @@ check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscr
|
||||||
check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) ->
|
check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) ->
|
||||||
check(UnsubPkt);
|
check(UnsubPkt);
|
||||||
|
|
||||||
|
check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= _TopicAlias}}) ->
|
||||||
|
ok;
|
||||||
|
check(#mqtt_packet_publish{topic_name = <<>>, properties = #{}}) ->
|
||||||
|
{error, ?RC_PROTOCOL_ERROR};
|
||||||
check(#mqtt_packet_publish{topic_name = TopicName, properties = Props}) ->
|
check(#mqtt_packet_publish{topic_name = TopicName, properties = Props}) ->
|
||||||
try emqx_topic:validate(name, TopicName) of
|
try emqx_topic:validate(name, TopicName) of
|
||||||
true -> check_pub_props(Props)
|
true -> check_pub_props(Props)
|
||||||
|
|
|
@ -39,24 +39,24 @@ t_message_expiry_interval_2(_) ->
|
||||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||||
|
|
||||||
message_expiry_interval_init() ->
|
message_expiry_interval_init() ->
|
||||||
{ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
{ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
{ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
{ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
{ok, _} = emqx_client:connect(ClientA),
|
{ok, _} = emqtt:connect(ClientA),
|
||||||
{ok, _} = emqx_client:connect(ClientB),
|
{ok, _} = emqtt:connect(ClientB),
|
||||||
%% subscribe and disconnect client-b
|
%% subscribe and disconnect client-b
|
||||||
emqx_client:subscribe(ClientB, <<"t/a">>, 1),
|
emqtt:subscribe(ClientB, <<"t/a">>, 1),
|
||||||
emqx_client:stop(ClientB),
|
emqtt:stop(ClientB),
|
||||||
ClientA.
|
ClientA.
|
||||||
|
|
||||||
message_expiry_interval_exipred(ClientA, QoS) ->
|
message_expiry_interval_exipred(ClientA, QoS) ->
|
||||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||||
%% publish to t/a and waiting for the message expired
|
%% publish to t/a and waiting for the message expired
|
||||||
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||||
ct:sleep(1000),
|
ct:sleep(1000),
|
||||||
|
|
||||||
%% resume the session for client-b
|
%% resume the session for client-b
|
||||||
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
{ok, _} = emqx_client:connect(ClientB1),
|
{ok, _} = emqtt:connect(ClientB1),
|
||||||
|
|
||||||
%% verify client-b could not receive the publish message
|
%% verify client-b could not receive the publish message
|
||||||
receive
|
receive
|
||||||
|
@ -65,18 +65,18 @@ message_expiry_interval_exipred(ClientA, QoS) ->
|
||||||
after 300 ->
|
after 300 ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
emqx_client:stop(ClientB1).
|
emqtt:stop(ClientB1).
|
||||||
|
|
||||||
message_expiry_interval_not_exipred(ClientA, QoS) ->
|
message_expiry_interval_not_exipred(ClientA, QoS) ->
|
||||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||||
%% publish to t/a
|
%% publish to t/a
|
||||||
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||||
|
|
||||||
%% wait for 1s and then resume the session for client-b, the message should not expires
|
%% wait for 1s and then resume the session for client-b, the message should not expires
|
||||||
%% as Message-Expiry-Interval = 20s
|
%% as Message-Expiry-Interval = 20s
|
||||||
ct:sleep(1000),
|
ct:sleep(1000),
|
||||||
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
{ok, _} = emqx_client:connect(ClientB1),
|
{ok, _} = emqtt:connect(ClientB1),
|
||||||
|
|
||||||
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
||||||
receive
|
receive
|
||||||
|
@ -88,4 +88,4 @@ message_expiry_interval_not_exipred(ClientA, QoS) ->
|
||||||
after 300 ->
|
after 300 ->
|
||||||
ct:fail(no_publish_received)
|
ct:fail(no_publish_received)
|
||||||
end,
|
end,
|
||||||
emqx_client:stop(ClientB1).
|
emqtt:stop(ClientB1).
|
||||||
|
|
Loading…
Reference in New Issue