diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 1cdb563aa..47ff384c9 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -239,14 +239,30 @@ remove_config([RootName | _] = KeyPath, Opts) -> -spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -reset_config([RootName | _] = KeyPath, Opts) -> +reset_config([RootName | SubKeys] = KeyPath, Opts) -> case emqx_config:get_default_value(KeyPath) of {ok, Default} -> - emqx_config_handler:update_config( - emqx_config:get_schema_mod(RootName), - KeyPath, - {{update, Default}, Opts} - ); + Mod = emqx_config:get_schema_mod(RootName), + case SubKeys =:= [] of + true -> + emqx_config_handler:update_config( + Mod, + KeyPath, + {{update, Default}, Opts} + ); + false -> + NewConf = + emqx_utils_maps:deep_put( + SubKeys, + emqx_config:get_raw([RootName], #{}), + Default + ), + emqx_config_handler:update_config( + Mod, + [RootName], + {{update, NewConf}, Opts} + ) + end; {error, _} = Error -> Error end. diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 0bad19f9e..5576f579d 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -31,8 +31,7 @@ remove_handler/1, update_config/3, get_raw_cluster_override_conf/0, - info/0, - merge_to_old_config/2 + info/0 ]). %% gen_server callbacks @@ -332,31 +331,16 @@ do_post_config_update( SubOldConf = get_sub_config(ConfKey, OldConf), SubNewConf = get_sub_config(ConfKey, NewConf), SubHandlers = get_sub_handlers(ConfKey, Handlers), - case - do_post_config_update( - SubConfKeyPath, - SubHandlers, - SubOldConf, - SubNewConf, - AppEnvs, - UpdateArgs, - Result, - ConfKeyPath - ) - of - {ok, Result1} -> - call_post_config_update( - Handlers, - OldConf, - NewConf, - AppEnvs, - up_req(UpdateArgs), - Result1, - ConfKeyPath - ); - Error -> - Error - end. + do_post_config_update( + SubConfKeyPath, + SubHandlers, + SubOldConf, + SubNewConf, + AppEnvs, + UpdateArgs, + Result, + ConfKeyPath + ). get_sub_handlers(ConfKey, Handlers) -> case maps:find(ConfKey, Handlers) of diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 7aaf93c99..54b3b3ca9 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -277,7 +277,7 @@ wait_for_app_processes(_) -> %% and stop others, and then the `application:start/2' callback is %% never called again for this application. perform_sanity_checks(emqx_rule_engine) -> - ensure_config_handler(emqx_rule_engine, [rule_engine, rules]), + ensure_config_handler(emqx_rule_engine, [rule_engine, rules, '?']), ok; perform_sanity_checks(emqx_bridge) -> ensure_config_handler(emqx_bridge, [bridges]), @@ -289,7 +289,7 @@ ensure_config_handler(Module, ConfigPath) -> #{handlers := Handlers} = sys:get_state(emqx_config_handler), case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of #{{mod} := Module} -> ok; - _NotFound -> error({config_handler_missing, ConfigPath, Module}) + NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound}) end, ok. diff --git a/apps/emqx/test/emqx_config_handler_SUITE.erl b/apps/emqx/test/emqx_config_handler_SUITE.erl index e21c1867f..f04b8256c 100644 --- a/apps/emqx/test/emqx_config_handler_SUITE.erl +++ b/apps/emqx/test/emqx_config_handler_SUITE.erl @@ -130,7 +130,7 @@ t_root_key_update(_Config) -> ?assertEqual( {ok, #{ config => 0.81, - post_config_update => #{?MODULE => ok}, + post_config_update => #{}, raw_config => <<"81%">> }}, emqx:update_config(SubKey, "81%", Opts) @@ -302,11 +302,11 @@ t_update_sub(_Config) -> %% update sub key #{<<"os">> := OS1} = emqx:get_raw_config(PathKey), {ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts), - ?assertMatch( + ?assertEqual( #{ - config := 120000, - post_config_update := #{?MODULE := ok}, - raw_config := <<"120s">> + config => 120000, + post_config_update => #{}, + raw_config => <<"120s">> }, Res ), @@ -319,7 +319,7 @@ t_update_sub(_Config) -> ?assertEqual( {ok, #{ config => 0.81, - post_config_update => #{?MODULE => ok}, + post_config_update => #{}, raw_config => <<"81%">> }}, emqx:update_config(SubKey, "81%", Opts) diff --git a/apps/emqx_authz/include/emqx_authz.hrl b/apps/emqx_authz/include/emqx_authz.hrl index b7afd5e84..967865868 100644 --- a/apps/emqx_authz/include/emqx_authz.hrl +++ b/apps/emqx_authz/include/emqx_authz.hrl @@ -43,6 +43,7 @@ -define(CMD_MOVE_BEFORE(Before), {before, Before}). -define(CMD_MOVE_AFTER(After), {'after', After}). +-define(ROOT_KEY, [authorization]). -define(CONF_KEY_PATH, [authorization, sources]). -define(RE_PLACEHOLDER, "\\$\\{[a-z0-9_]+\\}"). diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index 7620f5548..929f292a7 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.21"}, + {vsn, "0.1.22"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 7ceacdb68..c93b5c5e3 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -101,6 +101,7 @@ init() -> ok = register_metrics(), ok = init_metrics(client_info_source()), emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE), + emqx_conf:add_handler(?ROOT_KEY, ?MODULE), Sources = emqx_conf:get(?CONF_KEY_PATH, []), ok = check_dup_types(Sources), NSources = create_sources(Sources), @@ -109,6 +110,7 @@ init() -> deinit() -> ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}), emqx_conf:remove_handler(?CONF_KEY_PATH), + emqx_conf:remove_handler(?ROOT_KEY), emqx_authz_utils:cleanup_resources(). lookup() -> @@ -139,14 +141,29 @@ update({?CMD_DELETE, Type}, Sources) -> update(Cmd, Sources) -> emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}). -pre_config_update(_, Cmd, Sources) -> - try do_pre_config_update(Cmd, Sources) of +pre_config_update(Path, Cmd, Sources) -> + try do_pre_config_update(Path, Cmd, Sources) of {error, Reason} -> {error, Reason}; NSources -> {ok, NSources} catch _:Reason -> {error, Reason} end. +do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) -> + do_pre_config_update(Cmd, Sources); +do_pre_config_update(?ROOT_KEY, NewConf, OldConf) -> + do_pre_config_replace(NewConf, OldConf). + +%% override the entire config when updating the root key +%% emqx_conf:update(?ROOT_KEY, Conf); +do_pre_config_replace(Conf, Conf) -> + Conf; +do_pre_config_replace(NewConf, OldConf) -> + #{<<"sources">> := NewSources} = NewConf, + #{<<"sources">> := OldSources} = OldConf, + NewSources1 = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources), + NewConf#{<<"sources">> := NewSources1}. + do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) -> do_move(Cmd, Sources); do_pre_config_update({?CMD_PREPEND, Source}, Sources) -> @@ -179,47 +196,53 @@ do_pre_config_update({Op, Source}, Sources) -> post_config_update(_, _, undefined, _OldSource, _AppEnvs) -> ok; -post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) -> - Actions = do_post_config_update(Cmd, NewSources), +post_config_update(Path, Cmd, NewSources, _OldSource, _AppEnvs) -> + Actions = do_post_config_update(Path, Cmd, NewSources), ok = update_authz_chain(Actions), ok = emqx_authz_cache:drain_cache(). -do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) -> - InitedSources = lookup(), - do_move(Cmd, InitedSources); -do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) -> - InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)), - %% create metrics +do_post_config_update(?CONF_KEY_PATH, {?CMD_MOVE, _Type, _Where} = Cmd, _Sources) -> + do_move(Cmd, lookup()); +do_post_config_update(?CONF_KEY_PATH, {?CMD_PREPEND, RawNewSource}, Sources) -> TypeName = type(RawNewSource), - ok = emqx_metrics_worker:create_metrics( - authz_metrics, - TypeName, - [total, allow, deny, nomatch], - [total] - ), - [InitedNewSource] ++ lookup(); -do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) -> - InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)), - lookup() ++ [InitedNewSource]; -do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) -> + NewSources = create_sources([get_source_by_type(TypeName, Sources)]), + NewSources ++ lookup(); +do_post_config_update(?CONF_KEY_PATH, {?CMD_APPEND, RawNewSource}, Sources) -> + NewSources = create_sources([get_source_by_type(type(RawNewSource), Sources)]), + lookup() ++ NewSources; +do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sources) -> OldSources = lookup(), {OldSource, Front, Rear} = take(Type, OldSources), NewSource = get_source_by_type(type(RawNewSource), Sources), InitedSources = update_source(type(RawNewSource), OldSource, NewSource), Front ++ [InitedSources] ++ Rear; -do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> +do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> OldInitedSources = lookup(), {OldSource, Front, Rear} = take(Type, OldInitedSources), - %% delete metrics - ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type), - ok = ensure_resource_deleted(OldSource), - clear_certs(OldSource), + ok = ensure_deleted(OldSource, #{clear_metric => true}), Front ++ Rear; -do_post_config_update({?CMD_REPLACE, _RawNewSources}, Sources) -> - %% overwrite the entire config! - OldInitedSources = lookup(), - lists:foreach(fun ensure_resource_deleted/1, OldInitedSources), - lists:foreach(fun clear_certs/1, OldInitedSources), +do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) -> + overwrite_entire_sources(Sources); +do_post_config_update(?ROOT_KEY, Conf, Conf) -> + #{sources := Sources} = Conf, + Sources; +do_post_config_update(?ROOT_KEY, _Conf, NewConf) -> + #{sources := NewSources} = NewConf, + overwrite_entire_sources(NewSources). + +overwrite_entire_sources(Sources) -> + PrevSources = lookup(), + NewSourcesTypes = lists:map(fun type/1, Sources), + EnsureDelete = fun(S) -> + TypeName = type(S), + Opts = + case lists:member(TypeName, NewSourcesTypes) of + true -> #{clear_metric => false}; + false -> #{clear_metric => true} + end, + ensure_deleted(S, Opts) + end, + lists:foreach(EnsureDelete, PrevSources), create_sources(Sources). %% @doc do source move @@ -238,8 +261,14 @@ do_move({?CMD_MOVE, Type, ?CMD_MOVE_AFTER(After)}, Sources) -> {S2, Front2, Rear2} = take(After, Front1 ++ Rear1), Front2 ++ [S2, S1] ++ Rear2. -ensure_resource_deleted(#{enable := false}) -> +ensure_deleted(#{enable := false}, _) -> ok; +ensure_deleted(Source, #{clear_metric := ClearMetric}) -> + TypeName = type(Source), + ensure_resource_deleted(Source), + clear_certs(Source), + ClearMetric andalso emqx_metrics_worker:clear_metrics(authz_metrics, TypeName). + ensure_resource_deleted(#{type := Type} = Source) -> Module = authz_module(Type), Module:destroy(Source). @@ -287,12 +316,18 @@ update_source(Type, OldSource, NewSource) -> init_metrics(Source) -> TypeName = type(Source), - emqx_metrics_worker:create_metrics( - authz_metrics, - TypeName, - [total, allow, deny, nomatch], - [total] - ). + case emqx_metrics_worker:has_metrics(authz_metrics, TypeName) of + %% Don't reset the metrics if it already exists + true -> + ok; + false -> + emqx_metrics_worker:create_metrics( + authz_metrics, + TypeName, + [total, allow, deny, nomatch], + [total] + ) + end. %%-------------------------------------------------------------------- %% AuthZ callbacks @@ -328,7 +363,10 @@ authorize( emqx_metrics:inc(?METRIC_SUPERUSER), {stop, #{result => allow, from => superuser}}; false -> - authorize_non_superuser(Client, PubSub, Topic, DefaultResult, Sources) + X = authorize_non_superuser(Client, PubSub, Topic, DefaultResult, Sources), + io:format("www:~p~n", [{Client, PubSub, Topic, DefaultResult, Sources}]), + io:format("res:~p~n", [X]), + X end. authorize_non_superuser( @@ -487,7 +525,9 @@ write_acl_file(#{<<"rules">> := Rules} = Source0) -> ok = check_acl_file_rules(AclPath, Rules), ok = write_file(AclPath, Rules), Source1 = maps:remove(<<"rules">>, Source0), - maps:put(<<"path">>, AclPath, Source1). + maps:put(<<"path">>, AclPath, Source1); +write_acl_file(Source) -> + Source. %% @doc where the acl.conf file is stored. acl_conf_file() -> diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 39c414617..702359509 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -272,9 +272,80 @@ t_update_source(_) -> ], emqx_conf:get([authorization, sources], []) ), + ?assertMatch( + [ + #{type := http, enable := false}, + #{type := mongodb, enable := false}, + #{type := mysql, enable := false}, + #{type := postgresql, enable := false}, + #{type := redis, enable := false}, + #{type := file, enable := false} + ], + emqx_authz:lookup() + ), {ok, _} = emqx_authz:update(?CMD_REPLACE, []). +t_replace_all(_) -> + RootKey = [<<"authorization">>], + Conf = emqx:get_raw_config(RootKey), + emqx_authz_utils:update_config(RootKey, Conf#{ + <<"sources">> => [ + ?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1 + ] + }), + %% config + ?assertMatch( + [ + #{type := file, enable := true}, + #{type := redis, enable := true}, + #{type := postgresql, enable := true}, + #{type := mysql, enable := true}, + #{type := mongodb, enable := true}, + #{type := http, enable := true} + ], + emqx_conf:get([authorization, sources], []) + ), + %% hooks status + ?assertMatch( + [ + #{type := file, enable := true}, + #{type := redis, enable := true}, + #{type := postgresql, enable := true}, + #{type := mysql, enable := true}, + #{type := mongodb, enable := true}, + #{type := http, enable := true} + ], + emqx_authz:lookup() + ), + Ids = [http, mongodb, mysql, postgresql, redis, file], + %% metrics + lists:foreach( + fun(Id) -> + ?assert(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id) + end, + Ids + ), + + ?assertMatch( + {ok, _}, + emqx_authz_utils:update_config( + RootKey, + Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]} + ) + ), + %% hooks status + ?assertMatch([#{type := http, enable := false}], emqx_authz:lookup()), + %% metrics + ?assert(emqx_metrics_worker:has_metrics(authz_metrics, http)), + lists:foreach( + fun(Id) -> + ?assertNot(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id) + end, + Ids -- [http] + ), + ok. + t_delete_source(_) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]), diff --git a/apps/emqx_authz/test/emqx_authz_test_lib.erl b/apps/emqx_authz/test/emqx_authz_test_lib.erl index db604d764..e308e0de7 100644 --- a/apps/emqx_authz/test/emqx_authz_test_lib.erl +++ b/apps/emqx_authz/test/emqx_authz_test_lib.erl @@ -25,21 +25,26 @@ -define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000). reset_authorizers() -> - reset_authorizers(deny, false). + reset_authorizers(deny, false, []). restore_authorizers() -> - reset_authorizers(allow, true). + reset_authorizers(allow, true, []). -reset_authorizers(Nomatch, ChacheEnabled) -> +reset_authorizers(Nomatch, CacheEnabled, Source) -> {ok, _} = emqx:update_config( [authorization], #{ <<"no_match">> => atom_to_binary(Nomatch), - <<"cache">> => #{<<"enable">> => atom_to_binary(ChacheEnabled)}, - <<"sources">> => [] + <<"cache">> => #{<<"enable">> => atom_to_binary(CacheEnabled)}, + <<"sources">> => Source } ), ok. +%% Don't reset sources +reset_authorizers(Nomatch, CacheEnabled) -> + {ok, _} = emqx:update_config([<<"authorization">>, <<"no_match">>], Nomatch), + {ok, _} = emqx:update_config([<<"authorization">>, <<"cache">>, <<"enable">>], CacheEnabled), + ok. setup_config(BaseConfig, SpecialParams) -> Config = maps:merge(BaseConfig, SpecialParams), diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 46c822ed0..d5d26c770 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -39,7 +39,8 @@ disable_enable/3, remove/2, check_deps_and_remove/3, - list/0 + list/0, + reload_hook/1 ]). -export([ @@ -133,6 +134,10 @@ safe_load_bridge(Type, Name, Conf, Opts) -> }) end. +reload_hook(Bridges) -> + ok = unload_hook(), + ok = load_hook(Bridges). + load_hook() -> Bridges = emqx:get_config([bridges], #{}), load_hook(Bridges). diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index e59259c3e..b226b3b32 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -69,10 +69,32 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> {ok, ConfNew} end. -post_config_update(Path, '$remove', _, OldConf, _AppEnvs) -> - _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf); -post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) -> +post_config_update([bridges, BridgeType, BridgeName] = Path, '$remove', _, OldConf, _AppEnvs) -> + _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf), + ok = emqx_bridge_resource:remove(BridgeType, BridgeName), + Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([bridges])), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, undefined, _AppEnvs) -> + _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, undefined), + ResOpts = emqx_resource:fetch_creation_opts(NewConf), + ok = emqx_bridge_resource:create(BridgeType, BridgeName, NewConf, ResOpts), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf + ), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, OldConf, _AppEnvs) -> _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf), + ResOpts = emqx_resource:fetch_creation_opts(NewConf), + ok = emqx_bridge_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf + ), + emqx_bridge:reload_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), ok. %% internal functions diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index d8a798eb8..02e3a60ac 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -99,11 +99,13 @@ ]). -define(RATE_METRICS, ['matched']). +-define(KEY_PATH, [rule_engine, rules]). +-define(RULE_PATH(RULE), [rule_engine, rules, RULE]). -type action_name() :: binary() | #{function := binary()}. config_key_path() -> - [rule_engine, rules]. + ?KEY_PATH. -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}. start_link() -> @@ -112,7 +114,13 @@ start_link() -> %%------------------------------------------------------------------------------ %% The config handler for emqx_rule_engine %%------------------------------------------------------------------------------ -post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> +post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) -> + create_rule(NewRule#{id => bin(RuleId)}); +post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) -> + delete_rule(bin(RuleId)); +post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) -> + update_rule(NewRule#{id => bin(RuleId)}); +post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) -> #{added := Added, removed := Removed, changed := Updated} = emqx_utils_maps:diff_maps(NewRules, OldRules), try @@ -134,7 +142,7 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> end, Added ), - {ok, get_rules()} + ok catch throw:#{kind := _} = Error -> {error, Error} diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index b53763d47..b8fc8d67d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -357,8 +357,7 @@ param_path_id() -> {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}}; not_found -> case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of - {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> - [Rule] = get_one_rule(AllRules, Id), + {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} -> {201, format_rule_info_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{ @@ -398,8 +397,7 @@ param_path_id() -> Params = filter_out_request_body(Params0), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of - {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> - [Rule] = get_one_rule(AllRules, Id), + {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} -> {200, format_rule_info_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{ @@ -655,9 +653,6 @@ aggregate_metrics(AllMetrics) -> AllMetrics ). -get_one_rule(AllRules, Id) -> - [R || R = #{id := Id0} <- AllRules, Id0 == Id]. - add_metadata(Params) -> Params#{ <<"metadata">> => #{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 14d2b1f95..3b04f7a50 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -29,11 +29,15 @@ start(_Type, _Args) -> ok = emqx_rule_events:reload(), SupRet = emqx_rule_engine_sup:start_link(), ok = emqx_rule_engine:load_rules(), - emqx_conf:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine), + RulePath = [RuleEngine | _] = emqx_rule_engine:config_key_path(), + emqx_conf:add_handler(RulePath ++ ['?'], emqx_rule_engine), + emqx_conf:add_handler([RuleEngine], emqx_rule_engine), emqx_rule_engine_cli:load(), SupRet. stop(_State) -> emqx_rule_engine_cli:unload(), - emqx_conf:remove_handler(emqx_rule_engine:config_key_path()), + RulePath = [RuleEngine | _] = emqx_rule_engine:config_key_path(), + emqx_conf:remove_handler(RulePath ++ ['?']), + emqx_conf:remove_handler([RuleEngine]), ok = emqx_rule_events:unload(). diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src index b74e9fa7d..b71ed01e5 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_schema_registry, [ {description, "EMQX Schema Registry"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, [emqx_ee_schema_registry_sup]}, {mod, {emqx_ee_schema_registry_app, []}}, {applications, [ diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 5ffcb2ba6..1390f9bfe 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -104,38 +104,32 @@ list_schemas() -> %%------------------------------------------------------------------------------------------------- %% `emqx_config_handler' API %%------------------------------------------------------------------------------------------------- - +%% remove post_config_update( - [?CONF_KEY_ROOT, schemas] = _Path, - _Cmd, - NewConf = #{schemas := NewSchemas}, - OldConf = #{}, + [?CONF_KEY_ROOT, schemas, Name], + '$remove', + _NewSchemas, + _OldSchemas, _AppEnvs ) -> - OldSchemas = maps:get(schemas, OldConf, #{}), - #{ - added := Added, - changed := Changed0, - removed := Removed - } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas), - Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0), - RemovedNames = maps:keys(Removed), - case RemovedNames of - [] -> - ok; - _ -> - async_delete_serdes(RemovedNames) - end, - SchemasToBuild = maps:to_list(maps:merge(Changed, Added)), - case build_serdes(SchemasToBuild) of + async_delete_serdes([Name]), + ok; +%% add or update +post_config_update( + [?CONF_KEY_ROOT, schemas, NewName], + _Cmd, + NewSchemas, + %% undefined or OldSchemas + _, + _AppEnvs +) -> + case build_serdes([{NewName, NewSchemas}]) of ok -> - {ok, NewConf}; + {ok, #{NewName => NewSchemas}}; {error, Reason, SerdesToRollback} -> lists:foreach(fun ensure_serde_absent/1, SerdesToRollback), {error, Reason} - end; -post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) -> - {ok, NewConf}. + end. %%------------------------------------------------------------------------------------------------- %% `gen_server' API diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl index e82ed95bd..195a54c15 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl @@ -11,9 +11,9 @@ start(_StartType, _StartArgs) -> ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity), - emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry), + emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry), emqx_ee_schema_registry_sup:start_link(). stop(_State) -> - emqx_conf:remove_handler(?CONF_KEY_PATH), + emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']), ok. diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 99c4fa155..71f7c7d8b 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -607,21 +607,25 @@ t_fail_rollback(Config) -> SerdeType = ?config(serde_type, Config), OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)), BrokenSchema = OkSchema#{<<"source">> := <<"{}">>}, - %% hopefully, for this small map, the key order is used. - Serdes = #{ - <<"a">> => OkSchema, - <<"z">> => BrokenSchema - }, + ?assertMatch( - {error, _}, + {ok, _}, emqx_conf:update( - [?CONF_KEY_ROOT, schemas], - Serdes, + [?CONF_KEY_ROOT, schemas, <<"a">>], + OkSchema, #{} ) ), - %% no serdes should be in the table - ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)), + ?assertMatch( + {error, _}, + emqx_conf:update( + [?CONF_KEY_ROOT, schemas, <<"z">>], + BrokenSchema, + #{} + ) + ), + ?assertMatch({ok, #{name := <<"a">>}}, emqx_ee_schema_registry:get_serde(<<"a">>)), + %% no z serdes should be in the table ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)), ok.