Merge pull request #13221 from thalesmg/refactor-sv-topic-index-r57-20240610

refactor(schema validation): module organization and position indexing
This commit is contained in:
Thales Macedo Garitezi 2024-06-11 13:29:48 -03:00 committed by GitHub
commit 2c6aa80b3c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 471 additions and 373 deletions

View File

@ -10,12 +10,6 @@
%% API %% API
-export([ -export([
add_handler/0,
remove_handler/0,
load/0,
unload/0,
list/0, list/0,
reorder/1, reorder/1,
lookup/1, lookup/1,
@ -32,13 +26,6 @@
on_message_publish/1 on_message_publish/1
]). ]).
%% `emqx_config_handler' API
-export([pre_config_update/3, post_config_update/5]).
%% `emqx_config_backup' API
-behaviour(emqx_config_backup).
-export([import_config/1]).
%% Internal exports %% Internal exports
-export([parse_sql_check/1]). -export([parse_sql_check/1]).
@ -52,91 +39,46 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-define(TRACE_TAG, "SCHEMA_VALIDATION"). -define(TRACE_TAG, "SCHEMA_VALIDATION").
-define(CONF_ROOT, schema_validation).
-define(CONF_ROOT_BIN, <<"schema_validation">>).
-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
-type validation_name() :: binary(). -type validation_name() :: binary().
-type validation() :: _TODO. -type validation() :: _TODO.
-export_type([
validation/0,
validation_name/0
]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec add_handler() -> ok.
add_handler() ->
ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
ok.
-spec remove_handler() -> ok.
remove_handler() ->
ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
ok = emqx_config_handler:remove_handler([?CONF_ROOT]),
ok.
load() ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
lists:foreach(
fun({Pos, Validation}) ->
ok = emqx_schema_validation_registry:insert(Pos, Validation)
end,
lists:enumerate(Validations)
).
unload() ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
lists:foreach(
fun(Validation) ->
ok = emqx_schema_validation_registry:delete(Validation)
end,
Validations
).
-spec list() -> [validation()]. -spec list() -> [validation()].
list() -> list() ->
emqx:get_config(?VALIDATIONS_CONF_PATH, []). emqx_schema_validation_config:list().
-spec reorder([validation_name()]) -> -spec reorder([validation_name()]) ->
{ok, _} | {error, _}. {ok, _} | {error, _}.
reorder(Order) -> reorder(Order) ->
emqx_conf:update( emqx_schema_validation_config:reorder(Order).
?VALIDATIONS_CONF_PATH,
{reorder, Order},
#{override_to => cluster}
).
-spec lookup(validation_name()) -> {ok, validation()} | {error, not_found}. -spec lookup(validation_name()) -> {ok, validation()} | {error, not_found}.
lookup(Name) -> lookup(Name) ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []), emqx_schema_validation_config:lookup(Name).
do_lookup(Name, Validations).
-spec insert(validation()) -> -spec insert(validation()) ->
{ok, _} | {error, _}. {ok, _} | {error, _}.
insert(Validation) -> insert(Validation) ->
emqx_conf:update( emqx_schema_validation_config:insert(Validation).
?VALIDATIONS_CONF_PATH,
{append, Validation},
#{override_to => cluster}
).
-spec update(validation()) -> -spec update(validation()) ->
{ok, _} | {error, _}. {ok, _} | {error, _}.
update(Validation) -> update(Validation) ->
emqx_conf:update( emqx_schema_validation_config:update(Validation).
?VALIDATIONS_CONF_PATH,
{update, Validation},
#{override_to => cluster}
).
-spec delete(validation_name()) -> -spec delete(validation_name()) ->
{ok, _} | {error, _}. {ok, _} | {error, _}.
delete(Name) -> delete(Name) ->
emqx_conf:update( emqx_schema_validation_config:delete(Name).
?VALIDATIONS_CONF_PATH,
{delete, Name},
#{override_to => cluster}
).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Hooks %% Hooks
@ -175,116 +117,6 @@ on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
end end
end. end.
%%------------------------------------------------------------------------------
%% `emqx_config_handler' API
%%------------------------------------------------------------------------------
pre_config_update(?VALIDATIONS_CONF_PATH, {append, Validation}, OldValidations) ->
Validations = OldValidations ++ [Validation],
{ok, Validations};
pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations) ->
replace(OldValidations, Validation);
pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
delete(OldValidations, Validation);
pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
reorder(OldValidations, Order);
pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) ->
#{resulting_config := Config} = prepare_config_merge(NewConfig, OldConfig),
{ok, Config};
pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) ->
{ok, NewConfig}.
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
{Pos, Validation} = fetch_with_index(New, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
{_Pos, OldValidation} = fetch_with_index(Old, Name),
{Pos, NewValidation} = fetch_with_index(New, Name),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
{_Pos, Validation} = fetch_with_index(Old, Name),
ok = emqx_schema_validation_registry:delete(Validation),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
ok = emqx_schema_validation_registry:reindex_positions(New),
ok;
post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
#{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old,
#{added := NewValidations0} =
emqx_utils:diff_lists(
ResultingValidations,
OldValidations,
fun(#{name := N}) -> N end
),
NewValidations =
lists:map(
fun(#{name := Name}) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation),
#{name => Name, pos => Pos}
end,
NewValidations0
),
{ok, #{new_validations => NewValidations}};
post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) ->
#{
new_validations := NewValidations,
changed_validations := ChangedValidations0,
deleted_validations := DeletedValidations
} = prepare_config_replace(Input, Old),
#{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old,
lists:foreach(
fun(Name) ->
{_Pos, Validation} = fetch_with_index(OldValidations, Name),
ok = emqx_schema_validation_registry:delete(Validation)
end,
DeletedValidations
),
lists:foreach(
fun(Name) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation)
end,
NewValidations
),
ChangedValidations =
lists:map(
fun(Name) ->
{_Pos, OldValidation} = fetch_with_index(OldValidations, Name),
{Pos, NewValidation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
#{name => Name, pos => Pos}
end,
ChangedValidations0
),
ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations),
{ok, #{changed_validations => ChangedValidations}}.
%%------------------------------------------------------------------------------
%% `emqx_config_backup' API
%%------------------------------------------------------------------------------
import_config(#{?CONF_ROOT_BIN := RawConf0}) ->
Result = emqx_conf:update(
[?CONF_ROOT],
{merge, RawConf0},
#{override_to => cluster, rawconf_with_defaults => true}
),
case Result of
{error, Reason} ->
{error, #{root_key => ?CONF_ROOT, reason => Reason}};
{ok, _} ->
Keys0 = maps:keys(RawConf0),
ChangedPaths = Keys0 -- [<<"validations">>],
{ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}}
end;
import_config(_RawConf) ->
{ok, #{root_key => ?CONF_ROOT, changed => []}}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal exports %% Internal exports
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -370,112 +202,6 @@ evaluate_schema_check(Check, Validation, #message{payload = Data}) ->
false false
end. end.
replace(OldValidations, Validation = #{<<"name">> := Name}) ->
{Found, RevNewValidations} =
lists:foldl(
fun
(#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
{true, [Validation | Acc]};
(Val, {FoundIn, Acc}) ->
{FoundIn, [Val | Acc]}
end,
{false, []},
OldValidations
),
case Found of
true ->
{ok, lists:reverse(RevNewValidations)};
false ->
{error, not_found}
end.
delete(OldValidations, Name) ->
{Found, RevNewValidations} =
lists:foldl(
fun
(#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
{true, Acc};
(Val, {FoundIn, Acc}) ->
{FoundIn, [Val | Acc]}
end,
{false, []},
OldValidations
),
case Found of
true ->
{ok, lists:reverse(RevNewValidations)};
false ->
{error, not_found}
end.
reorder(Validations, Order) ->
Context = #{
not_found => sets:new([{version, 2}]),
duplicated => sets:new([{version, 2}]),
res => [],
seen => sets:new([{version, 2}])
},
reorder(Validations, Order, Context).
reorder(NotReordered, _Order = [], #{not_found := NotFound0, duplicated := Duplicated0, res := Res}) ->
NotFound = sets:to_list(NotFound0),
Duplicated = sets:to_list(Duplicated0),
case {NotReordered, NotFound, Duplicated} of
{[], [], []} ->
{ok, lists:reverse(Res)};
{_, _, _} ->
Error = #{
not_found => NotFound,
duplicated => Duplicated,
not_reordered => [N || #{<<"name">> := N} <- NotReordered]
},
{error, Error}
end;
reorder(RemainingValidations, [Name | Rest], Context0 = #{seen := Seen0}) ->
case sets:is_element(Name, Seen0) of
true ->
Context = maps:update_with(
duplicated, fun(S) -> sets:add_element(Name, S) end, Context0
),
reorder(RemainingValidations, Rest, Context);
false ->
case safe_take(Name, RemainingValidations) of
error ->
Context = maps:update_with(
not_found, fun(S) -> sets:add_element(Name, S) end, Context0
),
reorder(RemainingValidations, Rest, Context);
{ok, {Validation, Front, Rear}} ->
Context1 = maps:update_with(
seen, fun(S) -> sets:add_element(Name, S) end, Context0
),
Context = maps:update_with(res, fun(Vs) -> [Validation | Vs] end, Context1),
reorder(Front ++ Rear, Rest, Context)
end
end.
fetch_with_index([{Pos, #{name := Name} = Validation} | _Rest], Name) ->
{Pos, Validation};
fetch_with_index([{_, _} | Rest], Name) ->
fetch_with_index(Rest, Name);
fetch_with_index(Validations, Name) ->
fetch_with_index(lists:enumerate(Validations), Name).
safe_take(Name, Validations) ->
case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Validations) of
{_Front, []} ->
error;
{Front, [Found | Rear]} ->
{ok, {Found, Front, Rear}}
end.
do_lookup(_Name, _Validations = []) ->
{error, not_found};
do_lookup(Name, [#{name := Name} = Validation | _Rest]) ->
{ok, Validation};
do_lookup(Name, [_ | Rest]) ->
do_lookup(Name, Rest).
run_validations(Validations, Message) -> run_validations(Validations, Message) ->
try try
emqx_rule_runtime:clear_rule_payload(), emqx_rule_runtime:clear_rule_payload(),
@ -557,55 +283,3 @@ run_schema_validation_failed_hook(Message, Validation) ->
#{name := Name} = Validation, #{name := Name} = Validation,
ValidationContext = #{name => Name}, ValidationContext = #{name => Name},
emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]). emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).
%% "Merging" in the context of the validation array means:
%% * Existing validations (identified by `name') are left untouched.
%% * No validations are removed.
%% * New validations are appended to the existing list.
%% * Existing validations are not reordered.
prepare_config_merge(NewConfig0, OldConfig) ->
{ImportedRawValidations, NewConfigNoValidations} =
case maps:take(<<"validations">>, NewConfig0) of
error ->
{[], NewConfig0};
{V, R} ->
{V, R}
end,
OldRawValidations = maps:get(<<"validations">>, OldConfig, []),
#{added := NewRawValidations} = emqx_utils:diff_lists(
ImportedRawValidations,
OldRawValidations,
fun(#{<<"name">> := N}) -> N end
),
Config0 = emqx_utils_maps:deep_merge(OldConfig, NewConfigNoValidations),
Config = maps:update_with(
<<"validations">>,
fun(OldVs) -> OldVs ++ NewRawValidations end,
NewRawValidations,
Config0
),
#{
new_validations => NewRawValidations,
resulting_config => Config
}.
prepare_config_replace(NewConfig, OldConfig) ->
ImportedRawValidations = maps:get(<<"validations">>, NewConfig, []),
OldValidations = maps:get(validations, OldConfig, []),
%% Since, at this point, we have an input raw config but a parsed old config, we
%% project both to the to have only their names, and consider common names as changed.
#{
added := NewValidations,
removed := DeletedValidations,
changed := ChangedValidations0,
identical := ChangedValidations1
} = emqx_utils:diff_lists(
lists:map(fun(#{<<"name">> := N}) -> N end, ImportedRawValidations),
lists:map(fun(#{name := N}) -> N end, OldValidations),
fun(N) -> N end
),
#{
new_validations => NewValidations,
changed_validations => ChangedValidations0 ++ ChangedValidations1,
deleted_validations => DeletedValidations
}.

View File

@ -19,14 +19,14 @@
-spec start(application:start_type(), term()) -> {ok, pid()}. -spec start(application:start_type(), term()) -> {ok, pid()}.
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_schema_validation_sup:start_link(), {ok, Sup} = emqx_schema_validation_sup:start_link(),
ok = emqx_schema_validation:add_handler(), ok = emqx_schema_validation_config:add_handler(),
ok = emqx_schema_validation:register_hooks(), ok = emqx_schema_validation:register_hooks(),
ok = emqx_schema_validation:load(), ok = emqx_schema_validation_config:load(),
{ok, Sup}. {ok, Sup}.
-spec stop(term()) -> ok. -spec stop(term()) -> ok.
stop(_State) -> stop(_State) ->
ok = emqx_schema_validation:unload(), ok = emqx_schema_validation_config:unload(),
ok = emqx_schema_validation:unregister_hooks(), ok = emqx_schema_validation:unregister_hooks(),
ok = emqx_schema_validation:remove_handler(), ok = emqx_schema_validation_config:remove_handler(),
ok. ok.

View File

@ -0,0 +1,390 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_schema_validation_config).
%% API
-export([
add_handler/0,
remove_handler/0,
load/0,
unload/0,
list/0,
reorder/1,
lookup/1,
insert/1,
update/1,
delete/1
]).
%% `emqx_config_handler' API
-export([pre_config_update/3, post_config_update/5]).
%% `emqx_config_backup' API
-behaviour(emqx_config_backup).
-export([import_config/1]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(CONF_ROOT, schema_validation).
-define(CONF_ROOT_BIN, <<"schema_validation">>).
-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
-type validation_name() :: emqx_schema_validation:validation_name().
-type validation() :: emqx_schema_validation:validation().
-type raw_validation() :: #{binary() => _}.
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec add_handler() -> ok.
add_handler() ->
ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
ok.
-spec remove_handler() -> ok.
remove_handler() ->
ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
ok = emqx_config_handler:remove_handler([?CONF_ROOT]),
ok.
load() ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
lists:foreach(
fun({Pos, Validation}) ->
ok = emqx_schema_validation_registry:insert(Pos, Validation)
end,
lists:enumerate(Validations)
).
unload() ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
lists:foreach(
fun({Pos, Validation}) ->
ok = emqx_schema_validation_registry:delete(Validation, Pos)
end,
lists:enumerate(Validations)
).
-spec list() -> [validation()].
list() ->
emqx:get_config(?VALIDATIONS_CONF_PATH, []).
-spec reorder([validation_name()]) ->
{ok, _} | {error, _}.
reorder(Order) ->
emqx_conf:update(
?VALIDATIONS_CONF_PATH,
{reorder, Order},
#{override_to => cluster}
).
-spec lookup(validation_name()) -> {ok, validation()} | {error, not_found}.
lookup(Name) ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
do_lookup(Name, Validations).
-spec insert(raw_validation()) ->
{ok, _} | {error, _}.
insert(Validation) ->
emqx_conf:update(
?VALIDATIONS_CONF_PATH,
{append, Validation},
#{override_to => cluster}
).
-spec update(raw_validation()) ->
{ok, _} | {error, _}.
update(Validation) ->
emqx_conf:update(
?VALIDATIONS_CONF_PATH,
{update, Validation},
#{override_to => cluster}
).
-spec delete(validation_name()) ->
{ok, _} | {error, _}.
delete(Name) ->
emqx_conf:update(
?VALIDATIONS_CONF_PATH,
{delete, Name},
#{override_to => cluster}
).
%%------------------------------------------------------------------------------
%% `emqx_config_handler' API
%%------------------------------------------------------------------------------
pre_config_update(?VALIDATIONS_CONF_PATH, {append, Validation}, OldValidations) ->
Validations = OldValidations ++ [Validation],
{ok, Validations};
pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations) ->
replace(OldValidations, Validation);
pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
delete(OldValidations, Validation);
pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
reorder(OldValidations, Order);
pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) ->
#{resulting_config := Config} = prepare_config_merge(NewConfig, OldConfig),
{ok, Config};
pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) ->
{ok, NewConfig}.
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
{Pos, Validation} = fetch_with_index(New, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
{_Pos, OldValidation} = fetch_with_index(Old, Name),
{Pos, NewValidation} = fetch_with_index(New, Name),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
{Pos, Validation} = fetch_with_index(Old, Name),
ok = emqx_schema_validation_registry:delete(Validation, Pos),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, Old, _AppEnvs) ->
ok = emqx_schema_validation_registry:reindex_positions(New, Old),
ok;
post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
#{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old,
#{added := NewValidations0} =
emqx_utils:diff_lists(
ResultingValidations,
OldValidations,
fun(#{name := N}) -> N end
),
NewValidations =
lists:map(
fun(#{name := Name}) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation),
#{name => Name, pos => Pos}
end,
NewValidations0
),
{ok, #{new_validations => NewValidations}};
post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) ->
#{
new_validations := NewValidations,
changed_validations := ChangedValidations0,
deleted_validations := DeletedValidations
} = prepare_config_replace(Input, Old),
#{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old,
lists:foreach(
fun(Name) ->
{Pos, Validation} = fetch_with_index(OldValidations, Name),
ok = emqx_schema_validation_registry:delete(Validation, Pos)
end,
DeletedValidations
),
lists:foreach(
fun(Name) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation)
end,
NewValidations
),
ChangedValidations =
lists:map(
fun(Name) ->
{_Pos, OldValidation} = fetch_with_index(OldValidations, Name),
{Pos, NewValidation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
#{name => Name, pos => Pos}
end,
ChangedValidations0
),
ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations, OldValidations),
{ok, #{changed_validations => ChangedValidations}}.
%%------------------------------------------------------------------------------
%% `emqx_config_backup' API
%%------------------------------------------------------------------------------
import_config(#{?CONF_ROOT_BIN := RawConf0}) ->
Result = emqx_conf:update(
[?CONF_ROOT],
{merge, RawConf0},
#{override_to => cluster, rawconf_with_defaults => true}
),
case Result of
{error, Reason} ->
{error, #{root_key => ?CONF_ROOT, reason => Reason}};
{ok, _} ->
Keys0 = maps:keys(RawConf0),
ChangedPaths = Keys0 -- [<<"validations">>],
{ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}}
end;
import_config(_RawConf) ->
{ok, #{root_key => ?CONF_ROOT, changed => []}}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
replace(OldValidations, Validation = #{<<"name">> := Name}) ->
{Found, RevNewValidations} =
lists:foldl(
fun
(#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
{true, [Validation | Acc]};
(Val, {FoundIn, Acc}) ->
{FoundIn, [Val | Acc]}
end,
{false, []},
OldValidations
),
case Found of
true ->
{ok, lists:reverse(RevNewValidations)};
false ->
{error, not_found}
end.
delete(OldValidations, Name) ->
{Found, RevNewValidations} =
lists:foldl(
fun
(#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
{true, Acc};
(Val, {FoundIn, Acc}) ->
{FoundIn, [Val | Acc]}
end,
{false, []},
OldValidations
),
case Found of
true ->
{ok, lists:reverse(RevNewValidations)};
false ->
{error, not_found}
end.
reorder(Validations, Order) ->
Context = #{
not_found => sets:new([{version, 2}]),
duplicated => sets:new([{version, 2}]),
res => [],
seen => sets:new([{version, 2}])
},
reorder(Validations, Order, Context).
reorder(NotReordered, _Order = [], #{not_found := NotFound0, duplicated := Duplicated0, res := Res}) ->
NotFound = sets:to_list(NotFound0),
Duplicated = sets:to_list(Duplicated0),
case {NotReordered, NotFound, Duplicated} of
{[], [], []} ->
{ok, lists:reverse(Res)};
{_, _, _} ->
Error = #{
not_found => NotFound,
duplicated => Duplicated,
not_reordered => [N || #{<<"name">> := N} <- NotReordered]
},
{error, Error}
end;
reorder(RemainingValidations, [Name | Rest], Context0 = #{seen := Seen0}) ->
case sets:is_element(Name, Seen0) of
true ->
Context = maps:update_with(
duplicated, fun(S) -> sets:add_element(Name, S) end, Context0
),
reorder(RemainingValidations, Rest, Context);
false ->
case safe_take(Name, RemainingValidations) of
error ->
Context = maps:update_with(
not_found, fun(S) -> sets:add_element(Name, S) end, Context0
),
reorder(RemainingValidations, Rest, Context);
{ok, {Validation, Front, Rear}} ->
Context1 = maps:update_with(
seen, fun(S) -> sets:add_element(Name, S) end, Context0
),
Context = maps:update_with(res, fun(Vs) -> [Validation | Vs] end, Context1),
reorder(Front ++ Rear, Rest, Context)
end
end.
fetch_with_index([{Pos, #{name := Name} = Validation} | _Rest], Name) ->
{Pos, Validation};
fetch_with_index([{_, _} | Rest], Name) ->
fetch_with_index(Rest, Name);
fetch_with_index(Validations, Name) ->
fetch_with_index(lists:enumerate(Validations), Name).
safe_take(Name, Validations) ->
case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Validations) of
{_Front, []} ->
error;
{Front, [Found | Rear]} ->
{ok, {Found, Front, Rear}}
end.
do_lookup(_Name, _Validations = []) ->
{error, not_found};
do_lookup(Name, [#{name := Name} = Validation | _Rest]) ->
{ok, Validation};
do_lookup(Name, [_ | Rest]) ->
do_lookup(Name, Rest).
%% "Merging" in the context of the validation array means:
%% * Existing validations (identified by `name') are left untouched.
%% * No validations are removed.
%% * New validations are appended to the existing list.
%% * Existing validations are not reordered.
prepare_config_merge(NewConfig0, OldConfig) ->
{ImportedRawValidations, NewConfigNoValidations} =
case maps:take(<<"validations">>, NewConfig0) of
error ->
{[], NewConfig0};
{V, R} ->
{V, R}
end,
OldRawValidations = maps:get(<<"validations">>, OldConfig, []),
#{added := NewRawValidations} = emqx_utils:diff_lists(
ImportedRawValidations,
OldRawValidations,
fun(#{<<"name">> := N}) -> N end
),
Config0 = emqx_utils_maps:deep_merge(OldConfig, NewConfigNoValidations),
Config = maps:update_with(
<<"validations">>,
fun(OldVs) -> OldVs ++ NewRawValidations end,
NewRawValidations,
Config0
),
#{
new_validations => NewRawValidations,
resulting_config => Config
}.
prepare_config_replace(NewConfig, OldConfig) ->
ImportedRawValidations = maps:get(<<"validations">>, NewConfig, []),
OldValidations = maps:get(validations, OldConfig, []),
%% Since, at this point, we have an input raw config but a parsed old config, we
%% project both to the to have only their names, and consider common names as changed.
#{
added := NewValidations,
removed := DeletedValidations,
changed := ChangedValidations0,
identical := ChangedValidations1
} = emqx_utils:diff_lists(
lists:map(fun(#{<<"name">> := N}) -> N end, ImportedRawValidations),
lists:map(fun(#{name := N}) -> N end, OldValidations),
fun(N) -> N end
),
#{
new_validations => NewValidations,
changed_validations => ChangedValidations0 ++ ChangedValidations1,
deleted_validations => DeletedValidations
}.

View File

@ -10,8 +10,8 @@
lookup/1, lookup/1,
insert/2, insert/2,
update/3, update/3,
delete/1, delete/2,
reindex_positions/1, reindex_positions/2,
matching_validations/1, matching_validations/1,
@ -51,10 +51,10 @@
-type validation() :: _TODO. -type validation() :: _TODO.
-type position_index() :: pos_integer(). -type position_index() :: pos_integer().
-record(reindex_positions, {validations :: [validation()]}). -record(reindex_positions, {new_validations :: [validation()], old_validations :: [validation()]}).
-record(insert, {pos :: position_index(), validation :: validation()}). -record(insert, {pos :: position_index(), validation :: validation()}).
-record(update, {old :: validation(), pos :: position_index(), new :: validation()}). -record(update, {old :: validation(), pos :: position_index(), new :: validation()}).
-record(delete, {validation :: validation()}). -record(delete, {validation :: validation(), pos :: position_index()}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
@ -74,9 +74,16 @@ lookup(Name) ->
{ok, Validation} {ok, Validation}
end. end.
-spec reindex_positions([validation()]) -> ok. -spec reindex_positions([validation()], [validation()]) -> ok.
reindex_positions(Validations) -> reindex_positions(NewValidations, OldValidations) ->
gen_server:call(?MODULE, #reindex_positions{validations = Validations}, infinity). gen_server:call(
?MODULE,
#reindex_positions{
new_validations = NewValidations,
old_validations = OldValidations
},
infinity
).
-spec insert(position_index(), validation()) -> ok. -spec insert(position_index(), validation()) -> ok.
insert(Pos, Validation) -> insert(Pos, Validation) ->
@ -86,23 +93,36 @@ insert(Pos, Validation) ->
update(Old, Pos, New) -> update(Old, Pos, New) ->
gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity). gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity).
-spec delete(validation()) -> ok. -spec delete(validation(), position_index()) -> ok.
delete(Validation) -> delete(Validation, Pos) ->
gen_server:call(?MODULE, #delete{validation = Validation}, infinity). gen_server:call(?MODULE, #delete{validation = Validation, pos = Pos}, infinity).
%% @doc Returns a list of matching validation names, sorted by their configuration order. %% @doc Returns a list of matching validation names, sorted by their configuration order.
-spec matching_validations(emqx_types:topic()) -> [validation()]. -spec matching_validations(emqx_types:topic()) -> [validation()].
matching_validations(Topic) -> matching_validations(Topic) ->
Validations0 = [ Validations0 =
{Pos, Validation} lists:flatmap(
|| M <- emqx_topic_index:matches(Topic, ?VALIDATION_TOPIC_INDEX, [unique]), fun(M) ->
[Pos] <- [emqx_topic_index:get_record(M, ?VALIDATION_TOPIC_INDEX)], case emqx_topic_index:get_record(M, ?VALIDATION_TOPIC_INDEX) of
{ok, Validation} <- [ [Name] ->
lookup(emqx_topic_index:get_id(M)) [Name];
] _ ->
], []
Validations1 = lists:sort(fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Validations0), end
lists:map(fun({_Pos, V}) -> V end, Validations1). end,
emqx_topic_index:matches(Topic, ?VALIDATION_TOPIC_INDEX, [unique])
),
lists:flatmap(
fun(Name) ->
case lookup(Name) of
{ok, Validation} ->
[Validation];
_ ->
[]
end
end,
Validations0
).
-spec metrics_worker_spec() -> supervisor:child_spec(). -spec metrics_worker_spec() -> supervisor:child_spec().
metrics_worker_spec() -> metrics_worker_spec() ->
@ -133,8 +153,15 @@ init(_) ->
State = #{}, State = #{},
{ok, State}. {ok, State}.
handle_call(#reindex_positions{validations = Validations}, _From, State) -> handle_call(
do_reindex_positions(Validations), #reindex_positions{
new_validations = NewValidations,
old_validations = OldValidations
},
_From,
State
) ->
do_reindex_positions(NewValidations, OldValidations),
{reply, ok, State}; {reply, ok, State};
handle_call(#insert{pos = Pos, validation = Validation}, _From, State) -> handle_call(#insert{pos = Pos, validation = Validation}, _From, State) ->
do_insert(Pos, Validation), do_insert(Pos, Validation),
@ -142,8 +169,8 @@ handle_call(#insert{pos = Pos, validation = Validation}, _From, State) ->
handle_call(#update{old = OldValidation, pos = Pos, new = NewValidation}, _From, State) -> handle_call(#update{old = OldValidation, pos = Pos, new = NewValidation}, _From, State) ->
ok = do_update(OldValidation, Pos, NewValidation), ok = do_update(OldValidation, Pos, NewValidation),
{reply, ok, State}; {reply, ok, State};
handle_call(#delete{validation = Validation}, _From, State) -> handle_call(#delete{validation = Validation, pos = Pos}, _From, State) ->
do_delete(Validation), do_delete(Validation, Pos),
{reply, ok, State}; {reply, ok, State};
handle_call(_Call, _From, State) -> handle_call(_Call, _From, State) ->
{reply, ignored, State}. {reply, ignored, State}.
@ -160,7 +187,14 @@ create_tables() ->
_ = emqx_utils_ets:new(?VALIDATION_TAB, [public, ordered_set, {read_concurrency, true}]), _ = emqx_utils_ets:new(?VALIDATION_TAB, [public, ordered_set, {read_concurrency, true}]),
ok. ok.
do_reindex_positions(Validations) -> do_reindex_positions(NewValidations, OldValidations) ->
lists:foreach(
fun({Pos, Validation}) ->
#{topics := Topics} = Validation,
delete_topic_index(Pos, Topics)
end,
lists:enumerate(OldValidations)
),
lists:foreach( lists:foreach(
fun({Pos, Validation}) -> fun({Pos, Validation}) ->
#{ #{
@ -170,7 +204,7 @@ do_reindex_positions(Validations) ->
do_insert_into_tab(Name, Validation, Pos), do_insert_into_tab(Name, Validation, Pos),
update_topic_index(Name, Pos, Topics) update_topic_index(Name, Pos, Topics)
end, end,
lists:enumerate(Validations) lists:enumerate(NewValidations)
). ).
do_insert(Pos, Validation) -> do_insert(Pos, Validation) ->
@ -193,17 +227,17 @@ do_update(OldValidation, Pos, NewValidation) ->
} = NewValidation, } = NewValidation,
maybe_create_metrics(Name), maybe_create_metrics(Name),
do_insert_into_tab(Name, NewValidation, Pos), do_insert_into_tab(Name, NewValidation, Pos),
delete_topic_index(Name, OldTopics), delete_topic_index(Pos, OldTopics),
Enabled andalso update_topic_index(Name, Pos, NewTopics), Enabled andalso update_topic_index(Name, Pos, NewTopics),
ok. ok.
do_delete(Validation) -> do_delete(Validation, Pos) ->
#{ #{
name := Name, name := Name,
topics := Topics topics := Topics
} = Validation, } = Validation,
ets:delete(?VALIDATION_TAB, Name), ets:delete(?VALIDATION_TAB, Name),
delete_topic_index(Name, Topics), delete_topic_index(Pos, Topics),
drop_metrics(Name), drop_metrics(Name),
ok. ok.
@ -226,15 +260,15 @@ drop_metrics(Name) ->
update_topic_index(Name, Pos, Topics) -> update_topic_index(Name, Pos, Topics) ->
lists:foreach( lists:foreach(
fun(Topic) -> fun(Topic) ->
true = emqx_topic_index:insert(Topic, Name, Pos, ?VALIDATION_TOPIC_INDEX) true = emqx_topic_index:insert(Topic, Pos, Name, ?VALIDATION_TOPIC_INDEX)
end, end,
Topics Topics
). ).
delete_topic_index(Name, Topics) -> delete_topic_index(Pos, Topics) ->
lists:foreach( lists:foreach(
fun(Topic) -> fun(Topic) ->
true = emqx_topic_index:delete(Topic, Name, ?VALIDATION_TOPIC_INDEX) true = emqx_topic_index:delete(Topic, Pos, ?VALIDATION_TOPIC_INDEX)
end, end,
Topics Topics
). ).