From c70e8252fee22268818753ca6775d894bdc006a5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 11 Jun 2024 13:35:02 -0300 Subject: [PATCH 1/3] fix: declare `emqx_schema_registry` as a dependency of `emqx_message_transformation` --- .../src/emqx_message_transformation.app.src | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.app.src b/apps/emqx_message_transformation/src/emqx_message_transformation.app.src index b8289c1f1..7643cbb9f 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.app.src +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.app.src @@ -6,7 +6,8 @@ {applications, [ kernel, stdlib, - emqx + emqx, + emqx_schema_registry ]}, {env, []}, {modules, []}, From 9b3c806ba7ae6b225cdc203df69044ba4b3319c2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 11 Jun 2024 16:03:13 -0300 Subject: [PATCH 2/3] feat(message transformation): implement dryrun endpoint Follow up to https://github.com/emqx/emqx/pull/13199 --- .../src/emqx_message_transformation.erl | 291 +++++++++++------- .../emqx_message_transformation_http_api.erl | 147 +++++++++ ..._message_transformation_http_api_SUITE.erl | 122 ++++++++ apps/emqx_utils/src/emqx_variform.erl | 2 +- ...emqx_message_transformation_http_api.hocon | 3 + 5 files changed, 455 insertions(+), 110 deletions(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index 612a30f78..14f44ec1f 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -26,20 +26,28 @@ on_message_publish/1 ]). +%% Internal exports +-export([run_transformation/2, trace_failure_context_to_map/1]). + %%------------------------------------------------------------------------------ %% Type declarations %%------------------------------------------------------------------------------ -define(TRACE_TAG, "MESSAGE_TRANSFORMATION"). --define(CONF_ROOT, message_transformation). --define(CONF_ROOT_BIN, <<"message_transformation">>). --define(TRANSFORMATIONS_CONF_PATH, [?CONF_ROOT, transformations]). + +-record(trace_failure_context, { + transformation :: transformation(), + tag :: string(), + context :: map() +}). +-type trace_failure_context() :: #trace_failure_context{}. -type transformation_name() :: binary(). %% TODO: make more specific typespec -type transformation() :: #{atom() => term()}. %% TODO: make more specific typespec -type variform() :: any(). +-type failure_action() :: ignore | drop | disconnect. -type operation() :: #{key := [binary(), ...], value := variform()}. -type qos() :: 0..2. -type rendered_value() :: qos() | boolean() | binary(). @@ -62,7 +70,8 @@ -export_type([ transformation/0, - transformation_name/0 + transformation_name/0, + failure_action/0 ]). %%------------------------------------------------------------------------------ @@ -125,19 +134,50 @@ on_message_publish(Message = #message{topic = Topic}) -> %% Internal exports %%------------------------------------------------------------------------------ +-spec run_transformation(transformation(), emqx_types:message()) -> + {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}. +run_transformation(Transformation, MessageIn) -> + #{ + operations := Operations, + failure_action := FailureAction, + payload_decoder := PayloadDecoder + } = Transformation, + Fun = fun(Operation, Acc) -> + case eval_operation(Operation, Transformation, Acc) of + {ok, NewAcc} -> {cont, NewAcc}; + {error, TraceFailureContext} -> {halt, {error, TraceFailureContext}} + end + end, + PayloadIn = MessageIn#message.payload, + case decode(PayloadIn, PayloadDecoder, Transformation) of + {ok, InitPayload} -> + InitAcc = message_to_context(MessageIn, InitPayload, Transformation), + case emqx_utils:foldl_while(Fun, InitAcc, Operations) of + #{} = ContextOut -> + context_to_message(MessageIn, ContextOut, Transformation); + {error, TraceFailureContext} -> + {FailureAction, TraceFailureContext} + end; + {error, TraceFailureContext} -> + {FailureAction, TraceFailureContext} + end. + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ --spec eval_operation(operation(), transformation(), eval_context()) -> {ok, eval_context()} | error. +-spec eval_operation(operation(), transformation(), eval_context()) -> + {ok, eval_context()} | {error, trace_failure_context()}. eval_operation(Operation, Transformation, Context) -> #{key := K, value := V} = Operation, case eval_variform(K, V, Context) of {error, Reason} -> - trace_failure(Transformation, "transformation_eval_operation_failure", #{ - reason => Reason - }), - error; + FailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "transformation_eval_operation_failure", + context = #{reason => Reason} + }, + {error, FailureContext}; {ok, Rendered} -> NewContext = put_value(K, Rendered, Context), {ok, NewContext} @@ -233,14 +273,16 @@ do_run_transformations(Transformations, Message) -> #{name := Name} = Transformation, emqx_message_transformation_registry:inc_matched(Name), case run_transformation(Transformation, MessageAcc) of - #message{} = NewAcc -> + {ok, #message{} = NewAcc} -> emqx_message_transformation_registry:inc_succeeded(Name), {cont, NewAcc}; - ignore -> + {ignore, TraceFailureContext} -> + trace_failure_from_context(TraceFailureContext), emqx_message_transformation_registry:inc_failed(Name), run_message_transformation_failed_hook(Message, Transformation), {cont, MessageAcc}; - FailureAction -> + {FailureAction, TraceFailureContext} -> + trace_failure_from_context(TraceFailureContext), trace_failure(Transformation, "transformation_failed", #{ transformation => Name, action => FailureAction @@ -270,33 +312,6 @@ do_run_transformations(Transformations, Message) -> FailureAction end. -run_transformation(Transformation, MessageIn) -> - #{ - operations := Operations, - failure_action := FailureAction, - payload_decoder := PayloadDecoder - } = Transformation, - Fun = fun(Operation, Acc) -> - case eval_operation(Operation, Transformation, Acc) of - {ok, NewAcc} -> {cont, NewAcc}; - error -> {halt, FailureAction} - end - end, - PayloadIn = MessageIn#message.payload, - case decode(PayloadIn, PayloadDecoder, Transformation) of - {ok, InitPayload} -> - InitAcc = message_to_context(MessageIn, InitPayload, Transformation), - case emqx_utils:foldl_while(Fun, InitAcc, Operations) of - #{} = ContextOut -> - context_to_message(MessageIn, ContextOut, Transformation); - _ -> - FailureAction - end; - error -> - %% Error already logged - FailureAction - end. - -spec message_to_context(emqx_types:message(), _Payload, transformation()) -> eval_context(). message_to_context(#message{} = Message, Payload, Transformation) -> #{ @@ -321,7 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) -> }. -spec context_to_message(emqx_types:message(), eval_context(), transformation()) -> - {ok, emqx_types:message()} | _TODO. + {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}. context_to_message(Message, Context, Transformation) -> #{ failure_action := FailureAction, @@ -330,9 +345,9 @@ context_to_message(Message, Context, Transformation) -> #{payload := PayloadOut} = Context, case encode(PayloadOut, PayloadEncoder, Transformation) of {ok, Payload} -> - take_from_context(Context#{payload := Payload}, Message); - error -> - FailureAction + {ok, take_from_context(Context#{payload := Payload}, Message)}; + {error, TraceFailureContext} -> + {FailureAction, TraceFailureContext} end. take_from_context(Context, Message) -> @@ -362,31 +377,43 @@ decode(Payload, #{type := json}, Transformation) -> {ok, JSON} -> {ok, JSON}; {error, Reason} -> - trace_failure(Transformation, "payload_decode_failed", #{ - decoder => json, - reason => Reason - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_failed", + context = #{ + decoder => json, + reason => Reason + } + }, + {error, TraceFailureContext} end; decode(Payload, #{type := avro, schema := SerdeName}, Transformation) -> try {ok, emqx_schema_registry_serde:decode(SerdeName, Payload)} catch error:{serde_not_found, _} -> - trace_failure(Transformation, "payload_decode_schema_not_found", #{ - decoder => avro, - schema_name => SerdeName - }), - error; + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_schema_not_found", + context = #{ + decoder => avro, + schema_name => SerdeName + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> - trace_failure(Transformation, "payload_decode_schema_failure", #{ - decoder => avro, - schema_name => SerdeName, - kind => Class, - reason => Error, - stacktrace => Stacktrace - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_schema_failure", + context = #{ + decoder => avro, + schema_name => SerdeName, + kind => Class, + reason => Error, + stacktrace => Stacktrace + } + }, + {error, TraceFailureContext} end; decode( Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation @@ -395,22 +422,30 @@ decode( {ok, emqx_schema_registry_serde:decode(SerdeName, Payload, [MessageType])} catch error:{serde_not_found, _} -> - trace_failure(Transformation, "payload_decode_schema_not_found", #{ - decoder => protobuf, - schema_name => SerdeName, - message_type => MessageType - }), - error; + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_schema_not_found", + context = #{ + decoder => protobuf, + schema_name => SerdeName, + message_type => MessageType + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> - trace_failure(Transformation, "payload_decode_schema_failure", #{ - decoder => protobuf, - schema_name => SerdeName, - message_type => MessageType, - kind => Class, - reason => Error, - stacktrace => Stacktrace - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_schema_failure", + context = #{ + decoder => protobuf, + schema_name => SerdeName, + message_type => MessageType, + kind => Class, + reason => Error, + stacktrace => Stacktrace + } + }, + {error, TraceFailureContext} end. encode(Payload, #{type := none}, _Transformation) -> @@ -420,31 +455,43 @@ encode(Payload, #{type := json}, Transformation) -> {ok, Bin} -> {ok, Bin}; {error, Reason} -> - trace_failure(Transformation, "payload_encode_failed", #{ - encoder => json, - reason => Reason - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_failed", + context = #{ + encoder => json, + reason => Reason + } + }, + {error, TraceFailureContext} end; encode(Payload, #{type := avro, schema := SerdeName}, Transformation) -> try {ok, emqx_schema_registry_serde:encode(SerdeName, Payload)} catch error:{serde_not_found, _} -> - trace_failure(Transformation, "payload_encode_schema_not_found", #{ - encoder => avro, - schema_name => SerdeName - }), - error; + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_schema_not_found", + context = #{ + encoder => avro, + schema_name => SerdeName + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> - trace_failure(Transformation, "payload_encode_schema_failure", #{ - encoder => avro, - schema_name => SerdeName, - kind => Class, - reason => Error, - stacktrace => Stacktrace - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_schema_failure", + context = #{ + encoder => avro, + schema_name => SerdeName, + kind => Class, + reason => Error, + stacktrace => Stacktrace + } + }, + {error, TraceFailureContext} end; encode( Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation @@ -453,24 +500,50 @@ encode( {ok, emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageType])} catch error:{serde_not_found, _} -> - trace_failure(Transformation, "payload_encode_schema_not_found", #{ - encoder => protobuf, - schema_name => SerdeName, - message_type => MessageType - }), - error; + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_schema_failure", + context = #{ + encoder => protobuf, + schema_name => SerdeName, + message_type => MessageType + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> - trace_failure(Transformation, "payload_encode_schema_failure", #{ - encoder => protobuf, - schema_name => SerdeName, - message_type => MessageType, - kind => Class, - reason => Error, - stacktrace => Stacktrace - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_schema_failure", + context = #{ + encoder => protobuf, + schema_name => SerdeName, + message_type => MessageType, + kind => Class, + reason => Error, + stacktrace => Stacktrace + } + }, + {error, TraceFailureContext} end. +trace_failure_from_context( + #trace_failure_context{ + transformation = Transformation, + tag = Tag, + context = Context + } +) -> + trace_failure(Transformation, Tag, Context). + +%% Internal export for HTTP API. +trace_failure_context_to_map( + #trace_failure_context{ + tag = Tag, + context = Context + } +) -> + Context#{msg => list_to_binary(Tag)}. + trace_failure(#{log_failure := #{level := none}} = Transformation, _Msg, _Meta) -> #{ name := _Name, diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl index 3b3132d0d..ee45b1fce 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl @@ -8,6 +8,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_utils_api.hrl"). %% `minirest' and `minirest_trails' API @@ -23,6 +24,7 @@ -export([ '/message_transformations'/2, '/message_transformations/reorder'/2, + '/message_transformations/dryrun'/2, '/message_transformations/transformation/:name'/2, '/message_transformations/transformation/:name/metrics'/2, '/message_transformations/transformation/:name/metrics/reset'/2, @@ -36,6 +38,9 @@ -define(TAGS, [<<"Message Transformation">>]). -define(METRIC_NAME, message_transformation). +-type user_property() :: #{binary() => binary()}. +-reflect_type([user_property/0]). + %%------------------------------------------------------------------------------------------------- %% `minirest' and `minirest_trails' API %%------------------------------------------------------------------------------------------------- @@ -49,6 +54,7 @@ paths() -> [ "/message_transformations", "/message_transformations/reorder", + "/message_transformations/dryrun", "/message_transformations/transformation/:name", "/message_transformations/transformation/:name/metrics", "/message_transformations/transformation/:name/metrics/reset", @@ -143,6 +149,25 @@ schema("/message_transformations/reorder") -> } } }; +schema("/message_transformations/dryrun") -> + #{ + 'operationId' => '/message_transformations/dryrun', + post => #{ + tags => ?TAGS, + summary => <<"Test an input against a configuration">>, + description => ?DESC("test_transformation"), + 'requestBody' => + emqx_dashboard_swagger:schema_with_examples( + ref(test_transformation), + example_input_test_transformation() + ), + responses => + #{ + 200 => <<"TODO">>, + 400 => error_schema('BAD_REQUEST', <<"Bad request">>) + } + } + }; schema("/message_transformations/transformation/:name") -> #{ 'operationId' => '/message_transformations/transformation/:name', @@ -267,6 +292,29 @@ fields(reorder) -> [ {order, mk(array(binary()), #{required => true, in => body})} ]; +fields(test_transformation) -> + [ + {transformation, + mk( + hoconsc:ref(emqx_message_transformation_schema, transformation), + #{required => true, in => body} + )}, + {message, mk(ref(test_input_message), #{required => true, in => body})} + ]; +fields(test_input_message) -> + %% See `emqx_message_transformation:eval_context()'. + [ + {client_attrs, mk(map(), #{required => true})}, + {payload, mk(binary(), #{required => true})}, + {qos, mk(range(0, 2), #{required => true})}, + {retain, mk(boolean(), #{required => true})}, + {topic, mk(binary(), #{required => true})}, + {user_property, + mk( + typerefl:alias("map(binary(), binary())", user_property()), + #{required => true} + )} + ]; fields(get_metrics) -> [ {metrics, mk(ref(metrics), #{})}, @@ -343,6 +391,9 @@ fields(node_metrics) -> '/message_transformations/reorder'(post, #{body := #{<<"order">> := Order}}) -> do_reorder(Order). +'/message_transformations/dryrun'(post, #{body := Params}) -> + do_transformation_dryrun(Params). + '/message_transformations/transformation/:name/enable/:enable'(post, #{ bindings := #{name := Name, enable := Enable} }) -> @@ -436,6 +487,17 @@ example_input_reorder() -> } }. +example_input_test_transformation() -> + #{ + <<"test">> => + #{ + summary => <<"Test an input against a configuration">>, + value => #{ + todo => true + } + } + }. + example_return_list() -> OtherVal0 = example_transformation([example_avro_check()]), OtherVal = OtherVal0#{name => <<"other_transformation">>}, @@ -541,6 +603,20 @@ do_reorder(Order) -> ?BAD_REQUEST(Error) end. +do_transformation_dryrun(Params) -> + #{ + transformation := Transformation, + message := Message + } = dryrun_input_message_in(Params), + case emqx_message_transformation:run_transformation(Transformation, Message) of + {ok, #message{} = FinalMessage} -> + MessageOut = dryrun_input_message_out(FinalMessage), + ?OK(MessageOut); + {_FailureAction, TraceFailureContext} -> + Result = trace_failure_context_out(TraceFailureContext), + {400, Result} + end. + do_enable_disable(Transformation, Enable) -> RawTransformation = make_serializable(Transformation), case emqx_message_transformation:update(RawTransformation#{<<"enable">> => Enable}) of @@ -654,3 +730,74 @@ operation_out(Operation0) -> fun(Path) -> iolist_to_binary(lists:join(".", Path)) end, Operation ). + +dryrun_input_message_in(Params) -> + %% We already check the params against the schema at the API boundary, so we can + %% expect it to succeed here. + #{root := Result = #{message := Message0}} = + hocon_tconf:check_plain( + #{roots => [{root, ref(test_transformation)}]}, + #{<<"root">> => Params}, + #{atom_key => true} + ), + #{ + client_attrs := ClientAttrs, + payload := Payload, + qos := QoS, + retain := Retain, + topic := Topic, + user_property := UserProperty0 + } = Message0, + UserProperty = maps:to_list(UserProperty0), + Message1 = #{ + id => emqx_guid:gen(), + timestamp => emqx_message:timestamp_now(), + extra => #{}, + from => <<"test-clientid">>, + + flags => #{retain => Retain}, + qos => QoS, + topic => Topic, + payload => Payload, + headers => #{ + client_attrs => ClientAttrs, + properties => #{'User-Property' => UserProperty} + } + }, + Message = emqx_message:from_map(Message1), + Result#{message := Message}. + +dryrun_input_message_out(#message{} = Message) -> + Retain = emqx_message:get_flag(retain, Message, false), + Props = emqx_message:get_header(properties, Message, #{}), + UserProperty0 = maps:get('User-Property', Props, []), + UserProperty = maps:from_list(UserProperty0), + MessageOut0 = emqx_message:to_map(Message), + MessageOut = maps:with([payload, qos, topic], MessageOut0), + MessageOut#{ + retain => Retain, + user_property => UserProperty + }. + +trace_failure_context_out(TraceFailureContext) -> + Context0 = emqx_message_transformation:trace_failure_context_to_map(TraceFailureContext), + %% Some context keys may not be JSON-encodable. + maps:filtermap( + fun + (reason, Reason) -> + case emqx_utils_json:safe_encode(Reason) of + {ok, _} -> + %% Let minirest encode it if it's structured. + true; + {error, _} -> + %% "Best effort" + {true, iolist_to_binary(io_lib:format("~p", [Reason]))} + end; + (stacktrace, _Stacktrace) -> + %% Log? + false; + (_Key, _Value) -> + true + end, + Context0 + ). diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index b3b88ac69..5e4d9a0cb 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -140,6 +140,31 @@ topic_operation(VariformExpr) -> operation(Key, VariformExpr) -> {Key, VariformExpr}. +json_serde() -> + #{<<"type">> => <<"json">>}. + +avro_serde(SerdeName) -> + #{<<"type">> => <<"avro">>, <<"schema">> => SerdeName}. + +dryrun_input_message() -> + dryrun_input_message(_Overrides = #{}). + +dryrun_input_message(Overrides) -> + dryrun_input_message(Overrides, _Opts = #{}). + +dryrun_input_message(Overrides, Opts) -> + Encoder = maps:get(encoder, Opts, fun emqx_utils_json:encode/1), + Defaults = #{ + client_attrs => #{}, + payload => #{}, + qos => 2, + retain => true, + topic => <<"t/u/v">>, + user_property => #{} + }, + InputMessage0 = emqx_utils_maps:deep_merge(Defaults, Overrides), + maps:update_with(payload, Encoder, InputMessage0). + api_root() -> "message_transformations". simplify_result(Res) -> @@ -246,6 +271,13 @@ import_backup(BackupName) -> Res = request(post, Path, Body), simplify_result(Res). +dryrun_transformation(Transformation, Message) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), "dryrun"]), + Params = #{transformation => Transformation, message => Message}, + Res = request(post, Path, Params), + ct:pal("dryrun transformation result:\n ~p", [Res]), + simplify_result(Res). + connect(ClientId) -> connect(ClientId, _IsPersistent = false). @@ -1491,3 +1523,93 @@ t_client_attrs(_Config) -> [] ), ok. + +%% Smoke tests for the dryrun endpoint. +t_dryrun_transformation(_Config) -> + ?check_trace( + begin + Name1 = <<"foo">>, + Operations = [ + operation(qos, <<"payload.q">>), + operation(topic, <<"concat([topic, '/', payload.t])">>), + operation(retain, <<"payload.r">>), + operation(<<"user_property.a">>, <<"payload.u.a">>), + operation(<<"payload">>, <<"payload.p.hello">>) + ], + Transformation1 = transformation(Name1, Operations), + + %% Good input + Message1 = dryrun_input_message(#{ + payload => #{ + p => #{<<"hello">> => <<"world">>}, + q => 1, + r => true, + t => <<"t">>, + u => #{a => <<"b">>} + } + }), + ?assertMatch( + {200, #{ + <<"payload">> := <<"\"world\"">>, + <<"qos">> := 1, + <<"retain">> := true, + <<"topic">> := <<"t/u/v/t">>, + <<"user_property">> := #{<<"a">> := <<"b">>} + }}, + dryrun_transformation(Transformation1, Message1) + ), + + %% Bad input: fails to decode + Message2 = dryrun_input_message(#{payload => "{"}, #{encoder => fun(X) -> X end}), + ?assertMatch( + {400, #{ + <<"decoder">> := <<"json">>, + <<"reason">> := <<_/binary>> + }}, + dryrun_transformation(Transformation1, Message2) + ), + + %% Bad output: fails to encode + MissingSerde = <<"missing_serde">>, + Transformation2 = transformation(Name1, [dummy_operation()], #{ + <<"payload_decoder">> => json_serde(), + <<"payload_encoder">> => avro_serde(MissingSerde) + }), + ?assertMatch( + {400, #{ + <<"msg">> := <<"payload_encode_schema_not_found">>, + <<"encoder">> := <<"avro">>, + <<"schema_name">> := MissingSerde + }}, + dryrun_transformation(Transformation2, Message1) + ), + + %% Bad input: unbound var during one of the operations + Message3 = dryrun_input_message(#{ + payload => #{ + p => #{<<"hello">> => <<"world">>}, + q => 1, + %% Missing: + %% r => true, + t => <<"t">>, + u => #{a => <<"b">>} + } + }), + ?assertMatch( + {400, #{ + <<"msg">> := + <<"transformation_eval_operation_failure">>, + <<"reason">> := + #{ + <<"reason">> := <<"var_unbound">>, + <<"var_name">> := <<"payload.r">> + } + }}, + dryrun_transformation(Transformation1, Message3) + ), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx_utils/src/emqx_variform.erl b/apps/emqx_utils/src/emqx_variform.erl index 7a0bc8118..1c2064c87 100644 --- a/apps/emqx_utils/src/emqx_variform.erl +++ b/apps/emqx_utils/src/emqx_variform.erl @@ -276,7 +276,7 @@ resolve_var_value(VarName, Bindings, _Opts) -> Value; {error, _Reason} -> throw(#{ - var_name => VarName, + var_name => iolist_to_binary(VarName), reason => var_unbound }) end. diff --git a/rel/i18n/emqx_message_transformation_http_api.hocon b/rel/i18n/emqx_message_transformation_http_api.hocon index 038e3e8ca..349b8ba79 100644 --- a/rel/i18n/emqx_message_transformation_http_api.hocon +++ b/rel/i18n/emqx_message_transformation_http_api.hocon @@ -18,6 +18,9 @@ emqx_message_transformation_http_api { reorder_transformations.desc: """Reorder of all transformations""" + test_transformation.desc: + """Test an input against a transformation""" + enable_disable_transformation.desc: """Enable or disable a particular transformation""" From ec4f46268440f0046e62e34a413bb9376259bca9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 17 Jun 2024 10:31:49 -0300 Subject: [PATCH 3/3] refactor: apply review requests --- .../emqx_message_transformation_http_api.erl | 24 +++++++++---------- ...emqx_message_transformation_http_api.hocon | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl index ee45b1fce..1ba5cee8a 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl @@ -155,11 +155,11 @@ schema("/message_transformations/dryrun") -> post => #{ tags => ?TAGS, summary => <<"Test an input against a configuration">>, - description => ?DESC("test_transformation"), + description => ?DESC("dryrun_transformation"), 'requestBody' => emqx_dashboard_swagger:schema_with_examples( - ref(test_transformation), - example_input_test_transformation() + ref(dryrun_transformation), + example_input_dryrun_transformation() ), responses => #{ @@ -292,27 +292,27 @@ fields(reorder) -> [ {order, mk(array(binary()), #{required => true, in => body})} ]; -fields(test_transformation) -> +fields(dryrun_transformation) -> [ {transformation, mk( hoconsc:ref(emqx_message_transformation_schema, transformation), #{required => true, in => body} )}, - {message, mk(ref(test_input_message), #{required => true, in => body})} + {message, mk(ref(dryrun_input_message), #{required => true, in => body})} ]; -fields(test_input_message) -> +fields(dryrun_input_message) -> %% See `emqx_message_transformation:eval_context()'. [ - {client_attrs, mk(map(), #{required => true})}, + {client_attrs, mk(map(), #{default => #{}})}, {payload, mk(binary(), #{required => true})}, - {qos, mk(range(0, 2), #{required => true})}, - {retain, mk(boolean(), #{required => true})}, + {qos, mk(range(0, 2), #{default => 0})}, + {retain, mk(boolean(), #{default => false})}, {topic, mk(binary(), #{required => true})}, {user_property, mk( typerefl:alias("map(binary(), binary())", user_property()), - #{required => true} + #{default => #{}} )} ]; fields(get_metrics) -> @@ -487,7 +487,7 @@ example_input_reorder() -> } }. -example_input_test_transformation() -> +example_input_dryrun_transformation() -> #{ <<"test">> => #{ @@ -736,7 +736,7 @@ dryrun_input_message_in(Params) -> %% expect it to succeed here. #{root := Result = #{message := Message0}} = hocon_tconf:check_plain( - #{roots => [{root, ref(test_transformation)}]}, + #{roots => [{root, ref(dryrun_transformation)}]}, #{<<"root">> => Params}, #{atom_key => true} ), diff --git a/rel/i18n/emqx_message_transformation_http_api.hocon b/rel/i18n/emqx_message_transformation_http_api.hocon index 349b8ba79..a40347bf4 100644 --- a/rel/i18n/emqx_message_transformation_http_api.hocon +++ b/rel/i18n/emqx_message_transformation_http_api.hocon @@ -18,7 +18,7 @@ emqx_message_transformation_http_api { reorder_transformations.desc: """Reorder of all transformations""" - test_transformation.desc: + dryrun_transformation.desc: """Test an input against a transformation""" enable_disable_transformation.desc: