refactor: index positions rather than names

Addresses https://github.com/emqx/emqx/pull/13199#discussion_r1633096025
This commit is contained in:
Thales Macedo Garitezi 2024-06-10 15:54:37 -03:00
parent 5eff4a7544
commit 05ebb17cd6
2 changed files with 76 additions and 42 deletions

View File

@ -66,10 +66,10 @@ load() ->
unload() -> unload() ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []), Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
lists:foreach( lists:foreach(
fun(Validation) -> fun({Pos, Validation}) ->
ok = emqx_schema_validation_registry:delete(Validation) ok = emqx_schema_validation_registry:delete(Validation, Pos)
end, end,
Validations lists:enumerate(Validations)
). ).
-spec list() -> [validation()]. -spec list() -> [validation()].
@ -146,11 +146,11 @@ post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New,
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
ok; ok;
post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
{_Pos, Validation} = fetch_with_index(Old, Name), {Pos, Validation} = fetch_with_index(Old, Name),
ok = emqx_schema_validation_registry:delete(Validation), ok = emqx_schema_validation_registry:delete(Validation, Pos),
ok; ok;
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) -> post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, Old, _AppEnvs) ->
ok = emqx_schema_validation_registry:reindex_positions(New), ok = emqx_schema_validation_registry:reindex_positions(New, Old),
ok; ok;
post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) -> post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
#{validations := ResultingValidations} = ResultingConfig, #{validations := ResultingValidations} = ResultingConfig,
@ -181,8 +181,8 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv
#{validations := OldValidations} = Old, #{validations := OldValidations} = Old,
lists:foreach( lists:foreach(
fun(Name) -> fun(Name) ->
{_Pos, Validation} = fetch_with_index(OldValidations, Name), {Pos, Validation} = fetch_with_index(OldValidations, Name),
ok = emqx_schema_validation_registry:delete(Validation) ok = emqx_schema_validation_registry:delete(Validation, Pos)
end, end,
DeletedValidations DeletedValidations
), ),
@ -203,7 +203,7 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv
end, end,
ChangedValidations0 ChangedValidations0
), ),
ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations), ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations, OldValidations),
{ok, #{changed_validations => ChangedValidations}}. {ok, #{changed_validations => ChangedValidations}}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -10,8 +10,8 @@
lookup/1, lookup/1,
insert/2, insert/2,
update/3, update/3,
delete/1, delete/2,
reindex_positions/1, reindex_positions/2,
matching_validations/1, matching_validations/1,
@ -51,10 +51,10 @@
-type validation() :: _TODO. -type validation() :: _TODO.
-type position_index() :: pos_integer(). -type position_index() :: pos_integer().
-record(reindex_positions, {validations :: [validation()]}). -record(reindex_positions, {new_validations :: [validation()], old_validations :: [validation()]}).
-record(insert, {pos :: position_index(), validation :: validation()}). -record(insert, {pos :: position_index(), validation :: validation()}).
-record(update, {old :: validation(), pos :: position_index(), new :: validation()}). -record(update, {old :: validation(), pos :: position_index(), new :: validation()}).
-record(delete, {validation :: validation()}). -record(delete, {validation :: validation(), pos :: position_index()}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
@ -74,9 +74,16 @@ lookup(Name) ->
{ok, Validation} {ok, Validation}
end. end.
-spec reindex_positions([validation()]) -> ok. -spec reindex_positions([validation()], [validation()]) -> ok.
reindex_positions(Validations) -> reindex_positions(NewValidations, OldValidations) ->
gen_server:call(?MODULE, #reindex_positions{validations = Validations}, infinity). gen_server:call(
?MODULE,
#reindex_positions{
new_validations = NewValidations,
old_validations = OldValidations
},
infinity
).
-spec insert(position_index(), validation()) -> ok. -spec insert(position_index(), validation()) -> ok.
insert(Pos, Validation) -> insert(Pos, Validation) ->
@ -86,23 +93,36 @@ insert(Pos, Validation) ->
update(Old, Pos, New) -> update(Old, Pos, New) ->
gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity). gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity).
-spec delete(validation()) -> ok. -spec delete(validation(), position_index()) -> ok.
delete(Validation) -> delete(Validation, Pos) ->
gen_server:call(?MODULE, #delete{validation = Validation}, infinity). gen_server:call(?MODULE, #delete{validation = Validation, pos = Pos}, infinity).
%% @doc Returns a list of matching validation names, sorted by their configuration order. %% @doc Returns a list of matching validation names, sorted by their configuration order.
-spec matching_validations(emqx_types:topic()) -> [validation()]. -spec matching_validations(emqx_types:topic()) -> [validation()].
matching_validations(Topic) -> matching_validations(Topic) ->
Validations0 = [ Validations0 =
{Pos, Validation} lists:flatmap(
|| M <- emqx_topic_index:matches(Topic, ?VALIDATION_TOPIC_INDEX, [unique]), fun(M) ->
[Pos] <- [emqx_topic_index:get_record(M, ?VALIDATION_TOPIC_INDEX)], case emqx_topic_index:get_record(M, ?VALIDATION_TOPIC_INDEX) of
{ok, Validation} <- [ [Name] ->
lookup(emqx_topic_index:get_id(M)) [Name];
] _ ->
], []
Validations1 = lists:sort(fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Validations0), end
lists:map(fun({_Pos, V}) -> V end, Validations1). end,
emqx_topic_index:matches(Topic, ?VALIDATION_TOPIC_INDEX, [unique])
),
lists:flatmap(
fun(Name) ->
case lookup(Name) of
{ok, Validation} ->
[Validation];
_ ->
[]
end
end,
Validations0
).
-spec metrics_worker_spec() -> supervisor:child_spec(). -spec metrics_worker_spec() -> supervisor:child_spec().
metrics_worker_spec() -> metrics_worker_spec() ->
@ -133,8 +153,15 @@ init(_) ->
State = #{}, State = #{},
{ok, State}. {ok, State}.
handle_call(#reindex_positions{validations = Validations}, _From, State) -> handle_call(
do_reindex_positions(Validations), #reindex_positions{
new_validations = NewValidations,
old_validations = OldValidations
},
_From,
State
) ->
do_reindex_positions(NewValidations, OldValidations),
{reply, ok, State}; {reply, ok, State};
handle_call(#insert{pos = Pos, validation = Validation}, _From, State) -> handle_call(#insert{pos = Pos, validation = Validation}, _From, State) ->
do_insert(Pos, Validation), do_insert(Pos, Validation),
@ -142,8 +169,8 @@ handle_call(#insert{pos = Pos, validation = Validation}, _From, State) ->
handle_call(#update{old = OldValidation, pos = Pos, new = NewValidation}, _From, State) -> handle_call(#update{old = OldValidation, pos = Pos, new = NewValidation}, _From, State) ->
ok = do_update(OldValidation, Pos, NewValidation), ok = do_update(OldValidation, Pos, NewValidation),
{reply, ok, State}; {reply, ok, State};
handle_call(#delete{validation = Validation}, _From, State) -> handle_call(#delete{validation = Validation, pos = Pos}, _From, State) ->
do_delete(Validation), do_delete(Validation, Pos),
{reply, ok, State}; {reply, ok, State};
handle_call(_Call, _From, State) -> handle_call(_Call, _From, State) ->
{reply, ignored, State}. {reply, ignored, State}.
@ -160,7 +187,14 @@ create_tables() ->
_ = emqx_utils_ets:new(?VALIDATION_TAB, [public, ordered_set, {read_concurrency, true}]), _ = emqx_utils_ets:new(?VALIDATION_TAB, [public, ordered_set, {read_concurrency, true}]),
ok. ok.
do_reindex_positions(Validations) -> do_reindex_positions(NewValidations, OldValidations) ->
lists:foreach(
fun({Pos, Validation}) ->
#{topics := Topics} = Validation,
delete_topic_index(Pos, Topics)
end,
lists:enumerate(OldValidations)
),
lists:foreach( lists:foreach(
fun({Pos, Validation}) -> fun({Pos, Validation}) ->
#{ #{
@ -170,7 +204,7 @@ do_reindex_positions(Validations) ->
do_insert_into_tab(Name, Validation, Pos), do_insert_into_tab(Name, Validation, Pos),
update_topic_index(Name, Pos, Topics) update_topic_index(Name, Pos, Topics)
end, end,
lists:enumerate(Validations) lists:enumerate(NewValidations)
). ).
do_insert(Pos, Validation) -> do_insert(Pos, Validation) ->
@ -193,17 +227,17 @@ do_update(OldValidation, Pos, NewValidation) ->
} = NewValidation, } = NewValidation,
maybe_create_metrics(Name), maybe_create_metrics(Name),
do_insert_into_tab(Name, NewValidation, Pos), do_insert_into_tab(Name, NewValidation, Pos),
delete_topic_index(Name, OldTopics), delete_topic_index(Pos, OldTopics),
Enabled andalso update_topic_index(Name, Pos, NewTopics), Enabled andalso update_topic_index(Name, Pos, NewTopics),
ok. ok.
do_delete(Validation) -> do_delete(Validation, Pos) ->
#{ #{
name := Name, name := Name,
topics := Topics topics := Topics
} = Validation, } = Validation,
ets:delete(?VALIDATION_TAB, Name), ets:delete(?VALIDATION_TAB, Name),
delete_topic_index(Name, Topics), delete_topic_index(Pos, Topics),
drop_metrics(Name), drop_metrics(Name),
ok. ok.
@ -226,15 +260,15 @@ drop_metrics(Name) ->
update_topic_index(Name, Pos, Topics) -> update_topic_index(Name, Pos, Topics) ->
lists:foreach( lists:foreach(
fun(Topic) -> fun(Topic) ->
true = emqx_topic_index:insert(Topic, Name, Pos, ?VALIDATION_TOPIC_INDEX) true = emqx_topic_index:insert(Topic, Pos, Name, ?VALIDATION_TOPIC_INDEX)
end, end,
Topics Topics
). ).
delete_topic_index(Name, Topics) -> delete_topic_index(Pos, Topics) ->
lists:foreach( lists:foreach(
fun(Topic) -> fun(Topic) ->
true = emqx_topic_index:delete(Topic, Name, ?VALIDATION_TOPIC_INDEX) true = emqx_topic_index:delete(Topic, Pos, ?VALIDATION_TOPIC_INDEX)
end, end,
Topics Topics
). ).