diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index ae1184f81..e305bf854 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -23,6 +23,7 @@ **‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become `{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`, which will carry the paging meta information. +* Fix the issue that webhook leaks TCP connections. [ehttpc#34](https://github.com/emqx/ehttpc/pull/34), [#8580](https://github.com/emqx/emqx/pull/8580) ## Enhancements diff --git a/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf index cd2cafd78..fcc817bef 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf @@ -127,6 +127,17 @@ HTTP 请求的正文。
} } + config_max_retries { + desc { + en: """HTTP request max retry times if failed.""" + zh: """HTTP 请求失败最大重试次数""" + } + label: { + en: "HTTP Request Max Retries" + zh: "HTTP 请求重试次数" + } + } + desc_type { desc { en: """The Bridge Type""" diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 70550efe4..fe19ed066 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "An OTP application"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index bc9b6c5a2..37a42ab3d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -225,7 +225,6 @@ info_example_basic(webhook, _) -> request_timeout => <<"15s">>, connect_timeout => <<"15s">>, max_retries => 3, - retry_interval => <<"10s">>, pool_type => <<"random">>, pool_size => 4, enable_pipelining => 100, diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 678aa1f10..d19cc8426 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -238,7 +238,8 @@ parse_confs( method := Method, body := Body, headers := Headers, - request_timeout := ReqTimeout + request_timeout := ReqTimeout, + max_retries := Retry } = Conf ) -> {BaseUrl, Path} = parse_url(Url), @@ -251,7 +252,8 @@ parse_confs( method => Method, body => Body, headers => Headers, - request_timeout => ReqTimeout + request_timeout => ReqTimeout, + max_retries => Retry } }; parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when diff --git a/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl index 972ba86bc..f11247d68 100644 --- a/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl @@ -14,60 +14,7 @@ namespace() -> "bridge". roots() -> []. fields("config") -> - basic_config() ++ - [ - {url, - mk( - binary(), - #{ - required => true, - desc => ?DESC("config_url") - } - )}, - {local_topic, - mk( - binary(), - #{desc => ?DESC("config_local_topic")} - )}, - {method, - mk( - method(), - #{ - default => post, - desc => ?DESC("config_method") - } - )}, - {headers, - mk( - map(), - #{ - default => #{ - <<"accept">> => <<"application/json">>, - <<"cache-control">> => <<"no-cache">>, - <<"connection">> => <<"keep-alive">>, - <<"content-type">> => <<"application/json">>, - <<"keep-alive">> => <<"timeout=5">> - }, - desc => ?DESC("config_headers") - } - )}, - {body, - mk( - binary(), - #{ - default => <<"${payload}">>, - desc => ?DESC("config_body") - } - )}, - {request_timeout, - mk( - emqx_schema:duration_ms(), - #{ - default => <<"15s">>, - desc => ?DESC("config_request_timeout") - } - )} - ]; + basic_config() ++ request_config(); fields("post") -> [ type_field(), @@ -106,6 +53,69 @@ basic_config() -> ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). +request_config() -> + [ + {url, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_url") + } + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("config_local_topic")} + )}, + {method, + mk( + method(), + #{ + default => post, + desc => ?DESC("config_method") + } + )}, + {headers, + mk( + map(), + #{ + default => #{ + <<"accept">> => <<"application/json">>, + <<"cache-control">> => <<"no-cache">>, + <<"connection">> => <<"keep-alive">>, + <<"content-type">> => <<"application/json">>, + <<"keep-alive">> => <<"timeout=5">> + }, + desc => ?DESC("config_headers") + } + )}, + {body, + mk( + binary(), + #{ + default => <<"${payload}">>, + desc => ?DESC("config_body") + } + )}, + {max_retries, + mk( + non_neg_integer(), + #{ + default => 2, + desc => ?DESC("config_max_retries") + } + )}, + {request_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"15s">>, + desc => ?DESC("config_request_timeout") + } + )} + ]. + %%====================================================================================== type_field() -> diff --git a/apps/emqx_connector/i18n/emqx_connector_http.conf b/apps/emqx_connector/i18n/emqx_connector_http.conf index 0e29a15d3..8664d324f 100644 --- a/apps/emqx_connector/i18n/emqx_connector_http.conf +++ b/apps/emqx_connector/i18n/emqx_connector_http.conf @@ -41,17 +41,6 @@ base URL 只包含host和port。
} } - retry_interval { - desc { - en: "Interval between retries." - zh: "重试之间的间隔时间。" - } - label: { - en: "Retry Interval" - zh: "重试间隔" - } - } - pool_type { desc { en: "The type of the pool. Can be one of `random`, `hash`." diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index f9e63dc57..59b4ddffa 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -88,22 +88,6 @@ fields(config) -> desc => ?DESC("connect_timeout") } )}, - {max_retries, - sc( - non_neg_integer(), - #{ - default => 5, - desc => ?DESC("max_retries") - } - )}, - {retry_interval, - sc( - emqx_schema:duration(), - #{ - default => "1s", - desc => ?DESC("retry_interval") - } - )}, {pool_type, sc( pool_type(), @@ -147,6 +131,14 @@ fields("request") -> {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})}, {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})}, {headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})}, + {max_retries, + sc( + non_neg_integer(), + #{ + required => false, + desc => ?DESC("max_retries") + } + )}, {request_timeout, sc( emqx_schema:duration_ms(), @@ -182,8 +174,6 @@ on_start( path := BasePath }, connect_timeout := ConnectTimeout, - max_retries := MaxRetries, - retry_interval := RetryInterval, pool_type := PoolType, pool_size := PoolSize } = Config @@ -206,8 +196,6 @@ on_start( {host, Host}, {port, Port}, {connect_timeout, ConnectTimeout}, - {retry, MaxRetries}, - {retry_timeout, RetryInterval}, {keepalive, 30000}, {pool_type, PoolType}, {pool_size, PoolSize}, @@ -247,17 +235,23 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) -> path := Path, body := Body, headers := Headers, - request_timeout := Timeout + request_timeout := Timeout, + max_retries := Retry } = process_request(Request, Msg), - on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State) + on_query( + InstId, + {undefined, Method, {Path, Headers, Body}, Timeout, Retry}, + AfterQuery, + State + ) end; on_query(InstId, {Method, Request}, AfterQuery, State) -> - on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State); + on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State); on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> - on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State); + on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State); on_query( InstId, - {KeyOrNum, Method, Request, Timeout}, + {KeyOrNum, Method, Request, Timeout, Retry}, AfterQuery, #{pool_name := PoolName, base_path := BasePath} = State ) -> @@ -275,7 +269,8 @@ on_query( end, Method, NRequest, - Timeout + Timeout, + Retry ) of {error, Reason} -> @@ -368,7 +363,8 @@ preprocess_request( path => emqx_plugin_libs_rule:preproc_tmpl(Path), body => emqx_plugin_libs_rule:preproc_tmpl(Body), headers => preproc_headers(Headers), - request_timeout => maps:get(request_timeout, Req, 30000) + request_timeout => maps:get(request_timeout, Req, 30000), + max_retries => maps:get(max_retries, Req, 2) }. preproc_headers(Headers) when is_map(Headers) -> diff --git a/mix.exs b/mix.exs index 714279cfd..98269bfde 100644 --- a/mix.exs +++ b/mix.exs @@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do {:lc, github: "emqx/lc", tag: "0.3.1"}, {:redbug, "2.0.7"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.3.0"}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, diff --git a/rebar.config b/rebar.config index 58b4b079b..7acbf90dd 100644 --- a/rebar.config +++ b/rebar.config @@ -49,7 +49,7 @@ , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} diff --git a/scripts/relup-test/relup.lux b/scripts/relup-test/relup.lux index 8db8169f8..c05425e71 100644 --- a/scripts/relup-test/relup.lux +++ b/scripts/relup-test/relup.lux @@ -15,7 +15,7 @@ ?SH-PROMPT ## create a webhook data bridge with id "my_webhook" - !curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","request_timeout":"5s","max_retries":3,"type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status' + !curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status' ?connected ?SH-PROMPT