diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 0ea42c0fb..cad599436 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -29,7 +29,7 @@ {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.9"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.0"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.3"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index ff0bfaea7..fa0658f6a 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -112,8 +112,7 @@ t_update_with_invalid_config(_Config) -> #{ kind := validation_error, path := "authentication.server", - reason := required_field, - value := undefined + reason := required_field } ]} }}}, diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index c07d920ad..b480e0262 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -188,7 +188,7 @@ t_create_invalid_config(_Config) -> ?assertMatch( {error, #{ kind := validation_error, - path := "authorization.sources.1" + path := "authorization.sources.1.server" }}, emqx_authz:update(?CMD_REPLACE, [C]) ). diff --git a/apps/emqx_connector/i18n/emqx_connector_http.conf b/apps/emqx_connector/i18n/emqx_connector_http.conf index 7583a38ed..da886191b 100644 --- a/apps/emqx_connector/i18n/emqx_connector_http.conf +++ b/apps/emqx_connector/i18n/emqx_connector_http.conf @@ -4,12 +4,12 @@ emqx_connector_http { en: """ The base URL is the URL includes only the scheme, host and port.
When send an HTTP request, the real URL to be used is the concatenation of the base URL and the -path parameter (passed by the emqx_resource:query/2,3 or provided by the request parameter).
+path parameter
For example: `http://localhost:9901/` """ zh: """ base URL 只包含host和port。
-发送HTTP请求时,真实的URL是由base URL 和 path parameter连接而成(通过emqx_resource:query/2,3传递,或者通过请求参数提供)。
+发送HTTP请求时,真实的URL是由base URL 和 path parameter连接而成。
示例:`http://localhost:9901/` """ } @@ -76,14 +76,8 @@ base URL 只包含host和port。
request { desc { - en: """ -If the request is provided, the caller can send HTTP requests via -emqx_resource:query(ResourceId, {send_message, BridgeId, Message}) -""" - zh: """ -如果提供了请求,调用者可以通过以下方式发送 HTTP 请求 -emqx_resource:query(ResourceId, {send_message, BridgeId, Message}) -""" + en: """Configure HTTP request parameters.""" + zh: """设置 HTTP 请求的参数。""" } label: { en: "Request" diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 7f84c665a..bd129f1a8 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -234,6 +234,7 @@ on_start( PoolName = emqx_plugin_libs_pool:pool_name(InstId), State = #{ pool_name => PoolName, + pool_type => PoolType, host => Host, port => Port, connect_timeout => ConnectTimeout, @@ -264,9 +265,10 @@ on_query(InstId, {send_message, Msg}, State) -> path := Path, body := Body, headers := Headers, - request_timeout := Timeout, - max_retries := Retry + request_timeout := Timeout } = process_request(Request, Msg), + %% bridge buffer worker has retry, do not let ehttpc retry + Retry = 0, on_query( InstId, {undefined, Method, {Path, Headers, Body}, Timeout, Retry}, @@ -274,13 +276,15 @@ on_query(InstId, {send_message, Msg}, State) -> ) end; on_query(InstId, {Method, Request}, State) -> - on_query(InstId, {undefined, Method, Request, 5000, 2}, State); + %% TODO: Get retry from State + on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State); on_query(InstId, {Method, Request, Timeout}, State) -> - on_query(InstId, {undefined, Method, Request, Timeout, 2}, State); + %% TODO: Get retry from State + on_query(InstId, {undefined, Method, Request, Timeout, _Retry = 2}, State); on_query( InstId, {KeyOrNum, Method, Request, Timeout, Retry}, - #{pool_name := PoolName, base_path := BasePath} = State + #{base_path := BasePath} = State ) -> ?TRACE( "QUERY", @@ -288,12 +292,10 @@ on_query( #{request => Request, connector => InstId, state => State} ), NRequest = formalize_request(Method, BasePath, Request), + Worker = resolve_pool_worker(State, KeyOrNum), case ehttpc:request( - case KeyOrNum of - undefined -> PoolName; - _ -> {PoolName, KeyOrNum} - end, + Worker, Method, NRequest, Timeout, @@ -361,19 +363,15 @@ on_query_async( InstId, {KeyOrNum, Method, Request, Timeout}, ReplyFunAndArgs, - #{pool_name := PoolName, base_path := BasePath} = State + #{base_path := BasePath} = State ) -> + Worker = resolve_pool_worker(State, KeyOrNum), ?TRACE( "QUERY_ASYNC", "http_connector_received", #{request => Request, connector => InstId, state => State} ), NRequest = formalize_request(Method, BasePath, Request), - Worker = - case KeyOrNum of - undefined -> ehttpc_pool:pick_worker(PoolName); - _ -> ehttpc_pool:pick_worker(PoolName, KeyOrNum) - end, ok = ehttpc:request_async( Worker, Method, @@ -383,6 +381,16 @@ on_query_async( ), {ok, Worker}. +resolve_pool_worker(State, undefined) -> + resolve_pool_worker(State, self()); +resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> + case maps:get(pool_type, State, random) of + random -> + ehttpc_pool:pick_worker(PoolName); + hash -> + ehttpc_pool:pick_worker(PoolName, Key) + end. + on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of ok -> diff --git a/mix.exs b/mix.exs index caa4b15b0..8e271390f 100644 --- a/mix.exs +++ b/mix.exs @@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do {:lc, github: "emqx/lc", tag: "0.3.2", override: true}, {:redbug, "2.0.8"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.4.5", override: true}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.4.6", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, @@ -68,7 +68,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.35.0", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.35.3", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, diff --git a/rebar.config b/rebar.config index 71a54a03d..4a35641c4 100644 --- a/rebar.config +++ b/rebar.config @@ -49,7 +49,7 @@ , {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.5"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.6"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} @@ -68,7 +68,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.3"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}