diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index dc9aa6c3e..1c14fec43 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -84,6 +84,10 @@ , get_connected_client_count/0 ]). +-ifdef(TEST). +-export([request_stepdown/3]). +-endif. + -type(chan_pid() :: pid()). %% Tables for channel management. diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index c636e03f8..e7dc586e1 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -75,7 +75,8 @@ groups() -> %% t_keepalive, %% t_redelivery_on_reconnect, %% subscribe_failure_test, - t_dollar_topics + t_dollar_topics, + t_sub_non_utf8_topic ]}, {mqttv5, [non_parallel_tests], [t_basic_with_props_v5 @@ -275,6 +276,40 @@ t_dollar_topics(_) -> ok = emqtt:disconnect(C), ct:pal("$ topics test succeeded"). +t_sub_non_utf8_topic(_) -> + {ok, Socket} = gen_tcp:connect({127,0,0,1}, 1883, [{active, true}, binary]), + ConnPacket = emqx_frame:serialize(#mqtt_packet{ + header = #mqtt_packet_header{type = 1}, + variable = #mqtt_packet_connect{ + clientid = <<"abcdefg">> + } + }), + %% gen_tcp:send(Socket, <<16,19,0,4,77,81,84,84,4,2,0,0,0,7,97,98,99,100,101,102,103>>). + ok = gen_tcp:send(Socket, ConnPacket), + receive + {tcp, _, _ConnAck = <<32,2,0,0>>} -> ok + after + 3000 -> ct:fail({connect_ack_not_recv, process_info(self(), messages)}) + end, + %% gen_tcp:send(Socket, <<130,18,25,178,0,13,128,10,10,12,178,159,162,47,115,1,1,1,1,1>>). + SubHeader = <<130,18,25,178>>, + SubTopicLen = <<0,13>>, + %% this is not a valid utf8 topic + SubTopic = <<128,10,10,12,178,159,162,47,115,1,1,1,1>>, + SubQoS = <<1>>, + SubPacket = <>, + ok = gen_tcp:send(Socket, SubPacket), + receive + {tcp_closed, _} -> ok + after + 3000 -> ct:fail({should_get_disconnected, process_info(self(), messages)}) + end, + timer:sleep(1000), + ListenerCounts = esockd:get_shutdown_count({'mqtt:tcp',{{0,0,0,0},1883}}), + TopicInvalidCount = proplists:get_value(topic_filter_invalid, ListenerCounts), + ?assert(is_integer(TopicInvalidCount) andalso TopicInvalidCount > 0), + ok. + %%-------------------------------------------------------------------- %% Test cases for MQTT v5 %%-------------------------------------------------------------------- diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 1e075efe1..122ca81f5 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -314,16 +314,19 @@ flush_emqx_pool() -> t_discard_session_race(_) -> ClientId = rand_client_id(), + ConnMod = emqx_ws_connection, ?check_trace( begin #{conninfo := ConnInfo0} = ?ChanInfo, - ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection}, + ConnInfo = ConnInfo0#{conn_mod := ConnMod}, {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end), ok = emqx_cm:register_channel(ClientId, Pid, ConnInfo), Pid ! stop, receive {'DOWN', Ref, process, Pid, normal} -> ok end, - ok = emqx_cm:discard_session(ClientId), - {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000) + %% Here we simulate the situation where we are going to emqx_cm:discard_session/1 + %% but the session has died. + emqx_cm:request_stepdown(discard, ConnMod, Pid), + {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", stale_pid := Pid}, 2000) end, fun(_, _) -> true