diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 1cdb563aa..47ff384c9 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -239,14 +239,30 @@ remove_config([RootName | _] = KeyPath, Opts) -> -spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -reset_config([RootName | _] = KeyPath, Opts) -> +reset_config([RootName | SubKeys] = KeyPath, Opts) -> case emqx_config:get_default_value(KeyPath) of {ok, Default} -> - emqx_config_handler:update_config( - emqx_config:get_schema_mod(RootName), - KeyPath, - {{update, Default}, Opts} - ); + Mod = emqx_config:get_schema_mod(RootName), + case SubKeys =:= [] of + true -> + emqx_config_handler:update_config( + Mod, + KeyPath, + {{update, Default}, Opts} + ); + false -> + NewConf = + emqx_utils_maps:deep_put( + SubKeys, + emqx_config:get_raw([RootName], #{}), + Default + ), + emqx_config_handler:update_config( + Mod, + [RootName], + {{update, NewConf}, Opts} + ) + end; {error, _} = Error -> Error end. diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 0bad19f9e..5576f579d 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -31,8 +31,7 @@ remove_handler/1, update_config/3, get_raw_cluster_override_conf/0, - info/0, - merge_to_old_config/2 + info/0 ]). %% gen_server callbacks @@ -332,31 +331,16 @@ do_post_config_update( SubOldConf = get_sub_config(ConfKey, OldConf), SubNewConf = get_sub_config(ConfKey, NewConf), SubHandlers = get_sub_handlers(ConfKey, Handlers), - case - do_post_config_update( - SubConfKeyPath, - SubHandlers, - SubOldConf, - SubNewConf, - AppEnvs, - UpdateArgs, - Result, - ConfKeyPath - ) - of - {ok, Result1} -> - call_post_config_update( - Handlers, - OldConf, - NewConf, - AppEnvs, - up_req(UpdateArgs), - Result1, - ConfKeyPath - ); - Error -> - Error - end. + do_post_config_update( + SubConfKeyPath, + SubHandlers, + SubOldConf, + SubNewConf, + AppEnvs, + UpdateArgs, + Result, + ConfKeyPath + ). get_sub_handlers(ConfKey, Handlers) -> case maps:find(ConfKey, Handlers) of diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 7aaf93c99..54b3b3ca9 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -277,7 +277,7 @@ wait_for_app_processes(_) -> %% and stop others, and then the `application:start/2' callback is %% never called again for this application. perform_sanity_checks(emqx_rule_engine) -> - ensure_config_handler(emqx_rule_engine, [rule_engine, rules]), + ensure_config_handler(emqx_rule_engine, [rule_engine, rules, '?']), ok; perform_sanity_checks(emqx_bridge) -> ensure_config_handler(emqx_bridge, [bridges]), @@ -289,7 +289,7 @@ ensure_config_handler(Module, ConfigPath) -> #{handlers := Handlers} = sys:get_state(emqx_config_handler), case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of #{{mod} := Module} -> ok; - _NotFound -> error({config_handler_missing, ConfigPath, Module}) + NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound}) end, ok. diff --git a/apps/emqx/test/emqx_config_handler_SUITE.erl b/apps/emqx/test/emqx_config_handler_SUITE.erl index e21c1867f..194198571 100644 --- a/apps/emqx/test/emqx_config_handler_SUITE.erl +++ b/apps/emqx/test/emqx_config_handler_SUITE.erl @@ -130,7 +130,7 @@ t_root_key_update(_Config) -> ?assertEqual( {ok, #{ config => 0.81, - post_config_update => #{?MODULE => ok}, + post_config_update => #{}, raw_config => <<"81%">> }}, emqx:update_config(SubKey, "81%", Opts) @@ -174,7 +174,7 @@ t_sub_key_update_remove(_Config) -> %% remove ?assertEqual( - {ok, #{post_config_update => #{emqx_config_handler_SUITE => ok}}}, + {ok, #{post_config_update => #{?MODULE => ok}}}, emqx:remove_config(KeyPath) ), ?assertError( @@ -184,18 +184,6 @@ t_sub_key_update_remove(_Config) -> ?assertEqual(false, lists:member(<<"cpu_check_interval">>, OSKey)), ?assert(length(OSKey) > 0), - ?assertEqual( - {ok, #{ - config => 60000, - post_config_update => #{?MODULE => ok}, - raw_config => <<"60s">> - }}, - emqx:reset_config(KeyPath, Opts) - ), - OSKey1 = maps:keys(emqx:get_raw_config([sysmon, os])), - ?assertEqual(true, lists:member(<<"cpu_check_interval">>, OSKey1)), - ?assert(length(OSKey1) > 1), - ok = emqx_config_handler:remove_handler(KeyPath), ok = emqx_config_handler:remove_handler(KeyPath2), ok. @@ -292,44 +280,6 @@ t_get_raw_cluster_override_conf(_Config) -> ?assertEqual(OldInfo, NewInfo), ok. -t_save_config_failed(_Config) -> - ok. - -t_update_sub(_Config) -> - PathKey = [sysmon], - Opts = #{rawconf_with_defaults => true}, - ok = emqx_config_handler:add_handler(PathKey, ?MODULE), - %% update sub key - #{<<"os">> := OS1} = emqx:get_raw_config(PathKey), - {ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts), - ?assertMatch( - #{ - config := 120000, - post_config_update := #{?MODULE := ok}, - raw_config := <<"120s">> - }, - Res - ), - ?assertMatch(#{os := #{cpu_check_interval := 120000}}, emqx:get_config(PathKey)), - #{<<"os">> := OS2} = emqx:get_raw_config(PathKey), - ?assertEqual(lists:sort(maps:keys(OS1)), lists:sort(maps:keys(OS2))), - - %% update sub key - SubKey = PathKey ++ [os, cpu_high_watermark], - ?assertEqual( - {ok, #{ - config => 0.81, - post_config_update => #{?MODULE => ok}, - raw_config => <<"81%">> - }}, - emqx:update_config(SubKey, "81%", Opts) - ), - ?assertEqual(0.81, emqx:get_config(SubKey)), - ?assertEqual("81%", emqx:get_raw_config(SubKey)), - - ok = emqx_config_handler:remove_handler(PathKey), - ok. - pre_config_update([sysmon], UpdateReq, _RawConf) -> {ok, UpdateReq}; pre_config_update([sysmon, os], UpdateReq, _RawConf) -> diff --git a/apps/emqx_authz/include/emqx_authz.hrl b/apps/emqx_authz/include/emqx_authz.hrl index b7afd5e84..967865868 100644 --- a/apps/emqx_authz/include/emqx_authz.hrl +++ b/apps/emqx_authz/include/emqx_authz.hrl @@ -43,6 +43,7 @@ -define(CMD_MOVE_BEFORE(Before), {before, Before}). -define(CMD_MOVE_AFTER(After), {'after', After}). +-define(ROOT_KEY, [authorization]). -define(CONF_KEY_PATH, [authorization, sources]). -define(RE_PLACEHOLDER, "\\$\\{[a-z0-9_]+\\}"). diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index 7620f5548..929f292a7 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.21"}, + {vsn, "0.1.22"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 7ceacdb68..a8c678be1 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -101,6 +101,7 @@ init() -> ok = register_metrics(), ok = init_metrics(client_info_source()), emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE), + emqx_conf:add_handler(?ROOT_KEY, ?MODULE), Sources = emqx_conf:get(?CONF_KEY_PATH, []), ok = check_dup_types(Sources), NSources = create_sources(Sources), @@ -109,6 +110,7 @@ init() -> deinit() -> ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}), emqx_conf:remove_handler(?CONF_KEY_PATH), + emqx_conf:remove_handler(?ROOT_KEY), emqx_authz_utils:cleanup_resources(). lookup() -> @@ -139,14 +141,29 @@ update({?CMD_DELETE, Type}, Sources) -> update(Cmd, Sources) -> emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}). -pre_config_update(_, Cmd, Sources) -> - try do_pre_config_update(Cmd, Sources) of +pre_config_update(Path, Cmd, Sources) -> + try do_pre_config_update(Path, Cmd, Sources) of {error, Reason} -> {error, Reason}; NSources -> {ok, NSources} catch _:Reason -> {error, Reason} end. +do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) -> + do_pre_config_update(Cmd, Sources); +do_pre_config_update(?ROOT_KEY, NewConf, OldConf) -> + do_pre_config_replace(NewConf, OldConf). + +%% override the entire config when updating the root key +%% emqx_conf:update(?ROOT_KEY, Conf); +do_pre_config_replace(Conf, Conf) -> + Conf; +do_pre_config_replace(NewConf, OldConf) -> + #{<<"sources">> := NewSources} = NewConf, + #{<<"sources">> := OldSources} = OldConf, + NewSources1 = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources), + NewConf#{<<"sources">> := NewSources1}. + do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) -> do_move(Cmd, Sources); do_pre_config_update({?CMD_PREPEND, Source}, Sources) -> @@ -179,47 +196,53 @@ do_pre_config_update({Op, Source}, Sources) -> post_config_update(_, _, undefined, _OldSource, _AppEnvs) -> ok; -post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) -> - Actions = do_post_config_update(Cmd, NewSources), +post_config_update(Path, Cmd, NewSources, _OldSource, _AppEnvs) -> + Actions = do_post_config_update(Path, Cmd, NewSources), ok = update_authz_chain(Actions), ok = emqx_authz_cache:drain_cache(). -do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) -> - InitedSources = lookup(), - do_move(Cmd, InitedSources); -do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) -> - InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)), - %% create metrics +do_post_config_update(?CONF_KEY_PATH, {?CMD_MOVE, _Type, _Where} = Cmd, _Sources) -> + do_move(Cmd, lookup()); +do_post_config_update(?CONF_KEY_PATH, {?CMD_PREPEND, RawNewSource}, Sources) -> TypeName = type(RawNewSource), - ok = emqx_metrics_worker:create_metrics( - authz_metrics, - TypeName, - [total, allow, deny, nomatch], - [total] - ), - [InitedNewSource] ++ lookup(); -do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) -> - InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)), - lookup() ++ [InitedNewSource]; -do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) -> + NewSources = create_sources([get_source_by_type(TypeName, Sources)]), + NewSources ++ lookup(); +do_post_config_update(?CONF_KEY_PATH, {?CMD_APPEND, RawNewSource}, Sources) -> + NewSources = create_sources([get_source_by_type(type(RawNewSource), Sources)]), + lookup() ++ NewSources; +do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sources) -> OldSources = lookup(), {OldSource, Front, Rear} = take(Type, OldSources), NewSource = get_source_by_type(type(RawNewSource), Sources), InitedSources = update_source(type(RawNewSource), OldSource, NewSource), Front ++ [InitedSources] ++ Rear; -do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> +do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> OldInitedSources = lookup(), {OldSource, Front, Rear} = take(Type, OldInitedSources), - %% delete metrics - ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type), - ok = ensure_resource_deleted(OldSource), - clear_certs(OldSource), + ok = ensure_deleted(OldSource, #{clear_metric => true}), Front ++ Rear; -do_post_config_update({?CMD_REPLACE, _RawNewSources}, Sources) -> - %% overwrite the entire config! - OldInitedSources = lookup(), - lists:foreach(fun ensure_resource_deleted/1, OldInitedSources), - lists:foreach(fun clear_certs/1, OldInitedSources), +do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) -> + overwrite_entire_sources(Sources); +do_post_config_update(?ROOT_KEY, Conf, Conf) -> + #{sources := Sources} = Conf, + Sources; +do_post_config_update(?ROOT_KEY, _Conf, NewConf) -> + #{sources := NewSources} = NewConf, + overwrite_entire_sources(NewSources). + +overwrite_entire_sources(Sources) -> + PrevSources = lookup(), + NewSourcesTypes = lists:map(fun type/1, Sources), + EnsureDelete = fun(S) -> + TypeName = type(S), + Opts = + case lists:member(TypeName, NewSourcesTypes) of + true -> #{clear_metric => false}; + false -> #{clear_metric => true} + end, + ensure_deleted(S, Opts) + end, + lists:foreach(EnsureDelete, PrevSources), create_sources(Sources). %% @doc do source move @@ -238,8 +261,14 @@ do_move({?CMD_MOVE, Type, ?CMD_MOVE_AFTER(After)}, Sources) -> {S2, Front2, Rear2} = take(After, Front1 ++ Rear1), Front2 ++ [S2, S1] ++ Rear2. -ensure_resource_deleted(#{enable := false}) -> +ensure_deleted(#{enable := false}, _) -> ok; +ensure_deleted(Source, #{clear_metric := ClearMetric}) -> + TypeName = type(Source), + ensure_resource_deleted(Source), + clear_certs(Source), + ClearMetric andalso emqx_metrics_worker:clear_metrics(authz_metrics, TypeName). + ensure_resource_deleted(#{type := Type} = Source) -> Module = authz_module(Type), Module:destroy(Source). @@ -287,12 +316,18 @@ update_source(Type, OldSource, NewSource) -> init_metrics(Source) -> TypeName = type(Source), - emqx_metrics_worker:create_metrics( - authz_metrics, - TypeName, - [total, allow, deny, nomatch], - [total] - ). + case emqx_metrics_worker:has_metrics(authz_metrics, TypeName) of + %% Don't reset the metrics if it already exists + true -> + ok; + false -> + emqx_metrics_worker:create_metrics( + authz_metrics, + TypeName, + [total, allow, deny, nomatch], + [total] + ) + end. %%-------------------------------------------------------------------- %% AuthZ callbacks @@ -487,7 +522,9 @@ write_acl_file(#{<<"rules">> := Rules} = Source0) -> ok = check_acl_file_rules(AclPath, Rules), ok = write_file(AclPath, Rules), Source1 = maps:remove(<<"rules">>, Source0), - maps:put(<<"path">>, AclPath, Source1). + maps:put(<<"path">>, AclPath, Source1); +write_acl_file(Source) -> + Source. %% @doc where the acl.conf file is stored. acl_conf_file() -> diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 39c414617..702359509 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -272,9 +272,80 @@ t_update_source(_) -> ], emqx_conf:get([authorization, sources], []) ), + ?assertMatch( + [ + #{type := http, enable := false}, + #{type := mongodb, enable := false}, + #{type := mysql, enable := false}, + #{type := postgresql, enable := false}, + #{type := redis, enable := false}, + #{type := file, enable := false} + ], + emqx_authz:lookup() + ), {ok, _} = emqx_authz:update(?CMD_REPLACE, []). +t_replace_all(_) -> + RootKey = [<<"authorization">>], + Conf = emqx:get_raw_config(RootKey), + emqx_authz_utils:update_config(RootKey, Conf#{ + <<"sources">> => [ + ?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1 + ] + }), + %% config + ?assertMatch( + [ + #{type := file, enable := true}, + #{type := redis, enable := true}, + #{type := postgresql, enable := true}, + #{type := mysql, enable := true}, + #{type := mongodb, enable := true}, + #{type := http, enable := true} + ], + emqx_conf:get([authorization, sources], []) + ), + %% hooks status + ?assertMatch( + [ + #{type := file, enable := true}, + #{type := redis, enable := true}, + #{type := postgresql, enable := true}, + #{type := mysql, enable := true}, + #{type := mongodb, enable := true}, + #{type := http, enable := true} + ], + emqx_authz:lookup() + ), + Ids = [http, mongodb, mysql, postgresql, redis, file], + %% metrics + lists:foreach( + fun(Id) -> + ?assert(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id) + end, + Ids + ), + + ?assertMatch( + {ok, _}, + emqx_authz_utils:update_config( + RootKey, + Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]} + ) + ), + %% hooks status + ?assertMatch([#{type := http, enable := false}], emqx_authz:lookup()), + %% metrics + ?assert(emqx_metrics_worker:has_metrics(authz_metrics, http)), + lists:foreach( + fun(Id) -> + ?assertNot(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id) + end, + Ids -- [http] + ), + ok. + t_delete_source(_) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]), diff --git a/apps/emqx_authz/test/emqx_authz_test_lib.erl b/apps/emqx_authz/test/emqx_authz_test_lib.erl index db604d764..e308e0de7 100644 --- a/apps/emqx_authz/test/emqx_authz_test_lib.erl +++ b/apps/emqx_authz/test/emqx_authz_test_lib.erl @@ -25,21 +25,26 @@ -define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000). reset_authorizers() -> - reset_authorizers(deny, false). + reset_authorizers(deny, false, []). restore_authorizers() -> - reset_authorizers(allow, true). + reset_authorizers(allow, true, []). -reset_authorizers(Nomatch, ChacheEnabled) -> +reset_authorizers(Nomatch, CacheEnabled, Source) -> {ok, _} = emqx:update_config( [authorization], #{ <<"no_match">> => atom_to_binary(Nomatch), - <<"cache">> => #{<<"enable">> => atom_to_binary(ChacheEnabled)}, - <<"sources">> => [] + <<"cache">> => #{<<"enable">> => atom_to_binary(CacheEnabled)}, + <<"sources">> => Source } ), ok. +%% Don't reset sources +reset_authorizers(Nomatch, CacheEnabled) -> + {ok, _} = emqx:update_config([<<"authorization">>, <<"no_match">>], Nomatch), + {ok, _} = emqx:update_config([<<"authorization">>, <<"cache">>, <<"enable">>], CacheEnabled), + ok. setup_config(BaseConfig, SpecialParams) -> Config = maps:merge(BaseConfig, SpecialParams), diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 46c822ed0..d5d26c770 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -39,7 +39,8 @@ disable_enable/3, remove/2, check_deps_and_remove/3, - list/0 + list/0, + reload_hook/1 ]). -export([ @@ -133,6 +134,10 @@ safe_load_bridge(Type, Name, Conf, Opts) -> }) end. +reload_hook(Bridges) -> + ok = unload_hook(), + ok = load_hook(Bridges). + load_hook() -> Bridges = emqx:get_config([bridges], #{}), load_hook(Bridges). diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index e59259c3e..b226b3b32 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -69,10 +69,32 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> {ok, ConfNew} end. -post_config_update(Path, '$remove', _, OldConf, _AppEnvs) -> - _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf); -post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) -> +post_config_update([bridges, BridgeType, BridgeName] = Path, '$remove', _, OldConf, _AppEnvs) -> + _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf), + ok = emqx_bridge_resource:remove(BridgeType, BridgeName), + Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([bridges])), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, undefined, _AppEnvs) -> + _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, undefined), + ResOpts = emqx_resource:fetch_creation_opts(NewConf), + ok = emqx_bridge_resource:create(BridgeType, BridgeName, NewConf, ResOpts), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf + ), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, OldConf, _AppEnvs) -> _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf), + ResOpts = emqx_resource:fetch_creation_opts(NewConf), + ok = emqx_bridge_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf + ), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), ok. %% internal functions diff --git a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl index e717fe262..9e6050f36 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl @@ -110,9 +110,13 @@ t_upload_error(Config) -> Name = "cool_name", Data = <<"data"/utf8>>, - {ok, _} = emqx_conf:update( - [file_transfer, storage, local, exporter, s3, bucket], <<"invalid-bucket">>, #{} + Conf = emqx_conf:get_raw([file_transfer], #{}), + Conf1 = emqx_utils_maps:deep_put( + [<<"storage">>, <<"local">>, <<"exporter">>, <<"s3">>, <<"bucket">>], + Conf, + <<"invalid-bucket">> ), + {ok, _} = emqx_conf:update([file_transfer], Conf1, #{}), ?assertEqual( {error, unspecified_error}, diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 51ad6ab85..e3fef7e62 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -121,3 +121,6 @@ false end) ). + +-define(KEY_PATH, [rule_engine, rules]). +-define(RULE_PATH(RULE), [rule_engine, rules, RULE]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index d8a798eb8..24ad2c5f0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -26,8 +26,7 @@ -export([start_link/0]). -export([ - post_config_update/5, - config_key_path/0 + post_config_update/5 ]). %% Rule Management @@ -102,9 +101,6 @@ -type action_name() :: binary() | #{function := binary()}. -config_key_path() -> - [rule_engine, rules]. - -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}. start_link() -> gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []). @@ -112,7 +108,13 @@ start_link() -> %%------------------------------------------------------------------------------ %% The config handler for emqx_rule_engine %%------------------------------------------------------------------------------ -post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> +post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) -> + create_rule(NewRule#{id => bin(RuleId)}); +post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) -> + delete_rule(bin(RuleId)); +post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) -> + update_rule(NewRule#{id => bin(RuleId)}); +post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) -> #{added := Added, removed := Removed, changed := Updated} = emqx_utils_maps:diff_maps(NewRules, OldRules), try @@ -134,7 +136,7 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> end, Added ), - {ok, get_rules()} + ok catch throw:#{kind := _} = Error -> {error, Error} @@ -247,11 +249,11 @@ ensure_action_removed(RuleId, ActionName) -> case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of not_found -> ok; - #{<<"actions">> := Acts} -> + #{<<"actions">> := Acts} = Conf -> NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)], {ok, _} = emqx_conf:update( - emqx_rule_engine:config_key_path() ++ [RuleId, actions], - NewActs, + ?RULE_PATH(RuleId), + Conf#{<<"actions">> => NewActs}, #{override_to => cluster} ), ok @@ -372,7 +374,7 @@ init([]) -> {write_concurrency, true}, {read_concurrency, true} ]), - ok = emqx_config_handler:add_handler( + ok = emqx_conf:add_handler( [rule_engine, jq_implementation_module], emqx_rule_engine_schema ), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index b53763d47..54a739b6d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -351,14 +351,13 @@ param_path_id() -> {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}}; Id -> Params = filter_out_request_body(add_metadata(Params0)), - ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}}; not_found -> + ConfPath = ?RULE_PATH(Id), case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of - {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> - [Rule] = get_one_rule(AllRules, Id), + {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} -> {201, format_rule_info_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{ @@ -396,10 +395,9 @@ param_path_id() -> end; '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> Params = filter_out_request_body(Params0), - ConfPath = emqx_rule_engine:config_key_path() ++ [Id], + ConfPath = ?RULE_PATH(Id), case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of - {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> - [Rule] = get_one_rule(AllRules, Id), + {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} -> {200, format_rule_info_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{ @@ -412,7 +410,7 @@ param_path_id() -> '/rules/:id'(delete, #{bindings := #{id := Id}}) -> case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> - ConfPath = emqx_rule_engine:config_key_path() ++ [Id], + ConfPath = ?RULE_PATH(Id), case emqx_conf:remove(ConfPath, #{override_to => cluster}) of {ok, _} -> {204}; @@ -655,9 +653,6 @@ aggregate_metrics(AllMetrics) -> AllMetrics ). -get_one_rule(AllRules, Id) -> - [R || R = #{id := Id0} <- AllRules, Id0 == Id]. - add_metadata(Params) -> Params#{ <<"metadata">> => #{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 14d2b1f95..d8b031bdd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -29,11 +29,15 @@ start(_Type, _Args) -> ok = emqx_rule_events:reload(), SupRet = emqx_rule_engine_sup:start_link(), ok = emqx_rule_engine:load_rules(), - emqx_conf:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine), + RulePath = [RuleEngine | _] = ?KEY_PATH, + emqx_conf:add_handler(RulePath ++ ['?'], emqx_rule_engine), + emqx_conf:add_handler([RuleEngine], emqx_rule_engine), emqx_rule_engine_cli:load(), SupRet. stop(_State) -> emqx_rule_engine_cli:unload(), - emqx_conf:remove_handler(emqx_rule_engine:config_key_path()), + RulePath = [RuleEngine | _] = ?KEY_PATH, + emqx_conf:remove_handler(RulePath ++ ['?']), + emqx_conf:remove_handler([RuleEngine]), ok = emqx_rule_events:unload(). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index ca7832717..2ec32173f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -472,19 +472,17 @@ t_ensure_action_removed(_) -> Id = <<"t_ensure_action_removed">>, GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>, emqx:update_config( - [rule_engine, rules], + [rule_engine, rules, Id], #{ - Id => #{ - <<"actions">> => [ - #{<<"function">> => GetSelectedData}, - #{<<"function">> => <<"console">>}, - #{<<"function">> => <<"republish">>}, - <<"mysql:foo">>, - <<"mqtt:bar">> - ], - <<"description">> => <<"">>, - <<"sql">> => <<"SELECT * FROM \"t/#\"">> - } + <<"actions">> => [ + #{<<"function">> => GetSelectedData}, + #{<<"function">> => <<"console">>}, + #{<<"function">> => <<"republish">>}, + <<"mysql:foo">>, + <<"mqtt:bar">> + ], + <<"description">> => <<"">>, + <<"sql">> => <<"SELECT * FROM \"t/#\"">> } ), ?assertMatch( diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src index b74e9fa7d..b71ed01e5 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_schema_registry, [ {description, "EMQX Schema Registry"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, [emqx_ee_schema_registry_sup]}, {mod, {emqx_ee_schema_registry_app, []}}, {applications, [ diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 5ffcb2ba6..1390f9bfe 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -104,38 +104,32 @@ list_schemas() -> %%------------------------------------------------------------------------------------------------- %% `emqx_config_handler' API %%------------------------------------------------------------------------------------------------- - +%% remove post_config_update( - [?CONF_KEY_ROOT, schemas] = _Path, - _Cmd, - NewConf = #{schemas := NewSchemas}, - OldConf = #{}, + [?CONF_KEY_ROOT, schemas, Name], + '$remove', + _NewSchemas, + _OldSchemas, _AppEnvs ) -> - OldSchemas = maps:get(schemas, OldConf, #{}), - #{ - added := Added, - changed := Changed0, - removed := Removed - } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas), - Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0), - RemovedNames = maps:keys(Removed), - case RemovedNames of - [] -> - ok; - _ -> - async_delete_serdes(RemovedNames) - end, - SchemasToBuild = maps:to_list(maps:merge(Changed, Added)), - case build_serdes(SchemasToBuild) of + async_delete_serdes([Name]), + ok; +%% add or update +post_config_update( + [?CONF_KEY_ROOT, schemas, NewName], + _Cmd, + NewSchemas, + %% undefined or OldSchemas + _, + _AppEnvs +) -> + case build_serdes([{NewName, NewSchemas}]) of ok -> - {ok, NewConf}; + {ok, #{NewName => NewSchemas}}; {error, Reason, SerdesToRollback} -> lists:foreach(fun ensure_serde_absent/1, SerdesToRollback), {error, Reason} - end; -post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) -> - {ok, NewConf}. + end. %%------------------------------------------------------------------------------------------------- %% `gen_server' API diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl index e82ed95bd..195a54c15 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl @@ -11,9 +11,9 @@ start(_StartType, _StartArgs) -> ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity), - emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry), + emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry), emqx_ee_schema_registry_sup:start_link(). stop(_State) -> - emqx_conf:remove_handler(?CONF_KEY_PATH), + emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']), ok. diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 99c4fa155..71f7c7d8b 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -607,21 +607,25 @@ t_fail_rollback(Config) -> SerdeType = ?config(serde_type, Config), OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)), BrokenSchema = OkSchema#{<<"source">> := <<"{}">>}, - %% hopefully, for this small map, the key order is used. - Serdes = #{ - <<"a">> => OkSchema, - <<"z">> => BrokenSchema - }, + ?assertMatch( - {error, _}, + {ok, _}, emqx_conf:update( - [?CONF_KEY_ROOT, schemas], - Serdes, + [?CONF_KEY_ROOT, schemas, <<"a">>], + OkSchema, #{} ) ), - %% no serdes should be in the table - ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)), + ?assertMatch( + {error, _}, + emqx_conf:update( + [?CONF_KEY_ROOT, schemas, <<"z">>], + BrokenSchema, + #{} + ) + ), + ?assertMatch({ok, #{name := <<"a">>}}, emqx_ee_schema_registry:get_serde(<<"a">>)), + %% no z serdes should be in the table ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)), ok.