diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 13765a5e2..1f8c2a22e 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -703,7 +703,7 @@ waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode, case take_call(connect, State) of {value, #call{from = From}, _State} -> Reply = {error, {Reason, Properties}}, - {stop_and_reply, Reason, [{reply, From, Reply}]}; + {stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]}; false -> {stop, connack_error} end; @@ -1004,20 +1004,24 @@ handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> - {stop, Reason, State}; + emqx_logger:error("[~p] ~p, Reason: ~p", [?MODULE, Error, Reason]), + {stop, {shutdown, Reason}, State}; handle_event(info, {Closed, _Sock}, _StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> + emqx_logger:debug("[~p] ~p", [?MODULE, Closed]), {stop, {shutdown, Closed}, State}; handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) -> - {stop, Reason, State}; + emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]), + {stop, {shutdown, Reason}, State}; handle_event(info, {inet_reply, _Sock, ok}, _, State) -> {keep_state, State}; handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> - {stop, Reason, State}; + emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]), + {stop, {shutdown, Reason}, State}; handle_event(EventType, EventContent, StateName, StateData) -> emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)", @@ -1305,7 +1309,7 @@ hosts(#state{hosts = Hosts}) -> Hosts. send_puback(Packet, State) -> case send(Packet, State) of {ok, NewState} -> {keep_state, NewState}; - {error, Reason} -> {stop, Reason} + {error, Reason} -> {stop, {shutdown, Reason}} end. send(Msg, State) when is_record(Msg, mqtt_msg) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 87bf50d08..8aafb313f 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -820,7 +820,7 @@ check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) -> case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of allow -> ok; deny -> - ?LOG(warning, "Cannot publish will message to ~p for acl checking failed", [WillTopic]), + ?LOG(warning, "Will message (to ~s) validation failed, acl denied", [WillTopic]), {error, ?RC_UNSPECIFIED_ERROR} end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 3ebcc6dfd..098d76c52 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -675,7 +675,7 @@ code_change(_OldVsn, State, _Extra) -> maybe_shutdown(undefined, _Reason) -> ok; maybe_shutdown(Pid, normal) -> - Pid ! {shutdown, normal}; + Pid ! {shutdown, normal}; maybe_shutdown(Pid, Reason) -> exit(Pid, Reason). diff --git a/test/emqx_access_SUITE_data/acl_deny_action.conf b/test/emqx_access_SUITE_data/acl_deny_action.conf index 753782605..c7f933ce7 100644 --- a/test/emqx_access_SUITE_data/acl_deny_action.conf +++ b/test/emqx_access_SUITE_data/acl_deny_action.conf @@ -1,4 +1,3 @@ - {deny, {user, "emqx"}, pubsub, ["acl_deny_action"]}. - -{allow, all}. +{deny, {user, "pub_deny"}, publish, ["pub_deny"]}. +{allow, all}. \ No newline at end of file diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index 62a91df54..c1433c203 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -124,3 +124,28 @@ client_ssl_twoway() -> client_ssl() -> ?CIPHERS ++ [{reuse_sessions, true}]. + +wait_mqtt_payload(Payload) -> + receive + {publish, #{payload := Payload}} -> + ct:pal("OK - received msg: ~p~n", [Payload]) + after 1000 -> + ct:fail({timeout, Payload, {msg_box, flush()}}) + end. + +not_wait_mqtt_payload(Payload) -> + receive + {publish, #{payload := Payload}} -> + ct:fail({received, Payload}) + after 1000 -> + ct:pal("OK - msg ~p is not received", [Payload]) + end. + +flush() -> + flush([]). +flush(Msgs) -> + receive + M -> flush([M|Msgs]) + after + 0 -> lists:reverse(Msgs) + end. \ No newline at end of file diff --git a/test/emqx_mod_sup_SUITE.erl b/test/emqx_mod_sup_SUITE.erl index 8169c3f91..29b59d0b8 100644 --- a/test/emqx_mod_sup_SUITE.erl +++ b/test/emqx_mod_sup_SUITE.erl @@ -21,8 +21,23 @@ all() -> [t_child_all]. +start_link() -> + Pid = spawn_link(?MODULE, echo, [0]), + {ok, Pid}. + +echo(State) -> + receive + {From, Req} -> + ct:pal("======from:~p, req:~p", [From, Req]), + From ! Req, + echo(State) + end. + t_child_all(_) -> - {ok, _Pid} = emqx_mod_sup:start_link(), - {ok, _Child} = emqx_mod_sup:start_child(emqx_banned, worker), + {ok, Pid} = emqx_mod_sup:start_link(), + {ok, Child} = emqx_mod_sup:start_child(?MODULE, worker), timer:sleep(10), - ok = emqx_mod_sup:stop_child(emqx_banned). + Child ! {self(), hi}, + receive hi -> ok after 100 -> ct:fail({timeout, wait_echo}) end, + ok = emqx_mod_sup:stop_child(?MODULE), + exit(Pid, normal). diff --git a/test/emqx_pool_SUITE.erl b/test/emqx_pool_SUITE.erl index 36ad7c507..72fe51ec0 100644 --- a/test/emqx_pool_SUITE.erl +++ b/test/emqx_pool_SUITE.erl @@ -30,7 +30,7 @@ all() -> groups() -> [ {submit_case, [sequence], [submit_mfa, submit_fa]}, - {async_submit_case, [sequence], [async_submit_mfa, async_submit_ex]} + {async_submit_case, [sequence], [async_submit_mfa, async_submit_crash]} ]. init_per_suite(Config) -> @@ -61,8 +61,8 @@ async_submit_mfa(_Config) -> emqx_pool:async_submit({?MODULE, test_mfa, []}), emqx_pool:async_submit(fun ?MODULE:test_mfa/0, []). -async_submit_ex(_) -> - emqx_pool:async_submit(fun error_fun/0). +async_submit_crash(_) -> + emqx_pool:async_submit(fun() -> A = 1, A = 0 end). t_unexpected(_) -> Pid = emqx_pool:worker(), @@ -73,6 +73,3 @@ t_unexpected(_) -> test_mfa() -> lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]). - -error_fun() -> error(test_error). - diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index becdd60f2..fc96cc839 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -101,19 +101,17 @@ all() -> ]. groups() -> - [{mqtt_common, - [sequence], - [will_check]}, - {mqttv4, - [sequence], + [{mqtt_common, [sequence], + [will_topic_check, + will_acl_check + ]}, + {mqttv4, [sequence], [connect_v4, subscribe_v4]}, - {mqttv5, - [sequence], + {mqttv5, [sequence], [connect_v5, subscribe_v5]}, - {acl, - [sequence], + {acl, [sequence], [acl_deny_action_ct, acl_deny_action_eunit]}]. @@ -266,11 +264,10 @@ connect_v5(_) -> raw_recv_parse(Data, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, - qos => ?QOS_2, - rap => 0, - nl => 0, - rc => 0}}]), - #{version => ?MQTT_PROTO_V5})), + qos => ?QOS_2, + rap => 0, + nl => 0, + rc => 0}}]), #{version => ?MQTT_PROTO_V5})), {ok, Data2} = gen_tcp:recv(Sock, 0), {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), @@ -365,19 +362,16 @@ connect_v5(_) -> do_connect(Sock2, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, - qos => ?QOS_2, - rap => 0, - nl => 0, - rc => 0}}]), - #{version => ?MQTT_PROTO_V5})), + qos => ?QOS_2, + rap => 0, + nl => 0, + rc => 0}}]), #{version => ?MQTT_PROTO_V5})), {ok, SubData} = gen_tcp:recv(Sock2, 0), {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( - ?DISCONNECT_PACKET(?RC_SUCCESS) - ) - ), + ?DISCONNECT_PACKET(?RC_SUCCESS))), {error, timeout} = gen_tcp:recv(Sock2, 0, 2000), @@ -572,8 +566,8 @@ raw_recv_parse(P, ProtoVersion) -> acl_deny_action_ct(_) -> emqx_zone:set_env(external, acl_deny_action, disconnect), process_flag(trap_exit, true), - [acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)], [acl_deny_do_disconnect(subscribe, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)], + [acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)], emqx_zone:set_env(external, acl_deny_action, ignore), ok. @@ -585,57 +579,62 @@ acl_deny_action_eunit(_) -> {error, CodeName, NEWPSTATE2} = emqx_protocol:process_packet(?PUBLISH_PACKET(?QOS_2, <<"acl_deny_action">>, 2, <<"payload">>), PState), ?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE2#pstate.send_stats). -will_check(_) -> +will_topic_check(_) -> + {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}, + {will_flag, true}, + {will_topic, <<"aaa">>}, + {will_payload, <<"I have died">>}, + {will_qos, 0}]), + {ok, _} = emqx_client:connect(Client), + + {ok, T} = emqx_client:start_link([{client_id, <<"client">>}]), + emqx_client:connect(T), + emqx_client:subscribe(T, <<"aaa">>), + ct:sleep(200), + + emqx_client:stop(Client), + ct:sleep(100), + false = is_process_alive(Client), + emqx_ct_broker_helpers:wait_mqtt_payload(<<"I have died">>), + emqx_client:stop(T). + +will_acl_check(_) -> + %% The connection will be rejected if publishing of the will message is not allowed by + %% ACL rules process_flag(trap_exit, true), - will_topic_check(0), - will_acl_check(0). - -will_topic_check(QoS) -> - {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}, + {ok, Client} = emqx_client:start_link([{username, <<"pub_deny">>}, {will_flag, true}, - {will_topic, <<"">>}, + {will_topic, <<"pub_deny">>}, {will_payload, <<"I have died">>}, - {will_qos, QoS}]), - try emqx_client:connect(Client) of - _ -> - ok - catch - exit : _Reason -> - false = is_process_alive(Client) - end. - -will_acl_check(QoS) -> - {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}, - {will_flag, true}, - {will_topic, <<"acl_deny_action">>}, - {will_payload, <<"I have died">>}, - {will_qos, QoS}]), - try emqx_client:connect(Client) of - _ -> - ok - catch - exit : _Reason -> - false = is_process_alive(Client) - end. + {will_qos, 0}]), + ?assertMatch({error,{_,_}}, emqx_client:connect(Client)). acl_deny_do_disconnect(publish, QoS, Topic) -> + process_flag(trap_exit, true), {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]), {ok, _} = emqx_client:connect(Client), emqx_client:publish(Client, Topic, <<"test">>, QoS), receive - {'EXIT', Client, _Reason} -> - false = is_process_alive(Client) + {'EXIT', Client, {shutdown,tcp_closed}} -> + ct:pal(info, "[OK] after publish, received exit: {shutdown,tcp_closed}"), + false = is_process_alive(Client); + {'EXIT', Client, Reason} -> + ct:pal(info, "[OK] after publish, client got disconnected: ~p", [Reason]) + after 1000 -> ct:fail({timeout, wait_tcp_closed}) end; acl_deny_do_disconnect(subscribe, QoS, Topic) -> + process_flag(trap_exit, true), {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]), {ok, _} = emqx_client:connect(Client), - try emqx_client:subscribe(Client, Topic, QoS) of - _ -> - ok - catch - exit : _Reason -> - false = is_process_alive(Client) + {ok, _, [128]} = emqx_client:subscribe(Client, Topic, QoS), + receive + {'EXIT', Client, {shutdown,tcp_closed}} -> + ct:pal(info, "[OK] after subscribe, received exit: {shutdown,tcp_closed}"), + false = is_process_alive(Client); + {'EXIT', Client, Reason} -> + ct:pal(info, "[OK] after subscribe, client got disconnected: ~p", [Reason]) + after 1000 -> ct:fail({timeout, wait_tcp_closed}) end. start_apps(App, SchemaFile, ConfigFile) -> diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index b3ce70c82..3d1db5ddc 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -16,6 +16,7 @@ -include("emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -33,7 +34,7 @@ all() -> [{group, sm}]. groups() -> [{sm, [non_parallel_tests], - [t_open_close_session, + [ t_resume_session, t_discard_session, t_register_unregister_session, @@ -48,45 +49,47 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -t_open_close_session(_) -> - {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}), - ?assertEqual(ok, emqx_sm:close_session(SPid)). +init_per_testcase(_All, Config) -> + {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => self()}), + [{session_pid, SPid}|Config]. -t_resume_session(_) -> - {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}), - ?assertEqual({ok, SPid}, emqx_sm:resume_session(<<"client">>, ?ATTRS#{conn_pid => ClientPid})). +end_per_testcase(_All, Config) -> + emqx_sm:close_session(?config(session_pid, Config)), + receive + {shutdown, normal} -> ok + after 500 -> ct:fail({timeout, wait_session_shutdown}) + end. + +t_resume_session(Config) -> + ?assertEqual({ok, ?config(session_pid, Config)}, emqx_sm:resume_session(<<"client">>, ?ATTRS#{conn_pid => self()})). t_discard_session(_) -> - {ok, ClientPid} = emqx_mock_client:start_link(<<"client1">>), - {ok, _SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}), ?assertEqual(ok, emqx_sm:discard_session(<<"client1">>)). t_register_unregister_session(_) -> Pid = self(), - {ok, _ClientPid} = emqx_mock_client:start_link(<<"client">>), ?assertEqual(ok, emqx_sm:register_session(<<"client">>)), ?assertEqual(ok, emqx_sm:register_session(<<"client">>, Pid)), ?assertEqual(ok, emqx_sm:unregister_session(<<"client">>)), ?assertEqual(ok, emqx_sm:unregister_session(<<"client">>), Pid). -t_get_set_session_attrs(_) -> - {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}), - ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, [?ATTRS#{conn_pid => ClientPid}])), - ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => ClientPid}])), - [SAttr] = emqx_sm:get_session_attrs(<<"client">>, SPid), - ?assertEqual(<<"client">>, maps:get(client_id, SAttr)). +t_get_set_session_attrs(Config) -> + SPid = ?config(session_pid, Config), + ClientPid0 = spawn(fun() -> receive _ -> ok end end), + ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, [?ATTRS#{conn_pid => ClientPid0}])), + ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => ClientPid0}])), + [SAttr0] = emqx_sm:get_session_attrs(<<"client">>, SPid), + ?assertEqual(ClientPid0, maps:get(conn_pid, SAttr0)), + ?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => self()}])), + [SAttr1] = emqx_sm:get_session_attrs(<<"client">>, SPid), + ?assertEqual(self(), maps:get(conn_pid, SAttr1)). -t_get_set_session_stats(_) -> - {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}), +t_get_set_session_stats(Config) -> + SPid = ?config(session_pid, Config), ?assertEqual(true, emqx_sm:set_session_stats(<<"client">>, [{inflight, 10}])), ?assertEqual(true, emqx_sm:set_session_stats(<<"client">>, SPid, [{inflight, 10}])), ?assertEqual([{inflight, 10}], emqx_sm:get_session_stats(<<"client">>, SPid)). -t_lookup_session_pids(_) -> - {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}), +t_lookup_session_pids(Config) -> + SPid = ?config(session_pid, Config), ?assertEqual([SPid], emqx_sm:lookup_session_pids(<<"client">>)). diff --git a/test/emqx_sys_mon_SUITE.erl b/test/emqx_sys_mon_SUITE.erl index c014802ac..e2e9fe2ad 100644 --- a/test/emqx_sys_mon_SUITE.erl +++ b/test/emqx_sys_mon_SUITE.erl @@ -58,7 +58,8 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) -> after 1000 -> ct:fail("flase") - end. + end, + emqx_client:stop(C). concat_str(ValidateInfo, InfoOrPort, Info) -> WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]), diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index b8ec8e21c..d1a3bfa5e 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -36,12 +36,39 @@ start_traces(_Config) -> {username, <<"testuser">>}, {password, <<"pass">>}]), emqx_client:connect(T), - emqx_client:subscribe(T, <<"a/b/c">>), - ok = emqx_tracer:start_trace({client_id, <<"client">>}, all, "test/emqx_SUITE_data/clientid_trace.log"), - ok = emqx_tracer:start_trace({topic, <<"topic">>}, all, "test/emqx_SUITE_data/topic_trace.log"), - {ok, _} = file:read_file("test/emqx_SUITE_data/clientid_trace.log"), - {ok, _} = file:read_file("test/emqx_SUITE_data/topic_trace.log"), - Result = emqx_tracer:lookup_traces(), - ?assertEqual([{{client_id,<<"client">>},{all,"test/emqx_SUITE_data/clientid_trace.log"}},{{topic,<<"topic">>},{all,"test/emqx_SUITE_data/topic_trace.log"}}], Result), + + %% Start tracing + ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"), + ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"), + ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"), + ct:sleep(100), + + %% Verify the tracing file exits + ?assert(filelib:is_regular("tmp/client.log")), + ?assert(filelib:is_regular("tmp/client2.log")), + ?assert(filelib:is_regular("tmp/topic_trace.log")), + + %% Get current traces + ?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}}, + {{client_id,<<"client2">>},{all,"tmp/client2.log"}}, + {{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()), + + %% set the overall log level to debug + emqx_logger:set_log_level(debug), + + %% Client with clientid = "client" publishes a "hi" message to "a/b/c". + emqx_client:publish(T, <<"a/b/c">>, <<"hi">>), + ct:sleep(200), + + %% Verify messages are logged to "tmp/client.log" and "tmp/topic_trace.log", but not "tmp/client2.log". + ?assert(filelib:file_size("tmp/client.log") > 0), + ?assert(filelib:file_size("tmp/topic_trace.log") > 0), + ?assert(filelib:file_size("tmp/client2.log") == 0), + + %% Stop tracing ok = emqx_tracer:stop_trace({client_id, <<"client">>}), - ok = emqx_tracer:stop_trace({topic, <<"topic">>}). + ok = emqx_tracer:stop_trace({client_id, <<"client2">>}), + ok = emqx_tracer:stop_trace({topic, <<"a/#">>}), + emqx_client:disconnect(T), + + emqx_logger:set_log_level(error). \ No newline at end of file