diff --git a/apps/emqx_web_hook/src/emqx_web_hook_app.erl b/apps/emqx_web_hook/src/emqx_web_hook_app.erl index 2ec8ebf42..431cb5792 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_app.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_app.erl @@ -72,11 +72,9 @@ translate_env() -> true -> verify_peer; false -> verify_none end, - TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> -> - false; - (_) -> - true - end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]), + TLSOpts = lists:filter(fun({_K, V}) -> + V /= <<>> andalso V /= undefined andalso V /= "" andalso true + end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]), TlsVers = ['tlsv1.2','tlsv1.1',tlsv1], NTLSOpts = [{verify, VerifyType}, {versions, TlsVers}, diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl index cd370a57b..94ea212ec 100644 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl +++ b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl @@ -47,49 +47,38 @@ init_per_group(Name, Config) -> set_special_cfgs(), case Name of http -> - http_server:start_http(), emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_http/1); https -> - http_server:start_https(), emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_https/1); ipv6http -> - http_server:start_http(), emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_ipv6_http/1); ipv6https -> - http_server:start_https(), emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_ipv6_https/1) end, Config. -end_per_group(Name, Config) -> - case lists:member(Name,[http, ipv6http]) of - true -> - http_server:stop_http(); - _ -> - http_server:stop_https() - end, +end_per_group(_Name, Config) -> emqx_ct_helpers:stop_apps([emqx_web_hook]), Config. set_special_configs_http(_) -> - ok. + application:set_env(emqx_web_hook, url, "http://127.0.0.1:9999"). set_special_configs_https(_) -> Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"), SslOpts = [{keyfile, Path ++ "/client-key.pem"}, {certfile, Path ++ "/client-cert.pem"}, - {cacertfile, Path ++ "/ca.pem"}], + {cafile, Path ++ "/ca.pem"}], application:set_env(emqx_web_hook, ssl, true), application:set_env(emqx_web_hook, ssloptions, SslOpts), - application:set_env(emqx_web_hook, url, "https://127.0.0.1:8081"). + application:set_env(emqx_web_hook, url, "https://127.0.0.1:8888"). -set_special_configs_ipv6_http(N) -> - set_special_configs_http(N), - application:set_env(emqx_web_hook, url, "http://[::1]:8080"). +set_special_configs_ipv6_http(_) -> + application:set_env(emqx_web_hook, url, "http://[::1]:9999"). set_special_configs_ipv6_https(N) -> set_special_configs_https(N), - application:set_env(emqx_web_hook, url, "https://[::1]:8081"). + application:set_env(emqx_web_hook, url, "https://[::1]:8888"). set_special_cfgs() -> AllRules = [{"message.acked", "{\"action\": \"on_message_acked\"}"}, @@ -109,6 +98,27 @@ set_special_cfgs() -> %% Test cases %%-------------------------------------------------------------------- +t_valid(Config) -> + {ok, ServerPid} = http_server:start_link(), + application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]), + {ok, C} = emqtt:start_link([ {clientid, <<"simpleClient">>} + , {proto_ver, v5} + , {keepalive, 60} + ]), + {ok, _} = emqtt:connect(C), + emqtt:subscribe(C, <<"TopicA">>, qos2), + emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2), + emqtt:unsubscribe(C, <<"TopicA">>), + emqtt:disconnect(C), + [begin + Maps = emqx_json:decode(P, [return_maps]), + validate_hook_resp(Maps), + validate_hook_headers(Headers) + end + || {{P, _Bool}, Headers} <- http_server:get_received_data()], + http_server:stop(ServerPid), + Config. + t_check_hooked(_) -> {ok, Rules} = application:get_env(emqx_web_hook, rules), lists:foreach(fun({HookName, _Action}) -> @@ -127,90 +137,69 @@ t_change_config(_) -> application:set_env(emqx_web_hook, rules, Rules), emqx_web_hook:load(). -t_valid() -> - application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]), - {ok, C} = emqtt:start_link([ {clientid, <<"simpleClient">>} - , {proto_ver, v5} - , {keepalive, 60} - ]), - {ok, _} = emqtt:connect(C), - emqtt:subscribe(C, <<"TopicA">>, qos2), - emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2), - emqtt:unsubscribe(C, <<"TopicA">>), - emqtt:disconnect(C), - {Params, Headers} = get_http_message(), - [validate_hook_resp(A) || A <- Params], - ?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)), - ?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)). - %%-------------------------------------------------------------------- %% Utils %%-------------------------------------------------------------------- -get_http_message() -> - receive - {Params, Headers} -> - L = [B || {B, _} <- Params], - {lists:reverse([emqx_json:decode(E, [return_maps]) || E <- L]), Headers} - after 500 -> - {null, null} - end. +validate_hook_headers(Headers) -> + ?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)), + ?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)). validate_hook_resp(Body = ?ACTION(<<"client_connect">>)) -> ?assertEqual(5, maps:get(<<"proto_ver">>, Body)), ?assertEqual(60, maps:get(<<"keepalive">>, Body)), ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"client_connack">>)) -> ?assertEqual(5, maps:get(<<"proto_ver">>, Body)), ?assertEqual(60, maps:get(<<"keepalive">>, Body)), ?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)), ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"client_connected">>)) -> _ = maps:get(<<"connected_at">>, Body), ?assertEqual(5, maps:get(<<"proto_ver">>, Body)), ?assertEqual(60, maps:get(<<"keepalive">>, Body)), ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"client_disconnected">>)) -> ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"client_subscribe">>)) -> _ = maps:get(<<"opts">>, Body), ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"client_unsubscribe">>)) -> _ = maps:get(<<"opts">>, Body), ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"session_subscribed">>)) -> _ = maps:get(<<"opts">>, Body), ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"session_unsubscribed">>)) -> ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"session_terminated">>)) -> ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)), - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_username_clientid(Body); validate_hook_resp(Body = ?ACTION(<<"message_publish">>)) -> - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_messages_attrs(Body); validate_hook_resp(Body = ?ACTION(<<"message_delivered">>)) -> - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_messages_attrs(Body); validate_hook_resp(Body = ?ACTION(<<"message_acked">>)) -> - ?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)), + ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)), assert_messages_attrs(Body). assert_username_clientid(#{<<"clientid">> := ClientId, <<"username">> := Username}) -> diff --git a/apps/emqx_web_hook/test/http_server.erl b/apps/emqx_web_hook/test/http_server.erl index 2404c7e78..e0f367eba 100644 --- a/apps/emqx_web_hook/test/http_server.erl +++ b/apps/emqx_web_hook/test/http_server.erl @@ -3,73 +3,103 @@ %% %% It will deliver the http-request params to initialer process %%-------------------------------------------------------------------- +%% +%% Author:wwhai +%% -module(http_server). +-behaviour(gen_server). --compile(export_all). --compile(nowarn_export_all). - +-export([start_link/0]). +-export([get_received_data/0]). +-export([stop/1]). +-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). +-define(HTTP_PORT, 9999). +-define(HTTPS_PORT, 8888). +-record(state, {}). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -start() -> - {ok, _} = application:ensure_all_started(cowboy), - cowboy_router:compile([ - {'_', [ - {"/", ?MODULE, self()} - ]} - ]). +start_link() -> + gen_server:start_link(?MODULE, [], []). -start_http() -> - {ok, _Pid1} = cowboy:start_clear(http, [{port, 8080}], #{ - env => #{dispatch => start()} - }), - io:format("Start http server on 8080 successfully!~n"). +init([]) -> + EtsOptions = [named_table, public, set, {write_concurrency, true}, + {read_concurrency, true}], + emqx_web_hook_http_test = ets:new(emqx_web_hook_http_test, EtsOptions), + ok = start_http(?HTTP_PORT), + ok = start_https(?HTTPS_PORT), + {ok, #state{}}. -start_https() -> - Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"), - SslOpts = [{keyfile, Path ++ "/server-key.pem"}, - {cacertfile, Path ++ "/ca.pem"}, - {certfile, Path ++ "/server-cert.pem"}], +handle_call(_Request, _From, State) -> + {reply, ignored, State}. - {ok, _Pid2} = cowboy:start_tls(https, [{port, 8081}] ++ SslOpts, - #{env => #{dispatch => start()}}), - io:format(standard_error, "Start https server on 8081 successfully!~n", []). +handle_cast(_Msg, State) -> + {noreply, State}. -stop_http() -> - ok = cowboy:stop_listener(http), - io:format("Stopped http server on 8080"). +handle_info(_Info, State) -> + {noreply, State}. -stop_https() -> - ok = cowboy:stop_listener(https), - io:format("Stopped https server on 8081"). +terminate(_Reason, _State) -> + stop_http(), + stop_https(). +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +get_received_data() -> + ets:tab2list(emqx_web_hook_http_test). + +stop(Pid) -> + ok = gen_server:stop(Pid). %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- -init(Req, ReceiverPid) -> - Req1 = handle_request(Req, ReceiverPid), - {ok, Req1, ReceiverPid}. +start_http(Port) -> + {ok, _Pid1} = cowboy:start_clear(http, [{port, Port}], #{ + env => #{dispatch => compile_router()} + }), + io:format(standard_error, "[TEST LOG] Start http server on 9999 successfully!~n", []). -%% @private -handle_request(Req, ReceiverPid) -> +start_https(Port) -> + Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"), + SslOpts = [{keyfile, Path ++ "/server-key.pem"}, + {cacertfile, Path ++ "/ca.pem"}, + {certfile, Path ++ "/server-cert.pem"}], + + {ok, _Pid2} = cowboy:start_tls(https, [{port, Port}] ++ SslOpts, + #{env => #{dispatch => compile_router()}}), + io:format(standard_error, "[TEST LOG] Start https server on 8888 successfully!~n", []). + +stop_http() -> + ok = cowboy:stop_listener(http), + io:format("[TEST LOG] Stopped http server on 9999"). + +stop_https() -> + ok = cowboy:stop_listener(https), + io:format("[TEST LOG] Stopped https server on 8888"). + +compile_router() -> + {ok, _} = application:ensure_all_started(cowboy), + cowboy_router:compile([ + {'_', [{"/", ?MODULE, #{}}]} + ]). + +init(Req, State) -> Method = cowboy_req:method(Req), Headers = cowboy_req:headers(Req), - Params = - case Method of - <<"GET">> -> cowboy_req:parse_qs(Req); - <<"POST">> -> - {ok, PostVals, _Req2} = cowboy_req:read_urlencoded_body(Req), - PostVals - end, - io:format("Request Data:~p~nHeaders :~p~n", [Params, Headers]), - erlang:send(ReceiverPid, {Params, Headers}), - reply(Req, ok). + [Params] = case Method of + <<"GET">> -> cowboy_req:parse_qs(Req); + <<"POST">> -> + {ok, PostVals, _} = cowboy_req:read_urlencoded_body(Req), + PostVals + end, + ets:insert(emqx_web_hook_http_test, {Params, Headers}), + {ok, reply(Req, ok), State}. -%% @private reply(Req, ok) -> - cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"hello">>, Req); + cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"ok">>, Req); reply(Req, error) -> - cowboy_req:reply(404, #{<<"content-type">> => <<"text/plain">>}, <<"deny">>, Req). + cowboy_req:reply(404, #{<<"content-type">> => <<"text/plain">>}, <<"deny">>, Req). \ No newline at end of file