Merge pull request #9878 from zmstone/0201-upgrade-ehttpc-0.4.6

fix(ehttpc): fix crash when webhook bridge uses `hash` pool type.
This commit is contained in:
Zaiming (Stone) Shi 2023-02-01 15:00:28 +01:00 committed by GitHub
commit 32d18ec594
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 34 additions and 33 deletions

View File

@ -29,7 +29,7 @@
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.9"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.9"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.0"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.3"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}

View File

@ -112,8 +112,7 @@ t_update_with_invalid_config(_Config) ->
#{ #{
kind := validation_error, kind := validation_error,
path := "authentication.server", path := "authentication.server",
reason := required_field, reason := required_field
value := undefined
} }
]} ]}
}}}, }}},

View File

@ -188,7 +188,7 @@ t_create_invalid_config(_Config) ->
?assertMatch( ?assertMatch(
{error, #{ {error, #{
kind := validation_error, kind := validation_error,
path := "authorization.sources.1" path := "authorization.sources.1.server"
}}, }},
emqx_authz:update(?CMD_REPLACE, [C]) emqx_authz:update(?CMD_REPLACE, [C])
). ).

View File

@ -4,12 +4,12 @@ emqx_connector_http {
en: """ en: """
The base URL is the URL includes only the scheme, host and port.<br/> The base URL is the URL includes only the scheme, host and port.<br/>
When send an HTTP request, the real URL to be used is the concatenation of the base URL and the When send an HTTP request, the real URL to be used is the concatenation of the base URL and the
path parameter (passed by the emqx_resource:query/2,3 or provided by the request parameter).<br/> path parameter<br/>
For example: `http://localhost:9901/` For example: `http://localhost:9901/`
""" """
zh: """ zh: """
base URL 只包含host和port。<br/> base URL 只包含host和port。<br/>
发送HTTP请求时真实的URL是由base URL 和 path parameter连接而成通过emqx_resource:query/2,3传递或者通过请求参数提供。<br/> 发送HTTP请求时真实的URL是由base URL 和 path parameter连接而成。<br/>
示例:`http://localhost:9901/` 示例:`http://localhost:9901/`
""" """
} }
@ -76,14 +76,8 @@ base URL 只包含host和port。<br/>
request { request {
desc { desc {
en: """ en: """Configure HTTP request parameters."""
If the request is provided, the caller can send HTTP requests via zh: """设置 HTTP 请求的参数。"""
<code>emqx_resource:query(ResourceId, {send_message, BridgeId, Message})</code>
"""
zh: """
如果提供了请求,调用者可以通过以下方式发送 HTTP 请求
<code>emqx_resource:query(ResourceId, {send_message, BridgeId, Message})</code>
"""
} }
label: { label: {
en: "Request" en: "Request"

View File

@ -234,6 +234,7 @@ on_start(
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
State = #{ State = #{
pool_name => PoolName, pool_name => PoolName,
pool_type => PoolType,
host => Host, host => Host,
port => Port, port => Port,
connect_timeout => ConnectTimeout, connect_timeout => ConnectTimeout,
@ -264,9 +265,10 @@ on_query(InstId, {send_message, Msg}, State) ->
path := Path, path := Path,
body := Body, body := Body,
headers := Headers, headers := Headers,
request_timeout := Timeout, request_timeout := Timeout
max_retries := Retry
} = process_request(Request, Msg), } = process_request(Request, Msg),
%% bridge buffer worker has retry, do not let ehttpc retry
Retry = 0,
on_query( on_query(
InstId, InstId,
{undefined, Method, {Path, Headers, Body}, Timeout, Retry}, {undefined, Method, {Path, Headers, Body}, Timeout, Retry},
@ -274,13 +276,15 @@ on_query(InstId, {send_message, Msg}, State) ->
) )
end; end;
on_query(InstId, {Method, Request}, State) -> on_query(InstId, {Method, Request}, State) ->
on_query(InstId, {undefined, Method, Request, 5000, 2}, State); %% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
on_query(InstId, {Method, Request, Timeout}, State) -> on_query(InstId, {Method, Request, Timeout}, State) ->
on_query(InstId, {undefined, Method, Request, Timeout, 2}, State); %% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, Timeout, _Retry = 2}, State);
on_query( on_query(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout, Retry}, {KeyOrNum, Method, Request, Timeout, Retry},
#{pool_name := PoolName, base_path := BasePath} = State #{base_path := BasePath} = State
) -> ) ->
?TRACE( ?TRACE(
"QUERY", "QUERY",
@ -288,12 +292,10 @@ on_query(
#{request => Request, connector => InstId, state => State} #{request => Request, connector => InstId, state => State}
), ),
NRequest = formalize_request(Method, BasePath, Request), NRequest = formalize_request(Method, BasePath, Request),
Worker = resolve_pool_worker(State, KeyOrNum),
case case
ehttpc:request( ehttpc:request(
case KeyOrNum of Worker,
undefined -> PoolName;
_ -> {PoolName, KeyOrNum}
end,
Method, Method,
NRequest, NRequest,
Timeout, Timeout,
@ -361,19 +363,15 @@ on_query_async(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout}, {KeyOrNum, Method, Request, Timeout},
ReplyFunAndArgs, ReplyFunAndArgs,
#{pool_name := PoolName, base_path := BasePath} = State #{base_path := BasePath} = State
) -> ) ->
Worker = resolve_pool_worker(State, KeyOrNum),
?TRACE( ?TRACE(
"QUERY_ASYNC", "QUERY_ASYNC",
"http_connector_received", "http_connector_received",
#{request => Request, connector => InstId, state => State} #{request => Request, connector => InstId, state => State}
), ),
NRequest = formalize_request(Method, BasePath, Request), NRequest = formalize_request(Method, BasePath, Request),
Worker =
case KeyOrNum of
undefined -> ehttpc_pool:pick_worker(PoolName);
_ -> ehttpc_pool:pick_worker(PoolName, KeyOrNum)
end,
ok = ehttpc:request_async( ok = ehttpc:request_async(
Worker, Worker,
Method, Method,
@ -383,6 +381,16 @@ on_query_async(
), ),
{ok, Worker}. {ok, Worker}.
resolve_pool_worker(State, undefined) ->
resolve_pool_worker(State, self());
resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
case maps:get(pool_type, State, random) of
random ->
ehttpc_pool:pick_worker(PoolName);
hash ->
ehttpc_pool:pick_worker(PoolName, Key)
end.
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of case do_get_status(PoolName, Timeout) of
ok -> ok ->

View File

@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do
{:lc, github: "emqx/lc", tag: "0.3.2", override: true}, {:lc, github: "emqx/lc", tag: "0.3.2", override: true},
{:redbug, "2.0.8"}, {:redbug, "2.0.8"},
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.5", override: true}, {:ehttpc, github: "emqx/ehttpc", tag: "0.4.6", override: true},
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
@ -68,7 +68,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqtt and hocon # in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true}, {:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true},
{:hocon, github: "emqx/hocon", tag: "0.35.0", override: true}, {:hocon, github: "emqx/hocon", tag: "0.35.3", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},

View File

@ -49,7 +49,7 @@
, {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.5"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.6"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
@ -68,7 +68,7 @@
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
, {getopt, "1.0.2"} , {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.3"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}