Merge pull request #8589 from emqx/release-v5.0.4
Merge v5.0.4 into master branch
This commit is contained in:
commit
be679f8128
|
@ -23,6 +23,7 @@
|
||||||
**‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become
|
**‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become
|
||||||
`{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`,
|
`{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`,
|
||||||
which will carry the paging meta information.
|
which will carry the paging meta information.
|
||||||
|
* Fix the issue that webhook leaks TCP connections. [ehttpc#34](https://github.com/emqx/ehttpc/pull/34), [#8580](https://github.com/emqx/emqx/pull/8580)
|
||||||
|
|
||||||
## Enhancements
|
## Enhancements
|
||||||
|
|
||||||
|
@ -31,6 +32,8 @@
|
||||||
* Remove `/configs/listeners` API, use `/listeners/` instead. [#8485](https://github.com/emqx/emqx/pull/8485)
|
* Remove `/configs/listeners` API, use `/listeners/` instead. [#8485](https://github.com/emqx/emqx/pull/8485)
|
||||||
* Optimize performance of builtin database operations in processes with long message queue [#8439](https://github.com/emqx/emqx/pull/8439)
|
* Optimize performance of builtin database operations in processes with long message queue [#8439](https://github.com/emqx/emqx/pull/8439)
|
||||||
* Improve authentication tracing. [#8554](https://github.com/emqx/emqx/pull/8554)
|
* Improve authentication tracing. [#8554](https://github.com/emqx/emqx/pull/8554)
|
||||||
|
* Standardize the '/listeners' and `/gateway/<name>/listeners` API fields.
|
||||||
|
It will introduce some incompatible updates, see [#8571](https://github.com/emqx/emqx/pull/8571)
|
||||||
|
|
||||||
# 5.0.3
|
# 5.0.3
|
||||||
|
|
||||||
|
|
2
Makefile
2
Makefile
|
@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-d
|
||||||
export EMQX_DEFAULT_RUNNER = debian:11-slim
|
export EMQX_DEFAULT_RUNNER = debian:11-slim
|
||||||
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
|
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
|
||||||
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
|
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
|
||||||
export EMQX_DASHBOARD_VERSION ?= v1.0.5-beta.1
|
export EMQX_DASHBOARD_VERSION ?= v1.0.5
|
||||||
export EMQX_REL_FORM ?= tgz
|
export EMQX_REL_FORM ?= tgz
|
||||||
export QUICER_DOWNLOAD_FROM_RELEASE = 1
|
export QUICER_DOWNLOAD_FROM_RELEASE = 1
|
||||||
ifeq ($(OS),Windows_NT)
|
ifeq ($(OS),Windows_NT)
|
||||||
|
|
|
@ -54,7 +54,7 @@
|
||||||
|
|
||||||
-export([pre_config_update/3, post_config_update/5]).
|
-export([pre_config_update/3, post_config_update/5]).
|
||||||
|
|
||||||
-export([format_addr/1]).
|
-export([format_bind/1]).
|
||||||
|
|
||||||
-define(CONF_KEY_PATH, [listeners, '?', '?']).
|
-define(CONF_KEY_PATH, [listeners, '?', '?']).
|
||||||
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
|
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
|
||||||
|
@ -201,14 +201,14 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
||||||
?tp(listener_started, #{type => Type, bind => Bind}),
|
?tp(listener_started, #{type => Type, bind => Bind}),
|
||||||
console_print(
|
console_print(
|
||||||
"Listener ~ts on ~ts started.~n",
|
"Listener ~ts on ~ts started.~n",
|
||||||
[listener_id(Type, ListenerName), format_addr(Bind)]
|
[listener_id(Type, ListenerName), format_bind(Bind)]
|
||||||
),
|
),
|
||||||
ok;
|
ok;
|
||||||
{error, {already_started, Pid}} ->
|
{error, {already_started, Pid}} ->
|
||||||
{error, {already_started, Pid}};
|
{error, {already_started, Pid}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
ListenerId = listener_id(Type, ListenerName),
|
ListenerId = listener_id(Type, ListenerName),
|
||||||
BindStr = format_addr(Bind),
|
BindStr = format_bind(Bind),
|
||||||
?ELOG(
|
?ELOG(
|
||||||
"Failed to start listener ~ts on ~ts: ~0p.~n",
|
"Failed to start listener ~ts on ~ts: ~0p.~n",
|
||||||
[ListenerId, BindStr, Reason]
|
[ListenerId, BindStr, Reason]
|
||||||
|
@ -261,19 +261,19 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
||||||
ok ->
|
ok ->
|
||||||
console_print(
|
console_print(
|
||||||
"Listener ~ts on ~ts stopped.~n",
|
"Listener ~ts on ~ts stopped.~n",
|
||||||
[listener_id(Type, ListenerName), format_addr(Bind)]
|
[listener_id(Type, ListenerName), format_bind(Bind)]
|
||||||
),
|
),
|
||||||
ok;
|
ok;
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
?ELOG(
|
?ELOG(
|
||||||
"Failed to stop listener ~ts on ~ts: ~0p~n",
|
"Failed to stop listener ~ts on ~ts: ~0p~n",
|
||||||
[listener_id(Type, ListenerName), format_addr(Bind), already_stopped]
|
[listener_id(Type, ListenerName), format_bind(Bind), already_stopped]
|
||||||
),
|
),
|
||||||
ok;
|
ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?ELOG(
|
?ELOG(
|
||||||
"Failed to stop listener ~ts on ~ts: ~0p~n",
|
"Failed to stop listener ~ts on ~ts: ~0p~n",
|
||||||
[listener_id(Type, ListenerName), format_addr(Bind), Reason]
|
[listener_id(Type, ListenerName), format_bind(Bind), Reason]
|
||||||
),
|
),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -492,17 +492,32 @@ merge_default(Options) ->
|
||||||
[{tcp_options, ?MQTT_SOCKOPTS} | Options]
|
[{tcp_options, ?MQTT_SOCKOPTS} | Options]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
format_addr(Port) when is_integer(Port) ->
|
-spec format_bind(
|
||||||
io_lib:format("~w", [Port]);
|
integer() | {tuple(), integer()} | string() | binary()
|
||||||
|
) -> io_lib:chars().
|
||||||
|
format_bind(Port) when is_integer(Port) ->
|
||||||
|
io_lib:format(":~w", [Port]);
|
||||||
%% Print only the port number when bound on all interfaces
|
%% Print only the port number when bound on all interfaces
|
||||||
format_addr({{0, 0, 0, 0}, Port}) ->
|
format_bind({{0, 0, 0, 0}, Port}) ->
|
||||||
format_addr(Port);
|
format_bind(Port);
|
||||||
format_addr({{0, 0, 0, 0, 0, 0, 0, 0}, Port}) ->
|
format_bind({{0, 0, 0, 0, 0, 0, 0, 0}, Port}) ->
|
||||||
format_addr(Port);
|
format_bind(Port);
|
||||||
format_addr({Addr, Port}) when is_list(Addr) ->
|
format_bind({Addr, Port}) when is_list(Addr) ->
|
||||||
io_lib:format("~ts:~w", [Addr, Port]);
|
io_lib:format("~ts:~w", [Addr, Port]);
|
||||||
format_addr({Addr, Port}) when is_tuple(Addr) ->
|
format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 4 ->
|
||||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
|
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]);
|
||||||
|
format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 8 ->
|
||||||
|
io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]);
|
||||||
|
%% Support string, binary type for Port or IP:Port
|
||||||
|
format_bind(Str) when is_list(Str) ->
|
||||||
|
case emqx_schema:to_ip_port(Str) of
|
||||||
|
{ok, {Ip, Port}} ->
|
||||||
|
format_bind({Ip, Port});
|
||||||
|
{error, _} ->
|
||||||
|
format_bind(list_to_integer(Str))
|
||||||
|
end;
|
||||||
|
format_bind(Bin) when is_binary(Bin) ->
|
||||||
|
format_bind(binary_to_list(Bin)).
|
||||||
|
|
||||||
listener_id(Type, ListenerName) ->
|
listener_id(Type, ListenerName) ->
|
||||||
list_to_atom(lists:append([str(Type), ":", str(ListenerName)])).
|
list_to_atom(lists:append([str(Type), ":", str(ListenerName)])).
|
||||||
|
|
|
@ -127,6 +127,17 @@ HTTP 请求的正文。</br>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
config_max_retries {
|
||||||
|
desc {
|
||||||
|
en: """HTTP request max retry times if failed."""
|
||||||
|
zh: """HTTP 请求失败最大重试次数"""
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "HTTP Request Max Retries"
|
||||||
|
zh: "HTTP 请求重试次数"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
desc_type {
|
desc_type {
|
||||||
desc {
|
desc {
|
||||||
en: """The Bridge Type"""
|
en: """The Bridge Type"""
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge, [
|
{application, emqx_bridge, [
|
||||||
{description, "An OTP application"},
|
{description, "An OTP application"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_bridge_app, []}},
|
{mod, {emqx_bridge_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -225,7 +225,6 @@ info_example_basic(webhook, _) ->
|
||||||
request_timeout => <<"15s">>,
|
request_timeout => <<"15s">>,
|
||||||
connect_timeout => <<"15s">>,
|
connect_timeout => <<"15s">>,
|
||||||
max_retries => 3,
|
max_retries => 3,
|
||||||
retry_interval => <<"10s">>,
|
|
||||||
pool_type => <<"random">>,
|
pool_type => <<"random">>,
|
||||||
pool_size => 4,
|
pool_size => 4,
|
||||||
enable_pipelining => 100,
|
enable_pipelining => 100,
|
||||||
|
|
|
@ -238,7 +238,8 @@ parse_confs(
|
||||||
method := Method,
|
method := Method,
|
||||||
body := Body,
|
body := Body,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
request_timeout := ReqTimeout
|
request_timeout := ReqTimeout,
|
||||||
|
max_retries := Retry
|
||||||
} = Conf
|
} = Conf
|
||||||
) ->
|
) ->
|
||||||
{BaseUrl, Path} = parse_url(Url),
|
{BaseUrl, Path} = parse_url(Url),
|
||||||
|
@ -251,7 +252,8 @@ parse_confs(
|
||||||
method => Method,
|
method => Method,
|
||||||
body => Body,
|
body => Body,
|
||||||
headers => Headers,
|
headers => Headers,
|
||||||
request_timeout => ReqTimeout
|
request_timeout => ReqTimeout,
|
||||||
|
max_retries => Retry
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
|
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
|
||||||
|
|
|
@ -14,60 +14,7 @@ namespace() -> "bridge".
|
||||||
roots() -> [].
|
roots() -> [].
|
||||||
|
|
||||||
fields("config") ->
|
fields("config") ->
|
||||||
basic_config() ++
|
basic_config() ++ request_config();
|
||||||
[
|
|
||||||
{url,
|
|
||||||
mk(
|
|
||||||
binary(),
|
|
||||||
#{
|
|
||||||
required => true,
|
|
||||||
desc => ?DESC("config_url")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{local_topic,
|
|
||||||
mk(
|
|
||||||
binary(),
|
|
||||||
#{desc => ?DESC("config_local_topic")}
|
|
||||||
)},
|
|
||||||
{method,
|
|
||||||
mk(
|
|
||||||
method(),
|
|
||||||
#{
|
|
||||||
default => post,
|
|
||||||
desc => ?DESC("config_method")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{headers,
|
|
||||||
mk(
|
|
||||||
map(),
|
|
||||||
#{
|
|
||||||
default => #{
|
|
||||||
<<"accept">> => <<"application/json">>,
|
|
||||||
<<"cache-control">> => <<"no-cache">>,
|
|
||||||
<<"connection">> => <<"keep-alive">>,
|
|
||||||
<<"content-type">> => <<"application/json">>,
|
|
||||||
<<"keep-alive">> => <<"timeout=5">>
|
|
||||||
},
|
|
||||||
desc => ?DESC("config_headers")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{body,
|
|
||||||
mk(
|
|
||||||
binary(),
|
|
||||||
#{
|
|
||||||
default => <<"${payload}">>,
|
|
||||||
desc => ?DESC("config_body")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{request_timeout,
|
|
||||||
mk(
|
|
||||||
emqx_schema:duration_ms(),
|
|
||||||
#{
|
|
||||||
default => <<"15s">>,
|
|
||||||
desc => ?DESC("config_request_timeout")
|
|
||||||
}
|
|
||||||
)}
|
|
||||||
];
|
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
[
|
[
|
||||||
type_field(),
|
type_field(),
|
||||||
|
@ -106,6 +53,69 @@ basic_config() ->
|
||||||
] ++
|
] ++
|
||||||
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||||
|
|
||||||
|
request_config() ->
|
||||||
|
[
|
||||||
|
{url,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC("config_url")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{local_topic,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{desc => ?DESC("config_local_topic")}
|
||||||
|
)},
|
||||||
|
{method,
|
||||||
|
mk(
|
||||||
|
method(),
|
||||||
|
#{
|
||||||
|
default => post,
|
||||||
|
desc => ?DESC("config_method")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{headers,
|
||||||
|
mk(
|
||||||
|
map(),
|
||||||
|
#{
|
||||||
|
default => #{
|
||||||
|
<<"accept">> => <<"application/json">>,
|
||||||
|
<<"cache-control">> => <<"no-cache">>,
|
||||||
|
<<"connection">> => <<"keep-alive">>,
|
||||||
|
<<"content-type">> => <<"application/json">>,
|
||||||
|
<<"keep-alive">> => <<"timeout=5">>
|
||||||
|
},
|
||||||
|
desc => ?DESC("config_headers")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{body,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
default => <<"${payload}">>,
|
||||||
|
desc => ?DESC("config_body")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{max_retries,
|
||||||
|
mk(
|
||||||
|
non_neg_integer(),
|
||||||
|
#{
|
||||||
|
default => 2,
|
||||||
|
desc => ?DESC("config_max_retries")
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{request_timeout,
|
||||||
|
mk(
|
||||||
|
emqx_schema:duration_ms(),
|
||||||
|
#{
|
||||||
|
default => <<"15s">>,
|
||||||
|
desc => ?DESC("config_request_timeout")
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
|
|
||||||
type_field() ->
|
type_field() ->
|
||||||
|
|
|
@ -41,17 +41,6 @@ base URL 只包含host和port。</br>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
retry_interval {
|
|
||||||
desc {
|
|
||||||
en: "Interval between retries."
|
|
||||||
zh: "重试之间的间隔时间。"
|
|
||||||
}
|
|
||||||
label: {
|
|
||||||
en: "Retry Interval"
|
|
||||||
zh: "重试间隔"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pool_type {
|
pool_type {
|
||||||
desc {
|
desc {
|
||||||
en: "The type of the pool. Can be one of `random`, `hash`."
|
en: "The type of the pool. Can be one of `random`, `hash`."
|
||||||
|
|
|
@ -88,22 +88,6 @@ fields(config) ->
|
||||||
desc => ?DESC("connect_timeout")
|
desc => ?DESC("connect_timeout")
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{max_retries,
|
|
||||||
sc(
|
|
||||||
non_neg_integer(),
|
|
||||||
#{
|
|
||||||
default => 5,
|
|
||||||
desc => ?DESC("max_retries")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{retry_interval,
|
|
||||||
sc(
|
|
||||||
emqx_schema:duration(),
|
|
||||||
#{
|
|
||||||
default => "1s",
|
|
||||||
desc => ?DESC("retry_interval")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{pool_type,
|
{pool_type,
|
||||||
sc(
|
sc(
|
||||||
pool_type(),
|
pool_type(),
|
||||||
|
@ -147,6 +131,14 @@ fields("request") ->
|
||||||
{path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
|
{path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
|
||||||
{body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
|
{body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
|
||||||
{headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})},
|
{headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})},
|
||||||
|
{max_retries,
|
||||||
|
sc(
|
||||||
|
non_neg_integer(),
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
desc => ?DESC("max_retries")
|
||||||
|
}
|
||||||
|
)},
|
||||||
{request_timeout,
|
{request_timeout,
|
||||||
sc(
|
sc(
|
||||||
emqx_schema:duration_ms(),
|
emqx_schema:duration_ms(),
|
||||||
|
@ -182,8 +174,6 @@ on_start(
|
||||||
path := BasePath
|
path := BasePath
|
||||||
},
|
},
|
||||||
connect_timeout := ConnectTimeout,
|
connect_timeout := ConnectTimeout,
|
||||||
max_retries := MaxRetries,
|
|
||||||
retry_interval := RetryInterval,
|
|
||||||
pool_type := PoolType,
|
pool_type := PoolType,
|
||||||
pool_size := PoolSize
|
pool_size := PoolSize
|
||||||
} = Config
|
} = Config
|
||||||
|
@ -206,8 +196,6 @@ on_start(
|
||||||
{host, Host},
|
{host, Host},
|
||||||
{port, Port},
|
{port, Port},
|
||||||
{connect_timeout, ConnectTimeout},
|
{connect_timeout, ConnectTimeout},
|
||||||
{retry, MaxRetries},
|
|
||||||
{retry_timeout, RetryInterval},
|
|
||||||
{keepalive, 30000},
|
{keepalive, 30000},
|
||||||
{pool_type, PoolType},
|
{pool_type, PoolType},
|
||||||
{pool_size, PoolSize},
|
{pool_size, PoolSize},
|
||||||
|
@ -247,17 +235,23 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
|
||||||
path := Path,
|
path := Path,
|
||||||
body := Body,
|
body := Body,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
request_timeout := Timeout
|
request_timeout := Timeout,
|
||||||
|
max_retries := Retry
|
||||||
} = process_request(Request, Msg),
|
} = process_request(Request, Msg),
|
||||||
on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
|
on_query(
|
||||||
|
InstId,
|
||||||
|
{undefined, Method, {Path, Headers, Body}, Timeout, Retry},
|
||||||
|
AfterQuery,
|
||||||
|
State
|
||||||
|
)
|
||||||
end;
|
end;
|
||||||
on_query(InstId, {Method, Request}, AfterQuery, State) ->
|
on_query(InstId, {Method, Request}, AfterQuery, State) ->
|
||||||
on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State);
|
on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State);
|
||||||
on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
|
on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
|
||||||
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
|
on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State);
|
||||||
on_query(
|
on_query(
|
||||||
InstId,
|
InstId,
|
||||||
{KeyOrNum, Method, Request, Timeout},
|
{KeyOrNum, Method, Request, Timeout, Retry},
|
||||||
AfterQuery,
|
AfterQuery,
|
||||||
#{pool_name := PoolName, base_path := BasePath} = State
|
#{pool_name := PoolName, base_path := BasePath} = State
|
||||||
) ->
|
) ->
|
||||||
|
@ -275,7 +269,8 @@ on_query(
|
||||||
end,
|
end,
|
||||||
Method,
|
Method,
|
||||||
NRequest,
|
NRequest,
|
||||||
Timeout
|
Timeout,
|
||||||
|
Retry
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -368,7 +363,8 @@ preprocess_request(
|
||||||
path => emqx_plugin_libs_rule:preproc_tmpl(Path),
|
path => emqx_plugin_libs_rule:preproc_tmpl(Path),
|
||||||
body => emqx_plugin_libs_rule:preproc_tmpl(Body),
|
body => emqx_plugin_libs_rule:preproc_tmpl(Body),
|
||||||
headers => preproc_headers(Headers),
|
headers => preproc_headers(Headers),
|
||||||
request_timeout => maps:get(request_timeout, Req, 30000)
|
request_timeout => maps:get(request_timeout, Req, 30000),
|
||||||
|
max_retries => maps:get(max_retries, Req, 2)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
preproc_headers(Headers) when is_map(Headers) ->
|
preproc_headers(Headers) when is_map(Headers) ->
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
dashboard {
|
dashboard {
|
||||||
listeners.http {
|
listeners.http {
|
||||||
bind: 18083
|
bind = 18083
|
||||||
}
|
}
|
||||||
default_username: "admin"
|
default_username = "admin"
|
||||||
default_password: "public"
|
default_password = "public"
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ start_listeners(Listeners) ->
|
||||||
case minirest:start(Name, RanchOptions, Minirest) of
|
case minirest:start(Name, RanchOptions, Minirest) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
?ULOG("Listener ~ts on ~ts started.~n", [
|
?ULOG("Listener ~ts on ~ts started.~n", [
|
||||||
Name, emqx_listeners:format_addr(Bind)
|
Name, emqx_listeners:format_bind(Bind)
|
||||||
]),
|
]),
|
||||||
Acc;
|
Acc;
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
|
@ -114,7 +114,7 @@ stop_listeners(Listeners) ->
|
||||||
case minirest:stop(Name) of
|
case minirest:stop(Name) of
|
||||||
ok ->
|
ok ->
|
||||||
?ULOG("Stop listener ~ts on ~ts successfully.~n", [
|
?ULOG("Stop listener ~ts on ~ts successfully.~n", [
|
||||||
Name, emqx_listeners:format_addr(Port)
|
Name, emqx_listeners:format_bind(Port)
|
||||||
]);
|
]);
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
?SLOG(warning, #{msg => "stop_listener_failed", name => Name, port => Port})
|
?SLOG(warning, #{msg => "stop_listener_failed", name => Name, port => Port})
|
||||||
|
|
|
@ -112,6 +112,13 @@ emqx_gateway_api_listeners {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
listener_status {
|
||||||
|
desc {
|
||||||
|
en: """listener status """
|
||||||
|
zh: """监听器状态"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
listener_node_status {
|
listener_node_status {
|
||||||
desc {
|
desc {
|
||||||
en: """listener status of each node in the cluster"""
|
en: """listener status of each node in the cluster"""
|
||||||
|
|
|
@ -81,7 +81,7 @@ paths() ->
|
||||||
|
|
||||||
listeners(get, #{bindings := #{name := Name0}}) ->
|
listeners(get, #{bindings := #{name := Name0}}) ->
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
Result = get_cluster_listeners_info(GwName),
|
Result = lists:map(fun bind2str/1, get_cluster_listeners_info(GwName)),
|
||||||
{200, Result}
|
{200, Result}
|
||||||
end);
|
end);
|
||||||
listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
|
listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
|
||||||
|
@ -119,7 +119,7 @@ listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
|
||||||
with_gateway(Name0, fun(_GwName, _) ->
|
with_gateway(Name0, fun(_GwName, _) ->
|
||||||
case emqx_gateway_conf:listener(ListenerId) of
|
case emqx_gateway_conf:listener(ListenerId) of
|
||||||
{ok, Listener} ->
|
{ok, Listener} ->
|
||||||
{200, Listener};
|
{200, bind2str(Listener)};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
return_http_error(404, "Listener not found");
|
return_http_error(404, "Listener not found");
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -266,11 +266,14 @@ get_cluster_listeners_info(GwName) ->
|
||||||
ClusterStatus
|
ClusterStatus
|
||||||
),
|
),
|
||||||
|
|
||||||
{MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus),
|
{MaxCons, CurrCons, Running} = aggregate_listener_status(NodeStatus),
|
||||||
|
|
||||||
Listener#{
|
Listener#{
|
||||||
max_connections => MaxCons,
|
status => #{
|
||||||
current_connections => CurrCons,
|
running => Running,
|
||||||
|
max_connections => MaxCons,
|
||||||
|
current_connections => CurrCons
|
||||||
|
},
|
||||||
node_status => NodeStatus
|
node_status => NodeStatus
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
|
@ -292,20 +295,23 @@ do_listeners_cluster_status(Listeners) ->
|
||||||
fun({Id, ListenOn}, Acc) ->
|
fun({Id, ListenOn}, Acc) ->
|
||||||
BinId = erlang:atom_to_binary(Id),
|
BinId = erlang:atom_to_binary(Id),
|
||||||
{ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
|
{ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
|
||||||
Curr =
|
{Running, Curr} =
|
||||||
try esockd:get_current_connections({Id, ListenOn}) of
|
try esockd:get_current_connections({Id, ListenOn}) of
|
||||||
Int -> Int
|
Int -> {true, Int}
|
||||||
catch
|
catch
|
||||||
%% not started
|
%% not started
|
||||||
error:not_found ->
|
error:not_found ->
|
||||||
0
|
{false, 0}
|
||||||
end,
|
end,
|
||||||
Acc#{
|
Acc#{
|
||||||
Id => #{
|
Id => #{
|
||||||
node => Node,
|
node => Node,
|
||||||
current_connections => Curr,
|
status => #{
|
||||||
%% XXX: Since it is taken from raw-conf, it is possible a string
|
running => Running,
|
||||||
max_connections => int(Max)
|
current_connections => Curr,
|
||||||
|
%% XXX: Since it is taken from raw-conf, it is possible a string
|
||||||
|
max_connections => int(Max)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
|
@ -317,6 +323,31 @@ int(B) when is_binary(B) ->
|
||||||
binary_to_integer(B);
|
binary_to_integer(B);
|
||||||
int(I) when is_integer(I) ->
|
int(I) when is_integer(I) ->
|
||||||
I.
|
I.
|
||||||
|
aggregate_listener_status(NodeStatus) ->
|
||||||
|
aggregate_listener_status(NodeStatus, 0, 0, undefined).
|
||||||
|
|
||||||
|
aggregate_listener_status(
|
||||||
|
[
|
||||||
|
#{status := #{running := Running, max_connections := Max, current_connections := Current}}
|
||||||
|
| T
|
||||||
|
],
|
||||||
|
MaxAcc,
|
||||||
|
CurrAcc,
|
||||||
|
RunningAcc
|
||||||
|
) ->
|
||||||
|
NRunning = aggregate_running(Running, RunningAcc),
|
||||||
|
aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning);
|
||||||
|
aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) ->
|
||||||
|
{MaxAcc, CurrAcc, RunningAcc}.
|
||||||
|
|
||||||
|
aggregate_running(R, R) -> R;
|
||||||
|
aggregate_running(R, undefined) -> R;
|
||||||
|
aggregate_running(_, _) -> inconsistent.
|
||||||
|
|
||||||
|
bind2str(Listener = #{bind := Bind}) ->
|
||||||
|
Listener#{bind := iolist_to_binary(emqx_listeners:format_bind(Bind))};
|
||||||
|
bind2str(Listener = #{<<"bind">> := Bind}) ->
|
||||||
|
Listener#{<<"bind">> := iolist_to_binary(emqx_listeners:format_bind(Bind))}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Swagger defines
|
%% Swagger defines
|
||||||
|
@ -590,22 +621,25 @@ params_paging_in_qs() ->
|
||||||
roots() ->
|
roots() ->
|
||||||
[listener].
|
[listener].
|
||||||
|
|
||||||
fields(listener_node_status) ->
|
fields(listener_status) ->
|
||||||
[
|
[
|
||||||
{current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})},
|
{status,
|
||||||
|
mk(ref(emqx_mgmt_api_listeners, status), #{
|
||||||
|
desc => ?DESC(listener_status)
|
||||||
|
})},
|
||||||
{node_status,
|
{node_status,
|
||||||
mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{
|
mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{
|
||||||
desc => ?DESC(listener_node_status)
|
desc => ?DESC(listener_node_status)
|
||||||
})}
|
})}
|
||||||
];
|
];
|
||||||
fields(tcp_listener) ->
|
fields(tcp_listener) ->
|
||||||
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status);
|
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_status);
|
||||||
fields(ssl_listener) ->
|
fields(ssl_listener) ->
|
||||||
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status);
|
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_status);
|
||||||
fields(udp_listener) ->
|
fields(udp_listener) ->
|
||||||
emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status);
|
emqx_gateway_api:fields(udp_listener) ++ fields(listener_status);
|
||||||
fields(dtls_listener) ->
|
fields(dtls_listener) ->
|
||||||
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status);
|
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_status);
|
||||||
fields(_) ->
|
fields(_) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
|
@ -623,12 +657,19 @@ listener_node_status_schema() ->
|
||||||
examples_listener_list() ->
|
examples_listener_list() ->
|
||||||
Convert = fun(Cfg) ->
|
Convert = fun(Cfg) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
current_connections => 0,
|
status => #{
|
||||||
|
running => true,
|
||||||
|
max_connections => 1024000,
|
||||||
|
current_connections => 10
|
||||||
|
},
|
||||||
node_status => [
|
node_status => [
|
||||||
#{
|
#{
|
||||||
node => <<"127.0.0.1">>,
|
node => <<"emqx@127.0.0.1">>,
|
||||||
current_connections => 0,
|
status => #{
|
||||||
max_connections => 1024000
|
running => true,
|
||||||
|
current_connections => 10,
|
||||||
|
max_connections => 1024000
|
||||||
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -181,24 +181,11 @@ do_convert_listener(GwName, LType, Conf) ->
|
||||||
|
|
||||||
do_convert_listener2(GwName, LType, LName, LConf) ->
|
do_convert_listener2(GwName, LType, LName, LConf) ->
|
||||||
ListenerId = emqx_gateway_utils:listener_id(GwName, LType, LName),
|
ListenerId = emqx_gateway_utils:listener_id(GwName, LType, LName),
|
||||||
Running = emqx_gateway_utils:is_running(ListenerId, LConf),
|
LConf#{
|
||||||
bind2str(
|
id => ListenerId,
|
||||||
LConf#{
|
type => LType,
|
||||||
id => ListenerId,
|
name => LName
|
||||||
type => LType,
|
}.
|
||||||
name => LName,
|
|
||||||
running => Running
|
|
||||||
}
|
|
||||||
).
|
|
||||||
|
|
||||||
bind2str(LConf = #{bind := Bind}) when is_integer(Bind) ->
|
|
||||||
maps:put(bind, integer_to_binary(Bind), LConf);
|
|
||||||
bind2str(LConf = #{<<"bind">> := Bind}) when is_integer(Bind) ->
|
|
||||||
maps:put(<<"bind">>, integer_to_binary(Bind), LConf);
|
|
||||||
bind2str(LConf = #{bind := Bind}) when is_binary(Bind) ->
|
|
||||||
LConf;
|
|
||||||
bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) ->
|
|
||||||
LConf.
|
|
||||||
|
|
||||||
get_bind(#{bind := Bind}) ->
|
get_bind(#{bind := Bind}) ->
|
||||||
emqx_gateway_utils:parse_listenon(Bind);
|
emqx_gateway_utils:parse_listenon(Bind);
|
||||||
|
|
|
@ -37,7 +37,6 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
apply/2,
|
apply/2,
|
||||||
format_listenon/1,
|
|
||||||
parse_listenon/1,
|
parse_listenon/1,
|
||||||
unix_ts_to_rfc3339/1,
|
unix_ts_to_rfc3339/1,
|
||||||
unix_ts_to_rfc3339/2,
|
unix_ts_to_rfc3339/2,
|
||||||
|
@ -165,7 +164,7 @@ start_listener(
|
||||||
{Type, LisName, ListenOn, SocketOpts, Cfg},
|
{Type, LisName, ListenOn, SocketOpts, Cfg},
|
||||||
ModCfg
|
ModCfg
|
||||||
) ->
|
) ->
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
ListenOnStr = emqx_listeners:format_bind(ListenOn),
|
||||||
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
||||||
|
|
||||||
NCfg = maps:merge(Cfg, ModCfg),
|
NCfg = maps:merge(Cfg, ModCfg),
|
||||||
|
@ -243,7 +242,7 @@ stop_listeners(GwName, Listeners) ->
|
||||||
-spec stop_listener(GwName :: atom(), Listener :: tuple()) -> ok.
|
-spec stop_listener(GwName :: atom(), Listener :: tuple()) -> ok.
|
||||||
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
||||||
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
ListenOnStr = emqx_listeners:format_bind(ListenOn),
|
||||||
case StopRet of
|
case StopRet of
|
||||||
ok ->
|
ok ->
|
||||||
console_print(
|
console_print(
|
||||||
|
@ -287,13 +286,6 @@ apply(F, A2) when
|
||||||
->
|
->
|
||||||
erlang:apply(F, A2).
|
erlang:apply(F, A2).
|
||||||
|
|
||||||
format_listenon(Port) when is_integer(Port) ->
|
|
||||||
io_lib:format("0.0.0.0:~w", [Port]);
|
|
||||||
format_listenon({Addr, Port}) when is_list(Addr) ->
|
|
||||||
io_lib:format("~ts:~w", [Addr, Port]);
|
|
||||||
format_listenon({Addr, Port}) when is_tuple(Addr) ->
|
|
||||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
|
|
||||||
|
|
||||||
parse_listenon(Port) when is_integer(Port) ->
|
parse_listenon(Port) when is_integer(Port) ->
|
||||||
Port;
|
Port;
|
||||||
parse_listenon(IpPort) when is_tuple(IpPort) ->
|
parse_listenon(IpPort) when is_tuple(IpPort) ->
|
||||||
|
|
|
@ -167,7 +167,7 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
|
||||||
)}
|
)}
|
||||||
]
|
]
|
||||||
end,
|
end,
|
||||||
ListenOnStr = emqx_listeners:format_addr(ListenOn),
|
ListenOnStr = emqx_listeners:format_bind(ListenOn),
|
||||||
case grpc:start_server(GwName, ListenOn, Services, SvrOptions) of
|
case grpc:start_server(GwName, ListenOn, Services, SvrOptions) of
|
||||||
{ok, _SvrPid} ->
|
{ok, _SvrPid} ->
|
||||||
console_print(
|
console_print(
|
||||||
|
|
|
@ -340,7 +340,7 @@ t_listeners_tcp(_) ->
|
||||||
LisConf = #{
|
LisConf = #{
|
||||||
name => <<"def">>,
|
name => <<"def">>,
|
||||||
type => <<"tcp">>,
|
type => <<"tcp">>,
|
||||||
bind => <<"61613">>
|
bind => <<"127.0.0.1:61613">>
|
||||||
},
|
},
|
||||||
{201, _} = request(post, "/gateway/stomp/listeners", LisConf),
|
{201, _} = request(post, "/gateway/stomp/listeners", LisConf),
|
||||||
{200, ConfResp} = request(get, "/gateway/stomp/listeners"),
|
{200, ConfResp} = request(get, "/gateway/stomp/listeners"),
|
||||||
|
@ -348,7 +348,7 @@ t_listeners_tcp(_) ->
|
||||||
{200, ConfResp1} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"),
|
{200, ConfResp1} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"),
|
||||||
assert_confs(LisConf, ConfResp1),
|
assert_confs(LisConf, ConfResp1),
|
||||||
|
|
||||||
LisConf2 = maps:merge(LisConf, #{bind => <<"61614">>}),
|
LisConf2 = maps:merge(LisConf, #{bind => <<"127.0.0.1:61614">>}),
|
||||||
{200, _} = request(
|
{200, _} = request(
|
||||||
put,
|
put,
|
||||||
"/gateway/stomp/listeners/stomp:tcp:def",
|
"/gateway/stomp/listeners/stomp:tcp:def",
|
||||||
|
@ -369,7 +369,7 @@ t_listeners_authn(_) ->
|
||||||
#{
|
#{
|
||||||
name => <<"def">>,
|
name => <<"def">>,
|
||||||
type => <<"tcp">>,
|
type => <<"tcp">>,
|
||||||
bind => <<"61613">>
|
bind => <<"127.0.0.1:61613">>
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
@ -405,7 +405,7 @@ t_listeners_authn_data_mgmt(_) ->
|
||||||
#{
|
#{
|
||||||
name => <<"def">>,
|
name => <<"def">>,
|
||||||
type => <<"tcp">>,
|
type => <<"tcp">>,
|
||||||
bind => <<"61613">>
|
bind => <<"127.0.0.1:61613">>
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|
|
@ -218,11 +218,13 @@ fields(listener_type_status) ->
|
||||||
fields(listener_id_status) ->
|
fields(listener_id_status) ->
|
||||||
fields(listener_id) ++
|
fields(listener_id) ++
|
||||||
[
|
[
|
||||||
|
{type, ?HOCON(?ENUM(listeners_type()), #{desc => "Listener type", required => true})},
|
||||||
|
{name, ?HOCON(string(), #{desc => "Listener name", required => true})},
|
||||||
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
|
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
|
||||||
{number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})},
|
{number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})},
|
||||||
{bind,
|
{bind,
|
||||||
?HOCON(
|
?HOCON(
|
||||||
hoconsc:union([emqx_schema:ip_port(), integer()]),
|
emqx_schema:ip_port(),
|
||||||
#{desc => "Listener bind addr", required => true}
|
#{desc => "Listener bind addr", required => true}
|
||||||
)},
|
)},
|
||||||
{acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})},
|
{acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})},
|
||||||
|
@ -231,12 +233,24 @@ fields(listener_id_status) ->
|
||||||
];
|
];
|
||||||
fields(status) ->
|
fields(status) ->
|
||||||
[
|
[
|
||||||
|
{running,
|
||||||
|
?HOCON(
|
||||||
|
hoconsc:union([inconsistent, boolean()]),
|
||||||
|
#{desc => "Listener running status", required => true}
|
||||||
|
)},
|
||||||
{max_connections,
|
{max_connections,
|
||||||
?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})},
|
?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})},
|
||||||
{current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})}
|
{current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})}
|
||||||
];
|
];
|
||||||
fields(node_status) ->
|
fields(node_status) ->
|
||||||
fields(node) ++ fields(status);
|
[
|
||||||
|
{"node",
|
||||||
|
?HOCON(atom(), #{
|
||||||
|
desc => "Node name",
|
||||||
|
example => "emqx@127.0.0.1"
|
||||||
|
})},
|
||||||
|
{status, ?HOCON(?R_REF(status))}
|
||||||
|
];
|
||||||
fields(Type) ->
|
fields(Type) ->
|
||||||
Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
|
Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
|
||||||
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
|
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
|
||||||
|
@ -311,7 +325,7 @@ listener_type_status(get, _Request) ->
|
||||||
Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
|
Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
|
||||||
List = lists:map(
|
List = lists:map(
|
||||||
fun({Type, L}) ->
|
fun({Type, L}) ->
|
||||||
L1 = maps:without([bind, acceptors], L),
|
L1 = maps:without([bind, acceptors, name], L),
|
||||||
L1#{type => Type}
|
L1#{type => Type}
|
||||||
end,
|
end,
|
||||||
Listeners
|
Listeners
|
||||||
|
@ -453,7 +467,7 @@ listener_status_by_id(NodeL) ->
|
||||||
fun({Id, L}) ->
|
fun({Id, L}) ->
|
||||||
L1 = maps:remove(ids, L),
|
L1 = maps:remove(ids, L),
|
||||||
#{node_status := Nodes} = L1,
|
#{node_status := Nodes} = L1,
|
||||||
L1#{number => maps:size(Nodes), id => Id}
|
L1#{number => length(Nodes), id => Id}
|
||||||
end,
|
end,
|
||||||
Listeners
|
Listeners
|
||||||
).
|
).
|
||||||
|
@ -510,67 +524,75 @@ wrap_rpc(Res) ->
|
||||||
format_status(Key, Node, Listener, Acc) ->
|
format_status(Key, Node, Listener, Acc) ->
|
||||||
#{
|
#{
|
||||||
<<"id">> := Id,
|
<<"id">> := Id,
|
||||||
|
<<"type">> := Type,
|
||||||
|
<<"enabled">> := Enabled,
|
||||||
<<"running">> := Running,
|
<<"running">> := Running,
|
||||||
<<"max_connections">> := MaxConnections,
|
<<"max_connections">> := MaxConnections,
|
||||||
<<"current_connections">> := CurrentConnections,
|
<<"current_connections">> := CurrentConnections,
|
||||||
<<"acceptors">> := Acceptors,
|
<<"acceptors">> := Acceptors,
|
||||||
<<"bind">> := Bind
|
<<"bind">> := Bind
|
||||||
} = Listener,
|
} = Listener,
|
||||||
|
{ok, #{name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||||
GroupKey = maps:get(Key, Listener),
|
GroupKey = maps:get(Key, Listener),
|
||||||
case maps:find(GroupKey, Acc) of
|
case maps:find(GroupKey, Acc) of
|
||||||
error ->
|
error ->
|
||||||
Acc#{
|
Acc#{
|
||||||
GroupKey => #{
|
GroupKey => #{
|
||||||
enable => Running,
|
name => Name,
|
||||||
|
type => Type,
|
||||||
|
enable => Enabled,
|
||||||
ids => [Id],
|
ids => [Id],
|
||||||
acceptors => Acceptors,
|
acceptors => Acceptors,
|
||||||
bind => Bind,
|
bind => iolist_to_binary(emqx_listeners:format_bind(Bind)),
|
||||||
status => #{
|
status => #{
|
||||||
|
running => Running,
|
||||||
max_connections => MaxConnections,
|
max_connections => MaxConnections,
|
||||||
current_connections => CurrentConnections
|
current_connections => CurrentConnections
|
||||||
},
|
},
|
||||||
node_status => #{
|
node_status => [
|
||||||
Node => #{
|
#{
|
||||||
max_connections => MaxConnections,
|
node => Node,
|
||||||
current_connections => CurrentConnections
|
status => #{
|
||||||
|
running => Running,
|
||||||
|
max_connections => MaxConnections,
|
||||||
|
current_connections => CurrentConnections
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
]
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
{ok, GroupValue} ->
|
{ok, GroupValue} ->
|
||||||
#{
|
#{
|
||||||
ids := Ids,
|
ids := Ids,
|
||||||
status := #{
|
status := #{
|
||||||
|
running := Running0,
|
||||||
max_connections := MaxConnections0,
|
max_connections := MaxConnections0,
|
||||||
current_connections := CurrentConnections0
|
current_connections := CurrentConnections0
|
||||||
},
|
},
|
||||||
node_status := NodeStatus0
|
node_status := NodeStatus0
|
||||||
} = GroupValue,
|
} = GroupValue,
|
||||||
NodeStatus =
|
NodeStatus = [
|
||||||
case maps:find(Node, NodeStatus0) of
|
#{
|
||||||
error ->
|
node => Node,
|
||||||
NodeStatus0#{
|
status => #{
|
||||||
Node => #{
|
running => Running,
|
||||||
max_connections => MaxConnections,
|
max_connections => MaxConnections,
|
||||||
current_connections => CurrentConnections
|
current_connections => CurrentConnections
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
{ok, #{
|
| NodeStatus0
|
||||||
max_connections := PrevMax,
|
],
|
||||||
current_connections := PrevCurr
|
NRunning =
|
||||||
}} ->
|
case Running == Running0 of
|
||||||
NodeStatus0#{
|
true -> Running0;
|
||||||
Node => #{
|
_ -> inconsistent
|
||||||
max_connections => max_conn(MaxConnections, PrevMax),
|
|
||||||
current_connections => CurrentConnections + PrevCurr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
end,
|
end,
|
||||||
Acc#{
|
Acc#{
|
||||||
GroupKey =>
|
GroupKey =>
|
||||||
GroupValue#{
|
GroupValue#{
|
||||||
ids => lists:usort([Id | Ids]),
|
ids => lists:usort([Id | Ids]),
|
||||||
status => #{
|
status => #{
|
||||||
|
running => NRunning,
|
||||||
max_connections => max_conn(MaxConnections0, MaxConnections),
|
max_connections => max_conn(MaxConnections0, MaxConnections),
|
||||||
current_connections => CurrentConnections0 + CurrentConnections
|
current_connections => CurrentConnections0 + CurrentConnections
|
||||||
},
|
},
|
||||||
|
@ -605,17 +627,27 @@ listener_type_status_example() ->
|
||||||
#{
|
#{
|
||||||
enable => false,
|
enable => false,
|
||||||
ids => ["tcp:demo"],
|
ids => ["tcp:demo"],
|
||||||
node_status => #{
|
node_status =>
|
||||||
'emqx@127.0.0.1' => #{
|
[
|
||||||
current_connections => 11,
|
#{
|
||||||
max_connections => 1024000
|
node => 'emqx@127.0.0.1',
|
||||||
},
|
status => #{
|
||||||
'emqx@127.0.0.2' => #{
|
running => true,
|
||||||
current_connections => 10,
|
current_connections => 11,
|
||||||
max_connections => 1024000
|
max_connections => 1024000
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
#{
|
||||||
|
node => 'emqx@127.0.0.1',
|
||||||
|
status => #{
|
||||||
|
running => true,
|
||||||
|
current_connections => 10,
|
||||||
|
max_connections => 1024000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
status => #{
|
status => #{
|
||||||
|
running => true,
|
||||||
current_connections => 21,
|
current_connections => 21,
|
||||||
max_connections => 2048000
|
max_connections => 2048000
|
||||||
},
|
},
|
||||||
|
@ -624,17 +656,28 @@ listener_type_status_example() ->
|
||||||
#{
|
#{
|
||||||
enable => false,
|
enable => false,
|
||||||
ids => ["ssl:default"],
|
ids => ["ssl:default"],
|
||||||
node_status => #{
|
node_status =>
|
||||||
'emqx@127.0.0.1' => #{
|
[
|
||||||
current_connections => 31,
|
#{
|
||||||
max_connections => infinity
|
node => 'emqx@127.0.0.1',
|
||||||
},
|
status => #{
|
||||||
'emqx@127.0.0.2' => #{
|
running => true,
|
||||||
current_connections => 40,
|
current_connections => 31,
|
||||||
max_connections => infinity
|
max_connections => infinity
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
#{
|
||||||
|
node => 'emqx@127.0.0.1',
|
||||||
|
status => #{
|
||||||
|
running => true,
|
||||||
|
current_connections => 40,
|
||||||
|
max_connections => infinity
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
|
||||||
status => #{
|
status => #{
|
||||||
|
running => true,
|
||||||
current_connections => 71,
|
current_connections => 71,
|
||||||
max_connections => infinity
|
max_connections => infinity
|
||||||
},
|
},
|
||||||
|
@ -649,18 +692,30 @@ listener_id_status_example() ->
|
||||||
bind => <<"0.0.0.0:1884">>,
|
bind => <<"0.0.0.0:1884">>,
|
||||||
enable => true,
|
enable => true,
|
||||||
id => <<"tcp:demo">>,
|
id => <<"tcp:demo">>,
|
||||||
node_status => #{
|
type => <<"tcp">>,
|
||||||
'emqx@127.0.0.1' => #{
|
name => <<"demo">>,
|
||||||
current_connections => 100,
|
node_status =>
|
||||||
max_connections => 1024000
|
[
|
||||||
},
|
#{
|
||||||
'emqx@127.0.0.2' => #{
|
node => 'emqx@127.0.0.1',
|
||||||
current_connections => 101,
|
status => #{
|
||||||
max_connections => 1024000
|
running => true,
|
||||||
}
|
current_connections => 100,
|
||||||
},
|
max_connections => 1024000
|
||||||
|
}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
node => 'emqx@127.0.0.1',
|
||||||
|
status => #{
|
||||||
|
running => true,
|
||||||
|
current_connections => 101,
|
||||||
|
max_connections => 1024000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
number => 2,
|
number => 2,
|
||||||
status => #{
|
status => #{
|
||||||
|
running => true,
|
||||||
current_connections => 201,
|
current_connections => 201,
|
||||||
max_connections => 2048000
|
max_connections => 2048000
|
||||||
}
|
}
|
||||||
|
@ -670,18 +725,30 @@ listener_id_status_example() ->
|
||||||
bind => <<"0.0.0.0:1883">>,
|
bind => <<"0.0.0.0:1883">>,
|
||||||
enable => true,
|
enable => true,
|
||||||
id => <<"tcp:default">>,
|
id => <<"tcp:default">>,
|
||||||
node_status => #{
|
type => <<"tcp">>,
|
||||||
'emqx@127.0.0.1' => #{
|
name => <<"default">>,
|
||||||
current_connections => 300,
|
node_status =>
|
||||||
max_connections => infinity
|
[
|
||||||
},
|
#{
|
||||||
'emqx@127.0.0.2' => #{
|
node => 'emqx@127.0.0.1',
|
||||||
current_connections => 201,
|
status => #{
|
||||||
max_connections => infinity
|
running => true,
|
||||||
}
|
current_connections => 200,
|
||||||
},
|
max_connections => infinity
|
||||||
|
}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
node => 'emqx@127.0.0.1',
|
||||||
|
status => #{
|
||||||
|
running => true,
|
||||||
|
current_connections => 301,
|
||||||
|
max_connections => infinity
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
number => 2,
|
number => 2,
|
||||||
status => #{
|
status => #{
|
||||||
|
running => true,
|
||||||
current_connections => 501,
|
current_connections => 501,
|
||||||
max_connections => infinity
|
max_connections => infinity
|
||||||
}
|
}
|
||||||
|
|
|
@ -582,7 +582,7 @@ listeners([]) ->
|
||||||
end,
|
end,
|
||||||
Info =
|
Info =
|
||||||
[
|
[
|
||||||
{listen_on, {string, format_listen_on(Bind)}},
|
{listen_on, {string, emqx_listeners:format_bind(Bind)}},
|
||||||
{acceptors, Acceptors},
|
{acceptors, Acceptors},
|
||||||
{proxy_protocol, ProxyProtocol},
|
{proxy_protocol, ProxyProtocol},
|
||||||
{running, Running}
|
{running, Running}
|
||||||
|
@ -802,15 +802,6 @@ indent_print({Key, {string, Val}}) ->
|
||||||
indent_print({Key, Val}) ->
|
indent_print({Key, Val}) ->
|
||||||
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
|
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
|
||||||
|
|
||||||
format_listen_on(Port) when is_integer(Port) ->
|
|
||||||
io_lib:format("0.0.0.0:~w", [Port]);
|
|
||||||
format_listen_on({Addr, Port}) when is_list(Addr) ->
|
|
||||||
io_lib:format("~ts:~w", [Addr, Port]);
|
|
||||||
format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 4 ->
|
|
||||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]);
|
|
||||||
format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 8 ->
|
|
||||||
io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]).
|
|
||||||
|
|
||||||
name(Filter) ->
|
name(Filter) ->
|
||||||
iolist_to_binary(["CLI-", Filter]).
|
iolist_to_binary(["CLI-", Filter]).
|
||||||
|
|
||||||
|
|
2
mix.exs
2
mix.exs
|
@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
||||||
{:redbug, "2.0.7"},
|
{:redbug, "2.0.7"},
|
||||||
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
||||||
{:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"},
|
{:ehttpc, github: "emqx/ehttpc", tag: "0.3.0"},
|
||||||
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
||||||
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
||||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
||||||
|
|
|
@ -49,7 +49,7 @@
|
||||||
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||||
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
||||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
||||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}}
|
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}}
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
## create a webhook data bridge with id "my_webhook"
|
## create a webhook data bridge with id "my_webhook"
|
||||||
!curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","request_timeout":"5s","max_retries":3,"type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status'
|
!curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status'
|
||||||
?connected
|
?connected
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue