diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 1e780eb08..224965fde 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -542,9 +542,6 @@ handle_out({deliver, Delivers}, Channel = #channel{session = Session}) -> {ok, Channel#channel{session = NSession}} end; -handle_out({publish, [Publish]}, Channel) -> - handle_out(Publish, Channel); - handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> Packets = lists:foldl( fun(Publish, Acc) -> diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index b06f526af..1113c15d7 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -110,6 +110,10 @@ check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscr check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) -> check(UnsubPkt); +check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= _TopicAlias}}) -> + ok; +check(#mqtt_packet_publish{topic_name = <<>>, properties = #{}}) -> + {error, ?RC_PROTOCOL_ERROR}; check(#mqtt_packet_publish{topic_name = TopicName, properties = Props}) -> try emqx_topic:validate(name, TopicName) of true -> check_pub_props(Props) diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl index 555da9dd7..f3170b718 100644 --- a/test/emqx_msg_expiry_interval_SUITE.erl +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -39,24 +39,24 @@ t_message_expiry_interval_2(_) -> [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]]. message_expiry_interval_init() -> - {ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqx_client:connect(ClientA), - {ok, _} = emqx_client:connect(ClientB), + {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientA), + {ok, _} = emqtt:connect(ClientB), %% subscribe and disconnect client-b - emqx_client:subscribe(ClientB, <<"t/a">>, 1), - emqx_client:stop(ClientB), + emqtt:subscribe(ClientB, <<"t/a">>, 1), + emqtt:stop(ClientB), ClientA. message_expiry_interval_exipred(ClientA, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a and waiting for the message expired - emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), ct:sleep(1000), %% resume the session for client-b - {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqx_client:connect(ClientB1), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), %% verify client-b could not receive the publish message receive @@ -65,18 +65,18 @@ message_expiry_interval_exipred(ClientA, QoS) -> after 300 -> ok end, - emqx_client:stop(ClientB1). + emqtt:stop(ClientB1). message_expiry_interval_not_exipred(ClientA, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a - emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), %% wait for 1s and then resume the session for client-b, the message should not expires %% as Message-Expiry-Interval = 20s ct:sleep(1000), - {ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqx_client:connect(ClientB1), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), %% verify client-b could receive the publish message and the Message-Expiry-Interval is set receive @@ -88,4 +88,4 @@ message_expiry_interval_not_exipred(ClientA, QoS) -> after 300 -> ct:fail(no_publish_received) end, - emqx_client:stop(ClientB1). + emqtt:stop(ClientB1).