From 5629fe60c12d0d7e94f3e95a6c05ee8b6a99ad55 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 10 Jun 2024 12:12:29 -0300 Subject: [PATCH] refactor: index positions rather than names --- .../emqx_message_transformation_config.erl | 22 ++-- .../emqx_message_transformation_registry.erl | 107 +++++++++++------- 2 files changed, 75 insertions(+), 54 deletions(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_config.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_config.erl index 3ddd84741..ea7f11fdf 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_config.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_config.erl @@ -65,10 +65,10 @@ load() -> unload() -> Transformations = emqx:get_config(?TRANSFORMATIONS_CONF_PATH, []), lists:foreach( - fun(Transformation) -> - ok = emqx_message_transformation_registry:delete(Transformation) + fun({Pos, Transformation}) -> + ok = emqx_message_transformation_registry:delete(Transformation, Pos) end, - Transformations + lists:enumerate(Transformations) ). -spec list() -> [transformation()]. @@ -147,11 +147,11 @@ post_config_update(?TRANSFORMATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, ok = emqx_message_transformation_registry:update(OldTransformation, Pos, NewTransformation), ok; post_config_update(?TRANSFORMATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> - {_Pos, Transformation} = fetch_with_index(Old, Name), - ok = emqx_message_transformation_registry:delete(Transformation), + {Pos, Transformation} = fetch_with_index(Old, Name), + ok = emqx_message_transformation_registry:delete(Transformation, Pos), ok; -post_config_update(?TRANSFORMATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) -> - ok = emqx_message_transformation_registry:reindex_positions(New), +post_config_update(?TRANSFORMATIONS_CONF_PATH, {reorder, _Order}, New, Old, _AppEnvs) -> + ok = emqx_message_transformation_registry:reindex_positions(New, Old), ok; post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) -> #{transformations := ResultingTransformations} = ResultingConfig, @@ -182,8 +182,8 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv #{transformations := OldTransformations} = Old, lists:foreach( fun(Name) -> - {_Pos, Transformation} = fetch_with_index(OldTransformations, Name), - ok = emqx_message_transformation_registry:delete(Transformation) + {Pos, Transformation} = fetch_with_index(OldTransformations, Name), + ok = emqx_message_transformation_registry:delete(Transformation, Pos) end, DeletedTransformations ), @@ -206,7 +206,9 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv end, ChangedTransformations0 ), - ok = emqx_message_transformation_registry:reindex_positions(ResultingTransformations), + ok = emqx_message_transformation_registry:reindex_positions( + ResultingTransformations, OldTransformations + ), {ok, #{changed_transformations => ChangedTransformations}}. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_registry.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_registry.erl index 0e933f0e7..15e06ec7d 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_registry.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_registry.erl @@ -10,8 +10,8 @@ lookup/1, insert/2, update/3, - delete/1, - reindex_positions/1, + delete/2, + reindex_positions/2, matching_transformations/1, @@ -52,10 +52,13 @@ -type transformation() :: #{atom() => term()}. -type position_index() :: pos_integer(). --record(reindex_positions, {transformations :: [transformation()]}). +-record(reindex_positions, { + new_transformations :: [transformation()], + old_transformations :: [transformation()] +}). -record(insert, {pos :: position_index(), transformation :: transformation()}). -record(update, {old :: transformation(), pos :: position_index(), new :: transformation()}). --record(delete, {transformation :: transformation()}). +-record(delete, {transformation :: transformation(), pos :: position_index()}). %%------------------------------------------------------------------------------ %% API @@ -75,9 +78,16 @@ lookup(Name) -> {ok, Transformation} end. --spec reindex_positions([transformation()]) -> ok. -reindex_positions(Transformations) -> - gen_server:call(?MODULE, #reindex_positions{transformations = Transformations}, infinity). +-spec reindex_positions([transformation()], [transformation()]) -> ok. +reindex_positions(NewTransformations, OldTransformations) -> + gen_server:call( + ?MODULE, + #reindex_positions{ + new_transformations = NewTransformations, + old_transformations = OldTransformations + }, + infinity + ). -spec insert(position_index(), transformation()) -> ok. insert(Pos, Transformation) -> @@ -87,9 +97,9 @@ insert(Pos, Transformation) -> update(Old, Pos, New) -> gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity). --spec delete(transformation()) -> ok. -delete(Transformation) -> - gen_server:call(?MODULE, #delete{transformation = Transformation}, infinity). +-spec delete(transformation(), position_index()) -> ok. +delete(Transformation, Pos) -> + gen_server:call(?MODULE, #delete{transformation = Transformation, pos = Pos}, infinity). %% @doc Returns a list of matching transformation names, sorted by their configuration order. -spec matching_transformations(emqx_types:topic()) -> [transformation()]. @@ -98,30 +108,25 @@ matching_transformations(Topic) -> lists:flatmap( fun(M) -> case emqx_topic_index:get_record(M, ?TRANSFORMATION_TOPIC_INDEX) of - [Pos] -> - [{Pos, emqx_topic_index:get_id(M)}]; + [Name] -> + [Name]; _ -> [] end end, emqx_topic_index:matches(Topic, ?TRANSFORMATION_TOPIC_INDEX, [unique]) ), - Transformations1 = - lists:flatmap( - fun({Pos, Id}) -> - case lookup(Id) of - {ok, Transformation} -> - [{Pos, Transformation}]; - _ -> - [] - end - end, - Transformations0 - ), - Transformations2 = lists:sort( - fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Transformations1 - ), - lists:map(fun({_Pos, V}) -> V end, Transformations2). + lists:flatmap( + fun(Name) -> + case lookup(Name) of + {ok, Transformation} -> + [Transformation]; + _ -> + [] + end + end, + Transformations0 + ). -spec metrics_worker_spec() -> supervisor:child_spec(). metrics_worker_spec() -> @@ -152,8 +157,15 @@ init(_) -> State = #{}, {ok, State}. -handle_call(#reindex_positions{transformations = Transformations}, _From, State) -> - do_reindex_positions(Transformations), +handle_call( + #reindex_positions{ + new_transformations = NewTransformations, + old_transformations = OldTransformations + }, + _From, + State +) -> + do_reindex_positions(NewTransformations, OldTransformations), {reply, ok, State}; handle_call(#insert{pos = Pos, transformation = Transformation}, _From, State) -> do_insert(Pos, Transformation), @@ -161,8 +173,8 @@ handle_call(#insert{pos = Pos, transformation = Transformation}, _From, State) - handle_call(#update{old = OldTransformation, pos = Pos, new = NewTransformation}, _From, State) -> ok = do_update(OldTransformation, Pos, NewTransformation), {reply, ok, State}; -handle_call(#delete{transformation = Transformation}, _From, State) -> - do_delete(Transformation), +handle_call(#delete{transformation = Transformation, pos = Pos}, _From, State) -> + do_delete(Transformation, Pos), {reply, ok, State}; handle_call(_Call, _From, State) -> {reply, ignored, State}. @@ -181,7 +193,14 @@ create_tables() -> _ = emqx_utils_ets:new(?TRANSFORMATION_TAB, [public, ordered_set, {read_concurrency, true}]), ok. -do_reindex_positions(Transformations) -> +do_reindex_positions(NewTransformations, OldTransformations) -> + lists:foreach( + fun({Pos, Transformation}) -> + #{topics := Topics} = Transformation, + delete_topic_index(Pos, Topics) + end, + lists:enumerate(OldTransformations) + ), lists:foreach( fun({Pos, Transformation}) -> #{ @@ -189,9 +208,9 @@ do_reindex_positions(Transformations) -> topics := Topics } = Transformation, do_insert_into_tab(Name, Transformation, Pos), - update_topic_index(Name, Pos, Topics) + upsert_topic_index(Name, Pos, Topics) end, - lists:enumerate(Transformations) + lists:enumerate(NewTransformations) ). do_insert(Pos, Transformation) -> @@ -202,7 +221,7 @@ do_insert(Pos, Transformation) -> } = Transformation, maybe_create_metrics(Name), do_insert_into_tab(Name, Transformation, Pos), - Enabled andalso update_topic_index(Name, Pos, Topics), + Enabled andalso upsert_topic_index(Name, Pos, Topics), ok. do_update(OldTransformation, Pos, NewTransformation) -> @@ -214,17 +233,17 @@ do_update(OldTransformation, Pos, NewTransformation) -> } = NewTransformation, maybe_create_metrics(Name), do_insert_into_tab(Name, NewTransformation, Pos), - delete_topic_index(Name, OldTopics), - Enabled andalso update_topic_index(Name, Pos, NewTopics), + delete_topic_index(Pos, OldTopics), + Enabled andalso upsert_topic_index(Name, Pos, NewTopics), ok. -do_delete(Transformation) -> +do_delete(Transformation, Pos) -> #{ name := Name, topics := Topics } = Transformation, ets:delete(?TRANSFORMATION_TAB, Name), - delete_topic_index(Name, Topics), + delete_topic_index(Pos, Topics), drop_metrics(Name), ok. @@ -244,18 +263,18 @@ maybe_create_metrics(Name) -> drop_metrics(Name) -> ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, Name). -update_topic_index(Name, Pos, Topics) -> +upsert_topic_index(Name, Pos, Topics) -> lists:foreach( fun(Topic) -> - true = emqx_topic_index:insert(Topic, Name, Pos, ?TRANSFORMATION_TOPIC_INDEX) + true = emqx_topic_index:insert(Topic, Pos, Name, ?TRANSFORMATION_TOPIC_INDEX) end, Topics ). -delete_topic_index(Name, Topics) -> +delete_topic_index(Pos, Topics) -> lists:foreach( fun(Topic) -> - true = emqx_topic_index:delete(Topic, Name, ?TRANSFORMATION_TOPIC_INDEX) + true = emqx_topic_index:delete(Topic, Pos, ?TRANSFORMATION_TOPIC_INDEX) end, Topics ).