refactor: apply review remarks

This commit is contained in:
Thales Macedo Garitezi 2024-06-10 11:08:18 -03:00
parent e54cf2f218
commit 8c7a7cf9db
2 changed files with 48 additions and 29 deletions

View File

@ -112,27 +112,12 @@ unregister_hooks() ->
-spec on_message_publish(emqx_types:message()) ->
{ok, emqx_types:message()} | {stop, emqx_types:message()}.
on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
on_message_publish(Message = #message{topic = Topic}) ->
case emqx_message_transformation_registry:matching_transformations(Topic) of
[] ->
ok;
Transformations ->
case run_transformations(Transformations, Message) of
#message{} = FinalMessage ->
emqx_metrics:inc('messages.transformation_succeeded'),
{ok, FinalMessage};
drop ->
emqx_metrics:inc('messages.transformation_failed'),
{stop, Message#message{headers = Headers#{allow_publish => false}}};
disconnect ->
emqx_metrics:inc('messages.transformation_failed'),
{stop, Message#message{
headers = Headers#{
allow_publish => false,
should_disconnect => true
}
}}
end
run_transformations(Transformations, Message)
end.
%%------------------------------------------------------------------------------
@ -224,7 +209,25 @@ map_result(RetainBin, [<<"retain">>]) ->
map_result(Rendered, _Key) ->
{ok, Rendered}.
run_transformations(Transformations, Message) ->
run_transformations(Transformations, Message = #message{headers = Headers}) ->
case do_run_transformations(Transformations, Message) of
#message{} = FinalMessage ->
emqx_metrics:inc('messages.transformation_succeeded'),
{ok, FinalMessage};
drop ->
emqx_metrics:inc('messages.transformation_failed'),
{stop, Message#message{headers = Headers#{allow_publish => false}}};
disconnect ->
emqx_metrics:inc('messages.transformation_failed'),
{stop, Message#message{
headers = Headers#{
allow_publish => false,
should_disconnect => true
}
}}
end.
do_run_transformations(Transformations, Message) ->
Fun = fun(Transformation, MessageAcc) ->
#{name := Name} = Transformation,
emqx_message_transformation_registry:inc_matched(Name),

View File

@ -94,18 +94,34 @@ delete(Transformation) ->
%% @doc Returns a list of matching transformation names, sorted by their configuration order.
-spec matching_transformations(emqx_types:topic()) -> [transformation()].
matching_transformations(Topic) ->
Transformations0 = [
{Pos, Transformation}
|| M <- emqx_topic_index:matches(Topic, ?TRANSFORMATION_TOPIC_INDEX, [unique]),
[Pos] <- [emqx_topic_index:get_record(M, ?TRANSFORMATION_TOPIC_INDEX)],
{ok, Transformation} <- [
lookup(emqx_topic_index:get_id(M))
]
],
Transformations1 = lists:sort(
fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Transformations0
Transformations0 =
lists:flatmap(
fun(M) ->
case emqx_topic_index:get_record(M, ?TRANSFORMATION_TOPIC_INDEX) of
[Pos] ->
[{Pos, emqx_topic_index:get_id(M)}];
_ ->
[]
end
end,
emqx_topic_index:matches(Topic, ?TRANSFORMATION_TOPIC_INDEX, [unique])
),
lists:map(fun({_Pos, V}) -> V end, Transformations1).
Transformations1 =
lists:flatmap(
fun({Pos, Id}) ->
case lookup(Id) of
{ok, Transformation} ->
[{Pos, Transformation}];
_ ->
[]
end
end,
Transformations0
),
Transformations2 = lists:sort(
fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Transformations1
),
lists:map(fun({_Pos, V}) -> V end, Transformations2).
-spec metrics_worker_spec() -> supervisor:child_spec().
metrics_worker_spec() ->