From 38e57f511cac240fe273c33af453f58303e883a4 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 31 May 2023 12:18:43 +0800 Subject: [PATCH] feat: update crl_cache conf at runtime --- apps/emqx/src/emqx_crl_cache.erl | 91 ++++++++++++--------- apps/emqx/test/emqx_common_test_helpers.erl | 1 + apps/emqx/test/emqx_crl_cache_SUITE.erl | 34 ++++++++ 3 files changed, 86 insertions(+), 40 deletions(-) diff --git a/apps/emqx/src/emqx_crl_cache.erl b/apps/emqx/src/emqx_crl_cache.erl index 084313420..0ca779181 100644 --- a/apps/emqx/src/emqx_crl_cache.erl +++ b/apps/emqx/src/emqx_crl_cache.erl @@ -21,10 +21,10 @@ %% API -export([ start_link/0, - start_link/1, register_der_crls/2, refresh/1, - evict/1 + evict/1, + update_config/1 ]). %% gen_server callbacks @@ -32,13 +32,17 @@ init/1, handle_call/3, handle_cast/2, - handle_info/2 + handle_info/2, + terminate/2 ]). +-export([post_config_update/5]). + %% internal exports -export([http_get/2]). -behaviour(gen_server). +-behaviour(emqx_config_handler). -include("logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -52,6 +56,7 @@ -endif. -define(DEFAULT_REFRESH_INTERVAL, timer:minutes(15)). -define(DEFAULT_CACHE_CAPACITY, 100). +-define(CONF_KEY_PATH, [crl_cache]). -record(state, { refresh_timers = #{} :: #{binary() => timer:tref()}, @@ -73,12 +78,11 @@ %% API %%-------------------------------------------------------------------- -start_link() -> - Config = gather_config(), - start_link(Config). +post_config_update(?CONF_KEY_PATH, _Req, Conf, Conf, _AppEnvs) -> ok; +post_config_update(?CONF_KEY_PATH, _Req, NewConf, _OldConf, _AppEnvs) -> update_config(NewConf). -start_link(Config = #{cache_capacity := _, refresh_interval := _, http_timeout := _}) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, Config, []). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -spec refresh(url()) -> ok. refresh(URL) -> @@ -88,6 +92,10 @@ refresh(URL) -> evict(URL) -> gen_server:cast(?MODULE, {evict, URL}). +-spec update_config(map()) -> ok. +update_config(Conf) -> + gen_server:cast(?MODULE, {update_config, Conf}). + %% Adds CRLs in DER format to the cache and register them for periodic %% refresh. -spec register_der_crls(url(), [public_key:der_encoded()]) -> ok. @@ -98,18 +106,18 @@ register_der_crls(URL, CRLs) when is_list(CRLs) -> %% gen_server behaviour %%-------------------------------------------------------------------- -init(Config) -> - #{ - cache_capacity := CacheCapacity, - refresh_interval := RefreshIntervalMS, - http_timeout := HTTPTimeoutMS - } = Config, - State = #state{ - cache_capacity = CacheCapacity, - refresh_interval = RefreshIntervalMS, - http_timeout = HTTPTimeoutMS - }, - {ok, State}. +init([]) -> + erlang:process_flag(trap_exit, true), + ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), + Conf = emqx:get_config( + ?CONF_KEY_PATH, + #{ + capacity => ?DEFAULT_CACHE_CAPACITY, + refresh_interval => ?DEFAULT_REFRESH_INTERVAL, + http_timeout => ?HTTP_TIMEOUT + } + ), + {ok, update_state_config(Conf, #state{})}. handle_call(Call, _From, State) -> {reply, {error, {bad_call, Call}}, State}. @@ -144,6 +152,8 @@ handle_cast({refresh, URL}, State0) -> }), {noreply, ensure_timer(URL, State0)} end; +handle_cast({update_config, Conf}, State0) -> + {noreply, update_state_config(Conf, State0)}; handle_cast(_Cast, State) -> {noreply, State}. @@ -175,10 +185,25 @@ handle_info( handle_info(_Info, State) -> {noreply, State}. +terminate(_, _) -> + emqx_config_handler:remove_handler(?CONF_KEY_PATH). + %%-------------------------------------------------------------------- %% internal functions %%-------------------------------------------------------------------- +update_state_config(Conf, State) -> + #{ + capacity := CacheCapacity, + refresh_interval := RefreshIntervalMS, + http_timeout := HTTPTimeoutMS + } = gather_config(Conf), + State#state{ + cache_capacity = CacheCapacity, + refresh_interval = RefreshIntervalMS, + http_timeout = HTTPTimeoutMS + }. + http_get(URL, HTTPTimeout) -> httpc:request( get, @@ -198,7 +223,7 @@ do_http_fetch_and_cache(URL, HTTPTimeoutMS) -> CRLs -> %% Note: must ensure it's a string and not a %% binary because that's what the ssl manager uses - %% when doing lookups. + %% when doing lookup. emqx_ssl_crl_cache:insert(to_string(URL), {der, CRLs}), ?tp(crl_cache_insert, #{url => URL, crls => CRLs}), {ok, CRLs} @@ -232,25 +257,11 @@ ensure_timer(URL, State = #state{refresh_timers = RefreshTimers0}, Timeout) -> }, State#state{refresh_timers = RefreshTimers}. --spec gather_config() -> - #{ - cache_capacity := pos_integer(), - refresh_interval := timer:time(), - http_timeout := timer:time() - }. -gather_config() -> - %% TODO: add a config handler to refresh the config when those - %% globals change? - CacheCapacity = emqx_config:get([crl_cache, capacity], ?DEFAULT_CACHE_CAPACITY), - RefreshIntervalMS0 = emqx_config:get([crl_cache, refresh_interval], ?DEFAULT_REFRESH_INTERVAL), - MinimumRefreshInverval = ?MIN_REFRESH_PERIOD, - RefreshIntervalMS = max(RefreshIntervalMS0, MinimumRefreshInverval), - HTTPTimeoutMS = emqx_config:get([crl_cache, http_timeout], ?HTTP_TIMEOUT), - #{ - cache_capacity => CacheCapacity, - refresh_interval => RefreshIntervalMS, - http_timeout => HTTPTimeoutMS - }. +gather_config(Conf) -> + RefreshIntervalMS0 = maps:get(refresh_interval, Conf), + MinimumRefreshInterval = ?MIN_REFRESH_PERIOD, + RefreshIntervalMS = max(RefreshIntervalMS0, MinimumRefreshInterval), + Conf#{refresh_interval => RefreshIntervalMS}. -spec handle_register_der_crls(state(), url(), [public_key:der_encoded()]) -> {noreply, state()}. handle_register_der_crls(State0, URL0, CRLs) -> diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index bfa0e36cb..f85ed2599 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -540,6 +540,7 @@ load_config(SchemaModule, Config) -> false -> Config end, ok = emqx_config:delete_override_conf_files(), + ok = copy_acl_conf(), ok = emqx_config:init_load(SchemaModule, ConfigBin). -spec is_all_tcp_servers_available(Servers) -> Result when diff --git a/apps/emqx/test/emqx_crl_cache_SUITE.erl b/apps/emqx/test/emqx_crl_cache_SUITE.erl index 1b8abb9c3..be8f49343 100644 --- a/apps/emqx/test/emqx_crl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_crl_cache_SUITE.erl @@ -481,6 +481,7 @@ ensure_ssl_manager_alive() -> t_init_empty_urls(_Config) -> Ref = get_crl_cache_table(), ?assertEqual([], ets:tab2list(Ref)), + emqx_config_handler:start_link(), ?assertMatch({ok, _}, emqx_crl_cache:start_link()), receive {http_get, _} -> @@ -488,12 +489,34 @@ t_init_empty_urls(_Config) -> after 1000 -> ok end, ?assertEqual([], ets:tab2list(Ref)), + emqx_config_handler:stop(), + ok. + +t_update_config(_Config) -> + emqx_config:save_schema_mod_and_names(emqx_schema), + emqx_config_handler:start_link(), + {ok, Pid} = emqx_crl_cache:start_link(), + Conf = #{ + refresh_interval => timer:minutes(5), + http_timeout => timer:minutes(10), + capacity => 123 + }, + ?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)), + State = sys:get_state(Pid), + ?assertEqual(Conf, #{ + refresh_interval => element(3, State), + http_timeout => element(4, State), + capacity => element(7, State) + }), + emqx_config:erase(<<"crl_cache">>), + emqx_config_handler:stop(), ok. t_manual_refresh(Config) -> CRLDer = ?config(crl_der, Config), Ref = get_crl_cache_table(), ?assertEqual([], ets:tab2list(Ref)), + emqx_config_handler:start_link(), {ok, _} = emqx_crl_cache:start_link(), URL = "http://localhost/crl.pem", ok = snabbkaffe:start_trace(), @@ -507,6 +530,7 @@ t_manual_refresh(Config) -> [{"crl.pem", [CRLDer]}], ets:tab2list(Ref) ), + emqx_config_handler:stop(), ok. t_refresh_request_error(_Config) -> @@ -517,6 +541,7 @@ t_refresh_request_error(_Config) -> {ok, {{"HTTP/1.0", 404, 'Not Found'}, [], <<"not found">>}} end ), + emqx_config_handler:start_link(), {ok, _} = emqx_crl_cache:start_link(), URL = "http://localhost/crl.pem", ?check_trace( @@ -534,6 +559,7 @@ t_refresh_request_error(_Config) -> end ), ok = snabbkaffe:stop(), + emqx_config_handler:stop(), ok. t_refresh_invalid_response(_Config) -> @@ -544,6 +570,7 @@ t_refresh_invalid_response(_Config) -> {ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"not a crl">>}} end ), + emqx_config_handler:start_link(), {ok, _} = emqx_crl_cache:start_link(), URL = "http://localhost/crl.pem", ?check_trace( @@ -561,6 +588,7 @@ t_refresh_invalid_response(_Config) -> end ), ok = snabbkaffe:stop(), + emqx_config_handler:stop(), ok. t_refresh_http_error(_Config) -> @@ -571,6 +599,7 @@ t_refresh_http_error(_Config) -> {error, timeout} end ), + emqx_config_handler:start_link(), {ok, _} = emqx_crl_cache:start_link(), URL = "http://localhost/crl.pem", ?check_trace( @@ -588,16 +617,20 @@ t_refresh_http_error(_Config) -> end ), ok = snabbkaffe:stop(), + emqx_config_handler:stop(), ok. t_unknown_messages(_Config) -> + emqx_config_handler:start_link(), {ok, Server} = emqx_crl_cache:start_link(), gen_server:call(Server, foo), gen_server:cast(Server, foo), Server ! foo, + emqx_config_handler:stop(), ok. t_evict(_Config) -> + emqx_config_handler:start_link(), {ok, _} = emqx_crl_cache:start_link(), URL = "http://localhost/crl.pem", ?wait_async_action( @@ -612,6 +645,7 @@ t_evict(_Config) -> #{?snk_kind := crl_cache_evict} ), ?assertEqual([], ets:tab2list(Ref)), + emqx_config_handler:stop(), ok. t_cache(Config) ->