Merge pull request #9324 from thalesmg/ocsp-fixes-rv44
OCSP + CRL fixes (rv4.4)
This commit is contained in:
commit
2fe841c451
|
@ -1521,6 +1521,13 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
|
|||
## Value: File
|
||||
listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
|
||||
|
||||
## Whether to enable OCSP stapling for the listener. If set to true,
|
||||
## requires definining the OCSP responder URL.
|
||||
##
|
||||
## Value: boolean
|
||||
## Default: false
|
||||
## listener.ssl.external.enable_ocsp_stapling = true
|
||||
|
||||
## URL for the OCSP responder to check the server certificate against.
|
||||
##
|
||||
## Value: String
|
||||
|
@ -1542,6 +1549,32 @@ listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
|
|||
## Value: Duration
|
||||
## listener.ssl.external.ocsp_refresh_http_timeout = 15s
|
||||
|
||||
## Whether to enable CRL verification and caching for this listener.
|
||||
## If set to true, requires specifying the CRL server URLs.
|
||||
##
|
||||
## Value: boolean
|
||||
## Default: false
|
||||
## listener.ssl.external.enable_crl_cache = true
|
||||
|
||||
## Comma-separated URL list for CRL servers to fetch and cache CRLs
|
||||
## from. Must include the path to the CRL file(s).
|
||||
##
|
||||
## Value: String
|
||||
## listener.ssl.external.crl_cache_urls = http://my.crl.server/intermediate.crl.pem, http://my.other.crl.server/another.crl.pem
|
||||
|
||||
## The timeout for the HTTP request when fetching CRLs.
|
||||
##
|
||||
## Value: Duration
|
||||
## Default: 15 s
|
||||
## listener.ssl.external.crl_cache_http_timeout = 15s
|
||||
|
||||
## The period to refresh the CRLs from the servers. This is global
|
||||
## for all URLs and listeners.
|
||||
##
|
||||
## Value: Duration
|
||||
## Default: 15 m
|
||||
## crl_cache.refresh_interval = 15m
|
||||
|
||||
## The Ephemeral Diffie-Helman key exchange is a very effective way of
|
||||
## ensuring Forward Secrecy by exchanging a set of keys that never hit
|
||||
## the wire. Since the DH key is effectively signed by the private key,
|
||||
|
|
|
@ -1679,6 +1679,11 @@ end}.
|
|||
{datatype, {duration, ms}}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ssl.$name.enable_ocsp_stapling", "emqx.listeners", [
|
||||
{default, false},
|
||||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ssl.$name.ocsp_responder_url", "emqx.listeners", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
@ -1702,6 +1707,11 @@ end}.
|
|||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ssl.$name.crl_cache_urls", "emqx.listeners", [
|
||||
{default, ""},
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ssl.$name.crl_cache_http_timeout", "emqx.listeners", [
|
||||
{default, "15s"},
|
||||
{datatype, {duration, ms}}
|
||||
|
@ -1712,16 +1722,6 @@ end}.
|
|||
{datatype, {duration, ms}}
|
||||
]}.
|
||||
|
||||
{mapping, "crl_cache.urls", "emqx.crl_cache_urls", [
|
||||
{default, ""},
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{translation, "emqx.crl_cache_urls", fun(Conf) ->
|
||||
Val = cuttlefish:conf_get("crl_cache.urls", Conf),
|
||||
string:tokens(Val, ", ")
|
||||
end}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT/WebSocket Listeners
|
||||
|
||||
|
@ -2224,6 +2224,14 @@ end}.
|
|||
lists:flatten(OriginList)
|
||||
end
|
||||
end,
|
||||
OCSPOpts = fun(Prefix) ->
|
||||
Filter([ {ocsp_stapling_enabled, cuttlefish:conf_get(Prefix ++ ".enable_ocsp_stapling", Conf, undefined)}
|
||||
, {ocsp_responder_url, cuttlefish:conf_get(Prefix ++ ".ocsp_responder_url", Conf, undefined)}
|
||||
, {ocsp_issuer_pem, cuttlefish:conf_get(Prefix ++ ".ocsp_issuer_pem", Conf, undefined)}
|
||||
, {ocsp_refresh_interval, cuttlefish:conf_get(Prefix ++ ".ocsp_refresh_interval", Conf, undefined)}
|
||||
, {ocsp_refresh_http_timeout, cuttlefish:conf_get(Prefix ++ ".ocsp_refresh_http_timeout", Conf, undefined)}
|
||||
])
|
||||
end,
|
||||
|
||||
LisOpts = fun(Prefix) ->
|
||||
Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
|
||||
|
@ -2242,10 +2250,6 @@ end}.
|
|||
{supported_subprotocols, string:tokens(cuttlefish:conf_get(Prefix ++ ".supported_subprotocols", Conf, ""), ", ")},
|
||||
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
|
||||
{peer_cert_as_clientid, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_clientid", Conf, undefined)},
|
||||
{ocsp_responder_url, cuttlefish:conf_get(Prefix ++ ".ocsp_responder_url", Conf, undefined)},
|
||||
{ocsp_issuer_pem, cuttlefish:conf_get(Prefix ++ ".ocsp_issuer_pem", Conf, undefined)},
|
||||
{ocsp_refresh_interval, cuttlefish:conf_get(Prefix ++ ".ocsp_refresh_interval", Conf, undefined)},
|
||||
{ocsp_refresh_http_timeout, cuttlefish:conf_get(Prefix ++ ".ocsp_refresh_http_timeout", Conf, undefined)},
|
||||
{compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
|
||||
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
|
||||
{max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)},
|
||||
|
@ -2364,7 +2368,16 @@ end}.
|
|||
{hibernate_after, cuttlefish:conf_get(Prefix ++ ".hibernate_after", Conf, undefined)}
|
||||
])
|
||||
end,
|
||||
|
||||
CRLOpts =
|
||||
fun(Prefix) ->
|
||||
CRLURLs = case cuttlefish:conf_get(Prefix ++ ".crl_cache_urls", Conf, undefined) of
|
||||
undefined -> undefined;
|
||||
URLs -> string:tokens(URLs, ", ")
|
||||
end,
|
||||
Filter([ {crl_cache_enabled, cuttlefish:conf_get(Prefix ++ ".enable_crl_cache", Conf, false)}
|
||||
, {crl_cache_urls, CRLURLs}
|
||||
])
|
||||
end,
|
||||
Listen_fix = fun({Ip, Port}) -> case inet:parse_address(Ip) of
|
||||
{ok, R} -> {R, Port};
|
||||
_ -> {Ip, Port}
|
||||
|
@ -2400,6 +2413,8 @@ end}.
|
|||
, opts => [ {deflate_options, DeflateOpts(Prefix)}
|
||||
, {tcp_options, TcpOpts(Prefix)}
|
||||
, {ssl_options, SslOpts(Prefix)}
|
||||
, {crl_options, CRLOpts(Prefix)}
|
||||
, {ocsp_options, OCSPOpts(Prefix)}
|
||||
| LisOpts(Prefix)
|
||||
]
|
||||
}
|
||||
|
|
|
@ -313,7 +313,7 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
|
|||
true ->
|
||||
[];
|
||||
false ->
|
||||
[{load_module, M, brutal_purge, soft_purge, []} || M <- Changed, not is_secret_module(M)] ++
|
||||
[{load_module, M, brutal_purge, soft_purge, []} || M <- Changed, not is_const_module(M)] ++
|
||||
[{add_module, M} || M <- New]
|
||||
end,
|
||||
{OldActionsWithStop, OldActionsAfterStop} =
|
||||
|
@ -325,14 +325,17 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
|
|||
true ->
|
||||
[];
|
||||
false ->
|
||||
[{delete_module, M} || M <- Deleted, not is_secret_module(M)]
|
||||
[{delete_module, M} || M <- Deleted, not is_const_module(M)]
|
||||
end ++
|
||||
AppSpecific.
|
||||
|
||||
%% Do not reload or delet _secret modules
|
||||
is_secret_module(Module) ->
|
||||
is_const_module(Module) when is_atom(Module) ->
|
||||
is_const_module(atom_to_list(Module));
|
||||
is_const_module("emqx_const_" ++ _) ->
|
||||
true;
|
||||
is_const_module(Module) ->
|
||||
Suffix = "_secret",
|
||||
case string:right(atom_to_list(Module), length(Suffix)) of
|
||||
case string:right(Module, length(Suffix)) of
|
||||
Suffix -> true;
|
||||
_ -> false
|
||||
end.
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
[{"4.4.10",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
|
@ -19,6 +20,7 @@
|
|||
{"4.4.9",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -41,6 +43,7 @@
|
|||
{"4.4.8",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -64,6 +67,7 @@
|
|||
{"4.4.7",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -87,6 +91,7 @@
|
|||
{"4.4.6",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -110,6 +115,7 @@
|
|||
{"4.4.5",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -135,6 +141,7 @@
|
|||
{"4.4.4",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -167,6 +174,7 @@
|
|||
{"4.4.3",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -206,6 +214,7 @@
|
|||
{"4.4.2",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -246,6 +255,7 @@
|
|||
{"4.4.1",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
|
@ -291,6 +301,7 @@
|
|||
{"4.4.0",
|
||||
[{add_module,emqx_ocsp_cache},
|
||||
{add_module,emqx_crl_cache},
|
||||
{add_module,emqx_const_v1},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% @doc Never update this module, create a v2 instead.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_const_v1).
|
||||
|
||||
-export([make_sni_fun/1]).
|
||||
|
||||
make_sni_fun(ListenerID) ->
|
||||
fun(SN) -> emqx_ocsp_cache:sni_fun(SN, ListenerID) end.
|
|
@ -54,16 +54,19 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
start_link() ->
|
||||
URLs = emqx:get_env(crl_cache_urls, []),
|
||||
RefreshIntervalMS = emqx:get_env(crl_cache_refresh_interval,
|
||||
timer:minutes(15)),
|
||||
Listeners = emqx:get_env(listeners, []),
|
||||
URLs = collect_urls(Listeners),
|
||||
RefreshIntervalMS0 = emqx:get_env(crl_cache_refresh_interval,
|
||||
timer:minutes(15)),
|
||||
MinimumRefreshInverval = timer:minutes(1),
|
||||
RefreshIntervalMS = max(RefreshIntervalMS0, MinimumRefreshInverval),
|
||||
start_link(#{urls => URLs, refresh_interval => RefreshIntervalMS}).
|
||||
|
||||
start_link(Opts = #{urls := _, refresh_interval := _}) ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []).
|
||||
|
||||
refresh(URL) ->
|
||||
gen_server:call(?MODULE, {refresh, URL}, ?HTTP_TIMEOUT + 2_000).
|
||||
gen_server:cast(?MODULE, {refresh, URL}).
|
||||
|
||||
evict(URL) ->
|
||||
gen_server:cast(?MODULE, {evict, URL}).
|
||||
|
@ -78,16 +81,6 @@ init(#{urls := URLs, refresh_interval := RefreshIntervalMS}) ->
|
|||
URLs),
|
||||
{ok, State}.
|
||||
|
||||
handle_call({refresh, URL}, _From, State0) ->
|
||||
case do_http_fetch_and_cache(URL) of
|
||||
{error, Error} ->
|
||||
?LOG(error, "failed to fetch crl response for ~p; error: ~p",
|
||||
[URL, Error]),
|
||||
{reply, error, ensure_timer(URL, State0, ?RETRY_TIMEOUT)};
|
||||
{ok, CRLs} ->
|
||||
?LOG(debug, "fetched crl response for ~p", [URL]),
|
||||
{reply, {ok, CRLs}, ensure_timer(URL, State0)}
|
||||
end;
|
||||
handle_call(Call, _From, State) ->
|
||||
{reply, {error, {bad_call, Call}}, State}.
|
||||
|
||||
|
@ -101,6 +94,17 @@ handle_cast({evict, URL}, State0 = #state{refresh_timers = RefreshTimers0}) ->
|
|||
#{ url => URL
|
||||
}),
|
||||
{noreply, State};
|
||||
handle_cast({refresh, URL}, State0) ->
|
||||
case do_http_fetch_and_cache(URL) of
|
||||
{error, Error} ->
|
||||
?tp(crl_refresh_failure, #{error => Error, url => URL}),
|
||||
?LOG(error, "failed to fetch crl response for ~p; error: ~p",
|
||||
[URL, Error]),
|
||||
{noreply, ensure_timer(URL, State0, ?RETRY_TIMEOUT)};
|
||||
{ok, _CRLs} ->
|
||||
?LOG(debug, "fetched crl response for ~p", [URL]),
|
||||
{noreply, ensure_timer(URL, State0)}
|
||||
end;
|
||||
handle_cast(_Cast, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
@ -148,7 +152,7 @@ do_http_fetch_and_cache(URL) ->
|
|||
{error, invalid_crl};
|
||||
CRLs ->
|
||||
ssl_crl_cache:insert(URL, {der, CRLs}),
|
||||
?tp(crl_cache_insert, #{url => URL}),
|
||||
?tp(crl_cache_insert, #{url => URL, crls => CRLs}),
|
||||
{ok, CRLs}
|
||||
end;
|
||||
{ok, {{_, Code, _}, _, Body}} ->
|
||||
|
@ -175,3 +179,20 @@ ensure_timer(URL, State = #state{refresh_timers = RefreshTimers0}, Timeout) ->
|
|||
Timeout,
|
||||
{refresh, URL})},
|
||||
State#state{refresh_timers = RefreshTimers}.
|
||||
|
||||
collect_urls(Listeners) ->
|
||||
CRLOpts0 = [CRLOpts || #{proto := ssl, opts := Opts} <- Listeners,
|
||||
{crl_options, CRLOpts} <- Opts],
|
||||
CRLOpts1 =
|
||||
lists:filter(
|
||||
fun(CRLOpts) ->
|
||||
proplists:get_bool(crl_cache_enabled, CRLOpts)
|
||||
end,
|
||||
CRLOpts0),
|
||||
CRLURLs =
|
||||
lists:flatmap(
|
||||
fun(CRLOpts) ->
|
||||
proplists:get_value(crl_cache_urls, CRLOpts, [])
|
||||
end,
|
||||
CRLOpts1),
|
||||
lists:usort(CRLURLs).
|
||||
|
|
|
@ -41,7 +41,9 @@
|
|||
, format_listen_on/1
|
||||
]).
|
||||
|
||||
-type(listener() :: #{ name := binary()
|
||||
-type(listener_name() :: binary()).
|
||||
-type(listener_id() :: binary()).
|
||||
-type(listener() :: #{ name := listener_name()
|
||||
, proto := esockd:proto()
|
||||
, listen_on := esockd:listen_on()
|
||||
, opts := [esockd:option()]
|
||||
|
@ -70,7 +72,7 @@ find_by_id(Id) ->
|
|||
find_by_id(iolist_to_binary(Id), emqx:get_env(listeners, [])).
|
||||
|
||||
%% @doc Return the ID of the given listener.
|
||||
-spec identifier(listener()) -> binary().
|
||||
-spec identifier(listener()) -> listener_id().
|
||||
identifier(#{proto := Proto, name := Name}) ->
|
||||
identifier(Proto, Name).
|
||||
|
||||
|
@ -88,7 +90,8 @@ ensure_all_started() ->
|
|||
ensure_all_started([], []) -> ok;
|
||||
ensure_all_started([], Failed) -> error(Failed);
|
||||
ensure_all_started([L | Rest], Results) ->
|
||||
#{proto := Proto, listen_on := ListenOn, opts := Options} = L,
|
||||
#{proto := Proto, listen_on := ListenOn, opts := Options0} = L,
|
||||
Options = [{listener_id, identifier(L)} | Options0],
|
||||
NewResults =
|
||||
case start_listener(Proto, ListenOn, Options) of
|
||||
{ok, _Pid} ->
|
||||
|
@ -105,10 +108,10 @@ ensure_all_started([L | Rest], Results) ->
|
|||
format_listen_on(ListenOn) -> format(ListenOn).
|
||||
|
||||
-spec(start_listener(listener()) -> ok).
|
||||
start_listener(Listener = #{proto := Proto, name := Name, listen_on := ListenOn}) ->
|
||||
start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Opts0}) ->
|
||||
ID = identifier(Proto, Name),
|
||||
Options = emqx_ocsp_cache:inject_sni_fun(Listener),
|
||||
case start_listener(Proto, ListenOn, Options) of
|
||||
Opts = [{listener_id, ID} | Opts0],
|
||||
case start_listener(Proto, ListenOn, Opts) of
|
||||
{ok, _} ->
|
||||
console_print("Start ~s listener on ~s successfully.~n", [ID, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
|
@ -125,13 +128,17 @@ console_print(_Fmt, _Args) -> ok.
|
|||
-endif.
|
||||
|
||||
%% Start MQTT/TCP listener
|
||||
-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
|
||||
-spec(start_listener(esockd:proto(), esockd:listen_on(), [ esockd:option()
|
||||
| {listener_id, binary()}])
|
||||
-> {ok, pid()} | {error, term()}).
|
||||
start_listener(tcp, ListenOn, Options) ->
|
||||
start_mqtt_listener('mqtt:tcp', ListenOn, Options);
|
||||
|
||||
%% Start MQTT/TLS listener
|
||||
start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls ->
|
||||
start_listener(Proto, ListenOn, Options0) when Proto == ssl; Proto == tls ->
|
||||
ListenerID = proplists:get_value(listener_id, Options0),
|
||||
Options1 = proplists:delete(listener_id, Options0),
|
||||
Options = emqx_ocsp_cache:inject_sni_fun(ListenerID, Options1),
|
||||
start_mqtt_listener('mqtt:ssl', ListenOn, Options);
|
||||
|
||||
%% Start MQTT/WS listener
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
, sni_fun/2
|
||||
, fetch_response/1
|
||||
, register_listener/1
|
||||
, inject_sni_fun/1
|
||||
, inject_sni_fun/2
|
||||
]).
|
||||
|
||||
%% gen_server API
|
||||
|
@ -48,6 +48,11 @@
|
|||
-define(CALL_TIMEOUT, 20_000).
|
||||
-define(RETRY_TIMEOUT, 5_000).
|
||||
-define(REFRESH_TIMER(LID), {refresh_timer, LID}).
|
||||
-ifdef(TEST).
|
||||
-define(MIN_REFRESH_INTERVAL, timer:seconds(5)).
|
||||
-else.
|
||||
-define(MIN_REFRESH_INTERVAL, timer:minutes(1)).
|
||||
-endif.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
|
@ -87,25 +92,25 @@ fetch_response(ListenerID) ->
|
|||
register_listener(ListenerID) ->
|
||||
gen_server:call(?MODULE, {register_listener, ListenerID}, ?CALL_TIMEOUT).
|
||||
|
||||
-spec inject_sni_fun(emqx_listeners:listener()) -> [esockd:option()].
|
||||
inject_sni_fun(Listener = #{proto := Proto, name := Name, opts := Options0}) ->
|
||||
-spec inject_sni_fun(emqx_listeners:listener_id(), [esockd:option()]) -> [esockd:option()].
|
||||
inject_sni_fun(ListenerID, Options0) ->
|
||||
%% We need to patch `sni_fun' here and not in `emqx.schema'
|
||||
%% because otherwise an anonymous function will end up in
|
||||
%% `app.*.config'...
|
||||
ListenerID = emqx_listeners:identifier(Listener),
|
||||
case proplists:get_value(ocsp_responder_url, Options0, undefined) of
|
||||
undefined ->
|
||||
OCSPOpts = proplists:get_value(ocsp_options, Options0, []),
|
||||
case proplists:get_bool(ocsp_stapling_enabled, OCSPOpts) of
|
||||
false ->
|
||||
Options0;
|
||||
_URL ->
|
||||
true ->
|
||||
SSLOpts0 = proplists:get_value(ssl_options, Options0, []),
|
||||
SNIFun = fun(SN) -> emqx_ocsp_cache:sni_fun(SN, ListenerID) end,
|
||||
SNIFun = emqx_const_v1:make_sni_fun(ListenerID),
|
||||
Options1 = proplists:delete(ssl_options, Options0),
|
||||
Options = [{ssl_options, [{sni_fun, SNIFun} | SSLOpts0]} | Options1],
|
||||
%% save to env
|
||||
{[ThisListener0], Listeners} =
|
||||
lists:partition(
|
||||
fun(#{name := N, proto := P}) ->
|
||||
N =:= Name andalso P =:= Proto
|
||||
fun(L) ->
|
||||
emqx_listeners:identifier(L) =:= ListenerID
|
||||
end,
|
||||
emqx:get_env(listeners)),
|
||||
ThisListener = ThisListener0#{opts => Options},
|
||||
|
@ -141,7 +146,9 @@ handle_call({http_fetch, ListenerID}, _From, State) ->
|
|||
handle_call({register_listener, ListenerID}, _From, State0) ->
|
||||
?LOG(debug, "registering ocsp cache for ~p", [ListenerID]),
|
||||
#{opts := Opts} = emqx_listeners:find_by_id(ListenerID),
|
||||
RefreshInterval = proplists:get_value(ocsp_refresh_interval, Opts),
|
||||
OCSPOpts = proplists:get_value(ocsp_options, Opts),
|
||||
RefreshInterval0 = proplists:get_value(ocsp_refresh_interval, OCSPOpts),
|
||||
RefreshInterval = max(RefreshInterval0, ?MIN_REFRESH_INTERVAL),
|
||||
State = State0#{{refresh_interval, ListenerID} => RefreshInterval},
|
||||
{reply, ok, ensure_timer(ListenerID, State, 0)};
|
||||
handle_call(Call, _From, State) ->
|
||||
|
@ -170,20 +177,6 @@ handle_info(_Info, State) ->
|
|||
{noreply, State}.
|
||||
|
||||
code_change(_Vsn, State, _Extra) ->
|
||||
%% we need to re-create the `sni_fun' lambda that the SSL
|
||||
%% listeners are holding onto to avoid them becoming `badfun''s.
|
||||
ListenersToPatch =
|
||||
lists:filter(
|
||||
fun(#{opts := Opts}) ->
|
||||
undefined =/= proplists:get_value(ocsp_responder_url, Opts)
|
||||
end,
|
||||
emqx:get_env(listeners, [])),
|
||||
PatchedListeners = [L#{opts => ?MODULE:inject_sni_fun(L)} || L <- ListenersToPatch],
|
||||
lists:foreach(
|
||||
fun(L) ->
|
||||
emqx_listeners:update_listeners_env(update, L)
|
||||
end,
|
||||
PatchedListeners),
|
||||
{ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -222,9 +215,10 @@ read_server_cert(ServerCertPemPath0) ->
|
|||
|
||||
do_http_fetch_and_cache(ListenerID) ->
|
||||
#{opts := Options} = emqx_listeners:find_by_id(ListenerID),
|
||||
ResponderURL0 = proplists:get_value(ocsp_responder_url, Options, undefined),
|
||||
OCSPOpts = proplists:get_value(ocsp_options, Options),
|
||||
ResponderURL0 = proplists:get_value(ocsp_responder_url, OCSPOpts, undefined),
|
||||
ResponderURL = uri_string:normalize(ResponderURL0),
|
||||
IssuerPemPath = proplists:get_value(ocsp_issuer_pem, Options, undefined),
|
||||
IssuerPemPath = proplists:get_value(ocsp_issuer_pem, OCSPOpts, undefined),
|
||||
SSLOpts = proplists:get_value(ssl_options, Options, undefined),
|
||||
ServerCertPemPath = proplists:get_value(certfile, SSLOpts, undefined),
|
||||
IssuerPem = case file:read_file(IssuerPemPath) of
|
||||
|
@ -233,7 +227,7 @@ do_http_fetch_and_cache(ListenerID) ->
|
|||
end,
|
||||
ServerCert = read_server_cert(ServerCertPemPath),
|
||||
Request = build_ocsp_request(IssuerPem, ServerCert),
|
||||
HTTPTimeout = proplists:get_value(ocsp_refresh_http_timeout, Options),
|
||||
HTTPTimeout = proplists:get_value(ocsp_refresh_http_timeout, OCSPOpts),
|
||||
?tp(ocsp_http_fetch, #{ listener_id => ListenerID
|
||||
, responder_url => ResponderURL
|
||||
, timeout => HTTPTimeout
|
||||
|
|
|
@ -79,13 +79,21 @@ end_per_testcase(TestCase, Config)
|
|||
ServerPid = ?config(http_server, Config),
|
||||
emqx_crl_cache_http_server:stop(ServerPid),
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
application:set_env(emqx, crl_cache_urls, []),
|
||||
emqx_ct_helpers:change_emqx_opts(
|
||||
ssl_twoway, [ {crl_options, [ {crl_cache_enabled, false}
|
||||
, {crl_cache_urls, []}
|
||||
]}
|
||||
]),
|
||||
application:stop(cowboy),
|
||||
clear_crl_cache(),
|
||||
ok;
|
||||
end_per_testcase(t_not_cached_and_unreachable, _Config) ->
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
application:set_env(emqx, crl_cache_urls, []),
|
||||
emqx_ct_helpers:change_emqx_opts(
|
||||
ssl_twoway, [ {crl_options, [ {crl_cache_enabled, false}
|
||||
, {crl_cache_urls, []}
|
||||
]}
|
||||
]),
|
||||
clear_crl_cache(),
|
||||
ok;
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
|
@ -177,16 +185,19 @@ setup_crl_options(Config, #{is_cached := IsCached}) ->
|
|||
end,
|
||||
Handler =
|
||||
fun(emqx) ->
|
||||
application:set_env(emqx, crl_cache_urls, URLs),
|
||||
emqx_ct_helpers:change_emqx_opts(
|
||||
ssl_twoway, [{ssl_options, [ {certfile, Certfile}
|
||||
, {keyfile, Keyfile}
|
||||
, {verify, verify_peer}
|
||||
%% {crl_check, true} does not work; probably bug in OTP
|
||||
, {crl_check, peer}
|
||||
, {crl_cache,
|
||||
{ssl_crl_cache, {internal, [{http, timer:seconds(15)}]}}}
|
||||
]}]),
|
||||
ssl_twoway, [ {ssl_options, [ {certfile, Certfile}
|
||||
, {keyfile, Keyfile}
|
||||
, {verify, verify_peer}
|
||||
%% {crl_check, true} does not work; probably bug in OTP
|
||||
, {crl_check, peer}
|
||||
, {crl_cache,
|
||||
{ssl_crl_cache, {internal, [{http, timer:seconds(15)}]}}}
|
||||
]}
|
||||
, {crl_options, [ {crl_cache_enabled, true}
|
||||
, {crl_cache_urls, URLs}
|
||||
]}
|
||||
]),
|
||||
%% emqx_ct_helpers:change_emqx_opts has cacertfile hardcoded....
|
||||
ok = force_cacertfile(Cacertfile),
|
||||
ok;
|
||||
|
@ -269,7 +280,12 @@ t_manual_refresh(Config) ->
|
|||
?assertEqual([], ets:tab2list(Ref)),
|
||||
{ok, _} = emqx_crl_cache:start_link(),
|
||||
URL = "http://localhost/crl.pem",
|
||||
?assertEqual({ok, [CRLDer]}, emqx_crl_cache:refresh(URL)),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
?wait_async_action(
|
||||
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
|
||||
#{?snk_kind := crl_cache_insert},
|
||||
5_000),
|
||||
ok = snabbkaffe:stop(),
|
||||
?assertEqual(
|
||||
[{"crl.pem", [CRLDer]}],
|
||||
ets:tab2list(Ref)),
|
||||
|
@ -282,7 +298,18 @@ t_refresh_request_error(_Config) ->
|
|||
end),
|
||||
{ok, _} = emqx_crl_cache:start_link(),
|
||||
URL = "http://localhost/crl.pem",
|
||||
?assertEqual(error, emqx_crl_cache:refresh(URL)),
|
||||
?check_trace(
|
||||
?wait_async_action(
|
||||
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
|
||||
#{?snk_kind := crl_cache_insert},
|
||||
5_000),
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{error := {bad_response, #{code := 404}}}],
|
||||
?of_kind(crl_refresh_failure, Trace)),
|
||||
ok
|
||||
end),
|
||||
ok = snabbkaffe:stop(),
|
||||
ok.
|
||||
|
||||
t_refresh_invalid_response(_Config) ->
|
||||
|
@ -292,7 +319,18 @@ t_refresh_invalid_response(_Config) ->
|
|||
end),
|
||||
{ok, _} = emqx_crl_cache:start_link(),
|
||||
URL = "http://localhost/crl.pem",
|
||||
?assertEqual({ok, []}, emqx_crl_cache:refresh(URL)),
|
||||
?check_trace(
|
||||
?wait_async_action(
|
||||
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
|
||||
#{?snk_kind := crl_cache_insert},
|
||||
5_000),
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{crls := []}],
|
||||
?of_kind(crl_cache_insert, Trace)),
|
||||
ok
|
||||
end),
|
||||
ok = snabbkaffe:stop(),
|
||||
ok.
|
||||
|
||||
t_refresh_http_error(_Config) ->
|
||||
|
@ -302,7 +340,18 @@ t_refresh_http_error(_Config) ->
|
|||
end),
|
||||
{ok, _} = emqx_crl_cache:start_link(),
|
||||
URL = "http://localhost/crl.pem",
|
||||
?assertEqual(error, emqx_crl_cache:refresh(URL)),
|
||||
?check_trace(
|
||||
?wait_async_action(
|
||||
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
|
||||
#{?snk_kind := crl_cache_insert},
|
||||
5_000),
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{error := {http_error, timeout}}],
|
||||
?of_kind(crl_refresh_failure, Trace)),
|
||||
ok
|
||||
end),
|
||||
ok = snabbkaffe:stop(),
|
||||
ok.
|
||||
|
||||
t_unknown_messages(_Config) ->
|
||||
|
@ -315,7 +364,12 @@ t_unknown_messages(_Config) ->
|
|||
t_evict(_Config) ->
|
||||
{ok, _} = emqx_crl_cache:start_link(),
|
||||
URL = "http://localhost/crl.pem",
|
||||
{ok, [_]} = emqx_crl_cache:refresh(URL),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
?wait_async_action(
|
||||
?assertEqual(ok, emqx_crl_cache:refresh(URL)),
|
||||
#{?snk_kind := crl_cache_insert},
|
||||
5_000),
|
||||
ok = snabbkaffe:stop(),
|
||||
Ref = get_crl_cache_table(),
|
||||
?assertMatch([{"crl.pem", _}], ets:tab2list(Ref)),
|
||||
snabbkaffe:start_trace(),
|
||||
|
|
|
@ -96,10 +96,12 @@ init_per_testcase(t_openssl_client, Config) ->
|
|||
, {cacertfile, CACert}
|
||||
]),
|
||||
Opts1 = proplists:delete(ssl_options, Opts0),
|
||||
Opts2 = [ {ocsp_responder_url, "http://127.0.0.1:9877"}
|
||||
, {ocsp_issuer_pem, IssuerPem}
|
||||
, {ssl_options, SSLOpts2}
|
||||
| Opts1],
|
||||
OCSPOpts = [ {ocsp_stapling_enabled, true}
|
||||
, {ocsp_responder_url, "http://127.0.0.1:9877"}
|
||||
, {ocsp_issuer_pem, IssuerPem}
|
||||
],
|
||||
Opts2 = emqx_misc:merge_opts(Opts1, [ {ocsp_options, OCSPOpts}
|
||||
, {ssl_options, SSLOpts2}]),
|
||||
Listeners = [ SSLListener0#{opts => Opts2}
|
||||
| Listeners1],
|
||||
application:set_env(emqx, listeners, Listeners),
|
||||
|
@ -109,7 +111,18 @@ init_per_testcase(t_openssl_client, Config) ->
|
|||
end,
|
||||
OCSPResponderPort = spawn_openssl_ocsp_responder(Config),
|
||||
{os_pid, OCSPOSPid} = erlang:port_info(OCSPResponderPort, os_pid),
|
||||
ensure_port_open(9877),
|
||||
%%%%%%%% Warning!!!
|
||||
%% Apparently, openssl 3.0.7 introduced a bug in the responder
|
||||
%% that makes it hang forever if one probes the port with
|
||||
%% `gen_tcp:open' / `gen_tcp:close'... Comment this out if
|
||||
%% openssl gets updated in CI or in your local machine.
|
||||
case openssl_version() of
|
||||
"3." ++ _ ->
|
||||
%% hope that the responder has started...
|
||||
ok;
|
||||
_ ->
|
||||
ensure_port_open(9877)
|
||||
end,
|
||||
ct:sleep(1_000),
|
||||
emqx_ct_helpers:start_apps([], Handler),
|
||||
ct:sleep(1_000),
|
||||
|
@ -128,17 +141,20 @@ init_per_testcase(_TestCase, Config) ->
|
|||
end),
|
||||
{ok, CachePid} = emqx_ocsp_cache:start_link(),
|
||||
DataDir = ?config(data_dir, Config),
|
||||
OCSPOpts = [ {ocsp_stapling_enabled, true}
|
||||
, {ocsp_responder_url, "http://localhost:9877"}
|
||||
, {ocsp_issuer_pem,
|
||||
filename:join(DataDir, "ocsp-issuer.pem")}
|
||||
, {ocsp_refresh_http_timeout, 15_000}
|
||||
, {ocsp_refresh_interval, 1_000}
|
||||
],
|
||||
application:set_env(
|
||||
emqx, listeners,
|
||||
[#{ proto => ssl
|
||||
, name => "test_ocsp"
|
||||
, opts => [ {ssl_options, [{certfile,
|
||||
filename:join(DataDir, "server.pem")}]}
|
||||
, {ocsp_responder_url, "http://localhost:9877"}
|
||||
, {ocsp_issuer_pem,
|
||||
filename:join(DataDir, "ocsp-issuer.pem")}
|
||||
, {ocsp_refresh_http_timeout, 15_000}
|
||||
, {ocsp_refresh_interval, 1_000}
|
||||
, {ocsp_options, OCSPOpts}
|
||||
]
|
||||
}]),
|
||||
snabbkaffe:start_trace(),
|
||||
|
@ -291,6 +307,12 @@ get_sni_fun(ListenerID) ->
|
|||
SSLOpts = proplists:get_value(ssl_options, Opts),
|
||||
proplists:get_value(sni_fun, SSLOpts).
|
||||
|
||||
openssl_version() ->
|
||||
Res0 = string:trim(os:cmd("openssl version"), trailing),
|
||||
[_, Res] = string:split(Res0, " "),
|
||||
{match, [Version]} = re:run(Res, "^([^ ]+)", [{capture, first, list}]),
|
||||
Version.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -406,6 +428,22 @@ t_register_listener(_Config) ->
|
|||
?assertMatch([{_, <<"ocsp response">>}], ets:tab2list(?CACHE_TAB)),
|
||||
ok.
|
||||
|
||||
t_register_twice(_Config) ->
|
||||
ListenerID = <<"mqtt:ssl:test_ocsp">>,
|
||||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_ocsp_cache:register_listener(ListenerID),
|
||||
#{?snk_kind := ocsp_http_fetch_and_cache, listener_id := ListenerID}),
|
||||
assert_http_get(1),
|
||||
?assertMatch([{_, <<"ocsp response">>}], ets:tab2list(?CACHE_TAB)),
|
||||
%% should have no problem in registering the same listener again.
|
||||
%% this prompts an immediate refresh.
|
||||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_ocsp_cache:register_listener(ListenerID),
|
||||
#{?snk_kind := ocsp_http_fetch_and_cache, listener_id := ListenerID}),
|
||||
ok.
|
||||
|
||||
t_refresh_periodically(_Config) ->
|
||||
ListenerID = <<"mqtt:ssl:test_ocsp">>,
|
||||
%% should refresh periodically
|
||||
|
@ -417,7 +455,7 @@ t_refresh_periodically(_Config) ->
|
|||
false
|
||||
end,
|
||||
_NEvents = 2,
|
||||
_Timeout = 5_000),
|
||||
_Timeout = 10_000),
|
||||
ok = emqx_ocsp_cache:register_listener(ListenerID),
|
||||
?assertMatch({ok, [_, _]}, snabbkaffe:receive_events(SubRef)),
|
||||
assert_http_get(2),
|
||||
|
@ -447,13 +485,6 @@ t_sni_fun_http_error(_Config) ->
|
|||
emqx_ocsp_cache:sni_fun(ServerName, ListenerID)),
|
||||
ok.
|
||||
|
||||
t_code_change(_Config) ->
|
||||
ListenerID = <<"mqtt:ssl:test_ocsp">>,
|
||||
SNIFun0 = get_sni_fun(ListenerID),
|
||||
?assertMatch({ok, _}, emqx_ocsp_cache:code_change(vsn, state, extra)),
|
||||
SNIFun1 = get_sni_fun(ListenerID),
|
||||
?assertNotEqual(SNIFun0, SNIFun1).
|
||||
|
||||
t_openssl_client(Config) ->
|
||||
TLSVsn = ?config(tls_vsn, Config),
|
||||
WithStatusRequest = ?config(status_request, Config),
|
||||
|
|
Loading…
Reference in New Issue