From 8c7a7cf9db8f65b519fdcceef8d7908adcb424e7 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 10 Jun 2024 11:08:18 -0300 Subject: [PATCH] refactor: apply review remarks --- .../src/emqx_message_transformation.erl | 39 ++++++++++--------- .../emqx_message_transformation_registry.erl | 38 ++++++++++++------ 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index 84af327f6..0ffb9f606 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -112,27 +112,12 @@ unregister_hooks() -> -spec on_message_publish(emqx_types:message()) -> {ok, emqx_types:message()} | {stop, emqx_types:message()}. -on_message_publish(Message = #message{topic = Topic, headers = Headers}) -> +on_message_publish(Message = #message{topic = Topic}) -> case emqx_message_transformation_registry:matching_transformations(Topic) of [] -> ok; Transformations -> - case run_transformations(Transformations, Message) of - #message{} = FinalMessage -> - emqx_metrics:inc('messages.transformation_succeeded'), - {ok, FinalMessage}; - drop -> - emqx_metrics:inc('messages.transformation_failed'), - {stop, Message#message{headers = Headers#{allow_publish => false}}}; - disconnect -> - emqx_metrics:inc('messages.transformation_failed'), - {stop, Message#message{ - headers = Headers#{ - allow_publish => false, - should_disconnect => true - } - }} - end + run_transformations(Transformations, Message) end. %%------------------------------------------------------------------------------ @@ -224,7 +209,25 @@ map_result(RetainBin, [<<"retain">>]) -> map_result(Rendered, _Key) -> {ok, Rendered}. -run_transformations(Transformations, Message) -> +run_transformations(Transformations, Message = #message{headers = Headers}) -> + case do_run_transformations(Transformations, Message) of + #message{} = FinalMessage -> + emqx_metrics:inc('messages.transformation_succeeded'), + {ok, FinalMessage}; + drop -> + emqx_metrics:inc('messages.transformation_failed'), + {stop, Message#message{headers = Headers#{allow_publish => false}}}; + disconnect -> + emqx_metrics:inc('messages.transformation_failed'), + {stop, Message#message{ + headers = Headers#{ + allow_publish => false, + should_disconnect => true + } + }} + end. + +do_run_transformations(Transformations, Message) -> Fun = fun(Transformation, MessageAcc) -> #{name := Name} = Transformation, emqx_message_transformation_registry:inc_matched(Name), diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_registry.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_registry.erl index dd692a55c..0e933f0e7 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_registry.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_registry.erl @@ -94,18 +94,34 @@ delete(Transformation) -> %% @doc Returns a list of matching transformation names, sorted by their configuration order. -spec matching_transformations(emqx_types:topic()) -> [transformation()]. matching_transformations(Topic) -> - Transformations0 = [ - {Pos, Transformation} - || M <- emqx_topic_index:matches(Topic, ?TRANSFORMATION_TOPIC_INDEX, [unique]), - [Pos] <- [emqx_topic_index:get_record(M, ?TRANSFORMATION_TOPIC_INDEX)], - {ok, Transformation} <- [ - lookup(emqx_topic_index:get_id(M)) - ] - ], - Transformations1 = lists:sort( - fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Transformations0 + Transformations0 = + lists:flatmap( + fun(M) -> + case emqx_topic_index:get_record(M, ?TRANSFORMATION_TOPIC_INDEX) of + [Pos] -> + [{Pos, emqx_topic_index:get_id(M)}]; + _ -> + [] + end + end, + emqx_topic_index:matches(Topic, ?TRANSFORMATION_TOPIC_INDEX, [unique]) + ), + Transformations1 = + lists:flatmap( + fun({Pos, Id}) -> + case lookup(Id) of + {ok, Transformation} -> + [{Pos, Transformation}]; + _ -> + [] + end + end, + Transformations0 + ), + Transformations2 = lists:sort( + fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Transformations1 ), - lists:map(fun({_Pos, V}) -> V end, Transformations1). + lists:map(fun({_Pos, V}) -> V end, Transformations2). -spec metrics_worker_spec() -> supervisor:child_spec(). metrics_worker_spec() ->