fix(webhook): the 'max_retries' opt means request retry times
This commit is contained in:
parent
ccc734c6ee
commit
23eeb21b13
|
@ -23,6 +23,7 @@
|
||||||
**‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become
|
**‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become
|
||||||
`{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`,
|
`{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`,
|
||||||
which will carry the paging meta information.
|
which will carry the paging meta information.
|
||||||
|
* Fix the issue that webhook leaks TCP connections. [ehttpc#34](https://github.com/emqx/ehttpc/pull/34), [#8580](https://github.com/emqx/emqx/pull/8580)
|
||||||
|
|
||||||
## Enhancements
|
## Enhancements
|
||||||
|
|
||||||
|
|
|
@ -127,6 +127,17 @@ HTTP 请求的正文。</br>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
config_max_retries {
|
||||||
|
desc {
|
||||||
|
en: """HTTP request max retry times if failed."""
|
||||||
|
zh: """HTTP 请求失败最大重试次数"""
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "HTTP Request Max Retries"
|
||||||
|
zh: "HTTP 请求重试次数"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
desc_type {
|
desc_type {
|
||||||
desc {
|
desc {
|
||||||
en: """The Bridge Type"""
|
en: """The Bridge Type"""
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge, [
|
{application, emqx_bridge, [
|
||||||
{description, "An OTP application"},
|
{description, "An OTP application"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_bridge_app, []}},
|
{mod, {emqx_bridge_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -225,7 +225,6 @@ info_example_basic(webhook, _) ->
|
||||||
request_timeout => <<"15s">>,
|
request_timeout => <<"15s">>,
|
||||||
connect_timeout => <<"15s">>,
|
connect_timeout => <<"15s">>,
|
||||||
max_retries => 3,
|
max_retries => 3,
|
||||||
retry_interval => <<"10s">>,
|
|
||||||
pool_type => <<"random">>,
|
pool_type => <<"random">>,
|
||||||
pool_size => 4,
|
pool_size => 4,
|
||||||
enable_pipelining => 100,
|
enable_pipelining => 100,
|
||||||
|
|
|
@ -238,7 +238,8 @@ parse_confs(
|
||||||
method := Method,
|
method := Method,
|
||||||
body := Body,
|
body := Body,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
request_timeout := ReqTimeout
|
request_timeout := ReqTimeout,
|
||||||
|
max_retries := Retry
|
||||||
} = Conf
|
} = Conf
|
||||||
) ->
|
) ->
|
||||||
{BaseUrl, Path} = parse_url(Url),
|
{BaseUrl, Path} = parse_url(Url),
|
||||||
|
@ -251,7 +252,8 @@ parse_confs(
|
||||||
method => Method,
|
method => Method,
|
||||||
body => Body,
|
body => Body,
|
||||||
headers => Headers,
|
headers => Headers,
|
||||||
request_timeout => ReqTimeout
|
request_timeout => ReqTimeout,
|
||||||
|
max_retries => Retry
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
|
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
|
||||||
|
|
|
@ -14,60 +14,7 @@ namespace() -> "bridge".
|
||||||
roots() -> [].
|
roots() -> [].
|
||||||
|
|
||||||
fields("config") ->
|
fields("config") ->
|
||||||
basic_config() ++
|
basic_config() ++ request_config();
|
||||||
[
|
|
||||||
{url,
|
|
||||||
mk(
|
|
||||||
binary(),
|
|
||||||
#{
|
|
||||||
required => true,
|
|
||||||
desc => ?DESC("config_url")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{local_topic,
|
|
||||||
mk(
|
|
||||||
binary(),
|
|
||||||
#{desc => ?DESC("config_local_topic")}
|
|
||||||
)},
|
|
||||||
{method,
|
|
||||||
mk(
|
|
||||||
method(),
|
|
||||||
#{
|
|
||||||
default => post,
|
|
||||||
desc => ?DESC("config_method")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{headers,
|
|
||||||
mk(
|
|
||||||
map(),
|
|
||||||
#{
|
|
||||||
default => #{
|
|
||||||
<<"accept">> => <<"application/json">>,
|
|
||||||
<<"cache-control">> => <<"no-cache">>,
|
|
||||||
<<"connection">> => <<"keep-alive">>,
|
|
||||||
<<"content-type">> => <<"application/json">>,
|
|
||||||
<<"keep-alive">> => <<"timeout=5">>
|
|
||||||
},
|
|
||||||
desc => ?DESC("config_headers")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{body,
|
|
||||||
mk(
|
|
||||||
binary(),
|
|
||||||
#{
|
|
||||||
default => <<"${payload}">>,
|
|
||||||
desc => ?DESC("config_body")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{request_timeout,
|
|
||||||
mk(
|
|
||||||
emqx_schema:duration_ms(),
|
|
||||||
#{
|
|
||||||
default => <<"15s">>,
|
|
||||||
desc => ?DESC("config_request_timeout")
|
|
||||||
}
|
|
||||||
)}
|
|
||||||
];
|
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
[
|
[
|
||||||
type_field(),
|
type_field(),
|
||||||
|
@ -106,6 +53,69 @@ basic_config() ->
|
||||||
] ++
|
] ++
|
||||||
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||||
|
|
||||||
|
request_config() ->
|
||||||
|
[
|
||||||
|
{url,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC("config_url")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{local_topic,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{desc => ?DESC("config_local_topic")}
|
||||||
|
)},
|
||||||
|
{method,
|
||||||
|
mk(
|
||||||
|
method(),
|
||||||
|
#{
|
||||||
|
default => post,
|
||||||
|
desc => ?DESC("config_method")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{headers,
|
||||||
|
mk(
|
||||||
|
map(),
|
||||||
|
#{
|
||||||
|
default => #{
|
||||||
|
<<"accept">> => <<"application/json">>,
|
||||||
|
<<"cache-control">> => <<"no-cache">>,
|
||||||
|
<<"connection">> => <<"keep-alive">>,
|
||||||
|
<<"content-type">> => <<"application/json">>,
|
||||||
|
<<"keep-alive">> => <<"timeout=5">>
|
||||||
|
},
|
||||||
|
desc => ?DESC("config_headers")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{body,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
default => <<"${payload}">>,
|
||||||
|
desc => ?DESC("config_body")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{max_retries,
|
||||||
|
mk(
|
||||||
|
non_neg_integer(),
|
||||||
|
#{
|
||||||
|
default => 2,
|
||||||
|
desc => ?DESC("config_max_retries")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{request_timeout,
|
||||||
|
mk(
|
||||||
|
emqx_schema:duration_ms(),
|
||||||
|
#{
|
||||||
|
default => <<"15s">>,
|
||||||
|
desc => ?DESC("config_request_timeout")
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
|
|
||||||
type_field() ->
|
type_field() ->
|
||||||
|
|
|
@ -41,17 +41,6 @@ base URL 只包含host和port。</br>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
retry_interval {
|
|
||||||
desc {
|
|
||||||
en: "Interval between retries."
|
|
||||||
zh: "重试之间的间隔时间。"
|
|
||||||
}
|
|
||||||
label: {
|
|
||||||
en: "Retry Interval"
|
|
||||||
zh: "重试间隔"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pool_type {
|
pool_type {
|
||||||
desc {
|
desc {
|
||||||
en: "The type of the pool. Can be one of `random`, `hash`."
|
en: "The type of the pool. Can be one of `random`, `hash`."
|
||||||
|
|
|
@ -88,22 +88,6 @@ fields(config) ->
|
||||||
desc => ?DESC("connect_timeout")
|
desc => ?DESC("connect_timeout")
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{max_retries,
|
|
||||||
sc(
|
|
||||||
non_neg_integer(),
|
|
||||||
#{
|
|
||||||
default => 5,
|
|
||||||
desc => ?DESC("max_retries")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{retry_interval,
|
|
||||||
sc(
|
|
||||||
emqx_schema:duration(),
|
|
||||||
#{
|
|
||||||
default => "1s",
|
|
||||||
desc => ?DESC("retry_interval")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{pool_type,
|
{pool_type,
|
||||||
sc(
|
sc(
|
||||||
pool_type(),
|
pool_type(),
|
||||||
|
@ -147,6 +131,14 @@ fields("request") ->
|
||||||
{path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
|
{path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
|
||||||
{body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
|
{body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
|
||||||
{headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})},
|
{headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})},
|
||||||
|
{max_retries,
|
||||||
|
sc(
|
||||||
|
non_neg_integer(),
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
desc => ?DESC("max_retries")
|
||||||
|
}
|
||||||
|
)},
|
||||||
{request_timeout,
|
{request_timeout,
|
||||||
sc(
|
sc(
|
||||||
emqx_schema:duration_ms(),
|
emqx_schema:duration_ms(),
|
||||||
|
@ -182,8 +174,6 @@ on_start(
|
||||||
path := BasePath
|
path := BasePath
|
||||||
},
|
},
|
||||||
connect_timeout := ConnectTimeout,
|
connect_timeout := ConnectTimeout,
|
||||||
max_retries := MaxRetries,
|
|
||||||
retry_interval := RetryInterval,
|
|
||||||
pool_type := PoolType,
|
pool_type := PoolType,
|
||||||
pool_size := PoolSize
|
pool_size := PoolSize
|
||||||
} = Config
|
} = Config
|
||||||
|
@ -206,8 +196,6 @@ on_start(
|
||||||
{host, Host},
|
{host, Host},
|
||||||
{port, Port},
|
{port, Port},
|
||||||
{connect_timeout, ConnectTimeout},
|
{connect_timeout, ConnectTimeout},
|
||||||
{retry, MaxRetries},
|
|
||||||
{retry_timeout, RetryInterval},
|
|
||||||
{keepalive, 30000},
|
{keepalive, 30000},
|
||||||
{pool_type, PoolType},
|
{pool_type, PoolType},
|
||||||
{pool_size, PoolSize},
|
{pool_size, PoolSize},
|
||||||
|
@ -247,17 +235,23 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
|
||||||
path := Path,
|
path := Path,
|
||||||
body := Body,
|
body := Body,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
request_timeout := Timeout
|
request_timeout := Timeout,
|
||||||
|
max_retries := Retry
|
||||||
} = process_request(Request, Msg),
|
} = process_request(Request, Msg),
|
||||||
on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
|
on_query(
|
||||||
|
InstId,
|
||||||
|
{undefined, Method, {Path, Headers, Body}, Timeout, Retry},
|
||||||
|
AfterQuery,
|
||||||
|
State
|
||||||
|
)
|
||||||
end;
|
end;
|
||||||
on_query(InstId, {Method, Request}, AfterQuery, State) ->
|
on_query(InstId, {Method, Request}, AfterQuery, State) ->
|
||||||
on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State);
|
on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State);
|
||||||
on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
|
on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
|
||||||
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
|
on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State);
|
||||||
on_query(
|
on_query(
|
||||||
InstId,
|
InstId,
|
||||||
{KeyOrNum, Method, Request, Timeout},
|
{KeyOrNum, Method, Request, Timeout, Retry},
|
||||||
AfterQuery,
|
AfterQuery,
|
||||||
#{pool_name := PoolName, base_path := BasePath} = State
|
#{pool_name := PoolName, base_path := BasePath} = State
|
||||||
) ->
|
) ->
|
||||||
|
@ -275,7 +269,8 @@ on_query(
|
||||||
end,
|
end,
|
||||||
Method,
|
Method,
|
||||||
NRequest,
|
NRequest,
|
||||||
Timeout
|
Timeout,
|
||||||
|
Retry
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -368,7 +363,8 @@ preprocess_request(
|
||||||
path => emqx_plugin_libs_rule:preproc_tmpl(Path),
|
path => emqx_plugin_libs_rule:preproc_tmpl(Path),
|
||||||
body => emqx_plugin_libs_rule:preproc_tmpl(Body),
|
body => emqx_plugin_libs_rule:preproc_tmpl(Body),
|
||||||
headers => preproc_headers(Headers),
|
headers => preproc_headers(Headers),
|
||||||
request_timeout => maps:get(request_timeout, Req, 30000)
|
request_timeout => maps:get(request_timeout, Req, 30000),
|
||||||
|
max_retries => maps:get(max_retries, Req, 2)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
preproc_headers(Headers) when is_map(Headers) ->
|
preproc_headers(Headers) when is_map(Headers) ->
|
||||||
|
|
2
mix.exs
2
mix.exs
|
@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
||||||
{:redbug, "2.0.7"},
|
{:redbug, "2.0.7"},
|
||||||
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
||||||
{:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"},
|
{:ehttpc, github: "emqx/ehttpc", tag: "0.3.0"},
|
||||||
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
||||||
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
||||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
||||||
|
|
|
@ -49,7 +49,7 @@
|
||||||
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||||
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
||||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
||||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}}
|
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}}
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
## create a webhook data bridge with id "my_webhook"
|
## create a webhook data bridge with id "my_webhook"
|
||||||
!curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","request_timeout":"5s","max_retries":3,"type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status'
|
!curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status'
|
||||||
?connected
|
?connected
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue