refactor: index positions rather than names

This commit is contained in:
Thales Macedo Garitezi 2024-06-10 12:12:29 -03:00
parent 8c7a7cf9db
commit 5629fe60c1
2 changed files with 75 additions and 54 deletions

View File

@ -65,10 +65,10 @@ load() ->
unload() -> unload() ->
Transformations = emqx:get_config(?TRANSFORMATIONS_CONF_PATH, []), Transformations = emqx:get_config(?TRANSFORMATIONS_CONF_PATH, []),
lists:foreach( lists:foreach(
fun(Transformation) -> fun({Pos, Transformation}) ->
ok = emqx_message_transformation_registry:delete(Transformation) ok = emqx_message_transformation_registry:delete(Transformation, Pos)
end, end,
Transformations lists:enumerate(Transformations)
). ).
-spec list() -> [transformation()]. -spec list() -> [transformation()].
@ -147,11 +147,11 @@ post_config_update(?TRANSFORMATIONS_CONF_PATH, {update, #{<<"name">> := Name}},
ok = emqx_message_transformation_registry:update(OldTransformation, Pos, NewTransformation), ok = emqx_message_transformation_registry:update(OldTransformation, Pos, NewTransformation),
ok; ok;
post_config_update(?TRANSFORMATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> post_config_update(?TRANSFORMATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
{_Pos, Transformation} = fetch_with_index(Old, Name), {Pos, Transformation} = fetch_with_index(Old, Name),
ok = emqx_message_transformation_registry:delete(Transformation), ok = emqx_message_transformation_registry:delete(Transformation, Pos),
ok; ok;
post_config_update(?TRANSFORMATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) -> post_config_update(?TRANSFORMATIONS_CONF_PATH, {reorder, _Order}, New, Old, _AppEnvs) ->
ok = emqx_message_transformation_registry:reindex_positions(New), ok = emqx_message_transformation_registry:reindex_positions(New, Old),
ok; ok;
post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) -> post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
#{transformations := ResultingTransformations} = ResultingConfig, #{transformations := ResultingTransformations} = ResultingConfig,
@ -182,8 +182,8 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv
#{transformations := OldTransformations} = Old, #{transformations := OldTransformations} = Old,
lists:foreach( lists:foreach(
fun(Name) -> fun(Name) ->
{_Pos, Transformation} = fetch_with_index(OldTransformations, Name), {Pos, Transformation} = fetch_with_index(OldTransformations, Name),
ok = emqx_message_transformation_registry:delete(Transformation) ok = emqx_message_transformation_registry:delete(Transformation, Pos)
end, end,
DeletedTransformations DeletedTransformations
), ),
@ -206,7 +206,9 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv
end, end,
ChangedTransformations0 ChangedTransformations0
), ),
ok = emqx_message_transformation_registry:reindex_positions(ResultingTransformations), ok = emqx_message_transformation_registry:reindex_positions(
ResultingTransformations, OldTransformations
),
{ok, #{changed_transformations => ChangedTransformations}}. {ok, #{changed_transformations => ChangedTransformations}}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -10,8 +10,8 @@
lookup/1, lookup/1,
insert/2, insert/2,
update/3, update/3,
delete/1, delete/2,
reindex_positions/1, reindex_positions/2,
matching_transformations/1, matching_transformations/1,
@ -52,10 +52,13 @@
-type transformation() :: #{atom() => term()}. -type transformation() :: #{atom() => term()}.
-type position_index() :: pos_integer(). -type position_index() :: pos_integer().
-record(reindex_positions, {transformations :: [transformation()]}). -record(reindex_positions, {
new_transformations :: [transformation()],
old_transformations :: [transformation()]
}).
-record(insert, {pos :: position_index(), transformation :: transformation()}). -record(insert, {pos :: position_index(), transformation :: transformation()}).
-record(update, {old :: transformation(), pos :: position_index(), new :: transformation()}). -record(update, {old :: transformation(), pos :: position_index(), new :: transformation()}).
-record(delete, {transformation :: transformation()}). -record(delete, {transformation :: transformation(), pos :: position_index()}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
@ -75,9 +78,16 @@ lookup(Name) ->
{ok, Transformation} {ok, Transformation}
end. end.
-spec reindex_positions([transformation()]) -> ok. -spec reindex_positions([transformation()], [transformation()]) -> ok.
reindex_positions(Transformations) -> reindex_positions(NewTransformations, OldTransformations) ->
gen_server:call(?MODULE, #reindex_positions{transformations = Transformations}, infinity). gen_server:call(
?MODULE,
#reindex_positions{
new_transformations = NewTransformations,
old_transformations = OldTransformations
},
infinity
).
-spec insert(position_index(), transformation()) -> ok. -spec insert(position_index(), transformation()) -> ok.
insert(Pos, Transformation) -> insert(Pos, Transformation) ->
@ -87,9 +97,9 @@ insert(Pos, Transformation) ->
update(Old, Pos, New) -> update(Old, Pos, New) ->
gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity). gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity).
-spec delete(transformation()) -> ok. -spec delete(transformation(), position_index()) -> ok.
delete(Transformation) -> delete(Transformation, Pos) ->
gen_server:call(?MODULE, #delete{transformation = Transformation}, infinity). gen_server:call(?MODULE, #delete{transformation = Transformation, pos = Pos}, infinity).
%% @doc Returns a list of matching transformation names, sorted by their configuration order. %% @doc Returns a list of matching transformation names, sorted by their configuration order.
-spec matching_transformations(emqx_types:topic()) -> [transformation()]. -spec matching_transformations(emqx_types:topic()) -> [transformation()].
@ -98,30 +108,25 @@ matching_transformations(Topic) ->
lists:flatmap( lists:flatmap(
fun(M) -> fun(M) ->
case emqx_topic_index:get_record(M, ?TRANSFORMATION_TOPIC_INDEX) of case emqx_topic_index:get_record(M, ?TRANSFORMATION_TOPIC_INDEX) of
[Pos] -> [Name] ->
[{Pos, emqx_topic_index:get_id(M)}]; [Name];
_ -> _ ->
[] []
end end
end, end,
emqx_topic_index:matches(Topic, ?TRANSFORMATION_TOPIC_INDEX, [unique]) emqx_topic_index:matches(Topic, ?TRANSFORMATION_TOPIC_INDEX, [unique])
), ),
Transformations1 = lists:flatmap(
lists:flatmap( fun(Name) ->
fun({Pos, Id}) -> case lookup(Name) of
case lookup(Id) of {ok, Transformation} ->
{ok, Transformation} -> [Transformation];
[{Pos, Transformation}]; _ ->
_ -> []
[] end
end end,
end, Transformations0
Transformations0 ).
),
Transformations2 = lists:sort(
fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Transformations1
),
lists:map(fun({_Pos, V}) -> V end, Transformations2).
-spec metrics_worker_spec() -> supervisor:child_spec(). -spec metrics_worker_spec() -> supervisor:child_spec().
metrics_worker_spec() -> metrics_worker_spec() ->
@ -152,8 +157,15 @@ init(_) ->
State = #{}, State = #{},
{ok, State}. {ok, State}.
handle_call(#reindex_positions{transformations = Transformations}, _From, State) -> handle_call(
do_reindex_positions(Transformations), #reindex_positions{
new_transformations = NewTransformations,
old_transformations = OldTransformations
},
_From,
State
) ->
do_reindex_positions(NewTransformations, OldTransformations),
{reply, ok, State}; {reply, ok, State};
handle_call(#insert{pos = Pos, transformation = Transformation}, _From, State) -> handle_call(#insert{pos = Pos, transformation = Transformation}, _From, State) ->
do_insert(Pos, Transformation), do_insert(Pos, Transformation),
@ -161,8 +173,8 @@ handle_call(#insert{pos = Pos, transformation = Transformation}, _From, State) -
handle_call(#update{old = OldTransformation, pos = Pos, new = NewTransformation}, _From, State) -> handle_call(#update{old = OldTransformation, pos = Pos, new = NewTransformation}, _From, State) ->
ok = do_update(OldTransformation, Pos, NewTransformation), ok = do_update(OldTransformation, Pos, NewTransformation),
{reply, ok, State}; {reply, ok, State};
handle_call(#delete{transformation = Transformation}, _From, State) -> handle_call(#delete{transformation = Transformation, pos = Pos}, _From, State) ->
do_delete(Transformation), do_delete(Transformation, Pos),
{reply, ok, State}; {reply, ok, State};
handle_call(_Call, _From, State) -> handle_call(_Call, _From, State) ->
{reply, ignored, State}. {reply, ignored, State}.
@ -181,7 +193,14 @@ create_tables() ->
_ = emqx_utils_ets:new(?TRANSFORMATION_TAB, [public, ordered_set, {read_concurrency, true}]), _ = emqx_utils_ets:new(?TRANSFORMATION_TAB, [public, ordered_set, {read_concurrency, true}]),
ok. ok.
do_reindex_positions(Transformations) -> do_reindex_positions(NewTransformations, OldTransformations) ->
lists:foreach(
fun({Pos, Transformation}) ->
#{topics := Topics} = Transformation,
delete_topic_index(Pos, Topics)
end,
lists:enumerate(OldTransformations)
),
lists:foreach( lists:foreach(
fun({Pos, Transformation}) -> fun({Pos, Transformation}) ->
#{ #{
@ -189,9 +208,9 @@ do_reindex_positions(Transformations) ->
topics := Topics topics := Topics
} = Transformation, } = Transformation,
do_insert_into_tab(Name, Transformation, Pos), do_insert_into_tab(Name, Transformation, Pos),
update_topic_index(Name, Pos, Topics) upsert_topic_index(Name, Pos, Topics)
end, end,
lists:enumerate(Transformations) lists:enumerate(NewTransformations)
). ).
do_insert(Pos, Transformation) -> do_insert(Pos, Transformation) ->
@ -202,7 +221,7 @@ do_insert(Pos, Transformation) ->
} = Transformation, } = Transformation,
maybe_create_metrics(Name), maybe_create_metrics(Name),
do_insert_into_tab(Name, Transformation, Pos), do_insert_into_tab(Name, Transformation, Pos),
Enabled andalso update_topic_index(Name, Pos, Topics), Enabled andalso upsert_topic_index(Name, Pos, Topics),
ok. ok.
do_update(OldTransformation, Pos, NewTransformation) -> do_update(OldTransformation, Pos, NewTransformation) ->
@ -214,17 +233,17 @@ do_update(OldTransformation, Pos, NewTransformation) ->
} = NewTransformation, } = NewTransformation,
maybe_create_metrics(Name), maybe_create_metrics(Name),
do_insert_into_tab(Name, NewTransformation, Pos), do_insert_into_tab(Name, NewTransformation, Pos),
delete_topic_index(Name, OldTopics), delete_topic_index(Pos, OldTopics),
Enabled andalso update_topic_index(Name, Pos, NewTopics), Enabled andalso upsert_topic_index(Name, Pos, NewTopics),
ok. ok.
do_delete(Transformation) -> do_delete(Transformation, Pos) ->
#{ #{
name := Name, name := Name,
topics := Topics topics := Topics
} = Transformation, } = Transformation,
ets:delete(?TRANSFORMATION_TAB, Name), ets:delete(?TRANSFORMATION_TAB, Name),
delete_topic_index(Name, Topics), delete_topic_index(Pos, Topics),
drop_metrics(Name), drop_metrics(Name),
ok. ok.
@ -244,18 +263,18 @@ maybe_create_metrics(Name) ->
drop_metrics(Name) -> drop_metrics(Name) ->
ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, Name). ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, Name).
update_topic_index(Name, Pos, Topics) -> upsert_topic_index(Name, Pos, Topics) ->
lists:foreach( lists:foreach(
fun(Topic) -> fun(Topic) ->
true = emqx_topic_index:insert(Topic, Name, Pos, ?TRANSFORMATION_TOPIC_INDEX) true = emqx_topic_index:insert(Topic, Pos, Name, ?TRANSFORMATION_TOPIC_INDEX)
end, end,
Topics Topics
). ).
delete_topic_index(Name, Topics) -> delete_topic_index(Pos, Topics) ->
lists:foreach( lists:foreach(
fun(Topic) -> fun(Topic) ->
true = emqx_topic_index:delete(Topic, Name, ?TRANSFORMATION_TOPIC_INDEX) true = emqx_topic_index:delete(Topic, Pos, ?TRANSFORMATION_TOPIC_INDEX)
end, end,
Topics Topics
). ).