fix(schema_registry): ensure `schema_encode` output in rule engine is a binary (r5.0)

Fixes https://emqx.atlassian.net/browse/EMQX-9981
This commit is contained in:
Thales Macedo Garitezi 2023-05-23 16:25:27 -03:00
parent cc2bb87b82
commit 6c414ab991
2 changed files with 18 additions and 5 deletions

View File

@ -1115,7 +1115,11 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
'$handle_undefined_function'(schema_decode, Args) ->
error({args_count_error, {schema_decode, Args}});
'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs);
%% encode outputs iolists, but when the rule actions process those
%% it might wrongly encode them as JSON lists, so we force them to
%% binaries here.
IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs),
iolist_to_binary(IOList);
'$handle_undefined_function'(schema_encode, Args) ->
error({args_count_error, {schema_encode, Args}});
'$handle_undefined_function'(sprintf, [Format | Args]) ->

View File

@ -82,8 +82,12 @@ make_trace_fn_action() ->
#{function => Fn, args => #{}}.
create_rule_http(RuleParams) ->
create_rule_http(RuleParams, _Overrides = #{}).
create_rule_http(RuleParams, Overrides) ->
RepublishTopic = <<"republish/schema_registry">>,
emqx:subscribe(RepublishTopic),
PayloadTemplate = maps:get(payload_template, Overrides, <<>>),
DefaultParams = #{
enable => true,
actions => [
@ -93,7 +97,7 @@ create_rule_http(RuleParams) ->
<<"args">> =>
#{
<<"topic">> => RepublishTopic,
<<"payload">> => <<>>,
<<"payload">> => PayloadTemplate,
<<"qos">> => 0,
<<"retain">> => false,
<<"user_properties">> => <<>>
@ -177,10 +181,12 @@ test_params_for(avro, encode1) ->
"from t\n"
>>,
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
PayloadTemplate = <<"${.encoded}">>,
ExtraArgs = [],
#{
sql => SQL,
payload => Payload,
payload_template => PayloadTemplate,
extra_args => ExtraArgs
};
test_params_for(avro, decode1) ->
@ -251,10 +257,12 @@ test_params_for(protobuf, encode1) ->
"from t\n"
>>,
Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
PayloadTemplate = <<"${.encoded}">>,
ExtraArgs = [<<"Person">>],
#{
sql => SQL,
payload => Payload,
payload_template => PayloadTemplate,
extra_args => ExtraArgs
};
test_params_for(protobuf, union1) ->
@ -487,17 +495,18 @@ t_encode(Config) ->
#{
sql := SQL,
payload := Payload,
payload_template := PayloadTemplate,
extra_args := ExtraArgs
} = test_params_for(SerdeType, encode1),
{ok, _} = create_rule_http(#{sql => SQL}),
{ok, _} = create_rule_http(#{sql => SQL}, #{payload_template => PayloadTemplate}),
PayloadBin = emqx_utils_json:encode(Payload),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Published = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"encoded">> := _}},
#{payload := P} when is_binary(P),
Published
),
#{payload := #{<<"encoded">> := Encoded}} = Published,
#{payload := Encoded} = Published,
{ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
ok.