diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index 573dded9e..173a1fecb 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -23,6 +23,9 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(SERVERS, [exhook, servers]). +-define(EXHOOK, [exhook]). + %% APIs -export([start_link/0]). @@ -148,7 +151,7 @@ update_config(KeyPath, UpdateReq) -> Error end. -pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) -> +pre_config_update(?SERVERS, {add, #{<<"name">> := Name} = Conf}, OldConf) -> case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= Name end, OldConf) of true -> throw(already_exists); @@ -156,35 +159,20 @@ pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) -> NConf = maybe_write_certs(Conf), {ok, OldConf ++ [NConf]} end; -pre_config_update(_, {update, Name, Conf}, OldConf) -> - case replace_conf(Name, fun(_) -> Conf end, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)} - end; -pre_config_update(_, {delete, ToDelete}, OldConf) -> - case do_delete(ToDelete, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, NewConf} - end; -pre_config_update(_, {move, Name, Position}, OldConf) -> - case do_move(Name, Position, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, NewConf} - end; -pre_config_update(_, {enable, Name, Enable}, OldConf) -> - case - replace_conf( - Name, - fun(Conf) -> Conf#{<<"enable">> => Enable} end, - OldConf - ) - of - not_found -> throw(not_found); - NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)} - end; -pre_config_update(_, NewConf, _OldConf) when NewConf =:= #{} -> +pre_config_update(?SERVERS, {update, Name, Conf}, OldConf) -> + NewConf = replace_conf(Name, fun(_) -> Conf end, OldConf), + {ok, lists:map(fun maybe_write_certs/1, NewConf)}; +pre_config_update(?SERVERS, {delete, ToDelete}, OldConf) -> + {ok, do_delete(ToDelete, OldConf)}; +pre_config_update(?SERVERS, {move, Name, Position}, OldConf) -> + {ok, do_move(Name, Position, OldConf)}; +pre_config_update(?SERVERS, {enable, Name, Enable}, OldConf) -> + ReplaceFun = fun(Conf) -> Conf#{<<"enable">> => Enable} end, + NewConf = replace_conf(Name, ReplaceFun, OldConf), + {ok, lists:map(fun maybe_write_certs/1, NewConf)}; +pre_config_update(?EXHOOK, NewConf, _OldConf) when NewConf =:= #{} -> {ok, NewConf#{<<"servers">> => []}}; -pre_config_update(_, NewConf = #{<<"servers">> := Servers}, _OldConf) -> +pre_config_update(?EXHOOK, NewConf = #{<<"servers">> := Servers}, _OldConf) -> {ok, NewConf#{<<"servers">> => lists:map(fun maybe_write_certs/1, Servers)}}. post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) -> @@ -192,17 +180,15 @@ post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) -> try_clear_ssl_files(UpdateReq, NewConf, OldConf), {ok, Result}. -%%===================================================================== - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> process_flag(trap_exit, true), - emqx_conf:add_handler([exhook, servers], ?MODULE), - emqx_conf:add_handler([exhook], ?MODULE), - ServerL = emqx:get_config([exhook, servers]), + emqx_conf:add_handler(?EXHOOK, ?MODULE), + emqx_conf:add_handler(?SERVERS, ?MODULE), + ServerL = emqx:get_config(?SERVERS), Servers = load_all_servers(ServerL), Servers2 = reorder(ServerL, Servers), refresh_tick(), @@ -259,51 +245,13 @@ handle_call({update_config, _, #{servers := NewConfL}, #{servers := OldConfL}}, added := Added, changed := Updated } = emqx_utils:diff_lists(NewConfL, OldConfL, fun(#{name := Name}) -> Name end), - %% remove servers - State2 = lists:foldl( - fun(Conf, Acc) -> - ToDelete = maps:get(name, Conf), - remove_server(ToDelete, Acc) - end, - State, - Removed - ), - %% update servers - {UpdateRes, State3} = - lists:foldl( - fun({_Old, Conf}, {ResAcc, StateAcc}) -> - Name = maps:get(name, Conf), - case restart_server(Name, NewConfL, StateAcc) of - {ok, StateAcc1} -> {ResAcc, StateAcc1}; - {Err, StateAcc1} -> {[Err | ResAcc], StateAcc1} - end - end, - {[], State2}, - Updated - ), - %% Add servers - {AddRes, State4} = - lists:foldl( - fun(Conf, {ResAcc, StateAcc}) -> - case do_load_server(options_to_server(Conf)) of - {ok, Server} -> - #{servers := Servers} = StateAcc, - Name = maps:get(name, Conf), - Servers2 = Servers#{Name => Server}, - {ResAcc, update_servers(Servers2, StateAcc)}; - {Err, StateAcc1} -> - {[Err | ResAcc], StateAcc1} - end - end, - {[], State3}, - Added - ), - %% update order - #{servers := Servers4} = State4, + State2 = remove_servers(Removed, State), + {UpdateRes, State3} = restart_servers(Updated, NewConfL, State2), + {AddRes, State4 = #{servers := Servers4}} = add_servers(Added, State3), State5 = State4#{servers => reorder(NewConfL, Servers4)}, - case lists:append([UpdateRes, AddRes]) of - [] -> {reply, ok, State5}; - _ -> {reply, {error, #{added => AddRes, updated => UpdateRes}}, State5} + case UpdateRes =:= [] andalso AddRes =:= [] of + true -> {reply, ok, State5}; + false -> {reply, {error, #{added => AddRes, updated => UpdateRes}}, State5} end; handle_call({lookup, Name}, _From, State) -> {reply, where_is_server(Name, State), State}; @@ -340,6 +288,16 @@ handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. +remove_servers(Removes, State) -> + lists:foldl( + fun(Conf, Acc) -> + ToDelete = maps:get(name, Conf), + remove_server(ToDelete, Acc) + end, + State, + Removes + ). + remove_server(ToDelete, State) -> emqx_exhook_metrics:on_server_deleted(ToDelete), #{servers := Servers} = State2 = do_unload_server(ToDelete, State), @@ -369,8 +327,8 @@ terminate(Reason, State = #{servers := Servers}) -> Servers ), ?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}), - emqx_conf:remove_handler([exhook, servers]), - emqx_conf:remove_handler([exhook]), + emqx_conf:remove_handler(?SERVERS), + emqx_conf:remove_handler(?EXHOOK), ok. code_change(_OldVsn, State, _Extra) -> @@ -386,6 +344,22 @@ unload_exhooks() -> || {Name, {M, F, _A}} <- ?ENABLED_HOOKS ]. +add_servers(Added, State) -> + lists:foldl( + fun(Conf = #{name := Name}, {ResAcc, StateAcc}) -> + case do_load_server(options_to_server(Conf)) of + {ok, Server} -> + #{servers := Servers} = StateAcc, + Servers2 = Servers#{Name => Server}, + {ResAcc, update_servers(Servers2, StateAcc)}; + {Err, StateAcc1} -> + {[Err | ResAcc], StateAcc1} + end + end, + {[], State}, + Added + ). + do_load_server(#{name := Name} = Server) -> case emqx_exhook_server:load(Name, Server) of {ok, ServerState} -> @@ -462,8 +436,7 @@ clean_reload_timer(#{timer := Timer}) -> _ = erlang:cancel_timer(Timer), ok. --spec do_move(binary(), position(), list(server_options())) -> - not_found | list(server_options()). +-spec do_move(binary(), position(), list(server_options())) -> list(server_options()). do_move(Name, Position, ConfL) -> move(ConfL, Name, Position, []). @@ -472,7 +445,7 @@ move([#{<<"name">> := Name} = Server | T], Name, Position, HeadL) -> move([Server | T], Name, Position, HeadL) -> move(T, Name, Position, [Server | HeadL]); move([], _Name, _Position, _HeadL) -> - not_found. + throw(not_found). move_to(?CMD_MOVE_FRONT, Server, ServerL) -> [Server | ServerL]; @@ -490,8 +463,7 @@ move_to([H | T], Position, Server, HeadL) -> move_to([], _Position, _Server, _HeadL) -> not_found. --spec do_delete(binary(), list(server_options())) -> - not_found | list(server_options()). +-spec do_delete(binary(), list(server_options())) -> list(server_options()). do_delete(ToDelete, OldConf) -> case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= ToDelete end, OldConf) of true -> @@ -500,7 +472,7 @@ do_delete(ToDelete, OldConf) -> OldConf ); false -> - not_found + throw(not_found) end. -spec reorder(list(server_options()), servers()) -> servers(). @@ -532,9 +504,7 @@ where_is_server(Name, #{servers := Servers}) -> -type replace_fun() :: fun((server_options()) -> server_options()). --spec replace_conf(binary(), replace_fun(), list(server_options())) -> - not_found - | list(server_options()). +-spec replace_conf(binary(), replace_fun(), list(server_options())) -> list(server_options()). replace_conf(Name, ReplaceFun, ConfL) -> replace_conf(ConfL, Name, ReplaceFun, []). @@ -544,7 +514,20 @@ replace_conf([#{<<"name">> := Name} = H | T], Name, ReplaceFun, HeadL) -> replace_conf([H | T], Name, ReplaceFun, HeadL) -> replace_conf(T, Name, ReplaceFun, [H | HeadL]); replace_conf([], _, _, _) -> - not_found. + throw(not_found). + +restart_servers(Updated, NewConfL, State) -> + lists:foldl( + fun({_Old, Conf}, {ResAcc, StateAcc}) -> + Name = maps:get(name, Conf), + case restart_server(Name, NewConfL, StateAcc) of + {ok, StateAcc1} -> {ResAcc, StateAcc1}; + {Err, StateAcc1} -> {[Err | ResAcc], StateAcc1} + end + end, + {[], State}, + Updated + ). -spec restart_server(binary(), list(server_options()), state()) -> {ok, state()} @@ -673,7 +656,7 @@ try_clear_ssl_files({Op, Name, _}, NewConfs, OldConfs) when NewSSL = find_server_ssl_cfg(Name, NewConfs), OldSSL = find_server_ssl_cfg(Name, OldConfs), emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL); -%% replace the whole config from the file +%% replace the whole config from the cli try_clear_ssl_files(_Req, #{servers := NewServers}, #{servers := OldServers}) -> lists:foreach( fun(#{name := Name} = Conf) -> @@ -682,7 +665,9 @@ try_clear_ssl_files(_Req, #{servers := NewServers}, #{servers := OldServers}) -> emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL) end, OldServers - ). + ); +try_clear_ssl_files(_Req, _NewConf, _OldConf) -> + ok. search_server_cfg(Name, Confs) -> lists:search( diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index ff313c8c8..d9297fff6 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -196,9 +196,9 @@ t_error_update_conf(_) -> Path = [exhook, servers], Name = <<"error_update">>, ErrorCfg = #{<<"name">> => Name}, - {error, _} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}), - {error, _} = emqx_exhook_mgr:update_config(Path, {move, Name, top, <<>>}), - {error, _} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {move, Name, top}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}), ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>}, {ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}), @@ -210,12 +210,37 @@ t_error_update_conf(_) -> }, {ok, _} = emqx_exhook_mgr:update_config(Path, {update, Name, DisableAnd}), - {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, <<"error">>}), - {error, not_found} = emqx_exhook_mgr:update_config( - Path, {delete, <<"delete_not_exists">>} - ), + {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, Name}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {delete, Name}), ok. +t_update_conf(_Config) -> + Path = [exhook], + Conf = #{<<"servers">> := Servers} = emqx_config:get_raw(Path), + ?assert(length(Servers) > 1), + Servers1 = shuffle(Servers), + ReOrderedConf = Conf#{<<"servers">> => Servers1}, + validate_servers(Path, ReOrderedConf, Servers1), + [_ | Servers2] = Servers, + DeletedConf = Conf#{<<"servers">> => Servers2}, + validate_servers(Path, DeletedConf, Servers2), + [L1, L2 | Servers3] = Servers, + UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => 1000}, + UpdatedServers = [L1, UpdateL2 | Servers3], + UpdatedConf = Conf#{<<"servers">> => UpdatedServers}, + validate_servers(Path, UpdatedConf, UpdatedServers), + %% reset + validate_servers(Path, Conf, Servers), + ok. + +validate_servers(Path, ReOrderConf, Servers1) -> + {ok, _} = emqx_exhook_mgr:update_config(Path, ReOrderConf), + ?assertEqual(ReOrderConf, emqx_config:get_raw(Path)), + List = emqx_exhook_mgr:list(), + ExpectL = lists:map(fun(#{<<"name">> := Name}) -> Name end, Servers1), + L1 = lists:map(fun(#{name := Name}) -> Name end, List), + ?assertEqual(ExpectL, L1). + t_error_server_info(_) -> not_found = emqx_exhook_mgr:server_info(<<"not_exists">>), ok. @@ -490,6 +515,10 @@ data_file(Name) -> cert_file(Name) -> data_file(filename:join(["certs", Name])). -%% FIXME: this creats inter-test dependency +%% FIXME: this creates inter-test dependency stop_apps(Apps) -> emqx_common_test_helpers:stop_apps(Apps, #{erase_all_configs => false}). + +shuffle(List) -> + Sorted = lists:sort(lists:map(fun(L) -> {rand:uniform(), L} end, List)), + lists:map(fun({_, L}) -> L end, Sorted).