diff --git a/apps/emqx_auth/src/emqx_auth.app.src b/apps/emqx_auth/src/emqx_auth.app.src index db4b69ded..6db2d6213 100644 --- a/apps/emqx_auth/src/emqx_auth.app.src +++ b/apps/emqx_auth/src/emqx_auth.app.src @@ -7,7 +7,8 @@ {applications, [ kernel, stdlib, - emqx + emqx, + emqx_utils ]}, {mod, {emqx_auth_app, []}}, {env, []}, diff --git a/apps/emqx_auth/src/emqx_auth_utils.erl b/apps/emqx_auth/src/emqx_auth_utils.erl new file mode 100644 index 000000000..0abb336af --- /dev/null +++ b/apps/emqx_auth/src/emqx_auth_utils.erl @@ -0,0 +1,78 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_auth_utils). + +%% TODO +%% Move more identical authn and authz helpers here + +-export([parse_url/1]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec parse_url(binary()) -> + {_Base :: emqx_utils_uri:request_base(), _Path :: binary(), _Query :: binary()}. 
+parse_url(Url) -> + Parsed = emqx_utils_uri:parse(Url), + case Parsed of + #{scheme := undefined} -> + throw({invalid_url, {no_scheme, Url}}); + #{authority := undefined} -> + throw({invalid_url, {no_host, Url}}); + #{authority := #{userinfo := Userinfo}} when Userinfo =/= undefined -> + throw({invalid_url, {userinfo_not_supported, Url}}); + #{fragment := Fragment} when Fragment =/= undefined -> + throw({invalid_url, {fragments_not_supported, Url}}); + _ -> + case emqx_utils_uri:request_base(Parsed) of + {ok, Base} -> + {Base, emqx_utils_uri:path(Parsed), + emqx_maybe:define(emqx_utils_uri:query(Parsed), <<>>)}; + {error, Reason} -> + throw({invalid_url, {invalid_base, Reason, Url}}) + end + end. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +templates_test_() -> + [ + ?_assertEqual( + { + #{port => 80, scheme => http, host => "example.com"}, + <<"">>, + <<"client=${clientid}">> + }, + parse_url(<<"http://example.com?client=${clientid}">>) + ), + ?_assertEqual( + { + #{port => 80, scheme => http, host => "example.com"}, + <<"/path">>, + <<"client=${clientid}">> + }, + parse_url(<<"http://example.com/path?client=${clientid}">>) + ), + ?_assertEqual( + {#{port => 80, scheme => http, host => "example.com"}, <<"/path">>, <<>>}, + parse_url(<<"http://example.com/path">>) + ) + ]. + +-endif. diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl index 8d4d245da..a08ac260c 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl @@ -39,7 +39,6 @@ make_resource_id/1, without_password/1, to_bool/1, - parse_url/1, convert_headers/1, convert_headers_no_content_type/1, default_headers/0, @@ -287,25 +286,6 @@ to_bool(MaybeBinInt) when is_binary(MaybeBinInt) -> to_bool(_) -> false. 
-parse_url(Url) -> - case string:split(Url, "//", leading) of - [Scheme, UrlRem] -> - case string:split(UrlRem, "/", leading) of - [HostPort, Remaining] -> - BaseUrl = iolist_to_binary([Scheme, "//", HostPort]), - case string:split(Remaining, "?", leading) of - [Path, QueryString] -> - {BaseUrl, <<"/", Path/binary>>, QueryString}; - [Path] -> - {BaseUrl, <<"/", Path/binary>>, <<>>} - end; - [HostPort] -> - {iolist_to_binary([Scheme, "//", HostPort]), <<>>, <<>>} - end; - [Url] -> - throw({invalid_url, Url}) - end. - convert_headers(Headers) -> transform_header_name(Headers). diff --git a/apps/emqx_auth_http/src/emqx_authn_http.erl b/apps/emqx_auth_http/src/emqx_authn_http.erl index 96eb1215a..18995cb9d 100644 --- a/apps/emqx_auth_http/src/emqx_authn_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_http.erl @@ -134,21 +134,25 @@ parse_config( request_timeout := RequestTimeout } = Config ) -> - {BaseUrl0, Path, Query} = emqx_authn_utils:parse_url(RawUrl), - {ok, BaseUrl} = emqx_http_lib:uri_parse(BaseUrl0), + {RequestBase, Path, Query} = emqx_auth_utils:parse_url(RawUrl), State = #{ method => Method, path => Path, headers => ensure_header_name_type(Headers), base_path_template => emqx_authn_utils:parse_str(Path), base_query_template => emqx_authn_utils:parse_deep( - cow_qs:parse_qs(to_bin(Query)) + cow_qs:parse_qs(Query) ), body_template => emqx_authn_utils:parse_deep(maps:get(body, Config, #{})), request_timeout => RequestTimeout, url => RawUrl }, - {ok, Config#{base_url => BaseUrl, pool_type => random}, State}. + {ok, + Config#{ + request_base => RequestBase, + pool_type => random + }, + State}. 
generate_request(Credential, #{ method := Method, @@ -246,14 +250,14 @@ request_for_log(Credential, #{url := Url, method := Method} = State) -> {PathQuery, Headers} -> #{ method => Method, - base_url => Url, + url => Url, path_query => PathQuery, headers => Headers }; {PathQuery, Headers, Body} -> #{ method => Method, - base_url => Url, + url => Url, path_query => PathQuery, headers => Headers, body => Body @@ -274,11 +278,6 @@ to_list(B) when is_binary(B) -> to_list(L) when is_list(L) -> L. -to_bin(B) when is_binary(B) -> - B; -to_bin(L) when is_list(L) -> - list_to_binary(L). - ensure_header_name_type(Headers) -> Fun = fun (Key, _Val, Acc) when is_binary(Key) -> diff --git a/apps/emqx_auth_http/src/emqx_authz_http.erl b/apps/emqx_auth_http/src/emqx_authz_http.erl index e4d4326ff..6b0152b7d 100644 --- a/apps/emqx_auth_http/src/emqx_authz_http.erl +++ b/apps/emqx_auth_http/src/emqx_authz_http.erl @@ -29,8 +29,7 @@ update/1, destroy/1, authorize/4, - merge_defaults/1, - parse_url/1 + merge_defaults/1 ]). -ifdef(TEST). @@ -160,15 +159,14 @@ parse_config( request_timeout := ReqTimeout } = Conf ) -> - {BaseUrl0, Path, Query} = parse_url(RawUrl), - {ok, BaseUrl} = emqx_http_lib:uri_parse(BaseUrl0), + {RequestBase, Path, Query} = emqx_auth_utils:parse_url(RawUrl), Conf#{ method => Method, - base_url => BaseUrl, + request_base => RequestBase, headers => Headers, base_path_template => emqx_authz_utils:parse_str(Path, allowed_vars()), base_query_template => emqx_authz_utils:parse_deep( - cow_qs:parse_qs(to_bin(Query)), + cow_qs:parse_qs(Query), allowed_vars() ), body_template => emqx_authz_utils:parse_deep( @@ -180,25 +178,6 @@ parse_config( pool_type => random }. 
-parse_url(Url) -> - case string:split(Url, "//", leading) of - [Scheme, UrlRem] -> - case string:split(UrlRem, "/", leading) of - [HostPort, Remaining] -> - BaseUrl = iolist_to_binary([Scheme, "//", HostPort]), - case string:split(Remaining, "?", leading) of - [Path, QueryString] -> - {BaseUrl, <<"/", Path/binary>>, QueryString}; - [Path] -> - {BaseUrl, <<"/", Path/binary>>, <<>>} - end; - [HostPort] -> - {iolist_to_binary([Scheme, "//", HostPort]), <<>>, <<>>} - end; - [Url] -> - throw({invalid_url, Url}) - end. - generate_request( Action, Topic, @@ -272,10 +251,6 @@ to_list(B) when is_binary(B) -> to_list(L) when is_list(L) -> L. -to_bin(B) when is_binary(B) -> B; -to_bin(L) when is_list(L) -> list_to_binary(L); -to_bin(X) -> X. - allowed_vars() -> allowed_vars(emqx_authz:feature_available(rich_actions)). diff --git a/apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl index 864aa6e0e..dc1443b19 100644 --- a/apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl @@ -112,7 +112,11 @@ t_create_invalid(_Config) -> InvalidConfigs = [ AuthConfig#{<<"headers">> => []}, - AuthConfig#{<<"method">> => <<"delete">>} + AuthConfig#{<<"method">> => <<"delete">>}, + AuthConfig#{<<"url">> => <<"localhost">>}, + AuthConfig#{<<"url">> => <<"http://foo.com/xxx#fragment">>}, + AuthConfig#{<<"url">> => <<"http://${foo}.com/xxx">>}, + AuthConfig#{<<"url">> => <<"//foo.com/xxx">>} ], lists:foreach( diff --git a/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl index 3fb2c0572..70822e802 100644 --- a/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl @@ -639,6 +639,8 @@ t_create_replace(_Config) -> listener => {tcp, default} }, + ValidConfig = raw_http_authz_config(), + %% Create with valid URL ok = setup_handler_and_config( fun(Req0, State) -> @@ -656,13 +658,10 @@ 
t_create_replace(_Config) -> ), %% Changing to valid config - OkConfig = maps:merge( - raw_http_authz_config(), - #{ - <<"url">> => - <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">> - } - ), + OkConfig = ValidConfig#{ + <<"url">> => + <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">> + }, ?assertMatch( {ok, _}, @@ -672,6 +671,37 @@ t_create_replace(_Config) -> ?assertEqual( allow, emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH, <<"t">>) + ), + + ?assertMatch( + {error, _}, + emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{ + <<"url">> => <<"localhost">> + }) + ), + + ?assertMatch( + {error, _}, + emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{ + <<"url">> => <<"//foo.bar/x/y?q=z">> + }) + ), + + ?assertMatch( + {error, _}, + emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{ + <<"url">> => + <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}#fragment">> + }) + ). + +t_uri_normalization(_Config) -> + ok = emqx_authz_test_lib:setup_config( + raw_http_authz_config(), + #{ + <<"url">> => + <<"http://127.0.0.1:33333?topic=${topic}&action=${action}">> + } ). 
%%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index f334a8607..d2408ca73 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -336,21 +336,13 @@ parse_confs( } = Conf ) -> Url1 = bin(Url), - {BaseUrl, Path} = parse_url(Url1), - BaseUrl1 = - case emqx_http_lib:uri_parse(BaseUrl) of - {ok, BUrl} -> - BUrl; - {error, Reason} -> - Reason1 = emqx_utils:readable_error_msg(Reason), - invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) - end, + {RequestBase, Path} = parse_url(Url1), RequestTTL = emqx_utils_maps:deep_get( [resource_opts, request_ttl], Conf ), Conf#{ - base_url => BaseUrl1, + request_base => RequestBase, request => #{ path => Path, @@ -422,16 +414,24 @@ connector_config(BridgeType, Config) -> end. parse_url(Url) -> - case string:split(Url, "//", leading) of - [Scheme, UrlRem] -> - case string:split(UrlRem, "/", leading) of - [HostPort, Path] -> - {iolist_to_binary([Scheme, "//", HostPort]), Path}; - [HostPort] -> - {iolist_to_binary([Scheme, "//", HostPort]), <<>>} - end; - [Url] -> - invalid_data(<<"Missing scheme in URL: ", Url/binary>>) + Parsed = emqx_utils_uri:parse(Url), + case Parsed of + #{scheme := undefined} -> + invalid_data(<<"Missing scheme in URL: ", Url/binary>>); + #{authority := undefined} -> + invalid_data(<<"Missing host in URL: ", Url/binary>>); + #{authority := #{userinfo := Userinfo}} when Userinfo =/= undefined -> + invalid_data(<<"Userinfo is not supported in URL: ", Url/binary>>); + #{fragment := Fragment} when Fragment =/= undefined -> + invalid_data(<<"Fragments are not supported in URL: ", Url/binary>>); + _ -> + case emqx_utils_uri:request_base(Parsed) of + {ok, Base} -> + {Base, emqx_maybe:define(emqx_utils_uri:path(Parsed), <<>>)}; + {error, Reason0} -> + Reason1 = 
emqx_utils:readable_error_msg(Reason0), + invalid_data(<<"Invalid URL: ", Url/binary, ", details: ", Reason1/binary>>) + end end. bin(Bin) when is_binary(Bin) -> Bin; diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index ca7d46ec5..0ffd143dc 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 4e974a8a9..82f5fb18d 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -86,7 +86,7 @@ on_start( config => redact(Config) }), - {Schema, Server, DefaultPort} = get_host_info(to_str(Url)), + {Scheme, Server, DefaultPort} = get_host_info(to_str(Url)), #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{ default_port => DefaultPort }), @@ -97,7 +97,7 @@ on_start( port => Port, aws_access_key_id => to_str(AccessKeyID), aws_secret_access_key => SecretAccessKey, - schema => Schema + scheme => Scheme }}, {pool_size, PoolSize} ], diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index cc37c8dd0..0613ca3bb 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -63,11 +63,11 @@ init(#{ aws_secret_access_key := Secret, host := Host, port := Port, - schema := Schema + scheme := Scheme }) -> %% TODO: teach `erlcloud` to to accept 0-arity closures as passwords. 
SecretAccessKey = to_str(emqx_secret:unwrap(Secret)), - erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), + erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Scheme), {ok, #{}}. handle_call(is_connected, _From, State) -> diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src index 9ed9d572d..7e5f6203b 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_es, [ {description, "EMQX Enterprise Elastic Search Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {modules, [ emqx_bridge_es, emqx_bridge_es_connector diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index d94ce8e15..20de92e6e 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -44,11 +44,10 @@ -type config() :: #{ - base_url := #{ + request_base := #{ scheme := http | https, host := iolist(), - port := inet:port_number(), - path := _ + port := inet:port_number() }, connect_timeout := pos_integer(), pool_type := random | hash, @@ -59,7 +58,6 @@ -type state() :: #{ - base_path := _, connect_timeout := pos_integer(), pool_type := random | hash, channels := map(), diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index bb89461ec..57fceae74 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_http, [ {description, "EMQX HTTP Bridge and Connector Application"}, - {vsn, "0.3.0"}, + {vsn, "0.3.1"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, ehttpc]}, {env, [ diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl 
b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl index f84112700..6bb65babc 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl @@ -98,15 +98,6 @@ validate_webhook_url(undefined) -> required_field => <<"url">> }); validate_webhook_url(Url) -> - {BaseUrl, _Path} = emqx_connector_resource:parse_url(Url), - case emqx_http_lib:uri_parse(BaseUrl) of - {ok, _} -> - ok; - {error, Reason} -> - throw(#{ - kind => validation_error, - reason => invalid_url, - url => Url, - error => emqx_utils:readable_error_msg(Reason) - }) - end. + %% parse_url throws if the URL is invalid + {_RequestBase, _Path} = emqx_connector_resource:parse_url(Url), + ok. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index f639596b6..785424c67 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -53,7 +53,7 @@ %% for other http-like connectors. -export([redact_request/1]). --export([validate_method/1, join_paths/2, formalize_request/3, transform_result/1]). +-export([validate_method/1, join_paths/2, formalize_request/2, transform_result/1]). -define(DEFAULT_PIPELINE_SIZE, 100). -define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000). @@ -189,11 +189,10 @@ callback_mode() -> async_if_possible. 
on_start( InstId, #{ - base_url := #{ + request_base := #{ scheme := Scheme, host := Host, - port := Port, - path := BasePath + port := Port }, connect_timeout := ConnectTimeout, pool_type := PoolType, @@ -227,13 +226,13 @@ on_start( {transport_opts, NTransportOpts}, {enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)} ], + State = #{ pool_name => InstId, pool_type => PoolType, host => Host, port => Port, connect_timeout => ConnectTimeout, - base_path => BasePath, scheme => Scheme, request => preprocess_request(maps:get(request, Config, undefined)) }, @@ -362,7 +361,7 @@ on_query(InstId, {Method, Request, Timeout}, State) -> on_query( InstId, {ActionId, KeyOrNum, Method, Request, Timeout, Retry}, - #{base_path := BasePath, host := Host, scheme := Scheme, port := Port} = State + #{host := Host, scheme := Scheme, port := Port} = State ) -> ?TRACE( "QUERY", @@ -375,7 +374,7 @@ on_query( state => redact(State) } ), - NRequest = formalize_request(Method, BasePath, Request), + NRequest = formalize_request(Method, Request), trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout), Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( @@ -472,7 +471,7 @@ on_query_async( InstId, {ActionId, KeyOrNum, Method, Request, Timeout}, ReplyFunAndArgs, - #{base_path := BasePath, host := Host, port := Port, scheme := Scheme} = State + #{host := Host, port := Port, scheme := Scheme} = State ) -> Worker = resolve_pool_worker(State, KeyOrNum), ?TRACE( @@ -485,7 +484,7 @@ on_query_async( state => redact(State) } ), - NRequest = formalize_request(Method, BasePath, Request), + NRequest = formalize_request(Method, Request), trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout), MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ @@ -824,6 +823,9 @@ make_method(M) when M == <<"PUT">>; M == <<"put">> -> put; make_method(M) when M == <<"GET">>; M == <<"get">> -> get; 
make_method(M) when M == <<"DELETE">>; M == <<"delete">> -> delete. +formalize_request(Method, Request) -> + formalize_request(Method, "/", Request). + formalize_request(Method, BasePath, {Path, Headers, _Body}) when Method =:= get; Method =:= delete -> diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index 7f9418bd3..e1d0e9724 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -85,6 +85,16 @@ init_per_testcase(t_path_not_found, Config) -> ), ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; +init_per_testcase(t_empty_path, Config) -> + HTTPPath = <<"/">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ok = emqx_bridge_http_connector_test_server:set_handler( + emqx_bridge_http_test_lib:success_http_handler() + ), + [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(t_too_many_requests, Config) -> HTTPPath = <<"/path">>, ServerSSLOpts = false, @@ -122,6 +132,7 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(TestCase, _Config) when TestCase =:= t_path_not_found; + TestCase =:= t_empty_path; TestCase =:= t_too_many_requests; TestCase =:= t_service_unavailable; TestCase =:= t_rule_action_expired; @@ -588,6 +599,45 @@ t_path_not_found(Config) -> ), ok. 
+t_empty_path(Config) -> + ?check_trace( + begin + #{port := Port, path := _Path} = ?config(http_server, Config), + MQTTTopic = <<"t/webhook">>, + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + local_topic => MQTTTopic, + port => Port, + path => <<"">> + }), + {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), + Msg = emqx_message:make(MQTTTopic, <<"{}">>), + emqx:publish(Msg), + wait_http_request(), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + matched := 1, + failed := 0, + success := 1 + } + }, + get_metrics(?BRIDGE_NAME) + ) + ), + ok + end, + fun(Trace) -> + ?assertEqual([], ?of_kind(http_will_retry_async, Trace)), + ok + end + ), + ok. + t_too_many_requests(Config) -> check_send_is_retried(Config). diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl index 94cc4c01a..becf221a4 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl @@ -32,11 +32,10 @@ wrap_auth_headers_test_() -> end, fun meck:unload/1, fun(_) -> Config = #{ - base_url => #{ + request_base => #{ scheme => http, host => "localhost", - port => 18083, - path => "/status" + port => 18083 }, connect_timeout => 1000, pool_type => random, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 08d8c9ccc..b3c5767db 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_connector diff --git 
a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 65fbda936..78866ef79 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -44,11 +44,10 @@ -type config() :: #{ - base_url := #{ + request_base := #{ scheme := http | https, host := iolist(), - port := inet:port_number(), - path := _ + port := inet:port_number() }, connect_timeout := pos_integer(), pool_type := random | hash, @@ -60,7 +59,6 @@ -type state() :: #{ - base_path := _, connect_timeout := pos_integer(), pool_type := random | hash, channels := map(), @@ -245,10 +243,10 @@ on_stop(InstanceId, State) -> -spec on_get_status(manager_id(), state()) -> connected | connecting | {disconnected, state(), term()}. -on_get_status(InstanceId, #{base_path := BasePath} = State) -> +on_get_status(InstanceId, State) -> Func = fun(Worker, Timeout) -> Request = {?IOTDB_PING_PATH, [], undefined}, - NRequest = emqx_bridge_http_connector:formalize_request(get, BasePath, Request), + NRequest = emqx_bridge_http_connector:formalize_request(get, Request), Result0 = ehttpc:request(Worker, get, NRequest, Timeout), case emqx_bridge_http_connector:transform_result(Result0) of {ok, 200, _, Body} -> diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index f0848b599..c71afca60 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -281,17 +281,9 @@ parse_confs( } = Conf ) -> Url1 = bin(Url), - {BaseUrl, Path} = parse_url(Url1), - BaseUrl1 = - case emqx_http_lib:uri_parse(BaseUrl) of - {ok, BUrl} -> - BUrl; - {error, Reason} -> - Reason1 = emqx_utils:readable_error_msg(Reason), - invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) - end, + {RequestBase, Path} = parse_url(Url1), Conf#{ - base_url => BaseUrl1, + 
request_base => RequestBase, request => #{ path => Path, @@ -324,16 +316,24 @@ connector_config(ConnectorType, Name, Config) -> end. parse_url(Url) -> - case string:split(Url, "//", leading) of - [Scheme, UrlRem] -> - case string:split(UrlRem, "/", leading) of - [HostPort, Path] -> - {iolist_to_binary([Scheme, "//", HostPort]), Path}; - [HostPort] -> - {iolist_to_binary([Scheme, "//", HostPort]), <<>>} - end; - [Url] -> - invalid_data(<<"Missing scheme in URL: ", Url/binary>>) + Parsed = emqx_utils_uri:parse(Url), + case Parsed of + #{scheme := undefined} -> + invalid_data(<<"Missing scheme in URL: ", Url/binary>>); + #{authority := undefined} -> + invalid_data(<<"Missing host in URL: ", Url/binary>>); + #{authority := #{userinfo := Userinfo}} when Userinfo =/= undefined -> + invalid_data(<<"Userinfo is not supported in URL: ", Url/binary>>); + #{fragment := Fragment} when Fragment =/= undefined -> + invalid_data(<<"Fragments are not supported in URL: ", Url/binary>>); + _ -> + case emqx_utils_uri:request_base(Parsed) of + {ok, Base} -> + {Base, emqx_maybe:define(emqx_utils_uri:path(Parsed), <<>>)}; + {error, Reason0} -> + Reason1 = emqx_utils:readable_error_msg(Reason0), + invalid_data(<<"Invalid URL: ", Url/binary, ", details: ", Reason1/binary>>) + end end. -spec invalid_data(binary()) -> no_return(). diff --git a/apps/emqx_utils/rebar.config b/apps/emqx_utils/rebar.config index 7aa4d34d0..3852ac87e 100644 --- a/apps/emqx_utils/rebar.config +++ b/apps/emqx_utils/rebar.config @@ -5,7 +5,8 @@ ]}. {deps, [ - {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}} + {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, + {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}} ]}. {project_plugins, [erlfmt]}. 
diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index bac23cefb..959a3a37a 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -15,7 +15,8 @@ {applications, [ kernel, stdlib, - jiffy + jiffy, + emqx_http_lib ]}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_utils/src/emqx_utils_uri.erl b/apps/emqx_utils/src/emqx_utils_uri.erl new file mode 100644 index 000000000..d36cd9050 --- /dev/null +++ b/apps/emqx_utils/src/emqx_utils_uri.erl @@ -0,0 +1,392 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% This module provides a loose parser for URIs. +%% The standard library's `uri_string' module is strict and does not allow +%% to parse invalid URIs, like templates: `http://example.com/${username}'. + +-module(emqx_utils_uri). + +-export([parse/1, format/1]). + +-export([ + scheme/1, + userinfo/1, + host/1, + port/1, + path/1, + query/1, + fragment/1, + base_url/1, + request_base/1 +]). + +-type scheme() :: binary(). +-type userinfo() :: binary(). +-type host() :: binary(). +-type port_number() :: inet:port_number(). +-type path() :: binary(). +-type query() :: binary(). +-type fragment() :: binary(). 
+-type request_base() :: #{ + scheme := http | https, + host := iolist(), + port := inet:port_number() +}. + +-type authority() :: #{ + userinfo := emqx_maybe:t(userinfo()), + host := host(), + %% Types: + %% ipv6: `\[[a-z\d:\.]*\]` — bracketed "ipv6-like" address + %% regular: `example.com` — arbitrary host not containing `:` which is forbidden in hostnames other than ipv6 + %% loose: non ipv6-like host containing `:`, probably invalid for a strictly valid URI + host_type := ipv6 | regular | loose, + port := emqx_maybe:t(port_number()) +}. + +-type uri() :: #{ + scheme := emqx_maybe:t(scheme()), + authority := emqx_maybe:t(authority()), + path := path(), + query := emqx_maybe:t(query()), + fragment := emqx_maybe:t(fragment()) +}. + +-export_type([ + scheme/0, + userinfo/0, + host/0, + port_number/0, + path/0, + query/0, + fragment/0, + authority/0, + uri/0, + request_base/0 +]). + +%%-------------------------------------------------------------------- +%% API +%%------------------------------------------------------------------- + +-spec parse(binary()) -> uri(). +parse(URIString) -> + {match, [SchemeMatch, AuthorityMatch, PathMatch, QueryMatch, FragmentMatch]} = re:run( + URIString, uri_regexp(), [{capture, [scheme, authority, path, query, fragment], binary}] + ), + Scheme = parse_scheme(SchemeMatch), + Authority = parse_authority(AuthorityMatch), + Path = PathMatch, + Query = parse_query(QueryMatch), + Fragment = parse_fragment(FragmentMatch), + + #{ + scheme => Scheme, + authority => Authority, + path => Path, + query => Query, + fragment => Fragment + }. + +-spec base_url(uri()) -> iodata(). +base_url(#{scheme := Scheme, authority := Authority}) -> + [format_scheme(Scheme), format_authority(Authority)]. + +-spec format(uri()) -> iodata(). +format(#{path := Path, query := Query, fragment := Fragment} = URI) -> + [ + base_url(URI), + Path, + format_query(Query), + format_fragment(Fragment) + ]. + +-spec scheme(uri()) -> emqx_maybe:t(scheme()). 
+scheme(#{scheme := Scheme}) -> Scheme. + +-spec userinfo(uri()) -> emqx_maybe:t(userinfo()). +userinfo(#{authority := undefined}) -> undefined; +userinfo(#{authority := #{userinfo := UserInfo}}) -> UserInfo. + +-spec host(uri()) -> emqx_maybe:t(host()). +host(#{authority := undefined}) -> undefined; +host(#{authority := #{host := Host}}) -> Host. + +-spec port(uri()) -> emqx_maybe:t(port_number()). +port(#{authority := undefined}) -> undefined; +port(#{authority := #{port := Port}}) -> Port. + +-spec path(uri()) -> path(). +path(#{path := Path}) -> Path. + +-spec query(uri()) -> emqx_maybe:t(query()). +query(#{query := Query}) -> Query. + +-spec fragment(uri()) -> emqx_maybe:t(fragment()). +fragment(#{fragment := Fragment}) -> Fragment. + +-spec request_base(uri()) -> {ok, request_base()} | {error, term()}. +request_base(URI) when is_map(URI) -> + case emqx_http_lib:uri_parse(iolist_to_binary(base_url(URI))) of + {error, Reason} -> {error, Reason}; + {ok, URIMap} -> {ok, maps:with([scheme, host, port], URIMap)} + end; +request_base(URIString) when is_list(URIString) orelse is_binary(URIString) -> + request_base(parse(URIString)). + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +parse_scheme(<<>>) -> undefined; +parse_scheme(Scheme) -> Scheme. + +parse_query(<<>>) -> undefined; +parse_query(<<$?, Query/binary>>) -> Query. + +parse_fragment(<<>>) -> undefined; +parse_fragment(<<$#, Fragment/binary>>) -> Fragment. + +-define(AUTHORITY_REGEX, + ("^(?.*@)?" + "(?:(?:\\[(?[a-z\\d\\.:]*)\\])|(?[^:]*?)|(?.*?))" + "(?:\\d+)?$") +). + +authority_regexp() -> + {ok, RE} = re:compile(?AUTHORITY_REGEX, [caseless]), + RE. 
+
+%% Parse the authority capture of the URI regexp (which includes the
+%% leading `//') into an authority() map; an empty capture means the URI
+%% has no authority at all.
+parse_authority(<<>>) ->
+    undefined;
+parse_authority(<<$/, $/, Authority/binary>>) ->
+    %% Authority regexp always matches
+    {match, [UserInfoMatch, HostIPv6, HostRegular, HostLoose, PortMatch]} = re:run(
+        Authority, authority_regexp(), [
+            {capture, [userinfo, host_ipv6, host_regular, host_loose, port], binary}
+        ]
+    ),
+    UserInfo = parse_userinfo(UserInfoMatch),
+    {HostType, Host} = parse_host(HostIPv6, HostRegular, HostLoose),
+    Port = parse_port(PortMatch),
+    #{
+        userinfo => UserInfo,
+        host => Host,
+        host_type => HostType,
+        port => Port
+    }.
+
+%% The userinfo capture ends with `@'; drop that last byte.
+parse_userinfo(<<>>) -> undefined;
+parse_userinfo(UserInfoMatch) -> binary:part(UserInfoMatch, 0, byte_size(UserInfoMatch) - 1).
+
+%% Pick the host alternative that matched. Arguments are the ipv6, regular
+%% and loose captures, in that order. When all three are empty the first
+%% clause applies and the (empty) host is reported as `loose'.
+parse_host(<<>>, <<>>, Host) -> {loose, Host};
+parse_host(<<>>, Host, <<>>) -> {regular, Host};
+parse_host(Host, <<>>, <<>>) -> {ipv6, Host}.
+
+%% The port capture starts with `:'; the digits after it become an integer.
+parse_port(<<>>) -> undefined;
+parse_port(<<$:, Port/binary>>) -> binary_to_integer(Port).
+
+%% https://datatracker.ietf.org/doc/html/rfc3986#appendix-B
+%%
+%% > The following line is the regular expression for breaking-down a
+%% > well-formed URI reference into its components.
+%%
+%% > ^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?
+%%
+%% We skip capturing some unused parts of the regex.
+%%
+%% The group names must stay in sync with the capture list used in parse/1.
+%% The `authority', `query' and `fragment' groups include their leading
+%% delimiter (`//', `?', `#'), which the corresponding parse_* helper
+%% strips; the `scheme' group excludes the trailing `:'.
+
+-define(URI_REGEX,
+    ("^(?:(?<scheme>[^:/?#]+):)?(?<authority>//[^/?#]*)?"
+    "(?<path>[^?#]*)(?<query>\\?[^#]*)?(?<fragment>#.*)?")
+).
+
+%% Compile the URI regexp (case-insensitive); compilation is asserted.
+uri_regexp() ->
+    {ok, RE} = re:compile(?URI_REGEX, [caseless]),
+    RE.
+
+%% The format_* helpers are the inverse of the parse_* helpers: an
+%% `undefined' component is rendered as nothing, otherwise the delimiter
+%% removed during parsing is put back.
+
+format_scheme(undefined) -> <<>>;
+format_scheme(Scheme) -> [Scheme, $:].
+
+format_authority(undefined) ->
+    <<>>;
+format_authority(#{userinfo := UserInfo, host := Host, host_type := HostType, port := Port}) ->
+    [$/, $/, format_userinfo(UserInfo), format_host(HostType, Host), format_port(Port)].
+
+format_userinfo(undefined) -> <<>>;
+format_userinfo(UserInfo) -> [UserInfo, $@].
+
+%% Only ipv6 hosts get their brackets back.
+format_host(ipv6, Host) -> [$[, Host, $]];
+format_host(_, Host) -> Host.
+
+format_port(undefined) -> <<>>;
+format_port(Port) -> [$:, integer_to_binary(Port)].
+
+%% Render the query part; an absent query produces no output at all.
+format_query(undefined) -> <<>>;
+format_query(QueryString) -> [$?, QueryString].
+
+%% Render the fragment part; an absent fragment produces no output at all.
+format_fragment(undefined) -> <<>>;
+format_fragment(FragmentString) -> [$#, FragmentString].
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+%% Sample URI references shared by the test generators of this module.
+-define(URLS, [
+    "https://www.example.com/page",
+    "http://subdomain.example.com/path/to/page",
+    "https://www.example.com:8080/path/to/page",
+    "https://user:password@example.com/path/to/page",
+    "https://www.example.com/path%20with%20${spaces}",
+    "http://192.0.2.1/path/to/page",
+    "http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]/${path}/to/page",
+    "http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]/to/page",
+    "http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:4444/to/page",
+    "ftp://ftp.example.com/${path}/to/file",
+    "ftps://ftp.example.com/path/to/file",
+    "mailto:user@example.com",
+    "tel:+1234567890",
+    "sms:+1234567890?body=Hello%20World",
+    "git://github.com/user/repo.git",
+    "a:b:c",
+    "svn://svn.example.com/project/trunk",
+    "https://www.${example}.com/path/to/page?query_param=value",
+    "https://www.example.com/path/to/page?query_param1=value1&query_param2=value2",
+    "https://www.example.com?query_param1=value1&query_param2=value2",
+    "https://www.example.com/path/to/page#section1",
+    "https://www.example.com/path/to/page?query_param=value#section1",
+    "https://www.example.com/path/to/page?query_param1=value1&query_param2=${value2}#section1",
+    "https://www.example.com?query_param1=value1&query_param2=value2#section1",
+    "file:///path/to/file.txt",
+    "localhost",
+    "localhost:8080",
+    "localhost:8080/path/to/page",
+    "localhost:8080/path/to/page?query_param=value",
+    "localhost:8080/path/to/page?query_param1=value1&query_param2=value2",
+    "/abc/${def}",
+    "/abc/def?query_param=value",
+    "?query_param=value",
+    "#section1"
+]).
+
+%% Round trip: formatting a parsed URI must reproduce the input byte for byte.
+parse_format_test_() ->
+    lists:map(
+        fun(URI) ->
+            {URI, ?_assertEqual(list_to_binary(URI), iolist_to_binary(format(parse(URI))))}
+        end,
+        ?URLS
+    ).
+
+%% The base URL of every sample must be a prefix of the sample itself.
+base_url_test_() ->
+    lists:map(
+        fun(URI) ->
+            {URI, ?_assert(is_prefix(iolist_to_binary(base_url(parse(URI))), list_to_binary(URI)))}
+        end,
+        ?URLS
+    ).
+
+%% Each accessor is compared with the same-named key of the map returned
+%% by `uri_string:parse/1' for every sample that `uri_string' can parse.
+scheme_test_() ->
+    component_cases(scheme, fun scheme/1).
+
+host_test_() ->
+    component_cases(host, fun host/1).
+
+path_test_() ->
+    component_cases(path, fun path/1).
+
+query_test_() ->
+    component_cases(query, fun query/1).
+
+fragment_test_() ->
+    component_cases(fragment, fun fragment/1).
+
+%% Build one test per sample URI: the value under `Key' in the reference
+%% map (converted to a binary, or `undefined') must equal `Accessor(Parsed)'.
+component_cases(Key, Accessor) ->
+    [
+        if_parseable_by_uri_string(URI, fun(Expected, Parsed) ->
+            ?assertEqual(maybe_get_bin(Key, Expected), Accessor(Parsed))
+        end)
+     || URI <- ?URLS
+    ].
+
+%% `${...}' placeholders must pass through path and query untouched.
+templates_test_() ->
+    [
+        {"template in path",
+            ?_assertEqual(
+                <<"/${client_attrs.group}">>,
+                path(parse("https://www.example.com/${client_attrs.group}"))
+            )},
+        {"template in query, no path",
+            ?_assertEqual(
+                <<"group=${client_attrs.group}">>,
+                query(parse("https://www.example.com?group=${client_attrs.group}"))
+            )},
+        {"template in query, path",
+            ?_assertEqual(
+                <<"group=${client_attrs.group}">>,
+                query(parse("https://www.example.com/path/?group=${client_attrs.group}"))
+            )}
+    ].
+
+%% request_base/1: one successful case and two rejected inputs.
+request_target_test_() ->
+    [
+        ?_assertEqual(
+            {ok, #{port => 443, scheme => https, host => "www.example.com"}},
+            request_base(parse("https://www.example.com/path/to/page?query_param=value#fr"))
+        ),
+        ?_assertEqual(
+            {error, empty_host_not_allowed},
+            request_base(parse("localhost?query_param=value#fr"))
+        ),
+        ?_assertEqual(
+            {error, {unsupported_scheme, <<"ftp">>}},
+            request_base(parse("ftp://localhost"))
+        )
+    ].
+
+%% Test helper: true when the binary `Prefix' is a leading part of `Binary'.
+is_prefix(Prefix, Binary) ->
+    PrefixSize = byte_size(Prefix),
+    case Binary of
+        %% `Prefix' is already bound, so this compares the first
+        %% PrefixSize bytes of `Binary' with it.
+        <<Prefix:PrefixSize/binary, _/binary>> -> true;
+        _ -> false
+    end.
+
+%% Test helper: build a test for `URI' only when `uri_string:parse/1'
+%% accepts it. `Fun' receives the reference map from `uri_string' and the
+%% map from parse/1; unparseable samples yield a trivially passing test.
+if_parseable_by_uri_string(URI, Fun) ->
+    case uri_string:parse(URI) of
+        {error, _, _} ->
+            {"skipped", fun() -> true end};
+        ExpectedMap ->
+            ParsedMap = parse(URI),
+            {URI, fun() -> Fun(ExpectedMap, ParsedMap) end}
+    end.
+
+%% Test helper: value under `Key' as a binary, `undefined' when missing.
+maybe_get_bin(Key, Map) ->
+    maybe_bin(maps:get(Key, Map, undefined)).
+
+maybe_bin(String) when is_list(String) -> list_to_binary(String);
+maybe_bin(undefined) -> undefined.
+
+-endif.
diff --git a/changes/ce/fix-13273.en.md b/changes/ce/fix-13273.en.md
new file mode 100644
index 000000000..c917cebb1
--- /dev/null
+++ b/changes/ce/fix-13273.en.md
@@ -0,0 +1,4 @@
+Fixed and improved handling of URIs in several configurations.
+Previously,
+* In authentication or authorization configurations, valid pathless URIs (`https://example.com?q=x`) were not accepted as valid.
+* In bridge connectors, some kinds of URIs that couldn't be correctly handled were nevertheless accepted. E.g., URIs with user info or fragment parts.