feat: improve authz/bridge/rule_engine/schema_registry config update

This commit is contained in:
某文 2023-06-01 23:19:23 +08:00
parent 7740987a17
commit c27d844244
18 changed files with 290 additions and 141 deletions

View File

@ -239,14 +239,30 @@ remove_config([RootName | _] = KeyPath, Opts) ->
-spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset_config([RootName | _] = KeyPath, Opts) ->
reset_config([RootName | SubKeys] = KeyPath, Opts) ->
case emqx_config:get_default_value(KeyPath) of
{ok, Default} ->
emqx_config_handler:update_config(
emqx_config:get_schema_mod(RootName),
KeyPath,
{{update, Default}, Opts}
);
Mod = emqx_config:get_schema_mod(RootName),
case SubKeys =:= [] of
true ->
emqx_config_handler:update_config(
Mod,
KeyPath,
{{update, Default}, Opts}
);
false ->
NewConf =
emqx_utils_maps:deep_put(
SubKeys,
emqx_config:get_raw([RootName], #{}),
Default
),
emqx_config_handler:update_config(
Mod,
[RootName],
{{update, NewConf}, Opts}
)
end;
{error, _} = Error ->
Error
end.

View File

@ -31,8 +31,7 @@
remove_handler/1,
update_config/3,
get_raw_cluster_override_conf/0,
info/0,
merge_to_old_config/2
info/0
]).
%% gen_server callbacks
@ -332,31 +331,16 @@ do_post_config_update(
SubOldConf = get_sub_config(ConfKey, OldConf),
SubNewConf = get_sub_config(ConfKey, NewConf),
SubHandlers = get_sub_handlers(ConfKey, Handlers),
case
do_post_config_update(
SubConfKeyPath,
SubHandlers,
SubOldConf,
SubNewConf,
AppEnvs,
UpdateArgs,
Result,
ConfKeyPath
)
of
{ok, Result1} ->
call_post_config_update(
Handlers,
OldConf,
NewConf,
AppEnvs,
up_req(UpdateArgs),
Result1,
ConfKeyPath
);
Error ->
Error
end.
do_post_config_update(
SubConfKeyPath,
SubHandlers,
SubOldConf,
SubNewConf,
AppEnvs,
UpdateArgs,
Result,
ConfKeyPath
).
get_sub_handlers(ConfKey, Handlers) ->
case maps:find(ConfKey, Handlers) of

View File

@ -277,7 +277,7 @@ wait_for_app_processes(_) ->
%% and stop others, and then the `application:start/2' callback is
%% never called again for this application.
perform_sanity_checks(emqx_rule_engine) ->
ensure_config_handler(emqx_rule_engine, [rule_engine, rules]),
ensure_config_handler(emqx_rule_engine, [rule_engine, rules, '?']),
ok;
perform_sanity_checks(emqx_bridge) ->
ensure_config_handler(emqx_bridge, [bridges]),
@ -289,7 +289,7 @@ ensure_config_handler(Module, ConfigPath) ->
#{handlers := Handlers} = sys:get_state(emqx_config_handler),
case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
#{{mod} := Module} -> ok;
_NotFound -> error({config_handler_missing, ConfigPath, Module})
NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound})
end,
ok.

View File

@ -130,7 +130,7 @@ t_root_key_update(_Config) ->
?assertEqual(
{ok, #{
config => 0.81,
post_config_update => #{?MODULE => ok},
post_config_update => #{},
raw_config => <<"81%">>
}},
emqx:update_config(SubKey, "81%", Opts)
@ -302,11 +302,11 @@ t_update_sub(_Config) ->
%% update sub key
#{<<"os">> := OS1} = emqx:get_raw_config(PathKey),
{ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts),
?assertMatch(
?assertEqual(
#{
config := 120000,
post_config_update := #{?MODULE := ok},
raw_config := <<"120s">>
config => 120000,
post_config_update => #{},
raw_config => <<"120s">>
},
Res
),
@ -319,7 +319,7 @@ t_update_sub(_Config) ->
?assertEqual(
{ok, #{
config => 0.81,
post_config_update => #{?MODULE => ok},
post_config_update => #{},
raw_config => <<"81%">>
}},
emqx:update_config(SubKey, "81%", Opts)

View File

@ -43,6 +43,7 @@
-define(CMD_MOVE_BEFORE(Before), {before, Before}).
-define(CMD_MOVE_AFTER(After), {'after', After}).
-define(ROOT_KEY, [authorization]).
-define(CONF_KEY_PATH, [authorization, sources]).
-define(RE_PLACEHOLDER, "\\$\\{[a-z0-9_]+\\}").

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authz, [
{description, "An OTP application"},
{vsn, "0.1.21"},
{vsn, "0.1.22"},
{registered, []},
{mod, {emqx_authz_app, []}},
{applications, [

View File

@ -101,6 +101,7 @@ init() ->
ok = register_metrics(),
ok = init_metrics(client_info_source()),
emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE),
emqx_conf:add_handler(?ROOT_KEY, ?MODULE),
Sources = emqx_conf:get(?CONF_KEY_PATH, []),
ok = check_dup_types(Sources),
NSources = create_sources(Sources),
@ -109,6 +110,7 @@ init() ->
deinit() ->
ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}),
emqx_conf:remove_handler(?CONF_KEY_PATH),
emqx_conf:remove_handler(?ROOT_KEY),
emqx_authz_utils:cleanup_resources().
lookup() ->
@ -139,14 +141,29 @@ update({?CMD_DELETE, Type}, Sources) ->
update(Cmd, Sources) ->
emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}).
pre_config_update(_, Cmd, Sources) ->
try do_pre_config_update(Cmd, Sources) of
pre_config_update(Path, Cmd, Sources) ->
try do_pre_config_update(Path, Cmd, Sources) of
{error, Reason} -> {error, Reason};
NSources -> {ok, NSources}
catch
_:Reason -> {error, Reason}
end.
do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) ->
do_pre_config_update(Cmd, Sources);
do_pre_config_update(?ROOT_KEY, NewConf, OldConf) ->
do_pre_config_replace(NewConf, OldConf).
%% override the entire config when updating the root key
%% emqx_conf:update(?ROOT_KEY, Conf);
do_pre_config_replace(Conf, Conf) ->
Conf;
do_pre_config_replace(NewConf, OldConf) ->
#{<<"sources">> := NewSources} = NewConf,
#{<<"sources">> := OldSources} = OldConf,
NewSources1 = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources),
NewConf#{<<"sources">> := NewSources1}.
do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) ->
do_move(Cmd, Sources);
do_pre_config_update({?CMD_PREPEND, Source}, Sources) ->
@ -179,47 +196,53 @@ do_pre_config_update({Op, Source}, Sources) ->
post_config_update(_, _, undefined, _OldSource, _AppEnvs) ->
ok;
post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) ->
Actions = do_post_config_update(Cmd, NewSources),
post_config_update(Path, Cmd, NewSources, _OldSource, _AppEnvs) ->
Actions = do_post_config_update(Path, Cmd, NewSources),
ok = update_authz_chain(Actions),
ok = emqx_authz_cache:drain_cache().
do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
InitedSources = lookup(),
do_move(Cmd, InitedSources);
do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) ->
InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)),
%% create metrics
do_post_config_update(?CONF_KEY_PATH, {?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
do_move(Cmd, lookup());
do_post_config_update(?CONF_KEY_PATH, {?CMD_PREPEND, RawNewSource}, Sources) ->
TypeName = type(RawNewSource),
ok = emqx_metrics_worker:create_metrics(
authz_metrics,
TypeName,
[total, allow, deny, nomatch],
[total]
),
[InitedNewSource] ++ lookup();
do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) ->
InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)),
lookup() ++ [InitedNewSource];
do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
NewSources = create_sources([get_source_by_type(TypeName, Sources)]),
NewSources ++ lookup();
do_post_config_update(?CONF_KEY_PATH, {?CMD_APPEND, RawNewSource}, Sources) ->
NewSources = create_sources([get_source_by_type(type(RawNewSource), Sources)]),
lookup() ++ NewSources;
do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
OldSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldSources),
NewSource = get_source_by_type(type(RawNewSource), Sources),
InitedSources = update_source(type(RawNewSource), OldSource, NewSource),
Front ++ [InitedSources] ++ Rear;
do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
%% delete metrics
ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type),
ok = ensure_resource_deleted(OldSource),
clear_certs(OldSource),
ok = ensure_deleted(OldSource, #{clear_metric => true}),
Front ++ Rear;
do_post_config_update({?CMD_REPLACE, _RawNewSources}, Sources) ->
%% overwrite the entire config!
OldInitedSources = lookup(),
lists:foreach(fun ensure_resource_deleted/1, OldInitedSources),
lists:foreach(fun clear_certs/1, OldInitedSources),
do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) ->
overwrite_entire_sources(Sources);
do_post_config_update(?ROOT_KEY, Conf, Conf) ->
#{sources := Sources} = Conf,
Sources;
do_post_config_update(?ROOT_KEY, _Conf, NewConf) ->
#{sources := NewSources} = NewConf,
overwrite_entire_sources(NewSources).
overwrite_entire_sources(Sources) ->
PrevSources = lookup(),
NewSourcesTypes = lists:map(fun type/1, Sources),
EnsureDelete = fun(S) ->
TypeName = type(S),
Opts =
case lists:member(TypeName, NewSourcesTypes) of
true -> #{clear_metric => false};
false -> #{clear_metric => true}
end,
ensure_deleted(S, Opts)
end,
lists:foreach(EnsureDelete, PrevSources),
create_sources(Sources).
%% @doc do source move
@ -238,8 +261,14 @@ do_move({?CMD_MOVE, Type, ?CMD_MOVE_AFTER(After)}, Sources) ->
{S2, Front2, Rear2} = take(After, Front1 ++ Rear1),
Front2 ++ [S2, S1] ++ Rear2.
ensure_resource_deleted(#{enable := false}) ->
ensure_deleted(#{enable := false}, _) ->
ok;
ensure_deleted(Source, #{clear_metric := ClearMetric}) ->
TypeName = type(Source),
ensure_resource_deleted(Source),
clear_certs(Source),
ClearMetric andalso emqx_metrics_worker:clear_metrics(authz_metrics, TypeName).
ensure_resource_deleted(#{type := Type} = Source) ->
Module = authz_module(Type),
Module:destroy(Source).
@ -287,12 +316,18 @@ update_source(Type, OldSource, NewSource) ->
init_metrics(Source) ->
TypeName = type(Source),
emqx_metrics_worker:create_metrics(
authz_metrics,
TypeName,
[total, allow, deny, nomatch],
[total]
).
case emqx_metrics_worker:has_metrics(authz_metrics, TypeName) of
%% Don't reset the metrics if it already exists
true ->
ok;
false ->
emqx_metrics_worker:create_metrics(
authz_metrics,
TypeName,
[total, allow, deny, nomatch],
[total]
)
end.
%%--------------------------------------------------------------------
%% AuthZ callbacks
@ -328,7 +363,10 @@ authorize(
emqx_metrics:inc(?METRIC_SUPERUSER),
{stop, #{result => allow, from => superuser}};
false ->
authorize_non_superuser(Client, PubSub, Topic, DefaultResult, Sources)
X = authorize_non_superuser(Client, PubSub, Topic, DefaultResult, Sources),
io:format("www:~p~n", [{Client, PubSub, Topic, DefaultResult, Sources}]),
io:format("res:~p~n", [X]),
X
end.
authorize_non_superuser(
@ -487,7 +525,9 @@ write_acl_file(#{<<"rules">> := Rules} = Source0) ->
ok = check_acl_file_rules(AclPath, Rules),
ok = write_file(AclPath, Rules),
Source1 = maps:remove(<<"rules">>, Source0),
maps:put(<<"path">>, AclPath, Source1).
maps:put(<<"path">>, AclPath, Source1);
write_acl_file(Source) ->
Source.
%% @doc where the acl.conf file is stored.
acl_conf_file() ->

View File

@ -272,9 +272,80 @@ t_update_source(_) ->
],
emqx_conf:get([authorization, sources], [])
),
?assertMatch(
[
#{type := http, enable := false},
#{type := mongodb, enable := false},
#{type := mysql, enable := false},
#{type := postgresql, enable := false},
#{type := redis, enable := false},
#{type := file, enable := false}
],
emqx_authz:lookup()
),
{ok, _} = emqx_authz:update(?CMD_REPLACE, []).
t_replace_all(_) ->
RootKey = [<<"authorization">>],
Conf = emqx:get_raw_config(RootKey),
emqx_authz_utils:update_config(RootKey, Conf#{
<<"sources">> => [
?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1
]
}),
%% config
?assertMatch(
[
#{type := file, enable := true},
#{type := redis, enable := true},
#{type := postgresql, enable := true},
#{type := mysql, enable := true},
#{type := mongodb, enable := true},
#{type := http, enable := true}
],
emqx_conf:get([authorization, sources], [])
),
%% hooks status
?assertMatch(
[
#{type := file, enable := true},
#{type := redis, enable := true},
#{type := postgresql, enable := true},
#{type := mysql, enable := true},
#{type := mongodb, enable := true},
#{type := http, enable := true}
],
emqx_authz:lookup()
),
Ids = [http, mongodb, mysql, postgresql, redis, file],
%% metrics
lists:foreach(
fun(Id) ->
?assert(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id)
end,
Ids
),
?assertMatch(
{ok, _},
emqx_authz_utils:update_config(
RootKey,
Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]}
)
),
%% hooks status
?assertMatch([#{type := http, enable := false}], emqx_authz:lookup()),
%% metrics
?assert(emqx_metrics_worker:has_metrics(authz_metrics, http)),
lists:foreach(
fun(Id) ->
?assertNot(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id)
end,
Ids -- [http]
),
ok.
t_delete_source(_) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]),

View File

@ -25,21 +25,26 @@
-define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000).
reset_authorizers() ->
reset_authorizers(deny, false).
reset_authorizers(deny, false, []).
restore_authorizers() ->
reset_authorizers(allow, true).
reset_authorizers(allow, true, []).
reset_authorizers(Nomatch, ChacheEnabled) ->
reset_authorizers(Nomatch, CacheEnabled, Source) ->
{ok, _} = emqx:update_config(
[authorization],
#{
<<"no_match">> => atom_to_binary(Nomatch),
<<"cache">> => #{<<"enable">> => atom_to_binary(ChacheEnabled)},
<<"sources">> => []
<<"cache">> => #{<<"enable">> => atom_to_binary(CacheEnabled)},
<<"sources">> => Source
}
),
ok.
%% Don't reset sources
reset_authorizers(Nomatch, CacheEnabled) ->
{ok, _} = emqx:update_config([<<"authorization">>, <<"no_match">>], Nomatch),
{ok, _} = emqx:update_config([<<"authorization">>, <<"cache">>, <<"enable">>], CacheEnabled),
ok.
setup_config(BaseConfig, SpecialParams) ->
Config = maps:merge(BaseConfig, SpecialParams),

View File

@ -39,7 +39,8 @@
disable_enable/3,
remove/2,
check_deps_and_remove/3,
list/0
list/0,
reload_hook/1
]).
-export([
@ -133,6 +134,10 @@ safe_load_bridge(Type, Name, Conf, Opts) ->
})
end.
reload_hook(Bridges) ->
ok = unload_hook(),
ok = load_hook(Bridges).
load_hook() ->
Bridges = emqx:get_config([bridges], #{}),
load_hook(Bridges).

View File

@ -69,10 +69,32 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
{ok, ConfNew}
end.
post_config_update(Path, '$remove', _, OldConf, _AppEnvs) ->
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf);
post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) ->
post_config_update([bridges, BridgeType, BridgeName] = Path, '$remove', _, OldConf, _AppEnvs) ->
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf),
ok = emqx_bridge_resource:remove(BridgeType, BridgeName),
Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([bridges])),
emqx_bridge:reload_hook(Bridges),
?tp(bridge_post_config_update_done, #{}),
ok;
post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, undefined, _AppEnvs) ->
_ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, undefined),
ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_bridge_resource:create(BridgeType, BridgeName, NewConf, ResOpts),
Bridges = emqx_utils_maps:deep_put(
[BridgeType, BridgeName], emqx:get_config([bridges]), NewConf
),
emqx_bridge:reload_hook(Bridges),
?tp(bridge_post_config_update_done, #{}),
ok;
post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, OldConf, _AppEnvs) ->
_ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf),
ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_bridge_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts),
Bridges = emqx_utils_maps:deep_put(
[BridgeType, BridgeName], emqx:get_config([bridges]), NewConf
),
emqx_bridge:reload_hook(Bridges),
?tp(bridge_post_config_update_done, #{}),
ok.
%% internal functions

View File

@ -99,11 +99,13 @@
]).
-define(RATE_METRICS, ['matched']).
-define(KEY_PATH, [rule_engine, rules]).
-define(RULE_PATH(RULE), [rule_engine, rules, RULE]).
-type action_name() :: binary() | #{function := binary()}.
config_key_path() ->
[rule_engine, rules].
?KEY_PATH.
-spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
start_link() ->
@ -112,7 +114,13 @@ start_link() ->
%%------------------------------------------------------------------------------
%% The config handler for emqx_rule_engine
%%------------------------------------------------------------------------------
post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
create_rule(NewRule#{id => bin(RuleId)});
post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) ->
delete_rule(bin(RuleId));
post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) ->
update_rule(NewRule#{id => bin(RuleId)});
post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) ->
#{added := Added, removed := Removed, changed := Updated} =
emqx_utils_maps:diff_maps(NewRules, OldRules),
try
@ -134,7 +142,7 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
end,
Added
),
{ok, get_rules()}
ok
catch
throw:#{kind := _} = Error ->
{error, Error}

View File

@ -357,8 +357,7 @@ param_path_id() ->
{400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
not_found ->
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
{201, format_rule_info_resp(Rule)};
{error, Reason} ->
?SLOG(error, #{
@ -398,8 +397,7 @@ param_path_id() ->
Params = filter_out_request_body(Params0),
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
{200, format_rule_info_resp(Rule)};
{error, Reason} ->
?SLOG(error, #{
@ -655,9 +653,6 @@ aggregate_metrics(AllMetrics) ->
AllMetrics
).
get_one_rule(AllRules, Id) ->
[R || R = #{id := Id0} <- AllRules, Id0 == Id].
add_metadata(Params) ->
Params#{
<<"metadata">> => #{

View File

@ -29,11 +29,15 @@ start(_Type, _Args) ->
ok = emqx_rule_events:reload(),
SupRet = emqx_rule_engine_sup:start_link(),
ok = emqx_rule_engine:load_rules(),
emqx_conf:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine),
RulePath = [RuleEngine | _] = emqx_rule_engine:config_key_path(),
emqx_conf:add_handler(RulePath ++ ['?'], emqx_rule_engine),
emqx_conf:add_handler([RuleEngine], emqx_rule_engine),
emqx_rule_engine_cli:load(),
SupRet.
stop(_State) ->
emqx_rule_engine_cli:unload(),
emqx_conf:remove_handler(emqx_rule_engine:config_key_path()),
RulePath = [RuleEngine | _] = emqx_rule_engine:config_key_path(),
emqx_conf:remove_handler(RulePath ++ ['?']),
emqx_conf:remove_handler([RuleEngine]),
ok = emqx_rule_events:unload().

View File

@ -1,6 +1,6 @@
{application, emqx_ee_schema_registry, [
{description, "EMQX Schema Registry"},
{vsn, "0.1.3"},
{vsn, "0.1.4"},
{registered, [emqx_ee_schema_registry_sup]},
{mod, {emqx_ee_schema_registry_app, []}},
{applications, [

View File

@ -104,38 +104,32 @@ list_schemas() ->
%%-------------------------------------------------------------------------------------------------
%% `emqx_config_handler' API
%%-------------------------------------------------------------------------------------------------
%% remove
post_config_update(
[?CONF_KEY_ROOT, schemas] = _Path,
_Cmd,
NewConf = #{schemas := NewSchemas},
OldConf = #{},
[?CONF_KEY_ROOT, schemas, Name],
'$remove',
_NewSchemas,
_OldSchemas,
_AppEnvs
) ->
OldSchemas = maps:get(schemas, OldConf, #{}),
#{
added := Added,
changed := Changed0,
removed := Removed
} = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas),
Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
RemovedNames = maps:keys(Removed),
case RemovedNames of
[] ->
ok;
_ ->
async_delete_serdes(RemovedNames)
end,
SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
case build_serdes(SchemasToBuild) of
async_delete_serdes([Name]),
ok;
%% add or update
post_config_update(
[?CONF_KEY_ROOT, schemas, NewName],
_Cmd,
NewSchemas,
%% undefined or OldSchemas
_,
_AppEnvs
) ->
case build_serdes([{NewName, NewSchemas}]) of
ok ->
{ok, NewConf};
{ok, #{NewName => NewSchemas}};
{error, Reason, SerdesToRollback} ->
lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
{error, Reason}
end;
post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
{ok, NewConf}.
end.
%%-------------------------------------------------------------------------------------------------
%% `gen_server' API

View File

@ -11,9 +11,9 @@
start(_StartType, _StartArgs) ->
ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry),
emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),
emqx_ee_schema_registry_sup:start_link().
stop(_State) ->
emqx_conf:remove_handler(?CONF_KEY_PATH),
emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']),
ok.

View File

@ -607,21 +607,25 @@ t_fail_rollback(Config) ->
SerdeType = ?config(serde_type, Config),
OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
BrokenSchema = OkSchema#{<<"source">> := <<"{}">>},
%% hopefully, for this small map, the key order is used.
Serdes = #{
<<"a">> => OkSchema,
<<"z">> => BrokenSchema
},
?assertMatch(
{error, _},
{ok, _},
emqx_conf:update(
[?CONF_KEY_ROOT, schemas],
Serdes,
[?CONF_KEY_ROOT, schemas, <<"a">>],
OkSchema,
#{}
)
),
%% no serdes should be in the table
?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)),
?assertMatch(
{error, _},
emqx_conf:update(
[?CONF_KEY_ROOT, schemas, <<"z">>],
BrokenSchema,
#{}
)
),
?assertMatch({ok, #{name := <<"a">>}}, emqx_ee_schema_registry:get_serde(<<"a">>)),
%% no z serdes should be in the table
?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)),
ok.