diff --git a/Makefile b/Makefile index 0aa6a03c8..771fb9d96 100644 --- a/Makefile +++ b/Makefile @@ -11,8 +11,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ - emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ + emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_channel \ + emqx_packet emqx_channel emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping CT_NODE_NAME = emqxct@127.0.0.1 @@ -45,8 +45,8 @@ deps: eunit: @rebar3 eunit -v -.PHONY: ct-setup -ct-setup: +.PHONY: ct_setup +ct_setup: rebar3 as test compile @mkdir -p data @if [ ! -f data/loaded_plugins ]; then touch data/loaded_plugins; fi @@ -54,14 +54,14 @@ ct-setup: @ln -s -f '../../../../data' _build/test/lib/emqx/ .PHONY: ct -ct: ct-setup +ct: ct_setup @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',') ## Run one single CT with rebar3 ## e.g. make ct-one-suite suite=emqx_bridge -.PHONY: ct-one-suite -ct-one-suite: ct-setup - @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE +.PHONY: $(SUITES:%=ct-%) +$(CT_SUITES:%=ct-%): ct_setup + @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(@:ct-%=%)_SUITE .PHONY: app.config app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 1c2ce1a27..655b8e755 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -ifndef(EMQ_X_MQTT_HRL). -define(EMQ_X_MQTT_HRL, true). diff --git a/src/emqx_connection.erl b/src/emqx_channel.erl similarity index 82% rename from src/emqx_connection.erl rename to src/emqx_channel.erl index 2e8d2ac69..7e7b7f4c1 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_channel.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,8 +12,9 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- --module(emqx_connection). +-module(emqx_channel). -behaviour(gen_statem). @@ -23,11 +25,10 @@ -export([start_link/3]). %% APIs --export([info/1]). - --export([attrs/1]). - --export([stats/1]). +-export([ info/1 + , attrs/1 + , stats/1 + ]). -export([kick/1]). @@ -45,7 +46,6 @@ ]). -record(state, { - zone, transport, socket, peername, @@ -56,10 +56,12 @@ parse_state, gc_state, keepalive, - stats_timer, rate_limit, pub_limit, - limit_timer + limit_timer, + enable_stats, + stats_timer, + idle_timeout }). -define(ACTIVE_N, 100). @@ -69,11 +71,12 @@ start_link(Transport, Socket, Options) -> {ok, proc_lib:spawn_link(?MODULE, init, [{Transport, Socket, Options}])}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% API -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% For debug +-spec(info(pid() | #state{}) -> map()). info(CPid) when is_pid(CPid) -> call(CPid, info); @@ -92,7 +95,8 @@ info(#state{transport = Transport, conn_state => ConnState, active_n => ActiveN, rate_limit => rate_limit_info(RateLimit), - pub_limit => rate_limit_info(PubLimit)}, + pub_limit => rate_limit_info(PubLimit) + }, ProtoInfo = emqx_protocol:info(ProtoState), maps:merge(ConnInfo, ProtoInfo). @@ -137,9 +141,9 @@ session(CPid) -> call(CPid, Req) -> gen_statem:call(CPid, Req, infinity). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_statem callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init({Transport, RawSocket, Options}) -> {ok, Socket} = Transport:wait(RawSocket), @@ -151,12 +155,10 @@ init({Transport, RawSocket, Options}) -> RateLimit = init_limiter(proplists:get_value(rate_limit, Options)), PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), - IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), - SendFun = fun(Packet, SeriaOpts) -> - Data = emqx_frame:serialize(Packet, SeriaOpts), + SendFun = fun(Packet, Opts) -> + Data = emqx_frame:serialize(Packet, Opts), case Transport:async_send(Socket, Data) of - ok -> - {ok, Data}; + ok -> {ok, Data}; {error, Reason} -> {error, Reason} end @@ -166,11 +168,13 @@ init({Transport, RawSocket, Options}) -> peercert => Peercert, sendfun => SendFun, conn_mod => ?MODULE}, Options), - ParseState = emqx_protocol:parser(ProtoState), + MaxSize = emqx_zone:get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE), + ParseState = emqx_frame:initial_parse_state(#{max_size => MaxSize}), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), GcState = emqx_gc:init(GcPolicy), - State = #state{zone = Zone, - transport = Transport, + EnableStats = emqx_zone:get_env(Zone, enable_stats, true), + IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), + State = #state{transport = Transport, socket = Socket, peername = Peername, conn_state = running, @@ -179,7 +183,10 @@ init({Transport, RawSocket, Options}) -> pub_limit = PubLimit, proto_state = ProtoState, parse_state = ParseState, - gc_state = GcState}, + gc_state = GcState, + enable_stats = EnableStats, + idle_timeout = IdleTimout + }, ok = emqx_misc:init_proc_mng_policy(Zone), gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}], idle, State, self(), [IdleTimout]). @@ -192,8 +199,8 @@ init_limiter({Rate, Burst}) -> callback_mode() -> [state_functions, state_enter]. -%%------------------------------------------------------------------------------ -%% Idle state +%%-------------------------------------------------------------------- +%% Idle State idle(enter, _, State) -> ok = activate_socket(State), @@ -203,15 +210,15 @@ idle(timeout, _Timeout, State) -> {stop, idle_timeout, State}; idle(cast, {incoming, Packet}, State) -> - handle_packet(Packet, fun(NState) -> - {next_state, connected, reset_parser(NState)} - end, State); + handle_incoming(Packet, fun(NState) -> + {next_state, connected, NState} + end, State); idle(EventType, Content, State) -> ?HANDLE(EventType, Content, State). -%%------------------------------------------------------------------------------ -%% Connected state +%%-------------------------------------------------------------------- +%% Connected State connected(enter, _, _State) -> %% What to do? @@ -221,9 +228,7 @@ connected(enter, _, _State) -> connected(cast, {incoming, Packet = ?PACKET(Type)}, State) -> ok = emqx_metrics:inc_recv(Packet), (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1), - handle_packet(Packet, fun(NState) -> - {keep_state, NState} - end, State); + handle_incoming(Packet, fun(NState) -> {keep_state, NState} end, State); %% Handle Output connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> @@ -283,18 +288,18 @@ handle({call, From}, session, State = #state{proto_state = ProtoState}) -> reply(From, emqx_protocol:session(ProtoState), State); handle({call, From}, Req, State) -> - ?LOG(error, "[Connection] Unexpected call: ~p", [Req]), + ?LOG(error, "[Channel] Unexpected call: ~p", [Req]), reply(From, ignored, State); %% Handle cast handle(cast, Msg, State) -> - ?LOG(error, "[Connection] Unexpected cast: ~p", [Msg]), + ?LOG(error, "[Channel] Unexpected cast: ~p", [Msg]), {keep_state, State}; %% Handle Incoming handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> Oct = iolist_size(Data), - ?LOG(debug, "[Connection] RECV ~p", [Data]), + ?LOG(debug, "[Channel] RECV ~p", [Data]), emqx_pd:update_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), NState = ensure_stats_timer(maybe_gc({1, Oct}, State)), @@ -308,13 +313,8 @@ handle(info, {Closed, _Sock}, State) when Closed == tcp_closed; Closed == ssl_closed -> shutdown(closed, State); -handle(info, {tcp_passive, _Sock}, State) -> - %% Rate limit here:) - NState = ensure_rate_limit(State), - ok = activate_socket(NState), - {keep_state, NState}; - -handle(info, {ssl_passive, _Sock}, State) -> +handle(info, {Passive, _Sock}, State) when Passive == tcp_passive; + Passive == ssl_passive -> %% Rate limit here:) NState = ensure_rate_limit(State), ok = activate_socket(NState), @@ -336,7 +336,8 @@ handle(info, {timeout, Timer, emit_stats}, State = #state{stats_timer = Timer, proto_state = ProtoState, gc_state = GcState}) -> - emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), + ClientId = emqx_protocol:client_id(ProtoState), + emqx_cm:set_conn_stats(ClientId, stats(State)), NState = State#state{stats_timer = undefined}, Limits = erlang:get(force_shutdown_policy), case emqx_misc:conn_proc_mng_policy(Limits) of @@ -347,23 +348,23 @@ handle(info, {timeout, Timer, emit_stats}, GcState1 = emqx_gc:reset(GcState), {keep_state, NState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> - ?LOG(error, "[Connection] Shutdown exceptionally due to ~p", [Reason]), + ?LOG(error, "[Channel] Shutdown exceptionally due to ~p", [Reason]), shutdown(Reason, NState) end; handle(info, {shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(error, "[Connection] Discarded by ~s:~p", [ClientId, ByPid]), + ?LOG(error, "[Channel] Discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "[Connection] Clientid '~s' conflict with ~p", [ClientId, NewPid]), + ?LOG(warning, "[Channel] Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); handle(info, {shutdown, Reason}, State) -> shutdown(Reason, State); handle(info, Info, State) -> - ?LOG(error, "[Connection] Unexpected info: ~p", [Info]), + ?LOG(error, "[Channel] Unexpected info: ~p", [Info]), {keep_state, State}. code_change(_Vsn, State, Data, _Extra) -> @@ -373,7 +374,7 @@ terminate(Reason, _StateName, #state{transport = Transport, socket = Socket, keepalive = KeepAlive, proto_state = ProtoState}) -> - ?LOG(debug, "[Connection] Terminated for ~p", [Reason]), + ?LOG(debug, "[Channel] Terminated for ~p", [Reason]), Transport:fast_close(Socket), emqx_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -384,7 +385,7 @@ terminate(Reason, _StateName, #state{transport = Transport, emqx_protocol:terminate(Reason, ProtoState) end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Process incoming data process_incoming(<<>>, Packets, State) -> @@ -392,30 +393,30 @@ process_incoming(<<>>, Packets, State) -> process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> try emqx_frame:parse(Data, ParseState) of - {ok, Packet, Rest} -> - process_incoming(Rest, [Packet|Packets], reset_parser(State)); - {more, NewParseState} -> - {keep_state, State#state{parse_state = NewParseState}, next_events(Packets)}; + {ok, NParseState} -> + NState = State#state{parse_state = NParseState}, + {keep_state, NState, next_events(Packets)}; + {ok, Packet, Rest, NParseState} -> + NState = State#state{parse_state = NParseState}, + process_incoming(Rest, [Packet|Packets], NState); {error, Reason} -> shutdown(Reason, State) catch - _:Error:Stk-> - ?LOG(error, "[Connection] Parse failed for ~p~nStacktrace:~p~nError data:~p", [Error, Stk, Data]), - shutdown(Error, State) + error:Reason:Stk -> + ?LOG(error, "[Channel] Parse failed for ~p~n\ + Stacktrace:~p~nError data:~p", [Reason, Stk, Data]), + shutdown(parse_error, State) end. -reset_parser(State = #state{proto_state = ProtoState}) -> - State#state{parse_state = emqx_protocol:parser(ProtoState)}. - next_events(Packets) when is_list(Packets) -> [next_events(Packet) || Packet <- lists:reverse(Packets)]; next_events(Packet) -> {next_event, cast, {incoming, Packet}}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Handle incoming packet -handle_packet(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> +handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> case emqx_protocol:received(Packet, ProtoState) of {ok, NProtoState} -> SuccFun(State#state{proto_state = NProtoState}); @@ -427,7 +428,7 @@ handle_packet(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> stop(Error, State#state{proto_state = NProtoState}) end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Ensure rate limit ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) -> @@ -444,12 +445,12 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> - ?LOG(debug, "[Connection] Rate limit pause connection ~pms", [Pause]), + ?LOG(debug, "[Channel] Rate limit pause connection ~pms", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1) end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Activate socket activate_socket(#state{conn_state = blocked}) -> @@ -463,20 +464,16 @@ activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) -> ok end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Ensure stats timer -ensure_stats_timer(State = #state{zone = Zone, stats_timer = undefined}) -> - case emqx_zone:get_env(Zone, enable_stats, true) of - true -> - IdleTimeout = emqx_zone:get_env(Zone, idle_timeout, 30000), - State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; - false -> - State - end; +ensure_stats_timer(State = #state{enable_stats = true, + stats_timer = undefined, + idle_timeout = IdleTimeout}) -> + State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; ensure_stats_timer(State) -> State. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Maybe GC maybe_gc(_, State = #state{gc_state = undefined}) -> @@ -494,7 +491,7 @@ maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) -> State#state{gc_state = GCSt1}; maybe_gc(_, State) -> State. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Helper functions reply(From, Reply, State) -> @@ -505,3 +502,4 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. + diff --git a/src/emqx_client.erl b/src/emqx_client.erl index cd83e61ad..e6dbea27b 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_client). @@ -20,7 +22,9 @@ -include("types.hrl"). -include("emqx_client.hrl"). --export([start_link/0, start_link/1]). +-export([ start_link/0 + , start_link/1 + ]). -export([ connect/1 , disconnect/1 @@ -175,7 +179,8 @@ retry_timer :: reference(), session_present :: boolean(), last_packet_id :: packet_id(), - parse_state :: emqx_frame:state()}). + parse_state :: emqx_frame:state() + }). -record(call, {id, from, req, ts}). @@ -202,9 +207,9 @@ -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% API -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(start_link() -> gen_statem:start_ret()). start_link() -> start_link([]). @@ -352,9 +357,9 @@ disconnect(Client, ReasonCode) -> disconnect(Client, ReasonCode, Properties) -> gen_statem:call(Client, {disconnect, ReasonCode, Properties}). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% For test cases -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- puback(Client, PacketId) when is_integer(PacketId) -> puback(Client, PacketId, ?RC_SUCCESS). @@ -407,9 +412,9 @@ pause(Client) -> resume(Client) -> gen_statem:call(Client, resume). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_statem callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([Options]) -> process_flag(trap_exit, true), @@ -443,7 +448,8 @@ init([Options]) -> ack_timeout = ?DEFAULT_ACK_TIMEOUT, retry_interval = 0, connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, - last_packet_id = 1}), + last_packet_id = 1 + }), {ok, initialized, init_parse_state(State)}. random_client_id() -> @@ -563,9 +569,10 @@ init_will_msg({qos, QoS}, WillMsg) -> WillMsg#mqtt_msg{qos = ?QOS_I(QoS)}. init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) -> - Size = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), - State#state{parse_state = emqx_frame:initial_state( - #{max_packet_size => Size, version => Ver})}. + MaxSize = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), + ParseState = emqx_frame:initial_parse_state( + #{max_size => MaxSize, version => Ver}), + State#state{parse_state = ParseState}. callback_mode() -> state_functions. @@ -955,9 +962,9 @@ terminate(Reason, _StateName, State = #state{socket = Socket}) -> code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- should_ping(Sock) -> case emqx_client_sock:getstat(Sock, [send_oct]) of @@ -1010,7 +1017,8 @@ assign_id(?NO_CLIENT_ID, Props) -> assign_id(Id, _Props) -> Id. -publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State0 = #state{auto_ack = AutoAck}) -> +publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), + State0 = #state{auto_ack = AutoAck}) -> State = deliver(packet_to_msg(Packet), State0), case AutoAck of true -> send_puback(?PUBACK_PACKET(PacketId), State); @@ -1161,7 +1169,7 @@ msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = Packe properties = Props}, payload = Payload}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Socket Connect/Send sock_connect(Hosts, SockOpts, Timeout) -> @@ -1201,7 +1209,7 @@ send(Packet, State = #state{socket = Sock, proto_ver = Ver}) run_sock(State = #state{socket = Sock}) -> emqx_client_sock:setopts(Sock, [{active, once}]), State. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Process incomming process_incoming(<<>>, Packets, State) -> @@ -1209,10 +1217,10 @@ process_incoming(<<>>, Packets, State) -> process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) -> try emqx_frame:parse(Bytes, ParseState) of - {ok, Packet, Rest} -> - process_incoming(Rest, [Packet|Packets], init_parse_state(State)); - {more, NewParseState} -> - {keep_state, State#state{parse_state = NewParseState}, next_events(Packets)}; + {ok, Packet, Rest, NParseState} -> + process_incoming(Rest, [Packet|Packets], State#state{parse_state = NParseState}); + {ok, NParseState} -> + {keep_state, State#state{parse_state = NParseState}, next_events(Packets)}; {error, Reason} -> {stop, Reason} catch @@ -1227,7 +1235,7 @@ next_events([Packet]) -> next_events(Packets) -> [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)]. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% packet_id generation bump_last_packet_id(State = #state{last_packet_id = Id}) -> @@ -1236,3 +1244,4 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) -> -spec next_packet_id(packet_id()) -> packet_id(). next_packet_id(?MAX_PACKET_ID) -> 1; next_packet_id(Id) -> Id + 1. + diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 79c8ed3c8..a4ef34840 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,79 +12,95 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_frame). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([ initial_state/0 - , initial_state/1 +-export([ initial_parse_state/0 + , initial_parse_state/1 ]). --export([ parse/2 +-export([ parse/1 + , parse/2 , serialize/1 , serialize/2 ]). --type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE, - version => emqx_mqtt_types:version()}). +-type(options() :: #{max_size => 1..?MAX_PACKET_SIZE, + version => emqx_mqtt_types:version() + }). --opaque(parse_state() :: {none, options()} | cont_fun(binary())). +-opaque(parse_state() :: {none, options()} | {more, cont_fun()}). --type(cont_fun(Bin) :: fun((Bin) -> {ok, emqx_mqtt_types:packet(), binary()} - | {more, cont_fun(Bin)})). +-opaque(parse_result() :: {ok, parse_state()} + | {ok, emqx_mqtt_types:packet(), binary(), parse_state()}). --export_type([options/0, parse_state/0]). +-type(cont_fun() :: fun((binary()) -> parse_result())). --define(DEFAULT_OPTIONS, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4}). +-export_type([ options/0 + , parse_state/0 + , parse_result/0 + ]). -%%------------------------------------------------------------------------------ -%% Init parse state -%%------------------------------------------------------------------------------ +-define(none(Opts), {none, Opts}). +-define(more(Cont), {more, Cont}). +-define(DEFAULT_OPTIONS, + #{max_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4 + }). --spec(initial_state() -> {none, options()}). -initial_state() -> - initial_state(#{}). +%%-------------------------------------------------------------------- +%% Init Parse State +%%-------------------------------------------------------------------- --spec(initial_state(options()) -> {none, options()}). -initial_state(Options) when is_map(Options) -> - {none, merge_opts(Options)}. +-spec(initial_parse_state() -> {none, options()}). +initial_parse_state() -> + initial_parse_state(#{}). +-spec(initial_parse_state(options()) -> {none, options()}). +initial_parse_state(Options) when is_map(Options) -> + ?none(merge_opts(Options)). + +%% @pivate merge_opts(Options) -> maps:merge(?DEFAULT_OPTIONS, Options). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Parse MQTT Frame -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- --spec(parse(binary(), parse_state()) -> {ok, emqx_mqtt_types:packet(), binary()} | - {more, cont_fun(binary())}). +-spec(parse(binary()) -> parse_result()). +parse(Bin) -> + parse(Bin, initial_parse_state()). + +-spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> - {more, fun(Bin) -> parse(Bin, {none, Options}) end}; + {ok, ?more(fun(Bin) -> parse(Bin, {none, Options}) end)}; parse(<>, {none, Options}) -> parse_remaining_len(Rest, #mqtt_packet_header{type = Type, dup = bool(Dup), qos = fixqos(Type, QoS), retain = bool(Retain)}, Options); -parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> +parse(Bin, {more, Cont}) when is_binary(Bin), is_function(Cont) -> Cont(Bin). parse_remaining_len(<<>>, Header, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; + {ok, ?more(fun(Bin) -> parse_remaining_len(Bin, Header, Options) end)}; parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(Rest, Header, 1, 0, Options). -parse_remaining_len(_Bin, _Header, _Multiplier, Length, - #{max_packet_size := MaxSize}) - when Length > MaxSize -> +parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) + when Length > MaxSize -> error(mqtt_frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; + {ok, ?more(fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end)}; %% Match DISCONNECT without payload -parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, _Options) -> - wrap(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}, Rest); +parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> + Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), + {ok, Packet, Rest, ?none(Options)}; %% Match PINGREQ. parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 0, Options); @@ -92,38 +109,40 @@ parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 2, Options); parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); -parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, - Options = #{max_packet_size:= MaxSize}) -> +parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, + Options = #{max_size := MaxSize}) -> FrameLen = Value + Len * Multiplier, if FrameLen > MaxSize -> error(mqtt_frame_too_large); true -> parse_frame(Rest, Header, FrameLen, Options) end. -parse_frame(Bin, Header, 0, _Options) -> - wrap(Header, Bin); +parse_frame(Bin, Header, 0, Options) -> + {ok, packet(Header), Bin, ?none(Options)}; parse_frame(Bin, Header, Length, Options) -> case Bin of <> -> case parse_packet(Header, FrameBin, Options) of {Variable, Payload} -> - wrap(Header, Variable, Payload, Rest); + {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; + Variable = #mqtt_packet_connect{proto_ver = Ver} -> + {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; Variable -> - wrap(Header, Variable, Rest) + {ok, packet(Header, Variable), Rest, ?none(Options)} end; TooShortBin -> - {more, fun(BinMore) -> - parse_frame(<>, Header, Length, Options) - end} + {ok, ?more(fun(BinMore) -> + parse_frame(<>, Header, Length, Options) + end)} end. -wrap(Header, Variable, Payload, Rest) -> - {ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}. -wrap(Header, Variable, Rest) -> - {ok, #mqtt_packet{header = Header, variable = Variable}, Rest}. -wrap(Header, Rest) -> - {ok, #mqtt_packet{header = Header}, Rest}. +packet(Header) -> + #mqtt_packet{header = Header}. +packet(Header, Variable) -> + #mqtt_packet{header = Header, variable = Variable}. +packet(Header, Variable, Payload) -> + #mqtt_packet{header = Header, variable = Variable, payload = Payload}. parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> {ProtoName, Rest} = parse_utf8_string(FrameBin), @@ -362,9 +381,9 @@ parse_utf8_string(<>) -> parse_binary_data(<>) -> {Data, Rest}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Serialize MQTT Packet -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(serialize(emqx_mqtt_types:packet()) -> iodata()). serialize(Packet) -> diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 65515a8d3..745daf000 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -73,7 +73,7 @@ start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> start_mqtt_listener(Name, ListenOn, Options) -> SockOpts = esockd:parse_opt(Options), esockd:open(Name, ListenOn, merge_default(SockOpts), - {emqx_connection, start_link, [Options -- SockOpts]}). + {emqx_channel, start_link, [Options -- SockOpts]}). start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) -> Start(Name, with_port(ListenOn, RanchOpts), ProtoOpts). @@ -82,7 +82,7 @@ mqtt_path(Options) -> proplists:get_value(mqtt_path, Options, "/mqtt"). ws_opts(Options) -> - Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_channel, Options}]}]), #{env => #{dispatch => Dispatch}, proxy_header => proplists:get_value(proxy_protocol, Options, false)}. ranch_opts(Options) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index f56d6a6e6..b6ca03d7f 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_protocol). @@ -18,16 +20,18 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). --export([ init/2 - , info/1 +-export([ info/1 , attrs/1 , attr/2 , caps/1 + , caps/2 , stats/1 , client_id/1 , credentials/1 - , parser/1 , session/1 + ]). + +-export([ init/2 , received/2 , process/2 , deliver/2 @@ -35,8 +39,6 @@ , terminate/2 ]). --export_type([state/0]). - -record(pstate, { zone, sendfun, @@ -70,6 +72,8 @@ -opaque(state() :: #pstate{}). +-export_type([state/0]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -168,6 +172,8 @@ attrs(#pstate{zone = Zone, , credentials => Credentials }. +attr(proto_ver, #pstate{proto_ver = ProtoVer}) -> + ProtoVer; attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Receive-Maximum', ConnProps, 65535); attr(max_inflight, #pstate{zone = Zone}) -> @@ -190,6 +196,9 @@ attr(Name, PState) -> false -> undefined end. +caps(Name, PState) -> + maps:get(Name, caps(PState)). + caps(#pstate{zone = Zone}) -> emqx_mqtt_caps:get_caps(Zone). @@ -232,10 +241,6 @@ stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg}, session(#pstate{session = SPid}) -> SPid. -parser(#pstate{zone = Zone, proto_ver = Ver}) -> - Size = emqx_zone:get_env(Zone, max_packet_size), - emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}). - %%------------------------------------------------------------------------------ %% Packet Received %%------------------------------------------------------------------------------ diff --git a/src/emqx_psk.erl b/src/emqx_psk.erl index 3b2407b1c..ae2f8935b 100644 --- a/src/emqx_psk.erl +++ b/src/emqx_psk.erl @@ -19,8 +19,6 @@ %% SSL PSK Callbacks -export([lookup/3]). --define(TAB, ?MODULE). - -type psk_identity() :: string(). -type psk_user_state() :: term(). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 56f29bd4c..85d2c1947 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- %% @doc %% A stateful interaction between a Client and a Server. Some Sessions diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_channel.erl similarity index 78% rename from src/emqx_ws_connection.erl rename to src/emqx_ws_channel.erl index d635a0caa..b20ad3478 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_channel.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,8 +12,9 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- --module(emqx_ws_connection). +-module(emqx_ws_channel). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). @@ -38,20 +40,20 @@ options, peername, sockname, - idle_timeout, proto_state, parse_state, keepalive, enable_stats, stats_timer, + idle_timeout, shutdown }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% API -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% for debug info(WSPid) when is_pid(WSPid) -> @@ -108,9 +110,9 @@ call(WSPid, Req) when is_pid(WSPid) -> exit(timeout) end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% WebSocket callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init(Req, Opts) -> IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000), @@ -141,11 +143,11 @@ websocket_init(#state{request = Req, options = Options}) -> WsCookie = try cowboy_req:parse_cookies(Req) catch error:badarg -> - ?LOG(error, "[WS Connection] Illegal cookie"), + ?LOG(error, "[WS Channel] Illegal cookie"), undefined; Error:Reason -> ?LOG(error, - "[WS Connection] Cookie is parsed failed, Error: ~p, Reason ~p", + "[WS Channel] Cookie is parsed failed, Error: ~p, Reason ~p", [Error, Reason]), undefined end, @@ -155,15 +157,16 @@ websocket_init(#state{request = Req, options = Options}) -> sendfun => send_fun(self()), ws_cookie => WsCookie, conn_mod => ?MODULE}, Options), - ParserState = emqx_protocol:parser(ProtoState), Zone = proplists:get_value(zone, Options), + MaxSize = emqx_zone:get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE), + ParseState = emqx_frame:initial_parse_state(#{max_size => MaxSize}), EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), ok = emqx_misc:init_proc_mng_policy(Zone), {ok, #state{peername = Peername, sockname = Sockname, - parse_state = ParserState, + parse_state = ParseState, proto_state = ProtoState, enable_stats = EnableStats, idle_timeout = IdleTimout}}. @@ -185,35 +188,28 @@ websocket_handle({binary, <<>>}, State) -> {ok, ensure_stats_timer(State)}; websocket_handle({binary, [<<>>]}, State) -> {ok, ensure_stats_timer(State)}; -websocket_handle({binary, Data}, State = #state{parse_state = ParseState, - proto_state = ProtoState}) -> - ?LOG(debug, "[WS Connection] RECV ~p", [Data]), +websocket_handle({binary, Data}, State = #state{parse_state = ParseState}) -> + ?LOG(debug, "[WS Channel] RECV ~p", [Data]), BinSize = iolist_size(Data), emqx_pd:update_counter(recv_oct, BinSize), ok = emqx_metrics:inc('bytes.received', BinSize), try emqx_frame:parse(iolist_to_binary(Data), ParseState) of - {more, ParseState1} -> - {ok, State#state{parse_state = ParseState1}}; - {ok, Packet, Rest} -> + {ok, NParseState} -> + {ok, State#state{parse_state = NParseState}}; + {ok, Packet, Rest, NParseState} -> ok = emqx_metrics:inc_recv(Packet), emqx_pd:update_counter(recv_cnt, 1), - case emqx_protocol:received(Packet, ProtoState) of - {ok, ProtoState1} -> - websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); - {error, Error} -> - ?LOG(error, "[WS Connection] Protocol error: ~p", [Error]), - shutdown(Error, State); - {error, Reason, ProtoState1} -> - shutdown(Reason, State#state{proto_state = ProtoState1}); - {stop, Error, ProtoState1} -> - shutdown(Error, State#state{proto_state = ProtoState1}) - end; - {error, Error} -> - ?LOG(error, "[WS Connection] Frame error: ~p", [Error]), - shutdown(Error, State) + handle_incoming(Packet, fun(NState) -> + websocket_handle({binary, Rest}, NState) + end, + State#state{parse_state = NParseState}); + {error, Reason} -> + ?LOG(error, "[WS Channel] Frame error: ~p", [Reason]), + shutdown(Reason, State) catch - _:Error -> - ?LOG(error, "[WS Connection] Frame error:~p~nFrame data: ~p", [Error, Data]), + error:Reason:Stk -> + ?LOG(error, "[WS Channel] Parse failed for ~p~n\ + Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), shutdown(parse_error, State) end; %% Pings should be replied with pongs, cowboy does it automatically @@ -259,12 +255,12 @@ websocket_info({timeout, Timer, emit_stats}, {ok, State#state{stats_timer = undefined}, hibernate}; websocket_info({keepalive, start, Interval}, State) -> - ?LOG(debug, "[WS Connection] Keepalive at the interval of ~p", [Interval]), + ?LOG(debug, "[WS Channel] Keepalive at the interval of ~p", [Interval]), case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> - ?LOG(warning, "[WS Connection] Keepalive error: ~p", [Error]), + ?LOG(warning, "[WS Channel] Keepalive error: ~p", [Error]), shutdown(Error, State) end; @@ -273,19 +269,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> - ?LOG(debug, "[WS Connection] Keepalive Timeout!"), + ?LOG(debug, "[WS Channel] Keepalive Timeout!"), shutdown(keepalive_timeout, State); {error, Error} -> - ?LOG(error, "[WS Connection] Keepalive error: ~p", [Error]), + ?LOG(error, "[WS Channel] Keepalive error: ~p", [Error]), shutdown(keepalive_error, State) end; websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(warning, "[WS Connection] Discarded by ~s:~p", [ClientId, ByPid]), + ?LOG(warning, "[WS Channel] Discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "[WS Connection] Clientid '~s' conflict with ~p", [ClientId, NewPid]), + ?LOG(warning, "[WS Channel] Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); websocket_info({binary, Data}, State) -> @@ -294,15 +290,18 @@ websocket_info({binary, Data}, State) -> websocket_info({shutdown, Reason}, State) -> shutdown(Reason, State); +websocket_info({stop, Reason}, State) -> + {stop, State#state{shutdown = Reason}}; + websocket_info(Info, State) -> - ?LOG(error, "[WS Connection] Unexpected info: ~p", [Info]), + ?LOG(error, "[WS Channel] Unexpected info: ~p", [Info]), {ok, State}. terminate(SockError, _Req, #state{keepalive = Keepalive, proto_state = ProtoState, shutdown = Shutdown}) -> - - ?LOG(debug, "[WS Connection] Terminated for ~p, sockerror: ~p", [Shutdown, SockError]), + ?LOG(debug, "[WS Channel] Terminated for ~p, sockerror: ~p", + [Shutdown, SockError]), emqx_keepalive:cancel(Keepalive), case {ProtoState, Shutdown} of {undefined, _} -> ok; @@ -312,12 +311,24 @@ terminate(SockError, _Req, #state{keepalive = Keepalive, emqx_protocol:terminate(Error, ProtoState) end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- + +handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:received(Packet, ProtoState) of + {ok, NProtoState} -> + SuccFun(State#state{proto_state = NProtoState}); + {error, Reason} -> + ?LOG(error, "[WS Channel] Protocol error: ~p", [Reason]), + shutdown(Reason, State); + {error, Reason, NProtoState} -> + shutdown(Reason, State#state{proto_state = NProtoState}); + {stop, Error, NProtoState} -> + shutdown(Error, State#state{proto_state = NProtoState}) + end. + -reset_parser(State = #state{proto_state = ProtoState}) -> - State#state{parse_state = emqx_protocol:parser(ProtoState)}. ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, @@ -327,7 +338,9 @@ ensure_stats_timer(State) -> State. shutdown(Reason, State) -> - {stop, State#state{shutdown = Reason}}. + %% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696) + self() ! {stop, Reason}, + {ok, State}. wsock_stats() -> [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index bcc0a6b6b..a74e16552 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -110,7 +110,7 @@ mqtt_connect_with_tcp(_) -> Packet = raw_send_serialize(?CLIENT2), emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), <<>>, _} = raw_recv_pase(Data), emqx_client_sock:close(Sock). mqtt_connect_with_will_props(_) -> @@ -133,7 +133,7 @@ mqtt_connect_with_ssl_oneway(_) -> emqx_client_sock:send(Sock, Packet), ?assert( receive {ssl, _, ConAck}-> - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(ConAck), true + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(ConAck), true after 1000 -> false end), @@ -152,7 +152,7 @@ mqtt_connect_with_ssl_twoway(_Config) -> timer:sleep(500), ?assert( receive {ssl, _, Data}-> - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), true + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(Data), true after 1000 -> false end), @@ -167,19 +167,19 @@ mqtt_connect_with_ws(_Config) -> Packet = raw_send_serialize(?CLIENT), ok = rfc6455_client:send_binary(WS, Packet), {binary, CONACK} = rfc6455_client:recv(WS), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(CONACK), %% Sub Packet SubPacket = raw_send_serialize(?SUBPACKET), rfc6455_client:send_binary(WS, SubPacket), {binary, SubAck} = rfc6455_client:recv(WS), - {ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), _} = raw_recv_pase(SubAck), + {ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), <<>>, _} = raw_recv_pase(SubAck), %% Pub Packet QoS 1 PubPacket = raw_send_serialize(?PUBPACKET), rfc6455_client:send_binary(WS, PubPacket), {binary, PubAck} = rfc6455_client:recv(WS), - {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(PubAck), + {ok, ?PUBACK_PACKET(?PACKETID), <<>>, _} = raw_recv_pase(PubAck), {close, _} = rfc6455_client:close(WS), ok. @@ -189,18 +189,18 @@ packet_size(_Config) -> Packet = raw_send_serialize(?CLIENT), emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_pase(Data), %% Pub Packet QoS 1 PubPacket = raw_send_serialize(?BIG_PUBPACKET), emqx_client_sock:send(Sock, PubPacket), {ok, Data1} = gen_tcp:recv(Sock, 0), - {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1), + {ok, ?PUBACK_PACKET(?PACKETID), <<>>, _} = raw_recv_pase(Data1), emqx_client_sock:close(Sock). raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). -raw_recv_pase(P) -> - emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4} }). +raw_recv_pase(Bin) -> + emqx_frame:parse(Bin). + diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index d50c7fb5f..25a8303e6 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -60,7 +60,7 @@ t_alarm_handler(_) -> #{version => ?MQTT_PROTO_V5} )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>), Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>), @@ -74,7 +74,7 @@ t_alarm_handler(_) -> #{version => ?MQTT_PROTO_V5})), {ok, Data2} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), + {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test, severity = error, @@ -83,7 +83,7 @@ t_alarm_handler(_) -> {ok, Data3} = gen_tcp:recv(Sock, 0), - {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), ?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())), @@ -91,7 +91,7 @@ t_alarm_handler(_) -> {ok, Data4} = gen_tcp:recv(Sock, 0), - {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), + {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), <<>>, _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), ?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())) @@ -119,6 +119,7 @@ raw_send_serialize(Packet) -> raw_send_serialize(Packet, Opts) -> emqx_frame:serialize(Packet, Opts). -raw_recv_parse(P, ProtoVersion) -> - emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ProtoVersion}}). +raw_recv_parse(Bin, ProtoVer) -> + emqx_frame:parse(Bin, {none, #{max_size => ?MAX_PACKET_SIZE, + version => ProtoVer}}). + diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_channel_SUITE.erl similarity index 83% rename from test/emqx_connection_SUITE.erl rename to test/emqx_channel_SUITE.erl index 2c124fd44..7d8b216f2 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,18 +12,19 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- --module(emqx_connection_SUITE). +-module(emqx_channel_SUITE). -compile(export_all). -compile(nowarn_export_all). +-include("emqx_mqtt.hrl"). + -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --include("emqx_mqtt.hrl"). - all() -> [t_connect_api]. @@ -40,13 +42,13 @@ t_connect_api(_Config) -> {password, <<"pass1">>}]), {ok, _} = emqx_client:connect(T1), CPid = emqx_cm:lookup_conn_pid(<<"client1">>), - ConnStats = emqx_connection:stats(CPid), + ConnStats = emqx_channel:stats(CPid), ok = t_stats(ConnStats), - ConnAttrs = emqx_connection:attrs(CPid), + ConnAttrs = emqx_channel:attrs(CPid), ok = t_attrs(ConnAttrs), - ConnInfo = emqx_connection:info(CPid), + ConnInfo = emqx_channel:info(CPid), ok = t_info(ConnInfo), - SessionPid = emqx_connection:session(CPid), + SessionPid = emqx_channel:session(CPid), true = is_pid(SessionPid), emqx_client:disconnect(T1). @@ -59,7 +61,7 @@ t_info(ConnInfo) -> t_attrs(AttrsData) -> ?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)), - ?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(emqx_channel, maps:get(conn_mod, AttrsData)), ?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)). t_stats(StatsData) -> diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 86505a566..8de05e734 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,6 +12,7 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_frame_SUITE). @@ -18,11 +20,8 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). - -include_lib("eunit/include/eunit.hrl"). --import(emqx_frame, [serialize/1, serialize/2]). - all() -> [{group, connect}, {group, connack}, @@ -44,15 +43,18 @@ groups() -> serialize_parse_v5_connect, serialize_parse_connect_without_clientid, serialize_parse_connect_with_will, - serialize_parse_bridge_connect]}, + serialize_parse_bridge_connect + ]}, {connack, [parallel], [serialize_parse_connack, - serialize_parse_connack_v5]}, + serialize_parse_connack_v5 + ]}, {publish, [parallel], [serialize_parse_qos0_publish, serialize_parse_qos1_publish, serialize_parse_qos2_publish, - serialize_parse_publish_v5]}, + serialize_parse_publish_v5 + ]}, {puback, [parallel], [serialize_parse_puback, serialize_parse_puback_v5, @@ -61,27 +63,35 @@ groups() -> serialize_parse_pubrel, serialize_parse_pubrel_v5, serialize_parse_pubcomp, - serialize_parse_pubcomp_v5]}, + serialize_parse_pubcomp_v5 + ]}, {subscribe, [parallel], [serialize_parse_subscribe, - serialize_parse_subscribe_v5]}, + serialize_parse_subscribe_v5 + ]}, {suback, [parallel], [serialize_parse_suback, - serialize_parse_suback_v5]}, + serialize_parse_suback_v5 + ]}, {unsubscribe, [parallel], [serialize_parse_unsubscribe, - serialize_parse_unsubscribe_v5]}, + serialize_parse_unsubscribe_v5 + ]}, {unsuback, [parallel], [serialize_parse_unsuback, - serialize_parse_unsuback_v5]}, + serialize_parse_unsuback_v5 + ]}, {ping, [parallel], [serialize_parse_pingreq, - serialize_parse_pingresp]}, + serialize_parse_pingresp + ]}, {disconnect, [parallel], [serialize_parse_disconnect, - serialize_parse_disconnect_v5]}, + serialize_parse_disconnect_v5 + ]}, {auth, [parallel], - [serialize_parse_auth_v5]}]. + [serialize_parse_auth_v5] + }]. init_per_suite(Config) -> Config. @@ -97,7 +107,7 @@ end_per_group(_Group, _Config) -> serialize_parse_connect(_) -> Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{}), - ?assertEqual({ok, Packet1, <<>>}, parse_serialize(Packet1)), + ?assertEqual(Packet1, parse_serialize(Packet1)), Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{ client_id = <<"clientId">>, will_qos = ?QOS_1, @@ -105,8 +115,9 @@ serialize_parse_connect(_) -> will_retain = true, will_topic = <<"will">>, will_payload = <<"bye">>, - clean_start = true}), - ?assertEqual({ok, Packet2, <<>>}, parse_serialize(Packet2)). + clean_start = true + }), + ?assertEqual(Packet2, parse_serialize(Packet2)). serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, @@ -117,8 +128,9 @@ serialize_parse_v3_connect(_) -> proto_name = <<"MQIsdp">>, client_id = <<"mosqpub/10451-iMac.loca">>, clean_start = true, - keepalive = 60}), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + keepalive = 60 + }), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_v4_connect(_) -> Bin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117, @@ -128,8 +140,8 @@ serialize_parse_v4_connect(_) -> client_id = <<"mosqpub/10451-iMac.loca">>, clean_start = true, keepalive = 60}), - ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_v5_connect(_) -> Props = #{'Session-Expiry-Interval' => 60, @@ -141,7 +153,8 @@ serialize_parse_v5_connect(_) -> 'Request-Response-Information' => 1, 'Request-Problem-Information' => 1, 'Authentication-Method' => <<"oauth2">>, - 'Authentication-Data' => <<"33kx93k">>}, + 'Authentication-Data' => <<"33kx93k">> + }, WillProps = #{'Will-Delay-Interval' => 60, 'Payload-Format-Indicator' => 1, @@ -149,7 +162,8 @@ serialize_parse_v5_connect(_) -> 'Content-Type' => <<"text/json">>, 'Response-Topic' => <<"topic">>, 'Correlation-Data' => <<"correlateid">>, - 'User-Property' => [{<<"k">>, <<"v">>}]}, + 'User-Property' => [{<<"k">>, <<"v">>}] + }, Packet = ?CONNECT_PACKET( #mqtt_packet_connect{proto_name = <<"MQTT">>, proto_ver = ?MQTT_PROTO_V5, @@ -165,18 +179,21 @@ serialize_parse_v5_connect(_) -> will_topic = <<"topic">>, will_payload = <<>>, username = <<"device:1">>, - password = <<"passwd">>}), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + password = <<"passwd">> + }), + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_connect_without_clientid(_) -> Bin = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, - Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = 4, - proto_name = <<"MQTT">>, - client_id = <<>>, - clean_start = true, - keepalive = 60}), - ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + Packet = ?CONNECT_PACKET( + #mqtt_packet_connect{proto_ver = 4, + proto_name = <<"MQTT">>, + client_id = <<>>, + clean_start = true, + keepalive = 60 + }), + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_connect_with_will(_) -> Bin = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112, @@ -195,9 +212,10 @@ serialize_parse_connect_with_will(_) -> will_topic = <<"/will">>, will_payload = <<"willmsg">>, username = <<"test">>, - password = <<"public">>}}, - ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + password = <<"public">> + }}, + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_bridge_connect(_) -> Bin = <<16,86,0,6,77,81,73,115,100,112,131,44,0,60,0,19,67,95,48,48,58,48,67, @@ -216,14 +234,15 @@ serialize_parse_bridge_connect(_) -> clean_start = false, keepalive = 60, will_topic = Topic, - will_payload = <<"0">>}}, - ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + will_payload = <<"0">> + }}, + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_connack(_) -> Packet = ?CONNACK_PACKET(?RC_SUCCESS), - ?assertEqual(<<32,2,0,0>>, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + ?assertEqual(<<32,2,0,0>>, serialize_to_binary(Packet)), + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_connack_v5(_) -> Props = #{'Session-Expiry-Interval' => 60, @@ -241,10 +260,10 @@ serialize_parse_connack_v5(_) -> 'Response-Information' => <<"response">>, 'Server-Reference' => <<"192.168.1.10">>, 'Authentication-Method' => <<"oauth2">>, - 'Authentication-Data' => <<"33kx93k">>}, + 'Authentication-Data' => <<"33kx93k">> + }, Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_qos0_publish(_) -> Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>, @@ -255,8 +274,8 @@ serialize_parse_qos0_publish(_) -> variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>, packet_id = undefined}, payload = <<"hello">>}, - ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_qos1_publish(_) -> Bin = <<50,13,0,5,97,47,98,47,99,0,1,104,97,104,97>>, @@ -267,12 +286,12 @@ serialize_parse_qos1_publish(_) -> variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>, packet_id = 1}, payload = <<"haha">>}, - ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_qos2_publish(_) -> Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, payload()), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_publish_v5(_) -> Props = #{'Payload-Format-Indicator' => 1, @@ -283,147 +302,139 @@ serialize_parse_publish_v5(_) -> 'Subscription-Identifier' => 1, 'Content-Type' => <<"text/json">>}, Packet = ?PUBLISH_PACKET(?QOS_1, <<"$share/group/topic">>, 1, Props, <<"payload">>), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_puback(_) -> Packet = ?PUBACK_PACKET(1), - ?assertEqual(<<64,2,0,1>>, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + ?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)), + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_puback_v5(_) -> Packet = ?PUBACK_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_pubrec(_) -> Packet = ?PUBREC_PACKET(1), - ?assertEqual(<<5:4,0:4,2,0,1>>, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + ?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)), + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_pubrec_v5(_) -> Packet = ?PUBREC_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_pubrel(_) -> Packet = ?PUBREL_PACKET(1), - ?assertEqual(<<6:4,2:4,2,0,1>>, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + Bin = serialize_to_binary(Packet), + ?assertEqual(<<6:4,2:4,2,0,1>>, Bin), + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_pubrel_v5(_) -> Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_pubcomp(_) -> Packet = ?PUBCOMP_PACKET(1), - ?assertEqual(<<7:4,0:4,2,0,1>>, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + Bin = serialize_to_binary(Packet), + ?assertEqual(<<7:4,0:4,2,0,1>>, Bin), + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_pubcomp_v5(_) -> Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_subscribe(_) -> %% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}]) Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>, - TopicOpts = #{ nl => 0 , rap => 0, rc => 0, - rh => 0, qos => 2 }, + TopicOpts = #{nl => 0 , rap => 0, rc => 0, rh => 0, qos => 2}, TopicFilters = [{<<"TopicA">>, TopicOpts}], Packet = ?SUBSCRIBE_PACKET(2, TopicFilters), - ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), - ct:log("Bin: ~p, Packet: ~p ~n", [Packet, parse(Bin)]), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + ?assertEqual(Bin, serialize_to_binary(Packet)), + %%ct:log("Bin: ~p, Packet: ~p ~n", [Packet, parse(Bin)]), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_subscribe_v5(_) -> TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}}, {<<"TopicQos1">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}}], - Packet = ?SUBSCRIBE_PACKET(3, #{'Subscription-Identifier' => 16#FFFFFFF}, - TopicFilters), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + Packet = ?SUBSCRIBE_PACKET(3, #{'Subscription-Identifier' => 16#FFFFFFF}, TopicFilters), + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_suback(_) -> Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_suback_v5(_) -> Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>, 'User-Property' => [{<<"key">>, <<"value">>}]}, [?QOS_0, ?QOS_1, 128]), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). - + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_unsubscribe(_) -> %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) Packet = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]), Bin = <<162,10,0,2,0,6,84,111,112,105,99,65>>, - ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), - ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). serialize_parse_unsubscribe_v5(_) -> Props = #{'User-Property' => [{<<"key">>, <<"val">>}]}, Packet = ?UNSUBSCRIBE_PACKET(10, Props, [<<"Topic1">>, <<"Topic2">>]), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_unsuback(_) -> Packet = ?UNSUBACK_PACKET(10), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_unsuback_v5(_) -> Packet = ?UNSUBACK_PACKET(10, #{'Reason-String' => <<"Not authorized">>, 'User-Property' => [{<<"key">>, <<"val">>}]}, [16#87, 16#87, 16#87]), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_pingreq(_) -> PingReq = ?PACKET(?PINGREQ), - ?assertEqual({ok, PingReq, <<>>}, parse_serialize(PingReq)). + ?assertEqual(PingReq, parse_serialize(PingReq)). serialize_parse_pingresp(_) -> PingResp = ?PACKET(?PINGRESP), - ?assertEqual({ok, PingResp, <<>>}, parse_serialize(PingResp)). + ?assertEqual(PingResp, parse_serialize(PingResp)). parse_disconnect(_) -> - ?assertEqual({ok, ?DISCONNECT_PACKET(?RC_SUCCESS), <<>>}, parse(<<224, 0>>)). + Packet = ?DISCONNECT_PACKET(?RC_SUCCESS), + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(<<224, 0>>)). serialize_parse_disconnect(_) -> Packet = ?DISCONNECT_PACKET(?RC_SUCCESS), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)). serialize_parse_disconnect_v5(_) -> Packet = ?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' => 60, - 'Reason-String' => <<"server_moved">>, - 'Server-Reference' => <<"192.168.1.10">>}), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + 'Reason-String' => <<"server_moved">>, + 'Server-Reference' => <<"192.168.1.10">> + }), + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_auth_v5(_) -> Packet = ?AUTH_PACKET(?RC_SUCCESS, #{'Authentication-Method' => <<"oauth2">>, - 'Authentication-Data' => <<"3zekkd">>, - 'Reason-String' => <<"success">>, - 'User-Property' => [{<<"key">>, <<"val">>}]}), - ?assertEqual({ok, Packet, <<>>}, - parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + 'Authentication-Data' => <<"3zekkd">>, + 'Reason-String' => <<"success">>, + 'User-Property' => [{<<"key">>, <<"val">>}] + }), + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). parse_serialize(Packet) -> - parse(iolist_to_binary(serialize(Packet))). + parse_serialize(Packet, #{}). parse_serialize(Packet, Opts) when is_map(Opts) -> - parse(iolist_to_binary(serialize(Packet, Opts)), Opts). + Bin = iolist_to_binary(emqx_frame:serialize(Packet, Opts)), + ParseState = emqx_frame:initial_parse_state(Opts), + {ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState), + NPacket. -parse(Bin) -> - parse(Bin, #{}). - -parse(Bin, Opts) when is_map(Opts) -> - emqx_frame:parse(Bin, emqx_frame:initial_state(Opts)). +serialize_to_binary(Packet) -> + iolist_to_binary(emqx_frame:serialize(Packet)). payload() -> iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]). + diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 9e8107665..60d298008 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -139,7 +139,7 @@ connect_v4(_) -> })), emqx_client_sock:send(Sock, ConnectPacket), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V4), emqx_client_sock:send(Sock, ConnectPacket), {error, closed} = gen_tcp:recv(Sock, 0) @@ -156,7 +156,7 @@ connect_v5(_) -> properties = #{'Request-Response-Information' => -1}}))), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), with_connection(fun([Sock]) -> @@ -168,7 +168,7 @@ connect_v5(_) -> properties = #{'Request-Problem-Information' => 2}}))), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), with_connection(fun([Sock]) -> @@ -181,7 +181,7 @@ connect_v5(_) -> #{'Request-Response-Information' => 1}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} = + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), ?assertNot(maps:is_key('Response-Information', Props)) end), @@ -202,7 +202,7 @@ connect_v5(_) -> )), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, - #{'Topic-Alias-Maximum' := 20}), _} = + #{'Topic-Alias-Maximum' := 20}), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( @@ -211,7 +211,7 @@ connect_v5(_) -> )), {ok, Data2} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5) + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5) end), % topic alias maximum @@ -227,7 +227,7 @@ connect_v5(_) -> )), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, - #{'Topic-Alias-Maximum' := 20}), _} = + #{'Topic-Alias-Maximum' := 20}), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, @@ -237,7 +237,7 @@ connect_v5(_) -> rc => 0}}]), #{version => ?MQTT_PROTO_V5})), {ok, Data2} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( @@ -247,11 +247,11 @@ connect_v5(_) -> {ok, Data3} = gen_tcp:recv(Sock, 0), - {ok, ?PUBACK_PACKET(1, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + {ok, ?PUBACK_PACKET(1, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), {ok, Data4} = gen_tcp:recv(Sock, 0), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), + {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), <<>>, _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( @@ -260,7 +260,7 @@ connect_v5(_) -> )), {ok, Data5} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5) + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), <<>>, _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5) end), % test clean start @@ -276,7 +276,7 @@ connect_v5(_) -> #{'Session-Expiry-Interval' => 10}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( ?DISCONNECT_PACKET(?RC_SUCCESS) )) @@ -296,7 +296,7 @@ connect_v5(_) -> #{'Session-Expiry-Interval' => 10}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), % test will message publish and cancel @@ -320,7 +320,7 @@ connect_v5(_) -> ) ), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), {ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, [binary, {packet, raw}, @@ -335,7 +335,7 @@ connect_v5(_) -> rc => 0}}]), #{version => ?MQTT_PROTO_V5})), {ok, SubData} = gen_tcp:recv(Sock2, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( ?DISCONNECT_PACKET(?RC_SUCCESS))), @@ -367,7 +367,7 @@ connect_v5(_) -> ) ), {ok, Data3} = gen_tcp:recv(Sock3, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock3, raw_send_serialize( ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE), @@ -376,7 +376,8 @@ connect_v5(_) -> ), {ok, WillData} = gen_tcp:recv(Sock2, 0, 5000), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5) + {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), <<>>, _} + = raw_recv_parse(WillData, ?MQTT_PROTO_V5) end), % duplicate client id @@ -393,7 +394,7 @@ connect_v5(_) -> #{'Session-Expiry-Interval' => 10}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock1, raw_send_serialize( @@ -408,7 +409,7 @@ connect_v5(_) -> #{'Session-Expiry-Interval' => 10}}) )), {ok, Data1} = gen_tcp:recv(Sock1, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), <<>>, _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, qos => ?QOS_2, @@ -418,7 +419,7 @@ connect_v5(_) -> #{version => ?MQTT_PROTO_V5})), {ok, SubData} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock1, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, qos => ?QOS_2, @@ -428,7 +429,7 @@ connect_v5(_) -> #{version => ?MQTT_PROTO_V5})), {ok, SubData1} = gen_tcp:recv(Sock1, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5) + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5) end, 2), ok. @@ -441,7 +442,7 @@ do_connect(Sock, ProtoVer) -> proto_ver = ProtoVer }))), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ProtoVer). + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), <<>>, _} = raw_recv_parse(Data, ProtoVer). subscribe_v4(_) -> with_connection(fun([Sock]) -> @@ -455,7 +456,7 @@ subscribe_v4(_) -> rc => 0}}])), emqx_client_sock:send(Sock, SubPacket), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(15, _), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4) + {ok, ?SUBACK_PACKET(15, _), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V4) end), ok. @@ -466,7 +467,7 @@ subscribe_v5(_) -> #{version => ?MQTT_PROTO_V5}), emqx_client_sock:send(Sock, SubPacket), {ok, DisConnData} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), _} = + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), <<>>, _} = raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) end), with_connection(fun([Sock]) -> @@ -479,8 +480,9 @@ subscribe_v5(_) -> #{version => ?MQTT_PROTO_V5}), emqx_client_sock:send(Sock, SubPacket), {ok, DisConnData} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} = - raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) + ?assertMatch( + {ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), <<>>, _}, + raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)) end), with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V5), @@ -493,8 +495,9 @@ subscribe_v5(_) -> #{version => ?MQTT_PROTO_V5}), emqx_client_sock:send(Sock, SubPacket), {ok, DisConnData} = gen_tcp:recv(Sock, 0), - {ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), _} = - raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) + ?assertMatch( + {ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), <<>>, _}, + raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)) end), with_connection(fun([Sock]) -> do_connect(Sock, ?MQTT_PROTO_V5), @@ -507,8 +510,8 @@ subscribe_v5(_) -> #{version => ?MQTT_PROTO_V5}), emqx_client_sock:send(Sock, SubPacket), {ok, SubData} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = - raw_recv_parse(SubData, ?MQTT_PROTO_V5) + {ok, ?SUBACK_PACKET(1, #{}, [2]), <<>>, _} + = raw_recv_parse(SubData, ?MQTT_PROTO_V5) end), ok. @@ -524,9 +527,8 @@ raw_send_serialize(Packet) -> raw_send_serialize(Packet, Opts) -> emqx_frame:serialize(Packet, Opts). -raw_recv_parse(P, ProtoVersion) -> - emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ProtoVersion}}). +raw_recv_parse(Bin, ProtoVer) -> + emqx_frame:parse(Bin, emqx_frame:initial_parse_state(#{version => ProtoVer})). acl_deny_action_ct(_) -> emqx_zone:set_env(external, acl_deny_action, disconnect), diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_channel_SUITE.erl similarity index 68% rename from test/emqx_ws_connection_SUITE.erl rename to test/emqx_ws_channel_SUITE.erl index c086ef6b7..29f02e653 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -1,4 +1,5 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -11,32 +12,26 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. +%%-------------------------------------------------------------------- --module(emqx_ws_connection_SUITE). +-module(emqx_ws_channel_SUITE). -compile(export_all). -compile(nowarn_export_all). --include_lib("eunit/include/eunit.hrl"). - --include_lib("common_test/include/ct.hrl"). - -include("emqx_mqtt.hrl"). - +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ client_id = <<"mqtt_client">>, username = <<"admin">>, password = <<"public">>})). --define(SUBCODE, [0]). - --define(PACKETID, 1). - --define(PUBQOS, 1). - all() -> - [t_ws_connect_api]. + [ t_ws_connect_api + , t_ws_auth_failure + ]. init_per_suite(Config) -> emqx_ct_helpers:start_apps([]), @@ -45,32 +40,42 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). -t_ws_connect_api(_Config) -> +t_ws_auth_failure(_Config) -> + application:set_env(emqx, allow_anonymous, false), WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), {ok, _} = rfc6455_client:open(WS), Packet = raw_send_serialize(?CLIENT), ok = rfc6455_client:send_binary(WS, Packet), - {binary, CONACK} = rfc6455_client:recv(WS), - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), + {binary, CONNACK} = rfc6455_client:recv(WS), + {ok, ?CONNACK_PACKET(?CONNACK_AUTH), <<>>, _} = raw_recv_pase(CONNACK), + application:set_env(emqx, allow_anonymous, true), + ok. + +t_ws_connect_api(_Config) -> + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + ok = rfc6455_client:send_binary(WS, raw_send_serialize(?CLIENT)), + {binary, Bin} = rfc6455_client:recv(WS), + Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), + {ok, Connack, <<>>, _} = raw_recv_pase(Bin), Pid = emqx_cm:lookup_conn_pid(<<"mqtt_client">>), - ConnInfo = emqx_ws_connection:info(Pid), + ConnInfo = emqx_ws_channel:info(Pid), ok = t_info(ConnInfo), - ConnAttrs = emqx_ws_connection:attrs(Pid), + ConnAttrs = emqx_ws_channel:attrs(Pid), ok = t_attrs(ConnAttrs), - ConnStats = emqx_ws_connection:stats(Pid), + ConnStats = emqx_ws_channel:stats(Pid), ok = t_stats(ConnStats), - SessionPid = emqx_ws_connection:session(Pid), + SessionPid = emqx_ws_channel:session(Pid), true = is_pid(SessionPid), - ok = emqx_ws_connection:kick(Pid), + ok = emqx_ws_channel:kick(Pid), {close, _} = rfc6455_client:close(WS), ok. raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). -raw_recv_pase(P) -> - emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4} }). +raw_recv_pase(Packet) -> + emqx_frame:parse(Packet). t_info(InfoData) -> ?assertEqual(websocket, maps:get(socktype, InfoData)), @@ -81,7 +86,7 @@ t_info(InfoData) -> t_attrs(AttrsData) -> ?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)), - ?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(emqx_ws_channel, maps:get(conn_mod, AttrsData)), ?assertEqual(<<"admin">>, maps:get(username, AttrsData)). t_stats(StatsData) ->