feat(message transformation): implement dryrun endpoint

Follow up to https://github.com/emqx/emqx/pull/13199
This commit is contained in:
Thales Macedo Garitezi 2024-06-11 16:03:13 -03:00
parent c70e8252fe
commit 9b3c806ba7
5 changed files with 455 additions and 110 deletions

View File

@ -26,20 +26,28 @@
on_message_publish/1
]).
%% Internal exports
-export([run_transformation/2, trace_failure_context_to_map/1]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(TRACE_TAG, "MESSAGE_TRANSFORMATION").
-define(CONF_ROOT, message_transformation).
-define(CONF_ROOT_BIN, <<"message_transformation">>).
-define(TRANSFORMATIONS_CONF_PATH, [?CONF_ROOT, transformations]).
%% Describes a failed transformation step. Instead of tracing immediately, the
%% failing code returns this record so the caller decides what to do with it
%% (see `trace_failure_from_context/1' and `trace_failure_context_to_map/1').
-record(trace_failure_context, {
%% The transformation that was being evaluated when the failure happened.
transformation :: transformation(),
%% Short failure identifier, e.g. "payload_decode_failed".
tag :: string(),
%% Extra failure details (e.g. `reason', `decoder', `schema_name').
context :: map()
}).
-type trace_failure_context() :: #trace_failure_context{}.
-type transformation_name() :: binary().
%% TODO: make more specific typespec
-type transformation() :: #{atom() => term()}.
%% TODO: make more specific typespec
-type variform() :: any().
-type failure_action() :: ignore | drop | disconnect.
-type operation() :: #{key := [binary(), ...], value := variform()}.
-type qos() :: 0..2.
-type rendered_value() :: qos() | boolean() | binary().
@ -62,7 +70,8 @@
-export_type([
transformation/0,
transformation_name/0
transformation_name/0,
failure_action/0
]).
%%------------------------------------------------------------------------------
@ -125,19 +134,50 @@ on_message_publish(Message = #message{topic = Topic}) ->
%% Internal exports
%%------------------------------------------------------------------------------
%% Runs a single transformation against a message.
%%
%% The payload is decoded with the transformation's `payload_decoder', every
%% operation is evaluated in order against the resulting context, and the final
%% context is converted back into a message.
%%
%% Returns `{ok, Message}' on success.  On the first failure (decode, operation
%% or encode) evaluation stops and `{FailureAction, TraceFailureContext}' is
%% returned, where `FailureAction' is the transformation's `failure_action'.
%% Nothing is traced here; the failure context is handed back to the caller.
-spec run_transformation(transformation(), emqx_types:message()) ->
    {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}.
run_transformation(Transformation, MessageIn) ->
    #{
        operations := Operations,
        failure_action := FailureAction,
        payload_decoder := PayloadDecoder
    } = Transformation,
    %% Folding step: keep going while operations succeed, halt on the first error.
    Step = fun(Operation, ContextAcc) ->
        case eval_operation(Operation, Transformation, ContextAcc) of
            {ok, NextContext} -> {cont, NextContext};
            {error, _TraceFailureContext} = Failure -> {halt, Failure}
        end
    end,
    RawPayload = MessageIn#message.payload,
    case decode(RawPayload, PayloadDecoder, Transformation) of
        {error, DecodeFailure} ->
            {FailureAction, DecodeFailure};
        {ok, DecodedPayload} ->
            Context0 = message_to_context(MessageIn, DecodedPayload, Transformation),
            case emqx_utils:foldl_while(Step, Context0, Operations) of
                {error, OperationFailure} ->
                    {FailureAction, OperationFailure};
                #{} = ContextOut ->
                    context_to_message(MessageIn, ContextOut, Transformation)
            end
    end.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
-spec eval_operation(operation(), transformation(), eval_context()) -> {ok, eval_context()} | error.
-spec eval_operation(operation(), transformation(), eval_context()) ->
{ok, eval_context()} | {error, trace_failure_context()}.
eval_operation(Operation, Transformation, Context) ->
#{key := K, value := V} = Operation,
case eval_variform(K, V, Context) of
{error, Reason} ->
trace_failure(Transformation, "transformation_eval_operation_failure", #{
reason => Reason
}),
error;
FailureContext = #trace_failure_context{
transformation = Transformation,
tag = "transformation_eval_operation_failure",
context = #{reason => Reason}
},
{error, FailureContext};
{ok, Rendered} ->
NewContext = put_value(K, Rendered, Context),
{ok, NewContext}
@ -233,14 +273,16 @@ do_run_transformations(Transformations, Message) ->
#{name := Name} = Transformation,
emqx_message_transformation_registry:inc_matched(Name),
case run_transformation(Transformation, MessageAcc) of
#message{} = NewAcc ->
{ok, #message{} = NewAcc} ->
emqx_message_transformation_registry:inc_succeeded(Name),
{cont, NewAcc};
ignore ->
{ignore, TraceFailureContext} ->
trace_failure_from_context(TraceFailureContext),
emqx_message_transformation_registry:inc_failed(Name),
run_message_transformation_failed_hook(Message, Transformation),
{cont, MessageAcc};
FailureAction ->
{FailureAction, TraceFailureContext} ->
trace_failure_from_context(TraceFailureContext),
trace_failure(Transformation, "transformation_failed", #{
transformation => Name,
action => FailureAction
@ -270,33 +312,6 @@ do_run_transformations(Transformations, Message) ->
FailureAction
end.
run_transformation(Transformation, MessageIn) ->
#{
operations := Operations,
failure_action := FailureAction,
payload_decoder := PayloadDecoder
} = Transformation,
Fun = fun(Operation, Acc) ->
case eval_operation(Operation, Transformation, Acc) of
{ok, NewAcc} -> {cont, NewAcc};
error -> {halt, FailureAction}
end
end,
PayloadIn = MessageIn#message.payload,
case decode(PayloadIn, PayloadDecoder, Transformation) of
{ok, InitPayload} ->
InitAcc = message_to_context(MessageIn, InitPayload, Transformation),
case emqx_utils:foldl_while(Fun, InitAcc, Operations) of
#{} = ContextOut ->
context_to_message(MessageIn, ContextOut, Transformation);
_ ->
FailureAction
end;
error ->
%% Error already logged
FailureAction
end.
-spec message_to_context(emqx_types:message(), _Payload, transformation()) -> eval_context().
message_to_context(#message{} = Message, Payload, Transformation) ->
#{
@ -321,7 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
}.
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
{ok, emqx_types:message()} | _TODO.
{ok, emqx_types:message()} | {failure_action(), trace_failure_context()}.
context_to_message(Message, Context, Transformation) ->
#{
failure_action := FailureAction,
@ -330,9 +345,9 @@ context_to_message(Message, Context, Transformation) ->
#{payload := PayloadOut} = Context,
case encode(PayloadOut, PayloadEncoder, Transformation) of
{ok, Payload} ->
take_from_context(Context#{payload := Payload}, Message);
error ->
FailureAction
{ok, take_from_context(Context#{payload := Payload}, Message)};
{error, TraceFailureContext} ->
{FailureAction, TraceFailureContext}
end.
take_from_context(Context, Message) ->
@ -362,31 +377,43 @@ decode(Payload, #{type := json}, Transformation) ->
{ok, JSON} ->
{ok, JSON};
{error, Reason} ->
trace_failure(Transformation, "payload_decode_failed", #{
decoder => json,
reason => Reason
}),
error
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_decode_failed",
context = #{
decoder => json,
reason => Reason
}
},
{error, TraceFailureContext}
end;
decode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
try
{ok, emqx_schema_registry_serde:decode(SerdeName, Payload)}
catch
error:{serde_not_found, _} ->
trace_failure(Transformation, "payload_decode_schema_not_found", #{
decoder => avro,
schema_name => SerdeName
}),
error;
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_decode_schema_not_found",
context = #{
decoder => avro,
schema_name => SerdeName
}
},
{error, TraceFailureContext};
Class:Error:Stacktrace ->
trace_failure(Transformation, "payload_decode_schema_failure", #{
decoder => avro,
schema_name => SerdeName,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}),
error
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_decode_schema_failure",
context = #{
decoder => avro,
schema_name => SerdeName,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}
},
{error, TraceFailureContext}
end;
decode(
Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
@ -395,22 +422,30 @@ decode(
{ok, emqx_schema_registry_serde:decode(SerdeName, Payload, [MessageType])}
catch
error:{serde_not_found, _} ->
trace_failure(Transformation, "payload_decode_schema_not_found", #{
decoder => protobuf,
schema_name => SerdeName,
message_type => MessageType
}),
error;
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_decode_schema_not_found",
context = #{
decoder => protobuf,
schema_name => SerdeName,
message_type => MessageType
}
},
{error, TraceFailureContext};
Class:Error:Stacktrace ->
trace_failure(Transformation, "payload_decode_schema_failure", #{
decoder => protobuf,
schema_name => SerdeName,
message_type => MessageType,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}),
error
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_decode_schema_failure",
context = #{
decoder => protobuf,
schema_name => SerdeName,
message_type => MessageType,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}
},
{error, TraceFailureContext}
end.
encode(Payload, #{type := none}, _Transformation) ->
@ -420,31 +455,43 @@ encode(Payload, #{type := json}, Transformation) ->
{ok, Bin} ->
{ok, Bin};
{error, Reason} ->
trace_failure(Transformation, "payload_encode_failed", #{
encoder => json,
reason => Reason
}),
error
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_encode_failed",
context = #{
encoder => json,
reason => Reason
}
},
{error, TraceFailureContext}
end;
encode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
try
{ok, emqx_schema_registry_serde:encode(SerdeName, Payload)}
catch
error:{serde_not_found, _} ->
trace_failure(Transformation, "payload_encode_schema_not_found", #{
encoder => avro,
schema_name => SerdeName
}),
error;
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_encode_schema_not_found",
context = #{
encoder => avro,
schema_name => SerdeName
}
},
{error, TraceFailureContext};
Class:Error:Stacktrace ->
trace_failure(Transformation, "payload_encode_schema_failure", #{
encoder => avro,
schema_name => SerdeName,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}),
error
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_encode_schema_failure",
context = #{
encoder => avro,
schema_name => SerdeName,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}
},
{error, TraceFailureContext}
end;
encode(
Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
@ -453,24 +500,50 @@ encode(
{ok, emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageType])}
catch
error:{serde_not_found, _} ->
trace_failure(Transformation, "payload_encode_schema_not_found", #{
encoder => protobuf,
schema_name => SerdeName,
message_type => MessageType
}),
error;
%% The schema could not be found (`serde_not_found'), so use the "not found" tag,
%% consistent with the avro encoder and both decoders.  The "schema_failure" tag
%% is used by the generic `Class:Error:Stacktrace' clause that follows.
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_encode_schema_not_found",
context = #{
encoder => protobuf,
schema_name => SerdeName,
message_type => MessageType
}
},
{error, TraceFailureContext};
Class:Error:Stacktrace ->
trace_failure(Transformation, "payload_encode_schema_failure", #{
encoder => protobuf,
schema_name => SerdeName,
message_type => MessageType,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}),
error
TraceFailureContext = #trace_failure_context{
transformation = Transformation,
tag = "payload_encode_schema_failure",
context = #{
encoder => protobuf,
schema_name => SerdeName,
message_type => MessageType,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}
},
{error, TraceFailureContext}
end.
%% Emits the trace for a previously captured failure by unpacking the record and
%% delegating to `trace_failure/3'.
trace_failure_from_context(#trace_failure_context{} = FailureContext) ->
    #trace_failure_context{
        transformation = Transformation,
        tag = Tag,
        context = Meta
    } = FailureContext,
    trace_failure(Transformation, Tag, Meta).
%% Internal export for HTTP API.
%% Returns the failure's `context' map with an extra `msg' key holding the
%% failure tag as a binary.  The `transformation' field is not included.
trace_failure_context_to_map(#trace_failure_context{} = FailureContext) ->
    #trace_failure_context{tag = Tag, context = Context} = FailureContext,
    Context#{msg => list_to_binary(Tag)}.
trace_failure(#{log_failure := #{level := none}} = Transformation, _Msg, _Meta) ->
#{
name := _Name,

View File

@ -8,6 +8,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
%% `minirest' and `minirest_trails' API
@ -23,6 +24,7 @@
-export([
'/message_transformations'/2,
'/message_transformations/reorder'/2,
'/message_transformations/dryrun'/2,
'/message_transformations/transformation/:name'/2,
'/message_transformations/transformation/:name/metrics'/2,
'/message_transformations/transformation/:name/metrics/reset'/2,
@ -36,6 +38,9 @@
-define(TAGS, [<<"Message Transformation">>]).
-define(METRIC_NAME, message_transformation).
-type user_property() :: #{binary() => binary()}.
-reflect_type([user_property/0]).
%%-------------------------------------------------------------------------------------------------
%% `minirest' and `minirest_trails' API
%%-------------------------------------------------------------------------------------------------
@ -49,6 +54,7 @@ paths() ->
[
"/message_transformations",
"/message_transformations/reorder",
"/message_transformations/dryrun",
"/message_transformations/transformation/:name",
"/message_transformations/transformation/:name/metrics",
"/message_transformations/transformation/:name/metrics/reset",
@ -143,6 +149,25 @@ schema("/message_transformations/reorder") ->
}
}
};
%% API description of the dry-run endpoint: a POST whose request body is a
%% `test_transformation' object (see `fields(test_transformation)').
schema("/message_transformations/dryrun") ->
#{
'operationId' => '/message_transformations/dryrun',
post => #{
tags => ?TAGS,
summary => <<"Test an input against a configuration">>,
description => ?DESC("test_transformation"),
'requestBody' =>
emqx_dashboard_swagger:schema_with_examples(
ref(test_transformation),
example_input_test_transformation()
),
responses =>
#{
%% NOTE(review): the 200 response schema is still a placeholder.
200 => <<"TODO">>,
400 => error_schema('BAD_REQUEST', <<"Bad request">>)
}
}
};
schema("/message_transformations/transformation/:name") ->
#{
'operationId' => '/message_transformations/transformation/:name',
@ -267,6 +292,29 @@ fields(reorder) ->
[
{order, mk(array(binary()), #{required => true, in => body})}
];
%% Request body of the dry-run endpoint: the transformation to evaluate and the
%% input message to run it against.  Both fields are required.
fields(test_transformation) ->
[
{transformation,
mk(
hoconsc:ref(emqx_message_transformation_schema, transformation),
#{required => true, in => body}
)},
{message, mk(ref(test_input_message), #{required => true, in => body})}
];
%% Input message for the dry-run endpoint.  Every field is required.
fields(test_input_message) ->
%% See `emqx_message_transformation:eval_context()'.
[
{client_attrs, mk(map(), #{required => true})},
{payload, mk(binary(), #{required => true})},
{qos, mk(range(0, 2), #{required => true})},
{retain, mk(boolean(), #{required => true})},
{topic, mk(binary(), #{required => true})},
%% User properties are given as a map of binary keys to binary values.
{user_property,
mk(
typerefl:alias("map(binary(), binary())", user_property()),
#{required => true}
)}
];
fields(get_metrics) ->
[
{metrics, mk(ref(metrics), #{})},
@ -343,6 +391,9 @@ fields(node_metrics) ->
'/message_transformations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
do_reorder(Order).
%% Handler for `POST /message_transformations/dryrun': passes the request body
%% to `do_transformation_dryrun/1'.
'/message_transformations/dryrun'(post, #{body := Params}) ->
do_transformation_dryrun(Params).
'/message_transformations/transformation/:name/enable/:enable'(post, #{
bindings := #{name := Name, enable := Enable}
}) ->
@ -436,6 +487,17 @@ example_input_reorder() ->
}
}.
%% Example request body used in the API description of the dry-run endpoint.
%% NOTE(review): the example value is still a placeholder (`todo => true') and
%% does not match the `test_transformation' request schema.
example_input_test_transformation() ->
#{
<<"test">> =>
#{
summary => <<"Test an input against a configuration">>,
value => #{
todo => true
}
}
}.
example_return_list() ->
OtherVal0 = example_transformation([example_avro_check()]),
OtherVal = OtherVal0#{name => <<"other_transformation">>},
@ -541,6 +603,20 @@ do_reorder(Order) ->
?BAD_REQUEST(Error)
end.
%% Runs the given transformation against the given input message without
%% publishing anything.
%%
%% Replies with `?OK(Message)' carrying the transformed message on success, or
%% with a 400 whose body describes the failure otherwise.  The transformation's
%% failure action is ignored here.
do_transformation_dryrun(Params) ->
    #{transformation := Transformation, message := Message} =
        dryrun_input_message_in(Params),
    case emqx_message_transformation:run_transformation(Transformation, Message) of
        {ok, #message{} = FinalMessage} ->
            ?OK(dryrun_input_message_out(FinalMessage));
        {_FailureAction, TraceFailureContext} ->
            {400, trace_failure_context_out(TraceFailureContext)}
    end.
do_enable_disable(Transformation, Enable) ->
RawTransformation = make_serializable(Transformation),
case emqx_message_transformation:update(RawTransformation#{<<"enable">> => Enable}) of
@ -654,3 +730,74 @@ operation_out(Operation0) ->
fun(Path) -> iolist_to_binary(lists:join(".", Path)) end,
Operation
).
%% Converts the raw dry-run request body into its checked, atom-keyed form and
%% replaces the `message' entry with a message built via `emqx_message:from_map/1'.
%%
%% The message gets a freshly generated id, the current timestamp and the fixed
%% sender `<<"test-clientid">>'.  User properties are converted from a map to a
%% key/value list under the `'User-Property'' property.
dryrun_input_message_in(Params) ->
    %% We already check the params against the schema at the API boundary, so we can
    %% expect it to succeed here.
    Schema = #{roots => [{root, ref(test_transformation)}]},
    #{root := Checked = #{message := InputMessage}} =
        hocon_tconf:check_plain(Schema, #{<<"root">> => Params}, #{atom_key => true}),
    #{
        client_attrs := ClientAttrs,
        payload := Payload,
        qos := QoS,
        retain := Retain,
        topic := Topic,
        user_property := UserPropertyMap
    } = InputMessage,
    Headers = #{
        client_attrs => ClientAttrs,
        properties => #{'User-Property' => maps:to_list(UserPropertyMap)}
    },
    MessageMap = #{
        id => emqx_guid:gen(),
        timestamp => emqx_message:timestamp_now(),
        extra => #{},
        from => <<"test-clientid">>,
        flags => #{retain => Retain},
        qos => QoS,
        topic => Topic,
        payload => Payload,
        headers => Headers
    },
    Checked#{message := emqx_message:from_map(MessageMap)}.
%% Renders a transformed message for the dry-run response: `payload', `qos' and
%% `topic' taken from the message map, plus the `retain' flag (default `false')
%% and the user properties converted from a key/value list back into a map.
dryrun_input_message_out(#message{} = Message) ->
    RetainFlag = emqx_message:get_flag(retain, Message, false),
    Properties = emqx_message:get_header(properties, Message, #{}),
    UserPropertyPairs = maps:get('User-Property', Properties, []),
    Extras = #{
        retain => RetainFlag,
        user_property => maps:from_list(UserPropertyPairs)
    },
    Kept = maps:with([payload, qos, topic], emqx_message:to_map(Message)),
    maps:merge(Kept, Extras).
%% Prepares a failure context for the HTTP response body.
%%
%% Some context keys may not be JSON-encodable, so:
%%   * `stacktrace' is always dropped;
%%   * `reason' is kept as-is when it can be JSON-encoded (minirest encodes the
%%     structured value), otherwise it is replaced by its `~p' rendering as a
%%     binary ("best effort");
%%   * every other key is passed through unchanged.
trace_failure_context_out(TraceFailureContext) ->
    Context0 = emqx_message_transformation:trace_failure_context_to_map(TraceFailureContext),
    Context = maps:remove(stacktrace, Context0),
    case Context of
        #{reason := Reason} ->
            case emqx_utils_json:safe_encode(Reason) of
                {ok, _} ->
                    Context;
                {error, _} ->
                    Context#{reason := iolist_to_binary(io_lib:format("~p", [Reason]))}
            end;
        #{} ->
            Context
    end.

View File

@ -140,6 +140,31 @@ topic_operation(VariformExpr) ->
operation(Key, VariformExpr) ->
{Key, VariformExpr}.
%% Raw (binary-keyed) payload encoder/decoder configuration of type `json'.
json_serde() ->
#{<<"type">> => <<"json">>}.
%% Raw (binary-keyed) payload encoder/decoder configuration of type `avro' that
%% refers to the schema named `SerdeName'.
avro_serde(SerdeName) ->
#{<<"type">> => <<"avro">>, <<"schema">> => SerdeName}.
%% Builds an input message for the dryrun endpoint.
%%
%% `Overrides' is deep-merged over the default message.  The resulting `payload'
%% is then passed through the `encoder' function from `Opts', which defaults to
%% JSON encoding.
dryrun_input_message() ->
    dryrun_input_message(#{}).

dryrun_input_message(Overrides) ->
    dryrun_input_message(Overrides, #{}).

dryrun_input_message(Overrides, Opts) ->
    EncodePayload = maps:get(encoder, Opts, fun emqx_utils_json:encode/1),
    BaseMessage = #{
        client_attrs => #{},
        payload => #{},
        qos => 2,
        retain => true,
        topic => <<"t/u/v">>,
        user_property => #{}
    },
    Merged = emqx_utils_maps:deep_merge(BaseMessage, Overrides),
    maps:update_with(payload, EncodePayload, Merged).
api_root() -> "message_transformations".
simplify_result(Res) ->
@ -246,6 +271,13 @@ import_backup(BackupName) ->
Res = request(post, Path, Body),
simplify_result(Res).
%% POSTs the transformation and the input message to the dryrun endpoint, logs
%% the raw response and returns its simplified form.
dryrun_transformation(Transformation, Message) ->
    Url = emqx_mgmt_api_test_util:api_path([api_root(), "dryrun"]),
    Body = #{transformation => Transformation, message => Message},
    Response = request(post, Url, Body),
    ct:pal("dryrun transformation result:\n ~p", [Response]),
    simplify_result(Response).
connect(ClientId) ->
connect(ClientId, _IsPersistent = false).
@ -1491,3 +1523,93 @@ t_client_attrs(_Config) ->
[]
),
ok.
%% Smoke tests for the dryrun endpoint.
%% Covers one successful run and three failures (payload decode, payload encode
%% with a missing schema, and an unbound variable in an operation), each of which
%% is expected to produce a 400 response describing the failure.
t_dryrun_transformation(_Config) ->
?check_trace(
begin
Name1 = <<"foo">>,
%% Every operation reads its value from the decoded payload.
Operations = [
operation(qos, <<"payload.q">>),
operation(topic, <<"concat([topic, '/', payload.t])">>),
operation(retain, <<"payload.r">>),
operation(<<"user_property.a">>, <<"payload.u.a">>),
operation(<<"payload">>, <<"payload.p.hello">>)
],
Transformation1 = transformation(Name1, Operations),
%% Good input
Message1 = dryrun_input_message(#{
payload => #{
p => #{<<"hello">> => <<"world">>},
q => 1,
r => true,
t => <<"t">>,
u => #{a => <<"b">>}
}
}),
?assertMatch(
{200, #{
<<"payload">> := <<"\"world\"">>,
<<"qos">> := 1,
<<"retain">> := true,
<<"topic">> := <<"t/u/v/t">>,
<<"user_property">> := #{<<"a">> := <<"b">>}
}},
dryrun_transformation(Transformation1, Message1)
),
%% Bad input: fails to decode
%% The identity encoder leaves the payload as the raw, invalid JSON "{".
Message2 = dryrun_input_message(#{payload => "{"}, #{encoder => fun(X) -> X end}),
?assertMatch(
{400, #{
<<"decoder">> := <<"json">>,
<<"reason">> := <<_/binary>>
}},
dryrun_transformation(Transformation1, Message2)
),
%% Bad output: fails to encode
%% The encoder refers to an avro schema name that this test never creates.
MissingSerde = <<"missing_serde">>,
Transformation2 = transformation(Name1, [dummy_operation()], #{
<<"payload_decoder">> => json_serde(),
<<"payload_encoder">> => avro_serde(MissingSerde)
}),
?assertMatch(
{400, #{
<<"msg">> := <<"payload_encode_schema_not_found">>,
<<"encoder">> := <<"avro">>,
<<"schema_name">> := MissingSerde
}},
dryrun_transformation(Transformation2, Message1)
),
%% Bad input: unbound var during one of the operations
Message3 = dryrun_input_message(#{
payload => #{
p => #{<<"hello">> => <<"world">>},
q => 1,
%% Missing:
%% r => true,
t => <<"t">>,
u => #{a => <<"b">>}
}
}),
%% The structured reason is expected to come back as a JSON object.
?assertMatch(
{400, #{
<<"msg">> :=
<<"transformation_eval_operation_failure">>,
<<"reason">> :=
#{
<<"reason">> := <<"var_unbound">>,
<<"var_name">> := <<"payload.r">>
}
}},
dryrun_transformation(Transformation1, Message3)
),
ok
end,
[]
),
ok.

View File

@ -276,7 +276,7 @@ resolve_var_value(VarName, Bindings, _Opts) ->
Value;
{error, _Reason} ->
throw(#{
var_name => VarName,
var_name => iolist_to_binary(VarName),
reason => var_unbound
})
end.

View File

@ -18,6 +18,9 @@ emqx_message_transformation_http_api {
reorder_transformations.desc:
"""Reorder of all transformations"""
test_transformation.desc:
"""Test an input against a transformation"""
enable_disable_transformation.desc:
"""Enable or disable a particular transformation"""