diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 47017f718..73e2f78e7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -1115,7 +1115,11 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) -> '$handle_undefined_function'(schema_decode, Args) -> error({args_count_error, {schema_decode, Args}}); '$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) -> - emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs); + %% encode outputs iolists, but when the rule actions process those + %% it might wrongly encode them as JSON lists, so we force them to + %% binaries here. + IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs), + iolist_to_binary(IOList); '$handle_undefined_function'(schema_encode, Args) -> error({args_count_error, {schema_encode, Args}}); '$handle_undefined_function'(sprintf, [Format | Args]) -> diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 7ad01fa06..1f53766e3 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -82,8 +82,12 @@ make_trace_fn_action() -> #{function => Fn, args => #{}}. create_rule_http(RuleParams) -> + create_rule_http(RuleParams, _Overrides = #{}). + +create_rule_http(RuleParams, Overrides) -> RepublishTopic = <<"republish/schema_registry">>, emqx:subscribe(RepublishTopic), + PayloadTemplate = maps:get(payload_template, Overrides, <<>>), DefaultParams = #{ enable => true, actions => [ @@ -93,7 +97,7 @@ create_rule_http(RuleParams) -> <<"args">> => #{ <<"topic">> => RepublishTopic, - <<"payload">> => <<>>, + <<"payload">> => PayloadTemplate, <<"qos">> => 0, <<"retain">> => false, <<"user_properties">> => <<>> @@ -177,10 +181,12 @@ test_params_for(avro, encode1) -> "from t\n" >>, Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + PayloadTemplate = <<"${.encoded}">>, ExtraArgs = [], #{ sql => SQL, payload => Payload, + payload_template => PayloadTemplate, extra_args => ExtraArgs }; test_params_for(avro, decode1) -> @@ -251,10 +257,12 @@ test_params_for(protobuf, encode1) -> "from t\n" >>, Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>}, + PayloadTemplate = <<"${.encoded}">>, ExtraArgs = [<<"Person">>], #{ sql => SQL, payload => Payload, + payload_template => PayloadTemplate, extra_args => ExtraArgs }; test_params_for(protobuf, union1) -> @@ -487,17 +495,18 @@ t_encode(Config) -> #{ sql := SQL, payload := Payload, + payload_template := PayloadTemplate, extra_args := ExtraArgs } = test_params_for(SerdeType, encode1), - {ok, _} = create_rule_http(#{sql => SQL}), + {ok, _} = create_rule_http(#{sql => SQL}, #{payload_template => PayloadTemplate}), PayloadBin = emqx_utils_json:encode(Payload), emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), Published = receive_published(?LINE), ?assertMatch( - #{payload := #{<<"encoded">> := _}}, + #{payload := P} when is_binary(P), Published ), - #{payload := #{<<"encoded">> := Encoded}} = Published, + #{payload := Encoded} = Published, {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName), ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])), ok.