diff --git a/Makefile b/Makefile index 0aa6a03c8..acea3f331 100644 --- a/Makefile +++ b/Makefile @@ -11,8 +11,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ - emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ + emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_channel \ + emqx_packet emqx_channel emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping CT_NODE_NAME = emqxct@127.0.0.1 diff --git a/src/emqx_client.erl b/src/emqx_client.erl index cd83e61ad..e6dbea27b 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_client). @@ -20,7 +22,9 @@ -include("types.hrl"). -include("emqx_client.hrl"). --export([start_link/0, start_link/1]). +-export([ start_link/0 + , start_link/1 + ]). -export([ connect/1 , disconnect/1 @@ -175,7 +179,8 @@ retry_timer :: reference(), session_present :: boolean(), last_packet_id :: packet_id(), - parse_state :: emqx_frame:state()}). + parse_state :: emqx_frame:state() + }). -record(call, {id, from, req, ts}). @@ -202,9 +207,9 @@ -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% API -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(start_link() -> gen_statem:start_ret()). start_link() -> start_link([]). @@ -352,9 +357,9 @@ disconnect(Client, ReasonCode) -> disconnect(Client, ReasonCode, Properties) -> gen_statem:call(Client, {disconnect, ReasonCode, Properties}). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% For test cases -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- puback(Client, PacketId) when is_integer(PacketId) -> puback(Client, PacketId, ?RC_SUCCESS). @@ -407,9 +412,9 @@ pause(Client) -> resume(Client) -> gen_statem:call(Client, resume). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_statem callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([Options]) -> process_flag(trap_exit, true), @@ -443,7 +448,8 @@ init([Options]) -> ack_timeout = ?DEFAULT_ACK_TIMEOUT, retry_interval = 0, connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, - last_packet_id = 1}), + last_packet_id = 1 + }), {ok, initialized, init_parse_state(State)}. random_client_id() -> @@ -563,9 +569,10 @@ init_will_msg({qos, QoS}, WillMsg) -> WillMsg#mqtt_msg{qos = ?QOS_I(QoS)}. init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) -> - Size = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), - State#state{parse_state = emqx_frame:initial_state( - #{max_packet_size => Size, version => Ver})}. + MaxSize = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), + ParseState = emqx_frame:initial_parse_state( + #{max_size => MaxSize, version => Ver}), + State#state{parse_state = ParseState}. callback_mode() -> state_functions. @@ -955,9 +962,9 @@ terminate(Reason, _StateName, State = #state{socket = Socket}) -> code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- should_ping(Sock) -> case emqx_client_sock:getstat(Sock, [send_oct]) of @@ -1010,7 +1017,8 @@ assign_id(?NO_CLIENT_ID, Props) -> assign_id(Id, _Props) -> Id. -publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State0 = #state{auto_ack = AutoAck}) -> +publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), + State0 = #state{auto_ack = AutoAck}) -> State = deliver(packet_to_msg(Packet), State0), case AutoAck of true -> send_puback(?PUBACK_PACKET(PacketId), State); @@ -1161,7 +1169,7 @@ msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = Packe properties = Props}, payload = Payload}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Socket Connect/Send sock_connect(Hosts, SockOpts, Timeout) -> @@ -1201,7 +1209,7 @@ send(Packet, State = #state{socket = Sock, proto_ver = Ver}) run_sock(State = #state{socket = Sock}) -> emqx_client_sock:setopts(Sock, [{active, once}]), State. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Process incomming process_incoming(<<>>, Packets, State) -> @@ -1209,10 +1217,10 @@ process_incoming(<<>>, Packets, State) -> process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) -> try emqx_frame:parse(Bytes, ParseState) of - {ok, Packet, Rest} -> - process_incoming(Rest, [Packet|Packets], init_parse_state(State)); - {more, NewParseState} -> - {keep_state, State#state{parse_state = NewParseState}, next_events(Packets)}; + {ok, Packet, Rest, NParseState} -> + process_incoming(Rest, [Packet|Packets], State#state{parse_state = NParseState}); + {ok, NParseState} -> + {keep_state, State#state{parse_state = NParseState}, next_events(Packets)}; {error, Reason} -> {stop, Reason} catch @@ -1227,7 +1235,7 @@ next_events([Packet]) -> next_events(Packets) -> [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)]. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% packet_id generation bump_last_packet_id(State = #state{last_packet_id = Id}) -> @@ -1236,3 +1244,4 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) -> -spec next_packet_id(packet_id()) -> packet_id(). next_packet_id(?MAX_PACKET_ID) -> 1; next_packet_id(Id) -> Id + 1. + diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index bcc0a6b6b..a74e16552 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -110,7 +110,7 @@ mqtt_connect_with_tcp(_) -> Packet = raw_send_serialize(?CLIENT2), emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), <<>>, _} = raw_recv_pase(Data), emqx_client_sock:close(Sock). mqtt_connect_with_will_props(_) -> @@ -133,7 +133,7 @@ mqtt_connect_with_ssl_oneway(_) -> emqx_client_sock:send(Sock, Packet), ?assert( receive {ssl, _, ConAck}-> - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(ConAck), true + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(ConAck), true after 1000 -> false end), @@ -152,7 +152,7 @@ mqtt_connect_with_ssl_twoway(_Config) -> timer:sleep(500), ?assert( receive {ssl, _, Data}-> - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), true + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(Data), true after 1000 -> false end), @@ -167,19 +167,19 @@ mqtt_connect_with_ws(_Config) -> Packet = raw_send_serialize(?CLIENT), ok = rfc6455_client:send_binary(WS, Packet), {binary, CONACK} = rfc6455_client:recv(WS), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(CONACK), %% Sub Packet SubPacket = raw_send_serialize(?SUBPACKET), rfc6455_client:send_binary(WS, SubPacket), {binary, SubAck} = rfc6455_client:recv(WS), - {ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), _} = raw_recv_pase(SubAck), + {ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), <<>>, _} = raw_recv_pase(SubAck), %% Pub Packet QoS 1 PubPacket = raw_send_serialize(?PUBPACKET), rfc6455_client:send_binary(WS, PubPacket), {binary, PubAck} = rfc6455_client:recv(WS), - {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(PubAck), + {ok, ?PUBACK_PACKET(?PACKETID), <<>>, _} = raw_recv_pase(PubAck), {close, _} = rfc6455_client:close(WS), ok. @@ -189,18 +189,18 @@ packet_size(_Config) -> Packet = raw_send_serialize(?CLIENT), emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(Data), %% Pub Packet QoS 1 PubPacket = raw_send_serialize(?BIG_PUBPACKET), emqx_client_sock:send(Sock, PubPacket), {ok, Data1} = gen_tcp:recv(Sock, 0), - {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1), + {ok, ?PUBACK_PACKET(?PACKETID), <<>>, _} = raw_recv_pase(Data1), emqx_client_sock:close(Sock). raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). -raw_recv_pase(P) -> - emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4} }). +raw_recv_pase(Bin) -> + emqx_frame:parse(Bin). + diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index d50c7fb5f..25a8303e6 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -60,7 +60,7 @@ t_alarm_handler(_) -> #{version => ?MQTT_PROTO_V5} )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>), Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>), @@ -74,7 +74,7 @@ t_alarm_handler(_) -> #{version => ?MQTT_PROTO_V5})), {ok, Data2} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), + {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test, severity = error, @@ -83,7 +83,7 @@ t_alarm_handler(_) -> {ok, Data3} = gen_tcp:recv(Sock, 0), - {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), ?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())), @@ -91,7 +91,7 @@ t_alarm_handler(_) -> {ok, Data4} = gen_tcp:recv(Sock, 0), - {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), + {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), <<>>, _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), ?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())) @@ -119,6 +119,7 @@ raw_send_serialize(Packet) -> raw_send_serialize(Packet, Opts) -> emqx_frame:serialize(Packet, Opts). -raw_recv_parse(P, ProtoVersion) -> - emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ProtoVersion}}). +raw_recv_parse(Bin, ProtoVer) -> + emqx_frame:parse(Bin, {none, #{max_size => ?MAX_PACKET_SIZE, + version => ProtoVer}}). + diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_channel_SUITE.erl similarity index 83% rename from test/emqx_connection_SUITE.erl rename to test/emqx_channel_SUITE.erl index 2c124fd44..7d8b216f2 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,18 +12,19 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- --module(emqx_connection_SUITE). +-module(emqx_channel_SUITE). -compile(export_all). -compile(nowarn_export_all). +-include("emqx_mqtt.hrl"). + -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --include("emqx_mqtt.hrl"). - all() -> [t_connect_api]. @@ -40,13 +42,13 @@ t_connect_api(_Config) -> {password, <<"pass1">>}]), {ok, _} = emqx_client:connect(T1), CPid = emqx_cm:lookup_conn_pid(<<"client1">>), - ConnStats = emqx_connection:stats(CPid), + ConnStats = emqx_channel:stats(CPid), ok = t_stats(ConnStats), - ConnAttrs = emqx_connection:attrs(CPid), + ConnAttrs = emqx_channel:attrs(CPid), ok = t_attrs(ConnAttrs), - ConnInfo = emqx_connection:info(CPid), + ConnInfo = emqx_channel:info(CPid), ok = t_info(ConnInfo), - SessionPid = emqx_connection:session(CPid), + SessionPid = emqx_channel:session(CPid), true = is_pid(SessionPid), emqx_client:disconnect(T1). @@ -59,7 +61,7 @@ t_info(ConnInfo) -> t_attrs(AttrsData) -> ?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)), - ?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(emqx_channel, maps:get(conn_mod, AttrsData)), ?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)). t_stats(StatsData) -> diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 9e8107665..60d298008 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -139,7 +139,7 @@ connect_v4(_) -> })), emqx_client_sock:send(Sock, ConnectPacket), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V4), emqx_client_sock:send(Sock, ConnectPacket), {error, closed} = gen_tcp:recv(Sock, 0) @@ -156,7 +156,7 @@ connect_v5(_) -> properties = #{'Request-Response-Information' => -1}}))), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), with_connection(fun([Sock]) -> @@ -168,7 +168,7 @@ connect_v5(_) -> properties = #{'Request-Problem-Information' => 2}}))), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), with_connection(fun([Sock]) -> @@ -181,7 +181,7 @@ connect_v5(_) -> #{'Request-Response-Information' => 1}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} = + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), ?assertNot(maps:is_key('Response-Information', Props)) end), @@ -202,7 +202,7 @@ connect_v5(_) -> )), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, - #{'Topic-Alias-Maximum' := 20}), _} = + #{'Topic-Alias-Maximum' := 20}), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( @@ -211,7 +211,7 @@ connect_v5(_) -> )), {ok, Data2} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5) + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5) end), % topic alias maximum @@ -227,7 +227,7 @@ connect_v5(_) -> )), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, - #{'Topic-Alias-Maximum' := 20}), _} = + #{'Topic-Alias-Maximum' := 20}), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, @@ -237,7 +237,7 @@ connect_v5(_) -> rc => 0}}]), #{version => ?MQTT_PROTO_V5})), {ok, Data2} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( @@ -247,11 +247,11 @@ connect_v5(_) -> {ok, Data3} = gen_tcp:recv(Sock, 0), - {ok, ?PUBACK_PACKET(1, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + {ok, ?PUBACK_PACKET(1, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), {ok, Data4} = gen_tcp:recv(Sock, 0), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), + {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), <<>>, _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( @@ -260,7 +260,7 @@ connect_v5(_) -> )), {ok, Data5} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5) + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), <<>>, _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5) end), % test clean start @@ -276,7 +276,7 @@ connect_v5(_) -> #{'Session-Expiry-Interval' => 10}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( ?DISCONNECT_PACKET(?RC_SUCCESS) )) @@ -296,7 +296,7 @@ connect_v5(_) -> #{'Session-Expiry-Interval' => 10}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), % test will message publish and cancel @@ -320,7 +320,7 @@ connect_v5(_) -> ) ), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), {ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, [binary, {packet, raw}, @@ -335,7 +335,7 @@ connect_v5(_) -> rc => 0}}]), #{version => ?MQTT_PROTO_V5})), {ok, SubData} = gen_tcp:recv(Sock2, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( ?DISCONNECT_PACKET(?RC_SUCCESS))), @@ -367,7 +367,7 @@ connect_v5(_) -> ) ), {ok, Data3} = gen_tcp:recv(Sock3, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock3, raw_send_serialize( ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE), @@ -376,7 +376,8 @@ connect_v5(_) -> ), {ok, WillData} = gen_tcp:recv(Sock2, 0, 5000), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5) + {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), <<>>, _} + = raw_recv_parse(WillData, ?MQTT_PROTO_V5) end), % duplicate client id @@ -393,7 +394,7 @@ connect_v5(_) -> #{'Session-Expiry-Interval' => 10}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock1, raw_send_serialize( @@ -408,7 +409,7 @@ connect_v5(_) -> #{'Session-Expiry-Interval' => 10}}) )), {ok, Data1} = gen_tcp:recv(Sock1, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, qos => ?QOS_2, @@ -418,7 +419,7 @@ connect_v5(_) -> #{version => ?MQTT_PROTO_V5})), {ok, SubData} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock1, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, qos => ?QOS_2, @@ -428,7 +429,7 @@ connect_v5(_) -> #{version => ?MQTT_PROTO_V5})), {ok, SubData1} = gen_tcp:recv(Sock1, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5) + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5) end, 2), ok. @@ -441,7 +442,7 @@ do_connect(Sock, ProtoVer) -> proto_ver = ProtoVer }))), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ProtoVer). + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_parse(Data, ProtoVer). subscribe_v4(_) -> with_connection(fun([Sock]) -> @@ -455,7 +456,7 @@ subscribe_v4(_) -> rc => 0}}])), emqx_client_sock:send(Sock, SubPacket), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(15, _), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4) + {ok, ?SUBACK_PACKET(15, _), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V4) end), ok. @@ -466,7 +467,7 @@ subscribe_v5(_) -> #{version => ?MQTT_PROTO_V5}), emqx_client_sock:send(Sock, SubPacket), {ok, DisConnData} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), _} = + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), <<>>, _} = raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) end), with_connection(fun([Sock]) -> @@ -479,8 +480,9 @@ subscribe_v5(_) -> #{version => ?MQTT_PROTO_V5}), emqx_client_sock:send(Sock, SubPacket), {ok, DisConnData} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} = - raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) + ?assertMatch( + {ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), <<>>, _}, + raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)) end), with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V5), @@ -493,8 +495,9 @@ subscribe_v5(_) -> #{version => ?MQTT_PROTO_V5}), emqx_client_sock:send(Sock, SubPacket), {ok, DisConnData} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), _} = - raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) + ?assertMatch( + {ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), <<>>, _}, + raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)) end), with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V5), @@ -507,8 +510,8 @@ subscribe_v5(_) -> #{version => ?MQTT_PROTO_V5}), emqx_client_sock:send(Sock, SubPacket), {ok, SubData} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = - raw_recv_parse(SubData, ?MQTT_PROTO_V5) + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} + = raw_recv_parse(SubData, ?MQTT_PROTO_V5) end), ok. @@ -524,9 +527,8 @@ raw_send_serialize(Packet) -> raw_send_serialize(Packet, Opts) -> emqx_frame:serialize(Packet, Opts). -raw_recv_parse(P, ProtoVersion) -> - emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ProtoVersion}}). +raw_recv_parse(Bin, ProtoVer) -> + emqx_frame:parse(Bin, emqx_frame:initial_parse_state(#{version => ProtoVer})). acl_deny_action_ct(_) -> emqx_zone:set_env(external, acl_deny_action, disconnect), diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_channel_SUITE.erl similarity index 68% rename from test/emqx_ws_connection_SUITE.erl rename to test/emqx_ws_channel_SUITE.erl index c086ef6b7..4edbc3ab5 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,29 +12,16 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- --module(emqx_ws_connection_SUITE). +-module(emqx_ws_channel_SUITE). -compile(export_all). -compile(nowarn_export_all). --include_lib("eunit/include/eunit.hrl"). - --include_lib("common_test/include/ct.hrl"). - -include("emqx_mqtt.hrl"). - - --define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ - client_id = <<"mqtt_client">>, - username = <<"admin">>, - password = <<"public">>})). - --define(SUBCODE, [0]). - --define(PACKETID, 1). - --define(PUBQOS, 1). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> [t_ws_connect_api]. @@ -48,29 +36,34 @@ end_per_suite(_Config) -> t_ws_connect_api(_Config) -> WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), {ok, _} = rfc6455_client:open(WS), - Packet = raw_send_serialize(?CLIENT), - ok = rfc6455_client:send_binary(WS, Packet), - {binary, CONACK} = rfc6455_client:recv(WS), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), + Connect = ?CONNECT_PACKET( + #mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + username = <<"admin">>, + password = <<"public">> + }), + ok = rfc6455_client:send_binary(WS, raw_send_serialize(Connect)), + {binary, Bin} = rfc6455_client:recv(WS), + Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), + {ok, Connack, <<>>, _} = raw_recv_pase(Bin), Pid = emqx_cm:lookup_conn_pid(<<"mqtt_client">>), - ConnInfo = emqx_ws_connection:info(Pid), + ConnInfo = emqx_ws_channel:info(Pid), ok = t_info(ConnInfo), - ConnAttrs = emqx_ws_connection:attrs(Pid), + ConnAttrs = emqx_ws_channel:attrs(Pid), ok = t_attrs(ConnAttrs), - ConnStats = emqx_ws_connection:stats(Pid), + ConnStats = emqx_ws_channel:stats(Pid), ok = t_stats(ConnStats), - SessionPid = emqx_ws_connection:session(Pid), + SessionPid = emqx_ws_channel:session(Pid), true = is_pid(SessionPid), - ok = emqx_ws_connection:kick(Pid), + ok = emqx_ws_channel:kick(Pid), {close, _} = rfc6455_client:close(WS), ok. raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). -raw_recv_pase(P) -> - emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4} }). +raw_recv_pase(Packet) -> + emqx_frame:parse(Packet). t_info(InfoData) -> ?assertEqual(websocket, maps:get(socktype, InfoData)), @@ -81,7 +74,7 @@ t_info(InfoData) -> t_attrs(AttrsData) -> ?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)), - ?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(emqx_ws_channel, maps:get(conn_mod, AttrsData)), ?assertEqual(<<"admin">>, maps:get(username, AttrsData)). t_stats(StatsData) -> @@ -92,3 +85,4 @@ t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_pkt, StatsData) =:=1), ?assertEqual(true, proplists:get_value(recv_msg, StatsData) >=0), ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). +