diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index 569f79846..5d3aaa22e 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -467,6 +467,17 @@ decode( } }, {error, TraceFailureContext}; + throw:{schema_decode_error, ExtraContext} -> + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_error", + context = ExtraContext#{ + decoder => protobuf, + schema_name => SerdeName, + message_type => MessageType + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> TraceFailureContext = #trace_failure_context{ transformation = Transformation, diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index 8d08c4706..b1b70a89d 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -1394,6 +1394,94 @@ t_protobuf(_Config) -> ok. +%% Checks what happens if a wrong transformation chain is used. In this case, the second +%% transformation attempts to protobuf-decode a message that was already decoded but not +%% re-encoded by the first transformation. 
+t_protobuf_bad_chain(_Config) ->
+    ?check_trace(
+        begin
+            %% One protobuf schema, referenced by both transformations below.
+            Schema = <<"myserde">>,
+            MsgType = <<"Person">>,
+            protobuf_create_serde(Schema),
+
+            ProtobufSerde = #{
+                <<"type">> => <<"protobuf">>,
+                <<"schema">> => Schema,
+                <<"message_type">> => MsgType
+            },
+            NoEncoding = #{<<"type">> => <<"none">>},
+            JSONEncoding = #{<<"type">> => <<"json">>},
+
+            %% First link: protobuf-decodes the payload and passes it on without
+            %% re-encoding it.
+            FirstLink = transformation(<<"foo">>, [], #{
+                <<"payload_decoder">> => ProtobufSerde,
+                <<"payload_encoder">> => NoEncoding
+            }),
+            {201, _} = insert(FirstLink),
+
+            %% Second link is the misconfigured one: it asks for protobuf decoding
+            %% again, although its input was already decoded by the first link.
+            SecondLink = transformation(<<"bar">>, [], #{
+                <<"payload_decoder">> => ProtobufSerde,
+                <<"payload_encoder">> => JSONEncoding
+            }),
+            {201, _} = insert(SecondLink),
+
+            Client = connect(<<"c1">>),
+            {ok, _, [_]} = emqtt:subscribe(Client, <<"t/#">>),
+
+            [ValidPayload | _] = protobuf_valid_payloads(Schema, MsgType),
+            ok = publish(Client, <<"t/1">>, {raw, ValidPayload}),
+            %% Nothing may reach the subscriber.
+            ?assertNotReceive({publish, _}),
+
+            ok
+        end,
+        fun(Trace) ->
+            ct:pal("trace:\n ~p", [Trace]),
+            %% Every check below only looks at `message_transformation_failed' events.
+            Failures = [
+                E
+             || #{?snk_kind := message_transformation_failed} = E <- Trace
+            ],
+            %% No failure may be reported as a `payload_decode_schema_failure' with
+            %% reason `function_clause'.
+            ?assertMatch(
+                [],
+                [
+                    E
+                 || #{
+                        message := "payload_decode_schema_failure",
+                        reason := function_clause
+                    } = E <- Failures
+                ]
+            ),
+            %% No unexpected crashes: no failure event carries a stacktrace.
+            ?assertMatch(
+                [],
+                [E || #{stacktrace := _} = E <- Failures]
+            ),
+            %% The first `payload_decode_error' event must explain the misuse.
+            ?assertMatch(
+                [
+                    #{
+                        explain :=
+                            <<"Attempted to schema decode an already decoded message.", _/binary>>
+                    }
+                    | _
+                ],
+                [
+                    E
+                 || #{message := "payload_decode_error"} = E <- Failures
+                ]
+            ),
+            ok
+        end
+    ),
+    ok.
+
 %% Tests that restoring a backup config works.
 %% * Existing transformations (identified by `name') are left untouched.
 %% * No transformations are removed.
diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl
index 1a2f90ee7..0e661c356 100644
--- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl
+++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl
@@ -90,21 +90,7 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
     );
 handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
-    try
-        decode(SchemaId, Data, MoreArgs)
-    catch
-        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
-            throw(
-                {schema_decode_error, #{
-                    error_type => decoding_failure,
-                    schema_id => SchemaId,
-                    data => Data,
-                    more_args => MoreArgs,
-                    explain =>
-                        <<"The given data could not be decoded. Please check the input data and the schema.">>
-                }}
-            )
-    end;
+    decode(SchemaId, Data, MoreArgs);
 handle_rule_function(schema_decode, Args) ->
     error({args_count_error, {schema_decode, Args}});
 handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
@@ -162,7 +148,17 @@ encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
 with_serde(Name, F) ->
     case emqx_schema_registry:get_serde(Name) of
         {ok, Serde} ->
-            F(Serde);
+            %% Expose the schema name to `F' via logger metadata; restore the prior state after.
+            PrevMeta = logger:get_process_metadata(),
+            logger:update_process_metadata(#{schema_name => Name}),
+            try
+                F(Serde)
+            after
+                case PrevMeta of
+                    undefined -> logger:unset_process_metadata();
+                    _ -> logger:set_process_metadata(PrevMeta)
+                end
+            end;
         {error, not_found} ->
             error({serde_not_found, Name})
     end.
@@ -199,10 +195,49 @@ make_serde(json, Name, Source) ->
 eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
     Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
     avro_binary_decoder:decode(Data, Name, Store, Opts);
-eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
-    MessageName = binary_to_existing_atom(MessageName0, utf8),
-    Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
-    emqx_utils_maps:binary_key_map(Decoded);
+%% Protobuf, payload already decoded: a map here means an earlier rule or transformation
+%% decoded the message and did not re-encode it. Reported as a user error.
+%% NOTE(review): assumes `#serde.name' is the name the schema is registered under -- confirm.
+eval_decode(#serde{type = protobuf, name = SchemaName}, [#{} = DecodedData, MessageType]) ->
+    throw(
+        {schema_decode_error, #{
+            error_type => decoding_failure,
+            data => DecodedData,
+            message_type => MessageType,
+            schema_name => SchemaName,
+            explain =>
+                <<
+                    "Attempted to schema decode an already decoded message."
+                    " Check your rules or transformation pipeline."
+                >>
+        }}
+    );
+%% Protobuf, encoded payload: decodes `EncodedData' as `MessageType0' with the serde module
+%% and passes the result through `emqx_utils_maps:binary_key_map/1'. A gpb `decoding_failure'
+%% caused by a `function_clause' is rethrown as `schema_decode_error'; other exceptions propagate.
+eval_decode(
+    #serde{type = protobuf, name = SchemaName, eval_context = SerdeMod},
+    [EncodedData, MessageType0]
+) ->
+    %% Raises `badarg' when no atom exists for the requested message type.
+    MessageType = binary_to_existing_atom(MessageType0, utf8),
+    %% `try ... of' keeps the key conversion outside the protected section.
+    try SerdeMod:decode_msg(EncodedData, MessageType) of
+        Decoded ->
+            emqx_utils_maps:binary_key_map(Decoded)
+    catch
+        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
+            throw(
+                {schema_decode_error, #{
+                    error_type => decoding_failure,
+                    data => EncodedData,
+                    message_type => MessageType,
+                    schema_name => SchemaName,
+                    explain =>
+                        <<"The given data could not be decoded. Please check the input data and the schema.">>
+                }}
+            )
+    end;
 eval_decode(#serde{type = json, name = Name}, [Data]) ->
     true = is_binary(Data),
     Term = json_decode(Data),
diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl
index a403c3976..b9d214676 100644
--- a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl
+++ b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl
@@ -555,8 +555,8 @@ t_decode_fail(_Config) ->
                 data := <<"ss">>,
                 error_type := decoding_failure,
                 explain := _,
-                more_args := [<<"Person">>],
-                schema_id := <<"my_serde">>
+                message_type := 'Person',
+                schema_name := <<"my_serde">>
             }},
             emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>)
         ),