diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src index f05295579..efd9f1162 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src @@ -1,6 +1,6 @@ {application, emqx_schema_registry, [ {description, "EMQX Schema Registry"}, - {vsn, "0.3.0"}, + {vsn, "0.3.1"}, {registered, [emqx_schema_registry_sup]}, {mod, {emqx_schema_registry_app, []}}, {included_applications, [ diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index 638147fd1..e8da35449 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -64,7 +64,21 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) -> [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs] ); handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) -> - decode(SchemaId, Data, MoreArgs); + try + decode(SchemaId, Data, MoreArgs) + catch + error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} -> + throw( + {schema_decode_error, #{ + error_type => decoding_failure, + schema_id => SchemaId, + data => Data, + more_args => MoreArgs, + explain => + <<"The given data could not be decoded. Please check the input data and the schema.">> + }} + ) + end; handle_rule_function(schema_decode, Args) -> error({args_count_error, {schema_decode, Args}}); handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) -> diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl index 22252b7c3..d9286f266 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl @@ -44,7 +44,8 @@ sparkplug_tests() -> t_sparkplug_decode, t_sparkplug_encode, t_sparkplug_decode_encode_with_message_name, - t_sparkplug_encode_float_to_uint64_key + t_sparkplug_encode_float_to_uint64_key, + t_decode_fail ]. init_per_suite(Config) -> @@ -532,6 +533,23 @@ t_encode(Config) -> end, ok. +t_decode_fail(_Config) -> + SerdeName = my_serde, + SerdeType = protobuf, + ok = create_serde(SerdeType, SerdeName), + Payload = <<"ss">>, + ?assertThrow( + {schema_decode_error, #{ + data := <<"ss">>, + error_type := decoding_failure, + explain := _, + more_args := [<<"Person">>], + schema_id := <<"my_serde">> + }}, + emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>) + ), + ok. + t_decode(Config) -> SerdeType = ?config(serde_type, Config), SerdeName = my_serde, diff --git a/changes/ee/fix-13147.en.md b/changes/ee/fix-13147.en.md new file mode 100644 index 000000000..4f717892d --- /dev/null +++ b/changes/ee/fix-13147.en.md @@ -0,0 +1 @@ +Error messages for decoding failures in the rule engine protobuf decode functions have been improved by adding a clear descriptions to indicate what went wrong when message decoding fails.