Merge pull request #13453 from thalesmg/20240711-r57-mt-fixes

batch of message transformation fixes
Thales Macedo Garitezi · 2024-07-17 12:45:19 -03:00 · committed by GitHub
commit cd8bf2725a
12 changed files with 231 additions and 46 deletions

View File

@@ -62,9 +62,11 @@
     node := _,
     payload := _,
     peername := _,
+    pub_props := _,
     publish_received_at := _,
     qos := _,
     retain := _,
+    timestamp := _,
     topic := _,
     user_property := _,
     username := _,
@@ -345,6 +347,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
             undefined
     end,
     Username = maps:get(username, Headers, undefined),
+    Timestamp = erlang:system_time(millisecond),
     #{
         dirty => Dirty,
@@ -355,9 +358,11 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
         node => node(),
         payload => Payload,
         peername => Peername,
+        pub_props => Props,
         publish_received_at => Message#message.timestamp,
         qos => Message#message.qos,
         retain => emqx_message:get_flag(retain, Message, false),
+        timestamp => Timestamp,
         topic => Message#message.topic,
         user_property => UserProperties,
         username => Username
@@ -462,6 +467,17 @@ decode(
            }
        },
        {error, TraceFailureContext};
+       throw:{schema_decode_error, ExtraContext} ->
+           TraceFailureContext = #trace_failure_context{
+               transformation = Transformation,
+               tag = "payload_decode_error",
+               context = ExtraContext#{
+                   decoder => protobuf,
+                   schema_name => SerdeName,
+                   message_type => MessageType
+               }
+           },
+           {error, TraceFailureContext};
        Class:Error:Stacktrace ->
            TraceFailureContext = #trace_failure_context{
                transformation = Transformation,
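For orientation, here is a hedged sketch of the transformation context after this change; the keys match the hunks above, while the concrete values are invented for illustration:

    %% Sketch only: the context passed to transformation expressions now
    %% carries `pub_props' (MQTT PUBLISH properties) and `timestamp' (broker
    %% time, in milliseconds, taken when the transformation runs).
    Context = #{
        node => 'emqx@127.0.0.1',
        payload => <<"{}">>,
        peername => <<"127.0.0.1:52634">>,
        pub_props => #{'Content-Type' => <<"application/json">>},
        publish_received_at => 1721227519123,
        qos => 0,
        retain => false,
        timestamp => 1721227519125,
        topic => <<"t/1">>,
        user_property => #{<<"a">> => <<"b">>},
        username => undefined
    }.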

View File

@@ -108,8 +108,7 @@ fields(transformation) ->
             hoconsc:array(ref(operation)),
             #{
                 desc => ?DESC("operation"),
-                required => true,
-                validator => fun validate_operations/1
+                default => []
             }
         )}
     ];
@@ -253,11 +252,6 @@ validate_unique_topics(Topics) ->
             {error, Msg}
     end.
 
-validate_operations([]) ->
-    {error, <<"at least one operation must be defined">>};
-validate_operations([_ | _]) ->
-    ok.
-
 compile_variform(Expression, #{make_serializable := true}) ->
     case is_binary(Expression) of
         true ->
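With the `required` flag and the validator gone, an empty operations list now passes schema validation and simply defaults to `[]`; a minimal sketch using the test suite's own `transformation/2` and `parse_and_check/1` helpers:

    %% Previously rejected with "at least one operation must be defined";
    %% now an empty operations list parses cleanly.
    [#{<<"operations">> := []}] =
        parse_and_check([transformation(<<"foo">>, _Operations = [])]).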

View File

@@ -504,7 +504,7 @@ assert_monitor_metrics() ->
         receive
             PATTERN = ____Msg0 -> ____Msg0
         after TIMEOUT ->
-            error({message_not_received, ?LINE})
+            error({message_not_received, {line, ?LINE}})
         end
     end)()
 ).
@@ -608,6 +608,8 @@ t_smoke_test(_Config) ->
 %% * peername
 %% * publish_received_at
 %% * username
+%% * timestamp
+%% * pub_props (and specific fields within it whose names contain hyphens)
 t_smoke_test_2(_Config) ->
     Name1 = <<"foo">>,
     Operations = [
@@ -617,14 +619,22 @@ t_smoke_test_2(_Config) ->
         operation(<<"payload.peername">>, <<"peername">>),
         operation(<<"payload.publish_received_at">>, <<"publish_received_at">>),
         operation(<<"payload.username">>, <<"username">>),
-        operation(<<"payload.flags">>, <<"flags">>)
+        operation(<<"payload.flags">>, <<"flags">>),
+        operation(<<"payload.timestamp">>, <<"timestamp">>),
+        operation(<<"payload.pub_props">>, <<"pub_props">>),
+        operation(<<"payload.content_type">>, <<"pub_props.Content-Type">>)
     ],
     Transformation1 = transformation(Name1, Operations),
     {201, _} = insert(Transformation1),
     ClientId = atom_to_binary(?FUNCTION_NAME),
     C1 = connect(ClientId),
     {ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]),
-    ok = publish(C1, <<"t/1">>, #{}),
+    ok = publish(C1, <<"t/1">>, #{}, _QoS = 0, #{
+        props => #{
+            'Content-Type' => <<"application/json">>,
+            'User-Property' => [{<<"a">>, <<"b">>}]
+        }
+    }),
     {publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000),
     NodeBin = atom_to_binary(node()),
     ?assertMatch(
@@ -635,7 +645,13 @@ t_smoke_test_2(_Config) ->
             <<"peername">> := <<"127.0.0.1:", _/binary>>,
             <<"publish_received_at">> := PRAt,
             <<"username">> := <<"undefined">>,
-            <<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
+            <<"flags">> := #{<<"dup">> := false, <<"retain">> := false},
+            <<"timestamp">> := _,
+            <<"pub_props">> := #{
+                <<"Content-Type">> := <<"application/json">>,
+                <<"User-Property">> := #{<<"a">> := <<"b">>}
+            },
+            <<"content_type">> := <<"application/json">>
         } when is_integer(PRAt),
         emqx_utils_json:decode(Payload0, [return_maps])
     ),
@@ -644,7 +660,12 @@ t_smoke_test_2(_Config) ->
     Username = <<"myusername">>,
     C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}),
     {ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]),
-    ok = publish(C2, <<"t/1">>, #{}),
+    ok = publish(C2, <<"t/1">>, #{}, _QoS = 0, #{
+        props => #{
+            'Content-Type' => <<"application/json">>,
+            'User-Property' => [{<<"a">>, <<"b">>}]
+        }
+    }),
     {publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000),
     ?assertMatch(
         #{
@@ -654,7 +675,13 @@ t_smoke_test_2(_Config) ->
             <<"peername">> := <<"127.0.0.1:", _/binary>>,
             <<"publish_received_at">> := PRAt,
             <<"username">> := Username,
-            <<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
+            <<"flags">> := #{<<"dup">> := false, <<"retain">> := false},
+            <<"timestamp">> := _,
+            <<"pub_props">> := #{
+                <<"Content-Type">> := <<"application/json">>,
+                <<"User-Property">> := #{<<"a">> := <<"b">>}
+            },
+            <<"content_type">> := <<"application/json">>
         } when is_integer(PRAt),
         emqx_utils_json:decode(Payload1, [return_maps])
     ),
@@ -1367,6 +1394,94 @@ t_protobuf(_Config) ->
     ok.
 
+%% Checks what happens if a wrong transformation chain is used. In this case, the second
+%% transformation attempts to protobuf-decode a message that was already decoded but not
+%% re-encoded by the first transformation.
+t_protobuf_bad_chain(_Config) ->
+    ?check_trace(
+        begin
+            SerdeName = <<"myserde">>,
+            MessageType = <<"Person">>,
+            protobuf_create_serde(SerdeName),
+            Name1 = <<"foo">>,
+            PayloadSerde = #{
+                <<"type">> => <<"protobuf">>,
+                <<"schema">> => SerdeName,
+                <<"message_type">> => MessageType
+            },
+            NoneSerde = #{<<"type">> => <<"none">>},
+            JSONSerde = #{<<"type">> => <<"json">>},
+            Transformation1 = transformation(Name1, _Ops1 = [], #{
+                <<"payload_decoder">> => PayloadSerde,
+                <<"payload_encoder">> => NoneSerde
+            }),
+            {201, _} = insert(Transformation1),
+            %% WRONG: after the first transformation, payload is already decoded, so we
+            %% shouldn't use protobuf again.
+            Name2 = <<"bar">>,
+            Transformation2A = transformation(Name2, [], #{
+                <<"payload_decoder">> => PayloadSerde,
+                <<"payload_encoder">> => JSONSerde
+            }),
+            {201, _} = insert(Transformation2A),
+            C = connect(<<"c1">>),
+            {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+            [Payload | _] = protobuf_valid_payloads(SerdeName, MessageType),
+            ok = publish(C, <<"t/1">>, {raw, Payload}),
+            ?assertNotReceive({publish, _}),
+            ok
+        end,
+        fun(Trace) ->
+            ct:pal("trace:\n ~p", [Trace]),
+            ?assertMatch(
+                [],
+                [
+                    E
+                 || #{
+                        ?snk_kind := message_transformation_failed,
+                        message := "payload_decode_schema_failure",
+                        reason := function_clause
+                    } = E <- Trace
+                ]
+            ),
+            %% No unexpected crashes
+            ?assertMatch(
+                [],
+                [
+                    E
+                 || #{
+                        ?snk_kind := message_transformation_failed,
+                        stacktrace := _
+                    } = E <- Trace
+                ]
+            ),
+            ?assertMatch(
+                [
+                    #{
+                        explain :=
+                            <<"Attempted to schema decode an already decoded message.", _/binary>>
+                    }
+                    | _
+                ],
+                [
+                    E
+                 || #{
+                        ?snk_kind := message_transformation_failed,
+                        message := "payload_decode_error"
+                    } = E <- Trace
+                ]
+            ),
+            ok
+        end
+    ),
+    ok.
 
 %% Tests that restoring a backup config works.
 %% * Existing transformations (identified by `name') are left untouched.
 %% * No transformations are removed.
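For contrast with the bad chain exercised in `t_protobuf_bad_chain` above, a hedged sketch of a working second stage: because transformation 1 decodes protobuf and encodes with `none`, the payload reaching transformation 2 is already a map, so the second stage should skip decoding and only encode (variable names reuse the test's bindings):

    %% Correct chain: stage 1 = protobuf -> none, stage 2 = none -> json.
    Transformation2Ok = transformation(Name2, [], #{
        <<"payload_decoder">> => NoneSerde,
        <<"payload_encoder">> => JSONSerde
    }),
    {201, _} = insert(Transformation2Ok),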

View File

@@ -114,14 +114,9 @@
                 transformation(<<"foo">>, [dummy_operation()])
             ])
         )},
-    {"operations must be non-empty",
-        ?_assertThrow(
-            {_Schema, [
-                #{
-                    reason := <<"at least one operation must be defined">>,
-                    kind := validation_error
-                }
-            ]},
+    {"operations may be empty",
+        ?_assertMatch(
+            [#{<<"operations">> := []}],
             parse_and_check([
                 transformation(
                     <<"foo">>,

View File

@@ -326,6 +326,13 @@
     {"event_type", event_type_sc(Event)},
     {"validation", sc(binary(), #{desc => ?DESC("event_validation")})}
     | msg_event_common_fields()
-].
+];
+fields("ctx_message_transformation_failed") ->
+    Event = 'message.transformation_failed',
+    [
+        {"event_type", event_type_sc(Event)},
+        {"transformation", sc(binary(), #{desc => ?DESC("event_transformation")})}
+        | msg_event_common_fields()
+    ].
 
 rule_input_message_context() ->
@@ -345,7 +352,8 @@
         ref("ctx_check_authn_complete"),
         ref("ctx_bridge_mqtt"),
         ref("ctx_delivery_dropped"),
-        ref("ctx_schema_validation_failed")
+        ref("ctx_schema_validation_failed"),
+        ref("ctx_message_transformation_failed")
     ]),
     #{
         desc => ?DESC("test_context"),

View File

@@ -206,6 +206,8 @@ is_test_runtime_env() ->
 %% is different from `topic'.
 get_in_topic(#{event_type := schema_validation_failed}) ->
     <<"$events/schema_validation_failed">>;
+get_in_topic(#{event_type := message_transformation_failed}) ->
+    <<"$events/message_transformation_failed">>;
 get_in_topic(Context) ->
     case maps:find(event_topic, Context) of
         {ok, EventTopic} ->
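With this clause in place, a rule can consume the new event stream; a hedged one-liner (the SQL mirrors the test added further below, with `topic` and `payload` assumed available from the event context):

    SQL = <<"SELECT transformation, topic, payload FROM \"$events/message_transformation_failed\"">>.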

View File

@@ -257,6 +257,21 @@ t_ctx_schema_validation_failed(_) ->
     Expected = check_result([validation], [], Context),
     do_test(SQL, Context, Expected).
 
+t_ctx_message_transformation_failed(_) ->
+    SQL =
+        <<"SELECT transformation FROM \"$events/message_transformation_failed\"">>,
+    Context = #{
+        <<"clientid">> => <<"c_emqx">>,
+        <<"event_type">> => <<"message_transformation_failed">>,
+        <<"payload">> => <<"{\"msg\": \"hello\"}">>,
+        <<"qos">> => 1,
+        <<"topic">> => <<"t/a">>,
+        <<"username">> => <<"u_emqx">>,
+        <<"transformation">> => <<"m">>
+    },
+    Expected = check_result([transformation], [], Context),
+    do_test(SQL, Context, Expected).
+
 t_mongo_date_function_should_return_string_in_test_env(_) ->
     SQL =
         <<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>,

View File

@@ -90,21 +90,7 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
     );
 handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
-    try
-        decode(SchemaId, Data, MoreArgs)
-    catch
-        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
-            throw(
-                {schema_decode_error, #{
-                    error_type => decoding_failure,
-                    schema_id => SchemaId,
-                    data => Data,
-                    more_args => MoreArgs,
-                    explain =>
-                        <<"The given data could not be decoded. Please check the input data and the schema.">>
-                }}
-            )
-    end;
+    decode(SchemaId, Data, MoreArgs);
 handle_rule_function(schema_decode, Args) ->
     error({args_count_error, {schema_decode, Args}});
 handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
@@ -162,7 +148,17 @@ encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
 with_serde(Name, F) ->
     case emqx_schema_registry:get_serde(Name) of
         {ok, Serde} ->
-            F(Serde);
+            Meta =
+                case logger:get_process_metadata() of
+                    undefined -> #{};
+                    Meta0 -> Meta0
+                end,
+            logger:update_process_metadata(#{schema_name => Name}),
+            try
+                F(Serde)
+            after
+                logger:set_process_metadata(Meta)
+            end;
         {error, not_found} ->
             error({serde_not_found, Name})
     end.
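The save-update-restore dance around the logger process metadata is a reusable pattern; a standalone sketch (the `with_log_meta/2` helper name is invented here, not part of this PR):

    %% Run Fun with extra logger process metadata, restoring the previous
    %% metadata afterwards so later log statements are not polluted.
    with_log_meta(Extra, Fun) ->
        Saved =
            case logger:get_process_metadata() of
                undefined -> #{};
                Meta -> Meta
            end,
        logger:update_process_metadata(Extra),
        try
            Fun()
        after
            logger:set_process_metadata(Saved)
        end.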
@@ -199,10 +195,39 @@ make_serde(json, Name, Source) ->
 eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
     Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
     avro_binary_decoder:decode(Data, Name, Store, Opts);
-eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
-    MessageName = binary_to_existing_atom(MessageName0, utf8),
-    Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
-    emqx_utils_maps:binary_key_map(Decoded);
+eval_decode(#serde{type = protobuf}, [#{} = DecodedData, MessageType]) ->
+    %% Already decoded, so it's a user error.
+    throw(
+        {schema_decode_error, #{
+            error_type => decoding_failure,
+            data => DecodedData,
+            message_type => MessageType,
+            explain =>
+                <<
+                    "Attempted to schema decode an already decoded message."
+                    " Check your rules or transformation pipeline."
+                >>
+        }}
+    );
+eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageType0]) ->
+    MessageType = binary_to_existing_atom(MessageType0, utf8),
+    try
+        Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageType]),
+        emqx_utils_maps:binary_key_map(Decoded)
+    catch
+        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
+            #{schema_name := SchemaName} = logger:get_process_metadata(),
+            throw(
+                {schema_decode_error, #{
+                    error_type => decoding_failure,
+                    data => EncodedData,
+                    message_type => MessageType,
+                    schema_name => SchemaName,
+                    explain =>
+                        <<"The given data could not be decoded. Please check the input data and the schema.">>
+                }}
+            )
+    end;
 eval_decode(#serde{type = json, name = Name}, [Data]) ->
     true = is_binary(Data),
     Term = json_decode(Data),
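Both failure modes can be observed through the rule function from the first hunk; a hedged sketch, assuming a protobuf serde named `my_serde` with a `Person` message is registered (as in the test file below):

    %% Passing a map (already-decoded data) now throws a descriptive error
    %% instead of crashing with a function_clause deep inside gpb:
    try
        emqx_rule_funcs:schema_decode(<<"my_serde">>, #{<<"name">> => <<"a">>}, <<"Person">>)
    catch
        throw:{schema_decode_error, #{explain := Explain}} ->
            %% Explain begins with <<"Attempted to schema decode an already decoded message.">>
            {error, Explain}
    end.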

View File

@@ -555,8 +555,8 @@ t_decode_fail(_Config) ->
             data := <<"ss">>,
             error_type := decoding_failure,
             explain := _,
-            more_args := [<<"Person">>],
-            schema_id := <<"my_serde">>
+            message_type := 'Person',
+            schema_name := <<"my_serde">>
         }},
         emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>)
     ),

View File

@@ -1,6 +1,6 @@
 Definitions.
 %% Define regular expressions for tokens
-IDENTIFIER = [a-zA-Z][a-zA-Z0-9_.]*
+IDENTIFIER = [a-zA-Z][-a-zA-Z0-9_.]*
 SQ_STRING = \'[^\']*\'
 DQ_STRING = \"[^\"]*\"
 INTEGER = [+-]?[0-9]+
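The only change is the `-` added to the identifier character class, so hyphenated MQTT property names now tokenize as single identifiers; a hedged illustration:

    %% With IDENTIFIER = [a-zA-Z][-a-zA-Z0-9_.]*:
    %%   pub_props.Message-Expiry-Interval   -> one identifier token
    %%   Content-Type                        -> likewise
    %%   -foo                                -> still invalid: must start with a letter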

View File

@ -23,7 +23,7 @@
-define(SYNTAX_ERROR, {error, "syntax error before:" ++ _}).
redner_test_() ->
render_test_() ->
[
{"direct var reference", fun() -> ?assertEqual({ok, <<"1">>}, render("a", #{a => 1})) end},
{"concat strings", fun() ->
@ -32,6 +32,15 @@ redner_test_() ->
{"concat empty string", fun() ->
?assertEqual({ok, <<"">>}, render("concat([''])", #{}))
end},
{"identifier with hyphen", fun() ->
?assertEqual(
{ok, <<"10">>},
render(
"pub_props.Message-Expiry-Interval",
#{pub_props => #{'Message-Expiry-Interval' => 10}}
)
)
end},
{"tokens 1st", fun() ->
?assertEqual({ok, <<"a">>}, render("nth(1,tokens(var, ','))", #{var => <<"a,b">>}))
end},

View File

@@ -366,6 +366,12 @@ event_validation.desc:
 event_validation.label:
 """Validation"""
 
+event_transformation.desc:
+"""Transformation"""
+
+event_transformation.label:
+"""Transformation"""
+
 root_rule_info.desc:
 """Schema for rule info"""