feat(http connector): support http connector (#5192)
- support http connector - support http authn
This commit is contained in:
parent
54c776ebdf
commit
df92a60085
|
@ -0,0 +1,299 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_authn_http).
|
||||||
|
|
||||||
|
-include("emqx_authn.hrl").
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
|
||||||
|
-behaviour(hocon_schema).
|
||||||
|
|
||||||
|
-export([ structs/0
|
||||||
|
, fields/1
|
||||||
|
, validations/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-type accept() :: 'application/json' | 'application/x-www-form-urlencoded'.
|
||||||
|
-type content_type() :: accept().
|
||||||
|
|
||||||
|
-reflect_type([ accept/0
|
||||||
|
, content_type/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ create/3
|
||||||
|
, update/4
|
||||||
|
, authenticate/2
|
||||||
|
, destroy/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Hocon Schema
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
structs() -> [""].
|
||||||
|
|
||||||
|
fields("") ->
|
||||||
|
[ {config, #{type => hoconsc:union(
|
||||||
|
[ hoconsc:ref(?MODULE, get)
|
||||||
|
, hoconsc:ref(?MODULE, post)
|
||||||
|
])}}
|
||||||
|
];
|
||||||
|
|
||||||
|
fields(get) ->
|
||||||
|
[ {method, #{type => get,
|
||||||
|
default => get}}
|
||||||
|
] ++ common_fields();
|
||||||
|
|
||||||
|
fields(post) ->
|
||||||
|
[ {method, #{type => post,
|
||||||
|
default => get}}
|
||||||
|
, {content_type, fun content_type/1}
|
||||||
|
] ++ common_fields().
|
||||||
|
|
||||||
|
common_fields() ->
|
||||||
|
[ {url, fun url/1}
|
||||||
|
, {accept, fun accept/1}
|
||||||
|
, {headers, fun headers/1}
|
||||||
|
, {form_data, fun form_data/1}
|
||||||
|
, {request_timeout, fun request_timeout/1}
|
||||||
|
] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||||
|
|
||||||
|
validations() ->
|
||||||
|
[ {check_ssl_opts, fun check_ssl_opts/1} ].
|
||||||
|
|
||||||
|
url(type) -> binary();
|
||||||
|
url(nullable) -> false;
|
||||||
|
url(validate) -> [fun check_url/1];
|
||||||
|
url(_) -> undefined.
|
||||||
|
|
||||||
|
accept(type) -> accept();
|
||||||
|
accept(default) -> 'application/json';
|
||||||
|
accept(_) -> undefined.
|
||||||
|
|
||||||
|
content_type(type) -> content_type();
|
||||||
|
content_type(default) -> 'application/json';
|
||||||
|
content_type(_) -> undefined.
|
||||||
|
|
||||||
|
headers(type) -> list();
|
||||||
|
headers(default) -> [];
|
||||||
|
headers(_) -> undefined.
|
||||||
|
|
||||||
|
form_data(type) -> binary();
|
||||||
|
form_data(nullable) -> false;
|
||||||
|
form_data(validate) -> [fun check_form_data/1];
|
||||||
|
form_data(_) -> undefined.
|
||||||
|
|
||||||
|
request_timeout(type) -> non_neg_integer();
|
||||||
|
request_timeout(default) -> 5000;
|
||||||
|
request_timeout(_) -> undefined.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% APIs
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
create(ChainID, AuthenticatorName,
|
||||||
|
#{method := Method,
|
||||||
|
url := URL,
|
||||||
|
accept := Accept,
|
||||||
|
content_type := ContentType,
|
||||||
|
headers := Headers,
|
||||||
|
form_data := FormData,
|
||||||
|
request_timeout := RequestTimeout} = Config) ->
|
||||||
|
NHeaders = maps:merge(#{<<"accept">> => atom_to_binary(Accept, utf8),
|
||||||
|
<<"content-type">> => atom_to_binary(ContentType, utf8)}, Headers),
|
||||||
|
NFormData = preprocess_form_data(FormData),
|
||||||
|
#{path := Path,
|
||||||
|
query := Query} = URIMap = parse_url(URL),
|
||||||
|
BaseURL = generate_base_url(URIMap),
|
||||||
|
State = #{method => Method,
|
||||||
|
path => Path,
|
||||||
|
base_query => cow_qs:parse_qs(Query),
|
||||||
|
accept => Accept,
|
||||||
|
content_type => ContentType,
|
||||||
|
headers => NHeaders,
|
||||||
|
form_data => NFormData,
|
||||||
|
request_timeout => RequestTimeout},
|
||||||
|
ResourceID = <<ChainID/binary, "/", AuthenticatorName/binary>>,
|
||||||
|
case emqx_resource:create_local(ResourceID, emqx_connector_http, Config#{base_url := BaseURL}) of
|
||||||
|
{ok, _} ->
|
||||||
|
{ok, State#{resource_id => ResourceID}};
|
||||||
|
{error, already_created} ->
|
||||||
|
{ok, State#{resource_id => ResourceID}};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
update(_ChainID, _AuthenticatorName, Config, #{resource_id := ResourceID} = State) ->
|
||||||
|
case emqx_resource:update_local(ResourceID, emqx_connector_http, Config, []) of
|
||||||
|
{ok, _} -> {ok, State};
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
authenticate(ClientInfo, #{resource_id := ResourceID,
|
||||||
|
method := Method,
|
||||||
|
request_timeout := RequestTimeout} = State) ->
|
||||||
|
Request = generate_request(ClientInfo, State),
|
||||||
|
case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of
|
||||||
|
{ok, 204, _Headers} -> ok;
|
||||||
|
{ok, 200, Headers, Body} ->
|
||||||
|
ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>),
|
||||||
|
case safely_parse_body(ContentType, Body) of
|
||||||
|
{ok, _NBody} ->
|
||||||
|
%% TODO: Return by user property
|
||||||
|
ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
{stop, Reason}
|
||||||
|
end;
|
||||||
|
{error, _Reason} ->
|
||||||
|
ignore
|
||||||
|
end.
|
||||||
|
|
||||||
|
destroy(#{resource_id := ResourceID}) ->
|
||||||
|
_ = emqx_resource:remove_local(ResourceID),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
check_url(URL) ->
|
||||||
|
case emqx_http_lib:uri_parse(URL) of
|
||||||
|
{ok, _} -> true;
|
||||||
|
{error, _} -> false
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_form_data(FormData) ->
|
||||||
|
KVs = binary:split(FormData, [<<"&">>], [global]),
|
||||||
|
case false =:= lists:any(fun(T) -> T =:= <<>> end, KVs) of
|
||||||
|
true ->
|
||||||
|
NKVs = [list_to_tuple(binary:split(KV, [<<"=">>], [global])) || KV <- KVs],
|
||||||
|
false =:=
|
||||||
|
lists:any(fun({K, V}) ->
|
||||||
|
K =:= <<>> orelse V =:= <<>>;
|
||||||
|
(_) ->
|
||||||
|
true
|
||||||
|
end, NKVs);
|
||||||
|
false ->
|
||||||
|
false
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_ssl_opts(Conf) ->
|
||||||
|
URL = hocon_schema:get_value("url", Conf),
|
||||||
|
{ok, #{scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
|
||||||
|
SSLOpts = hocon_schema:get_value("ssl_opts", Conf),
|
||||||
|
case {Scheme, SSLOpts} of
|
||||||
|
{http, undefined} -> true;
|
||||||
|
{http, _} -> false;
|
||||||
|
{https, undefined} -> false;
|
||||||
|
{https, _} -> true
|
||||||
|
end.
|
||||||
|
|
||||||
|
preprocess_form_data(FormData) ->
|
||||||
|
KVs = binary:split(FormData, [<<"&">>], [global]),
|
||||||
|
[list_to_tuple(binary:split(KV, [<<"=">>], [global])) || KV <- KVs].
|
||||||
|
|
||||||
|
parse_url(URL) ->
|
||||||
|
{ok, URIMap} = emqx_http_lib:uri_parse(URL),
|
||||||
|
case maps:get(query, URIMap, undefined) of
|
||||||
|
undefined ->
|
||||||
|
URIMap#{query => ""};
|
||||||
|
_ ->
|
||||||
|
URIMap
|
||||||
|
end.
|
||||||
|
|
||||||
|
generate_base_url(#{scheme := Scheme,
|
||||||
|
host := Host,
|
||||||
|
port := Port}) ->
|
||||||
|
iolist_to_binary(io_lib:format("~p://~s:~p", [Scheme, Host, Port])).
|
||||||
|
|
||||||
|
generate_request(ClientInfo, #{method := Method,
|
||||||
|
path := Path,
|
||||||
|
base_query := BaseQuery,
|
||||||
|
content_type := ContentType,
|
||||||
|
headers := Headers,
|
||||||
|
form_data := FormData0}) ->
|
||||||
|
FormData = replace_placeholders(FormData0, ClientInfo),
|
||||||
|
case Method of
|
||||||
|
get ->
|
||||||
|
NPath = append_query(Path, BaseQuery ++ FormData),
|
||||||
|
{NPath, Headers};
|
||||||
|
post ->
|
||||||
|
NPath = append_query(Path, BaseQuery),
|
||||||
|
Body = serialize_body(ContentType, FormData),
|
||||||
|
{NPath, Headers, Body}
|
||||||
|
end.
|
||||||
|
|
||||||
|
replace_placeholders(FormData0, ClientInfo) ->
|
||||||
|
FormData = lists:map(fun({K, V0}) ->
|
||||||
|
case replace_placeholder(V0, ClientInfo) of
|
||||||
|
undefined -> {K, undefined};
|
||||||
|
V -> {K, bin(V)}
|
||||||
|
end
|
||||||
|
end, FormData0),
|
||||||
|
lists:filter(fun({_, V}) ->
|
||||||
|
V =/= undefined
|
||||||
|
end, FormData).
|
||||||
|
|
||||||
|
replace_placeholder(<<"${mqtt-username}">>, ClientInfo) ->
|
||||||
|
maps:get(username, ClientInfo, undefined);
|
||||||
|
replace_placeholder(<<"${mqtt-clientid}">>, ClientInfo) ->
|
||||||
|
maps:get(clientid, ClientInfo, undefined);
|
||||||
|
replace_placeholder(<<"${ip-address}">>, ClientInfo) ->
|
||||||
|
maps:get(peerhost, ClientInfo, undefined);
|
||||||
|
replace_placeholder(<<"${cert-subject}">>, ClientInfo) ->
|
||||||
|
maps:get(dn, ClientInfo, undefined);
|
||||||
|
replace_placeholder(<<"${cert-common-name}">>, ClientInfo) ->
|
||||||
|
maps:get(cn, ClientInfo, undefined);
|
||||||
|
replace_placeholder(Constant, _) ->
|
||||||
|
Constant.
|
||||||
|
|
||||||
|
append_query(Path, []) ->
|
||||||
|
Path;
|
||||||
|
append_query(Path, Query) ->
|
||||||
|
Path ++ "?" ++ binary_to_list(qs(Query)).
|
||||||
|
|
||||||
|
qs(KVs) ->
|
||||||
|
qs(KVs, []).
|
||||||
|
|
||||||
|
qs([], Acc) ->
|
||||||
|
<<$&, Qs/binary>> = iolist_to_binary(lists:reverse(Acc)),
|
||||||
|
Qs;
|
||||||
|
qs([{K, V} | More], Acc) ->
|
||||||
|
qs(More, [["&", emqx_http_lib:uri_encode(K), "=", emqx_http_lib:uri_encode(V)] | Acc]).
|
||||||
|
|
||||||
|
serialize_body('application/json', FormData) ->
|
||||||
|
emqx_json:encode(FormData);
|
||||||
|
serialize_body('application/x-www-form-urlencoded', FormData) ->
|
||||||
|
qs(FormData).
|
||||||
|
|
||||||
|
safely_parse_body(ContentType, Body) ->
|
||||||
|
try parse_body(ContentType, Body) of
|
||||||
|
Result -> Result
|
||||||
|
catch
|
||||||
|
_Class:_Reason ->
|
||||||
|
{error, invalid_body}
|
||||||
|
end.
|
||||||
|
|
||||||
|
parse_body(<<"application/json">>, Body) ->
|
||||||
|
{ok, emqx_json:decode(Body)};
|
||||||
|
parse_body(<<"application/x-www-form-urlencoded">>, Body) ->
|
||||||
|
{ok, cow_qs:parse_qs(Body)};
|
||||||
|
parse_body(ContentType, _) ->
|
||||||
|
{error, {unsupported_content_type, ContentType}}.
|
||||||
|
|
||||||
|
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||||
|
bin(L) when is_list(L) -> list_to_binary(L);
|
||||||
|
bin(X) -> X.
|
|
@ -294,12 +294,16 @@ do_verify_claims(Claims, [{Name, Value} | More]) ->
|
||||||
{error, {claims, {Name, Value0}}}
|
{error, {claims, {Name, Value0}}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_verify_claims([]) ->
|
check_verify_claims(Conf) ->
|
||||||
|
Claims = hocon_schema:get_value("verify_claims", Conf),
|
||||||
|
do_check_verify_claims(Claims).
|
||||||
|
|
||||||
|
do_check_verify_claims([]) ->
|
||||||
false;
|
false;
|
||||||
check_verify_claims([{Name, Expected} | More]) ->
|
do_check_verify_claims([{Name, Expected} | More]) ->
|
||||||
check_claim_name(Name) andalso
|
check_claim_name(Name) andalso
|
||||||
check_claim_expected(Expected) andalso
|
check_claim_expected(Expected) andalso
|
||||||
check_verify_claims(More).
|
do_check_verify_claims(More).
|
||||||
|
|
||||||
check_claim_name(exp) ->
|
check_claim_name(exp) ->
|
||||||
false;
|
false;
|
||||||
|
|
|
@ -58,10 +58,11 @@ query_timeout(_) -> undefined.
|
||||||
%% APIs
|
%% APIs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
create(ChainID, ServiceName, #{query := Query0,
|
create(ChainID, AuthenticatorName,
|
||||||
password_hash_algorithm := Algorithm} = Config) ->
|
#{query := Query0,
|
||||||
|
password_hash_algorithm := Algorithm} = Config) ->
|
||||||
{Query, PlaceHolders} = parse_query(Query0),
|
{Query, PlaceHolders} = parse_query(Query0),
|
||||||
ResourceID = iolist_to_binary(io_lib:format("~s/~s",[ChainID, ServiceName])),
|
ResourceID = iolist_to_binary(io_lib:format("~s/~s",[ChainID, AuthenticatorName])),
|
||||||
State = #{query => Query,
|
State = #{query => Query,
|
||||||
placeholders => PlaceHolders,
|
placeholders => PlaceHolders,
|
||||||
password_hash_algorithm => Algorithm},
|
password_hash_algorithm => Algorithm},
|
||||||
|
@ -74,7 +75,7 @@ create(ChainID, ServiceName, #{query := Query0,
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update(_ChainID, _ServiceName, Config, #{resource_id := ResourceID} = State) ->
|
update(_ChainID, _AuthenticatorName, Config, #{resource_id := ResourceID} = State) ->
|
||||||
case emqx_resource:update_local(ResourceID, emqx_connector_mysql, Config, []) of
|
case emqx_resource:update_local(ResourceID, emqx_connector_mysql, Config, []) of
|
||||||
{ok, _} -> {ok, State};
|
{ok, _} -> {ok, State};
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
|
|
|
@ -0,0 +1,213 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_connector_http).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
|
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
|
||||||
|
|
||||||
|
%% callbacks of behaviour emqx_resource
|
||||||
|
-export([ on_start/2
|
||||||
|
, on_stop/2
|
||||||
|
, on_query/4
|
||||||
|
, on_health_check/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ structs/0
|
||||||
|
, fields/1
|
||||||
|
, validations/0]).
|
||||||
|
|
||||||
|
-type connect_timeout() :: non_neg_integer() | infinity.
|
||||||
|
-type pool_type() :: random | hash.
|
||||||
|
|
||||||
|
-reflect_type([ connect_timeout/0
|
||||||
|
, pool_type/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
%%=====================================================================
|
||||||
|
%% Hocon schema
|
||||||
|
structs() -> [""].
|
||||||
|
|
||||||
|
fields("") ->
|
||||||
|
[{config, #{type => hoconsc:ref(?MODULE, config)}}];
|
||||||
|
|
||||||
|
fields(config) ->
|
||||||
|
[ {base_url, fun base_url/1}
|
||||||
|
, {connect_timeout, fun connect_timeout/1}
|
||||||
|
, {max_retries, fun max_retries/1}
|
||||||
|
, {retry_interval, fun retry_interval/1}
|
||||||
|
, {keepalive, fun keepalive/1}
|
||||||
|
, {pool_type, fun pool_type/1}
|
||||||
|
, {pool_size, fun pool_size/1}
|
||||||
|
, {ssl_opts, #{type => hoconsc:ref(?MODULE, ssl_opts),
|
||||||
|
nullable => true}}
|
||||||
|
];
|
||||||
|
|
||||||
|
fields(ssl_opts) ->
|
||||||
|
[ {cacertfile, fun cacertfile/1}
|
||||||
|
, {keyfile, fun keyfile/1}
|
||||||
|
, {certfile, fun certfile/1}
|
||||||
|
, {verify, fun verify/1}
|
||||||
|
].
|
||||||
|
|
||||||
|
validations() ->
|
||||||
|
[ {check_ssl_opts, fun check_ssl_opts/1} ].
|
||||||
|
|
||||||
|
base_url(type) -> binary();
|
||||||
|
base_url(nullable) -> false;
|
||||||
|
base_url(validate) -> [fun check_base_url/1];
|
||||||
|
base_url(_) -> undefined.
|
||||||
|
|
||||||
|
connect_timeout(type) -> connect_timeout();
|
||||||
|
connect_timeout(default) -> 5000;
|
||||||
|
connect_timeout(_) -> undefined.
|
||||||
|
|
||||||
|
max_retries(type) -> non_neg_integer();
|
||||||
|
max_retries(default) -> 5;
|
||||||
|
max_retries(_) -> undefined.
|
||||||
|
|
||||||
|
retry_interval(type) -> non_neg_integer();
|
||||||
|
retry_interval(default) -> 1000;
|
||||||
|
retry_interval(_) -> undefined.
|
||||||
|
|
||||||
|
keepalive(type) -> non_neg_integer();
|
||||||
|
keepalive(default) -> 5000;
|
||||||
|
keepalive(_) -> undefined.
|
||||||
|
|
||||||
|
pool_type(type) -> pool_type();
|
||||||
|
pool_type(default) -> random;
|
||||||
|
pool_type(_) -> undefined.
|
||||||
|
|
||||||
|
pool_size(type) -> non_neg_integer();
|
||||||
|
pool_size(default) -> 8;
|
||||||
|
pool_size(_) -> undefined.
|
||||||
|
|
||||||
|
cacertfile(type) -> string();
|
||||||
|
cacertfile(nullable) -> true;
|
||||||
|
cacertfile(_) -> undefined.
|
||||||
|
|
||||||
|
keyfile(type) -> string();
|
||||||
|
keyfile(nullable) -> true;
|
||||||
|
keyfile(_) -> undefined.
|
||||||
|
|
||||||
|
certfile(type) -> string();
|
||||||
|
certfile(nullable) -> false;
|
||||||
|
certfile(_) -> undefined.
|
||||||
|
|
||||||
|
verify(type) -> boolean();
|
||||||
|
verify(default) -> false;
|
||||||
|
verify(_) -> undefined.
|
||||||
|
|
||||||
|
%% ===================================================================
|
||||||
|
on_start(InstId, #{url := URL,
|
||||||
|
connect_timeout := ConnectTimeout,
|
||||||
|
max_retries := MaxRetries,
|
||||||
|
retry_interval := RetryInterval,
|
||||||
|
keepalive := Keepalive,
|
||||||
|
pool_type := PoolType,
|
||||||
|
pool_size := PoolSize} = Config) ->
|
||||||
|
logger:info("starting http connector: ~p, config: ~p", [InstId, Config]),
|
||||||
|
{ok, #{scheme := Scheme,
|
||||||
|
host := Host,
|
||||||
|
port := Port,
|
||||||
|
path := BasePath}} = emqx_http_lib:uri_parse(URL),
|
||||||
|
{Transport, TransportOpts} = case Scheme of
|
||||||
|
http ->
|
||||||
|
{tcp, []};
|
||||||
|
https ->
|
||||||
|
SSLOpts = emqx_plugin_libs_ssl:save_files_return_opts(
|
||||||
|
maps:get(ssl_opts, Config), "connectors", InstId),
|
||||||
|
{tls, SSLOpts}
|
||||||
|
end,
|
||||||
|
NTransportOpts = emqx_misc:ipv6_probe(TransportOpts),
|
||||||
|
PoolOpts = [ {host, Host}
|
||||||
|
, {port, Port}
|
||||||
|
, {connect_timeout, ConnectTimeout}
|
||||||
|
, {retry, MaxRetries}
|
||||||
|
, {retry_timeout, RetryInterval}
|
||||||
|
, {keepalive, Keepalive}
|
||||||
|
, {pool_type, PoolType}
|
||||||
|
, {pool_size, PoolSize}
|
||||||
|
, {transport, Transport}
|
||||||
|
, {transport, NTransportOpts}],
|
||||||
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||||
|
{ok, _} = ehttpc_sup:start_pool(PoolName, PoolOpts),
|
||||||
|
{ok, #{pool_name => PoolName,
|
||||||
|
host => Host,
|
||||||
|
port => Port,
|
||||||
|
base_path => BasePath}}.
|
||||||
|
|
||||||
|
on_stop(InstId, #{pool_name := PoolName}) ->
|
||||||
|
logger:info("stopping http connector: ~p", [InstId]),
|
||||||
|
ehttpc_sup:stop_pool(PoolName).
|
||||||
|
|
||||||
|
on_query(InstId, {Method, Request}, AfterQuery, State) ->
|
||||||
|
on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State);
|
||||||
|
on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
|
||||||
|
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
|
||||||
|
on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, #{pool_name := PoolName,
|
||||||
|
base_path := BasePath} = State) ->
|
||||||
|
logger:debug("http connector ~p received request: ~p, at state: ~p", [InstId, Request, State]),
|
||||||
|
NRequest = update_path(BasePath, Request),
|
||||||
|
case Result = ehttpc:request(case KeyOrNum of
|
||||||
|
undefined -> PoolName;
|
||||||
|
_ -> {PoolName, KeyOrNum}
|
||||||
|
end, Method, NRequest, Timeout) of
|
||||||
|
{error, Reason} ->
|
||||||
|
logger:debug("http connector ~p do reqeust failed, sql: ~p, reason: ~p", [InstId, NRequest, Reason]),
|
||||||
|
emqx_resource:query_failed(AfterQuery);
|
||||||
|
_ ->
|
||||||
|
emqx_resource:query_success(AfterQuery)
|
||||||
|
end,
|
||||||
|
Result.
|
||||||
|
|
||||||
|
on_health_check(_InstId, #{server := {Host, Port}} = State) ->
|
||||||
|
case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), 3000) of
|
||||||
|
{ok, Sock} ->
|
||||||
|
gen_tcp:close(Sock),
|
||||||
|
{ok, State};
|
||||||
|
{error, _Reason} ->
|
||||||
|
{error, test_query_failed, State}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
check_base_url(URL) ->
|
||||||
|
case emqx_http_lib:uri_parse(URL) of
|
||||||
|
{error, _} -> false;
|
||||||
|
{ok, #{query := _}} -> false;
|
||||||
|
_ -> true
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_ssl_opts(Conf) ->
|
||||||
|
URL = hocon_schema:get_value("url", Conf),
|
||||||
|
{ok, #{scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
|
||||||
|
SSLOpts = hocon_schema:get_value("ssl_opts", Conf),
|
||||||
|
case {Scheme, SSLOpts} of
|
||||||
|
{http, undefined} -> true;
|
||||||
|
{http, _} -> false;
|
||||||
|
{https, undefined} -> false;
|
||||||
|
{https, _} -> true
|
||||||
|
end.
|
||||||
|
|
||||||
|
update_path(BasePath, {Path, Headers}) ->
|
||||||
|
{filename:join(BasePath, Path), Headers};
|
||||||
|
update_path(BasePath, {Path, Headers, Body}) ->
|
||||||
|
{filename:join(BasePath, Path), Headers, Body}.
|
|
@ -43,7 +43,7 @@
|
||||||
|
|
||||||
{deps,
|
{deps,
|
||||||
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.6"}}}
|
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.7"}}}
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
||||||
|
|
Loading…
Reference in New Issue