Merge pull request #13273 from savonarola/0617-fix-url-parse-5.7
fix(auth,http): improve URI handling
This commit is contained in:
commit
420cd84cf1
|
@ -7,7 +7,8 @@
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
stdlib,
|
stdlib,
|
||||||
emqx
|
emqx,
|
||||||
|
emqx_utils
|
||||||
]},
|
]},
|
||||||
{mod, {emqx_auth_app, []}},
|
{mod, {emqx_auth_app, []}},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_auth_utils).
|
||||||
|
|
||||||
|
%% TODO
|
||||||
|
%% Move more identical authn and authz helpers here
|
||||||
|
|
||||||
|
-export([parse_url/1]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec parse_url(binary()) ->
|
||||||
|
{_Base :: emqx_utils_uri:request_base(), _Path :: binary(), _Query :: binary()}.
|
||||||
|
parse_url(Url) ->
|
||||||
|
Parsed = emqx_utils_uri:parse(Url),
|
||||||
|
case Parsed of
|
||||||
|
#{scheme := undefined} ->
|
||||||
|
throw({invalid_url, {no_scheme, Url}});
|
||||||
|
#{authority := undefined} ->
|
||||||
|
throw({invalid_url, {no_host, Url}});
|
||||||
|
#{authority := #{userinfo := Userinfo}} when Userinfo =/= undefined ->
|
||||||
|
throw({invalid_url, {userinfo_not_supported, Url}});
|
||||||
|
#{fragment := Fragment} when Fragment =/= undefined ->
|
||||||
|
throw({invalid_url, {fragments_not_supported, Url}});
|
||||||
|
_ ->
|
||||||
|
case emqx_utils_uri:request_base(Parsed) of
|
||||||
|
{ok, Base} ->
|
||||||
|
{Base, emqx_utils_uri:path(Parsed),
|
||||||
|
emqx_maybe:define(emqx_utils_uri:query(Parsed), <<>>)};
|
||||||
|
{error, Reason} ->
|
||||||
|
throw({invalid_url, {invalid_base, Reason, Url}})
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
templates_test_() ->
|
||||||
|
[
|
||||||
|
?_assertEqual(
|
||||||
|
{
|
||||||
|
#{port => 80, scheme => http, host => "example.com"},
|
||||||
|
<<"">>,
|
||||||
|
<<"client=${clientid}">>
|
||||||
|
},
|
||||||
|
parse_url(<<"http://example.com?client=${clientid}">>)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
{
|
||||||
|
#{port => 80, scheme => http, host => "example.com"},
|
||||||
|
<<"/path">>,
|
||||||
|
<<"client=${clientid}">>
|
||||||
|
},
|
||||||
|
parse_url(<<"http://example.com/path?client=${clientid}">>)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
{#{port => 80, scheme => http, host => "example.com"}, <<"/path">>, <<>>},
|
||||||
|
parse_url(<<"http://example.com/path">>)
|
||||||
|
)
|
||||||
|
].
|
||||||
|
|
||||||
|
-endif.
|
|
@ -39,7 +39,6 @@
|
||||||
make_resource_id/1,
|
make_resource_id/1,
|
||||||
without_password/1,
|
without_password/1,
|
||||||
to_bool/1,
|
to_bool/1,
|
||||||
parse_url/1,
|
|
||||||
convert_headers/1,
|
convert_headers/1,
|
||||||
convert_headers_no_content_type/1,
|
convert_headers_no_content_type/1,
|
||||||
default_headers/0,
|
default_headers/0,
|
||||||
|
@ -287,25 +286,6 @@ to_bool(MaybeBinInt) when is_binary(MaybeBinInt) ->
|
||||||
to_bool(_) ->
|
to_bool(_) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
parse_url(Url) ->
|
|
||||||
case string:split(Url, "//", leading) of
|
|
||||||
[Scheme, UrlRem] ->
|
|
||||||
case string:split(UrlRem, "/", leading) of
|
|
||||||
[HostPort, Remaining] ->
|
|
||||||
BaseUrl = iolist_to_binary([Scheme, "//", HostPort]),
|
|
||||||
case string:split(Remaining, "?", leading) of
|
|
||||||
[Path, QueryString] ->
|
|
||||||
{BaseUrl, <<"/", Path/binary>>, QueryString};
|
|
||||||
[Path] ->
|
|
||||||
{BaseUrl, <<"/", Path/binary>>, <<>>}
|
|
||||||
end;
|
|
||||||
[HostPort] ->
|
|
||||||
{iolist_to_binary([Scheme, "//", HostPort]), <<>>, <<>>}
|
|
||||||
end;
|
|
||||||
[Url] ->
|
|
||||||
throw({invalid_url, Url})
|
|
||||||
end.
|
|
||||||
|
|
||||||
convert_headers(Headers) ->
|
convert_headers(Headers) ->
|
||||||
transform_header_name(Headers).
|
transform_header_name(Headers).
|
||||||
|
|
||||||
|
|
|
@ -134,21 +134,25 @@ parse_config(
|
||||||
request_timeout := RequestTimeout
|
request_timeout := RequestTimeout
|
||||||
} = Config
|
} = Config
|
||||||
) ->
|
) ->
|
||||||
{BaseUrl0, Path, Query} = emqx_authn_utils:parse_url(RawUrl),
|
{RequestBase, Path, Query} = emqx_auth_utils:parse_url(RawUrl),
|
||||||
{ok, BaseUrl} = emqx_http_lib:uri_parse(BaseUrl0),
|
|
||||||
State = #{
|
State = #{
|
||||||
method => Method,
|
method => Method,
|
||||||
path => Path,
|
path => Path,
|
||||||
headers => ensure_header_name_type(Headers),
|
headers => ensure_header_name_type(Headers),
|
||||||
base_path_template => emqx_authn_utils:parse_str(Path),
|
base_path_template => emqx_authn_utils:parse_str(Path),
|
||||||
base_query_template => emqx_authn_utils:parse_deep(
|
base_query_template => emqx_authn_utils:parse_deep(
|
||||||
cow_qs:parse_qs(to_bin(Query))
|
cow_qs:parse_qs(Query)
|
||||||
),
|
),
|
||||||
body_template => emqx_authn_utils:parse_deep(maps:get(body, Config, #{})),
|
body_template => emqx_authn_utils:parse_deep(maps:get(body, Config, #{})),
|
||||||
request_timeout => RequestTimeout,
|
request_timeout => RequestTimeout,
|
||||||
url => RawUrl
|
url => RawUrl
|
||||||
},
|
},
|
||||||
{ok, Config#{base_url => BaseUrl, pool_type => random}, State}.
|
{ok,
|
||||||
|
Config#{
|
||||||
|
request_base => RequestBase,
|
||||||
|
pool_type => random
|
||||||
|
},
|
||||||
|
State}.
|
||||||
|
|
||||||
generate_request(Credential, #{
|
generate_request(Credential, #{
|
||||||
method := Method,
|
method := Method,
|
||||||
|
@ -246,14 +250,14 @@ request_for_log(Credential, #{url := Url, method := Method} = State) ->
|
||||||
{PathQuery, Headers} ->
|
{PathQuery, Headers} ->
|
||||||
#{
|
#{
|
||||||
method => Method,
|
method => Method,
|
||||||
base_url => Url,
|
url => Url,
|
||||||
path_query => PathQuery,
|
path_query => PathQuery,
|
||||||
headers => Headers
|
headers => Headers
|
||||||
};
|
};
|
||||||
{PathQuery, Headers, Body} ->
|
{PathQuery, Headers, Body} ->
|
||||||
#{
|
#{
|
||||||
method => Method,
|
method => Method,
|
||||||
base_url => Url,
|
url => Url,
|
||||||
path_query => PathQuery,
|
path_query => PathQuery,
|
||||||
headers => Headers,
|
headers => Headers,
|
||||||
body => Body
|
body => Body
|
||||||
|
@ -274,11 +278,6 @@ to_list(B) when is_binary(B) ->
|
||||||
to_list(L) when is_list(L) ->
|
to_list(L) when is_list(L) ->
|
||||||
L.
|
L.
|
||||||
|
|
||||||
to_bin(B) when is_binary(B) ->
|
|
||||||
B;
|
|
||||||
to_bin(L) when is_list(L) ->
|
|
||||||
list_to_binary(L).
|
|
||||||
|
|
||||||
ensure_header_name_type(Headers) ->
|
ensure_header_name_type(Headers) ->
|
||||||
Fun = fun
|
Fun = fun
|
||||||
(Key, _Val, Acc) when is_binary(Key) ->
|
(Key, _Val, Acc) when is_binary(Key) ->
|
||||||
|
|
|
@ -29,8 +29,7 @@
|
||||||
update/1,
|
update/1,
|
||||||
destroy/1,
|
destroy/1,
|
||||||
authorize/4,
|
authorize/4,
|
||||||
merge_defaults/1,
|
merge_defaults/1
|
||||||
parse_url/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
@ -160,15 +159,14 @@ parse_config(
|
||||||
request_timeout := ReqTimeout
|
request_timeout := ReqTimeout
|
||||||
} = Conf
|
} = Conf
|
||||||
) ->
|
) ->
|
||||||
{BaseUrl0, Path, Query} = parse_url(RawUrl),
|
{RequestBase, Path, Query} = emqx_auth_utils:parse_url(RawUrl),
|
||||||
{ok, BaseUrl} = emqx_http_lib:uri_parse(BaseUrl0),
|
|
||||||
Conf#{
|
Conf#{
|
||||||
method => Method,
|
method => Method,
|
||||||
base_url => BaseUrl,
|
request_base => RequestBase,
|
||||||
headers => Headers,
|
headers => Headers,
|
||||||
base_path_template => emqx_authz_utils:parse_str(Path, allowed_vars()),
|
base_path_template => emqx_authz_utils:parse_str(Path, allowed_vars()),
|
||||||
base_query_template => emqx_authz_utils:parse_deep(
|
base_query_template => emqx_authz_utils:parse_deep(
|
||||||
cow_qs:parse_qs(to_bin(Query)),
|
cow_qs:parse_qs(Query),
|
||||||
allowed_vars()
|
allowed_vars()
|
||||||
),
|
),
|
||||||
body_template => emqx_authz_utils:parse_deep(
|
body_template => emqx_authz_utils:parse_deep(
|
||||||
|
@ -180,25 +178,6 @@ parse_config(
|
||||||
pool_type => random
|
pool_type => random
|
||||||
}.
|
}.
|
||||||
|
|
||||||
parse_url(Url) ->
|
|
||||||
case string:split(Url, "//", leading) of
|
|
||||||
[Scheme, UrlRem] ->
|
|
||||||
case string:split(UrlRem, "/", leading) of
|
|
||||||
[HostPort, Remaining] ->
|
|
||||||
BaseUrl = iolist_to_binary([Scheme, "//", HostPort]),
|
|
||||||
case string:split(Remaining, "?", leading) of
|
|
||||||
[Path, QueryString] ->
|
|
||||||
{BaseUrl, <<"/", Path/binary>>, QueryString};
|
|
||||||
[Path] ->
|
|
||||||
{BaseUrl, <<"/", Path/binary>>, <<>>}
|
|
||||||
end;
|
|
||||||
[HostPort] ->
|
|
||||||
{iolist_to_binary([Scheme, "//", HostPort]), <<>>, <<>>}
|
|
||||||
end;
|
|
||||||
[Url] ->
|
|
||||||
throw({invalid_url, Url})
|
|
||||||
end.
|
|
||||||
|
|
||||||
generate_request(
|
generate_request(
|
||||||
Action,
|
Action,
|
||||||
Topic,
|
Topic,
|
||||||
|
@ -272,10 +251,6 @@ to_list(B) when is_binary(B) ->
|
||||||
to_list(L) when is_list(L) ->
|
to_list(L) when is_list(L) ->
|
||||||
L.
|
L.
|
||||||
|
|
||||||
to_bin(B) when is_binary(B) -> B;
|
|
||||||
to_bin(L) when is_list(L) -> list_to_binary(L);
|
|
||||||
to_bin(X) -> X.
|
|
||||||
|
|
||||||
allowed_vars() ->
|
allowed_vars() ->
|
||||||
allowed_vars(emqx_authz:feature_available(rich_actions)).
|
allowed_vars(emqx_authz:feature_available(rich_actions)).
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,11 @@ t_create_invalid(_Config) ->
|
||||||
InvalidConfigs =
|
InvalidConfigs =
|
||||||
[
|
[
|
||||||
AuthConfig#{<<"headers">> => []},
|
AuthConfig#{<<"headers">> => []},
|
||||||
AuthConfig#{<<"method">> => <<"delete">>}
|
AuthConfig#{<<"method">> => <<"delete">>},
|
||||||
|
AuthConfig#{<<"url">> => <<"localhost">>},
|
||||||
|
AuthConfig#{<<"url">> => <<"http://foo.com/xxx#fragment">>},
|
||||||
|
AuthConfig#{<<"url">> => <<"http://${foo}.com/xxx">>},
|
||||||
|
AuthConfig#{<<"url">> => <<"//foo.com/xxx">>}
|
||||||
],
|
],
|
||||||
|
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
|
|
@ -639,6 +639,8 @@ t_create_replace(_Config) ->
|
||||||
listener => {tcp, default}
|
listener => {tcp, default}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
ValidConfig = raw_http_authz_config(),
|
||||||
|
|
||||||
%% Create with valid URL
|
%% Create with valid URL
|
||||||
ok = setup_handler_and_config(
|
ok = setup_handler_and_config(
|
||||||
fun(Req0, State) ->
|
fun(Req0, State) ->
|
||||||
|
@ -656,13 +658,10 @@ t_create_replace(_Config) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% Changing to valid config
|
%% Changing to valid config
|
||||||
OkConfig = maps:merge(
|
OkConfig = ValidConfig#{
|
||||||
raw_http_authz_config(),
|
|
||||||
#{
|
|
||||||
<<"url">> =>
|
<<"url">> =>
|
||||||
<<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>
|
<<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>
|
||||||
}
|
},
|
||||||
),
|
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, _},
|
{ok, _},
|
||||||
|
@ -672,6 +671,37 @@ t_create_replace(_Config) ->
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
allow,
|
allow,
|
||||||
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH, <<"t">>)
|
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH, <<"t">>)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{error, _},
|
||||||
|
emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{
|
||||||
|
<<"url">> => <<"localhost">>
|
||||||
|
})
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{error, _},
|
||||||
|
emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{
|
||||||
|
<<"url">> => <<"//foo.bar/x/y?q=z">>
|
||||||
|
})
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{error, _},
|
||||||
|
emqx_authz:update({?CMD_REPLACE, http}, ValidConfig#{
|
||||||
|
<<"url">> =>
|
||||||
|
<<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}#fragment">>
|
||||||
|
})
|
||||||
|
).
|
||||||
|
|
||||||
|
t_uri_normalization(_Config) ->
|
||||||
|
ok = emqx_authz_test_lib:setup_config(
|
||||||
|
raw_http_authz_config(),
|
||||||
|
#{
|
||||||
|
<<"url">> =>
|
||||||
|
<<"http://127.0.0.1:33333?topic=${topic}&action=${action}">>
|
||||||
|
}
|
||||||
).
|
).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -336,21 +336,13 @@ parse_confs(
|
||||||
} = Conf
|
} = Conf
|
||||||
) ->
|
) ->
|
||||||
Url1 = bin(Url),
|
Url1 = bin(Url),
|
||||||
{BaseUrl, Path} = parse_url(Url1),
|
{RequestBase, Path} = parse_url(Url1),
|
||||||
BaseUrl1 =
|
|
||||||
case emqx_http_lib:uri_parse(BaseUrl) of
|
|
||||||
{ok, BUrl} ->
|
|
||||||
BUrl;
|
|
||||||
{error, Reason} ->
|
|
||||||
Reason1 = emqx_utils:readable_error_msg(Reason),
|
|
||||||
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
|
|
||||||
end,
|
|
||||||
RequestTTL = emqx_utils_maps:deep_get(
|
RequestTTL = emqx_utils_maps:deep_get(
|
||||||
[resource_opts, request_ttl],
|
[resource_opts, request_ttl],
|
||||||
Conf
|
Conf
|
||||||
),
|
),
|
||||||
Conf#{
|
Conf#{
|
||||||
base_url => BaseUrl1,
|
request_base => RequestBase,
|
||||||
request =>
|
request =>
|
||||||
#{
|
#{
|
||||||
path => Path,
|
path => Path,
|
||||||
|
@ -422,16 +414,24 @@ connector_config(BridgeType, Config) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_url(Url) ->
|
parse_url(Url) ->
|
||||||
case string:split(Url, "//", leading) of
|
Parsed = emqx_utils_uri:parse(Url),
|
||||||
[Scheme, UrlRem] ->
|
case Parsed of
|
||||||
case string:split(UrlRem, "/", leading) of
|
#{scheme := undefined} ->
|
||||||
[HostPort, Path] ->
|
invalid_data(<<"Missing scheme in URL: ", Url/binary>>);
|
||||||
{iolist_to_binary([Scheme, "//", HostPort]), Path};
|
#{authority := undefined} ->
|
||||||
[HostPort] ->
|
invalid_data(<<"Missing host in URL: ", Url/binary>>);
|
||||||
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
|
#{authority := #{userinfo := Userinfo}} when Userinfo =/= undefined ->
|
||||||
end;
|
invalid_data(<<"Userinfo is not supported in URL: ", Url/binary>>);
|
||||||
[Url] ->
|
#{fragment := Fragment} when Fragment =/= undefined ->
|
||||||
invalid_data(<<"Missing scheme in URL: ", Url/binary>>)
|
invalid_data(<<"Fragments are not supported in URL: ", Url/binary>>);
|
||||||
|
_ ->
|
||||||
|
case emqx_utils_uri:request_base(Parsed) of
|
||||||
|
{ok, Base} ->
|
||||||
|
{Base, emqx_maybe:define(emqx_utils_uri:path(Parsed), <<>>)};
|
||||||
|
{error, Reason0} ->
|
||||||
|
Reason1 = emqx_utils:readable_error_msg(Reason0),
|
||||||
|
invalid_data(<<"Invalid URL: ", Url/binary, ", details: ", Reason1/binary>>)
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_dynamo, [
|
{application, emqx_bridge_dynamo, [
|
||||||
{description, "EMQX Enterprise Dynamo Bridge"},
|
{description, "EMQX Enterprise Dynamo Bridge"},
|
||||||
{vsn, "0.2.0"},
|
{vsn, "0.2.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -86,7 +86,7 @@ on_start(
|
||||||
config => redact(Config)
|
config => redact(Config)
|
||||||
}),
|
}),
|
||||||
|
|
||||||
{Schema, Server, DefaultPort} = get_host_info(to_str(Url)),
|
{Scheme, Server, DefaultPort} = get_host_info(to_str(Url)),
|
||||||
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{
|
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{
|
||||||
default_port => DefaultPort
|
default_port => DefaultPort
|
||||||
}),
|
}),
|
||||||
|
@ -97,7 +97,7 @@ on_start(
|
||||||
port => Port,
|
port => Port,
|
||||||
aws_access_key_id => to_str(AccessKeyID),
|
aws_access_key_id => to_str(AccessKeyID),
|
||||||
aws_secret_access_key => SecretAccessKey,
|
aws_secret_access_key => SecretAccessKey,
|
||||||
schema => Schema
|
scheme => Scheme
|
||||||
}},
|
}},
|
||||||
{pool_size, PoolSize}
|
{pool_size, PoolSize}
|
||||||
],
|
],
|
||||||
|
|
|
@ -63,11 +63,11 @@ init(#{
|
||||||
aws_secret_access_key := Secret,
|
aws_secret_access_key := Secret,
|
||||||
host := Host,
|
host := Host,
|
||||||
port := Port,
|
port := Port,
|
||||||
schema := Schema
|
scheme := Scheme
|
||||||
}) ->
|
}) ->
|
||||||
%% TODO: teach `erlcloud` to to accept 0-arity closures as passwords.
|
%% TODO: teach `erlcloud` to to accept 0-arity closures as passwords.
|
||||||
SecretAccessKey = to_str(emqx_secret:unwrap(Secret)),
|
SecretAccessKey = to_str(emqx_secret:unwrap(Secret)),
|
||||||
erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema),
|
erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Scheme),
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
|
||||||
handle_call(is_connected, _From, State) ->
|
handle_call(is_connected, _From, State) ->
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge_es, [
|
{application, emqx_bridge_es, [
|
||||||
{description, "EMQX Enterprise Elastic Search Bridge"},
|
{description, "EMQX Enterprise Elastic Search Bridge"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{modules, [
|
{modules, [
|
||||||
emqx_bridge_es,
|
emqx_bridge_es,
|
||||||
emqx_bridge_es_connector
|
emqx_bridge_es_connector
|
||||||
|
|
|
@ -44,11 +44,10 @@
|
||||||
|
|
||||||
-type config() ::
|
-type config() ::
|
||||||
#{
|
#{
|
||||||
base_url := #{
|
request_base := #{
|
||||||
scheme := http | https,
|
scheme := http | https,
|
||||||
host := iolist(),
|
host := iolist(),
|
||||||
port := inet:port_number(),
|
port := inet:port_number()
|
||||||
path := _
|
|
||||||
},
|
},
|
||||||
connect_timeout := pos_integer(),
|
connect_timeout := pos_integer(),
|
||||||
pool_type := random | hash,
|
pool_type := random | hash,
|
||||||
|
@ -59,7 +58,6 @@
|
||||||
|
|
||||||
-type state() ::
|
-type state() ::
|
||||||
#{
|
#{
|
||||||
base_path := _,
|
|
||||||
connect_timeout := pos_integer(),
|
connect_timeout := pos_integer(),
|
||||||
pool_type := random | hash,
|
pool_type := random | hash,
|
||||||
channels := map(),
|
channels := map(),
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_http, [
|
{application, emqx_bridge_http, [
|
||||||
{description, "EMQX HTTP Bridge and Connector Application"},
|
{description, "EMQX HTTP Bridge and Connector Application"},
|
||||||
{vsn, "0.3.0"},
|
{vsn, "0.3.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib, emqx_resource, ehttpc]},
|
{applications, [kernel, stdlib, emqx_resource, ehttpc]},
|
||||||
{env, [
|
{env, [
|
||||||
|
|
|
@ -98,15 +98,6 @@ validate_webhook_url(undefined) ->
|
||||||
required_field => <<"url">>
|
required_field => <<"url">>
|
||||||
});
|
});
|
||||||
validate_webhook_url(Url) ->
|
validate_webhook_url(Url) ->
|
||||||
{BaseUrl, _Path} = emqx_connector_resource:parse_url(Url),
|
%% parse_url throws if the URL is invalid
|
||||||
case emqx_http_lib:uri_parse(BaseUrl) of
|
{_RequestBase, _Path} = emqx_connector_resource:parse_url(Url),
|
||||||
{ok, _} ->
|
ok.
|
||||||
ok;
|
|
||||||
{error, Reason} ->
|
|
||||||
throw(#{
|
|
||||||
kind => validation_error,
|
|
||||||
reason => invalid_url,
|
|
||||||
url => Url,
|
|
||||||
error => emqx_utils:readable_error_msg(Reason)
|
|
||||||
})
|
|
||||||
end.
|
|
||||||
|
|
|
@ -53,7 +53,7 @@
|
||||||
%% for other http-like connectors.
|
%% for other http-like connectors.
|
||||||
-export([redact_request/1]).
|
-export([redact_request/1]).
|
||||||
|
|
||||||
-export([validate_method/1, join_paths/2, formalize_request/3, transform_result/1]).
|
-export([validate_method/1, join_paths/2, formalize_request/2, transform_result/1]).
|
||||||
|
|
||||||
-define(DEFAULT_PIPELINE_SIZE, 100).
|
-define(DEFAULT_PIPELINE_SIZE, 100).
|
||||||
-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
|
-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
|
||||||
|
@ -189,11 +189,10 @@ callback_mode() -> async_if_possible.
|
||||||
on_start(
|
on_start(
|
||||||
InstId,
|
InstId,
|
||||||
#{
|
#{
|
||||||
base_url := #{
|
request_base := #{
|
||||||
scheme := Scheme,
|
scheme := Scheme,
|
||||||
host := Host,
|
host := Host,
|
||||||
port := Port,
|
port := Port
|
||||||
path := BasePath
|
|
||||||
},
|
},
|
||||||
connect_timeout := ConnectTimeout,
|
connect_timeout := ConnectTimeout,
|
||||||
pool_type := PoolType,
|
pool_type := PoolType,
|
||||||
|
@ -227,13 +226,13 @@ on_start(
|
||||||
{transport_opts, NTransportOpts},
|
{transport_opts, NTransportOpts},
|
||||||
{enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
|
{enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
|
||||||
],
|
],
|
||||||
|
|
||||||
State = #{
|
State = #{
|
||||||
pool_name => InstId,
|
pool_name => InstId,
|
||||||
pool_type => PoolType,
|
pool_type => PoolType,
|
||||||
host => Host,
|
host => Host,
|
||||||
port => Port,
|
port => Port,
|
||||||
connect_timeout => ConnectTimeout,
|
connect_timeout => ConnectTimeout,
|
||||||
base_path => BasePath,
|
|
||||||
scheme => Scheme,
|
scheme => Scheme,
|
||||||
request => preprocess_request(maps:get(request, Config, undefined))
|
request => preprocess_request(maps:get(request, Config, undefined))
|
||||||
},
|
},
|
||||||
|
@ -362,7 +361,7 @@ on_query(InstId, {Method, Request, Timeout}, State) ->
|
||||||
on_query(
|
on_query(
|
||||||
InstId,
|
InstId,
|
||||||
{ActionId, KeyOrNum, Method, Request, Timeout, Retry},
|
{ActionId, KeyOrNum, Method, Request, Timeout, Retry},
|
||||||
#{base_path := BasePath, host := Host, scheme := Scheme, port := Port} = State
|
#{host := Host, scheme := Scheme, port := Port} = State
|
||||||
) ->
|
) ->
|
||||||
?TRACE(
|
?TRACE(
|
||||||
"QUERY",
|
"QUERY",
|
||||||
|
@ -375,7 +374,7 @@ on_query(
|
||||||
state => redact(State)
|
state => redact(State)
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
NRequest = formalize_request(Method, BasePath, Request),
|
NRequest = formalize_request(Method, Request),
|
||||||
trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout),
|
trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout),
|
||||||
Worker = resolve_pool_worker(State, KeyOrNum),
|
Worker = resolve_pool_worker(State, KeyOrNum),
|
||||||
Result0 = ehttpc:request(
|
Result0 = ehttpc:request(
|
||||||
|
@ -472,7 +471,7 @@ on_query_async(
|
||||||
InstId,
|
InstId,
|
||||||
{ActionId, KeyOrNum, Method, Request, Timeout},
|
{ActionId, KeyOrNum, Method, Request, Timeout},
|
||||||
ReplyFunAndArgs,
|
ReplyFunAndArgs,
|
||||||
#{base_path := BasePath, host := Host, port := Port, scheme := Scheme} = State
|
#{host := Host, port := Port, scheme := Scheme} = State
|
||||||
) ->
|
) ->
|
||||||
Worker = resolve_pool_worker(State, KeyOrNum),
|
Worker = resolve_pool_worker(State, KeyOrNum),
|
||||||
?TRACE(
|
?TRACE(
|
||||||
|
@ -485,7 +484,7 @@ on_query_async(
|
||||||
state => redact(State)
|
state => redact(State)
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
NRequest = formalize_request(Method, BasePath, Request),
|
NRequest = formalize_request(Method, Request),
|
||||||
trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout),
|
trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout),
|
||||||
MaxAttempts = maps:get(max_attempts, State, 3),
|
MaxAttempts = maps:get(max_attempts, State, 3),
|
||||||
Context = #{
|
Context = #{
|
||||||
|
@ -824,6 +823,9 @@ make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;
|
||||||
make_method(M) when M == <<"GET">>; M == <<"get">> -> get;
|
make_method(M) when M == <<"GET">>; M == <<"get">> -> get;
|
||||||
make_method(M) when M == <<"DELETE">>; M == <<"delete">> -> delete.
|
make_method(M) when M == <<"DELETE">>; M == <<"delete">> -> delete.
|
||||||
|
|
||||||
|
formalize_request(Method, Request) ->
|
||||||
|
formalize_request(Method, "/", Request).
|
||||||
|
|
||||||
formalize_request(Method, BasePath, {Path, Headers, _Body}) when
|
formalize_request(Method, BasePath, {Path, Headers, _Body}) when
|
||||||
Method =:= get; Method =:= delete
|
Method =:= get; Method =:= delete
|
||||||
->
|
->
|
||||||
|
|
|
@ -85,6 +85,16 @@ init_per_testcase(t_path_not_found, Config) ->
|
||||||
),
|
),
|
||||||
ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()),
|
ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()),
|
||||||
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
|
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
|
||||||
|
init_per_testcase(t_empty_path, Config) ->
|
||||||
|
HTTPPath = <<"/">>,
|
||||||
|
ServerSSLOpts = false,
|
||||||
|
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
|
||||||
|
_Port = random, HTTPPath, ServerSSLOpts
|
||||||
|
),
|
||||||
|
ok = emqx_bridge_http_connector_test_server:set_handler(
|
||||||
|
emqx_bridge_http_test_lib:success_http_handler()
|
||||||
|
),
|
||||||
|
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
|
||||||
init_per_testcase(t_too_many_requests, Config) ->
|
init_per_testcase(t_too_many_requests, Config) ->
|
||||||
HTTPPath = <<"/path">>,
|
HTTPPath = <<"/path">>,
|
||||||
ServerSSLOpts = false,
|
ServerSSLOpts = false,
|
||||||
|
@ -122,6 +132,7 @@ init_per_testcase(_TestCase, Config) ->
|
||||||
|
|
||||||
end_per_testcase(TestCase, _Config) when
|
end_per_testcase(TestCase, _Config) when
|
||||||
TestCase =:= t_path_not_found;
|
TestCase =:= t_path_not_found;
|
||||||
|
TestCase =:= t_empty_path;
|
||||||
TestCase =:= t_too_many_requests;
|
TestCase =:= t_too_many_requests;
|
||||||
TestCase =:= t_service_unavailable;
|
TestCase =:= t_service_unavailable;
|
||||||
TestCase =:= t_rule_action_expired;
|
TestCase =:= t_rule_action_expired;
|
||||||
|
@ -588,6 +599,45 @@ t_path_not_found(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_empty_path(Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
#{port := Port, path := _Path} = ?config(http_server, Config),
|
||||||
|
MQTTTopic = <<"t/webhook">>,
|
||||||
|
BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
|
||||||
|
type => ?BRIDGE_TYPE,
|
||||||
|
name => ?BRIDGE_NAME,
|
||||||
|
local_topic => MQTTTopic,
|
||||||
|
port => Port,
|
||||||
|
path => <<"">>
|
||||||
|
}),
|
||||||
|
{ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
|
||||||
|
Msg = emqx_message:make(MQTTTopic, <<"{}">>),
|
||||||
|
emqx:publish(Msg),
|
||||||
|
wait_http_request(),
|
||||||
|
?retry(
|
||||||
|
_Interval = 500,
|
||||||
|
_NAttempts = 20,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
counters := #{
|
||||||
|
matched := 1,
|
||||||
|
failed := 0,
|
||||||
|
success := 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
get_metrics(?BRIDGE_NAME)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertEqual([], ?of_kind(http_will_retry_async, Trace)),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_too_many_requests(Config) ->
|
t_too_many_requests(Config) ->
|
||||||
check_send_is_retried(Config).
|
check_send_is_retried(Config).
|
||||||
|
|
||||||
|
|
|
@ -32,11 +32,10 @@ wrap_auth_headers_test_() ->
|
||||||
end,
|
end,
|
||||||
fun meck:unload/1, fun(_) ->
|
fun meck:unload/1, fun(_) ->
|
||||||
Config = #{
|
Config = #{
|
||||||
base_url => #{
|
request_base => #{
|
||||||
scheme => http,
|
scheme => http,
|
||||||
host => "localhost",
|
host => "localhost",
|
||||||
port => 18083,
|
port => 18083
|
||||||
path => "/status"
|
|
||||||
},
|
},
|
||||||
connect_timeout => 1000,
|
connect_timeout => 1000,
|
||||||
pool_type => random,
|
pool_type => random,
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge_iotdb, [
|
{application, emqx_bridge_iotdb, [
|
||||||
{description, "EMQX Enterprise Apache IoTDB Bridge"},
|
{description, "EMQX Enterprise Apache IoTDB Bridge"},
|
||||||
{vsn, "0.2.0"},
|
{vsn, "0.2.1"},
|
||||||
{modules, [
|
{modules, [
|
||||||
emqx_bridge_iotdb,
|
emqx_bridge_iotdb,
|
||||||
emqx_bridge_iotdb_connector
|
emqx_bridge_iotdb_connector
|
||||||
|
|
|
@ -44,11 +44,10 @@
|
||||||
|
|
||||||
-type config() ::
|
-type config() ::
|
||||||
#{
|
#{
|
||||||
base_url := #{
|
request_base := #{
|
||||||
scheme := http | https,
|
scheme := http | https,
|
||||||
host := iolist(),
|
host := iolist(),
|
||||||
port := inet:port_number(),
|
port := inet:port_number()
|
||||||
path := _
|
|
||||||
},
|
},
|
||||||
connect_timeout := pos_integer(),
|
connect_timeout := pos_integer(),
|
||||||
pool_type := random | hash,
|
pool_type := random | hash,
|
||||||
|
@ -60,7 +59,6 @@
|
||||||
|
|
||||||
-type state() ::
|
-type state() ::
|
||||||
#{
|
#{
|
||||||
base_path := _,
|
|
||||||
connect_timeout := pos_integer(),
|
connect_timeout := pos_integer(),
|
||||||
pool_type := random | hash,
|
pool_type := random | hash,
|
||||||
channels := map(),
|
channels := map(),
|
||||||
|
@ -245,10 +243,10 @@ on_stop(InstanceId, State) ->
|
||||||
|
|
||||||
-spec on_get_status(manager_id(), state()) ->
|
-spec on_get_status(manager_id(), state()) ->
|
||||||
connected | connecting | {disconnected, state(), term()}.
|
connected | connecting | {disconnected, state(), term()}.
|
||||||
on_get_status(InstanceId, #{base_path := BasePath} = State) ->
|
on_get_status(InstanceId, State) ->
|
||||||
Func = fun(Worker, Timeout) ->
|
Func = fun(Worker, Timeout) ->
|
||||||
Request = {?IOTDB_PING_PATH, [], undefined},
|
Request = {?IOTDB_PING_PATH, [], undefined},
|
||||||
NRequest = emqx_bridge_http_connector:formalize_request(get, BasePath, Request),
|
NRequest = emqx_bridge_http_connector:formalize_request(get, Request),
|
||||||
Result0 = ehttpc:request(Worker, get, NRequest, Timeout),
|
Result0 = ehttpc:request(Worker, get, NRequest, Timeout),
|
||||||
case emqx_bridge_http_connector:transform_result(Result0) of
|
case emqx_bridge_http_connector:transform_result(Result0) of
|
||||||
{ok, 200, _, Body} ->
|
{ok, 200, _, Body} ->
|
||||||
|
|
|
@ -281,17 +281,9 @@ parse_confs(
|
||||||
} = Conf
|
} = Conf
|
||||||
) ->
|
) ->
|
||||||
Url1 = bin(Url),
|
Url1 = bin(Url),
|
||||||
{BaseUrl, Path} = parse_url(Url1),
|
{RequestBase, Path} = parse_url(Url1),
|
||||||
BaseUrl1 =
|
|
||||||
case emqx_http_lib:uri_parse(BaseUrl) of
|
|
||||||
{ok, BUrl} ->
|
|
||||||
BUrl;
|
|
||||||
{error, Reason} ->
|
|
||||||
Reason1 = emqx_utils:readable_error_msg(Reason),
|
|
||||||
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
|
|
||||||
end,
|
|
||||||
Conf#{
|
Conf#{
|
||||||
base_url => BaseUrl1,
|
request_base => RequestBase,
|
||||||
request =>
|
request =>
|
||||||
#{
|
#{
|
||||||
path => Path,
|
path => Path,
|
||||||
|
@ -324,16 +316,24 @@ connector_config(ConnectorType, Name, Config) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_url(Url) ->
|
parse_url(Url) ->
|
||||||
case string:split(Url, "//", leading) of
|
Parsed = emqx_utils_uri:parse(Url),
|
||||||
[Scheme, UrlRem] ->
|
case Parsed of
|
||||||
case string:split(UrlRem, "/", leading) of
|
#{scheme := undefined} ->
|
||||||
[HostPort, Path] ->
|
invalid_data(<<"Missing scheme in URL: ", Url/binary>>);
|
||||||
{iolist_to_binary([Scheme, "//", HostPort]), Path};
|
#{authority := undefined} ->
|
||||||
[HostPort] ->
|
invalid_data(<<"Missing host in URL: ", Url/binary>>);
|
||||||
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
|
#{authority := #{userinfo := Userinfo}} when Userinfo =/= undefined ->
|
||||||
end;
|
invalid_data(<<"Userinfo is not supported in URL: ", Url/binary>>);
|
||||||
[Url] ->
|
#{fragment := Fragment} when Fragment =/= undefined ->
|
||||||
invalid_data(<<"Missing scheme in URL: ", Url/binary>>)
|
invalid_data(<<"Fragments are not supported in URL: ", Url/binary>>);
|
||||||
|
_ ->
|
||||||
|
case emqx_utils_uri:request_base(Parsed) of
|
||||||
|
{ok, Base} ->
|
||||||
|
{Base, emqx_maybe:define(emqx_utils_uri:path(Parsed), <<>>)};
|
||||||
|
{error, Reason0} ->
|
||||||
|
Reason1 = emqx_utils:readable_error_msg(Reason0),
|
||||||
|
invalid_data(<<"Invalid URL: ", Url/binary, ", details: ", Reason1/binary>>)
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec invalid_data(binary()) -> no_return().
|
-spec invalid_data(binary()) -> no_return().
|
||||||
|
|
|
@ -5,7 +5,8 @@
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{deps, [
|
{deps, [
|
||||||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}
|
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
|
||||||
|
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{project_plugins, [erlfmt]}.
|
{project_plugins, [erlfmt]}.
|
||||||
|
|
|
@ -15,7 +15,8 @@
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
stdlib,
|
stdlib,
|
||||||
jiffy
|
jiffy,
|
||||||
|
emqx_http_lib
|
||||||
]},
|
]},
|
||||||
{env, []},
|
{env, []},
|
||||||
{licenses, ["Apache-2.0"]},
|
{licenses, ["Apache-2.0"]},
|
||||||
|
|
|
@ -0,0 +1,407 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% This module provides a loose parser for URIs.
|
||||||
|
%% The standard library's `uri_string' module is strict and does not allow
|
||||||
|
%% to parse invalid URIs, like templates: `http://example.com/${username}'.
|
||||||
|
|
||||||
|
-module(emqx_utils_uri).
|
||||||
|
|
||||||
|
-export([parse/1, format/1]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
scheme/1,
|
||||||
|
userinfo/1,
|
||||||
|
host/1,
|
||||||
|
port/1,
|
||||||
|
path/1,
|
||||||
|
query/1,
|
||||||
|
fragment/1,
|
||||||
|
base_url/1,
|
||||||
|
request_base/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-type scheme() :: binary().
|
||||||
|
-type userinfo() :: binary().
|
||||||
|
-type host() :: binary().
|
||||||
|
-type port_number() :: inet:port_number().
|
||||||
|
-type path() :: binary().
|
||||||
|
-type query() :: binary().
|
||||||
|
-type fragment() :: binary().
|
||||||
|
-type request_base() :: #{
|
||||||
|
scheme := http | https,
|
||||||
|
host := iolist(),
|
||||||
|
port := inet:port_number()
|
||||||
|
}.
|
||||||
|
|
||||||
|
-type authority() :: #{
|
||||||
|
userinfo := emqx_maybe:t(userinfo()),
|
||||||
|
host := host(),
|
||||||
|
%% Types:
|
||||||
|
%% ipv6: `\[[a-z\d:\.]*\]` — bracketed "ivp6-like" address
|
||||||
|
%% regular: `example.com` — arbitrary host not containg `:` which is forbidden in hostnames other than ipv6
|
||||||
|
%% loose: non ipv6-like host containing `:`, probably invalid for a strictly valid URI
|
||||||
|
host_type := ipv6 | regular | loose,
|
||||||
|
port := emqx_maybe:t(port_number())
|
||||||
|
}.
|
||||||
|
|
||||||
|
-type uri() :: #{
|
||||||
|
scheme := emqx_maybe:t(scheme()),
|
||||||
|
authority := emqx_maybe:t(authority()),
|
||||||
|
path := path(),
|
||||||
|
query := emqx_maybe:t(query()),
|
||||||
|
fragment := emqx_maybe:t(fragment())
|
||||||
|
}.
|
||||||
|
|
||||||
|
-export_type([
|
||||||
|
scheme/0,
|
||||||
|
userinfo/0,
|
||||||
|
host/0,
|
||||||
|
port_number/0,
|
||||||
|
path/0,
|
||||||
|
query/0,
|
||||||
|
fragment/0,
|
||||||
|
authority/0,
|
||||||
|
uri/0,
|
||||||
|
request_base/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-on_load(init/0).
|
||||||
|
|
||||||
|
%% https://datatracker.ietf.org/doc/html/rfc3986#appendix-B
|
||||||
|
%%
|
||||||
|
%% > The following line is the regular expression for breaking-down a
|
||||||
|
%% > well-formed URI reference into its components.
|
||||||
|
%%
|
||||||
|
%% > ^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?
|
||||||
|
%%
|
||||||
|
%% We skip capturing some unused parts of the regex.
|
||||||
|
|
||||||
|
-define(URI_REGEX,
|
||||||
|
("^(?:(?<scheme>[^:/?#]+):)?(?<authority>//[^/?#]*)?"
|
||||||
|
"(?<path>[^?#]*)(?<query>\\?[^#]*)?(?<fragment>#.*)?")
|
||||||
|
).
|
||||||
|
|
||||||
|
-define(URI_REGEX_PT_KEY, {?MODULE, uri_re}).
|
||||||
|
|
||||||
|
-define(AUTHORITY_REGEX,
|
||||||
|
("^(?<userinfo>.*@)?"
|
||||||
|
"(?:(?:\\[(?<host_ipv6>[a-z\\d\\.:]*)\\])|(?<host_regular>[^:]*?)|(?<host_loose>.*?))"
|
||||||
|
"(?<port>:\\d+)?$")
|
||||||
|
).
|
||||||
|
|
||||||
|
-define(AUTHORITY_REGEX_PT_KEY, {?MODULE, authority_re}).
|
||||||
|
|
||||||
|
%%-------------------------------------------------------------------
|
||||||
|
%% Internal API
|
||||||
|
%%-------------------------------------------------------------------
|
||||||
|
|
||||||
|
init() ->
|
||||||
|
{ok, UriRE} = re:compile(?URI_REGEX),
|
||||||
|
persistent_term:put(?URI_REGEX_PT_KEY, UriRE),
|
||||||
|
|
||||||
|
{ok, AuthorityRE} = re:compile(?AUTHORITY_REGEX, [caseless]),
|
||||||
|
persistent_term:put(?AUTHORITY_REGEX_PT_KEY, AuthorityRE).
|
||||||
|
|
||||||
|
%%-------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%-------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec parse(binary()) -> uri().
|
||||||
|
parse(URIString) ->
|
||||||
|
{match, [SchemeMatch, AuthorityMatch, PathMatch, QueryMatch, FragmentMatch]} = re:run(
|
||||||
|
URIString, uri_regexp(), [{capture, [scheme, authority, path, query, fragment], binary}]
|
||||||
|
),
|
||||||
|
Scheme = parse_scheme(SchemeMatch),
|
||||||
|
Authority = parse_authority(AuthorityMatch),
|
||||||
|
Path = PathMatch,
|
||||||
|
Query = parse_query(QueryMatch),
|
||||||
|
Fragment = parse_fragment(FragmentMatch),
|
||||||
|
|
||||||
|
#{
|
||||||
|
scheme => Scheme,
|
||||||
|
authority => Authority,
|
||||||
|
path => Path,
|
||||||
|
query => Query,
|
||||||
|
fragment => Fragment
|
||||||
|
}.
|
||||||
|
|
||||||
|
-spec base_url(uri()) -> iodata().
|
||||||
|
base_url(#{scheme := Scheme, authority := Authority}) ->
|
||||||
|
[format_scheme(Scheme), format_authority(Authority)].
|
||||||
|
|
||||||
|
-spec format(uri()) -> iodata().
|
||||||
|
format(#{path := Path, query := Query, fragment := Fragment} = URI) ->
|
||||||
|
[
|
||||||
|
base_url(URI),
|
||||||
|
Path,
|
||||||
|
format_query(Query),
|
||||||
|
format_fragment(Fragment)
|
||||||
|
].
|
||||||
|
|
||||||
|
-spec scheme(uri()) -> emqx_maybe:t(scheme()).
|
||||||
|
scheme(#{scheme := Scheme}) -> Scheme.
|
||||||
|
|
||||||
|
-spec userinfo(uri()) -> emqx_maybe:t(userinfo()).
|
||||||
|
userinfo(#{authority := undefined}) -> undefined;
|
||||||
|
userinfo(#{authority := #{userinfo := UserInfo}}) -> UserInfo.
|
||||||
|
|
||||||
|
-spec host(uri()) -> emqx_maybe:t(host()).
|
||||||
|
host(#{authority := undefined}) -> undefined;
|
||||||
|
host(#{authority := #{host := Host}}) -> Host.
|
||||||
|
|
||||||
|
-spec port(uri()) -> emqx_maybe:t(port_number()).
|
||||||
|
port(#{authority := undefined}) -> undefined;
|
||||||
|
port(#{authority := #{port := Port}}) -> Port.
|
||||||
|
|
||||||
|
-spec path(uri()) -> path().
|
||||||
|
path(#{path := Path}) -> Path.
|
||||||
|
|
||||||
|
-spec query(uri()) -> emqx_maybe:t(query()).
|
||||||
|
query(#{query := Query}) -> Query.
|
||||||
|
|
||||||
|
-spec fragment(uri()) -> emqx_maybe:t(fragment()).
|
||||||
|
fragment(#{fragment := Fragment}) -> Fragment.
|
||||||
|
|
||||||
|
-spec request_base(uri()) -> {ok, request_base()} | {error, term()}.
|
||||||
|
request_base(URI) when is_map(URI) ->
|
||||||
|
case emqx_http_lib:uri_parse(iolist_to_binary(base_url(URI))) of
|
||||||
|
{error, Reason} -> {error, Reason};
|
||||||
|
{ok, URIMap} -> {ok, maps:with([scheme, host, port], URIMap)}
|
||||||
|
end;
|
||||||
|
request_base(URIString) when is_list(URIString) orelse is_binary(URIString) ->
|
||||||
|
request_base(parse(URIString)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Helper functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
parse_scheme(<<>>) -> undefined;
|
||||||
|
parse_scheme(Scheme) -> Scheme.
|
||||||
|
|
||||||
|
parse_query(<<>>) -> undefined;
|
||||||
|
parse_query(<<$?, Query/binary>>) -> Query.
|
||||||
|
|
||||||
|
parse_fragment(<<>>) -> undefined;
|
||||||
|
parse_fragment(<<$#, Fragment/binary>>) -> Fragment.
|
||||||
|
|
||||||
|
authority_regexp() ->
|
||||||
|
persistent_term:get(?AUTHORITY_REGEX_PT_KEY).
|
||||||
|
|
||||||
|
parse_authority(<<>>) ->
|
||||||
|
undefined;
|
||||||
|
parse_authority(<<$/, $/, Authority/binary>>) ->
|
||||||
|
%% Authority regexp always matches
|
||||||
|
{match, [UserInfoMatch, HostIPv6, HostRegular, HostLoose, PortMatch]} = re:run(
|
||||||
|
Authority, authority_regexp(), [
|
||||||
|
{capture, [userinfo, host_ipv6, host_regular, host_loose, port], binary}
|
||||||
|
]
|
||||||
|
),
|
||||||
|
UserInfo = parse_userinfo(UserInfoMatch),
|
||||||
|
{HostType, Host} = parse_host(HostIPv6, HostRegular, HostLoose),
|
||||||
|
Port = parse_port(PortMatch),
|
||||||
|
#{
|
||||||
|
userinfo => UserInfo,
|
||||||
|
host => Host,
|
||||||
|
host_type => HostType,
|
||||||
|
port => Port
|
||||||
|
}.
|
||||||
|
|
||||||
|
parse_userinfo(<<>>) -> undefined;
|
||||||
|
parse_userinfo(UserInfoMatch) -> binary:part(UserInfoMatch, 0, byte_size(UserInfoMatch) - 1).
|
||||||
|
|
||||||
|
parse_host(<<>>, <<>>, Host) -> {loose, Host};
|
||||||
|
parse_host(<<>>, Host, <<>>) -> {regular, Host};
|
||||||
|
parse_host(Host, <<>>, <<>>) -> {ipv6, Host}.
|
||||||
|
|
||||||
|
parse_port(<<>>) -> undefined;
|
||||||
|
parse_port(<<$:, Port/binary>>) -> binary_to_integer(Port).
|
||||||
|
|
||||||
|
uri_regexp() ->
|
||||||
|
persistent_term:get(?URI_REGEX_PT_KEY).
|
||||||
|
|
||||||
|
format_scheme(undefined) -> <<>>;
|
||||||
|
format_scheme(Scheme) -> [Scheme, $:].
|
||||||
|
|
||||||
|
format_authority(undefined) ->
|
||||||
|
<<>>;
|
||||||
|
format_authority(#{userinfo := UserInfo, host := Host, host_type := HostType, port := Port}) ->
|
||||||
|
[$/, $/, format_userinfo(UserInfo), format_host(HostType, Host), format_port(Port)].
|
||||||
|
|
||||||
|
format_userinfo(undefined) -> <<>>;
|
||||||
|
format_userinfo(UserInfo) -> [UserInfo, $@].
|
||||||
|
|
||||||
|
format_host(ipv6, Host) -> [$[, Host, $]];
|
||||||
|
format_host(_, Host) -> Host.
|
||||||
|
|
||||||
|
format_port(undefined) -> <<>>;
|
||||||
|
format_port(Port) -> [$:, integer_to_binary(Port)].
|
||||||
|
|
||||||
|
format_query(undefined) -> <<>>;
|
||||||
|
format_query(Query) -> [$?, Query].
|
||||||
|
|
||||||
|
format_fragment(undefined) -> <<>>;
|
||||||
|
format_fragment(Fragment) -> [$#, Fragment].
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-define(URLS, [
|
||||||
|
"https://www.example.com/page",
|
||||||
|
"http://subdomain.example.com/path/to/page",
|
||||||
|
"https://www.example.com:8080/path/to/page",
|
||||||
|
"https://user:password@example.com/path/to/page",
|
||||||
|
"https://www.example.com/path%20with%20${spaces}",
|
||||||
|
"http://192.0.2.1/path/to/page",
|
||||||
|
"http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]/${path}/to/page",
|
||||||
|
"http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]/to/page",
|
||||||
|
"http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:4444/to/page",
|
||||||
|
"ftp://ftp.example.com/${path}/to/file",
|
||||||
|
"ftps://ftp.example.com/path/to/file",
|
||||||
|
"mailto:user@example.com",
|
||||||
|
"tel:+1234567890",
|
||||||
|
"sms:+1234567890?body=Hello%20World",
|
||||||
|
"git://github.com/user/repo.git",
|
||||||
|
"a:b:c",
|
||||||
|
"svn://svn.example.com/project/trunk",
|
||||||
|
"https://www.${example}.com/path/to/page?query_param=value",
|
||||||
|
"https://www.example.com/path/to/page?query_param1=value1&query_param2=value2",
|
||||||
|
"https://www.example.com?query_param1=value1&query_param2=value2",
|
||||||
|
"https://www.example.com/path/to/page#section1",
|
||||||
|
"https://www.example.com/path/to/page?query_param=value#section1",
|
||||||
|
"https://www.example.com/path/to/page?query_param1=value1&query_param2=${value2}#section1",
|
||||||
|
"https://www.example.com?query_param1=value1&query_param2=value2#section1",
|
||||||
|
"file:///path/to/file.txt",
|
||||||
|
"localhost",
|
||||||
|
"localhost:8080",
|
||||||
|
"localhost:8080/path/to/page",
|
||||||
|
"localhost:8080/path/to/page?query_param=value",
|
||||||
|
"localhost:8080/path/to/page?query_param1=value1&query_param2=value2",
|
||||||
|
"/abc/${def}",
|
||||||
|
"/abc/def?query_param=value",
|
||||||
|
"?query_param=value",
|
||||||
|
"#section1"
|
||||||
|
]).
|
||||||
|
|
||||||
|
parse_format_test_() ->
|
||||||
|
[
|
||||||
|
{URI, ?_assertEqual(list_to_binary(URI), iolist_to_binary(format(parse(URI))))}
|
||||||
|
|| URI <- ?URLS
|
||||||
|
].
|
||||||
|
|
||||||
|
base_url_test_() ->
|
||||||
|
[
|
||||||
|
{URI, ?_assert(is_prefix(iolist_to_binary(base_url(parse(URI))), list_to_binary(URI)))}
|
||||||
|
|| URI <- ?URLS
|
||||||
|
].
|
||||||
|
|
||||||
|
scheme_test_() ->
|
||||||
|
[
|
||||||
|
if_parseable_by_uri_string(URI, fun(Expected, Parsed) ->
|
||||||
|
?assertEqual(maybe_get_bin(scheme, Expected), scheme(Parsed))
|
||||||
|
end)
|
||||||
|
|| URI <- ?URLS
|
||||||
|
].
|
||||||
|
|
||||||
|
host_test_() ->
|
||||||
|
[
|
||||||
|
if_parseable_by_uri_string(URI, fun(Expected, Parsed) ->
|
||||||
|
?assertEqual(maybe_get_bin(host, Expected), host(Parsed))
|
||||||
|
end)
|
||||||
|
|| URI <- ?URLS
|
||||||
|
].
|
||||||
|
|
||||||
|
path_test_() ->
|
||||||
|
[
|
||||||
|
if_parseable_by_uri_string(URI, fun(Expected, Parsed) ->
|
||||||
|
?assertEqual(maybe_get_bin(path, Expected), path(Parsed))
|
||||||
|
end)
|
||||||
|
|| URI <- ?URLS
|
||||||
|
].
|
||||||
|
|
||||||
|
query_test_() ->
|
||||||
|
[
|
||||||
|
if_parseable_by_uri_string(URI, fun(Expected, Parsed) ->
|
||||||
|
?assertEqual(maybe_get_bin(query, Expected), query(Parsed))
|
||||||
|
end)
|
||||||
|
|| URI <- ?URLS
|
||||||
|
].
|
||||||
|
|
||||||
|
fragment_test_() ->
|
||||||
|
[
|
||||||
|
if_parseable_by_uri_string(URI, fun(Expected, Parsed) ->
|
||||||
|
?assertEqual(maybe_get_bin(fragment, Expected), fragment(Parsed))
|
||||||
|
end)
|
||||||
|
|| URI <- ?URLS
|
||||||
|
].
|
||||||
|
|
||||||
|
templates_test_() ->
|
||||||
|
[
|
||||||
|
{"template in path",
|
||||||
|
?_assertEqual(
|
||||||
|
<<"/${client_attrs.group}">>,
|
||||||
|
path(parse("https://www.example.com/${client_attrs.group}"))
|
||||||
|
)},
|
||||||
|
{"template in query, no path",
|
||||||
|
?_assertEqual(
|
||||||
|
<<"group=${client_attrs.group}">>,
|
||||||
|
query(parse("https://www.example.com?group=${client_attrs.group}"))
|
||||||
|
)},
|
||||||
|
{"template in query, path",
|
||||||
|
?_assertEqual(
|
||||||
|
<<"group=${client_attrs.group}">>,
|
||||||
|
query(parse("https://www.example.com/path/?group=${client_attrs.group}"))
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
|
||||||
|
request_target_test_() ->
|
||||||
|
[
|
||||||
|
?_assertEqual(
|
||||||
|
{ok, #{port => 443, scheme => https, host => "www.example.com"}},
|
||||||
|
request_base(parse("https://www.example.com/path/to/page?query_param=value#fr"))
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
{error, empty_host_not_allowed},
|
||||||
|
request_base(parse("localhost?query_param=value#fr"))
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
{error, {unsupported_scheme, <<"ftp">>}},
|
||||||
|
request_base(parse("ftp://localhost"))
|
||||||
|
)
|
||||||
|
].
|
||||||
|
|
||||||
|
is_prefix(Prefix, Binary) ->
|
||||||
|
case Binary of
|
||||||
|
<<Prefix:(byte_size(Prefix))/binary, _/binary>> -> true;
|
||||||
|
_ -> false
|
||||||
|
end.
|
||||||
|
|
||||||
|
if_parseable_by_uri_string(URI, Fun) ->
|
||||||
|
case uri_string:parse(URI) of
|
||||||
|
{error, _, _} ->
|
||||||
|
{"skipped", fun() -> true end};
|
||||||
|
ExpectedMap ->
|
||||||
|
ParsedMap = parse(URI),
|
||||||
|
{URI, fun() -> Fun(ExpectedMap, ParsedMap) end}
|
||||||
|
end.
|
||||||
|
|
||||||
|
maybe_get_bin(Key, Map) ->
|
||||||
|
maybe_bin(maps:get(Key, Map, undefined)).
|
||||||
|
|
||||||
|
maybe_bin(String) when is_list(String) -> list_to_binary(String);
|
||||||
|
maybe_bin(undefined) -> undefined.
|
||||||
|
|
||||||
|
-endif.
|
|
@ -0,0 +1,4 @@
|
||||||
|
Fixed and improved handling of URIs in several configurations.
|
||||||
|
Previously,
|
||||||
|
* In authentication or authorization configurations, valid pathless URIs (`https://example.com?q=x`) were not accepted as valid.
|
||||||
|
* In bridge connectors, some kinds of URIs that couldn't be correctly handled were nevertheless accepted. E.g., URIs with user info or fragment parts.
|
Loading…
Reference in New Issue