emqx/test/emqx_crl_cache_SUITE.erl

647 lines
23 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_crl_cache_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% from ssl_manager.erl
-record(state, {
session_cache_client,
session_cache_client_cb,
session_lifetime,
certificate_db,
session_validation_timer,
session_cache_client_max,
session_client_invalidator,
options,
client_session_order
}).
%%--------------------------------------------------------------------
%% CT boilerplate
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(TestCase, Config)
when TestCase =:= t_empty_cache;
TestCase =:= t_filled_cache;
TestCase =:= t_revoked ->
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
[{'CertificateList', CRLDer, not_encrypted}] = public_key:pem_decode(CRLPem),
ServerPid = start_crl_server(CRLPem),
IsCached = lists:member(TestCase, [t_filled_cache, t_revoked]),
ok = setup_crl_options(Config, #{is_cached => IsCached}),
[ {crl_pem, CRLPem}
, {crl_der, CRLDer}
, {http_server, ServerPid}
| Config];
init_per_testcase(t_revoke_then_refresh, Config) ->
DataDir = ?config(data_dir, Config),
CRLFileNotRevoked = filename:join([DataDir, "intermediate-not-revoked.crl.pem"]),
{ok, CRLPemNotRevoked} = file:read_file(CRLFileNotRevoked),
[{'CertificateList', CRLDerNotRevoked, not_encrypted}] = public_key:pem_decode(CRLPemNotRevoked),
CRLFileRevoked = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPemRevoked} = file:read_file(CRLFileRevoked),
[{'CertificateList', CRLDerRevoked, not_encrypted}] = public_key:pem_decode(CRLPemRevoked),
ServerPid = start_crl_server(CRLPemNotRevoked),
OldListeners = emqx:get_env(listeners),
OldRefreshInterval = emqx:get_env(crl_cache_refresh_interval),
NewRefreshInterval = timer:seconds(10),
ExtraHandler =
fun(emqx) ->
application:set_env(emqx, crl_cache_refresh_interval, NewRefreshInterval),
ok;
(_) ->
ok
end,
ok = setup_crl_options(Config, #{is_cached => true, extra_handler => ExtraHandler}),
ok = snabbkaffe:start_trace(),
{ok, {ok, _}} =
?wait_async_action(
emqx_crl_cache:refresh_config(),
#{?snk_kind := crl_cache_refresh_config},
_Timeout = 10_000),
[ {crl_pem_not_revoked, CRLPemNotRevoked}
, {crl_der_not_revoked, CRLDerNotRevoked}
, {crl_pem_revoked, CRLPemRevoked}
, {crl_der_revoked, CRLDerRevoked}
, {http_server, ServerPid}
, {old_configs, [ {listeners, OldListeners}
, {crl_cache_refresh_interval, OldRefreshInterval}
]}
| Config];
init_per_testcase(t_not_cached_and_unreachable, Config) ->
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
[{'CertificateList', CRLDer, not_encrypted}] = public_key:pem_decode(CRLPem),
application:stop(cowboy),
ok = setup_crl_options(Config, #{is_cached => false}),
[ {crl_pem, CRLPem}
, {crl_der, CRLDer}
| Config];
init_per_testcase(t_refresh_config, Config) ->
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
[{'CertificateList', CRLDer, not_encrypted}] = public_key:pem_decode(CRLPem),
TestPid = self(),
ok = meck:new(emqx_crl_cache, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_crl_cache, http_get,
fun(URL, _HTTPTimeout) ->
TestPid ! {http_get, URL},
{ok, {{"HTTP/1.0", 200, 'OK'}, [], CRLPem}}
end),
OldListeners = emqx:get_env(listeners),
OldRefreshInterval = emqx:get_env(crl_cache_refresh_interval),
OldHTTPTimeout = emqx:get_env(crl_cache_http_timeout),
ok = setup_crl_options(Config, #{is_cached => false}),
[ {crl_pem, CRLPem}
, {crl_der, CRLDer}
, {old_configs, [ {listeners, OldListeners}
, {crl_cache_refresh_interval, OldRefreshInterval}
, {crl_cache_http_timeout, OldHTTPTimeout}
]}
| Config];
init_per_testcase(_TestCase, Config) ->
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
[{'CertificateList', CRLDer, not_encrypted}] = public_key:pem_decode(CRLPem),
TestPid = self(),
ok = meck:new(emqx_crl_cache, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_crl_cache, http_get,
fun(URL, _HTTPTimeout) ->
TestPid ! {http_get, URL},
{ok, {{"HTTP/1.0", 200, 'OK'}, [], CRLPem}}
end),
[ {crl_pem, CRLPem}
, {crl_der, CRLDer}
| Config].
end_per_testcase(TestCase, Config)
when TestCase =:= t_empty_cache;
TestCase =:= t_filled_cache;
TestCase =:= t_revoked ->
ServerPid = ?config(http_server, Config),
emqx_crl_cache_http_server:stop(ServerPid),
emqx_ct_helpers:stop_apps([]),
emqx_ct_helpers:change_emqx_opts(
ssl_twoway, [ {crl_options, [ {crl_check_enabled, false}
, {crl_cache_urls, []}
]}
]),
application:stop(cowboy),
clear_crl_cache(),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(t_revoke_then_refresh, Config) ->
ServerPid = ?config(http_server, Config),
emqx_crl_cache_http_server:stop(ServerPid),
emqx_ct_helpers:stop_apps([]),
OldConfigs = ?config(old_configs, Config),
clear_crl_cache(),
emqx_ct_helpers:stop_apps([]),
emqx_ct_helpers:change_emqx_opts(
ssl_twoway, [ {crl_options, [ {crl_check_enabled, false}
, {crl_cache_urls, []}
]}
]),
clear_crl_cache(),
lists:foreach(
fun({Key, MValue}) ->
case MValue of
undefined -> ok;
Value -> application:set_env(emqx, Key, Value)
end
end,
OldConfigs),
application:stop(cowboy),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(t_not_cached_and_unreachable, _Config) ->
emqx_ct_helpers:stop_apps([]),
emqx_ct_helpers:change_emqx_opts(
ssl_twoway, [ {crl_options, [ {crl_check_enabled, false}
, {crl_cache_urls, []}
]}
]),
clear_crl_cache(),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(t_refresh_config, Config) ->
OldConfigs = ?config(old_configs, Config),
meck:unload([emqx_crl_cache]),
clear_crl_cache(),
emqx_ct_helpers:stop_apps([]),
emqx_ct_helpers:change_emqx_opts(
ssl_twoway, [ {crl_options, [ {crl_check_enabled, false}
, {crl_cache_urls, []}
]}
]),
clear_crl_cache(),
lists:foreach(
fun({Key, MValue}) ->
case MValue of
undefined -> ok;
Value -> application:set_env(emqx, Key, Value)
end
end,
OldConfigs),
application:stop(cowboy),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(_TestCase, _Config) ->
meck:unload([emqx_crl_cache]),
clear_crl_cache(),
ok = snabbkaffe:stop(),
ok.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
assert_http_get(URL) ->
receive
{http_get, URL} ->
ok
after
1000 ->
error({should_have_requested, URL})
end.
get_crl_cache_table() ->
#state{certificate_db = [_, _, _, {Ref, _}]} = sys:get_state(ssl_manager),
Ref.
start_crl_server(Port, CRLPem) ->
{ok, LSock} = gen_tcp:listen(Port, [binary, {active, true}, reusedaddr]),
spawn_link(fun() -> accept_loop(LSock, CRLPem) end),
ok.
accept_loop(LSock, CRLPem) ->
case gen_tcp:accept(LSock) of
{ok, Sock} ->
Worker = spawn_link(fun() -> crl_loop(Sock, CRLPem) end),
gen_tcp:controlling_process(Sock, Worker),
accept_loop(LSock, CRLPem);
{error, Reason} ->
error({accept_error, Reason})
end.
crl_loop(Sock, CRLPem) ->
receive
{tcp, Sock, _Data} ->
gen_tcp:send(Sock, CRLPem),
crl_loop(Sock, CRLPem);
_Msg ->
ok
end.
drain_msgs() ->
receive
_Msg ->
drain_msgs()
after
0 ->
ok
end.
clear_crl_cache() ->
%% reset the CRL cache
exit(whereis(ssl_manager), kill),
ok.
force_cacertfile(Cacertfile) ->
{SSLListeners0, OtherListeners} = lists:partition(
fun(#{proto := Proto}) -> Proto =:= ssl end,
emqx:get_env(listeners)),
SSLListeners =
lists:map(
fun(Listener = #{opts := Opts0}) ->
SSLOpts0 = proplists:get_value(ssl_options, Opts0),
%% it injects some garbage...
SSLOpts1 = lists:keydelete(cacertfile, 1, lists:keydelete(cacertfile, 1, SSLOpts0)),
SSLOpts2 = [{cacertfile, Cacertfile} | SSLOpts1],
Opts1 = lists:keyreplace(ssl_options, 1, Opts0, {ssl_options, SSLOpts2}),
Listener#{opts => Opts1}
end,
SSLListeners0),
application:set_env(emqx, listeners, SSLListeners ++ OtherListeners),
ok.
setup_crl_options(Config, Opts = #{is_cached := IsCached}) ->
DataDir = ?config(data_dir, Config),
Cacertfile = filename:join(DataDir, "ca-chain.cert.pem"),
Certfile = filename:join(DataDir, "server.cert.pem"),
Keyfile = filename:join(DataDir, "server.key.pem"),
URLs = case IsCached of
false -> [];
true -> ["http://localhost:9878/intermediate.crl.pem"]
end,
ExtraHandler = maps:get(extra_handler, Opts, fun(_) -> ok end),
Handler =
fun(emqx) ->
emqx_ct_helpers:change_emqx_opts(
ssl_twoway, [ {ssl_options, [ {certfile, Certfile}
, {keyfile, Keyfile}
, {verify, verify_peer}
%% {crl_check, true} does not work; probably bug in OTP
, {crl_check, peer}
, {crl_cache,
{ssl_crl_cache, {internal, [{http, timer:seconds(15)}]}}}
]}
, {crl_options, [ {crl_check_enabled, true}
, {crl_cache_urls, URLs}
]}
]),
%% emqx_ct_helpers:change_emqx_opts has cacertfile hardcoded....
ok = force_cacertfile(Cacertfile),
ExtraHandler(emqx),
ok;
(App) ->
ExtraHandler(App),
ok
end,
emqx_ct_helpers:start_apps([], Handler),
case IsCached of
true ->
%% wait the cache to be filled
receive
http_get -> ok
after
1_000 -> error(crl_cache_not_filled)
end;
false ->
%% ensure cache is empty
clear_crl_cache(),
ct:sleep(200),
ok
end,
drain_msgs(),
ok.
start_crl_server(CRLPem) ->
application:ensure_all_started(cowboy),
{ok, ServerPid} = emqx_crl_cache_http_server:start_link(self(), 9878, CRLPem, []),
receive
{ServerPid, ready} -> ok
after
1000 -> error(timeout_starting_http_server)
end,
ServerPid.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_init_empty_urls(_Config) ->
Ref = get_crl_cache_table(),
?assertEqual([], ets:tab2list(Ref)),
?assertMatch({ok, _}, emqx_crl_cache:start_link()),
receive
{http_get, _} ->
error(should_not_make_http_request)
after
1000 -> ok
end,
?assertEqual([], ets:tab2list(Ref)),
ok.
t_init_refresh(Config) ->
CRLDer = ?config(crl_der, Config),
Ref = get_crl_cache_table(),
?assertEqual([], ets:tab2list(Ref)),
URL1 = "http://localhost/crl1.pem",
URL2 = "http://localhost/crl2.pem",
Opts = #{ urls => [URL1, URL2]
, refresh_interval => timer:minutes(15)
, http_timeout => timer:seconds(15)
},
ok = snabbkaffe:start_trace(),
{ok, SubRef} = snabbkaffe:subscribe(
fun(#{?snk_kind := Kind}) ->
Kind =:= crl_cache_insert
end,
_NEvents = 2,
_Timeout = 2_000),
?assertMatch({ok, _}, emqx_crl_cache:start_link(Opts)),
lists:foreach(fun assert_http_get/1, [URL1, URL2]),
{ok, _} = snabbkaffe:receive_events(SubRef),
snabbkaffe:stop(),
?assertEqual(
[{"crl1.pem", [CRLDer]}, {"crl2.pem", [CRLDer]}],
lists:sort(ets:tab2list(Ref))),
ok.
t_manual_refresh(Config) ->
CRLDer = ?config(crl_der, Config),
Ref = get_crl_cache_table(),
?assertEqual([], ets:tab2list(Ref)),
{ok, _} = emqx_crl_cache:start_link(),
URL = "http://localhost/crl.pem",
ok = snabbkaffe:start_trace(),
?wait_async_action(
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
#{?snk_kind := crl_cache_insert},
5_000),
ok = snabbkaffe:stop(),
?assertEqual(
[{"crl.pem", [CRLDer]}],
ets:tab2list(Ref)),
ok.
t_refresh_request_error(_Config) ->
meck:expect(emqx_crl_cache, http_get,
fun(_URL, _HTTPTimeout) ->
{ok, {{"HTTP/1.0", 404, 'Not Found'}, [], <<"not found">>}}
end),
{ok, _} = emqx_crl_cache:start_link(),
URL = "http://localhost/crl.pem",
?check_trace(
?wait_async_action(
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
#{?snk_kind := crl_cache_insert},
5_000),
fun(Trace) ->
?assertMatch(
[#{error := {bad_response, #{code := 404}}}],
?of_kind(crl_refresh_failure, Trace)),
ok
end),
ok = snabbkaffe:stop(),
ok.
t_refresh_invalid_response(_Config) ->
meck:expect(emqx_crl_cache, http_get,
fun(_URL, _HTTPTimeout) ->
{ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"not a crl">>}}
end),
{ok, _} = emqx_crl_cache:start_link(),
URL = "http://localhost/crl.pem",
?check_trace(
?wait_async_action(
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
#{?snk_kind := crl_cache_insert},
5_000),
fun(Trace) ->
?assertMatch(
[#{crls := []}],
?of_kind(crl_cache_insert, Trace)),
ok
end),
ok = snabbkaffe:stop(),
ok.
t_refresh_http_error(_Config) ->
meck:expect(emqx_crl_cache, http_get,
fun(_URL, _HTTPTimeout) ->
{error, timeout}
end),
{ok, _} = emqx_crl_cache:start_link(),
URL = "http://localhost/crl.pem",
?check_trace(
?wait_async_action(
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
#{?snk_kind := crl_cache_insert},
5_000),
fun(Trace) ->
?assertMatch(
[#{error := {http_error, timeout}}],
?of_kind(crl_refresh_failure, Trace)),
ok
end),
ok = snabbkaffe:stop(),
ok.
t_unknown_messages(_Config) ->
{ok, Server} = emqx_crl_cache:start_link(),
gen_server:call(Server, foo),
gen_server:cast(Server, foo),
Server ! foo,
ok.
t_evict(_Config) ->
{ok, _} = emqx_crl_cache:start_link(),
URL = "http://localhost/crl.pem",
ok = snabbkaffe:start_trace(),
?wait_async_action(
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
#{?snk_kind := crl_cache_insert},
5_000),
ok = snabbkaffe:stop(),
Ref = get_crl_cache_table(),
?assertMatch([{"crl.pem", _}], ets:tab2list(Ref)),
snabbkaffe:start_trace(),
{ok, {ok, _}} = ?wait_async_action(emqx_crl_cache:evict(URL),
#{?snk_kind := crl_cache_evict}),
snabbkaffe:stop(),
?assertEqual([], ets:tab2list(Ref)),
ok.
%% check that the URL in the certificate is checked on the fly if the
%% cache is empty.
t_empty_cache(Config) ->
DataDir = ?config(data_dir, Config),
ClientCert = filename:join(DataDir, "client.cert.pem"),
ClientKey = filename:join(DataDir, "client.key.pem"),
{ok, C} = emqtt:start_link([ {ssl, true}
, {ssl_opts, [ {certfile, ClientCert}
, {keyfile, ClientKey}
]}
, {port, 8883}
]),
{ok, _} = emqtt:connect(C),
receive
http_get -> ok
after
2_000 -> error(should_have_checked_server)
end,
emqtt:disconnect(C),
ok.
%% check that the URL in the certificate is *not* checked if the cache
%% contains that URL.
t_filled_cache(Config) ->
DataDir = ?config(data_dir, Config),
ClientCert = filename:join(DataDir, "client.cert.pem"),
ClientKey = filename:join(DataDir, "client.key.pem"),
{ok, C} = emqtt:start_link([ {ssl, true}
, {ssl_opts, [ {certfile, ClientCert}
, {keyfile, ClientKey}
]}
, {port, 8883}
]),
{ok, _} = emqtt:connect(C),
receive
http_get -> error(should_have_used_cache)
after
2_000 -> ok
end,
emqtt:disconnect(C),
ok.
t_refresh_config(_Config) ->
URLs = [ "http://localhost:9878/some.crl.pem"
, "http://localhost:9878/another.crl.pem"
],
SortedURLs = lists:sort(URLs),
emqx_ct_helpers:change_emqx_opts(
ssl_twoway, [ {crl_options, [ {crl_check_enabled, true}
, {crl_cache_urls, URLs}
]}
]),
%% has to be more than 1 minute
NewRefreshInterval = timer:seconds(64),
NewHTTPTimeout = timer:seconds(7),
application:set_env(emqx, crl_cache_refresh_interval, NewRefreshInterval),
application:set_env(emqx, crl_cache_http_timeout, NewHTTPTimeout),
?check_trace(
?wait_async_action(
emqx_crl_cache:refresh_config(),
#{?snk_kind := crl_cache_refresh_config},
_Timeout = 10_000),
fun(Res, Trace) ->
?assertMatch({ok, {ok, _}}, Res),
?assertMatch(
[#{ urls := SortedURLs
, refresh_interval := NewRefreshInterval
, http_timeout := NewHTTPTimeout
}],
?of_kind(crl_cache_refresh_config, Trace),
#{ expected => #{ urls => SortedURLs
, refresh_interval => NewRefreshInterval
, http_timeout => NewHTTPTimeout
}
}),
?assertEqual(SortedURLs, ?projection(url, ?of_kind(crl_cache_ensure_timer, Trace))),
ok
end),
ok.
%% If the CRL is not cached when the client tries to connect and the
%% CRL server is unreachable, the client will be denied connection.
t_not_cached_and_unreachable(Config) ->
DataDir = ?config(data_dir, Config),
ClientCert = filename:join(DataDir, "client.cert.pem"),
ClientKey = filename:join(DataDir, "client.key.pem"),
{ok, C} = emqtt:start_link([ {ssl, true}
, {ssl_opts, [ {certfile, ClientCert}
, {keyfile, ClientKey}
]}
, {port, 8883}
]),
Ref = get_crl_cache_table(),
?assertEqual([], ets:tab2list(Ref)),
process_flag(trap_exit, true),
?assertMatch({error, {{shutdown, {tls_alert, {bad_certificate, _}}}, _}}, emqtt:connect(C)),
ok.
t_revoked(Config) ->
DataDir = ?config(data_dir, Config),
ClientCert = filename:join(DataDir, "client-revoked.cert.pem"),
ClientKey = filename:join(DataDir, "client-revoked.key.pem"),
{ok, C} = emqtt:start_link([ {ssl, true}
, {ssl_opts, [ {certfile, ClientCert}
, {keyfile, ClientKey}
]}
, {port, 8883}
]),
process_flag(trap_exit, true),
?assertMatch({error, {{shutdown, {tls_alert, {certificate_revoked, _}}}, _}}, emqtt:connect(C)),
ok.
t_revoke_then_refresh(Config) ->
DataDir = ?config(data_dir, Config),
CRLPemRevoked = ?config(crl_pem_revoked, Config),
ClientCert = filename:join(DataDir, "client-revoked.cert.pem"),
ClientKey = filename:join(DataDir, "client-revoked.key.pem"),
{ok, C0} = emqtt:start_link([ {ssl, true}
, {ssl_opts, [ {certfile, ClientCert}
, {keyfile, ClientKey}
]}
, {port, 8883}
]),
%% At first, the CRL contains no revoked entries, so the client
%% should be allowed connection.
?assertMatch({ok, _}, emqtt:connect(C0)),
emqtt:stop(C0),
%% Now we update the CRL on the server and wait for the cache to
%% be refreshed.
ok = snabbkaffe:start_trace(),
{true, {ok, _}} =
?wait_async_action(
emqx_crl_cache_http_server:set_crl(CRLPemRevoked),
#{?snk_kind := crl_refresh_timer_done},
70_000),
%% The *same client* should now be denied connection.
{ok, C1} = emqtt:start_link([ {ssl, true}
, {ssl_opts, [ {certfile, ClientCert}
, {keyfile, ClientKey}
]}
, {port, 8883}
]),
process_flag(trap_exit, true),
?assertMatch({error, {{shutdown, {tls_alert, {certificate_revoked, _}}}, _}}, emqtt:connect(C1)),
ok.