Merge pull request #12971 from savonarola/0502-fix-url-parse

fix(auth,http): improve URI handling
This commit is contained in:
Ilia Averianov 2024-05-06 13:41:10 +03:00 committed by GitHub
commit e19222fc0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 667 additions and 149 deletions

View File

@ -7,7 +7,8 @@
{applications, [ {applications, [
kernel, kernel,
stdlib, stdlib,
emqx emqx,
emqx_utils
]}, ]},
{mod, {emqx_auth_app, []}}, {mod, {emqx_auth_app, []}},
{env, []}, {env, []},

View File

@ -0,0 +1,78 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_auth_utils).
%% TODO
%% Move more identical authn and authz helpers here
-export([parse_url/1]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Split a URL into the parts needed to issue HTTP requests:
%% the request base returned by `emqx_utils_uri:request_base/1', the path and
%% the query string (an absent query is returned as `<<>>').
%%
%% Throws `{invalid_url, Reason}' when the URL has no scheme, has no
%% authority, carries userinfo or a fragment, or when the request base
%% cannot be built from it.
-spec parse_url(binary()) ->
    {_Base :: emqx_utils_uri:request_base(), _Path :: binary(), _Query :: binary()}.
parse_url(Url) ->
    split_url(emqx_utils_uri:parse(Url), Url).

%% Validate the loosely parsed URI and split it; clauses are checked in
%% order, so the first violated rule determines the thrown reason.
split_url(#{scheme := undefined}, Url) ->
    throw({invalid_url, {no_scheme, Url}});
split_url(#{authority := undefined}, Url) ->
    throw({invalid_url, {no_host, Url}});
split_url(#{authority := #{userinfo := Userinfo}}, Url) when Userinfo =/= undefined ->
    throw({invalid_url, {userinfo_not_supported, Url}});
split_url(#{fragment := Fragment}, Url) when Fragment =/= undefined ->
    throw({invalid_url, {fragments_not_supported, Url}});
split_url(URI, Url) ->
    case emqx_utils_uri:request_base(URI) of
        {error, Reason} ->
            throw({invalid_url, {invalid_base, Reason, Url}});
        {ok, Base} ->
            Path = emqx_utils_uri:path(URI),
            Query = emqx_maybe:define(emqx_utils_uri:query(URI), <<>>),
            {Base, Path, Query}
    end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% Table-driven check that template placeholders survive URL splitting,
%% with and without a path and with and without a query string.
templates_test_() ->
    Base = #{port => 80, scheme => http, host => "example.com"},
    Cases = [
        {<<"http://example.com?client=${clientid}">>, {Base, <<"">>, <<"client=${clientid}">>}},
        {<<"http://example.com/path?client=${clientid}">>,
            {Base, <<"/path">>, <<"client=${clientid}">>}},
        {<<"http://example.com/path">>, {Base, <<"/path">>, <<>>}}
    ],
    [?_assertEqual(Expected, parse_url(Url)) || {Url, Expected} <- Cases].
-endif.

View File

@ -40,7 +40,6 @@
make_resource_id/1, make_resource_id/1,
without_password/1, without_password/1,
to_bool/1, to_bool/1,
parse_url/1,
convert_headers/1, convert_headers/1,
convert_headers_no_content_type/1, convert_headers_no_content_type/1,
default_headers/0, default_headers/0,
@ -290,25 +289,6 @@ to_bool(MaybeBinInt) when is_binary(MaybeBinInt) ->
to_bool(_) -> to_bool(_) ->
false. false.
parse_url(Url) ->
case string:split(Url, "//", leading) of
[Scheme, UrlRem] ->
case string:split(UrlRem, "/", leading) of
[HostPort, Remaining] ->
BaseUrl = iolist_to_binary([Scheme, "//", HostPort]),
case string:split(Remaining, "?", leading) of
[Path, QueryString] ->
{BaseUrl, <<"/", Path/binary>>, QueryString};
[Path] ->
{BaseUrl, <<"/", Path/binary>>, <<>>}
end;
[HostPort] ->
{iolist_to_binary([Scheme, "//", HostPort]), <<>>, <<>>}
end;
[Url] ->
throw({invalid_url, Url})
end.
convert_headers(Headers) -> convert_headers(Headers) ->
transform_header_name(Headers). transform_header_name(Headers).

View File

@ -134,21 +134,25 @@ parse_config(
request_timeout := RequestTimeout request_timeout := RequestTimeout
} = Config } = Config
) -> ) ->
{BaseUrl0, Path, Query} = emqx_authn_utils:parse_url(RawUrl), {RequestBase, Path, Query} = emqx_auth_utils:parse_url(RawUrl),
{ok, BaseUrl} = emqx_http_lib:uri_parse(BaseUrl0),
State = #{ State = #{
method => Method, method => Method,
path => Path, path => Path,
headers => ensure_header_name_type(Headers), headers => ensure_header_name_type(Headers),
base_path_template => emqx_authn_utils:parse_str(Path), base_path_template => emqx_authn_utils:parse_str(Path),
base_query_template => emqx_authn_utils:parse_deep( base_query_template => emqx_authn_utils:parse_deep(
cow_qs:parse_qs(to_bin(Query)) cow_qs:parse_qs(Query)
), ),
body_template => emqx_authn_utils:parse_deep(maps:get(body, Config, #{})), body_template => emqx_authn_utils:parse_deep(maps:get(body, Config, #{})),
request_timeout => RequestTimeout, request_timeout => RequestTimeout,
url => RawUrl url => RawUrl
}, },
{ok, Config#{base_url => BaseUrl, pool_type => random}, State}. {ok,
Config#{
request_base => RequestBase,
pool_type => random
},
State}.
generate_request(Credential, #{ generate_request(Credential, #{
method := Method, method := Method,
@ -244,14 +248,14 @@ request_for_log(Credential, #{url := Url, method := Method} = State) ->
{PathQuery, Headers} -> {PathQuery, Headers} ->
#{ #{
method => Method, method => Method,
base_url => Url, url => Url,
path_query => PathQuery, path_query => PathQuery,
headers => Headers headers => Headers
}; };
{PathQuery, Headers, Body} -> {PathQuery, Headers, Body} ->
#{ #{
method => Method, method => Method,
base_url => Url, url => Url,
path_query => PathQuery, path_query => PathQuery,
headers => Headers, headers => Headers,
body => Body body => Body
@ -272,11 +276,6 @@ to_list(B) when is_binary(B) ->
to_list(L) when is_list(L) -> to_list(L) when is_list(L) ->
L. L.
to_bin(B) when is_binary(B) ->
B;
to_bin(L) when is_list(L) ->
list_to_binary(L).
ensure_header_name_type(Headers) -> ensure_header_name_type(Headers) ->
Fun = fun Fun = fun
(Key, _Val, Acc) when is_binary(Key) -> (Key, _Val, Acc) when is_binary(Key) ->

View File

@ -29,8 +29,7 @@
update/1, update/1,
destroy/1, destroy/1,
authorize/4, authorize/4,
merge_defaults/1, merge_defaults/1
parse_url/1
]). ]).
-ifdef(TEST). -ifdef(TEST).
@ -152,15 +151,14 @@ parse_config(
request_timeout := ReqTimeout request_timeout := ReqTimeout
} = Conf } = Conf
) -> ) ->
{BaseUrl0, Path, Query} = parse_url(RawUrl), {RequestBase, Path, Query} = emqx_auth_utils:parse_url(RawUrl),
{ok, BaseUrl} = emqx_http_lib:uri_parse(BaseUrl0),
Conf#{ Conf#{
method => Method, method => Method,
base_url => BaseUrl, request_base => RequestBase,
headers => Headers, headers => Headers,
base_path_template => emqx_authz_utils:parse_str(Path, allowed_vars()), base_path_template => emqx_authz_utils:parse_str(Path, allowed_vars()),
base_query_template => emqx_authz_utils:parse_deep( base_query_template => emqx_authz_utils:parse_deep(
cow_qs:parse_qs(to_bin(Query)), cow_qs:parse_qs(Query),
allowed_vars() allowed_vars()
), ),
body_template => emqx_authz_utils:parse_deep( body_template => emqx_authz_utils:parse_deep(
@ -172,25 +170,6 @@ parse_config(
pool_type => random pool_type => random
}. }.
parse_url(Url) ->
case string:split(Url, "//", leading) of
[Scheme, UrlRem] ->
case string:split(UrlRem, "/", leading) of
[HostPort, Remaining] ->
BaseUrl = iolist_to_binary([Scheme, "//", HostPort]),
case string:split(Remaining, "?", leading) of
[Path, QueryString] ->
{BaseUrl, <<"/", Path/binary>>, QueryString};
[Path] ->
{BaseUrl, <<"/", Path/binary>>, <<>>}
end;
[HostPort] ->
{iolist_to_binary([Scheme, "//", HostPort]), <<>>, <<>>}
end;
[Url] ->
throw({invalid_url, Url})
end.
generate_request( generate_request(
Action, Action,
Topic, Topic,
@ -257,10 +236,6 @@ to_list(B) when is_binary(B) ->
to_list(L) when is_list(L) -> to_list(L) when is_list(L) ->
L. L.
to_bin(B) when is_binary(B) -> B;
to_bin(L) when is_list(L) -> list_to_binary(L);
to_bin(X) -> X.
allowed_vars() -> allowed_vars() ->
allowed_vars(emqx_authz:feature_available(rich_actions)). allowed_vars(emqx_authz:feature_available(rich_actions)).

View File

@ -111,7 +111,11 @@ t_create_invalid(_Config) ->
InvalidConfigs = InvalidConfigs =
[ [
AuthConfig#{<<"headers">> => []}, AuthConfig#{<<"headers">> => []},
AuthConfig#{<<"method">> => <<"delete">>} AuthConfig#{<<"method">> => <<"delete">>},
AuthConfig#{<<"url">> => <<"localhost">>},
AuthConfig#{<<"url">> => <<"http://foo.com/xxx#fragment">>},
AuthConfig#{<<"url">> => <<"http://${foo}.com/xxx">>},
AuthConfig#{<<"url">> => <<"//foo.com/xxx">>}
], ],
lists:foreach( lists:foreach(

View File

@ -564,6 +564,8 @@ t_create_replace(_Config) ->
listener => {tcp, default} listener => {tcp, default}
}, },
ValidConfig = raw_http_authz_config(),
%% Create with valid URL %% Create with valid URL
ok = setup_handler_and_config( ok = setup_handler_and_config(
fun(Req0, State) -> fun(Req0, State) ->
@ -581,13 +583,10 @@ t_create_replace(_Config) ->
), ),
%% Changing to valid config %% Changing to valid config
OkConfig = maps:merge( OkConfig = ValidConfig#{
raw_http_authz_config(), <<"url">> =>
#{ <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>
<<"url">> => },
<<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>
}
),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
@ -597,6 +596,37 @@ t_create_replace(_Config) ->
?assertEqual( ?assertEqual(
allow, allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH, <<"t">>) emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH, <<"t">>)
),
?assertMatch(
{error, _},
emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{
<<"url">> => <<"localhost">>
})
),
?assertMatch(
{error, _},
emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{
<<"url">> => <<"//foo.bar/x/y?q=z">>
})
),
?assertMatch(
{error, _},
emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{
<<"url">> =>
<<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}#fragment">>
})
).
t_uri_normalization(_Config) ->
ok = emqx_authz_test_lib:setup_config(
raw_http_authz_config(),
#{
<<"url">> =>
<<"http://127.0.0.1:33333?topic=${topic}&action=${action}">>
}
). ).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -336,21 +336,13 @@ parse_confs(
} = Conf } = Conf
) -> ) ->
Url1 = bin(Url), Url1 = bin(Url),
{BaseUrl, Path} = parse_url(Url1), {RequestBase, Path} = parse_url(Url1),
BaseUrl1 =
case emqx_http_lib:uri_parse(BaseUrl) of
{ok, BUrl} ->
BUrl;
{error, Reason} ->
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
RequestTTL = emqx_utils_maps:deep_get( RequestTTL = emqx_utils_maps:deep_get(
[resource_opts, request_ttl], [resource_opts, request_ttl],
Conf Conf
), ),
Conf#{ Conf#{
base_url => BaseUrl1, request_base => RequestBase,
request => request =>
#{ #{
path => Path, path => Path,
@ -422,16 +414,24 @@ connector_config(BridgeType, Config) ->
end. end.
parse_url(Url) -> parse_url(Url) ->
case string:split(Url, "//", leading) of Parsed = emqx_utils_uri:parse(Url),
[Scheme, UrlRem] -> case Parsed of
case string:split(UrlRem, "/", leading) of #{scheme := undefined} ->
[HostPort, Path] -> invalid_data(<<"Missing scheme in URL: ", Url/binary>>);
{iolist_to_binary([Scheme, "//", HostPort]), Path}; #{authority := undefined} ->
[HostPort] -> invalid_data(<<"Missing host in URL: ", Url/binary>>);
{iolist_to_binary([Scheme, "//", HostPort]), <<>>} #{authority := #{userinfo := Userinfo}} when Userinfo =/= undefined ->
end; invalid_data(<<"Userinfo is not supported in URL: ", Url/binary>>);
[Url] -> #{fragment := Fragment} when Fragment =/= undefined ->
invalid_data(<<"Missing scheme in URL: ", Url/binary>>) invalid_data(<<"Fragments are not supported in URL: ", Url/binary>>);
_ ->
case emqx_utils_uri:request_base(Parsed) of
{ok, Base} ->
{Base, emqx_maybe:define(emqx_utils_uri:path(Parsed), <<>>)};
{error, Reason0} ->
Reason1 = emqx_utils:readable_error_msg(Reason0),
invalid_data(<<"Invalid URL: ", Url/binary, ", details: ", Reason1/binary>>)
end
end. end.
bin(Bin) when is_binary(Bin) -> Bin; bin(Bin) when is_binary(Bin) -> Bin;

View File

@ -85,7 +85,7 @@ on_start(
config => redact(Config) config => redact(Config)
}), }),
{Schema, Server, DefaultPort} = get_host_info(to_str(Url)), {Scheme, Server, DefaultPort} = get_host_info(to_str(Url)),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{ #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{
default_port => DefaultPort default_port => DefaultPort
}), }),
@ -96,7 +96,7 @@ on_start(
port => Port, port => Port,
aws_access_key_id => to_str(AccessKeyID), aws_access_key_id => to_str(AccessKeyID),
aws_secret_access_key => SecretAccessKey, aws_secret_access_key => SecretAccessKey,
schema => Schema scheme => Scheme
}}, }},
{pool_size, PoolSize} {pool_size, PoolSize}
], ],

View File

@ -61,11 +61,11 @@ init(#{
aws_secret_access_key := Secret, aws_secret_access_key := Secret,
host := Host, host := Host,
port := Port, port := Port,
schema := Schema scheme := Scheme
}) -> }) ->
%% TODO: teach `erlcloud` to accept 0-arity closures as passwords. %% TODO: teach `erlcloud` to accept 0-arity closures as passwords.
SecretAccessKey = to_str(emqx_secret:unwrap(Secret)), SecretAccessKey = to_str(emqx_secret:unwrap(Secret)),
erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Scheme),
{ok, #{}}. {ok, #{}}.
handle_call(is_connected, _From, State) -> handle_call(is_connected, _From, State) ->

View File

@ -43,11 +43,10 @@
-type config() :: -type config() ::
#{ #{
base_url := #{ request_base := #{
scheme := http | https, scheme := http | https,
host := iolist(), host := iolist(),
port := inet:port_number(), port := inet:port_number()
path := _
}, },
connect_timeout := pos_integer(), connect_timeout := pos_integer(),
pool_type := random | hash, pool_type := random | hash,
@ -58,7 +57,6 @@
-type state() :: -type state() ::
#{ #{
base_path := _,
connect_timeout := pos_integer(), connect_timeout := pos_integer(),
pool_type := random | hash, pool_type := random | hash,
channels := map(), channels := map(),

View File

@ -98,15 +98,6 @@ validate_webhook_url(undefined) ->
required_field => <<"url">> required_field => <<"url">>
}); });
validate_webhook_url(Url) -> validate_webhook_url(Url) ->
{BaseUrl, _Path} = emqx_connector_resource:parse_url(Url), %% parse_url throws if the URL is invalid
case emqx_http_lib:uri_parse(BaseUrl) of {_RequestBase, _Path} = emqx_connector_resource:parse_url(Url),
{ok, _} -> ok.
ok;
{error, Reason} ->
throw(#{
kind => validation_error,
reason => invalid_url,
url => Url,
error => emqx_utils:readable_error_msg(Reason)
})
end.

View File

@ -51,7 +51,7 @@
%% for other http-like connectors. %% for other http-like connectors.
-export([redact_request/1]). -export([redact_request/1]).
-export([validate_method/1, join_paths/2, formalize_request/3, transform_result/1]). -export([validate_method/1, join_paths/2, formalize_request/2, transform_result/1]).
-define(DEFAULT_PIPELINE_SIZE, 100). -define(DEFAULT_PIPELINE_SIZE, 100).
-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000). -define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
@ -187,11 +187,10 @@ callback_mode() -> async_if_possible.
on_start( on_start(
InstId, InstId,
#{ #{
base_url := #{ request_base := #{
scheme := Scheme, scheme := Scheme,
host := Host, host := Host,
port := Port, port := Port
path := BasePath
}, },
connect_timeout := ConnectTimeout, connect_timeout := ConnectTimeout,
pool_type := PoolType, pool_type := PoolType,
@ -225,13 +224,13 @@ on_start(
{transport_opts, NTransportOpts}, {transport_opts, NTransportOpts},
{enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)} {enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
], ],
State = #{ State = #{
pool_name => InstId, pool_name => InstId,
pool_type => PoolType, pool_type => PoolType,
host => Host, host => Host,
port => Port, port => Port,
connect_timeout => ConnectTimeout, connect_timeout => ConnectTimeout,
base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined)) request => preprocess_request(maps:get(request, Config, undefined))
}, },
case start_pool(InstId, PoolOpts) of case start_pool(InstId, PoolOpts) of
@ -359,7 +358,7 @@ on_query(InstId, {Method, Request, Timeout}, State) ->
on_query( on_query(
InstId, InstId,
{ActionId, KeyOrNum, Method, Request, Timeout, Retry}, {ActionId, KeyOrNum, Method, Request, Timeout, Retry},
#{base_path := BasePath, host := Host} = State #{host := Host} = State
) -> ) ->
?TRACE( ?TRACE(
"QUERY", "QUERY",
@ -372,7 +371,7 @@ on_query(
state => redact(State) state => redact(State)
} }
), ),
NRequest = formalize_request(Method, BasePath, Request), NRequest = formalize_request(Method, Request),
trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout),
Worker = resolve_pool_worker(State, KeyOrNum), Worker = resolve_pool_worker(State, KeyOrNum),
Result0 = ehttpc:request( Result0 = ehttpc:request(
@ -469,7 +468,7 @@ on_query_async(
InstId, InstId,
{ActionId, KeyOrNum, Method, Request, Timeout}, {ActionId, KeyOrNum, Method, Request, Timeout},
ReplyFunAndArgs, ReplyFunAndArgs,
#{base_path := BasePath, host := Host} = State #{host := Host} = State
) -> ) ->
Worker = resolve_pool_worker(State, KeyOrNum), Worker = resolve_pool_worker(State, KeyOrNum),
?TRACE( ?TRACE(
@ -482,7 +481,7 @@ on_query_async(
state => redact(State) state => redact(State)
} }
), ),
NRequest = formalize_request(Method, BasePath, Request), NRequest = formalize_request(Method, Request),
trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout),
MaxAttempts = maps:get(max_attempts, State, 3), MaxAttempts = maps:get(max_attempts, State, 3),
Context = #{ Context = #{
@ -765,6 +764,9 @@ make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;
make_method(M) when M == <<"GET">>; M == <<"get">> -> get; make_method(M) when M == <<"GET">>; M == <<"get">> -> get;
make_method(M) when M == <<"DELETE">>; M == <<"delete">> -> delete. make_method(M) when M == <<"DELETE">>; M == <<"delete">> -> delete.
formalize_request(Method, Request) ->
formalize_request(Method, "/", Request).
formalize_request(Method, BasePath, {Path, Headers, _Body}) when formalize_request(Method, BasePath, {Path, Headers, _Body}) when
Method =:= get; Method =:= delete Method =:= get; Method =:= delete
-> ->

View File

@ -85,6 +85,16 @@ init_per_testcase(t_path_not_found, Config) ->
), ),
ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()), ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(t_empty_path, Config) ->
HTTPPath = <<"/">>,
ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts
),
ok = emqx_bridge_http_connector_test_server:set_handler(
emqx_bridge_http_test_lib:success_http_handler()
),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(t_too_many_requests, Config) -> init_per_testcase(t_too_many_requests, Config) ->
HTTPPath = <<"/path">>, HTTPPath = <<"/path">>,
ServerSSLOpts = false, ServerSSLOpts = false,
@ -122,6 +132,7 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(TestCase, _Config) when end_per_testcase(TestCase, _Config) when
TestCase =:= t_path_not_found; TestCase =:= t_path_not_found;
TestCase =:= t_empty_path;
TestCase =:= t_too_many_requests; TestCase =:= t_too_many_requests;
TestCase =:= t_service_unavailable; TestCase =:= t_service_unavailable;
TestCase =:= t_rule_action_expired; TestCase =:= t_rule_action_expired;
@ -588,6 +599,45 @@ t_path_not_found(Config) ->
), ),
ok. ok.
t_empty_path(Config) ->
?check_trace(
begin
#{port := Port, path := _Path} = ?config(http_server, Config),
MQTTTopic = <<"t/webhook">>,
BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
type => ?BRIDGE_TYPE,
name => ?BRIDGE_NAME,
local_topic => MQTTTopic,
port => Port,
path => <<"">>
}),
{ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
Msg = emqx_message:make(MQTTTopic, <<"{}">>),
emqx:publish(Msg),
wait_http_request(),
?retry(
_Interval = 500,
_NAttempts = 20,
?assertMatch(
#{
counters := #{
matched := 1,
failed := 0,
success := 1
}
},
get_metrics(?BRIDGE_NAME)
)
),
ok
end,
fun(Trace) ->
?assertEqual([], ?of_kind(http_will_retry_async, Trace)),
ok
end
),
ok.
t_too_many_requests(Config) -> t_too_many_requests(Config) ->
check_send_is_retried(Config). check_send_is_retried(Config).

View File

@ -32,11 +32,10 @@ wrap_auth_headers_test_() ->
end, end,
fun meck:unload/1, fun(_) -> fun meck:unload/1, fun(_) ->
Config = #{ Config = #{
base_url => #{ request_base => #{
scheme => http, scheme => http,
host => "localhost", host => "localhost",
port => 18083, port => 18083
path => "/status"
}, },
connect_timeout => 1000, connect_timeout => 1000,
pool_type => random, pool_type => random,

View File

@ -43,11 +43,10 @@
-type config() :: -type config() ::
#{ #{
base_url := #{ request_base := #{
scheme := http | https, scheme := http | https,
host := iolist(), host := iolist(),
port := inet:port_number(), port := inet:port_number()
path := _
}, },
connect_timeout := pos_integer(), connect_timeout := pos_integer(),
pool_type := random | hash, pool_type := random | hash,
@ -59,7 +58,6 @@
-type state() :: -type state() ::
#{ #{
base_path := _,
connect_timeout := pos_integer(), connect_timeout := pos_integer(),
pool_type := random | hash, pool_type := random | hash,
channels := map(), channels := map(),
@ -244,10 +242,10 @@ on_stop(InstanceId, State) ->
-spec on_get_status(manager_id(), state()) -> -spec on_get_status(manager_id(), state()) ->
connected | connecting | {disconnected, state(), term()}. connected | connecting | {disconnected, state(), term()}.
on_get_status(InstanceId, #{base_path := BasePath} = State) -> on_get_status(InstanceId, State) ->
Func = fun(Worker, Timeout) -> Func = fun(Worker, Timeout) ->
Request = {?IOTDB_PING_PATH, [], undefined}, Request = {?IOTDB_PING_PATH, [], undefined},
NRequest = emqx_bridge_http_connector:formalize_request(get, BasePath, Request), NRequest = emqx_bridge_http_connector:formalize_request(get, Request),
Result0 = ehttpc:request(Worker, get, NRequest, Timeout), Result0 = ehttpc:request(Worker, get, NRequest, Timeout),
case emqx_bridge_http_connector:transform_result(Result0) of case emqx_bridge_http_connector:transform_result(Result0) of
{ok, 200, _, Body} -> {ok, 200, _, Body} ->

View File

@ -281,17 +281,9 @@ parse_confs(
} = Conf } = Conf
) -> ) ->
Url1 = bin(Url), Url1 = bin(Url),
{BaseUrl, Path} = parse_url(Url1), {RequestBase, Path} = parse_url(Url1),
BaseUrl1 =
case emqx_http_lib:uri_parse(BaseUrl) of
{ok, BUrl} ->
BUrl;
{error, Reason} ->
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
Conf#{ Conf#{
base_url => BaseUrl1, request_base => RequestBase,
request => request =>
#{ #{
path => Path, path => Path,
@ -324,16 +316,24 @@ connector_config(ConnectorType, Name, Config) ->
end. end.
parse_url(Url) -> parse_url(Url) ->
case string:split(Url, "//", leading) of Parsed = emqx_utils_uri:parse(Url),
[Scheme, UrlRem] -> case Parsed of
case string:split(UrlRem, "/", leading) of #{scheme := undefined} ->
[HostPort, Path] -> invalid_data(<<"Missing scheme in URL: ", Url/binary>>);
{iolist_to_binary([Scheme, "//", HostPort]), Path}; #{authority := undefined} ->
[HostPort] -> invalid_data(<<"Missing host in URL: ", Url/binary>>);
{iolist_to_binary([Scheme, "//", HostPort]), <<>>} #{authority := #{userinfo := Userinfo}} when Userinfo =/= undefined ->
end; invalid_data(<<"Userinfo is not supported in URL: ", Url/binary>>);
[Url] -> #{fragment := Fragment} when Fragment =/= undefined ->
invalid_data(<<"Missing scheme in URL: ", Url/binary>>) invalid_data(<<"Fragments are not supported in URL: ", Url/binary>>);
_ ->
case emqx_utils_uri:request_base(Parsed) of
{ok, Base} ->
{Base, emqx_maybe:define(emqx_utils_uri:path(Parsed), <<>>)};
{error, Reason0} ->
Reason1 = emqx_utils:readable_error_msg(Reason0),
invalid_data(<<"Invalid URL: ", Url/binary, ", details: ", Reason1/binary>>)
end
end. end.
-spec invalid_data(binary()) -> no_return(). -spec invalid_data(binary()) -> no_return().

View File

@ -5,7 +5,8 @@
]}. ]}.
{deps, [ {deps, [
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}} {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}
]}. ]}.
{project_plugins, [erlfmt]}. {project_plugins, [erlfmt]}.

View File

@ -15,7 +15,8 @@
{applications, [ {applications, [
kernel, kernel,
stdlib, stdlib,
jiffy jiffy,
emqx_http_lib
]}, ]},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -0,0 +1,407 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% This module provides a loose parser for URIs.
%% The standard library's `uri_string' module is strict and cannot parse
%% non-conforming URIs such as templates: `http://example.com/${username}'.
-module(emqx_utils_uri).
-export([parse/1, format/1]).
-export([
scheme/1,
userinfo/1,
host/1,
port/1,
path/1,
query/1,
fragment/1,
base_url/1,
request_base/1
]).
-type scheme() :: binary().
-type userinfo() :: binary().
-type host() :: binary().
-type port_number() :: inet:port_number().
-type path() :: binary().
-type query() :: binary().
-type fragment() :: binary().
-type request_base() :: #{
scheme := http | https,
host := iolist(),
port := inet:port_number()
}.
%% Parsed authority component of a URI (the part following `//'),
%% as produced by `parse_authority/1'.
-type authority() :: #{
userinfo := emqx_maybe:t(userinfo()),
host := host(),
%% Types:
%% ipv6: `\[[a-z\d:\.]*\]` bracketed "ipv6-like" address
%% regular: `example.com` arbitrary host not containing `:` which is forbidden in hostnames other than ipv6
%% loose: non ipv6-like host containing `:`, probably invalid for a strictly valid URI
host_type := ipv6 | regular | loose,
port := emqx_maybe:t(port_number())
}.
-type uri() :: #{
scheme := emqx_maybe:t(scheme()),
authority := emqx_maybe:t(authority()),
path := path(),
query := emqx_maybe:t(query()),
fragment := emqx_maybe:t(fragment())
}.
-export_type([
scheme/0,
userinfo/0,
host/0,
port_number/0,
path/0,
query/0,
fragment/0,
authority/0,
uri/0,
request_base/0
]).
-on_load(init/0).
%% https://datatracker.ietf.org/doc/html/rfc3986#appendix-B
%%
%% > The following line is the regular expression for breaking-down a
%% > well-formed URI reference into its components.
%%
%% > ^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?
%%
%% We skip capturing some unused parts of the regex.
-define(URI_REGEX,
("^(?:(?<scheme>[^:/?#]+):)?(?<authority>//[^/?#]*)?"
"(?<path>[^?#]*)(?<query>\\?[^#]*)?(?<fragment>#.*)?")
).
-define(URI_REGEX_PT_KEY, {?MODULE, uri_re}).
%% Regular expression splitting an authority (without the leading `//')
%% into named groups:
%%   userinfo     - optional, greedy up to and including the last `@'
%%   host_ipv6    - contents of a bracketed host, brackets excluded
%%   host_regular - host without any `:'
%%   host_loose   - any other host, tried last
%%   port         - optional, `:' followed by decimal digits
%% The `userinfo' and `port' captures keep their delimiter; it is stripped
%% by `parse_userinfo/1' and `parse_port/1'.
-define(AUTHORITY_REGEX,
("^(?<userinfo>.*@)?"
"(?:(?:\\[(?<host_ipv6>[a-z\\d\\.:]*)\\])|(?<host_regular>[^:]*?)|(?<host_loose>.*?))"
"(?<port>:\\d+)?$")
).
%% `persistent_term' key under which the compiled authority regexp is cached.
-define(AUTHORITY_REGEX_PT_KEY, {?MODULE, authority_re}).
%%-------------------------------------------------------------------
%% Internal API
%%-------------------------------------------------------------------
%% `-on_load' hook: compile both regular expressions once and cache them in
%% `persistent_term' so that parsing never recompiles them.
%%
%% `dotall' lets `.' match newline characters. Without it an authority that
%% contains a newline (e.g. `a:b\nc') matches none of the host alternatives,
%% so the `{match, _}' assertion in `parse_authority/1' crashes, and a
%% fragment containing a newline is silently truncated at the newline.
%% `dollar_endonly' makes the `$' anchor of the authority regexp match only
%% at the very end of the input instead of also before a trailing newline,
%% which would otherwise drop that newline from the host.
init() ->
    {ok, UriRE} = re:compile(?URI_REGEX, [dotall]),
    persistent_term:put(?URI_REGEX_PT_KEY, UriRE),
    {ok, AuthorityRE} = re:compile(?AUTHORITY_REGEX, [caseless, dotall, dollar_endonly]),
    persistent_term:put(?AUTHORITY_REGEX_PT_KEY, AuthorityRE),
    %% `-on_load' functions must return `ok' for the module to be loaded.
    ok.
%%-------------------------------------------------------------------
%% API
%%-------------------------------------------------------------------
%% @doc Loosely parse a URI reference into scheme, authority, path, query
%% and fragment using the cached RFC 3986 (appendix B) regexp.
%%
%% Accepts any `iodata()': binaries as well as strings, which is what
%% `request_base/1' and the unit tests of this module pass in. All captures
%% are returned as binaries. Absent scheme, authority, query and fragment
%% are reported as `undefined'; the path is always a (possibly empty) binary.
-spec parse(iodata()) -> uri().
parse(URIString) ->
    %% Every group of the regexp is optional, so a match is expected for any
    %% input; anything else crashes here with `badmatch'.
    {match, [SchemeMatch, AuthorityMatch, PathMatch, QueryMatch, FragmentMatch]} = re:run(
        URIString, uri_regexp(), [{capture, [scheme, authority, path, query, fragment], binary}]
    ),
    Scheme = parse_scheme(SchemeMatch),
    Authority = parse_authority(AuthorityMatch),
    Path = PathMatch,
    Query = parse_query(QueryMatch),
    Fragment = parse_fragment(FragmentMatch),
    #{
        scheme => Scheme,
        authority => Authority,
        path => Path,
        query => Query,
        fragment => Fragment
    }.
%% @doc Render the scheme and authority of a parsed URI as iodata,
%% leaving out path, query and fragment.
-spec base_url(uri()) -> iodata().
base_url(#{scheme := UriScheme, authority := UriAuthority}) ->
    SchemePart = format_scheme(UriScheme),
    AuthorityPart = format_authority(UriAuthority),
    [SchemePart, AuthorityPart].
%% @doc Render a parsed URI back to iodata: base URL, path, then the
%% optional query and fragment with their `?' and `#' delimiters.
-spec format(uri()) -> iodata().
format(#{path := Path, query := Query, fragment := Fragment} = URI) ->
    Base = base_url(URI),
    QueryPart = format_query(Query),
    FragmentPart = format_fragment(Fragment),
    [Base, Path, QueryPart, FragmentPart].
%% Component accessors for a parsed `uri()' map. The authority-based ones
%% (`userinfo/1', `host/1', `port/1') return `undefined' when the URI has
%% no authority at all.

%% @doc Scheme of the URI as stored in the map.
-spec scheme(uri()) -> emqx_maybe:t(scheme()).
scheme(#{scheme := Scheme}) -> Scheme.
%% @doc Userinfo of the authority, `undefined' without an authority.
-spec userinfo(uri()) -> emqx_maybe:t(userinfo()).
userinfo(#{authority := undefined}) -> undefined;
userinfo(#{authority := #{userinfo := UserInfo}}) -> UserInfo.
%% @doc Host of the authority, `undefined' without an authority.
-spec host(uri()) -> emqx_maybe:t(host()).
host(#{authority := undefined}) -> undefined;
host(#{authority := #{host := Host}}) -> Host.
%% @doc Port of the authority, `undefined' without an authority.
-spec port(uri()) -> emqx_maybe:t(port_number()).
port(#{authority := undefined}) -> undefined;
port(#{authority := #{port := Port}}) -> Port.
%% @doc Path of the URI as stored in the map.
-spec path(uri()) -> path().
path(#{path := Path}) -> Path.
%% @doc Query of the URI as stored in the map.
-spec query(uri()) -> emqx_maybe:t(query()).
query(#{query := Query}) -> Query.
%% @doc Fragment of the URI as stored in the map.
-spec fragment(uri()) -> emqx_maybe:t(fragment()).
fragment(#{fragment := Fragment}) -> Fragment.
%% @doc Build the `scheme'/`host'/`port' map needed to connect to the
%% server a URI points at.
%%
%% Accepts either an already parsed `uri()' map or a raw URI (string or
%% binary), which is parsed first. The base URL is validated by
%% `emqx_http_lib:uri_parse/1'; its error is returned unchanged, and from a
%% successful result only the `scheme', `host' and `port' keys are kept.
-spec request_base(uri() | iodata()) -> {ok, request_base()} | {error, term()}.
request_base(URI) when is_map(URI) ->
    case emqx_http_lib:uri_parse(iolist_to_binary(base_url(URI))) of
        {error, Reason} -> {error, Reason};
        {ok, URIMap} -> {ok, maps:with([scheme, host, port], URIMap)}
    end;
request_base(URIString) when is_list(URIString) orelse is_binary(URIString) ->
    request_base(parse(URIString)).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
%% An empty capture means the component is absent from the URI.
parse_scheme(Captured) ->
    case Captured of
        <<>> -> undefined;
        _ -> Captured
    end.

%% The query capture includes its leading `?', which is dropped here.
parse_query(<<"?", Rest/binary>>) -> Rest;
parse_query(<<>>) -> undefined.

%% The fragment capture includes its leading `#', which is dropped here.
parse_fragment(<<"#", Rest/binary>>) -> Rest;
parse_fragment(<<>>) -> undefined.
%% Fetch the compiled authority regexp stored in `persistent_term' by `init/0'.
authority_regexp() ->
persistent_term:get(?AUTHORITY_REGEX_PT_KEY).
%% Turn the raw authority capture (including its leading `//') into an
%% `authority()' map; an empty capture means the URI has no authority.
parse_authority(<<"//", Authority/binary>>) ->
    Groups = [userinfo, host_ipv6, host_regular, host_loose, port],
    %% The authority regexp is expected to match; a `nomatch' crashes here
    %% with `badmatch'.
    {match, [UserInfoBin, IPv6Bin, RegularBin, LooseBin, PortBin]} =
        re:run(Authority, authority_regexp(), [{capture, Groups, binary}]),
    UserInfo = parse_userinfo(UserInfoBin),
    {HostType, Host} = parse_host(IPv6Bin, RegularBin, LooseBin),
    Port = parse_port(PortBin),
    #{
        userinfo => UserInfo,
        host => Host,
        host_type => HostType,
        port => Port
    };
parse_authority(<<>>) ->
    undefined.
%% The userinfo capture ends with `@'; return everything before that byte.
parse_userinfo(<<>>) ->
    undefined;
parse_userinfo(Captured) ->
    Length = byte_size(Captured) - 1,
    binary:part(Captured, {0, Length}).

%% At most one of the three host captures is non-empty. Clause order
%% matters: when all three are empty the host is reported as `loose'.
parse_host(<<>>, <<>>, LooseHost) -> {loose, LooseHost};
parse_host(<<>>, RegularHost, <<>>) -> {regular, RegularHost};
parse_host(IPv6Host, <<>>, <<>>) -> {ipv6, IPv6Host}.

%% The port capture starts with `:' followed by decimal digits.
parse_port(<<":", Digits/binary>>) -> binary_to_integer(Digits);
parse_port(<<>>) -> undefined.
%% Fetch the compiled URI regexp stored in `persistent_term' by `init/0'.
uri_regexp() ->
persistent_term:get(?URI_REGEX_PT_KEY).
%% Formatting helpers: each renders one URI component as iodata together
%% with its delimiter, or `<<>>' when the component is `undefined'.

format_scheme(Scheme) ->
    case Scheme of
        undefined -> <<>>;
        _ -> [Scheme, $:]
    end.

format_authority(#{userinfo := UserInfo, host := Host, host_type := HostType, port := Port}) ->
    UserInfoPart = format_userinfo(UserInfo),
    HostPart = format_host(HostType, Host),
    PortPart = format_port(Port),
    [$/, $/, UserInfoPart, HostPart, PortPart];
format_authority(undefined) ->
    <<>>.

format_userinfo(UserInfo) ->
    case UserInfo of
        undefined -> <<>>;
        _ -> [UserInfo, $@]
    end.

%% Only ipv6 hosts get their square brackets back.
format_host(HostType, Host) ->
    case HostType of
        ipv6 -> [$[, Host, $]];
        _ -> Host
    end.

format_port(Port) ->
    case Port of
        undefined -> <<>>;
        _ -> [$:, integer_to_binary(Port)]
    end.

format_query(Query) ->
    case Query of
        undefined -> <<>>;
        _ -> [$?, Query]
    end.

format_fragment(Fragment) ->
    case Fragment of
        undefined -> <<>>;
        _ -> [$#, Fragment]
    end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-define(URLS, [
"https://www.example.com/page",
"http://subdomain.example.com/path/to/page",
"https://www.example.com:8080/path/to/page",
"https://user:password@example.com/path/to/page",
"https://www.example.com/path%20with%20${spaces}",
"http://192.0.2.1/path/to/page",
"http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]/${path}/to/page",
"http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]/to/page",
"http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:4444/to/page",
"ftp://ftp.example.com/${path}/to/file",
"ftps://ftp.example.com/path/to/file",
"mailto:user@example.com",
"tel:+1234567890",
"sms:+1234567890?body=Hello%20World",
"git://github.com/user/repo.git",
"a:b:c",
"svn://svn.example.com/project/trunk",
"https://www.${example}.com/path/to/page?query_param=value",
"https://www.example.com/path/to/page?query_param1=value1&query_param2=value2",
"https://www.example.com?query_param1=value1&query_param2=value2",
"https://www.example.com/path/to/page#section1",
"https://www.example.com/path/to/page?query_param=value#section1",
"https://www.example.com/path/to/page?query_param1=value1&query_param2=${value2}#section1",
"https://www.example.com?query_param1=value1&query_param2=value2#section1",
"file:///path/to/file.txt",
"localhost",
"localhost:8080",
"localhost:8080/path/to/page",
"localhost:8080/path/to/page?query_param=value",
"localhost:8080/path/to/page?query_param1=value1&query_param2=value2",
"/abc/${def}",
"/abc/def?query_param=value",
"?query_param=value",
"#section1"
]).
%% Parsing and formatting must round-trip every sample URI byte for byte.
parse_format_test_() ->
    lists:map(
        fun(URI) ->
            {URI, ?_assertEqual(list_to_binary(URI), iolist_to_binary(format(parse(URI))))}
        end,
        ?URLS
    ).

%% The rendered base URL must be a prefix of the original sample URI.
base_url_test_() ->
    lists:map(
        fun(URI) ->
            {URI, ?_assert(is_prefix(iolist_to_binary(base_url(parse(URI))), list_to_binary(URI)))}
        end,
        ?URLS
    ).
%% For every sample URI that the strict `uri_string' parser accepts, each
%% component extracted by this module must equal the `uri_string' result.
scheme_test_() ->
    component_tests(scheme, fun scheme/1).

host_test_() ->
    component_tests(host, fun host/1).

path_test_() ->
    component_tests(path, fun path/1).

query_test_() ->
    component_tests(query, fun query/1).

fragment_test_() ->
    component_tests(fragment, fun fragment/1).

%% Build one test per sample URI comparing the `Key' entry of the
%% `uri_string' result with what `Getter' extracts from our parse result.
component_tests(Key, Getter) ->
    [
        if_parseable_by_uri_string(URI, fun(Expected, Parsed) ->
            ?assertEqual(maybe_get_bin(Key, Expected), Getter(Parsed))
        end)
     || URI <- ?URLS
    ].
%% Template placeholders (`${...}') must be preserved verbatim in the path
%% and in the query, with or without a path in front of the query.
templates_test_() ->
    Group = <<"group=${client_attrs.group}">>,
    Cases = [
        {"template in path", <<"/${client_attrs.group}">>,
            fun() -> path(parse("https://www.example.com/${client_attrs.group}")) end},
        {"template in query, no path", Group,
            fun() -> query(parse("https://www.example.com?group=${client_attrs.group}")) end},
        {"template in query, path", Group,
            fun() -> query(parse("https://www.example.com/path/?group=${client_attrs.group}")) end}
    ],
    [{Title, ?_assertEqual(Expected, Actual())} || {Title, Expected, Actual} <- Cases].
%% `request_base/1' yields scheme, host and port for an https URL (port 443
%% although the URL names none) and reports an error for a URI without an
%% authority or with a scheme other than http(s).
request_target_test_() ->
    Cases = [
        {
            {ok, #{port => 443, scheme => https, host => "www.example.com"}},
            "https://www.example.com/path/to/page?query_param=value#fr"
        },
        {{error, empty_host_not_allowed}, "localhost?query_param=value#fr"},
        {{error, {unsupported_scheme, <<"ftp">>}}, "ftp://localhost"}
    ],
    [?_assertEqual(Expected, request_base(parse(URI))) || {Expected, URI} <- Cases].
%% True when the binary `Binary' starts with the bytes of the binary `Prefix'.
is_prefix(Prefix, Binary) ->
    PrefixSize = byte_size(Prefix),
    byte_size(Binary) >= PrefixSize andalso
        binary:part(Binary, 0, PrefixSize) =:= Prefix.
%% Build a titled test that feeds `Fun' the `uri_string:parse/1' result and
%% our own parse result for `URI'. URIs rejected by `uri_string' yield a
%% trivially passing test titled "skipped".
if_parseable_by_uri_string(URI, Fun) ->
    case uri_string:parse(URI) of
        Expected when is_map(Expected) ->
            Parsed = parse(URI),
            {URI, fun() -> Fun(Expected, Parsed) end};
        {error, _, _} ->
            {"skipped", fun() -> true end}
    end.
%% Look up `Key' in `Map' and convert a string value to a binary; a missing
%% key yields `undefined'.
maybe_get_bin(Key, Map) ->
    case maps:find(Key, Map) of
        {ok, Value} -> maybe_bin(Value);
        error -> maybe_bin(undefined)
    end.

%% Strings become binaries, `undefined' passes through unchanged.
maybe_bin(undefined) -> undefined;
maybe_bin(String) when is_list(String) -> list_to_binary(String).
-endif.

View File

@ -0,0 +1,4 @@
Fixed and improved handling of URIs in several configurations.
Previously,
* In authentication or authorization configurations, valid pathless URIs (`https://example.com?q=x`) were not accepted as valid.
* In bridge connectors, some kinds of URIs that couldn't be correctly handled were nevertheless accepted. E.g., URIs with user info or fragment parts.