diff --git a/Makefile b/Makefile index 41cdd5ab8..09b9a4da2 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc + emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ + emqx_packet emqx_connection emqx_tracer CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 9f2572b32..7eb68e329 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -78,11 +78,14 @@ info(#state{transport = Transport, {sockname, Sockname}, {conn_state, ConnState}, {active_n, ActiveN}, - {rate_limit, esockd_rate_limit:info(RateLimit)}, - {pub_limit, esockd_rate_limit:info(PubLimit)}], + {rate_limit, rate_limit_info(RateLimit)}, + {pub_limit, rate_limit_info(PubLimit)}], ProtoInfo = emqx_protocol:info(ProtoState), lists:usort(lists:append(ConnInfo, ProtoInfo)). +rate_limit_info(undefined) -> #{}; +rate_limit_info(Limit) -> esockd_rate_limit:info(Limit). + %% for dashboard attrs(CPid) when is_pid(CPid) -> call(CPid, attrs); diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl new file mode 100644 index 000000000..1678eab44 --- /dev/null +++ b/test/emqx_connection_SUITE.erl @@ -0,0 +1,102 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_connection_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). + +-include("emqx_mqtt.hrl"). + +-define(STATS, [{mailbox_len, _}, + {heap_size, _}, + {reductions, _}, + {recv_pkt, _}, + {recv_msg, _}, + {send_pkt, _}, + {send_msg, _}, + {recv_oct, _}, + {recv_cnt, _}, + {send_oct, _}, + {send_cnt, _}, + {send_pend, _}]). + +-define(ATTRS, [{clean_start, _}, + {client_id, _}, + {connected_at, _}, + {is_bridge, _}, + {is_super, _}, + {keepalive, _}, + {mountpoint, _}, + {peercert, _}, + {peername, _}, + {proto_name, _}, + {proto_ver, _}, + {sockname, _}, + {username, _}, + {zone, _}]). + +-define(INFO, [{ack_props, _}, + {active_n, _}, + {clean_start, _}, + {client_id, _}, + {conn_props, _}, + {conn_state, _}, + {connected_at, _}, + {enable_acl, _}, + {is_bridge, _}, + {is_super, _}, + {keepalive, _}, + {mountpoint, _}, + {peercert, _}, + {peername, _}, + {proto_name, _}, + {proto_ver, _}, + {pub_limit, _}, + {rate_limit, _}, + {session, _}, + {sockname, _}, + {socktype, _}, + {topic_aliases, _}, + {username, _}, + {will_msg, _}, + {zone, _}]). + +all() -> + [t_connect_api]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_connect_api(_Config) -> + {ok, T1} = emqx_client:start_link([{host, "localhost"}, + {client_id, <<"client1">>}, + {username, <<"testuser1">>}, + {password, <<"pass1">>}]), + {ok, _} = emqx_client:connect(T1), + CPid = emqx_cm:lookup_conn_pid(<<"client1">>), + ?STATS = emqx_connection:stats(CPid), + ?ATTRS = emqx_connection:attrs(CPid), + ?INFO = emqx_connection:info(CPid), + SessionPid = emqx_connection:session(CPid), + true = is_pid(SessionPid), + emqx_client:disconnect(T1). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 3bf9976c7..5189a395b 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -28,10 +28,10 @@ -define(TOPICS, [<<"TopicA">>, <<"TopicA/B">>, <<"Topic/C">>, <<"TopicA/C">>, <<"/TopicA">>]). --define(CLIENT2, ?CONNECT_PACKET(#mqtt_packet_connect{ - username = <<"admin">>, - clean_start = false, - password = <<"public">>})). +-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + username = <<"emqx">>, + password = <<"public">>})). all() -> [ @@ -507,7 +507,6 @@ raw_recv_parse(P, ProtoVersion) -> emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, version => ProtoVersion}}). - acl_deny_action(_) -> emqx_zone:set_env(external, acl_deny_action, disconnect), process_flag(trap_exit, true), @@ -557,6 +556,7 @@ acl_deny_do_disconnect(publish, QoS, Topic) -> {'EXIT', Client, _Reason} -> false = is_process_alive(Client) end; + acl_deny_do_disconnect(subscribe, QoS, Topic) -> {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]), {ok, _} = emqx_client:connect(Client), diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl new file mode 100644 index 000000000..b8ec8e21c --- /dev/null +++ b/test/emqx_tracer_SUITE.erl @@ -0,0 +1,47 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_tracer_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). + +all() -> [start_traces]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +start_traces(_Config) -> + {ok, T} = emqx_client:start_link([{host, "localhost"}, + {client_id, <<"client">>}, + {username, <<"testuser">>}, + {password, <<"pass">>}]), + emqx_client:connect(T), + emqx_client:subscribe(T, <<"a/b/c">>), + ok = emqx_tracer:start_trace({client_id, <<"client">>}, all, "test/emqx_SUITE_data/clientid_trace.log"), + ok = emqx_tracer:start_trace({topic, <<"topic">>}, all, "test/emqx_SUITE_data/topic_trace.log"), + {ok, _} = file:read_file("test/emqx_SUITE_data/clientid_trace.log"), + {ok, _} = file:read_file("test/emqx_SUITE_data/topic_trace.log"), + Result = emqx_tracer:lookup_traces(), + ?assertEqual([{{client_id,<<"client">>},{all,"test/emqx_SUITE_data/clientid_trace.log"}},{{topic,<<"topic">>},{all,"test/emqx_SUITE_data/topic_trace.log"}}], Result), + ok = emqx_tracer:stop_trace({client_id, <<"client">>}), + ok = emqx_tracer:stop_trace({topic, <<"topic">>}). diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl new file mode 100644 index 000000000..848e518e1 --- /dev/null +++ b/test/emqx_ws_connection_SUITE.erl @@ -0,0 +1,121 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_ws_connection_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). + +-include("emqx_mqtt.hrl"). + + +-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + username = <<"admin">>, + password = <<"public">>})). + +-define(SUBCODE, [0]). + +-define(PACKETID, 1). + +-define(PUBQOS, 1). + +-define(INFO, [{socktype, _}, + {conn_state, _}, + {peername, _}, + {sockname, _}, + {zone, _}, + {client_id, <<"mqtt_client">>}, + {username, <<"admin">>}, + {peername, _}, + {peercert, _}, + {proto_ver, _}, + {proto_name, _}, + {clean_start, _}, + {keepalive, _}, + {mountpoint, _}, + {is_super, _}, + {is_bridge, _}, + {connected_at, _}, + {conn_props, _}, + {ack_props, _}, + {session, _}, + {topic_aliases, _}, + {will_msg, _}, + {enable_acl, _}]). + +-define(ATTRS, [{clean_start,true}, + {client_id, <<"mqtt_client">>}, + {connected_at, _}, + {is_bridge, _}, + {is_super, _}, + {keepalive, _}, + {mountpoint, _}, + {peercert, _}, + {peername, _}, + {proto_name, _}, + {proto_ver, _}, + {sockname, _}, + {username, <<"admin">>}, + {zone, _}]). + +-define(STATS, [{recv_oct, _}, + {recv_cnt, _}, + {send_oct, _}, + {send_cnt, _}, + {mailbox_len, _}, + {heap_size, _}, + {reductions, _}, + {recv_pkt, _}, + {recv_msg, _}, + {send_pkt, _}, + {send_msg, _}]). + +all() -> + [t_ws_connect_api]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_ws_connect_api(_Config) -> + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + Packet = raw_send_serialize(?CLIENT), + ok = rfc6455_client:send_binary(WS, Packet), + {binary, CONACK} = rfc6455_client:recv(WS), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), + Pid = emqx_cm:lookup_conn_pid(<<"mqtt_client">>), + ?INFO = emqx_ws_connection:info(Pid), + ?ATTRS = emqx_ws_connection:attrs(Pid), + ?STATS = emqx_ws_connection:stats(Pid), + SessionPid = emqx_ws_connection:session(Pid), + true = is_pid(SessionPid), + ok = emqx_ws_connection:kick(Pid), + {close, _} = rfc6455_client:close(WS), + ok. + +raw_send_serialize(Packet) -> + emqx_frame:serialize(Packet). + +raw_recv_pase(P) -> + emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4} }).