From bd29433997df051f4eeb593dfa955dac745e54a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Sun, 28 May 2023 23:16:37 +0800 Subject: [PATCH 01/17] feat: support emqx_conf:update([exhook],Conf) --- apps/emqx_exhook/src/emqx_exhook.app.src | 2 +- apps/emqx_exhook/src/emqx_exhook_app.erl | 7 +- apps/emqx_exhook/src/emqx_exhook_mgr.erl | 109 +++++++++++++--- apps/emqx_utils/src/emqx_utils.app.src | 2 +- apps/emqx_utils/src/emqx_utils.erl | 152 ++++++++++++++++++++++- 5 files changed, 243 insertions(+), 29 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index 194c91206..92a70cf37 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [ {description, "EMQX Extension for Hook"}, - {vsn, "5.0.12"}, + {vsn, "5.0.13"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl index 6e9bc5242..2fc562ae8 100644 --- a/apps/emqx_exhook/src/emqx_exhook_app.erl +++ b/apps/emqx_exhook/src/emqx_exhook_app.erl @@ -22,8 +22,7 @@ -export([ start/2, - stop/1, - prep_stop/1 + stop/1 ]). %%-------------------------------------------------------------------- @@ -34,10 +33,6 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_exhook_sup:start_link(), {ok, Sup}. -prep_stop(State) -> - emqx_ctl:unregister_command(exhook), - State. - stop(_State) -> ok. diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index 0647c80ea..573dded9e 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -181,10 +181,14 @@ pre_config_update(_, {enable, Name, Enable}, OldConf) -> of not_found -> throw(not_found); NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)} - end. + end; +pre_config_update(_, NewConf, _OldConf) when NewConf =:= #{} -> + {ok, NewConf#{<<"servers">> => []}}; +pre_config_update(_, NewConf = #{<<"servers">> := Servers}, _OldConf) -> + {ok, NewConf#{<<"servers">> => lists:map(fun maybe_write_certs/1, Servers)}}. post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) -> - Result = call({update_config, UpdateReq, NewConf}), + Result = call({update_config, UpdateReq, NewConf, OldConf}), try_clear_ssl_files(UpdateReq, NewConf, OldConf), {ok, Result}. @@ -197,6 +201,7 @@ post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) -> init([]) -> process_flag(trap_exit, true), emqx_conf:add_handler([exhook, servers], ?MODULE), + emqx_conf:add_handler([exhook], ?MODULE), ServerL = emqx:get_config([exhook, servers]), Servers = load_all_servers(ServerL), Servers2 = reorder(ServerL, Servers), @@ -222,22 +227,16 @@ handle_call( OrderServers = sort_name_by_order(Infos, Servers), {reply, OrderServers, State}; handle_call( - {update_config, {move, _Name, _Position}, NewConfL}, + {update_config, {move, _Name, _Position}, NewConfL, _}, _From, #{servers := Servers} = State ) -> Servers2 = reorder(NewConfL, Servers), {reply, ok, State#{servers := Servers2}}; -handle_call({update_config, {delete, ToDelete}, _}, _From, State) -> - emqx_exhook_metrics:on_server_deleted(ToDelete), - - #{servers := Servers} = State2 = do_unload_server(ToDelete, State), - - Servers2 = maps:remove(ToDelete, Servers), - - {reply, ok, update_servers(Servers2, State2)}; +handle_call({update_config, {delete, ToDelete}, _, _}, _From, State) -> + {reply, ok, remove_server(ToDelete, State)}; handle_call( - {update_config, {add, RawConf}, NewConfL}, + {update_config, {add, RawConf}, NewConfL, _}, _From, #{servers := Servers} = State ) -> @@ -246,14 +245,68 @@ handle_call( Servers2 = Servers#{Name => Server}, Servers3 = reorder(NewConfL, Servers2), {reply, Result, State#{servers := Servers3}}; +handle_call({update_config, {update, Name, _Conf}, NewConfL, _}, _From, State) -> + {Result, State2} = restart_server(Name, NewConfL, State), + {reply, Result, State2}; +handle_call({update_config, {enable, Name, _Enable}, NewConfL, _}, _From, State) -> + {Result, State2} = restart_server(Name, NewConfL, State), + {reply, Result, State2}; +handle_call({update_config, _, ConfL, ConfL}, _From, State) -> + {reply, ok, State}; +handle_call({update_config, _, #{servers := NewConfL}, #{servers := OldConfL}}, _From, State) -> + #{ + removed := Removed, + added := Added, + changed := Updated + } = emqx_utils:diff_lists(NewConfL, OldConfL, fun(#{name := Name}) -> Name end), + %% remove servers + State2 = lists:foldl( + fun(Conf, Acc) -> + ToDelete = maps:get(name, Conf), + remove_server(ToDelete, Acc) + end, + State, + Removed + ), + %% update servers + {UpdateRes, State3} = + lists:foldl( + fun({_Old, Conf}, {ResAcc, StateAcc}) -> + Name = maps:get(name, Conf), + case restart_server(Name, NewConfL, StateAcc) of + {ok, StateAcc1} -> {ResAcc, StateAcc1}; + {Err, StateAcc1} -> {[Err | ResAcc], StateAcc1} + end + end, + {[], State2}, + Updated + ), + %% Add servers + {AddRes, State4} = + lists:foldl( + fun(Conf, {ResAcc, StateAcc}) -> + case do_load_server(options_to_server(Conf)) of + {ok, Server} -> + #{servers := Servers} = StateAcc, + Name = maps:get(name, Conf), + Servers2 = Servers#{Name => Server}, + {ResAcc, update_servers(Servers2, StateAcc)}; + {Err, StateAcc1} -> + {[Err | ResAcc], StateAcc1} + end + end, + {[], State3}, + Added + ), + %% update order + #{servers := Servers4} = State4, + State5 = State4#{servers => reorder(NewConfL, Servers4)}, + case lists:append([UpdateRes, AddRes]) of + [] -> {reply, ok, State5}; + _ -> {reply, {error, #{added => AddRes, updated => UpdateRes}}, State5} + end; handle_call({lookup, Name}, _From, State) -> {reply, where_is_server(Name, State), State}; -handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) -> - {Result, State2} = restart_server(Name, NewConfL, State), - {reply, Result, State2}; -handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) -> - {Result, State2} = restart_server(Name, NewConfL, State), - {reply, Result, State2}; handle_call({server_info, Name}, _From, State) -> case where_is_server(Name, State) of not_found -> @@ -287,6 +340,12 @@ handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. +remove_server(ToDelete, State) -> + emqx_exhook_metrics:on_server_deleted(ToDelete), + #{servers := Servers} = State2 = do_unload_server(ToDelete, State), + Servers2 = maps:remove(ToDelete, Servers), + update_servers(Servers2, State2). + handle_cast(_Msg, State) -> {noreply, State}. @@ -310,6 +369,8 @@ terminate(Reason, State = #{servers := Servers}) -> Servers ), ?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}), + emqx_conf:remove_handler([exhook, servers]), + emqx_conf:remove_handler([exhook]), ok. code_change(_OldVsn, State, _Extra) -> @@ -612,8 +673,16 @@ try_clear_ssl_files({Op, Name, _}, NewConfs, OldConfs) when NewSSL = find_server_ssl_cfg(Name, NewConfs), OldSSL = find_server_ssl_cfg(Name, OldConfs), emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL); -try_clear_ssl_files(_Req, _NewConf, _OldConf) -> - ok. +%% replace the whole config from the file +try_clear_ssl_files(_Req, #{servers := NewServers}, #{servers := OldServers}) -> + lists:foreach( + fun(#{name := Name} = Conf) -> + NewSSL = find_server_ssl_cfg(Name, NewServers), + OldSSL = maps:get(ssl, Conf, undefined), + emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL) + end, + OldServers + ). search_server_cfg(Name, Confs) -> lists:search( diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index 605093875..0b172565a 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.0.2"}, + {vsn, "5.0.3"}, {modules, [ emqx_utils, emqx_utils_api, diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 2c6ddd9c1..edc76a1e6 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -54,7 +54,8 @@ safe_to_existing_atom/1, safe_to_existing_atom/2, pub_props_to_packet/1, - safe_filename/1 + safe_filename/1, + diff_lists/3 ]). -export([ @@ -748,3 +749,152 @@ safe_filename(Filename) when is_binary(Filename) -> binary:replace(Filename, <<":">>, <<"-">>, [global]); safe_filename(Filename) when is_list(Filename) -> lists:flatten(string:replace(Filename, ":", "-", all)). + +%% @doc Compares two lists of maps and returns the differences between them in a +%% map containing four keys – 'removed', 'added', 'identical', and 'changed' – +%% each holding a list of maps. Elements are compared using key function KeyFunc +%% to extract the comparison key used for matching. +%% +%% The return value is a map with the following keys and the list of maps as its values: +%% * 'removed' – a list of maps that were present in the Old list, but not found in the New list. +%% * 'added' – a list of maps that were present in the New list, but not found in the Old list. +%% * 'identical' – a list of maps that were present in both lists and have the same comparison key value. +%% * 'changed' – a list of pairs of maps representing the changes between maps present in the New and Old lists. +%% The first map in the pair represents the map in the Old list, and the second map +%% represents the potential modification in the New list. + +%% The KeyFunc parameter is a function that extracts the comparison key used +%% for matching from each map. The function should return a comparable term, +%% such as an atom, a number, or a string. This is used to determine if each +%% element is the same in both lists. + +-spec diff_lists(list(T), list(T), Func) -> + #{ + added := list(T), + identical := list(T), + removed := list(T), + changed := list({Old :: T, New :: T}) + } +when + Func :: fun((T) -> any()), + T :: any(). + +diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) -> + Removed = + lists:foldl( + fun(E, RemovedAcc) -> + case search(KeyFunc(E), KeyFunc, New) of + false -> [E | RemovedAcc]; + _ -> RemovedAcc + end + end, + [], + Old + ), + {Added, Identical, Changed} = + lists:foldl( + fun(E, Acc) -> + {Added0, Identical0, Changed0} = Acc, + case search(KeyFunc(E), KeyFunc, Old) of + false -> + {[E | Added0], Identical0, Changed0}; + E -> + {Added0, [E | Identical0], Changed0}; + E1 -> + {Added0, Identical0, [{E1, E} | Changed0]} + end + end, + {[], [], []}, + New + ), + #{ + removed => lists:reverse(Removed), + added => lists:reverse(Added), + identical => lists:reverse(Identical), + changed => lists:reverse(Changed) + }. + +search(_ExpectValue, _KeyFunc, []) -> + false; +search(ExpectValue, KeyFunc, [Item | List]) -> + case KeyFunc(Item) =:= ExpectValue of + true -> Item; + false -> search(ExpectValue, KeyFunc, List) + end. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +diff_lists_test() -> + KeyFunc = fun(#{name := Name}) -> Name end, + ?assertEqual( + #{ + removed => [], + added => [], + identical => [], + changed => [] + }, + diff_lists([], [], KeyFunc) + ), + %% test removed list + ?assertEqual( + #{ + removed => [#{name => a, value => 1}], + added => [], + identical => [], + changed => [] + }, + diff_lists([], [#{name => a, value => 1}], KeyFunc) + ), + %% test added list + ?assertEqual( + #{ + removed => [], + added => [#{name => a, value => 1}], + identical => [], + changed => [] + }, + diff_lists([#{name => a, value => 1}], [], KeyFunc) + ), + %% test identical list + ?assertEqual( + #{ + removed => [], + added => [], + identical => [#{name => a, value => 1}], + changed => [] + }, + diff_lists([#{name => a, value => 1}], [#{name => a, value => 1}], KeyFunc) + ), + Old = [ + #{name => a, value => 1}, + #{name => b, value => 4}, + #{name => e, value => 2}, + #{name => d, value => 4} + ], + New = [ + #{name => a, value => 1}, + #{name => b, value => 2}, + #{name => e, value => 2}, + #{name => c, value => 3} + ], + Diff = diff_lists(New, Old, KeyFunc), + ?assertEqual( + #{ + added => [ + #{name => c, value => 3} + ], + identical => [ + #{name => a, value => 1}, + #{name => e, value => 2} + ], + removed => [ + #{name => d, value => 4} + ], + changed => [{#{name => b, value => 4}, #{name => b, value => 2}}] + }, + Diff + ), + ok. + +-endif. From 03160ef599781d82b2f0f9d6f271044fab22ece4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Sun, 4 Jun 2023 12:02:02 +0800 Subject: [PATCH 02/17] test: add more test for update_config --- apps/emqx_exhook/src/emqx_exhook_mgr.erl | 169 +++++++++----------- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 45 +++++- 2 files changed, 114 insertions(+), 100 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index 573dded9e..173a1fecb 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -23,6 +23,9 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(SERVERS, [exhook, servers]). +-define(EXHOOK, [exhook]). + %% APIs -export([start_link/0]). @@ -148,7 +151,7 @@ update_config(KeyPath, UpdateReq) -> Error end. -pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) -> +pre_config_update(?SERVERS, {add, #{<<"name">> := Name} = Conf}, OldConf) -> case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= Name end, OldConf) of true -> throw(already_exists); @@ -156,35 +159,20 @@ pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) -> NConf = maybe_write_certs(Conf), {ok, OldConf ++ [NConf]} end; -pre_config_update(_, {update, Name, Conf}, OldConf) -> - case replace_conf(Name, fun(_) -> Conf end, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)} - end; -pre_config_update(_, {delete, ToDelete}, OldConf) -> - case do_delete(ToDelete, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, NewConf} - end; -pre_config_update(_, {move, Name, Position}, OldConf) -> - case do_move(Name, Position, OldConf) of - not_found -> throw(not_found); - NewConf -> {ok, NewConf} - end; -pre_config_update(_, {enable, Name, Enable}, OldConf) -> - case - replace_conf( - Name, - fun(Conf) -> Conf#{<<"enable">> => Enable} end, - OldConf - ) - of - not_found -> throw(not_found); - NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)} - end; -pre_config_update(_, NewConf, _OldConf) when NewConf =:= #{} -> +pre_config_update(?SERVERS, {update, Name, Conf}, OldConf) -> + NewConf = replace_conf(Name, fun(_) -> Conf end, OldConf), + {ok, lists:map(fun maybe_write_certs/1, NewConf)}; +pre_config_update(?SERVERS, {delete, ToDelete}, OldConf) -> + {ok, do_delete(ToDelete, OldConf)}; +pre_config_update(?SERVERS, {move, Name, Position}, OldConf) -> + {ok, do_move(Name, Position, OldConf)}; +pre_config_update(?SERVERS, {enable, Name, Enable}, OldConf) -> + ReplaceFun = fun(Conf) -> Conf#{<<"enable">> => Enable} end, + NewConf = replace_conf(Name, ReplaceFun, OldConf), + {ok, lists:map(fun maybe_write_certs/1, NewConf)}; +pre_config_update(?EXHOOK, NewConf, _OldConf) when NewConf =:= #{} -> {ok, NewConf#{<<"servers">> => []}}; -pre_config_update(_, NewConf = #{<<"servers">> := Servers}, _OldConf) -> +pre_config_update(?EXHOOK, NewConf = #{<<"servers">> := Servers}, _OldConf) -> {ok, NewConf#{<<"servers">> => lists:map(fun maybe_write_certs/1, Servers)}}. post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) -> @@ -192,17 +180,15 @@ post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) -> try_clear_ssl_files(UpdateReq, NewConf, OldConf), {ok, Result}. -%%===================================================================== - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> process_flag(trap_exit, true), - emqx_conf:add_handler([exhook, servers], ?MODULE), - emqx_conf:add_handler([exhook], ?MODULE), - ServerL = emqx:get_config([exhook, servers]), + emqx_conf:add_handler(?EXHOOK, ?MODULE), + emqx_conf:add_handler(?SERVERS, ?MODULE), + ServerL = emqx:get_config(?SERVERS), Servers = load_all_servers(ServerL), Servers2 = reorder(ServerL, Servers), refresh_tick(), @@ -259,51 +245,13 @@ handle_call({update_config, _, #{servers := NewConfL}, #{servers := OldConfL}}, added := Added, changed := Updated } = emqx_utils:diff_lists(NewConfL, OldConfL, fun(#{name := Name}) -> Name end), - %% remove servers - State2 = lists:foldl( - fun(Conf, Acc) -> - ToDelete = maps:get(name, Conf), - remove_server(ToDelete, Acc) - end, - State, - Removed - ), - %% update servers - {UpdateRes, State3} = - lists:foldl( - fun({_Old, Conf}, {ResAcc, StateAcc}) -> - Name = maps:get(name, Conf), - case restart_server(Name, NewConfL, StateAcc) of - {ok, StateAcc1} -> {ResAcc, StateAcc1}; - {Err, StateAcc1} -> {[Err | ResAcc], StateAcc1} - end - end, - {[], State2}, - Updated - ), - %% Add servers - {AddRes, State4} = - lists:foldl( - fun(Conf, {ResAcc, StateAcc}) -> - case do_load_server(options_to_server(Conf)) of - {ok, Server} -> - #{servers := Servers} = StateAcc, - Name = maps:get(name, Conf), - Servers2 = Servers#{Name => Server}, - {ResAcc, update_servers(Servers2, StateAcc)}; - {Err, StateAcc1} -> - {[Err | ResAcc], StateAcc1} - end - end, - {[], State3}, - Added - ), - %% update order - #{servers := Servers4} = State4, + State2 = remove_servers(Removed, State), + {UpdateRes, State3} = restart_servers(Updated, NewConfL, State2), + {AddRes, State4 = #{servers := Servers4}} = add_servers(Added, State3), State5 = State4#{servers => reorder(NewConfL, Servers4)}, - case lists:append([UpdateRes, AddRes]) of - [] -> {reply, ok, State5}; - _ -> {reply, {error, #{added => AddRes, updated => UpdateRes}}, State5} + case UpdateRes =:= [] andalso AddRes =:= [] of + true -> {reply, ok, State5}; + false -> {reply, {error, #{added => AddRes, updated => UpdateRes}}, State5} end; handle_call({lookup, Name}, _From, State) -> {reply, where_is_server(Name, State), State}; @@ -340,6 +288,16 @@ handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. +remove_servers(Removes, State) -> + lists:foldl( + fun(Conf, Acc) -> + ToDelete = maps:get(name, Conf), + remove_server(ToDelete, Acc) + end, + State, + Removes + ). + remove_server(ToDelete, State) -> emqx_exhook_metrics:on_server_deleted(ToDelete), #{servers := Servers} = State2 = do_unload_server(ToDelete, State), @@ -369,8 +327,8 @@ terminate(Reason, State = #{servers := Servers}) -> Servers ), ?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}), - emqx_conf:remove_handler([exhook, servers]), - emqx_conf:remove_handler([exhook]), + emqx_conf:remove_handler(?SERVERS), + emqx_conf:remove_handler(?EXHOOK), ok. code_change(_OldVsn, State, _Extra) -> @@ -386,6 +344,22 @@ unload_exhooks() -> || {Name, {M, F, _A}} <- ?ENABLED_HOOKS ]. +add_servers(Added, State) -> + lists:foldl( + fun(Conf = #{name := Name}, {ResAcc, StateAcc}) -> + case do_load_server(options_to_server(Conf)) of + {ok, Server} -> + #{servers := Servers} = StateAcc, + Servers2 = Servers#{Name => Server}, + {ResAcc, update_servers(Servers2, StateAcc)}; + {Err, StateAcc1} -> + {[Err | ResAcc], StateAcc1} + end + end, + {[], State}, + Added + ). + do_load_server(#{name := Name} = Server) -> case emqx_exhook_server:load(Name, Server) of {ok, ServerState} -> @@ -462,8 +436,7 @@ clean_reload_timer(#{timer := Timer}) -> _ = erlang:cancel_timer(Timer), ok. --spec do_move(binary(), position(), list(server_options())) -> - not_found | list(server_options()). +-spec do_move(binary(), position(), list(server_options())) -> list(server_options()). do_move(Name, Position, ConfL) -> move(ConfL, Name, Position, []). @@ -472,7 +445,7 @@ move([#{<<"name">> := Name} = Server | T], Name, Position, HeadL) -> move([Server | T], Name, Position, HeadL) -> move(T, Name, Position, [Server | HeadL]); move([], _Name, _Position, _HeadL) -> - not_found. + throw(not_found). move_to(?CMD_MOVE_FRONT, Server, ServerL) -> [Server | ServerL]; @@ -490,8 +463,7 @@ move_to([H | T], Position, Server, HeadL) -> move_to([], _Position, _Server, _HeadL) -> not_found. --spec do_delete(binary(), list(server_options())) -> - not_found | list(server_options()). +-spec do_delete(binary(), list(server_options())) -> list(server_options()). do_delete(ToDelete, OldConf) -> case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= ToDelete end, OldConf) of true -> @@ -500,7 +472,7 @@ do_delete(ToDelete, OldConf) -> OldConf ); false -> - not_found + throw(not_found) end. -spec reorder(list(server_options()), servers()) -> servers(). @@ -532,9 +504,7 @@ where_is_server(Name, #{servers := Servers}) -> -type replace_fun() :: fun((server_options()) -> server_options()). --spec replace_conf(binary(), replace_fun(), list(server_options())) -> - not_found - | list(server_options()). +-spec replace_conf(binary(), replace_fun(), list(server_options())) -> list(server_options()). replace_conf(Name, ReplaceFun, ConfL) -> replace_conf(ConfL, Name, ReplaceFun, []). @@ -544,7 +514,20 @@ replace_conf([#{<<"name">> := Name} = H | T], Name, ReplaceFun, HeadL) -> replace_conf([H | T], Name, ReplaceFun, HeadL) -> replace_conf(T, Name, ReplaceFun, [H | HeadL]); replace_conf([], _, _, _) -> - not_found. + throw(not_found). + +restart_servers(Updated, NewConfL, State) -> + lists:foldl( + fun({_Old, Conf}, {ResAcc, StateAcc}) -> + Name = maps:get(name, Conf), + case restart_server(Name, NewConfL, StateAcc) of + {ok, StateAcc1} -> {ResAcc, StateAcc1}; + {Err, StateAcc1} -> {[Err | ResAcc], StateAcc1} + end + end, + {[], State}, + Updated + ). -spec restart_server(binary(), list(server_options()), state()) -> {ok, state()} @@ -673,7 +656,7 @@ try_clear_ssl_files({Op, Name, _}, NewConfs, OldConfs) when NewSSL = find_server_ssl_cfg(Name, NewConfs), OldSSL = find_server_ssl_cfg(Name, OldConfs), emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL); -%% replace the whole config from the file +%% replace the whole config from the cli try_clear_ssl_files(_Req, #{servers := NewServers}, #{servers := OldServers}) -> lists:foreach( fun(#{name := Name} = Conf) -> @@ -682,7 +665,9 @@ try_clear_ssl_files(_Req, #{servers := NewServers}, #{servers := OldServers}) -> emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL) end, OldServers - ). + ); +try_clear_ssl_files(_Req, _NewConf, _OldConf) -> + ok. search_server_cfg(Name, Confs) -> lists:search( diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index ff313c8c8..d9297fff6 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -196,9 +196,9 @@ t_error_update_conf(_) -> Path = [exhook, servers], Name = <<"error_update">>, ErrorCfg = #{<<"name">> => Name}, - {error, _} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}), - {error, _} = emqx_exhook_mgr:update_config(Path, {move, Name, top, <<>>}), - {error, _} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {move, Name, top}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}), ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>}, {ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}), @@ -210,12 +210,37 @@ t_error_update_conf(_) -> }, {ok, _} = emqx_exhook_mgr:update_config(Path, {update, Name, DisableAnd}), - {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, <<"error">>}), - {error, not_found} = emqx_exhook_mgr:update_config( - Path, {delete, <<"delete_not_exists">>} - ), + {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, Name}), + {error, not_found} = emqx_exhook_mgr:update_config(Path, {delete, Name}), ok. +t_update_conf(_Config) -> + Path = [exhook], + Conf = #{<<"servers">> := Servers} = emqx_config:get_raw(Path), + ?assert(length(Servers) > 1), + Servers1 = shuffle(Servers), + ReOrderedConf = Conf#{<<"servers">> => Servers1}, + validate_servers(Path, ReOrderedConf, Servers1), + [_ | Servers2] = Servers, + DeletedConf = Conf#{<<"servers">> => Servers2}, + validate_servers(Path, DeletedConf, Servers2), + [L1, L2 | Servers3] = Servers, + UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => 1000}, + UpdatedServers = [L1, UpdateL2 | Servers3], + UpdatedConf = Conf#{<<"servers">> => UpdatedServers}, + validate_servers(Path, UpdatedConf, UpdatedServers), + %% reset + validate_servers(Path, Conf, Servers), + ok. + +validate_servers(Path, ReOrderConf, Servers1) -> + {ok, _} = emqx_exhook_mgr:update_config(Path, ReOrderConf), + ?assertEqual(ReOrderConf, emqx_config:get_raw(Path)), + List = emqx_exhook_mgr:list(), + ExpectL = lists:map(fun(#{<<"name">> := Name}) -> Name end, Servers1), + L1 = lists:map(fun(#{name := Name}) -> Name end, List), + ?assertEqual(ExpectL, L1). + t_error_server_info(_) -> not_found = emqx_exhook_mgr:server_info(<<"not_exists">>), ok. @@ -490,6 +515,10 @@ data_file(Name) -> cert_file(Name) -> data_file(filename:join(["certs", Name])). -%% FIXME: this creats inter-test dependency +%% FIXME: this creates inter-test dependency stop_apps(Apps) -> emqx_common_test_helpers:stop_apps(Apps, #{erase_all_configs => false}). + +shuffle(List) -> + Sorted = lists:sort(lists:map(fun(L) -> {rand:uniform(), L} end, List)), + lists:map(fun({_, L}) -> L end, Sorted). From d0d6992a1471bb90b48bba873bc44b6e84a58155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Sun, 4 Jun 2023 20:07:33 +0800 Subject: [PATCH 03/17] feat: update listeners from cli --- apps/emqx/src/emqx_listeners.erl | 183 +++++++++++++----- .../emqx/test/emqx_listeners_update_SUITE.erl | 154 +++++++++++++++ 2 files changed, 288 insertions(+), 49 deletions(-) create mode 100644 apps/emqx/test/emqx_listeners_update_SUITE.erl diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index acdb9ff96..c0116fd8c 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -57,6 +57,7 @@ ]). -export([pre_config_update/3, post_config_update/5]). +-export([create_listener/3, remove_listener/3, update_listener/3]). -export([format_bind/1]). @@ -65,8 +66,8 @@ -endif. -type listener_id() :: atom() | binary(). - --define(CONF_KEY_PATH, [listeners, '?', '?']). +-define(ROOT_KEY, listeners). +-define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). -define(MARK_DEL, ?TOMBSTONE_CONFIG_CHANGE_REQ). @@ -212,7 +213,10 @@ shutdown_count(_, _, _) -> start() -> %% The ?MODULE:start/0 will be called by emqx_app when emqx get started, %% so we install the config handler here. + %% callback when http api request ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), + %% callback when reload from config file + ok = emqx_config_handler:add_handler([?ROOT_KEY], ?MODULE), foreach_listeners(fun start_listener/3). -spec start_listener(listener_id()) -> ok | {error, term()}. @@ -287,7 +291,8 @@ restart_listener(Type, ListenerName, OldConf, NewConf) -> stop() -> %% The ?MODULE:stop/0 will be called by emqx_app when emqx is going to shutdown, %% so we uninstall the config handler here. - _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), + ok = emqx_config_handler:remove_handler(?CONF_KEY_PATH), + ok = emqx_config_handler:remove_handler([?ROOT_KEY]), foreach_listeners(fun stop_listener/3). -spec stop_listener(listener_id()) -> ok | {error, term()}. @@ -463,50 +468,34 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> end. %% Update the listeners at runtime -pre_config_update([listeners, Type, Name], {create, NewConf}, V) when +pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when V =:= undefined orelse V =:= ?TOMBSTONE_VALUE -> - CertsDir = certs_dir(Type, Name), - {ok, convert_certs(CertsDir, NewConf)}; -pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) -> + {ok, convert_certs(Type, Name, NewConf)}; +pre_config_update([?ROOT_KEY, _Type, _Name], {create, _NewConf}, _RawConf) -> {error, already_exist}; -pre_config_update([listeners, _Type, _Name], {update, _Request}, undefined) -> +pre_config_update([?ROOT_KEY, _Type, _Name], {update, _Request}, undefined) -> {error, not_found}; -pre_config_update([listeners, Type, Name], {update, Request}, RawConf) -> - NewConfT = emqx_utils_maps:deep_merge(RawConf, Request), - NewConf = ensure_override_limiter_conf(NewConfT, Request), - CertsDir = certs_dir(Type, Name), - {ok, convert_certs(CertsDir, NewConf)}; -pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) -> - NewConf = emqx_utils_maps:deep_merge(RawConf, Updated), - {ok, NewConf}; -pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) -> +pre_config_update([?ROOT_KEY, Type, Name], {update, Request}, RawConf) -> + RawConf1 = emqx_utils_maps:deep_merge(RawConf, Request), + RawConf2 = ensure_override_limiter_conf(RawConf1, Request), + {ok, convert_certs(Type, Name, RawConf2)}; +pre_config_update([?ROOT_KEY, _Type, _Name], {action, _Action, Updated}, RawConf) -> + {ok, emqx_utils_maps:deep_merge(RawConf, Updated)}; +pre_config_update([?ROOT_KEY, _Type, _Name], ?MARK_DEL, _RawConf) -> {ok, ?TOMBSTONE_VALUE}; -pre_config_update(_Path, _Request, RawConf) -> - {ok, RawConf}. +pre_config_update([?ROOT_KEY], RawConf, RawConf) -> + {ok, RawConf}; +pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> + {ok, convert_certs(NewConf)}. -post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) -> - start_listener(Type, Name, NewConf); -post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> - try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf), - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - case NewConf of - #{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf}); - _ -> ok - end; -post_config_update([listeners, Type, Name], Op, _, OldConf, _AppEnvs) when - Op =:= ?MARK_DEL andalso is_map(OldConf) --> - ok = unregister_ocsp_stapling_refresh(Type, Name), - case stop_listener(Type, Name, OldConf) of - ok -> - _ = emqx_authentication:delete_chain(listener_id(Type, Name)), - CertsDir = certs_dir(Type, Name), - clear_certs(CertsDir, OldConf); - Err -> - Err - end; -post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> +post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) -> + create_listener(Type, Name, NewConf); +post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> + update_listener(Type, Name, {OldConf, NewConf}); +post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) -> + remove_listener(Type, Name, OldConf); +post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> #{enabled := NewEnabled} = NewConf, #{enabled := OldEnabled} = OldConf, case {NewEnabled, OldEnabled} of @@ -523,9 +512,65 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo ok = unregister_ocsp_stapling_refresh(Type, Name), stop_listener(Type, Name, OldConf) end; +post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) -> + ok; +post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) -> + #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf), + Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed), + perform_listener_changes([ + {fun ?MODULE:remove_listener/3, Removed}, + {fun ?MODULE:update_listener/3, Updated}, + {fun ?MODULE:create_listener/3, Added} + ]); post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. +create_listener(Type, Name, NewConf) -> + Res = start_listener(Type, Name, NewConf), + recreate_authenticator(Res, Type, Name, NewConf). + +recreate_authenticator(ok, Type, Name, Conf) -> + Chain = listener_id(Type, Name), + _ = emqx_authentication:delete_chain(Chain), + case maps:get(authentication, Conf, []) of + [] -> ok; + AuthN -> emqx_authentication:create_authenticator(Chain, AuthN) + end; +recreate_authenticator(Error, _Type, _Name, _NewConf) -> + Error. + +remove_listener(Type, Name, OldConf) -> + ok = unregister_ocsp_stapling_refresh(Type, Name), + case stop_listener(Type, Name, OldConf) of + ok -> + _ = emqx_authentication:delete_chain(listener_id(Type, Name)), + clear_certs(certs_dir(Type, Name), OldConf); + Err -> + Err + end. + +update_listener(Type, Name, {OldConf, NewConf}) -> + try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf), + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), + Res = restart_listener(Type, Name, {OldConf, NewConf}), + recreate_authenticator(Res, Type, Name, NewConf). + +perform_listener_changes([]) -> + ok; +perform_listener_changes([{Action, ConfL} | Tasks]) -> + case perform_listener_changes(Action, ConfL) of + ok -> perform_listener_changes(Tasks); + {error, Reason} -> {error, Reason} + end. + +perform_listener_changes(_Action, []) -> + ok; +perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) -> + case Action(Type, Name, Diff) of + ok -> perform_listener_changes(Action, MapConf); + {error, Reason} -> {error, Reason} + end. + esockd_opts(ListenerId, Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), @@ -701,6 +746,29 @@ del_limiter_bucket(Id, Conf) -> ) end. +diff_confs(NewConfs, OldConfs) -> + emqx_utils:diff_lists( + flatten_confs(NewConfs), + flatten_confs(OldConfs), + fun({Key, _}) -> Key end + ). + +flatten_confs(Conf0) -> + lists:flatmap( + fun({Type, Conf}) -> + do_flatten_confs(Type, Conf) + end, + maps:to_list(Conf0) + ). + +do_flatten_confs(Type, Conf0) -> + FilterFun = + fun + ({_Name, ?TOMBSTONE_TYPE}) -> false; + ({Name, Conf}) -> {true, {{Type, Name}, Conf}} + end, + lists:filtermap(FilterFun, maps:to_list(Conf0)). + enable_authn(Opts) -> maps:get(enable_authn, Opts, true). @@ -762,14 +830,32 @@ parse_bind(#{<<"bind">> := Bind}) -> certs_dir(Type, Name) -> iolist_to_binary(filename:join(["listeners", Type, Name])). -convert_certs(CertsDir, Conf) -> +convert_certs(ListenerConf) -> + maps:fold( + fun(Type, Listeners0, Acc) -> + Listeners1 = + maps:fold( + fun(Name, Conf, Acc1) -> + Acc1#{Name => convert_certs(Type, Name, Conf)} + end, + #{}, + Listeners0 + ), + Acc#{Type => Listeners1} + end, + #{}, + ListenerConf + ). + +convert_certs(Type, Name, Conf) -> + CertsDir = certs_dir(Type, Name), case emqx_tls_lib:ensure_ssl_files(CertsDir, get_ssl_options(Conf)) of {ok, undefined} -> Conf; {ok, SSL} -> Conf#{<<"ssl_options">> => SSL}; {error, Reason} -> - ?SLOG(error, Reason#{msg => "bad_ssl_config"}), + ?SLOG(error, Reason#{msg => "bad_ssl_config", type => Type, name => Name}), throw({bad_ssl_config, Reason}) end. @@ -791,13 +877,15 @@ try_clear_ssl_files(CertsDir, NewConf, OldConf) -> OldSSL = get_ssl_options(OldConf), emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL, OldSSL). -get_ssl_options(Conf) -> +get_ssl_options(Conf = #{}) -> case maps:find(ssl_options, Conf) of {ok, SSL} -> SSL; error -> maps:get(<<"ssl_options">>, Conf, undefined) - end. + end; +get_ssl_options(_) -> + undefined. %% @doc Get QUIC optional settings for low level tunings. %% @see quicer:quic_settings() @@ -889,8 +977,5 @@ unregister_ocsp_stapling_refresh(Type, Name) -> emqx_ocsp_cache:unregister_listener(ListenerId), ok. -%% There is currently an issue with frontend -%% infinity is not a good value for it, so we use 5m for now default_max_conn() -> - %% TODO: <<"infinity">> - 5_000_000. + <<"infinity">>. diff --git a/apps/emqx/test/emqx_listeners_update_SUITE.erl b/apps/emqx/test/emqx_listeners_update_SUITE.erl new file mode 100644 index 000000000..c16a26f3a --- /dev/null +++ b/apps/emqx/test/emqx_listeners_update_SUITE.erl @@ -0,0 +1,154 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_listeners_update_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-import(emqx_listeners, [current_conns/2, is_running/1]). + +-define(LISTENERS, [listeners]). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([]). + +init_per_testcase(_TestCase, Config) -> + Init = emqx:get_raw_config(?LISTENERS), + [{init_conf, Init} | Config]. + +end_per_testcase(_TestCase, Config) -> + Conf = ?config(init_conf, Config), + {ok, _} = emqx:update_config(?LISTENERS, Conf), + ok. + +t_default_conf(_Config) -> + ?assertMatch( + #{ + <<"tcp">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:1883">>}}, + <<"ssl">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8883">>}}, + <<"ws">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8083">>}}, + <<"wss">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8084">>}} + }, + emqx:get_raw_config(?LISTENERS) + ), + ?assertMatch( + #{ + tcp := #{default := #{bind := {{0, 0, 0, 0}, 1883}}}, + ssl := #{default := #{bind := {{0, 0, 0, 0}, 8883}}}, + ws := #{default := #{bind := {{0, 0, 0, 0}, 8083}}}, + wss := #{default := #{bind := {{0, 0, 0, 0}, 8084}}} + }, + emqx:get_config(?LISTENERS) + ), + ok. + +t_update_conf(_Conf) -> + Raw = emqx:get_raw_config(?LISTENERS), + Raw1 = emqx_utils_maps:deep_put( + [<<"tcp">>, <<"default">>, <<"bind">>], Raw, <<"127.0.0.1:1883">> + ), + Raw2 = emqx_utils_maps:deep_put( + [<<"ssl">>, <<"default">>, <<"bind">>], Raw1, <<"127.0.0.1:8883">> + ), + Raw3 = emqx_utils_maps:deep_put( + [<<"ws">>, <<"default">>, <<"bind">>], Raw2, <<"0.0.0.0:8083">> + ), + Raw4 = emqx_utils_maps:deep_put( + [<<"wss">>, <<"default">>, <<"bind">>], Raw3, <<"127.0.0.1:8084">> + ), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw4)), + ?assertMatch( + #{ + <<"tcp">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:1883">>}}, + <<"ssl">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:8883">>}}, + <<"ws">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8083">>}}, + <<"wss">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:8084">>}} + }, + emqx:get_raw_config(?LISTENERS) + ), + BindTcp = {{127, 0, 0, 1}, 1883}, + BindSsl = {{127, 0, 0, 1}, 8883}, + BindWs = {{0, 0, 0, 0}, 8083}, + BindWss = {{127, 0, 0, 1}, 8084}, + ?assertMatch( + #{ + tcp := #{default := #{bind := BindTcp}}, + ssl := #{default := #{bind := BindSsl}}, + ws := #{default := #{bind := BindWs}}, + wss := #{default := #{bind := BindWss}} + }, + emqx:get_config(?LISTENERS) + ), + ?assertError(not_found, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})), + ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + + ?assertEqual(0, current_conns(<<"tcp:default">>, BindTcp)), + ?assertEqual(0, current_conns(<<"ssl:default">>, BindSsl)), + + ?assertEqual({0, 0, 0, 0}, proplists:get_value(ip, ranch:info('ws:default'))), + ?assertEqual({127, 0, 0, 1}, proplists:get_value(ip, ranch:info('wss:default'))), + ?assert(is_running('ws:default')), + ?assert(is_running('wss:default')), + ok. + +t_add_delete_conf(_Conf) -> + Raw = emqx:get_raw_config(?LISTENERS), + %% add + #{<<"tcp">> := #{<<"default">> := Tcp}} = Raw, + NewBind = <<"127.0.0.1:1987">>, + Raw1 = emqx_utils_maps:deep_put([<<"tcp">>, <<"new">>], Raw, Tcp#{<<"bind">> => NewBind}), + Raw2 = emqx_utils_maps:deep_put([<<"ssl">>, <<"default">>], Raw1, ?TOMBSTONE_VALUE), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw2)), + ?assertEqual(0, current_conns(<<"tcp:new">>, {{127, 0, 0, 1}, 1987})), + ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + %% deleted + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw)), + ?assertError(not_found, current_conns(<<"tcp:new">>, {{127, 0, 0, 1}, 1987})), + ?assertEqual(0, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + ok. + +t_delete_default_conf(_Conf) -> + Raw = emqx:get_raw_config(?LISTENERS), + %% delete default listeners + Raw1 = emqx_utils_maps:deep_put([<<"tcp">>, <<"default">>], Raw, ?TOMBSTONE_VALUE), + Raw2 = emqx_utils_maps:deep_put([<<"ssl">>, <<"default">>], Raw1, ?TOMBSTONE_VALUE), + Raw3 = emqx_utils_maps:deep_put([<<"ws">>, <<"default">>], Raw2, ?TOMBSTONE_VALUE), + Raw4 = emqx_utils_maps:deep_put([<<"wss">>, <<"default">>], Raw3, ?TOMBSTONE_VALUE), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw4)), + ?assertError(not_found, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})), + ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + ?assertMatch({error, not_found}, is_running('ws:default')), + ?assertMatch({error, not_found}, is_running('wss:default')), + + %% reset + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw)), + ?assertEqual(0, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})), + ?assertEqual(0, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})), + ?assert(is_running('ws:default')), + ?assert(is_running('wss:default')), + ok. From bdcc069aac5c0b31666a4e7d63854d44620d6cae Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 5 Jun 2023 14:58:33 +0200 Subject: [PATCH 04/17] chore: start deprecating mcast cluster discovery --- apps/emqx_conf/src/emqx_conf_schema.erl | 4 ++-- rel/i18n/emqx_conf_schema.hocon | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index b4ba2bf47..f339f78a3 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -143,7 +143,7 @@ fields("cluster") -> )}, {"discovery_strategy", sc( - hoconsc:enum([manual, static, mcast, dns, etcd, k8s]), + hoconsc:enum([manual, static, dns, etcd, k8s, mcast]), #{ default => manual, desc => ?DESC(cluster_discovery_strategy), @@ -198,7 +198,7 @@ fields("cluster") -> {"mcast", sc( ?R_REF(cluster_mcast), - #{} + #{importance => ?IMPORTANCE_HIDDEN} )}, {"dns", sc( diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index 079132454..416b3ca76 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -755,7 +755,9 @@ cluster_discovery_strategy.desc: - static: Configure static nodes list by setting seeds in config file.
- dns: Use DNS A record to discover peer nodes.
- etcd: Use etcd to discover peer nodes.
-- k8s: Use Kubernetes API to discover peer pods.""" +- k8s: Use Kubernetes API to discover peer pods. +- mcast: Deprecated since 5.1, will be removed in 5.2. + This supports discovery via UDP multicast.""" cluster_discovery_strategy.label: """Cluster Discovery Strategy""" From 1dd2109b1fcbc8d787e6132cd9cb3eb166b3a418 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 5 Jun 2023 15:03:21 +0200 Subject: [PATCH 05/17] docs: add changelog for 10943 --- changes/ce/fix-10943.en.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changes/ce/fix-10943.en.md diff --git a/changes/ce/fix-10943.en.md b/changes/ce/fix-10943.en.md new file mode 100644 index 000000000..081f94c24 --- /dev/null +++ b/changes/ce/fix-10943.en.md @@ -0,0 +1,5 @@ +Deprecated UDP mcast mechanism for cluster discovery. + +This feature has been planed for deprecation since 5.0 mainly due to the lack of +actual production use. +This feature code is not yet removed in 5.1, but the document interface is demoted. From 67db9d6fe953a4cf45e3ed91fdfab6480d9700c4 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 5 Jun 2023 20:25:48 +0200 Subject: [PATCH 06/17] chore: bump version to 5.1.0-alpha.3 --- apps/emqx/include/emqx_release.hrl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 19854f40b..b128db61d 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,10 +32,10 @@ %% `apps/emqx/src/bpapi/README.md' %% Community edition --define(EMQX_RELEASE_CE, "5.1.0-alpha.2"). +-define(EMQX_RELEASE_CE, "5.1.0-alpha.3"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.1.0-alpha.2"). +-define(EMQX_RELEASE_EE, "5.1.0-alpha.3"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). From f7a6648103d7527883548365d4c27c9d587c448d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 5 Jun 2023 23:32:13 +0300 Subject: [PATCH 07/17] fix(mqttconn): no warn if ingress poolsize is same as config --- apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 86787b33b..a1cfe687f 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -99,7 +99,7 @@ choose_ingress_pool_size( {_Filter, #{share := _Name}} -> % NOTE: this is shared subscription, many workers may subscribe PoolSize; - {_Filter, #{}} -> + {_Filter, #{}} when PoolSize > 1 -> % NOTE: this is regular subscription, only one worker should subscribe ?SLOG(warning, #{ msg => "mqtt_bridge_ingress_pool_size_ignored", @@ -110,6 +110,8 @@ choose_ingress_pool_size( config_pool_size => PoolSize, pool_size => 1 }), + 1; + {_Filter, #{}} when PoolSize == 1 -> 1 end. From cd04b7cf8b24351ea231531143c52b273b26b897 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 5 Jun 2023 22:49:22 +0200 Subject: [PATCH 08/17] ci: skip github action cache for macos arm64 --- .github/actions/package-macos/action.yaml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/.github/actions/package-macos/action.yaml b/.github/actions/package-macos/action.yaml index 9a19fa703..d27507dd0 100644 --- a/.github/actions/package-macos/action.yaml +++ b/.github/actions/package-macos/action.yaml @@ -43,8 +43,18 @@ runs: echo "OTP_SOURCE_PATH=$OTP_SOURCE_PATH" >> $GITHUB_OUTPUT echo "OTP_INSTALL_PATH=$OTP_INSTALL_PATH" >> $GITHUB_OUTPUT mkdir -p "$OTP_SOURCE_PATH" "$OTP_INSTALL_PATH" + # we need this to skip using cache for self-hosted runners + case ${{ inputs.os }} in + *arm64) + echo "SELF_HOSTED=true" >> $GITHUB_OUTPUT + ;; + *) + echo "SELF_HOSTED=false" >> $GITHUB_OUTPUT + ;; + esac - uses: actions/cache@v3 id: cache + if: steps.prepare.outputs.SELF_HOSTED != 'true' with: path: ${{ steps.prepare.outputs.OTP_INSTALL_PATH }} key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit @@ -52,6 +62,17 @@ runs: if: steps.cache.outputs.cache-hit != 'true' shell: bash run: | + SELF_HOSTED="${{ steps.prepare.outputs.SELF_HOSTED}}" + # when it's self-hosted, it never hits the cache, + # skip rebuild if it's self-hosted and the install path already has a 'bin' + if [ "$SELF_HOSTED" = 'true' ]; then + if [ -d "$OTP_INSTALL_PATH/bin" ]; then + echo "Skip rebuilding OTP, found $OTP_INSTALL_PATH" + exit 0 + fi + fi + ## when it's not self-hosted, or the install path is not found, + ## build otp from source code. OTP_SOURCE_PATH="${{ steps.prepare.outputs.OTP_SOURCE_PATH }}" OTP_INSTALL_PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}" if [ -d "$OTP_SOURCE_PATH" ]; then From 541dc1b9cf6d68a2a020de94c311dfb835e8e12a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 5 Jun 2023 23:19:34 +0200 Subject: [PATCH 09/17] ci: inspec erl in PATH for macos builds --- .github/actions/package-macos/action.yaml | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/.github/actions/package-macos/action.yaml b/.github/actions/package-macos/action.yaml index d27507dd0..b53be1666 100644 --- a/.github/actions/package-macos/action.yaml +++ b/.github/actions/package-macos/action.yaml @@ -62,19 +62,19 @@ runs: if: steps.cache.outputs.cache-hit != 'true' shell: bash run: | - SELF_HOSTED="${{ steps.prepare.outputs.SELF_HOSTED}}" + OTP_SOURCE_PATH="${{ steps.prepare.outputs.OTP_SOURCE_PATH }}" + OTP_INSTALL_PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}" + SELF_HOSTED="${{ steps.prepare.outputs.SELF_HOSTED }}" # when it's self-hosted, it never hits the cache, # skip rebuild if it's self-hosted and the install path already has a 'bin' if [ "$SELF_HOSTED" = 'true' ]; then - if [ -d "$OTP_INSTALL_PATH/bin" ]; then + if [ -n "$OTP_INSTALL_PATH" && -d "$OTP_INSTALL_PATH/bin" ]; then echo "Skip rebuilding OTP, found $OTP_INSTALL_PATH" exit 0 fi fi ## when it's not self-hosted, or the install path is not found, ## build otp from source code. - OTP_SOURCE_PATH="${{ steps.prepare.outputs.OTP_SOURCE_PATH }}" - OTP_INSTALL_PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}" if [ -d "$OTP_SOURCE_PATH" ]; then rm -rf "$OTP_SOURCE_PATH" fi @@ -108,6 +108,10 @@ runs: shell: bash run: | export PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}/bin:$PATH" + # inspec erl in PATH + which erl + # inspec erl command banner + erl -s init stop make ensure-rebar3 mkdir -p $HOME/bin cp rebar3 $HOME/bin/rebar3 From b3c079dc21946d397b8e77213f4e9ea0620eb7a0 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 6 Jun 2023 01:21:09 +0200 Subject: [PATCH 10/17] ci: try --with-odbc --- .github/actions/package-macos/action.yaml | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/actions/package-macos/action.yaml b/.github/actions/package-macos/action.yaml index b53be1666..180621c67 100644 --- a/.github/actions/package-macos/action.yaml +++ b/.github/actions/package-macos/action.yaml @@ -67,8 +67,8 @@ runs: SELF_HOSTED="${{ steps.prepare.outputs.SELF_HOSTED }}" # when it's self-hosted, it never hits the cache, # skip rebuild if it's self-hosted and the install path already has a 'bin' - if [ "$SELF_HOSTED" = 'true' ]; then - if [ -n "$OTP_INSTALL_PATH" && -d "$OTP_INSTALL_PATH/bin" ]; then + if [ "${SELF_HOSTED:-false}" = 'true' ]; then + if [ -n "$OTP_INSTALL_PATH" ] && [ -d "$OTP_INSTALL_PATH/bin" ]; then echo "Skip rebuilding OTP, found $OTP_INSTALL_PATH" exit 0 fi @@ -81,10 +81,13 @@ runs: git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH" cd "$OTP_SOURCE_PATH" if [ "$(arch)" = arm64 ]; then + export CFLAGS="-I$(brew --prefix unixodbc)/include" export LDFLAGS="-L$(brew --prefix unixodbc)/lib" - export CC="/usr/bin/gcc -I$(brew --prefix unixodbc)/include" + WITH_ODBC="--with-odbc=$(brew --prefix unixodbc)" + else + WITH_ODBC="" fi - ./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH" + ./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) ${WITH_ODBC} --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH" make -j$(nproc) rm -rf "$OTP_INSTALL_PATH" make install From da8f3da4ccab7f5a65602692c9fc766f5d83d832 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 6 Jun 2023 01:45:51 +0200 Subject: [PATCH 11/17] ci: fix CFLAGS for macos otp build --- .github/actions/package-macos/action.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/actions/package-macos/action.yaml b/.github/actions/package-macos/action.yaml index 180621c67..8615a433a 100644 --- a/.github/actions/package-macos/action.yaml +++ b/.github/actions/package-macos/action.yaml @@ -81,7 +81,7 @@ runs: git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH" cd "$OTP_SOURCE_PATH" if [ "$(arch)" = arm64 ]; then - export CFLAGS="-I$(brew --prefix unixodbc)/include" + export CFLAGS="-O2 -g -I$(brew --prefix unixodbc)/include" export LDFLAGS="-L$(brew --prefix unixodbc)/lib" WITH_ODBC="--with-odbc=$(brew --prefix unixodbc)" else @@ -92,8 +92,8 @@ runs: rm -rf "$OTP_INSTALL_PATH" make install if [ "$(arch)" = arm64 ]; then + unset CFLAGS unset LDFLAGS - unset CC fi - name: build env: From 47d7e6ce016181931583649db43b18333a8748d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Mon, 5 Jun 2023 15:55:54 +0800 Subject: [PATCH 12/17] feat: update gateway from cli --- apps/emqx_gateway/src/emqx_gateway.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_conf.erl | 294 ++++++++++++++---- .../emqx_gateway/src/emqx_gateway_metrics.erl | 2 +- .../test/emqx_gateway_conf_SUITE.erl | 188 +++++++++++ .../test/emqx_gateway_test_utils.erl | 8 +- 5 files changed, 425 insertions(+), 69 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index 0e6871db2..0d5ea79b1 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -16,7 +16,7 @@ -module(emqx_gateway). --include("include/emqx_gateway.hrl"). +-include("emqx_gateway.hrl"). %% Gateway APIs -export([ diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index da86d6a58..e7b8a1c65 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -74,18 +74,20 @@ -type listener_ref() :: {ListenerType :: atom_or_bin(), ListenerName :: atom_or_bin()}. -define(IS_SSL(T), (T == <<"ssl_options">> orelse T == <<"dtls_options">>)). +-define(IGNORE_KEYS, [<<"listeners">>, ?AUTHN_BIN]). %%-------------------------------------------------------------------- %% Load/Unload %%-------------------------------------------------------------------- +-define(GATEWAY, [gateway]). -spec load() -> ok. load() -> - emqx_conf:add_handler([gateway], ?MODULE). + emqx_conf:add_handler(?GATEWAY, ?MODULE). -spec unload() -> ok. unload() -> - emqx_conf:remove_handler([gateway]). + emqx_conf:remove_handler(?GATEWAY). %%-------------------------------------------------------------------- %% APIs @@ -104,7 +106,7 @@ unconvert_listeners(Ls) when is_list(Ls) -> lists:foldl( fun(Lis, Acc) -> {[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis), - _ = vaildate_listener_name(Name), + _ = validate_listener_name(Name), NLis1 = maps:without([<<"id">>, <<"running">>], Lis1), emqx_utils_maps:deep_merge(Acc, #{Type => #{Name => NLis1}}) end, @@ -122,7 +124,7 @@ maps_key_take([K | Ks], M, Acc) -> {V, M1} -> maps_key_take(Ks, M1, [V | Acc]) end. -vaildate_listener_name(Name) -> +validate_listener_name(Name) -> try {match, _} = re:run(Name, "^[0-9a-zA-Z_-]+$"), ok @@ -373,7 +375,7 @@ ret_listener_or_err(_, _, Err) -> emqx_config:raw_config() ) -> {ok, emqx_config:update_request()} | {error, term()}. -pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) -> +pre_config_update(?GATEWAY, {load_gateway, GwName, Conf}, RawConf) -> case maps:get(GwName, RawConf, undefined) of undefined -> NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf), @@ -381,29 +383,25 @@ pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) -> _ -> badres_gateway(already_exist, GwName) end; -pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) -> +pre_config_update(?GATEWAY, {update_gateway, GwName, Conf}, RawConf) -> case maps:get(GwName, RawConf, undefined) of undefined -> badres_gateway(not_found, GwName); GwRawConf -> - Conf1 = maps:without([<<"listeners">>, ?AUTHN_BIN], Conf), + Conf1 = maps:without(?IGNORE_KEYS, Conf), NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf1), NConf1 = maps:merge(GwRawConf, NConf), {ok, emqx_utils_maps:deep_put([GwName], RawConf, NConf1)} end; -pre_config_update(_, {unload_gateway, GwName}, RawConf) -> +pre_config_update(?GATEWAY, {unload_gateway, GwName}, RawConf) -> _ = tune_gw_certs( fun clear_certs/2, GwName, maps:get(GwName, RawConf, #{}) ), {ok, maps:remove(GwName, RawConf)}; -pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) -> - case - emqx_utils_maps:deep_get( - [GwName, <<"listeners">>, LType, LName], RawConf, undefined - ) - of +pre_config_update(?GATEWAY, {add_listener, GwName, {LType, LName}, Conf}, RawConf) -> + case get_listener(GwName, LType, LName, RawConf) of undefined -> NConf = convert_certs(certs_dir(GwName), Conf), NListener = #{LType => #{LName => NConf}}, @@ -415,12 +413,8 @@ pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) -> _ -> badres_listener(already_exist, GwName, LType, LName) end; -pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) -> - case - emqx_utils_maps:deep_get( - [GwName, <<"listeners">>, LType, LName], RawConf, undefined - ) - of +pre_config_update(?GATEWAY, {update_listener, GwName, {LType, LName}, Conf}, RawConf) -> + case get_listener(GwName, LType, LName, RawConf) of undefined -> badres_listener(not_found, GwName, LType, LName); OldConf -> @@ -432,21 +426,17 @@ pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) - ), {ok, NRawConf} end; -pre_config_update(_, {remove_listener, GwName, {LType, LName}}, RawConf) -> - Path = [GwName, <<"listeners">>, LType, LName], - case emqx_utils_maps:deep_get(Path, RawConf, undefined) of +pre_config_update(?GATEWAY, {remove_listener, GwName, {LType, LName}}, RawConf) -> + case get_listener(GwName, LType, LName, RawConf) of undefined -> {ok, RawConf}; OldConf -> clear_certs(certs_dir(GwName), OldConf), + Path = [GwName, <<"listeners">>, LType, LName], {ok, emqx_utils_maps:deep_remove(Path, RawConf)} end; -pre_config_update(_, {add_authn, GwName, Conf}, RawConf) -> - case - emqx_utils_maps:deep_get( - [GwName, ?AUTHN_BIN], RawConf, undefined - ) - of +pre_config_update(?GATEWAY, {add_authn, GwName, Conf}, RawConf) -> + case get_authn(GwName, RawConf) of undefined -> CertsDir = authn_certs_dir(GwName, Conf), Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf), @@ -458,14 +448,8 @@ pre_config_update(_, {add_authn, GwName, Conf}, RawConf) -> _ -> badres_authn(already_exist, GwName) end; -pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) -> - case - emqx_utils_maps:deep_get( - [GwName, <<"listeners">>, LType, LName], - RawConf, - undefined - ) - of +pre_config_update(?GATEWAY, {add_authn, GwName, {LType, LName}, Conf}, RawConf) -> + case get_listener(GwName, LType, LName, RawConf) of undefined -> badres_listener(not_found, GwName, LType, LName); Listener -> @@ -486,12 +470,8 @@ pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) -> badres_listener_authn(already_exist, GwName, LType, LName) end end; -pre_config_update(_, {update_authn, GwName, Conf}, RawConf) -> - case - emqx_utils_maps:deep_get( - [GwName, ?AUTHN_BIN], RawConf, undefined - ) - of +pre_config_update(?GATEWAY, {update_authn, GwName, Conf}, RawConf) -> + case get_authn(GwName, RawConf) of undefined -> badres_authn(not_found, GwName); OldAuthnConf -> @@ -499,14 +479,8 @@ pre_config_update(_, {update_authn, GwName, Conf}, RawConf) -> Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf, OldAuthnConf), {ok, emqx_utils_maps:deep_put([GwName, ?AUTHN_BIN], RawConf, Conf1)} end; -pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) -> - case - emqx_utils_maps:deep_get( - [GwName, <<"listeners">>, LType, LName], - RawConf, - undefined - ) - of +pre_config_update(?GATEWAY, {update_authn, GwName, {LType, LName}, Conf}, RawConf) -> + case get_listener(GwName, LType, LName, RawConf) of undefined -> badres_listener(not_found, GwName, LType, LName); Listener -> @@ -533,12 +507,8 @@ pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) -> )} end end; -pre_config_update(_, {remove_authn, GwName}, RawConf) -> - case - emqx_utils_maps:deep_get( - [GwName, ?AUTHN_BIN], RawConf, undefined - ) - of +pre_config_update(?GATEWAY, {remove_authn, GwName}, RawConf) -> + case get_authn(GwName, RawConf) of undefined -> ok; OldAuthnConf -> @@ -549,7 +519,7 @@ pre_config_update(_, {remove_authn, GwName}, RawConf) -> emqx_utils_maps:deep_remove( [GwName, ?AUTHN_BIN], RawConf )}; -pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) -> +pre_config_update(?GATEWAY, {remove_authn, GwName, {LType, LName}}, RawConf) -> Path = [GwName, <<"listeners">>, LType, LName, ?AUTHN_BIN], case emqx_utils_maps:deep_get( @@ -565,10 +535,184 @@ pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) -> emqx_authentication_config:clear_certs(CertsDir, OldAuthnConf) end, {ok, emqx_utils_maps:deep_remove(Path, RawConf)}; -pre_config_update(_, UnknownReq, _RawConf) -> - logger:error("Unknown configuration update request: ~0p", [UnknownReq]), +pre_config_update(?GATEWAY, NewRawConf0 = #{}, OldRawConf = #{}) -> + %% FIXME don't support gateway's listener's authn update. + %% load all authentications + NewRawConf1 = pre_load_authentications(NewRawConf0, OldRawConf), + %% load all listeners + NewRawConf2 = pre_load_listeners(NewRawConf1, OldRawConf), + %% load all gateway + NewRawConf3 = pre_load_gateways(NewRawConf2, OldRawConf), + {ok, NewRawConf3}; +pre_config_update(Path, UnknownReq, _RawConf) -> + ?SLOG(error, #{ + msg => "unknown_gateway_update_request", + request => UnknownReq, + path => Path + }), {error, badreq}. +pre_load_gateways(NewConf, OldConf) -> + %% unload old gateways + maps:foreach( + fun(GwName, _OldGwConf) -> + case maps:find(GwName, NewConf) of + error -> pre_config_update(?GATEWAY, {unload_gateway, GwName}, OldConf); + _ -> ok + end + end, + OldConf + ), + %% load/update gateways + maps:map( + fun(GwName, NewGwConf) -> + case maps:find(GwName, OldConf) of + {ok, NewGwConf} -> + NewGwConf; + {ok, _OldGwConf} -> + {ok, #{GwName := NewGwConf1}} = pre_config_update( + ?GATEWAY, {update_gateway, GwName, NewGwConf}, OldConf + ), + %% update gateway should pass through ignore keys(listener/authn) + PassThroughConf = maps:with(?IGNORE_KEYS, NewGwConf), + NewGwConf2 = maps:without(?IGNORE_KEYS, NewGwConf1), + maps:merge(NewGwConf2, PassThroughConf); + error -> + {ok, #{GwName := NewGwConf1}} = pre_config_update( + ?GATEWAY, {load_gateway, GwName, NewGwConf}, OldConf + ), + NewGwConf1 + end + end, + NewConf + ). + +pre_load_listeners(NewConf, OldConf) -> + %% remove listeners + maps:foreach( + fun(GwName, GwConf) -> + Listeners = maps:get(<<"listeners">>, GwConf, #{}), + remove_listeners(GwName, NewConf, OldConf, Listeners) + end, + OldConf + ), + %% add/update listeners + maps:map( + fun(GwName, GwConf) -> + Listeners = maps:get(<<"listeners">>, GwConf, #{}), + NewListeners = create_or_update_listeners(GwName, OldConf, Listeners), + maps:put(<<"listeners">>, NewListeners, GwConf) + end, + NewConf + ). + +create_or_update_listeners(GwName, OldConf, Listeners) -> + maps:map( + fun(LType, LConf) -> + maps:map( + fun(LName, LConf1) -> + NConf = + case get_listener(GwName, LType, LName, OldConf) of + undefined -> + {ok, NConf0} = + pre_config_update( + ?GATEWAY, + {add_listener, GwName, {LType, LName}, LConf1}, + OldConf + ), + NConf0; + _ -> + {ok, NConf0} = + pre_config_update( + ?GATEWAY, + {update_listener, GwName, {LType, LName}, LConf1}, + OldConf + ), + NConf0 + end, + get_listener(GwName, LType, LName, NConf) + end, + LConf + ) + end, + Listeners + ). + +remove_listeners(GwName, NewConf, OldConf, Listeners) -> + maps:foreach( + fun(LType, LConf) -> + maps:foreach( + fun(LName, _LConf1) -> + case get_listener(GwName, LType, LName, NewConf) of + undefined -> + pre_config_update( + ?GATEWAY, {remove_listener, GwName, {LType, LName}}, OldConf + ); + _ -> + ok + end + end, + LConf + ) + end, + Listeners + ). + +get_listener(GwName, LType, LName, NewConf) -> + emqx_utils_maps:deep_get( + [GwName, <<"listeners">>, LType, LName], NewConf, undefined + ). + +get_authn(GwName, Conf) -> + emqx_utils_maps:deep_get([GwName, ?AUTHN_BIN], Conf, undefined). + +pre_load_authentications(NewConf, OldConf) -> + %% remove authentications when not in new config + maps:foreach( + fun(GwName, OldGwConf) -> + case + maps:get(?AUTHN_BIN, OldGwConf, undefined) =/= undefined andalso + get_authn(GwName, NewConf) =:= undefined + of + true -> + pre_config_update(?GATEWAY, {remove_authn, GwName}, OldConf); + false -> + ok + end + end, + OldConf + ), + %% add/update authentications + maps:map( + fun(GwName, NewGwConf) -> + case get_authn(GwName, OldConf) of + undefined -> + case maps:get(?AUTHN_BIN, NewGwConf, undefined) of + undefined -> + NewGwConf; + AuthN -> + {ok, #{GwName := #{?AUTHN_BIN := NAuthN}}} = + pre_config_update(?GATEWAY, {add_authn, GwName, AuthN}, OldConf), + maps:put(?AUTHN_BIN, NAuthN, NewGwConf) + end; + OldAuthN -> + case maps:get(?AUTHN_BIN, NewGwConf, undefined) of + undefined -> + NewGwConf; + OldAuthN -> + NewGwConf; + NewAuthN -> + {ok, #{GwName := #{?AUTHN_BIN := NAuthN}}} = + pre_config_update( + ?GATEWAY, {update_authn, GwName, NewAuthN}, OldConf + ), + maps:put(?AUTHN_BIN, NAuthN, NewGwConf) + end + end + end, + NewConf + ). + badres_gateway(not_found, GwName) -> {error, {badres, #{ @@ -642,7 +786,7 @@ badres_listener_authn(already_exist, GwName, LType, LName) -> ) -> ok | {ok, Result :: any()} | {error, Reason :: term()}. -post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) -> +post_config_update(?GATEWAY, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) -> [_Tag, GwName0 | _] = tuple_to_list(Req), GwName = binary_to_existing_atom(GwName0), @@ -657,11 +801,35 @@ post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) -> {New, Old} when is_map(New), is_map(Old) -> emqx_gateway:update(GwName, New) end; -post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) -> +post_config_update(?GATEWAY, _Req = #{}, NewConfig, OldConfig, _AppEnvs) -> + %% unload gateways + maps:foreach( + fun(GwName, _OldGwConf) -> + case maps:get(GwName, NewConfig, undefined) of + undefined -> + emqx_gateway:unload(GwName); + _ -> + ok + end + end, + OldConfig + ), + %% load/update gateways + maps:foreach( + fun(GwName, NewGwConf) -> + case maps:get(GwName, OldConfig, undefined) of + undefined -> + emqx_gateway:load(GwName, NewGwConf); + _ -> + emqx_gateway:update(GwName, NewGwConf) + end + end, + NewConfig + ), ok. %%-------------------------------------------------------------------- -%% Internal funcs +%% Internal functions %%-------------------------------------------------------------------- tune_gw_certs(Fun, GwName, Conf) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_metrics.erl b/apps/emqx_gateway/src/emqx_gateway_metrics.erl index 0aa2ff210..ef17b0954 100644 --- a/apps/emqx_gateway/src/emqx_gateway_metrics.erl +++ b/apps/emqx_gateway/src/emqx_gateway_metrics.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --include_lib("emqx_gateway/include/emqx_gateway.hrl"). +-include("emqx_gateway.hrl"). %% APIs -export([start_link/1]). diff --git a/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl index ce709efc3..2b169f64b 100644 --- a/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl @@ -277,6 +277,48 @@ t_load_unload_gateway(_) -> {config_not_found, [<<"gateway">>, stomp]}, emqx:get_raw_config([gateway, stomp]) ), + %% test update([gateway], Conf) + Raw0 = emqx:get_raw_config([gateway]), + #{<<"listeners">> := StompConfL1} = StompConf1, + StompConf11 = StompConf1#{ + <<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL1) + }, + #{<<"listeners">> := StompConfL2} = StompConf2, + StompConf22 = StompConf2#{ + <<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL2) + }, + Raw1 = Raw0#{<<"stomp">> => StompConf11}, + Raw2 = Raw0#{<<"stomp">> => StompConf22}, + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)), + assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])), + ?assertMatch( + #{ + config := #{ + authentication := #{backend := built_in_database, enable := true}, + listeners := #{tcp := #{default := #{bind := 61613}}}, + mountpoint := <<"t/">>, + idle_timeout := 10000 + } + }, + emqx_gateway:lookup('stomp') + ), + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)), + assert_confs(StompConf2, emqx:get_raw_config([gateway, stomp])), + ?assertMatch( + #{ + config := + #{ + authentication := #{backend := built_in_database, enable := true}, + listeners := #{tcp := #{default := #{bind := 61613}}}, + idle_timeout := 20000, + mountpoint := <<"t2/">> + } + }, + emqx_gateway:lookup('stomp') + ), + %% reset + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)), + ?assertEqual(undefined, emqx_gateway:lookup('stomp')), ok. t_load_remove_authn(_) -> @@ -310,6 +352,40 @@ t_load_remove_authn(_) -> {config_not_found, [<<"gateway">>, stomp, authentication]}, emqx:get_raw_config([gateway, stomp, authentication]) ), + %% test update([gateway], Conf) + Raw0 = emqx:get_raw_config([gateway]), + #{<<"listeners">> := StompConfL} = StompConf, + StompConf1 = StompConf#{ + <<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL), + <<"authentication">> => ?CONF_STOMP_AUTHN_1 + }, + Raw1 = maps:put(<<"stomp">>, StompConf1, Raw0), + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)), + assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])), + ?assertMatch( + #{ + stomp := + #{ + authn := <<"password_based:built_in_database">>, + listeners := [#{authn := <<"undefined">>, type := tcp}], + num_clients := 0 + } + }, + emqx_gateway:get_basic_usage_info() + ), + %% reset(remove authn) + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)), + ?assertMatch( + #{ + stomp := + #{ + authn := <<"undefined">>, + listeners := [#{authn := <<"undefined">>, type := tcp}], + num_clients := 0 + } + }, + emqx_gateway:get_basic_usage_info() + ), ok. t_load_remove_listeners(_) -> @@ -324,6 +400,7 @@ t_load_remove_listeners(_) -> {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_1 ), + assert_confs( maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)), emqx:get_raw_config([gateway, stomp]) @@ -355,6 +432,59 @@ t_load_remove_listeners(_) -> {config_not_found, [<<"gateway">>, stomp, listeners, tcp, default]}, emqx:get_raw_config([gateway, stomp, listeners, tcp, default]) ), + %% test update([gateway], Conf) + Raw0 = emqx:get_raw_config([gateway]), + Raw1 = emqx_utils_maps:deep_put( + [<<"stomp">>, <<"listeners">>, <<"tcp">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_1 + ), + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)), + assert_confs( + maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)), + emqx:get_raw_config([gateway, stomp]) + ), + ?assertMatch( + #{ + stomp := + #{ + authn := <<"password_based:built_in_database">>, + listeners := [#{authn := <<"undefined">>, type := tcp}], + num_clients := 0 + } + }, + emqx_gateway:get_basic_usage_info() + ), + Raw2 = emqx_utils_maps:deep_put( + [<<"stomp">>, <<"listeners">>, <<"tcp">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_2 + ), + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)), + assert_confs( + maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_2)), + emqx:get_raw_config([gateway, stomp]) + ), + ?assertMatch( + #{ + stomp := + #{ + authn := <<"password_based:built_in_database">>, + listeners := [#{authn := <<"undefined">>, type := tcp}], + num_clients := 0 + } + }, + emqx_gateway:get_basic_usage_info() + ), + %% reset(remove listener) + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)), + ?assertMatch( + #{ + stomp := + #{ + authn := <<"password_based:built_in_database">>, + listeners := [], + num_clients := 0 + } + }, + emqx_gateway:get_basic_usage_info() + ), ok. t_load_remove_listener_authn(_) -> @@ -417,6 +547,7 @@ t_load_gateway_with_certs_content(_) -> [<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>], emqx:get_raw_config([gateway, stomp]) ), + assert_ssl_confs_files_exist(SslConf), ok = emqx_gateway_conf:unload_gateway(<<"stomp">>), assert_ssl_confs_files_deleted(SslConf), ?assertException( @@ -424,6 +555,25 @@ t_load_gateway_with_certs_content(_) -> {config_not_found, [<<"gateway">>, stomp]}, emqx:get_raw_config([gateway, stomp]) ), + %% test update([gateway], Conf) + Raw0 = emqx:get_raw_config([gateway]), + #{<<"listeners">> := StompConfL} = StompConf, + StompConf1 = StompConf#{ + <<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL) + }, + Raw1 = emqx_utils_maps:deep_put([<<"stomp">>], Raw0, StompConf1), + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)), + assert_ssl_confs_files_exist(SslConf), + ?assertEqual( + SslConf, + emqx_utils_maps:deep_get( + [<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>], + emqx:get_raw_config([gateway, stomp]) + ) + ), + %% reset + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)), + assert_ssl_confs_files_deleted(SslConf), ok. %% TODO: Comment out this test case for now, because emqx_tls_lib @@ -475,6 +625,7 @@ t_add_listener_with_certs_content(_) -> [<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>], emqx:get_raw_config([gateway, stomp]) ), + assert_ssl_confs_files_exist(SslConf), ok = emqx_gateway_conf:remove_listener( <<"stomp">>, {<<"ssl">>, <<"default">>} ), @@ -492,6 +643,34 @@ t_add_listener_with_certs_content(_) -> {config_not_found, [<<"gateway">>, stomp, listeners, ssl, default]}, emqx:get_raw_config([gateway, stomp, listeners, ssl, default]) ), + + %% test update([gateway], Conf) + Raw0 = emqx:get_raw_config([gateway]), + Raw1 = emqx_utils_maps:deep_put( + [<<"stomp">>, <<"listeners">>, <<"ssl">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_SSL + ), + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)), + SslConf1 = emqx_utils_maps:deep_get( + [<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>], + emqx:get_raw_config([gateway, stomp]) + ), + assert_ssl_confs_files_exist(SslConf1), + %% update + Raw2 = emqx_utils_maps:deep_put( + [<<"stomp">>, <<"listeners">>, <<"ssl">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_SSL_2 + ), + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)), + SslConf2 = + emqx_utils_maps:deep_get( + [<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>], + emqx:get_raw_config([gateway, stomp]) + ), + assert_ssl_confs_files_exist(SslConf2), + %% reset + ?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)), + assert_ssl_confs_files_deleted(SslConf), + assert_ssl_confs_files_deleted(SslConf1), + assert_ssl_confs_files_deleted(SslConf2), ok. assert_ssl_confs_files_deleted(SslConf) when is_map(SslConf) -> @@ -503,6 +682,15 @@ assert_ssl_confs_files_deleted(SslConf) when is_map(SslConf) -> end, Ks ). +assert_ssl_confs_files_exist(SslConf) when is_map(SslConf) -> + Ks = [<<"cacertfile">>, <<"certfile">>, <<"keyfile">>], + lists:foreach( + fun(K) -> + Path = maps:get(K, SslConf), + {ok, _} = file:read_file(Path) + end, + Ks + ). %%-------------------------------------------------------------------- %% Utils diff --git a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl index bb378ef10..2e8a3a583 100644 --- a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl +++ b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl @@ -83,13 +83,13 @@ do_assert_confs(Key, Expected, Effected) -> maybe_unconvert_listeners(Conf) when is_map(Conf) -> case maps:take(<<"listeners">>, Conf) of - error -> - Conf; - {Ls, Conf1} -> + {Ls, Conf1} when is_list(Ls) -> Conf1#{ <<"listeners">> => emqx_gateway_conf:unconvert_listeners(Ls) - } + }; + _ -> + Conf end; maybe_unconvert_listeners(Conf) -> Conf. From 69b98c183078d7a0f49c6d61d42873dc86394ecd Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 5 Jun 2023 23:28:48 +0300 Subject: [PATCH 13/17] fix(ft): ensure temp filenames are under 255 bytes long --- apps/emqx_ft/src/emqx_ft_fs_util.erl | 23 +++++++++++++++++++ apps/emqx_ft/src/emqx_ft_schema.erl | 4 ++-- .../src/emqx_ft_storage_exporter_fs.erl | 3 +-- apps/emqx_ft/src/emqx_ft_storage_fs.erl | 12 ++-------- apps/emqx_ft/test/emqx_ft_SUITE.erl | 3 ++- apps/emqx_ft/test/emqx_ft_conf_SUITE.erl | 6 ++--- apps/emqx_ft/test/emqx_ft_fs_util_tests.erl | 21 +++++++++++++++++ 7 files changed, 54 insertions(+), 18 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_fs_util.erl b/apps/emqx_ft/src/emqx_ft_fs_util.erl index 9028722aa..544585110 100644 --- a/apps/emqx_ft/src/emqx_ft_fs_util.erl +++ b/apps/emqx_ft/src/emqx_ft_fs_util.erl @@ -29,6 +29,8 @@ -export([fold/4]). +-export([mk_temp_filename/1]). + -type foldfun(Acc) :: fun( ( @@ -178,3 +180,24 @@ fold(FoldFun, Acc, It) -> none -> Acc end. + +-spec mk_temp_filename(file:filename()) -> + file:filename(). +mk_temp_filename(Filename) -> + % NOTE + % Using only the first 200 characters of the filename to avoid making filenames + % exceeding 255 bytes in UTF-8. It's actually too conservative, `Suffix` can be + % at most 16 bytes. + Unique = erlang:unique_integer([positive]), + Suffix = binary:encode_hex(<>), + mk_filename([string:slice(Filename, 0, 200), ".", Suffix]). + +mk_filename(Comps) -> + lists:append(lists:map(fun mk_filename_component/1, Comps)). + +mk_filename_component(A) when is_atom(A) -> + atom_to_list(A); +mk_filename_component(B) when is_binary(B) -> + unicode:characters_to_list(B); +mk_filename_component(S) when is_list(S) -> + S. diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index 37508fe3e..2b98562b4 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -145,7 +145,7 @@ fields(local_storage_segments) -> [ {root, mk( - binary(), + string(), #{ desc => ?DESC("local_storage_segments_root"), required => false @@ -182,7 +182,7 @@ fields(local_storage_exporter) -> [ {root, mk( - binary(), + string(), #{ desc => ?DESC("local_storage_exporter_root"), required => false diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 702bc35ce..ae709dd52 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -452,8 +452,7 @@ mk_manifest_filename(Filename) when is_binary(Filename) -> <>. mk_temp_absfilepath(Options, Transfer, Filename) -> - Unique = erlang:unique_integer([positive]), - TempFilename = integer_to_list(Unique) ++ "." ++ Filename, + TempFilename = emqx_ft_fs_util:mk_temp_filename(Filename), filename:join(mk_absdir(Options, Transfer, temporary), TempFilename). mk_absdir(Options, _Transfer, temporary) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 99720f521..e84d35328 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -445,16 +445,8 @@ write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) end. mk_temp_filepath(Storage, Transfer, Filename) -> - Unique = erlang:unique_integer([positive]), - filename:join(get_subdir(Storage, Transfer, temporary), mk_filename([Unique, ".", Filename])). - -mk_filename(Comps) -> - lists:append(lists:map(fun mk_filename_component/1, Comps)). - -mk_filename_component(I) when is_integer(I) -> integer_to_list(I); -mk_filename_component(A) when is_atom(A) -> atom_to_list(A); -mk_filename_component(B) when is_binary(B) -> unicode:characters_to_list(B); -mk_filename_component(S) when is_list(S) -> S. + TempFilename = emqx_ft_fs_util:mk_temp_filename(Filename), + filename:join(get_subdir(Storage, Transfer, temporary), TempFilename). write_contents(Filepath, Content) -> file:write_file(Filepath, Content). diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index ae274cd86..6861038e9 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -270,7 +270,8 @@ t_nasty_filenames(_Config) -> Filenames = [ {<<"nasty1">>, "146%"}, {<<"nasty2">>, "🌚"}, - {<<"nasty3">>, "中文.txt"} + {<<"nasty3">>, "中文.txt"}, + {<<"nasty4">>, _254Bytes = string:join(lists:duplicate(255 div 5, "LONG"), ".")} ], ok = lists:foreach( fun({ClientId, Filename}) -> diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index 1f53f88af..f235b5ebb 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -87,7 +87,7 @@ t_update_config(_Config) -> ) ), ?assertEqual( - <<"/tmp/path">>, + "/tmp/path", emqx_config:get([file_transfer, storage, local, segments, root]) ), ?assertEqual( @@ -150,7 +150,7 @@ t_disable_restore_config(Config) -> ), ok = emqtt:stop(Client), % Restore local storage backend - Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])), + Root = emqx_ft_test_helpers:root(Config, node(), [segments]), ?assertMatch( {ok, _}, emqx_conf:update( @@ -177,7 +177,7 @@ t_disable_restore_config(Config) -> [ #{ ?snk_kind := garbage_collection, - storage := #{segments := #{root := Root}} + storage := #{segments := #{gc := #{interval := 1000}}} } ], ?of_kind(garbage_collection, Trace) diff --git a/apps/emqx_ft/test/emqx_ft_fs_util_tests.erl b/apps/emqx_ft/test/emqx_ft_fs_util_tests.erl index 1939e74c6..f26dea7e7 100644 --- a/apps/emqx_ft/test/emqx_ft_fs_util_tests.erl +++ b/apps/emqx_ft/test/emqx_ft_fs_util_tests.erl @@ -63,3 +63,24 @@ unescape_filename_test_() -> ?_assertEqual(Input, emqx_ft_fs_util:unescape_filename(Filename)) || {Filename, Input} <- ?NAMES ]. + +mk_temp_filename_test_() -> + [ + ?_assertMatch( + "." ++ Suffix when length(Suffix) == 16, + emqx_ft_fs_util:mk_temp_filename(<<>>) + ), + ?_assertMatch( + "file.name." ++ Suffix when length(Suffix) == 16, + emqx_ft_fs_util:mk_temp_filename("file.name") + ), + ?_assertMatch( + "safe.🦺." ++ Suffix when length(Suffix) == 16, + emqx_ft_fs_util:mk_temp_filename(<<"safe.🦺"/utf8>>) + ), + ?_assertEqual( + % FilenameSlice + Dot + Suffix + 200 + 1 + 16, + length(emqx_ft_fs_util:mk_temp_filename(lists:duplicate(63, "LONG"))) + ) + ]. From dcd59e4f1b069498dd669d9f4efbffa51a624f79 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 6 Jun 2023 10:40:20 +0300 Subject: [PATCH 14/17] fix(ft): set more conservative filename length limit Otherwise, local fs exporter will have a hard time preserving the filemeta, because its filename is even 13 bytes longer. --- apps/emqx_ft/src/emqx_ft_schema.erl | 4 +++- apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl | 12 +++++++++++- apps/emqx_ft/test/emqx_ft_SUITE.erl | 4 +++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index 2b98562b4..c1ee41d0d 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -42,7 +42,9 @@ %% on most filesystems. Even though, say, S3 does not have such limitations, it's %% still useful to have a limit on the filename length, to avoid having to deal with %% limits in the storage backends. --define(MAX_FILENAME_BYTELEN, 255). +%% Usual realistic limit is 255 bytes actually, but we leave some room for backends +%% to spare. +-define(MAX_FILENAME_BYTELEN, 240). -import(hoconsc, [ref/2, mk/2]). diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index ae709dd52..e211cb421 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -128,7 +128,17 @@ complete( Filemeta = FilemetaIn#{checksum => Checksum}, ok = file:close(Handle), _ = filelib:ensure_dir(ResultFilepath), - _ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)), + ManifestFilepath = mk_manifest_filename(ResultFilepath), + case file:write_file(ManifestFilepath, encode_filemeta(Filemeta)) of + ok -> + ok; + {error, Reason} -> + ?SLOG(warning, "filemeta_write_failed", #{ + path => ManifestFilepath, + meta => Filemeta, + reason => Reason + }) + end, file:rename(Filepath, ResultFilepath). -spec discard(export_st()) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 6861038e9..c48c77d93 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -261,6 +261,7 @@ t_nasty_clientids_fileids(_Config) -> fun({ClientId, FileId}) -> ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "justfile", ClientId), [Export] = list_files(ClientId), + ?assertMatch(#{meta := #{name := "justfile"}}, Export), ?assertEqual({ok, ClientId}, read_export(Export)) end, Transfers @@ -271,13 +272,14 @@ t_nasty_filenames(_Config) -> {<<"nasty1">>, "146%"}, {<<"nasty2">>, "🌚"}, {<<"nasty3">>, "中文.txt"}, - {<<"nasty4">>, _254Bytes = string:join(lists:duplicate(255 div 5, "LONG"), ".")} + {<<"nasty4">>, _239Bytes = string:join(lists:duplicate(240 div 5, "LONG"), ".")} ], ok = lists:foreach( fun({ClientId, Filename}) -> FileId = unicode:characters_to_binary(Filename), ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId), [Export] = list_files(ClientId), + ?assertMatch(#{meta := #{name := Filename}}, Export), ?assertEqual({ok, FileId}, read_export(Export)) end, Filenames From bcc47442eb10d980a11a8ddf254f6523a4d781f7 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 6 Jun 2023 18:44:27 +0800 Subject: [PATCH 15/17] fix(mqttsn): make mountpoint works for publish --- .../src/emqx_gateway_mqttsn.app.src | 2 +- .../src/emqx_mqttsn_channel.erl | 5 ++- .../test/emqx_sn_protocol_SUITE.erl | 45 +++++++++++++++++++ 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 76f0f45b5..b43201e1a 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 914f837e1..84334875b 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -1111,15 +1111,16 @@ check_pub_authz( convert_pub_to_msg( {TopicName, Flags, Data}, - Channel = #channel{clientinfo = #{clientid := ClientId}} + Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}} ) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), + NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), Message = put_message_headers( emqx_message:make( ClientId, NewQoS, - TopicName, + NTopicName, Data, #{dup => Dup, retain => Retain}, #{} diff --git a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl index cce4ce904..b22d7b4b0 100644 --- a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl @@ -120,6 +120,13 @@ restart_mqttsn_with_subs_resume_off() -> Conf#{<<"subs_resume">> => <<"false">>} ). +restart_mqttsn_with_mountpoint(Mp) -> + Conf = emqx:get_raw_config([gateway, mqttsn]), + emqx_gateway_conf:update_gateway( + mqttsn, + Conf#{<<"mountpoint">> => Mp} + ). + default_config() -> ?CONF_DEFAULT. @@ -990,6 +997,44 @@ t_publish_qos2_case03(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). +t_publish_mountpoint(_) -> + restart_mqttsn_with_mountpoint(<<"mp/">>), + Dup = 0, + QoS = 1, + Retain = 0, + Will = 0, + CleanSession = 0, + MsgId = 1, + TopicId1 = ?MAX_PRED_TOPIC_ID + 1, + Topic = <<"abc">>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = ?CLIENTID, + send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), + ?assertEqual( + <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), + + Payload1 = <<20, 21, 22, 23>>, + send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1), + ?assertEqual( + <<7, ?SN_PUBACK, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket) + ), + timer:sleep(100), + + ?assertEqual( + <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), + + send_disconnect_msg(Socket, undefined), + restart_mqttsn_with_mountpoint(<<>>), + gen_udp:close(Socket). + t_delivery_qos1_register_invalid_topic_id(_) -> Dup = 0, QoS = 1, From f26b372e51507aca327b9d04af2a7b5c3a5292b4 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 6 Jun 2023 18:47:41 +0800 Subject: [PATCH 16/17] chore: update changes --- changes/ce/fix-10951.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-10951.en.md diff --git a/changes/ce/fix-10951.en.md b/changes/ce/fix-10951.en.md new file mode 100644 index 000000000..89dabb4a7 --- /dev/null +++ b/changes/ce/fix-10951.en.md @@ -0,0 +1 @@ +Fix the issue in MQTT-SN gateway where the `mountpoint` does not take effect on message publishing. From 1968589f81d543b8c6b68c855b0c5e488c9f7e4d Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 6 Jun 2023 14:02:58 +0300 Subject: [PATCH 17/17] fix(emqx_schema): don't allow enabling `fail_if_no_peer_cert` if `verify_none` is set Setting `fail_if_no_peer_cert = true` and `verify = verify_none` causes connection errors. Closes: EMQX-9586 --- apps/emqx/src/emqx_schema.erl | 18 +++++- apps/emqx/test/emqx_schema_tests.erl | 61 +++++++++++++++++++ apps/emqx_gateway/src/emqx_gateway_schema.erl | 13 +++- changes/ce/fix-10952.en.md | 8 +++ 4 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 changes/ce/fix-10952.en.md diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 37d5350a5..521293f7a 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -94,7 +94,8 @@ validate_keepalive_multiplier/1, non_empty_string/1, validations/0, - naive_env_interpolation/1 + naive_env_interpolation/1, + validate_server_ssl_opts/1 ]). -export([qos/0]). @@ -958,7 +959,7 @@ fields("mqtt_wss_listener") -> {"ssl_options", sc( ref("listener_wss_opts"), - #{} + #{validator => fun validate_server_ssl_opts/1} )}, {"websocket", sc( @@ -2426,8 +2427,21 @@ server_ssl_opts_schema(Defaults, IsRanchListener) -> ] ]. +validate_server_ssl_opts(#{<<"fail_if_no_peer_cert">> := true, <<"verify">> := Verify}) -> + validate_verify(Verify); +validate_server_ssl_opts(#{fail_if_no_peer_cert := true, verify := Verify}) -> + validate_verify(Verify); +validate_server_ssl_opts(_SSLOpts) -> + ok. + +validate_verify(verify_peer) -> + ok; +validate_verify(_) -> + {error, "verify must be verify_peer when fail_if_no_peer_cert is true"}. + mqtt_ssl_listener_ssl_options_validator(Conf) -> Checks = [ + fun validate_server_ssl_opts/1, fun ocsp_outer_validator/1, fun crl_outer_validator/1 ], diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index ad2341460..58f9a94d5 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -106,6 +106,67 @@ bad_cipher_test() -> ), ok. +fail_if_no_peer_cert_test_() -> + Sc = #{ + roots => [mqtt_ssl_listener], + fields => #{mqtt_ssl_listener => emqx_schema:fields("mqtt_ssl_listener")} + }, + Opts = #{atom_key => false, required => false}, + OptsAtomKey = #{atom_key => true, required => false}, + InvalidConf = #{ + <<"bind">> => <<"0.0.0.0:9883">>, + <<"ssl_options">> => #{ + <<"fail_if_no_peer_cert">> => true, + <<"verify">> => <<"verify_none">> + } + }, + InvalidListener = #{<<"mqtt_ssl_listener">> => InvalidConf}, + ValidListener = #{ + <<"mqtt_ssl_listener">> => InvalidConf#{ + <<"ssl_options">> => + #{ + <<"fail_if_no_peer_cert">> => true, + <<"verify">> => <<"verify_peer">> + } + } + }, + ValidListener1 = #{ + <<"mqtt_ssl_listener">> => InvalidConf#{ + <<"ssl_options">> => + #{ + <<"fail_if_no_peer_cert">> => false, + <<"verify">> => <<"verify_none">> + } + } + }, + Reason = "verify must be verify_peer when fail_if_no_peer_cert is true", + [ + ?_assertThrow( + {_Sc, [#{kind := validation_error, reason := Reason}]}, + hocon_tconf:check_plain(Sc, InvalidListener, Opts) + ), + ?_assertThrow( + {_Sc, [#{kind := validation_error, reason := Reason}]}, + hocon_tconf:check_plain(Sc, InvalidListener, OptsAtomKey) + ), + ?_assertMatch( + #{mqtt_ssl_listener := #{}}, + hocon_tconf:check_plain(Sc, ValidListener, OptsAtomKey) + ), + ?_assertMatch( + #{mqtt_ssl_listener := #{}}, + hocon_tconf:check_plain(Sc, ValidListener1, OptsAtomKey) + ), + ?_assertMatch( + #{<<"mqtt_ssl_listener">> := #{}}, + hocon_tconf:check_plain(Sc, ValidListener, Opts) + ), + ?_assertMatch( + #{<<"mqtt_ssl_listener">> := #{}}, + hocon_tconf:check_plain(Sc, ValidListener1, Opts) + ) + ]. + validate(Schema, Data0) -> Sc = #{ roots => [ssl_opts], diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 8c80fc1fa..3c5706e82 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -120,7 +120,10 @@ fields(ssl_listener) -> {ssl_options, sc( hoconsc:ref(emqx_schema, "listener_ssl_opts"), - #{desc => ?DESC(ssl_listener_options)} + #{ + desc => ?DESC(ssl_listener_options), + validator => fun emqx_schema:validate_server_ssl_opts/1 + } )} ]; fields(udp_listener) -> @@ -132,7 +135,13 @@ fields(udp_listener) -> fields(dtls_listener) -> [{acceptors, sc(integer(), #{default => 16, desc => ?DESC(dtls_listener_acceptors)})}] ++ fields(udp_listener) ++ - [{dtls_options, sc(ref(dtls_opts), #{desc => ?DESC(dtls_listener_dtls_opts)})}]; + [ + {dtls_options, + sc(ref(dtls_opts), #{ + desc => ?DESC(dtls_listener_dtls_opts), + validator => fun emqx_schema:validate_server_ssl_opts/1 + })} + ]; fields(udp_opts) -> [ {active_n, diff --git a/changes/ce/fix-10952.en.md b/changes/ce/fix-10952.en.md new file mode 100644 index 000000000..20792906f --- /dev/null +++ b/changes/ce/fix-10952.en.md @@ -0,0 +1,8 @@ +Disallow enabling `fail_if_no_peer_cert` in listener SSL options if `verify_none` is set. + +Setting `fail_if_no_peer_cert = true` and `verify = verify_none` caused connection errors +due to incompatible options. +This fix validates the options when creating or updating a listener to avoid these errors. + +Note: any old listener configuration with `fail_if_no_peer_cert = true` and `verify = verify_none` +that was previously allowed will fail to load after applying this fix and must be manually fixed.