emqx/test/emqx_ocsp_cache_SUITE.erl

499 lines
17 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ocsp_cache_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("ssl/src/ssl_handshake.hrl").
-define(CACHE_TAB, emqx_ocsp_cache).
all() ->
[{group, openssl}] ++ tests().
tests() ->
emqx_ct:all(?MODULE) -- openssl_tests().
openssl_tests() ->
[t_openssl_client].
groups() ->
OpensslTests = openssl_tests(),
[ {openssl, [ {group, tls12}
, {group, tls13}
]}
, {tls12, [ {group, with_status_request}
, {group, without_status_request}
]}
, {tls13, [ {group, with_status_request}
, {group, without_status_request}
]}
, {with_status_request, [], OpensslTests}
, {without_status_request, [], OpensslTests}
].
init_per_suite(Config) ->
application:load(emqx),
OriginalListeners = application:get_env(emqx, listeners, []),
[ {original_listeners, OriginalListeners}
| Config].
end_per_suite(Config) ->
OriginalListeners = ?config(original_listeners, Config),
application:set_env(emqx, listeners, OriginalListeners),
ok.
init_per_group(tls12, Config) ->
[{tls_vsn, "-tls1_2"} | Config];
init_per_group(tls13, Config) ->
[{tls_vsn, "-tls1_3"} | Config];
init_per_group(with_status_request, Config) ->
[{status_request, true} | Config];
init_per_group(without_status_request, Config) ->
[{status_request, false} | Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(t_openssl_client, Config) ->
ct:timetrap(10_000),
OriginalListeners = application:get_env(emqx, listeners),
DataDir = ?config(data_dir, Config),
IssuerPem = filename:join([DataDir, "ocsp-issuer.pem"]),
ServerCert = filename:join([DataDir, "server.pem"]),
ServerKey = filename:join([DataDir, "server.key"]),
CACert = filename:join([DataDir, "ca.pem"]),
Handler =
fun(emqx) ->
Listeners0 = emqx:get_env(listeners, []),
{[SSLListener0 = #{opts := Opts0}], Listeners1} =
lists:partition(
fun(#{proto := P, name := N}) ->
N =:= "external" andalso P =:= ssl
end,
Listeners0),
SSLOpts0 = proplists:get_value(ssl_options, Opts0),
SSLOpts1 = lists:foldl(
fun proplists:delete/2,
SSLOpts0,
[certfile, keyfile]),
SSLOpts2 = lists:foldl(
fun({K, V}, Acc) ->
[{K, V} | Acc]
end,
SSLOpts1,
[ {certfile, ServerCert}
, {keyfile, ServerKey}
, {cacertfile, CACert}
]),
Opts1 = proplists:delete(ssl_options, Opts0),
OCSPOpts = [ {ocsp_stapling_enabled, true}
, {ocsp_responder_url, "http://127.0.0.1:9877"}
, {ocsp_issuer_pem, IssuerPem}
],
Opts2 = emqx_misc:merge_opts(Opts1, [ {ocsp_options, OCSPOpts}
, {ssl_options, SSLOpts2}]),
Listeners = [ SSLListener0#{opts => Opts2}
| Listeners1],
application:set_env(emqx, listeners, Listeners),
ok;
(_) ->
ok
end,
OCSPResponderPort = spawn_openssl_ocsp_responder(Config),
{os_pid, OCSPOSPid} = erlang:port_info(OCSPResponderPort, os_pid),
%%%%%%%% Warning!!!
%% Apparently, openssl 3.0.7 introduced a bug in the responder
%% that makes it hang forever if one probes the port with
%% `gen_tcp:open' / `gen_tcp:close'... Comment this out if
%% openssl gets updated in CI or in your local machine.
case openssl_version() of
"3." ++ _ ->
%% hope that the responder has started...
ok;
_ ->
ensure_port_open(9877)
end,
ct:sleep(1_000),
emqx_ct_helpers:start_apps([], Handler),
ct:sleep(1_000),
[ {original_listeners, OriginalListeners}
, {ocsp_responder_port, OCSPResponderPort}
, {ocsp_responder_os_pid, OCSPOSPid}
| Config];
init_per_testcase(_TestCase, Config) ->
ct:timetrap(10_000),
TestPid = self(),
ok = meck:new(emqx_ocsp_cache, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_ocsp_cache, http_get,
fun(URL, _HTTPTimeout) ->
TestPid ! {http_get, URL},
{ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}}
end),
{ok, CachePid} = emqx_ocsp_cache:start_link(),
DataDir = ?config(data_dir, Config),
OCSPOpts = [ {ocsp_stapling_enabled, true}
, {ocsp_responder_url, "http://localhost:9877"}
, {ocsp_issuer_pem,
filename:join(DataDir, "ocsp-issuer.pem")}
, {ocsp_refresh_http_timeout, 15_000}
, {ocsp_refresh_interval, 1_000}
],
application:set_env(
emqx, listeners,
[#{ proto => ssl
, name => "test_ocsp"
, opts => [ {ssl_options, [{certfile,
filename:join(DataDir, "server.pem")}]}
, {ocsp_options, OCSPOpts}
]
}]),
snabbkaffe:start_trace(),
[ {cache_pid, CachePid}
| Config].
end_per_testcase(t_openssl_client, Config) ->
OriginalListeners = ?config(original_listeners, Config),
OCSPResponderOSPid = ?config(ocsp_responder_os_pid, Config),
case OriginalListeners of
{ok, Listeners} -> application:set_env(emqx, listeners, Listeners);
_ -> ok
end,
catch kill_pid(OCSPResponderOSPid),
emqx_ct_helpers:stop_apps([]),
ok;
end_per_testcase(_TestCase, Config) ->
CachePid = ?config(cache_pid, Config),
catch gen_server:stop(CachePid),
meck:unload([emqx_ocsp_cache]),
ok.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
assert_no_http_get() ->
receive
{http_get, _URL} ->
error(should_be_cached)
after
0 ->
ok
end.
assert_http_get(0) -> ok;
assert_http_get(N) when N > 0 ->
receive
{http_get, URL} ->
?assertMatch("http://localhost:9877/" ++ _Request64, URL),
ok
after
0 ->
error(no_http_get)
end,
assert_http_get(N - 1).
spawn_openssl_client(TLSVsn, RequestStatus, Config) ->
DataDir = ?config(data_dir, Config),
ClientCert = filename:join([DataDir, "client.pem"]),
ClientKey = filename:join([DataDir, "client.key"]),
Cacert = filename:join([DataDir, "ca.pem"]),
Openssl = os:find_executable("openssl"),
StatusOpt = case RequestStatus of
true -> ["-status"];
false -> []
end,
open_port( {spawn_executable, Openssl}
, [ {args, [ "s_client"
, "-connect", "localhost:8883"
%% needed to trigger `sni_fun'
, "-servername", "localhost"
, TLSVsn
, "-CAfile", Cacert
, "-cert", ClientCert
, "-key", ClientKey
] ++ StatusOpt}
, binary
, stderr_to_stdout
]
).
spawn_openssl_ocsp_responder(Config) ->
DataDir = ?config(data_dir, Config),
IssuerCert = filename:join([DataDir, "ocsp-issuer.pem"]),
IssuerKey = filename:join([DataDir, "ocsp-issuer.key"]),
Cacert = filename:join([DataDir, "ca.pem"]),
Index = filename:join([DataDir, "index.txt"]),
Openssl = os:find_executable("openssl"),
open_port( {spawn_executable, Openssl}
, [ {args, [ "ocsp"
, "-ignore_err"
, "-port", "9877"
, "-CA", Cacert
, "-rkey", IssuerKey
, "-rsigner", IssuerCert
, "-index", Index
]}
, binary
, stderr_to_stdout
]
).
kill_pid(OSPid) ->
os:cmd("kill -9 " ++ integer_to_list(OSPid)).
test_ocsp_connection(TLSVsn, WithRequestStatus = true, Config) ->
ClientPort = spawn_openssl_client(TLSVsn, WithRequestStatus, Config),
{os_pid, ClientOSPid} = erlang:port_info(ClientPort, os_pid),
try
timer:sleep(timer:seconds(1)),
{messages, Messages} = process_info(self(), messages),
OCSPOutput0 = [Output || {_Port, {data, Output}} <- Messages,
re:run(Output, "OCSP response:") =/= nomatch],
?assertMatch([_], OCSPOutput0,
#{ all_messages => Messages
}),
[OCSPOutput] = OCSPOutput0,
?assertMatch({match, _}, re:run(OCSPOutput, "OCSP Response Status: successful"),
#{all_messages => Messages}),
?assertMatch({match, _}, re:run(OCSPOutput, "Cert Status: good"),
#{all_messages => Messages}),
ok
after
catch kill_pid(ClientOSPid)
end;
test_ocsp_connection(TLSVsn, WithRequestStatus = false, Config) ->
ClientPort = spawn_openssl_client(TLSVsn, WithRequestStatus, Config),
{os_pid, ClientOSPid} = erlang:port_info(ClientPort, os_pid),
try
timer:sleep(timer:seconds(1)),
{messages, Messages} = process_info(self(), messages),
OCSPOutput = [Output || {_Port, {data, Output}} <- Messages,
re:run(Output, "OCSP response:") =/= nomatch],
?assertEqual([], OCSPOutput,
#{all_messages => Messages}),
ok
after
catch kill_pid(ClientOSPid)
end.
ensure_port_open(Port) ->
do_ensure_port_open(Port, 10).
do_ensure_port_open(Port, 0) ->
error({port_not_open, Port});
do_ensure_port_open(Port, N) when N > 0 ->
Timeout = 1_000,
case gen_tcp:connect("localhost", Port, [], Timeout) of
{ok, Sock} ->
gen_tcp:close(Sock),
ok;
{error, _} ->
ct:sleep(500),
do_ensure_port_open(Port, N - 1)
end.
get_sni_fun(ListenerID) ->
#{opts := Opts} = emqx_listeners:find_by_id(ListenerID),
SSLOpts = proplists:get_value(ssl_options, Opts),
proplists:get_value(sni_fun, SSLOpts).
openssl_version() ->
Res0 = string:trim(os:cmd("openssl version"), trailing),
[_, Res] = string:split(Res0, " "),
{match, [Version]} = re:run(Res, "^([^ ]+)", [{capture, first, list}]),
Version.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_request_ocsp_response(_Config) ->
?check_trace(
begin
ListenerID = <<"mqtt:ssl:test_ocsp">>,
%% not yet cached.
?assertEqual([], ets:tab2list(?CACHE_TAB)),
?assertEqual({ok, <<"ocsp response">>},
emqx_ocsp_cache:fetch_response(ListenerID)),
assert_http_get(1),
?assertMatch([{_, <<"ocsp response">>}], ets:tab2list(?CACHE_TAB)),
%% already cached; should not perform request again.
?assertEqual({ok, <<"ocsp response">>},
emqx_ocsp_cache:fetch_response(ListenerID)),
assert_no_http_get(),
ok
end,
fun(Trace) ->
?assert(
?strict_causality(
#{?snk_kind := ocsp_cache_miss, listener_id := _ListenerID},
#{?snk_kind := ocsp_http_fetch_and_cache, listener_id := _ListenerID},
Trace)),
?assertMatch(
[_],
?of_kind(ocsp_cache_miss, Trace)),
?assertMatch(
[_],
?of_kind(ocsp_http_fetch_and_cache, Trace)),
?assertMatch(
[_],
?of_kind(ocsp_cache_hit, Trace)),
ok
end).
t_request_ocsp_response_restart_cache(Config) ->
process_flag(trap_exit, true),
CachePid = ?config(cache_pid, Config),
ListenerID = <<"mqtt:ssl:test_ocsp">>,
?check_trace(
begin
[] = ets:tab2list(?CACHE_TAB),
{ok, _} = emqx_ocsp_cache:fetch_response(ListenerID),
?wait_async_action(
begin
Ref = monitor(process, CachePid),
exit(CachePid, kill),
receive
{'DOWN', Ref, process, CachePid, killed} ->
ok
after
1_000 ->
error(cache_not_killed)
end,
{ok, _} = emqx_ocsp_cache:start_link(),
ok
end,
#{?snk_kind := ocsp_cache_init}),
{ok, _} = emqx_ocsp_cache:fetch_response(ListenerID),
ok
end,
fun(Trace) ->
?assertMatch(
[_, _],
?of_kind(ocsp_http_fetch_and_cache, Trace)),
assert_http_get(2),
ok
end).
t_request_ocsp_response_bad_http_status(_Config) ->
TestPid = self(),
meck:expect(emqx_ocsp_cache, http_get,
fun(URL, _HTTPTimeout) ->
TestPid ! {http_get, URL},
{ok, {{"HTTP/1.0", 404, 'Not Found'}, [], <<"not found">>}}
end),
ListenerID = <<"mqtt:ssl:test_ocsp">>,
%% not yet cached.
?assertEqual([], ets:tab2list(?CACHE_TAB)),
?assertEqual(error,
emqx_ocsp_cache:fetch_response(ListenerID)),
assert_http_get(1),
?assertEqual([], ets:tab2list(?CACHE_TAB)),
ok.
t_request_ocsp_response_timeout(_Config) ->
TestPid = self(),
meck:expect(emqx_ocsp_cache, http_get,
fun(URL, _HTTPTimeout) ->
TestPid ! {http_get, URL},
{error, timeout}
end),
ListenerID = <<"mqtt:ssl:test_ocsp">>,
%% not yet cached.
?assertEqual([], ets:tab2list(?CACHE_TAB)),
?assertEqual(error,
emqx_ocsp_cache:fetch_response(ListenerID)),
assert_http_get(1),
?assertEqual([], ets:tab2list(?CACHE_TAB)),
ok.
t_register_listener(_Config) ->
ListenerID = <<"mqtt:ssl:test_ocsp">>,
%% should fetch and cache immediately
{ok, {ok, _}} =
?wait_async_action(
emqx_ocsp_cache:register_listener(ListenerID),
#{?snk_kind := ocsp_http_fetch_and_cache, listener_id := ListenerID}),
assert_http_get(1),
?assertMatch([{_, <<"ocsp response">>}], ets:tab2list(?CACHE_TAB)),
ok.
t_register_twice(_Config) ->
ListenerID = <<"mqtt:ssl:test_ocsp">>,
{ok, {ok, _}} =
?wait_async_action(
emqx_ocsp_cache:register_listener(ListenerID),
#{?snk_kind := ocsp_http_fetch_and_cache, listener_id := ListenerID}),
assert_http_get(1),
?assertMatch([{_, <<"ocsp response">>}], ets:tab2list(?CACHE_TAB)),
%% should have no problem in registering the same listener again.
%% this prompts an immediate refresh.
{ok, {ok, _}} =
?wait_async_action(
emqx_ocsp_cache:register_listener(ListenerID),
#{?snk_kind := ocsp_http_fetch_and_cache, listener_id := ListenerID}),
ok.
t_refresh_periodically(_Config) ->
ListenerID = <<"mqtt:ssl:test_ocsp">>,
%% should refresh periodically
{ok, SubRef} =
snabbkaffe:subscribe(
fun(#{?snk_kind := ocsp_http_fetch_and_cache, listener_id := ListenerID0}) ->
ListenerID0 =:= ListenerID;
(_) ->
false
end,
_NEvents = 2,
_Timeout = 10_000),
ok = emqx_ocsp_cache:register_listener(ListenerID),
?assertMatch({ok, [_, _]}, snabbkaffe:receive_events(SubRef)),
assert_http_get(2),
ok.
t_sni_fun_success(_Config) ->
ListenerID = <<"mqtt:ssl:test_ocsp">>,
ServerName = "localhost",
?assertEqual(
[{certificate_status,
#certificate_status{
status_type = ?CERTIFICATE_STATUS_TYPE_OCSP,
response = <<"ocsp response">>
}}],
emqx_ocsp_cache:sni_fun(ServerName, ListenerID)),
ok.
t_sni_fun_http_error(_Config) ->
meck:expect(emqx_ocsp_cache, http_get,
fun(_URL, _HTTPTimeout) ->
{error, timeout}
end),
ListenerID = <<"mqtt:ssl:test_ocsp">>,
ServerName = "localhost",
?assertEqual(
[],
emqx_ocsp_cache:sni_fun(ServerName, ListenerID)),
ok.
t_openssl_client(Config) ->
TLSVsn = ?config(tls_vsn, Config),
WithStatusRequest = ?config(status_request, Config),
%% ensure ocsp response is already cached.
ListenerID = <<"mqtt:ssl:external">>,
?assertMatch(
{ok, _},
emqx_ocsp_cache:fetch_response(ListenerID),
#{msgs => process_info(self(), messages)}),
timer:sleep(500),
test_ocsp_connection(TLSVsn, WithStatusRequest, Config).