Merge pull request #4067 from HJianBo/fix/webhook_certs
fix(webhook): fix bad https confs
This commit is contained in:
commit
7919f08e03
|
@ -72,11 +72,9 @@ translate_env() ->
|
|||
true -> verify_peer;
|
||||
false -> verify_none
|
||||
end,
|
||||
TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> ->
|
||||
false;
|
||||
(_) ->
|
||||
true
|
||||
end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]),
|
||||
TLSOpts = lists:filter(fun({_K, V}) ->
|
||||
V /= <<>> andalso V /= undefined andalso V /= "" andalso true
|
||||
end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]),
|
||||
TlsVers = ['tlsv1.2','tlsv1.1',tlsv1],
|
||||
NTLSOpts = [{verify, VerifyType},
|
||||
{versions, TlsVers},
|
||||
|
|
|
@ -47,49 +47,38 @@ init_per_group(Name, Config) ->
|
|||
set_special_cfgs(),
|
||||
case Name of
|
||||
http ->
|
||||
http_server:start_http(),
|
||||
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_http/1);
|
||||
https ->
|
||||
http_server:start_https(),
|
||||
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_https/1);
|
||||
ipv6http ->
|
||||
http_server:start_http(),
|
||||
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_ipv6_http/1);
|
||||
ipv6https ->
|
||||
http_server:start_https(),
|
||||
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_ipv6_https/1)
|
||||
end,
|
||||
Config.
|
||||
|
||||
end_per_group(Name, Config) ->
|
||||
case lists:member(Name,[http, ipv6http]) of
|
||||
true ->
|
||||
http_server:stop_http();
|
||||
_ ->
|
||||
http_server:stop_https()
|
||||
end,
|
||||
end_per_group(_Name, Config) ->
|
||||
emqx_ct_helpers:stop_apps([emqx_web_hook]),
|
||||
Config.
|
||||
|
||||
set_special_configs_http(_) ->
|
||||
ok.
|
||||
application:set_env(emqx_web_hook, url, "http://127.0.0.1:9999").
|
||||
|
||||
set_special_configs_https(_) ->
|
||||
Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
|
||||
SslOpts = [{keyfile, Path ++ "/client-key.pem"},
|
||||
{certfile, Path ++ "/client-cert.pem"},
|
||||
{cacertfile, Path ++ "/ca.pem"}],
|
||||
{cafile, Path ++ "/ca.pem"}],
|
||||
application:set_env(emqx_web_hook, ssl, true),
|
||||
application:set_env(emqx_web_hook, ssloptions, SslOpts),
|
||||
application:set_env(emqx_web_hook, url, "https://127.0.0.1:8081").
|
||||
application:set_env(emqx_web_hook, url, "https://127.0.0.1:8888").
|
||||
|
||||
set_special_configs_ipv6_http(N) ->
|
||||
set_special_configs_http(N),
|
||||
application:set_env(emqx_web_hook, url, "http://[::1]:8080").
|
||||
set_special_configs_ipv6_http(_) ->
|
||||
application:set_env(emqx_web_hook, url, "http://[::1]:9999").
|
||||
|
||||
set_special_configs_ipv6_https(N) ->
|
||||
set_special_configs_https(N),
|
||||
application:set_env(emqx_web_hook, url, "https://[::1]:8081").
|
||||
application:set_env(emqx_web_hook, url, "https://[::1]:8888").
|
||||
|
||||
set_special_cfgs() ->
|
||||
AllRules = [{"message.acked", "{\"action\": \"on_message_acked\"}"},
|
||||
|
@ -109,6 +98,27 @@ set_special_cfgs() ->
|
|||
%% Test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_valid(Config) ->
|
||||
{ok, ServerPid} = http_server:start_link(),
|
||||
application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
|
||||
{ok, C} = emqtt:start_link([ {clientid, <<"simpleClient">>}
|
||||
, {proto_ver, v5}
|
||||
, {keepalive, 60}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
emqtt:subscribe(C, <<"TopicA">>, qos2),
|
||||
emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
|
||||
emqtt:unsubscribe(C, <<"TopicA">>),
|
||||
emqtt:disconnect(C),
|
||||
[begin
|
||||
Maps = emqx_json:decode(P, [return_maps]),
|
||||
validate_hook_resp(Maps),
|
||||
validate_hook_headers(Headers)
|
||||
end
|
||||
|| {{P, _Bool}, Headers} <- http_server:get_received_data()],
|
||||
http_server:stop(ServerPid),
|
||||
Config.
|
||||
|
||||
t_check_hooked(_) ->
|
||||
{ok, Rules} = application:get_env(emqx_web_hook, rules),
|
||||
lists:foreach(fun({HookName, _Action}) ->
|
||||
|
@ -127,90 +137,69 @@ t_change_config(_) ->
|
|||
application:set_env(emqx_web_hook, rules, Rules),
|
||||
emqx_web_hook:load().
|
||||
|
||||
t_valid() ->
|
||||
application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
|
||||
{ok, C} = emqtt:start_link([ {clientid, <<"simpleClient">>}
|
||||
, {proto_ver, v5}
|
||||
, {keepalive, 60}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
emqtt:subscribe(C, <<"TopicA">>, qos2),
|
||||
emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
|
||||
emqtt:unsubscribe(C, <<"TopicA">>),
|
||||
emqtt:disconnect(C),
|
||||
{Params, Headers} = get_http_message(),
|
||||
[validate_hook_resp(A) || A <- Params],
|
||||
?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)),
|
||||
?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Utils
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
get_http_message() ->
|
||||
receive
|
||||
{Params, Headers} ->
|
||||
L = [B || {B, _} <- Params],
|
||||
{lists:reverse([emqx_json:decode(E, [return_maps]) || E <- L]), Headers}
|
||||
after 500 ->
|
||||
{null, null}
|
||||
end.
|
||||
validate_hook_headers(Headers) ->
|
||||
?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)),
|
||||
?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)).
|
||||
|
||||
validate_hook_resp(Body = ?ACTION(<<"client_connect">>)) ->
|
||||
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
|
||||
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
|
||||
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"client_connack">>)) ->
|
||||
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
|
||||
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
|
||||
?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)),
|
||||
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"client_connected">>)) ->
|
||||
_ = maps:get(<<"connected_at">>, Body),
|
||||
?assertEqual(5, maps:get(<<"proto_ver">>, Body)),
|
||||
?assertEqual(60, maps:get(<<"keepalive">>, Body)),
|
||||
?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"client_disconnected">>)) ->
|
||||
?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"client_subscribe">>)) ->
|
||||
_ = maps:get(<<"opts">>, Body),
|
||||
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"client_unsubscribe">>)) ->
|
||||
_ = maps:get(<<"opts">>, Body),
|
||||
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"session_subscribed">>)) ->
|
||||
_ = maps:get(<<"opts">>, Body),
|
||||
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"session_unsubscribed">>)) ->
|
||||
?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"session_terminated">>)) ->
|
||||
?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_username_clientid(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"message_publish">>)) ->
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_messages_attrs(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"message_delivered">>)) ->
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_messages_attrs(Body);
|
||||
validate_hook_resp(Body = ?ACTION(<<"message_acked">>)) ->
|
||||
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
|
||||
assert_messages_attrs(Body).
|
||||
|
||||
assert_username_clientid(#{<<"clientid">> := ClientId, <<"username">> := Username}) ->
|
||||
|
|
|
@ -3,73 +3,103 @@
|
|||
%%
|
||||
%% It will deliver the http-request params to initialer process
|
||||
%%--------------------------------------------------------------------
|
||||
%%
|
||||
%% Author:wwhai
|
||||
%%
|
||||
-module(http_server).
|
||||
-behaviour(gen_server).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([get_received_data/0]).
|
||||
-export([stop/1]).
|
||||
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]).
|
||||
-define(HTTP_PORT, 9999).
|
||||
-define(HTTPS_PORT, 8888).
|
||||
-record(state, {}).
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
start() ->
|
||||
{ok, _} = application:ensure_all_started(cowboy),
|
||||
cowboy_router:compile([
|
||||
{'_', [
|
||||
{"/", ?MODULE, self()}
|
||||
]}
|
||||
]).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link(?MODULE, [], []).
|
||||
|
||||
start_http() ->
|
||||
{ok, _Pid1} = cowboy:start_clear(http, [{port, 8080}], #{
|
||||
env => #{dispatch => start()}
|
||||
}),
|
||||
io:format("Start http server on 8080 successfully!~n").
|
||||
init([]) ->
|
||||
EtsOptions = [named_table, public, set, {write_concurrency, true},
|
||||
{read_concurrency, true}],
|
||||
emqx_web_hook_http_test = ets:new(emqx_web_hook_http_test, EtsOptions),
|
||||
ok = start_http(?HTTP_PORT),
|
||||
ok = start_https(?HTTPS_PORT),
|
||||
{ok, #state{}}.
|
||||
|
||||
start_https() ->
|
||||
Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
|
||||
SslOpts = [{keyfile, Path ++ "/server-key.pem"},
|
||||
{cacertfile, Path ++ "/ca.pem"},
|
||||
{certfile, Path ++ "/server-cert.pem"}],
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, ignored, State}.
|
||||
|
||||
{ok, _Pid2} = cowboy:start_tls(https, [{port, 8081}] ++ SslOpts,
|
||||
#{env => #{dispatch => start()}}),
|
||||
io:format(standard_error, "Start https server on 8081 successfully!~n", []).
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
stop_http() ->
|
||||
ok = cowboy:stop_listener(http),
|
||||
io:format("Stopped http server on 8080").
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
stop_https() ->
|
||||
ok = cowboy:stop_listener(https),
|
||||
io:format("Stopped https server on 8081").
|
||||
terminate(_Reason, _State) ->
|
||||
stop_http(),
|
||||
stop_https().
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
get_received_data() ->
|
||||
ets:tab2list(emqx_web_hook_http_test).
|
||||
|
||||
stop(Pid) ->
|
||||
ok = gen_server:stop(Pid).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init(Req, ReceiverPid) ->
|
||||
Req1 = handle_request(Req, ReceiverPid),
|
||||
{ok, Req1, ReceiverPid}.
|
||||
start_http(Port) ->
|
||||
{ok, _Pid1} = cowboy:start_clear(http, [{port, Port}], #{
|
||||
env => #{dispatch => compile_router()}
|
||||
}),
|
||||
io:format(standard_error, "[TEST LOG] Start http server on 9999 successfully!~n", []).
|
||||
|
||||
%% @private
|
||||
handle_request(Req, ReceiverPid) ->
|
||||
start_https(Port) ->
|
||||
Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
|
||||
SslOpts = [{keyfile, Path ++ "/server-key.pem"},
|
||||
{cacertfile, Path ++ "/ca.pem"},
|
||||
{certfile, Path ++ "/server-cert.pem"}],
|
||||
|
||||
{ok, _Pid2} = cowboy:start_tls(https, [{port, Port}] ++ SslOpts,
|
||||
#{env => #{dispatch => compile_router()}}),
|
||||
io:format(standard_error, "[TEST LOG] Start https server on 8888 successfully!~n", []).
|
||||
|
||||
stop_http() ->
|
||||
ok = cowboy:stop_listener(http),
|
||||
io:format("[TEST LOG] Stopped http server on 9999").
|
||||
|
||||
stop_https() ->
|
||||
ok = cowboy:stop_listener(https),
|
||||
io:format("[TEST LOG] Stopped https server on 8888").
|
||||
|
||||
compile_router() ->
|
||||
{ok, _} = application:ensure_all_started(cowboy),
|
||||
cowboy_router:compile([
|
||||
{'_', [{"/", ?MODULE, #{}}]}
|
||||
]).
|
||||
|
||||
init(Req, State) ->
|
||||
Method = cowboy_req:method(Req),
|
||||
Headers = cowboy_req:headers(Req),
|
||||
Params =
|
||||
case Method of
|
||||
<<"GET">> -> cowboy_req:parse_qs(Req);
|
||||
<<"POST">> ->
|
||||
{ok, PostVals, _Req2} = cowboy_req:read_urlencoded_body(Req),
|
||||
PostVals
|
||||
end,
|
||||
io:format("Request Data:~p~nHeaders :~p~n", [Params, Headers]),
|
||||
erlang:send(ReceiverPid, {Params, Headers}),
|
||||
reply(Req, ok).
|
||||
[Params] = case Method of
|
||||
<<"GET">> -> cowboy_req:parse_qs(Req);
|
||||
<<"POST">> ->
|
||||
{ok, PostVals, _} = cowboy_req:read_urlencoded_body(Req),
|
||||
PostVals
|
||||
end,
|
||||
ets:insert(emqx_web_hook_http_test, {Params, Headers}),
|
||||
{ok, reply(Req, ok), State}.
|
||||
|
||||
%% @private
|
||||
reply(Req, ok) ->
|
||||
cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"hello">>, Req);
|
||||
cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"ok">>, Req);
|
||||
reply(Req, error) ->
|
||||
cowboy_req:reply(404, #{<<"content-type">> => <<"text/plain">>}, <<"deny">>, Req).
|
||||
cowboy_req:reply(404, #{<<"content-type">> => <<"text/plain">>}, <<"deny">>, Req).
|
Loading…
Reference in New Issue