From 44e4b3616df4f3d003d870754503ffa3c83c2b40 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 11 Jul 2024 11:59:50 -0300 Subject: [PATCH 1/5] feat(variform): allow hyphens in identifiers Fixes https://emqx.atlassian.net/browse/EMQX-12683 --- apps/emqx_utils/src/emqx_variform_scan.xrl | 2 +- apps/emqx_utils/test/emqx_variform_tests.erl | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/emqx_utils/src/emqx_variform_scan.xrl b/apps/emqx_utils/src/emqx_variform_scan.xrl index 63c9fba29..d28fbaf81 100644 --- a/apps/emqx_utils/src/emqx_variform_scan.xrl +++ b/apps/emqx_utils/src/emqx_variform_scan.xrl @@ -1,6 +1,6 @@ Definitions. %% Define regular expressions for tokens -IDENTIFIER = [a-zA-Z][a-zA-Z0-9_.]* +IDENTIFIER = [a-zA-Z][-a-zA-Z0-9_.]* SQ_STRING = \'[^\']*\' DQ_STRING = \"[^\"]*\" INTEGER = [+-]?[0-9]+ diff --git a/apps/emqx_utils/test/emqx_variform_tests.erl b/apps/emqx_utils/test/emqx_variform_tests.erl index 4437280ea..779ee6648 100644 --- a/apps/emqx_utils/test/emqx_variform_tests.erl +++ b/apps/emqx_utils/test/emqx_variform_tests.erl @@ -23,7 +23,7 @@ -define(SYNTAX_ERROR, {error, "syntax error before:" ++ _}). -redner_test_() -> +render_test_() -> [ {"direct var reference", fun() -> ?assertEqual({ok, <<"1">>}, render("a", #{a => 1})) end}, {"concat strings", fun() -> @@ -32,6 +32,15 @@ redner_test_() -> {"concat empty string", fun() -> ?assertEqual({ok, <<"">>}, render("concat([''])", #{})) end}, + {"identifier with hyphen", fun() -> + ?assertEqual( + {ok, <<"10">>}, + render( + "pub_props.Message-Expiry-Interval", + #{pub_props => #{'Message-Expiry-Interval' => 10}} + ) + ) + end}, {"tokens 1st", fun() -> ?assertEqual({ok, <<"a">>}, render("nth(1,tokens(var, ','))", #{var => <<"a,b">>})) end}, From 01d89be74335dc6f7c4f6560c72b131ff42a3840 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 11 Jul 2024 12:13:42 -0300 Subject: [PATCH 2/5] feat(message transformation): add timestamp and pub_props fields to read context Fixes https://emqx.atlassian.net/browse/EMQX-12684 Fixes https://emqx.atlassian.net/browse/EMQX-12678 --- .../src/emqx_message_transformation.erl | 5 +++ ..._message_transformation_http_api_SUITE.erl | 39 ++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index 94951ff4f..569f79846 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -62,9 +62,11 @@ node := _, payload := _, peername := _, + pub_props := _, publish_received_at := _, qos := _, retain := _, + timestamp := _, topic := _, user_property := _, username := _, @@ -345,6 +347,7 @@ message_to_context(#message{} = Message, Payload, Transformation) -> undefined end, Username = maps:get(username, Headers, undefined), + Timestamp = erlang:system_time(millisecond), #{ dirty => Dirty, @@ -355,9 +358,11 @@ message_to_context(#message{} = Message, Payload, Transformation) -> node => node(), payload => Payload, peername => Peername, + pub_props => Props, publish_received_at => Message#message.timestamp, qos => Message#message.qos, retain => emqx_message:get_flag(retain, Message, false), + timestamp => Timestamp, topic => Message#message.topic, user_property => UserProperties, username => Username diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index fa83c8024..8d08c4706 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -504,7 +504,7 @@ assert_monitor_metrics() -> receive PATTERN = ____Msg0 -> ____Msg0 after TIMEOUT -> - error({message_not_received, ?LINE}) + error({message_not_received, {line, ?LINE}}) end end)() ). @@ -608,6 +608,8 @@ t_smoke_test(_Config) -> %% * peername %% * publish_received_at %% * username +%% * timestamp +%% * pub_props (and specific fields within containing hyphens) t_smoke_test_2(_Config) -> Name1 = <<"foo">>, Operations = [ @@ -617,14 +619,22 @@ t_smoke_test_2(_Config) -> operation(<<"payload.peername">>, <<"peername">>), operation(<<"payload.publish_received_at">>, <<"publish_received_at">>), operation(<<"payload.username">>, <<"username">>), - operation(<<"payload.flags">>, <<"flags">>) + operation(<<"payload.flags">>, <<"flags">>), + operation(<<"payload.timestamp">>, <<"timestamp">>), + operation(<<"payload.pub_props">>, <<"pub_props">>), + operation(<<"payload.content_type">>, <<"pub_props.Content-Type">>) ], Transformation1 = transformation(Name1, Operations), {201, _} = insert(Transformation1), ClientId = atom_to_binary(?FUNCTION_NAME), C1 = connect(ClientId), {ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]), - ok = publish(C1, <<"t/1">>, #{}), + ok = publish(C1, <<"t/1">>, #{}, _QoS = 0, #{ + props => #{ + 'Content-Type' => <<"application/json">>, + 'User-Property' => [{<<"a">>, <<"b">>}] + } + }), {publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000), NodeBin = atom_to_binary(node()), ?assertMatch( @@ -635,7 +645,13 @@ t_smoke_test_2(_Config) -> <<"peername">> := <<"127.0.0.1:", _/binary>>, <<"publish_received_at">> := PRAt, <<"username">> := <<"undefined">>, - <<"flags">> := #{<<"dup">> := false, <<"retain">> := false} + <<"flags">> := #{<<"dup">> := false, <<"retain">> := false}, + <<"timestamp">> := _, + <<"pub_props">> := #{ + <<"Content-Type">> := <<"application/json">>, + <<"User-Property">> := #{<<"a">> := <<"b">>} + }, + <<"content_type">> := <<"application/json">> } when is_integer(PRAt), emqx_utils_json:decode(Payload0, [return_maps]) ), @@ -644,7 +660,12 @@ t_smoke_test_2(_Config) -> Username = <<"myusername">>, C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}), {ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]), - ok = publish(C2, <<"t/1">>, #{}), + ok = publish(C2, <<"t/1">>, #{}, _QoS = 0, #{ + props => #{ + 'Content-Type' => <<"application/json">>, + 'User-Property' => [{<<"a">>, <<"b">>}] + } + }), {publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000), ?assertMatch( #{ @@ -654,7 +675,13 @@ t_smoke_test_2(_Config) -> <<"peername">> := <<"127.0.0.1:", _/binary>>, <<"publish_received_at">> := PRAt, <<"username">> := Username, - <<"flags">> := #{<<"dup">> := false, <<"retain">> := false} + <<"flags">> := #{<<"dup">> := false, <<"retain">> := false}, + <<"timestamp">> := _, + <<"pub_props">> := #{ + <<"Content-Type">> := <<"application/json">>, + <<"User-Property">> := #{<<"a">> := <<"b">>} + }, + <<"content_type">> := <<"application/json">> } when is_integer(PRAt), emqx_utils_json:decode(Payload1, [return_maps]) ), From 5f595966d8e6ef5805a375d0feda9730a63ec014 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 11 Jul 2024 12:17:53 -0300 Subject: [PATCH 3/5] chore(message transformation): allow empty operation list Fixes https://emqx.atlassian.net/browse/EMQX-12682 --- .../src/emqx_message_transformation_schema.erl | 8 +------- .../test/emqx_message_transformation_tests.erl | 11 +++-------- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl index 169743e5b..ae1e22245 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl @@ -108,8 +108,7 @@ fields(transformation) -> hoconsc:array(ref(operation)), #{ desc => ?DESC("operation"), - required => true, - validator => fun validate_operations/1 + default => [] } )} ]; @@ -253,11 +252,6 @@ validate_unique_topics(Topics) -> {error, Msg} end. -validate_operations([]) -> - {error, <<"at least one operation must be defined">>}; -validate_operations([_ | _]) -> - ok. - compile_variform(Expression, #{make_serializable := true}) -> case is_binary(Expression) of true -> diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl index 545773c00..6702cd974 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl @@ -114,14 +114,9 @@ schema_test_() -> transformation(<<"foo">>, [dummy_operation()]) ]) )}, - {"operations must be non-empty", - ?_assertThrow( - {_Schema, [ - #{ - reason := <<"at least one operation must be defined">>, - kind := validation_error - } - ]}, + {"operations may be empty", + ?_assertMatch( + [#{<<"operations">> := []}], parse_and_check([ transformation( <<"foo">>, From 2816170e9d476b1e28707201a836e09eb7883cbd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 11 Jul 2024 13:57:08 -0300 Subject: [PATCH 4/5] chore: add `$events.message_transformation_failed` to rule engine tester Fixes https://emqx.atlassian.net/browse/EMQX-12679 --- .../emqx_rule_engine/src/emqx_rule_api_schema.erl | 10 +++++++++- apps/emqx_rule_engine/src/emqx_rule_sqltester.erl | 2 ++ .../test/emqx_rule_engine_api_rule_test_SUITE.erl | 15 +++++++++++++++ rel/i18n/emqx_rule_api_schema.hocon | 6 ++++++ 4 files changed, 32 insertions(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 2450253c1..e33c1d6fb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -326,6 +326,13 @@ fields("ctx_schema_validation_failed") -> {"event_type", event_type_sc(Event)}, {"validation", sc(binary(), #{desc => ?DESC("event_validation")})} | msg_event_common_fields() + ]; +fields("ctx_message_transformation_failed") -> + Event = 'message.transformation_failed', + [ + {"event_type", event_type_sc(Event)}, + {"transformation", sc(binary(), #{desc => ?DESC("event_transformation")})} + | msg_event_common_fields() ]. rule_input_message_context() -> @@ -345,7 +352,8 @@ rule_input_message_context() -> ref("ctx_check_authn_complete"), ref("ctx_bridge_mqtt"), ref("ctx_delivery_dropped"), - ref("ctx_schema_validation_failed") + ref("ctx_schema_validation_failed"), + ref("ctx_message_transformation_failed") ]), #{ desc => ?DESC("test_context"), diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index f766cd273..39ed7440d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -206,6 +206,8 @@ is_test_runtime_env() -> %% is different from `topic'. get_in_topic(#{event_type := schema_validation_failed}) -> <<"$events/schema_validation_failed">>; +get_in_topic(#{event_type := message_transformation_failed}) -> + <<"$events/message_transformation_failed">>; get_in_topic(Context) -> case maps:find(event_topic, Context) of {ok, EventTopic} -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl index 481247e2a..5eb5ede99 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl @@ -257,6 +257,21 @@ t_ctx_schema_validation_failed(_) -> Expected = check_result([validation], [], Context), do_test(SQL, Context, Expected). +t_ctx_message_transformation_failed(_) -> + SQL = + <<"SELECT transformation FROM \"$events/message_transformation_failed\"">>, + Context = #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_transformation_failed">>, + <<"payload">> => <<"{\"msg\": \"hello\"}">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">>, + <<"transformation">> => <<"m">> + }, + Expected = check_result([transformation], [], Context), + do_test(SQL, Context, Expected). + t_mongo_date_function_should_return_string_in_test_env(_) -> SQL = <<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>, diff --git a/rel/i18n/emqx_rule_api_schema.hocon b/rel/i18n/emqx_rule_api_schema.hocon index 18d0990a2..b317fa3eb 100644 --- a/rel/i18n/emqx_rule_api_schema.hocon +++ b/rel/i18n/emqx_rule_api_schema.hocon @@ -366,6 +366,12 @@ event_validation.desc: event_validation.label: """Validation""" +event_transformation.desc: +"""Transformation""" + +event_transformation.label: +"""Transformation""" + root_rule_info.desc: """Schema for rule info""" From 96c9020727db036171e5517d8d51b05c9f7e1d4e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 12 Jul 2024 11:41:56 -0300 Subject: [PATCH 5/5] chore: improve protobuf decoding error messages Fixes https://emqx.atlassian.net/browse/EMQX-12677 --- .../src/emqx_message_transformation.erl | 11 +++ ..._message_transformation_http_api_SUITE.erl | 88 +++++++++++++++++++ .../src/emqx_schema_registry_serde.erl | 65 +++++++++----- .../test/emqx_schema_registry_SUITE.erl | 4 +- 4 files changed, 146 insertions(+), 22 deletions(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index 569f79846..5d3aaa22e 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -467,6 +467,17 @@ decode( } }, {error, TraceFailureContext}; + throw:{schema_decode_error, ExtraContext} -> + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_error", + context = ExtraContext#{ + decoder => protobuf, + schema_name => SerdeName, + message_type => MessageType + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> TraceFailureContext = #trace_failure_context{ transformation = Transformation, diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index 8d08c4706..b1b70a89d 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -1394,6 +1394,94 @@ t_protobuf(_Config) -> ok. +%% Checks what happens if a wrong transformation chain is used. In this case, the second +%% transformation attempts to protobuf-decode a message that was already decoded but not +%% re-encoded by the first transformation. +t_protobuf_bad_chain(_Config) -> + ?check_trace( + begin + SerdeName = <<"myserde">>, + MessageType = <<"Person">>, + protobuf_create_serde(SerdeName), + + Name1 = <<"foo">>, + PayloadSerde = #{ + <<"type">> => <<"protobuf">>, + <<"schema">> => SerdeName, + <<"message_type">> => MessageType + }, + NoneSerde = #{<<"type">> => <<"none">>}, + JSONSerde = #{<<"type">> => <<"json">>}, + + Transformation1 = transformation(Name1, _Ops1 = [], #{ + <<"payload_decoder">> => PayloadSerde, + <<"payload_encoder">> => NoneSerde + }), + {201, _} = insert(Transformation1), + + %% WRONG: after the first transformation, payload is already decoded, so we + %% shouldn't use protobuf again. + Name2 = <<"bar">>, + Transformation2A = transformation(Name2, [], #{ + <<"payload_decoder">> => PayloadSerde, + <<"payload_encoder">> => JSONSerde + }), + {201, _} = insert(Transformation2A), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + [Payload | _] = protobuf_valid_payloads(SerdeName, MessageType), + ok = publish(C, <<"t/1">>, {raw, Payload}), + ?assertNotReceive({publish, _}), + + ok + end, + fun(Trace) -> + ct:pal("trace:\n ~p", [Trace]), + ?assertMatch( + [], + [ + E + || #{ + ?snk_kind := message_transformation_failed, + message := "payload_decode_schema_failure", + reason := function_clause + } = E <- Trace + ] + ), + %% No unexpected crashes + ?assertMatch( + [], + [ + E + || #{ + ?snk_kind := message_transformation_failed, + stacktrace := _ + } = E <- Trace + ] + ), + ?assertMatch( + [ + #{ + explain := + <<"Attempted to schema decode an already decoded message.", _/binary>> + } + | _ + ], + [ + E + || #{ + ?snk_kind := message_transformation_failed, + message := "payload_decode_error" + } = E <- Trace + ] + ), + ok + end + ), + ok. + %% Tests that restoring a backup config works. %% * Existing transformations (identified by `name') are left untouched. %% * No transformations are removed. diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index 1a2f90ee7..0e661c356 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -90,21 +90,7 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) -> [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs] ); handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) -> - try - decode(SchemaId, Data, MoreArgs) - catch - error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} -> - throw( - {schema_decode_error, #{ - error_type => decoding_failure, - schema_id => SchemaId, - data => Data, - more_args => MoreArgs, - explain => - <<"The given data could not be decoded. Please check the input data and the schema.">> - }} - ) - end; + decode(SchemaId, Data, MoreArgs); handle_rule_function(schema_decode, Args) -> error({args_count_error, {schema_decode, Args}}); handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) -> @@ -162,7 +148,17 @@ encode(SerdeName, Data, VarArgs) when is_list(VarArgs) -> with_serde(Name, F) -> case emqx_schema_registry:get_serde(Name) of {ok, Serde} -> - F(Serde); + Meta = + case logger:get_process_metadata() of + undefined -> #{}; + Meta0 -> Meta0 + end, + logger:update_process_metadata(#{schema_name => Name}), + try + F(Serde) + after + logger:set_process_metadata(Meta) + end; {error, not_found} -> error({serde_not_found, Name}) end. @@ -199,10 +195,39 @@ make_serde(json, Name, Source) -> eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) -> Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]), avro_binary_decoder:decode(Data, Name, Store, Opts); -eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) -> - MessageName = binary_to_existing_atom(MessageName0, utf8), - Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]), - emqx_utils_maps:binary_key_map(Decoded); +eval_decode(#serde{type = protobuf}, [#{} = DecodedData, MessageType]) -> + %% Already decoded, so it's an user error. + throw( + {schema_decode_error, #{ + error_type => decoding_failure, + data => DecodedData, + message_type => MessageType, + explain => + << + "Attempted to schema decode an already decoded message." + " Check your rules or transformation pipeline." + >> + }} + ); +eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageType0]) -> + MessageType = binary_to_existing_atom(MessageType0, utf8), + try + Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageType]), + emqx_utils_maps:binary_key_map(Decoded) + catch + error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} -> + #{schema_name := SchemaName} = logger:get_process_metadata(), + throw( + {schema_decode_error, #{ + error_type => decoding_failure, + data => EncodedData, + message_type => MessageType, + schema_name => SchemaName, + explain => + <<"The given data could not be decoded. Please check the input data and the schema.">> + }} + ) + end; eval_decode(#serde{type = json, name = Name}, [Data]) -> true = is_binary(Data), Term = json_decode(Data), diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl index a403c3976..b9d214676 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl @@ -555,8 +555,8 @@ t_decode_fail(_Config) -> data := <<"ss">>, error_type := decoding_failure, explain := _, - more_args := [<<"Person">>], - schema_id := <<"my_serde">> + message_type := 'Person', + schema_name := <<"my_serde">> }}, emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>) ),