diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index acdb9ff96..c0116fd8c 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -57,6 +57,7 @@ ]). -export([pre_config_update/3, post_config_update/5]). +-export([create_listener/3, remove_listener/3, update_listener/3]). -export([format_bind/1]). @@ -65,8 +66,8 @@ -endif. -type listener_id() :: atom() | binary(). - --define(CONF_KEY_PATH, [listeners, '?', '?']). +-define(ROOT_KEY, listeners). +-define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). -define(MARK_DEL, ?TOMBSTONE_CONFIG_CHANGE_REQ). @@ -212,7 +213,10 @@ shutdown_count(_, _, _) -> start() -> %% The ?MODULE:start/0 will be called by emqx_app when emqx get started, %% so we install the config handler here. + %% callback when http api request ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), + %% callback when reload from config file + ok = emqx_config_handler:add_handler([?ROOT_KEY], ?MODULE), foreach_listeners(fun start_listener/3). -spec start_listener(listener_id()) -> ok | {error, term()}. @@ -287,7 +291,8 @@ restart_listener(Type, ListenerName, OldConf, NewConf) -> stop() -> %% The ?MODULE:stop/0 will be called by emqx_app when emqx is going to shutdown, %% so we uninstall the config handler here. - _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), + ok = emqx_config_handler:remove_handler(?CONF_KEY_PATH), + ok = emqx_config_handler:remove_handler([?ROOT_KEY]), foreach_listeners(fun stop_listener/3). -spec stop_listener(listener_id()) -> ok | {error, term()}. @@ -463,50 +468,34 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> end. %% Update the listeners at runtime -pre_config_update([listeners, Type, Name], {create, NewConf}, V) when +pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when V =:= undefined orelse V =:= ?TOMBSTONE_VALUE -> - CertsDir = certs_dir(Type, Name), - {ok, convert_certs(CertsDir, NewConf)}; -pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) -> + {ok, convert_certs(Type, Name, NewConf)}; +pre_config_update([?ROOT_KEY, _Type, _Name], {create, _NewConf}, _RawConf) -> {error, already_exist}; -pre_config_update([listeners, _Type, _Name], {update, _Request}, undefined) -> +pre_config_update([?ROOT_KEY, _Type, _Name], {update, _Request}, undefined) -> {error, not_found}; -pre_config_update([listeners, Type, Name], {update, Request}, RawConf) -> - NewConfT = emqx_utils_maps:deep_merge(RawConf, Request), - NewConf = ensure_override_limiter_conf(NewConfT, Request), - CertsDir = certs_dir(Type, Name), - {ok, convert_certs(CertsDir, NewConf)}; -pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) -> - NewConf = emqx_utils_maps:deep_merge(RawConf, Updated), - {ok, NewConf}; -pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) -> +pre_config_update([?ROOT_KEY, Type, Name], {update, Request}, RawConf) -> + RawConf1 = emqx_utils_maps:deep_merge(RawConf, Request), + RawConf2 = ensure_override_limiter_conf(RawConf1, Request), + {ok, convert_certs(Type, Name, RawConf2)}; +pre_config_update([?ROOT_KEY, _Type, _Name], {action, _Action, Updated}, RawConf) -> + {ok, emqx_utils_maps:deep_merge(RawConf, Updated)}; +pre_config_update([?ROOT_KEY, _Type, _Name], ?MARK_DEL, _RawConf) -> {ok, ?TOMBSTONE_VALUE}; -pre_config_update(_Path, _Request, RawConf) -> - {ok, RawConf}. +pre_config_update([?ROOT_KEY], RawConf, RawConf) -> + {ok, RawConf}; +pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> + {ok, convert_certs(NewConf)}. -post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) -> - start_listener(Type, Name, NewConf); -post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> - try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf), - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - case NewConf of - #{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf}); - _ -> ok - end; -post_config_update([listeners, Type, Name], Op, _, OldConf, _AppEnvs) when - Op =:= ?MARK_DEL andalso is_map(OldConf) --> - ok = unregister_ocsp_stapling_refresh(Type, Name), - case stop_listener(Type, Name, OldConf) of - ok -> - _ = emqx_authentication:delete_chain(listener_id(Type, Name)), - CertsDir = certs_dir(Type, Name), - clear_certs(CertsDir, OldConf); - Err -> - Err - end; -post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> +post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) -> + create_listener(Type, Name, NewConf); +post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> + update_listener(Type, Name, {OldConf, NewConf}); +post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) -> + remove_listener(Type, Name, OldConf); +post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> #{enabled := NewEnabled} = NewConf, #{enabled := OldEnabled} = OldConf, case {NewEnabled, OldEnabled} of @@ -523,9 +512,65 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo ok = unregister_ocsp_stapling_refresh(Type, Name), stop_listener(Type, Name, OldConf) end; +post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) -> + ok; +post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) -> + #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf), + Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed), + perform_listener_changes([ + {fun ?MODULE:remove_listener/3, Removed}, + {fun ?MODULE:update_listener/3, Updated}, + {fun ?MODULE:create_listener/3, Added} + ]); post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. +create_listener(Type, Name, NewConf) -> + Res = start_listener(Type, Name, NewConf), + recreate_authenticator(Res, Type, Name, NewConf). + +recreate_authenticator(ok, Type, Name, Conf) -> + Chain = listener_id(Type, Name), + _ = emqx_authentication:delete_chain(Chain), + case maps:get(authentication, Conf, []) of + [] -> ok; + AuthN -> emqx_authentication:create_authenticator(Chain, AuthN) + end; +recreate_authenticator(Error, _Type, _Name, _NewConf) -> + Error. + +remove_listener(Type, Name, OldConf) -> + ok = unregister_ocsp_stapling_refresh(Type, Name), + case stop_listener(Type, Name, OldConf) of + ok -> + _ = emqx_authentication:delete_chain(listener_id(Type, Name)), + clear_certs(certs_dir(Type, Name), OldConf); + Err -> + Err + end. + +update_listener(Type, Name, {OldConf, NewConf}) -> + try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf), + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), + Res = restart_listener(Type, Name, {OldConf, NewConf}), + recreate_authenticator(Res, Type, Name, NewConf). + +perform_listener_changes([]) -> + ok; +perform_listener_changes([{Action, ConfL} | Tasks]) -> + case perform_listener_changes(Action, ConfL) of + ok -> perform_listener_changes(Tasks); + {error, Reason} -> {error, Reason} + end. + +perform_listener_changes(_Action, []) -> + ok; +perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) -> + case Action(Type, Name, Diff) of + ok -> perform_listener_changes(Action, MapConf); + {error, Reason} -> {error, Reason} + end. + esockd_opts(ListenerId, Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), @@ -701,6 +746,29 @@ del_limiter_bucket(Id, Conf) -> ) end. +diff_confs(NewConfs, OldConfs) -> + emqx_utils:diff_lists( + flatten_confs(NewConfs), + flatten_confs(OldConfs), + fun({Key, _}) -> Key end + ). + +flatten_confs(Conf0) -> + lists:flatmap( + fun({Type, Conf}) -> + do_flatten_confs(Type, Conf) + end, + maps:to_list(Conf0) + ). + +do_flatten_confs(Type, Conf0) -> + FilterFun = + fun + ({_Name, ?TOMBSTONE_TYPE}) -> false; + ({Name, Conf}) -> {true, {{Type, Name}, Conf}} + end, + lists:filtermap(FilterFun, maps:to_list(Conf0)). + enable_authn(Opts) -> maps:get(enable_authn, Opts, true). @@ -762,14 +830,32 @@ parse_bind(#{<<"bind">> := Bind}) -> certs_dir(Type, Name) -> iolist_to_binary(filename:join(["listeners", Type, Name])). -convert_certs(CertsDir, Conf) -> +convert_certs(ListenerConf) -> + maps:fold( + fun(Type, Listeners0, Acc) -> + Listeners1 = + maps:fold( + fun(Name, Conf, Acc1) -> + Acc1#{Name => convert_certs(Type, Name, Conf)} + end, + #{}, + Listeners0 + ), + Acc#{Type => Listeners1} + end, + #{}, + ListenerConf + ). + +convert_certs(Type, Name, Conf) -> + CertsDir = certs_dir(Type, Name), case emqx_tls_lib:ensure_ssl_files(CertsDir, get_ssl_options(Conf)) of {ok, undefined} -> Conf; {ok, SSL} -> Conf#{<<"ssl_options">> => SSL}; {error, Reason} -> - ?SLOG(error, Reason#{msg => "bad_ssl_config"}), + ?SLOG(error, Reason#{msg => "bad_ssl_config", type => Type, name => Name}), throw({bad_ssl_config, Reason}) end. @@ -791,13 +877,15 @@ try_clear_ssl_files(CertsDir, NewConf, OldConf) -> OldSSL = get_ssl_options(OldConf), emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL, OldSSL). -get_ssl_options(Conf) -> +get_ssl_options(Conf = #{}) -> case maps:find(ssl_options, Conf) of {ok, SSL} -> SSL; error -> maps:get(<<"ssl_options">>, Conf, undefined) - end. + end; +get_ssl_options(_) -> + undefined. %% @doc Get QUIC optional settings for low level tunings. %% @see quicer:quic_settings() @@ -889,8 +977,5 @@ unregister_ocsp_stapling_refresh(Type, Name) -> emqx_ocsp_cache:unregister_listener(ListenerId), ok. -%% There is currently an issue with frontend -%% infinity is not a good value for it, so we use 5m for now default_max_conn() -> - %% TODO: <<"infinity">> - 5_000_000. + <<"infinity">>. diff --git a/apps/emqx/test/emqx_listeners_update_SUITE.erl b/apps/emqx/test/emqx_listeners_update_SUITE.erl new file mode 100644 index 000000000..c16a26f3a --- /dev/null +++ b/apps/emqx/test/emqx_listeners_update_SUITE.erl @@ -0,0 +1,154 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_listeners_update_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-import(emqx_listeners, [current_conns/2, is_running/1]). + +-define(LISTENERS, [listeners]). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([]). + +init_per_testcase(_TestCase, Config) -> + Init = emqx:get_raw_config(?LISTENERS), + [{init_conf, Init} | Config]. + +end_per_testcase(_TestCase, Config) -> + Conf = ?config(init_conf, Config), + {ok, _} = emqx:update_config(?LISTENERS, Conf), + ok. + +t_default_conf(_Config) -> + ?assertMatch( + #{ + <<"tcp">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:1883">>}}, + <<"ssl">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8883">>}}, + <<"ws">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8083">>}}, + <<"wss">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8084">>}} + }, + emqx:get_raw_config(?LISTENERS) + ), + ?assertMatch( + #{ + tcp := #{default := #{bind := {{0, 0, 0, 0}, 1883}}}, + ssl := #{default := #{bind := {{0, 0, 0, 0}, 8883}}}, + ws := #{default := #{bind := {{0, 0, 0, 0}, 8083}}}, + wss := #{default := #{bind := {{0, 0, 0, 0}, 8084}}} + }, + emqx:get_config(?LISTENERS) + ), + ok. + +t_update_conf(_Conf) -> + Raw = emqx:get_raw_config(?LISTENERS), + Raw1 = emqx_utils_maps:deep_put( + [<<"tcp">>, <<"default">>, <<"bind">>], Raw, <<"127.0.0.1:1883">> + ), + Raw2 = emqx_utils_maps:deep_put( + [<<"ssl">>, <<"default">>, <<"bind">>], Raw1, <<"127.0.0.1:8883">> + ), + Raw3 = emqx_utils_maps:deep_put( + [<<"ws">>, <<"default">>, <<"bind">>], Raw2, <<"0.0.0.0:8083">> + ), + Raw4 = emqx_utils_maps:deep_put( + [<<"wss">>, <<"default">>, <<"bind">>], Raw3, <<"127.0.0.1:8084">> + ), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw4)), + ?assertMatch( + #{ + <<"tcp">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:1883">>}}, + <<"ssl">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:8883">>}}, + <<"ws">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8083">>}}, + <<"wss">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:8084">>}} + }, + emqx:get_raw_config(?LISTENERS) + ), + BindTcp = {{127, 0, 0, 1}, 1883}, + BindSsl = {{127, 0, 0, 1}, 8883}, + BindWs = {{0, 0, 0, 0}, 8083}, + BindWss = {{127, 0, 0, 1}, 8084}, + ?assertMatch( + #{ + tcp := #{default := #{bind := BindTcp}}, + ssl := #{default := #{bind := BindSsl}}, + ws := #{default := #{bind := BindWs}}, + wss := #{default := #{bind := BindWss}} + }, + emqx:get_config(?LISTENERS) + ), + ?assertError(not_found, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})), + ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + + ?assertEqual(0, current_conns(<<"tcp:default">>, BindTcp)), + ?assertEqual(0, current_conns(<<"ssl:default">>, BindSsl)), + + ?assertEqual({0, 0, 0, 0}, proplists:get_value(ip, ranch:info('ws:default'))), + ?assertEqual({127, 0, 0, 1}, proplists:get_value(ip, ranch:info('wss:default'))), + ?assert(is_running('ws:default')), + ?assert(is_running('wss:default')), + ok. + +t_add_delete_conf(_Conf) -> + Raw = emqx:get_raw_config(?LISTENERS), + %% add + #{<<"tcp">> := #{<<"default">> := Tcp}} = Raw, + NewBind = <<"127.0.0.1:1987">>, + Raw1 = emqx_utils_maps:deep_put([<<"tcp">>, <<"new">>], Raw, Tcp#{<<"bind">> => NewBind}), + Raw2 = emqx_utils_maps:deep_put([<<"ssl">>, <<"default">>], Raw1, ?TOMBSTONE_VALUE), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw2)), + ?assertEqual(0, current_conns(<<"tcp:new">>, {{127, 0, 0, 1}, 1987})), + ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + %% deleted + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw)), + ?assertError(not_found, current_conns(<<"tcp:new">>, {{127, 0, 0, 1}, 1987})), + ?assertEqual(0, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + ok. + +t_delete_default_conf(_Conf) -> + Raw = emqx:get_raw_config(?LISTENERS), + %% delete default listeners + Raw1 = emqx_utils_maps:deep_put([<<"tcp">>, <<"default">>], Raw, ?TOMBSTONE_VALUE), + Raw2 = emqx_utils_maps:deep_put([<<"ssl">>, <<"default">>], Raw1, ?TOMBSTONE_VALUE), + Raw3 = emqx_utils_maps:deep_put([<<"ws">>, <<"default">>], Raw2, ?TOMBSTONE_VALUE), + Raw4 = emqx_utils_maps:deep_put([<<"wss">>, <<"default">>], Raw3, ?TOMBSTONE_VALUE), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw4)), + ?assertError(not_found, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})), + ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + ?assertMatch({error, not_found}, is_running('ws:default')), + ?assertMatch({error, not_found}, is_running('wss:default')), + + %% reset + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw)), + ?assertEqual(0, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})), + ?assertEqual(0, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + ?assert(is_running('ws:default')), + ?assert(is_running('wss:default')), + ok. diff --git a/apps/emqx_exhook/src/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl index 6e9bc5242..2fc562ae8 100644 --- a/apps/emqx_exhook/src/emqx_exhook_app.erl +++ b/apps/emqx_exhook/src/emqx_exhook_app.erl @@ -22,8 +22,7 @@ -export([ start/2, - stop/1, - prep_stop/1 + stop/1 ]). %%-------------------------------------------------------------------- @@ -34,10 +33,6 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_exhook_sup:start_link(), {ok, Sup}. -prep_stop(State) -> - emqx_ctl:unregister_command(exhook), - State. - stop(_State) -> ok. diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index 0647c80ea..173a1fecb 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -23,6 +23,9 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(SERVERS, [exhook, servers]). +-define(EXHOOK, [exhook]). + %% APIs -export([start_link/0]). @@ -148,7 +151,7 @@ update_config(KeyPath, UpdateReq) -> Error end. -pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) -> +pre_config_update(?SERVERS, {add, #{<<"name">> := Name} = Conf}, OldConf) -> case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= Name end, OldConf) of true -> throw(already_exists); @@ -156,48 +159,36 @@ pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) -> NConf = maybe_write_certs(Conf), {ok, OldConf ++ [NConf]} end; -pre_config_update(_, {update, Name, Conf}, OldConf) -> - case replace_conf(Name, fun(_) -> Conf end, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)} - end; -pre_config_update(_, {delete, ToDelete}, OldConf) -> - case do_delete(ToDelete, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, NewConf} - end; -pre_config_update(_, {move, Name, Position}, OldConf) -> - case do_move(Name, Position, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, NewConf} - end; -pre_config_update(_, {enable, Name, Enable}, OldConf) -> - case - replace_conf( - Name, - fun(Conf) -> Conf#{<<"enable">> => Enable} end, - OldConf - ) - of - not_found -> throw(not_found); - NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)} - end. +pre_config_update(?SERVERS, {update, Name, Conf}, OldConf) -> + NewConf = replace_conf(Name, fun(_) -> Conf end, OldConf), + {ok, lists:map(fun maybe_write_certs/1, NewConf)}; +pre_config_update(?SERVERS, {delete, ToDelete}, OldConf) -> + {ok, do_delete(ToDelete, OldConf)}; +pre_config_update(?SERVERS, {move, Name, Position}, OldConf) -> + {ok, do_move(Name, Position, OldConf)}; +pre_config_update(?SERVERS, {enable, Name, Enable}, OldConf) -> + ReplaceFun = fun(Conf) -> Conf#{<<"enable">> => Enable} end, + NewConf = replace_conf(Name, ReplaceFun, OldConf), + {ok, lists:map(fun maybe_write_certs/1, NewConf)}; +pre_config_update(?EXHOOK, NewConf, _OldConf) when NewConf =:= #{} -> + {ok, NewConf#{<<"servers">> => []}}; +pre_config_update(?EXHOOK, NewConf = #{<<"servers">> := Servers}, _OldConf) -> + {ok, NewConf#{<<"servers">> => lists:map(fun maybe_write_certs/1, Servers)}}. post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) -> - Result = call({update_config, UpdateReq, NewConf}), + Result = call({update_config, UpdateReq, NewConf, OldConf}), try_clear_ssl_files(UpdateReq, NewConf, OldConf), {ok, Result}. -%%===================================================================== - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> process_flag(trap_exit, true), - emqx_conf:add_handler([exhook, servers], ?MODULE), - ServerL = emqx:get_config([exhook, servers]), + emqx_conf:add_handler(?EXHOOK, ?MODULE), + emqx_conf:add_handler(?SERVERS, ?MODULE), + ServerL = emqx:get_config(?SERVERS), Servers = load_all_servers(ServerL), Servers2 = reorder(ServerL, Servers), refresh_tick(), @@ -222,22 +213,16 @@ handle_call( OrderServers = sort_name_by_order(Infos, Servers), {reply, OrderServers, State}; handle_call( - {update_config, {move, _Name, _Position}, NewConfL}, + {update_config, {move, _Name, _Position}, NewConfL, _}, _From, #{servers := Servers} = State ) -> Servers2 = reorder(NewConfL, Servers), {reply, ok, State#{servers := Servers2}}; -handle_call({update_config, {delete, ToDelete}, _}, _From, State) -> - emqx_exhook_metrics:on_server_deleted(ToDelete), - - #{servers := Servers} = State2 = do_unload_server(ToDelete, State), - - Servers2 = maps:remove(ToDelete, Servers), - - {reply, ok, update_servers(Servers2, State2)}; +handle_call({update_config, {delete, ToDelete}, _, _}, _From, State) -> + {reply, ok, remove_server(ToDelete, State)}; handle_call( - {update_config, {add, RawConf}, NewConfL}, + {update_config, {add, RawConf}, NewConfL, _}, _From, #{servers := Servers} = State ) -> @@ -246,14 +231,30 @@ handle_call( Servers2 = Servers#{Name => Server}, Servers3 = reorder(NewConfL, Servers2), {reply, Result, State#{servers := Servers3}}; +handle_call({update_config, {update, Name, _Conf}, NewConfL, _}, _From, State) -> + {Result, State2} = restart_server(Name, NewConfL, State), + {reply, Result, State2}; +handle_call({update_config, {enable, Name, _Enable}, NewConfL, _}, _From, State) -> + {Result, State2} = restart_server(Name, NewConfL, State), + {reply, Result, State2}; +handle_call({update_config, _, ConfL, ConfL}, _From, State) -> + {reply, ok, State}; +handle_call({update_config, _, #{servers := NewConfL}, #{servers := OldConfL}}, _From, State) -> + #{ + removed := Removed, + added := Added, + changed := Updated + } = emqx_utils:diff_lists(NewConfL, OldConfL, fun(#{name := Name}) -> Name end), + State2 = remove_servers(Removed, State), + {UpdateRes, State3} = restart_servers(Updated, NewConfL, State2), + {AddRes, State4 = #{servers := Servers4}} = add_servers(Added, State3), + State5 = State4#{servers => reorder(NewConfL, Servers4)}, + case UpdateRes =:= [] andalso AddRes =:= [] of + true -> {reply, ok, State5}; + false -> {reply, {error, #{added => AddRes, updated => UpdateRes}}, State5} + end; handle_call({lookup, Name}, _From, State) -> {reply, where_is_server(Name, State), State}; -handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) -> - {Result, State2} = restart_server(Name, NewConfL, State), - {reply, Result, State2}; -handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) -> - {Result, State2} = restart_server(Name, NewConfL, State), - {reply, Result, State2}; handle_call({server_info, Name}, _From, State) -> case where_is_server(Name, State) of not_found -> @@ -287,6 +288,22 @@ handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. +remove_servers(Removes, State) -> + lists:foldl( + fun(Conf, Acc) -> + ToDelete = maps:get(name, Conf), + remove_server(ToDelete, Acc) + end, + State, + Removes + ). + +remove_server(ToDelete, State) -> + emqx_exhook_metrics:on_server_deleted(ToDelete), + #{servers := Servers} = State2 = do_unload_server(ToDelete, State), + Servers2 = maps:remove(ToDelete, Servers), + update_servers(Servers2, State2). + handle_cast(_Msg, State) -> {noreply, State}. @@ -310,6 +327,8 @@ terminate(Reason, State = #{servers := Servers}) -> Servers ), ?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}), + emqx_conf:remove_handler(?SERVERS), + emqx_conf:remove_handler(?EXHOOK), ok. code_change(_OldVsn, State, _Extra) -> @@ -325,6 +344,22 @@ unload_exhooks() -> || {Name, {M, F, _A}} <- ?ENABLED_HOOKS ]. +add_servers(Added, State) -> + lists:foldl( + fun(Conf = #{name := Name}, {ResAcc, StateAcc}) -> + case do_load_server(options_to_server(Conf)) of + {ok, Server} -> + #{servers := Servers} = StateAcc, + Servers2 = Servers#{Name => Server}, + {ResAcc, update_servers(Servers2, StateAcc)}; + {Err, StateAcc1} -> + {[Err | ResAcc], StateAcc1} + end + end, + {[], State}, + Added + ). + do_load_server(#{name := Name} = Server) -> case emqx_exhook_server:load(Name, Server) of {ok, ServerState} -> @@ -401,8 +436,7 @@ clean_reload_timer(#{timer := Timer}) -> _ = erlang:cancel_timer(Timer), ok. --spec do_move(binary(), position(), list(server_options())) -> - not_found | list(server_options()). +-spec do_move(binary(), position(), list(server_options())) -> list(server_options()). do_move(Name, Position, ConfL) -> move(ConfL, Name, Position, []). @@ -411,7 +445,7 @@ move([#{<<"name">> := Name} = Server | T], Name, Position, HeadL) -> move([Server | T], Name, Position, HeadL) -> move(T, Name, Position, [Server | HeadL]); move([], _Name, _Position, _HeadL) -> - not_found. + throw(not_found). move_to(?CMD_MOVE_FRONT, Server, ServerL) -> [Server | ServerL]; @@ -429,8 +463,7 @@ move_to([H | T], Position, Server, HeadL) -> move_to([], _Position, _Server, _HeadL) -> not_found. --spec do_delete(binary(), list(server_options())) -> - not_found | list(server_options()). +-spec do_delete(binary(), list(server_options())) -> list(server_options()). do_delete(ToDelete, OldConf) -> case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= ToDelete end, OldConf) of true -> @@ -439,7 +472,7 @@ do_delete(ToDelete, OldConf) -> OldConf ); false -> - not_found + throw(not_found) end. -spec reorder(list(server_options()), servers()) -> servers(). @@ -471,9 +504,7 @@ where_is_server(Name, #{servers := Servers}) -> -type replace_fun() :: fun((server_options()) -> server_options()). --spec replace_conf(binary(), replace_fun(), list(server_options())) -> - not_found - | list(server_options()). +-spec replace_conf(binary(), replace_fun(), list(server_options())) -> list(server_options()). replace_conf(Name, ReplaceFun, ConfL) -> replace_conf(ConfL, Name, ReplaceFun, []). @@ -483,7 +514,20 @@ replace_conf([#{<<"name">> := Name} = H | T], Name, ReplaceFun, HeadL) -> replace_conf([H | T], Name, ReplaceFun, HeadL) -> replace_conf(T, Name, ReplaceFun, [H | HeadL]); replace_conf([], _, _, _) -> - not_found. + throw(not_found). + +restart_servers(Updated, NewConfL, State) -> + lists:foldl( + fun({_Old, Conf}, {ResAcc, StateAcc}) -> + Name = maps:get(name, Conf), + case restart_server(Name, NewConfL, StateAcc) of + {ok, StateAcc1} -> {ResAcc, StateAcc1}; + {Err, StateAcc1} -> {[Err | ResAcc], StateAcc1} + end + end, + {[], State}, + Updated + ). -spec restart_server(binary(), list(server_options()), state()) -> {ok, state()} @@ -612,6 +656,16 @@ try_clear_ssl_files({Op, Name, _}, NewConfs, OldConfs) when NewSSL = find_server_ssl_cfg(Name, NewConfs), OldSSL = find_server_ssl_cfg(Name, OldConfs), emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL); +%% replace the whole config from the cli +try_clear_ssl_files(_Req, #{servers := NewServers}, #{servers := OldServers}) -> + lists:foreach( + fun(#{name := Name} = Conf) -> + NewSSL = find_server_ssl_cfg(Name, NewServers), + OldSSL = maps:get(ssl, Conf, undefined), + emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL) + end, + OldServers + ); try_clear_ssl_files(_Req, _NewConf, _OldConf) -> ok. diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index ff313c8c8..d9297fff6 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -196,9 +196,9 @@ t_error_update_conf(_) -> Path = [exhook, servers], Name = <<"error_update">>, ErrorCfg = #{<<"name">> => Name}, - {error, _} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}), - {error, _} = emqx_exhook_mgr:update_config(Path, {move, Name, top, <<>>}), - {error, _} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {move, Name, top}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}), ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>}, {ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}), @@ -210,12 +210,37 @@ t_error_update_conf(_) -> }, {ok, _} = emqx_exhook_mgr:update_config(Path, {update, Name, DisableAnd}), - {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, <<"error">>}), - {error, not_found} = emqx_exhook_mgr:update_config( - Path, {delete, <<"delete_not_exists">>} - ), + {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, Name}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {delete, Name}), ok. +t_update_conf(_Config) -> + Path = [exhook], + Conf = #{<<"servers">> := Servers} = emqx_config:get_raw(Path), + ?assert(length(Servers) > 1), + Servers1 = shuffle(Servers), + ReOrderedConf = Conf#{<<"servers">> => Servers1}, + validate_servers(Path, ReOrderedConf, Servers1), + [_ | Servers2] = Servers, + DeletedConf = Conf#{<<"servers">> => Servers2}, + validate_servers(Path, DeletedConf, Servers2), + [L1, L2 | Servers3] = Servers, + UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => 1000}, + UpdatedServers = [L1, UpdateL2 | Servers3], + UpdatedConf = Conf#{<<"servers">> => UpdatedServers}, + validate_servers(Path, UpdatedConf, UpdatedServers), + %% reset + validate_servers(Path, Conf, Servers), + ok. + +validate_servers(Path, ReOrderConf, Servers1) -> + {ok, _} = emqx_exhook_mgr:update_config(Path, ReOrderConf), + ?assertEqual(ReOrderConf, emqx_config:get_raw(Path)), + List = emqx_exhook_mgr:list(), + ExpectL = lists:map(fun(#{<<"name">> := Name}) -> Name end, Servers1), + L1 = lists:map(fun(#{name := Name}) -> Name end, List), + ?assertEqual(ExpectL, L1). + t_error_server_info(_) -> not_found = emqx_exhook_mgr:server_info(<<"not_exists">>), ok. @@ -490,6 +515,10 @@ data_file(Name) -> cert_file(Name) -> data_file(filename:join(["certs", Name])). -%% FIXME: this creats inter-test dependency +%% FIXME: this creates inter-test dependency stop_apps(Apps) -> emqx_common_test_helpers:stop_apps(Apps, #{erase_all_configs => false}). + +shuffle(List) -> + Sorted = lists:sort(lists:map(fun(L) -> {rand:uniform(), L} end, List)), + lists:map(fun({_, L}) -> L end, Sorted). diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index 605093875..0b172565a 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.0.2"}, + {vsn, "5.0.3"}, {modules, [ emqx_utils, emqx_utils_api, diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 2c6ddd9c1..edc76a1e6 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -54,7 +54,8 @@ safe_to_existing_atom/1, safe_to_existing_atom/2, pub_props_to_packet/1, - safe_filename/1 + safe_filename/1, + diff_lists/3 ]). -export([ @@ -748,3 +749,152 @@ safe_filename(Filename) when is_binary(Filename) -> binary:replace(Filename, <<":">>, <<"-">>, [global]); safe_filename(Filename) when is_list(Filename) -> lists:flatten(string:replace(Filename, ":", "-", all)). + +%% @doc Compares two lists of maps and returns the differences between them in a +%% map containing four keys – 'removed', 'added', 'identical', and 'changed' – +%% each holding a list of maps. Elements are compared using key function KeyFunc +%% to extract the comparison key used for matching. +%% +%% The return value is a map with the following keys and the list of maps as its values: +%% * 'removed' – a list of maps that were present in the Old list, but not found in the New list. +%% * 'added' – a list of maps that were present in the New list, but not found in the Old list. +%% * 'identical' – a list of maps that were present in both lists and have the same comparison key value. +%% * 'changed' – a list of pairs of maps representing the changes between maps present in the New and Old lists. +%% The first map in the pair represents the map in the Old list, and the second map +%% represents the potential modification in the New list. + +%% The KeyFunc parameter is a function that extracts the comparison key used +%% for matching from each map. The function should return a comparable term, +%% such as an atom, a number, or a string. This is used to determine if each +%% element is the same in both lists. + +-spec diff_lists(list(T), list(T), Func) -> + #{ + added := list(T), + identical := list(T), + removed := list(T), + changed := list({Old :: T, New :: T}) + } +when + Func :: fun((T) -> any()), + T :: any(). + +diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) -> + Removed = + lists:foldl( + fun(E, RemovedAcc) -> + case search(KeyFunc(E), KeyFunc, New) of + false -> [E | RemovedAcc]; + _ -> RemovedAcc + end + end, + [], + Old + ), + {Added, Identical, Changed} = + lists:foldl( + fun(E, Acc) -> + {Added0, Identical0, Changed0} = Acc, + case search(KeyFunc(E), KeyFunc, Old) of + false -> + {[E | Added0], Identical0, Changed0}; + E -> + {Added0, [E | Identical0], Changed0}; + E1 -> + {Added0, Identical0, [{E1, E} | Changed0]} + end + end, + {[], [], []}, + New + ), + #{ + removed => lists:reverse(Removed), + added => lists:reverse(Added), + identical => lists:reverse(Identical), + changed => lists:reverse(Changed) + }. + +search(_ExpectValue, _KeyFunc, []) -> + false; +search(ExpectValue, KeyFunc, [Item | List]) -> + case KeyFunc(Item) =:= ExpectValue of + true -> Item; + false -> search(ExpectValue, KeyFunc, List) + end. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +diff_lists_test() -> + KeyFunc = fun(#{name := Name}) -> Name end, + ?assertEqual( + #{ + removed => [], + added => [], + identical => [], + changed => [] + }, + diff_lists([], [], KeyFunc) + ), + %% test removed list + ?assertEqual( + #{ + removed => [#{name => a, value => 1}], + added => [], + identical => [], + changed => [] + }, + diff_lists([], [#{name => a, value => 1}], KeyFunc) + ), + %% test added list + ?assertEqual( + #{ + removed => [], + added => [#{name => a, value => 1}], + identical => [], + changed => [] + }, + diff_lists([#{name => a, value => 1}], [], KeyFunc) + ), + %% test identical list + ?assertEqual( + #{ + removed => [], + added => [], + identical => [#{name => a, value => 1}], + changed => [] + }, + diff_lists([#{name => a, value => 1}], [#{name => a, value => 1}], KeyFunc) + ), + Old = [ + #{name => a, value => 1}, + #{name => b, value => 4}, + #{name => e, value => 2}, + #{name => d, value => 4} + ], + New = [ + #{name => a, value => 1}, + #{name => b, value => 2}, + #{name => e, value => 2}, + #{name => c, value => 3} + ], + Diff = diff_lists(New, Old, KeyFunc), + ?assertEqual( + #{ + added => [ + #{name => c, value => 3} + ], + identical => [ + #{name => a, value => 1}, + #{name => e, value => 2} + ], + removed => [ + #{name => d, value => 4} + ], + changed => [{#{name => b, value => 4}, #{name => b, value => 2}}] + }, + Diff + ), + ok. + +-endif.