diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index 49ca57c42..814051733 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -623,6 +623,30 @@ t_publish_success(Config) -> ), ok. +t_publish_success_infinity_timeout(Config) -> + ServiceAccountJSON = ?config(service_account_json, Config), + Topic = <<"t/topic">>, + {ok, _} = create_bridge(Config, #{ + <<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>} + }), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = <<"payload">>, + Message = emqx_message:make(Topic, Payload), + emqx:publish(Message), + DecodedMessages = assert_http_request(ServiceAccountJSON), + ?assertMatch( + [ + #{ + <<"topic">> := Topic, + <<"payload">> := Payload, + <<"metadata">> := #{<<"rule_id">> := RuleId} + } + ], + DecodedMessages + ), + ok. + t_publish_success_local_topic(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 6738d6fef..702bc35ce 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -422,7 +422,7 @@ decode_cursor(Cursor) -> true = is_list(Name), {Node, #{transfer => {ClientId, FileId}, name => Name}} catch - error:{_, invalid_json} -> + error:{Loc, JsonError} when is_integer(Loc), is_atom(JsonError) -> error({badarg, cursor}); error:{badmatch, _} -> error({badarg, cursor}); diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl index abb774f82..40944c0e8 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl @@ -167,7 +167,7 @@ parse_filepath(PathBin) -> throw({invalid, PathBin}) end, PathComponents = filename:split(PathBin), - case lists:any(fun is_special_component/1, PathComponents) of + case PathComponents == [] orelse lists:any(fun is_special_component/1, PathComponents) of false -> filename:join(PathComponents); true -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 7d64f9716..e582db01f 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -47,6 +47,7 @@ groups() -> t_invalid_topic_format, t_meta_conflict, t_nasty_clientids_fileids, + t_nasty_filenames, t_no_meta, t_no_segment, t_simple_transfer @@ -205,10 +206,6 @@ t_invalid_filename(Config) -> encode_meta(meta(lists:duplicate(1000, $A), <<>>)), 1 ) - ), - ?assertRCName( - success, - emqtt:publish(C, mk_init_topic(<<"f5">>), encode_meta(meta("146%", <<>>)), 1) ). t_simple_transfer(Config) -> @@ -265,6 +262,22 @@ t_nasty_clientids_fileids(_Config) -> Transfers ). +t_nasty_filenames(_Config) -> + Filenames = [ + {<<"nasty1">>, "146%"}, + {<<"nasty2">>, "🌚"}, + {<<"nasty3">>, "δΈ­ζ–‡.txt"} + ], + ok = lists:foreach( + fun({ClientId, Filename}) -> + FileId = unicode:characters_to_binary(Filename), + ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId), + [Export] = list_files(ClientId), + ?assertEqual({ok, FileId}, read_export(Export)) + end, + Filenames + ). + t_meta_conflict(Config) -> C = ?config(client, Config), diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index f69e13a6d..18a8e9841 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -140,10 +140,7 @@ t_download_transfer(Config) -> request( get, uri(["file_transfer", "file"]) ++ - query(#{ - fileref => FileId, - node => <<"nonode@nohost">> - }) + query(#{fileref => FileId, node => <<"nonode@nohost">>}) ) ), @@ -152,10 +149,25 @@ t_download_transfer(Config) -> request( get, uri(["file_transfer", "file"]) ++ - query(#{ - fileref => <<"unknown_file">>, - node => node() - }) + query(#{fileref => <<"unknown_file">>, node => node()}) + ) + ), + + ?assertMatch( + {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}}, + request_json( + get, + uri(["file_transfer", "file"]) ++ + query(#{fileref => <<>>, node => node()}) + ) + ), + + ?assertMatch( + {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}}, + request_json( + get, + uri(["file_transfer", "file"]) ++ + query(#{fileref => <<"/etc/passwd">>, node => node()}) ) ), @@ -204,6 +216,16 @@ t_list_files_paging(Config) -> request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0})) ), + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>})) + ), + + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>})) + ), + ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, request_json( diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 47017f718..73e2f78e7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -1115,7 +1115,11 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) -> '$handle_undefined_function'(schema_decode, Args) -> error({args_count_error, {schema_decode, Args}}); '$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) -> - emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs); + %% encode outputs iolists, but when the rule actions process those + %% it might wrongly encode them as JSON lists, so we force them to + %% binaries here. + IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs), + iolist_to_binary(IOList); '$handle_undefined_function'(schema_encode, Args) -> error({args_count_error, {schema_encode, Args}}); '$handle_undefined_function'(sprintf, [Format | Args]) -> diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index d17c159c3..99c4fa155 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -82,8 +82,12 @@ make_trace_fn_action() -> #{function => Fn, args => #{}}. create_rule_http(RuleParams) -> + create_rule_http(RuleParams, _Overrides = #{}). + +create_rule_http(RuleParams, Overrides) -> RepublishTopic = <<"republish/schema_registry">>, emqx:subscribe(RepublishTopic), + PayloadTemplate = maps:get(payload_template, Overrides, <<>>), DefaultParams = #{ enable => true, actions => [ @@ -93,7 +97,7 @@ create_rule_http(RuleParams) -> <<"args">> => #{ <<"topic">> => RepublishTopic, - <<"payload">> => <<>>, + <<"payload">> => PayloadTemplate, <<"qos">> => 0, <<"retain">> => false, <<"user_properties">> => <<>> @@ -177,10 +181,12 @@ test_params_for(avro, encode1) -> "from t\n" >>, Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + PayloadTemplate = <<"${.encoded}">>, ExtraArgs = [], #{ sql => SQL, payload => Payload, + payload_template => PayloadTemplate, extra_args => ExtraArgs }; test_params_for(avro, decode1) -> @@ -251,10 +257,12 @@ test_params_for(protobuf, encode1) -> "from t\n" >>, Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>}, + PayloadTemplate = <<"${.encoded}">>, ExtraArgs = [<<"Person">>], #{ sql => SQL, payload => Payload, + payload_template => PayloadTemplate, extra_args => ExtraArgs }; test_params_for(protobuf, union1) -> @@ -487,17 +495,18 @@ t_encode(Config) -> #{ sql := SQL, payload := Payload, + payload_template := PayloadTemplate, extra_args := ExtraArgs } = test_params_for(SerdeType, encode1), - {ok, _} = create_rule_http(#{sql => SQL}), + {ok, _} = create_rule_http(#{sql => SQL}, #{payload_template => PayloadTemplate}), PayloadBin = emqx_utils_json:encode(Payload), emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), Published = receive_published(?LINE), ?assertMatch( - #{payload := #{<<"encoded">> := _}}, + #{payload := P} when is_binary(P), Published ), - #{payload := #{<<"encoded">> := Encoded}} = Published, + #{payload := Encoded} = Published, {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName), ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])), ok. diff --git a/mix.exs b/mix.exs index d66e917ca..e1331ec76 100644 --- a/mix.exs +++ b/mix.exs @@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do {:redbug, "2.0.8"}, {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.4.8", override: true}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.4.10", override: true}, {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, diff --git a/rebar.config b/rebar.config index b18364aee..faa7d88e1 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {gpb, "4.19.7"} , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.8"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.10"}}} , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}