Update the test cases for emqx_channel, emqx_protocol
- Improve emqx_client to Use the new emqx_frame:parse/2 - Update the ct suites for emqx_channel, emqx_ws_channel
This commit is contained in:
parent
d386b27e8a
commit
de978d4771
4
Makefile
4
Makefile
|
@ -11,8 +11,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
|
|||
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
|
||||
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
|
||||
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
|
||||
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
|
||||
emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
|
||||
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_channel \
|
||||
emqx_packet emqx_channel emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
|
||||
emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping
|
||||
|
||||
CT_NODE_NAME = emqxct@127.0.0.1
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
|
@ -11,6 +12,7 @@
|
|||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_client).
|
||||
|
||||
|
@ -20,7 +22,9 @@
|
|||
-include("types.hrl").
|
||||
-include("emqx_client.hrl").
|
||||
|
||||
-export([start_link/0, start_link/1]).
|
||||
-export([ start_link/0
|
||||
, start_link/1
|
||||
]).
|
||||
|
||||
-export([ connect/1
|
||||
, disconnect/1
|
||||
|
@ -175,7 +179,8 @@
|
|||
retry_timer :: reference(),
|
||||
session_present :: boolean(),
|
||||
last_packet_id :: packet_id(),
|
||||
parse_state :: emqx_frame:state()}).
|
||||
parse_state :: emqx_frame:state()
|
||||
}).
|
||||
|
||||
-record(call, {id, from, req, ts}).
|
||||
|
||||
|
@ -202,9 +207,9 @@
|
|||
|
||||
-type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(start_link() -> gen_statem:start_ret()).
|
||||
start_link() -> start_link([]).
|
||||
|
@ -352,9 +357,9 @@ disconnect(Client, ReasonCode) ->
|
|||
disconnect(Client, ReasonCode, Properties) ->
|
||||
gen_statem:call(Client, {disconnect, ReasonCode, Properties}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% For test cases
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
puback(Client, PacketId) when is_integer(PacketId) ->
|
||||
puback(Client, PacketId, ?RC_SUCCESS).
|
||||
|
@ -407,9 +412,9 @@ pause(Client) ->
|
|||
resume(Client) ->
|
||||
gen_statem:call(Client, resume).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_statem callbacks
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([Options]) ->
|
||||
process_flag(trap_exit, true),
|
||||
|
@ -443,7 +448,8 @@ init([Options]) ->
|
|||
ack_timeout = ?DEFAULT_ACK_TIMEOUT,
|
||||
retry_interval = 0,
|
||||
connect_timeout = ?DEFAULT_CONNECT_TIMEOUT,
|
||||
last_packet_id = 1}),
|
||||
last_packet_id = 1
|
||||
}),
|
||||
{ok, initialized, init_parse_state(State)}.
|
||||
|
||||
random_client_id() ->
|
||||
|
@ -563,9 +569,10 @@ init_will_msg({qos, QoS}, WillMsg) ->
|
|||
WillMsg#mqtt_msg{qos = ?QOS_I(QoS)}.
|
||||
|
||||
init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) ->
|
||||
Size = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE),
|
||||
State#state{parse_state = emqx_frame:initial_state(
|
||||
#{max_packet_size => Size, version => Ver})}.
|
||||
MaxSize = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE),
|
||||
ParseState = emqx_frame:initial_parse_state(
|
||||
#{max_size => MaxSize, version => Ver}),
|
||||
State#state{parse_state = ParseState}.
|
||||
|
||||
callback_mode() -> state_functions.
|
||||
|
||||
|
@ -955,9 +962,9 @@ terminate(Reason, _StateName, State = #state{socket = Socket}) ->
|
|||
code_change(_Vsn, State, Data, _Extra) ->
|
||||
{ok, State, Data}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
should_ping(Sock) ->
|
||||
case emqx_client_sock:getstat(Sock, [send_oct]) of
|
||||
|
@ -1010,7 +1017,8 @@ assign_id(?NO_CLIENT_ID, Props) ->
|
|||
assign_id(Id, _Props) ->
|
||||
Id.
|
||||
|
||||
publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State0 = #state{auto_ack = AutoAck}) ->
|
||||
publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId),
|
||||
State0 = #state{auto_ack = AutoAck}) ->
|
||||
State = deliver(packet_to_msg(Packet), State0),
|
||||
case AutoAck of
|
||||
true -> send_puback(?PUBACK_PACKET(PacketId), State);
|
||||
|
@ -1161,7 +1169,7 @@ msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = Packe
|
|||
properties = Props},
|
||||
payload = Payload}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Socket Connect/Send
|
||||
|
||||
sock_connect(Hosts, SockOpts, Timeout) ->
|
||||
|
@ -1201,7 +1209,7 @@ send(Packet, State = #state{socket = Sock, proto_ver = Ver})
|
|||
run_sock(State = #state{socket = Sock}) ->
|
||||
emqx_client_sock:setopts(Sock, [{active, once}]), State.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% Process incomming
|
||||
|
||||
process_incoming(<<>>, Packets, State) ->
|
||||
|
@ -1209,10 +1217,10 @@ process_incoming(<<>>, Packets, State) ->
|
|||
|
||||
process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) ->
|
||||
try emqx_frame:parse(Bytes, ParseState) of
|
||||
{ok, Packet, Rest} ->
|
||||
process_incoming(Rest, [Packet|Packets], init_parse_state(State));
|
||||
{more, NewParseState} ->
|
||||
{keep_state, State#state{parse_state = NewParseState}, next_events(Packets)};
|
||||
{ok, Packet, Rest, NParseState} ->
|
||||
process_incoming(Rest, [Packet|Packets], State#state{parse_state = NParseState});
|
||||
{ok, NParseState} ->
|
||||
{keep_state, State#state{parse_state = NParseState}, next_events(Packets)};
|
||||
{error, Reason} ->
|
||||
{stop, Reason}
|
||||
catch
|
||||
|
@ -1227,7 +1235,7 @@ next_events([Packet]) ->
|
|||
next_events(Packets) ->
|
||||
[{next_event, cast, Packet} || Packet <- lists:reverse(Packets)].
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% packet_id generation
|
||||
|
||||
bump_last_packet_id(State = #state{last_packet_id = Id}) ->
|
||||
|
@ -1236,3 +1244,4 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) ->
|
|||
-spec next_packet_id(packet_id()) -> packet_id().
|
||||
next_packet_id(?MAX_PACKET_ID) -> 1;
|
||||
next_packet_id(Id) -> Id + 1.
|
||||
|
||||
|
|
|
@ -110,7 +110,7 @@ mqtt_connect_with_tcp(_) ->
|
|||
Packet = raw_send_serialize(?CLIENT2),
|
||||
emqx_client_sock:send(Sock, Packet),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), <<>>, _} = raw_recv_pase(Data),
|
||||
emqx_client_sock:close(Sock).
|
||||
|
||||
mqtt_connect_with_will_props(_) ->
|
||||
|
@ -133,7 +133,7 @@ mqtt_connect_with_ssl_oneway(_) ->
|
|||
emqx_client_sock:send(Sock, Packet),
|
||||
?assert(
|
||||
receive {ssl, _, ConAck}->
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(ConAck), true
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(ConAck), true
|
||||
after 1000 ->
|
||||
false
|
||||
end),
|
||||
|
@ -152,7 +152,7 @@ mqtt_connect_with_ssl_twoway(_Config) ->
|
|||
timer:sleep(500),
|
||||
?assert(
|
||||
receive {ssl, _, Data}->
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), true
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(Data), true
|
||||
after 1000 ->
|
||||
false
|
||||
end),
|
||||
|
@ -167,19 +167,19 @@ mqtt_connect_with_ws(_Config) ->
|
|||
Packet = raw_send_serialize(?CLIENT),
|
||||
ok = rfc6455_client:send_binary(WS, Packet),
|
||||
{binary, CONACK} = rfc6455_client:recv(WS),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(CONACK),
|
||||
|
||||
%% Sub Packet
|
||||
SubPacket = raw_send_serialize(?SUBPACKET),
|
||||
rfc6455_client:send_binary(WS, SubPacket),
|
||||
{binary, SubAck} = rfc6455_client:recv(WS),
|
||||
{ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), _} = raw_recv_pase(SubAck),
|
||||
{ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), <<>>, _} = raw_recv_pase(SubAck),
|
||||
|
||||
%% Pub Packet QoS 1
|
||||
PubPacket = raw_send_serialize(?PUBPACKET),
|
||||
rfc6455_client:send_binary(WS, PubPacket),
|
||||
{binary, PubAck} = rfc6455_client:recv(WS),
|
||||
{ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(PubAck),
|
||||
{ok, ?PUBACK_PACKET(?PACKETID), <<>>, _} = raw_recv_pase(PubAck),
|
||||
{close, _} = rfc6455_client:close(WS),
|
||||
ok.
|
||||
|
||||
|
@ -189,18 +189,18 @@ packet_size(_Config) ->
|
|||
Packet = raw_send_serialize(?CLIENT),
|
||||
emqx_client_sock:send(Sock, Packet),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(Data),
|
||||
|
||||
%% Pub Packet QoS 1
|
||||
PubPacket = raw_send_serialize(?BIG_PUBPACKET),
|
||||
emqx_client_sock:send(Sock, PubPacket),
|
||||
{ok, Data1} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1),
|
||||
{ok, ?PUBACK_PACKET(?PACKETID), <<>>, _} = raw_recv_pase(Data1),
|
||||
emqx_client_sock:close(Sock).
|
||||
|
||||
raw_send_serialize(Packet) ->
|
||||
emqx_frame:serialize(Packet).
|
||||
|
||||
raw_recv_pase(P) ->
|
||||
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
|
||||
version => ?MQTT_PROTO_V4} }).
|
||||
raw_recv_pase(Bin) ->
|
||||
emqx_frame:parse(Bin).
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ t_alarm_handler(_) ->
|
|||
#{version => ?MQTT_PROTO_V5}
|
||||
)),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
|
||||
Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>),
|
||||
Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>),
|
||||
|
@ -74,7 +74,7 @@ t_alarm_handler(_) ->
|
|||
#{version => ?MQTT_PROTO_V5})),
|
||||
|
||||
{ok, Data2} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2, 2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2, 2]), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
|
||||
|
||||
alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test,
|
||||
severity = error,
|
||||
|
@ -83,7 +83,7 @@ t_alarm_handler(_) ->
|
|||
|
||||
{ok, Data3} = gen_tcp:recv(Sock, 0),
|
||||
|
||||
{ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
|
||||
{ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
|
||||
|
||||
?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())),
|
||||
|
||||
|
@ -91,7 +91,7 @@ t_alarm_handler(_) ->
|
|||
|
||||
{ok, Data4} = gen_tcp:recv(Sock, 0),
|
||||
|
||||
{ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5),
|
||||
{ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), <<>>, _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5),
|
||||
|
||||
?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms()))
|
||||
|
||||
|
@ -119,6 +119,7 @@ raw_send_serialize(Packet) ->
|
|||
raw_send_serialize(Packet, Opts) ->
|
||||
emqx_frame:serialize(Packet, Opts).
|
||||
|
||||
raw_recv_parse(P, ProtoVersion) ->
|
||||
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
|
||||
version => ProtoVersion}}).
|
||||
raw_recv_parse(Bin, ProtoVer) ->
|
||||
emqx_frame:parse(Bin, {none, #{max_size => ?MAX_PACKET_SIZE,
|
||||
version => ProtoVer}}).
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
|
@ -11,18 +12,19 @@
|
|||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_connection_SUITE).
|
||||
-module(emqx_channel_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
all() ->
|
||||
[t_connect_api].
|
||||
|
||||
|
@ -40,13 +42,13 @@ t_connect_api(_Config) ->
|
|||
{password, <<"pass1">>}]),
|
||||
{ok, _} = emqx_client:connect(T1),
|
||||
CPid = emqx_cm:lookup_conn_pid(<<"client1">>),
|
||||
ConnStats = emqx_connection:stats(CPid),
|
||||
ConnStats = emqx_channel:stats(CPid),
|
||||
ok = t_stats(ConnStats),
|
||||
ConnAttrs = emqx_connection:attrs(CPid),
|
||||
ConnAttrs = emqx_channel:attrs(CPid),
|
||||
ok = t_attrs(ConnAttrs),
|
||||
ConnInfo = emqx_connection:info(CPid),
|
||||
ConnInfo = emqx_channel:info(CPid),
|
||||
ok = t_info(ConnInfo),
|
||||
SessionPid = emqx_connection:session(CPid),
|
||||
SessionPid = emqx_channel:session(CPid),
|
||||
true = is_pid(SessionPid),
|
||||
emqx_client:disconnect(T1).
|
||||
|
||||
|
@ -59,7 +61,7 @@ t_info(ConnInfo) ->
|
|||
|
||||
t_attrs(AttrsData) ->
|
||||
?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)),
|
||||
?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)),
|
||||
?assertEqual(emqx_channel, maps:get(conn_mod, AttrsData)),
|
||||
?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)).
|
||||
|
||||
t_stats(StatsData) ->
|
|
@ -139,7 +139,7 @@ connect_v4(_) ->
|
|||
})),
|
||||
emqx_client_sock:send(Sock, ConnectPacket),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V4),
|
||||
|
||||
emqx_client_sock:send(Sock, ConnectPacket),
|
||||
{error, closed} = gen_tcp:recv(Sock, 0)
|
||||
|
@ -156,7 +156,7 @@ connect_v5(_) ->
|
|||
properties =
|
||||
#{'Request-Response-Information' => -1}}))),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
||||
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
|
||||
with_connection(fun([Sock]) ->
|
||||
|
@ -168,7 +168,7 @@ connect_v5(_) ->
|
|||
properties =
|
||||
#{'Request-Problem-Information' => 2}}))),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
||||
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
|
||||
with_connection(fun([Sock]) ->
|
||||
|
@ -181,7 +181,7 @@ connect_v5(_) ->
|
|||
#{'Request-Response-Information' => 1}})
|
||||
)),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} =
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), <<>>, _} =
|
||||
raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
?assertNot(maps:is_key('Response-Information', Props))
|
||||
end),
|
||||
|
@ -202,7 +202,7 @@ connect_v5(_) ->
|
|||
)),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
|
||||
#{'Topic-Alias-Maximum' := 20}), _} =
|
||||
#{'Topic-Alias-Maximum' := 20}), <<>>, _} =
|
||||
raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
emqx_client_sock:send(Sock,
|
||||
raw_send_serialize(
|
||||
|
@ -211,7 +211,7 @@ connect_v5(_) ->
|
|||
)),
|
||||
|
||||
{ok, Data2} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5)
|
||||
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
|
||||
% topic alias maximum
|
||||
|
@ -227,7 +227,7 @@ connect_v5(_) ->
|
|||
)),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
|
||||
#{'Topic-Alias-Maximum' := 20}), _} =
|
||||
#{'Topic-Alias-Maximum' := 20}), <<>>, _} =
|
||||
raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
|
||||
|
@ -237,7 +237,7 @@ connect_v5(_) ->
|
|||
rc => 0}}]), #{version => ?MQTT_PROTO_V5})),
|
||||
|
||||
{ok, Data2} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock,
|
||||
raw_send_serialize(
|
||||
|
@ -247,11 +247,11 @@ connect_v5(_) ->
|
|||
|
||||
{ok, Data3} = gen_tcp:recv(Sock, 0),
|
||||
|
||||
{ok, ?PUBACK_PACKET(1, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
|
||||
{ok, ?PUBACK_PACKET(1, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
|
||||
|
||||
{ok, Data4} = gen_tcp:recv(Sock, 0),
|
||||
|
||||
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5),
|
||||
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), <<>>, _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock,
|
||||
raw_send_serialize(
|
||||
|
@ -260,7 +260,7 @@ connect_v5(_) ->
|
|||
)),
|
||||
|
||||
{ok, Data5} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5)
|
||||
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), <<>>, _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
|
||||
% test clean start
|
||||
|
@ -276,7 +276,7 @@ connect_v5(_) ->
|
|||
#{'Session-Expiry-Interval' => 10}})
|
||||
)),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
emqx_client_sock:send(Sock, raw_send_serialize(
|
||||
?DISCONNECT_PACKET(?RC_SUCCESS)
|
||||
))
|
||||
|
@ -296,7 +296,7 @@ connect_v5(_) ->
|
|||
#{'Session-Expiry-Interval' => 10}})
|
||||
)),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
|
||||
% test will message publish and cancel
|
||||
|
@ -320,7 +320,7 @@ connect_v5(_) ->
|
|||
)
|
||||
),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
|
||||
{ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
|
||||
[binary, {packet, raw},
|
||||
|
@ -335,7 +335,7 @@ connect_v5(_) ->
|
|||
rc => 0}}]), #{version => ?MQTT_PROTO_V5})),
|
||||
|
||||
{ok, SubData} = gen_tcp:recv(Sock2, 0),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock, raw_send_serialize(
|
||||
?DISCONNECT_PACKET(?RC_SUCCESS))),
|
||||
|
@ -367,7 +367,7 @@ connect_v5(_) ->
|
|||
)
|
||||
),
|
||||
{ok, Data3} = gen_tcp:recv(Sock3, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock3, raw_send_serialize(
|
||||
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE),
|
||||
|
@ -376,7 +376,8 @@ connect_v5(_) ->
|
|||
),
|
||||
|
||||
{ok, WillData} = gen_tcp:recv(Sock2, 0, 5000),
|
||||
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5)
|
||||
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), <<>>, _}
|
||||
= raw_recv_parse(WillData, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
|
||||
% duplicate client id
|
||||
|
@ -393,7 +394,7 @@ connect_v5(_) ->
|
|||
#{'Session-Expiry-Interval' => 10}})
|
||||
)),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock1,
|
||||
raw_send_serialize(
|
||||
|
@ -408,7 +409,7 @@ connect_v5(_) ->
|
|||
#{'Session-Expiry-Interval' => 10}})
|
||||
)),
|
||||
{ok, Data1} = gen_tcp:recv(Sock1, 0),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5),
|
||||
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
|
||||
qos => ?QOS_2,
|
||||
|
@ -418,7 +419,7 @@ connect_v5(_) ->
|
|||
#{version => ?MQTT_PROTO_V5})),
|
||||
|
||||
{ok, SubData} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
|
||||
|
||||
emqx_client_sock:send(Sock1, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
|
||||
qos => ?QOS_2,
|
||||
|
@ -428,7 +429,7 @@ connect_v5(_) ->
|
|||
#{version => ?MQTT_PROTO_V5})),
|
||||
|
||||
{ok, SubData1} = gen_tcp:recv(Sock1, 0),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5)
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5)
|
||||
end, 2),
|
||||
|
||||
ok.
|
||||
|
@ -441,7 +442,7 @@ do_connect(Sock, ProtoVer) ->
|
|||
proto_ver = ProtoVer
|
||||
}))),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ProtoVer).
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_parse(Data, ProtoVer).
|
||||
|
||||
subscribe_v4(_) ->
|
||||
with_connection(fun([Sock]) ->
|
||||
|
@ -455,7 +456,7 @@ subscribe_v4(_) ->
|
|||
rc => 0}}])),
|
||||
emqx_client_sock:send(Sock, SubPacket),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?SUBACK_PACKET(15, _), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4)
|
||||
{ok, ?SUBACK_PACKET(15, _), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V4)
|
||||
end),
|
||||
ok.
|
||||
|
||||
|
@ -466,7 +467,7 @@ subscribe_v5(_) ->
|
|||
#{version => ?MQTT_PROTO_V5}),
|
||||
emqx_client_sock:send(Sock, SubPacket),
|
||||
{ok, DisConnData} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), _} =
|
||||
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), <<>>, _} =
|
||||
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
with_connection(fun([Sock]) ->
|
||||
|
@ -479,8 +480,9 @@ subscribe_v5(_) ->
|
|||
#{version => ?MQTT_PROTO_V5}),
|
||||
emqx_client_sock:send(Sock, SubPacket),
|
||||
{ok, DisConnData} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} =
|
||||
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
|
||||
?assertMatch(
|
||||
{ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), <<>>, _},
|
||||
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5))
|
||||
end),
|
||||
with_connection(fun([Sock]) ->
|
||||
do_connect(Sock, ?MQTT_PROTO_V5),
|
||||
|
@ -493,8 +495,9 @@ subscribe_v5(_) ->
|
|||
#{version => ?MQTT_PROTO_V5}),
|
||||
emqx_client_sock:send(Sock, SubPacket),
|
||||
{ok, DisConnData} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), _} =
|
||||
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
|
||||
?assertMatch(
|
||||
{ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), <<>>, _},
|
||||
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5))
|
||||
end),
|
||||
with_connection(fun([Sock]) ->
|
||||
do_connect(Sock, ?MQTT_PROTO_V5),
|
||||
|
@ -507,8 +510,8 @@ subscribe_v5(_) ->
|
|||
#{version => ?MQTT_PROTO_V5}),
|
||||
emqx_client_sock:send(Sock, SubPacket),
|
||||
{ok, SubData} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} =
|
||||
raw_recv_parse(SubData, ?MQTT_PROTO_V5)
|
||||
{ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _}
|
||||
= raw_recv_parse(SubData, ?MQTT_PROTO_V5)
|
||||
end),
|
||||
ok.
|
||||
|
||||
|
@ -524,9 +527,8 @@ raw_send_serialize(Packet) ->
|
|||
raw_send_serialize(Packet, Opts) ->
|
||||
emqx_frame:serialize(Packet, Opts).
|
||||
|
||||
raw_recv_parse(P, ProtoVersion) ->
|
||||
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
|
||||
version => ProtoVersion}}).
|
||||
raw_recv_parse(Bin, ProtoVer) ->
|
||||
emqx_frame:parse(Bin, emqx_frame:initial_parse_state(#{version => ProtoVer})).
|
||||
|
||||
acl_deny_action_ct(_) ->
|
||||
emqx_zone:set_env(external, acl_deny_action, disconnect),
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
|
@ -11,29 +12,16 @@
|
|||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ws_connection_SUITE).
|
||||
-module(emqx_ws_channel_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
|
||||
-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
|
||||
client_id = <<"mqtt_client">>,
|
||||
username = <<"admin">>,
|
||||
password = <<"public">>})).
|
||||
|
||||
-define(SUBCODE, [0]).
|
||||
|
||||
-define(PACKETID, 1).
|
||||
|
||||
-define(PUBQOS, 1).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() ->
|
||||
[t_ws_connect_api].
|
||||
|
@ -48,29 +36,34 @@ end_per_suite(_Config) ->
|
|||
t_ws_connect_api(_Config) ->
|
||||
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
|
||||
{ok, _} = rfc6455_client:open(WS),
|
||||
Packet = raw_send_serialize(?CLIENT),
|
||||
ok = rfc6455_client:send_binary(WS, Packet),
|
||||
{binary, CONACK} = rfc6455_client:recv(WS),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK),
|
||||
Connect = ?CONNECT_PACKET(
|
||||
#mqtt_packet_connect{
|
||||
client_id = <<"mqtt_client">>,
|
||||
username = <<"admin">>,
|
||||
password = <<"public">>
|
||||
}),
|
||||
ok = rfc6455_client:send_binary(WS, raw_send_serialize(Connect)),
|
||||
{binary, Bin} = rfc6455_client:recv(WS),
|
||||
Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT),
|
||||
{ok, Connack, <<>>, _} = raw_recv_pase(Bin),
|
||||
Pid = emqx_cm:lookup_conn_pid(<<"mqtt_client">>),
|
||||
ConnInfo = emqx_ws_connection:info(Pid),
|
||||
ConnInfo = emqx_ws_channel:info(Pid),
|
||||
ok = t_info(ConnInfo),
|
||||
ConnAttrs = emqx_ws_connection:attrs(Pid),
|
||||
ConnAttrs = emqx_ws_channel:attrs(Pid),
|
||||
ok = t_attrs(ConnAttrs),
|
||||
ConnStats = emqx_ws_connection:stats(Pid),
|
||||
ConnStats = emqx_ws_channel:stats(Pid),
|
||||
ok = t_stats(ConnStats),
|
||||
SessionPid = emqx_ws_connection:session(Pid),
|
||||
SessionPid = emqx_ws_channel:session(Pid),
|
||||
true = is_pid(SessionPid),
|
||||
ok = emqx_ws_connection:kick(Pid),
|
||||
ok = emqx_ws_channel:kick(Pid),
|
||||
{close, _} = rfc6455_client:close(WS),
|
||||
ok.
|
||||
|
||||
raw_send_serialize(Packet) ->
|
||||
emqx_frame:serialize(Packet).
|
||||
|
||||
raw_recv_pase(P) ->
|
||||
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
|
||||
version => ?MQTT_PROTO_V4} }).
|
||||
raw_recv_pase(Packet) ->
|
||||
emqx_frame:parse(Packet).
|
||||
|
||||
t_info(InfoData) ->
|
||||
?assertEqual(websocket, maps:get(socktype, InfoData)),
|
||||
|
@ -81,7 +74,7 @@ t_info(InfoData) ->
|
|||
|
||||
t_attrs(AttrsData) ->
|
||||
?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)),
|
||||
?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)),
|
||||
?assertEqual(emqx_ws_channel, maps:get(conn_mod, AttrsData)),
|
||||
?assertEqual(<<"admin">>, maps:get(username, AttrsData)).
|
||||
|
||||
t_stats(StatsData) ->
|
||||
|
@ -92,3 +85,4 @@ t_stats(StatsData) ->
|
|||
?assertEqual(true, proplists:get_value(recv_pkt, StatsData) =:=1),
|
||||
?assertEqual(true, proplists:get_value(recv_msg, StatsData) >=0),
|
||||
?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1).
|
||||
|
Loading…
Reference in New Issue