diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index ae1184f81..4dd74823c 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -23,6 +23,7 @@ **‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become `{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`, which will carry the paging meta information. +* Fix the issue that webhook leaks TCP connections. [ehttpc#34](https://github.com/emqx/ehttpc/pull/34), [#8580](https://github.com/emqx/emqx/pull/8580) ## Enhancements @@ -31,6 +32,8 @@ * Remove `/configs/listeners` API, use `/listeners/` instead. [#8485](https://github.com/emqx/emqx/pull/8485) * Optimize performance of builtin database operations in processes with long message queue [#8439](https://github.com/emqx/emqx/pull/8439) * Improve authentication tracing. [#8554](https://github.com/emqx/emqx/pull/8554) +* Standardize the '/listeners' and `/gateway//listeners` API fields. + It will introduce some incompatible updates, see [#8571](https://github.com/emqx/emqx/pull/8571) # 5.0.3 diff --git a/Makefile b/Makefile index 9f656eac3..6c6c02ca1 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-d export EMQX_DEFAULT_RUNNER = debian:11-slim export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) -export EMQX_DASHBOARD_VERSION ?= v1.0.5-beta.1 +export EMQX_DASHBOARD_VERSION ?= v1.0.5 export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 ifeq ($(OS),Windows_NT) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 3c508bacc..09b923d0c 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -54,7 +54,7 @@ -export([pre_config_update/3, post_config_update/5]). --export([format_addr/1]). +-export([format_bind/1]). -define(CONF_KEY_PATH, [listeners, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). @@ -201,14 +201,14 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> ?tp(listener_started, #{type => Type, bind => Bind}), console_print( "Listener ~ts on ~ts started.~n", - [listener_id(Type, ListenerName), format_addr(Bind)] + [listener_id(Type, ListenerName), format_bind(Bind)] ), ok; {error, {already_started, Pid}} -> {error, {already_started, Pid}}; {error, Reason} -> ListenerId = listener_id(Type, ListenerName), - BindStr = format_addr(Bind), + BindStr = format_bind(Bind), ?ELOG( "Failed to start listener ~ts on ~ts: ~0p.~n", [ListenerId, BindStr, Reason] @@ -261,19 +261,19 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> ok -> console_print( "Listener ~ts on ~ts stopped.~n", - [listener_id(Type, ListenerName), format_addr(Bind)] + [listener_id(Type, ListenerName), format_bind(Bind)] ), ok; {error, not_found} -> ?ELOG( "Failed to stop listener ~ts on ~ts: ~0p~n", - [listener_id(Type, ListenerName), format_addr(Bind), already_stopped] + [listener_id(Type, ListenerName), format_bind(Bind), already_stopped] ), ok; {error, Reason} -> ?ELOG( "Failed to stop listener ~ts on ~ts: ~0p~n", - [listener_id(Type, ListenerName), format_addr(Bind), Reason] + [listener_id(Type, ListenerName), format_bind(Bind), Reason] ), {error, Reason} end. @@ -492,17 +492,32 @@ merge_default(Options) -> [{tcp_options, ?MQTT_SOCKOPTS} | Options] end. -format_addr(Port) when is_integer(Port) -> - io_lib:format("~w", [Port]); +-spec format_bind( + integer() | {tuple(), integer()} | string() | binary() +) -> io_lib:chars(). +format_bind(Port) when is_integer(Port) -> + io_lib:format(":~w", [Port]); %% Print only the port number when bound on all interfaces -format_addr({{0, 0, 0, 0}, Port}) -> - format_addr(Port); -format_addr({{0, 0, 0, 0, 0, 0, 0, 0}, Port}) -> - format_addr(Port); -format_addr({Addr, Port}) when is_list(Addr) -> +format_bind({{0, 0, 0, 0}, Port}) -> + format_bind(Port); +format_bind({{0, 0, 0, 0, 0, 0, 0, 0}, Port}) -> + format_bind(Port); +format_bind({Addr, Port}) when is_list(Addr) -> io_lib:format("~ts:~w", [Addr, Port]); -format_addr({Addr, Port}) when is_tuple(Addr) -> - io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]). +format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 4 -> + io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]); +format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 8 -> + io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]); +%% Support string, binary type for Port or IP:Port +format_bind(Str) when is_list(Str) -> + case emqx_schema:to_ip_port(Str) of + {ok, {Ip, Port}} -> + format_bind({Ip, Port}); + {error, _} -> + format_bind(list_to_integer(Str)) + end; +format_bind(Bin) when is_binary(Bin) -> + format_bind(binary_to_list(Bin)). listener_id(Type, ListenerName) -> list_to_atom(lists:append([str(Type), ":", str(ListenerName)])). diff --git a/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf index cd2cafd78..fcc817bef 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf @@ -127,6 +127,17 @@ HTTP 请求的正文。
} } + config_max_retries { + desc { + en: """HTTP request max retry times if failed.""" + zh: """HTTP 请求失败最大重试次数""" + } + label: { + en: "HTTP Request Max Retries" + zh: "HTTP 请求重试次数" + } + } + desc_type { desc { en: """The Bridge Type""" diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 70550efe4..fe19ed066 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "An OTP application"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index bc9b6c5a2..37a42ab3d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -225,7 +225,6 @@ info_example_basic(webhook, _) -> request_timeout => <<"15s">>, connect_timeout => <<"15s">>, max_retries => 3, - retry_interval => <<"10s">>, pool_type => <<"random">>, pool_size => 4, enable_pipelining => 100, diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 678aa1f10..d19cc8426 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -238,7 +238,8 @@ parse_confs( method := Method, body := Body, headers := Headers, - request_timeout := ReqTimeout + request_timeout := ReqTimeout, + max_retries := Retry } = Conf ) -> {BaseUrl, Path} = parse_url(Url), @@ -251,7 +252,8 @@ parse_confs( method => Method, body => Body, headers => Headers, - request_timeout => ReqTimeout + request_timeout => ReqTimeout, + max_retries => Retry } }; parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when diff --git a/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl index 972ba86bc..f11247d68 100644 --- a/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl @@ -14,60 +14,7 @@ namespace() -> "bridge". roots() -> []. fields("config") -> - basic_config() ++ - [ - {url, - mk( - binary(), - #{ - required => true, - desc => ?DESC("config_url") - } - )}, - {local_topic, - mk( - binary(), - #{desc => ?DESC("config_local_topic")} - )}, - {method, - mk( - method(), - #{ - default => post, - desc => ?DESC("config_method") - } - )}, - {headers, - mk( - map(), - #{ - default => #{ - <<"accept">> => <<"application/json">>, - <<"cache-control">> => <<"no-cache">>, - <<"connection">> => <<"keep-alive">>, - <<"content-type">> => <<"application/json">>, - <<"keep-alive">> => <<"timeout=5">> - }, - desc => ?DESC("config_headers") - } - )}, - {body, - mk( - binary(), - #{ - default => <<"${payload}">>, - desc => ?DESC("config_body") - } - )}, - {request_timeout, - mk( - emqx_schema:duration_ms(), - #{ - default => <<"15s">>, - desc => ?DESC("config_request_timeout") - } - )} - ]; + basic_config() ++ request_config(); fields("post") -> [ type_field(), @@ -106,6 +53,69 @@ basic_config() -> ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). +request_config() -> + [ + {url, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_url") + } + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("config_local_topic")} + )}, + {method, + mk( + method(), + #{ + default => post, + desc => ?DESC("config_method") + } + )}, + {headers, + mk( + map(), + #{ + default => #{ + <<"accept">> => <<"application/json">>, + <<"cache-control">> => <<"no-cache">>, + <<"connection">> => <<"keep-alive">>, + <<"content-type">> => <<"application/json">>, + <<"keep-alive">> => <<"timeout=5">> + }, + desc => ?DESC("config_headers") + } + )}, + {body, + mk( + binary(), + #{ + default => <<"${payload}">>, + desc => ?DESC("config_body") + } + )}, + {max_retries, + mk( + non_neg_integer(), + #{ + default => 2, + desc => ?DESC("config_max_retries") + } + )}, + {request_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"15s">>, + desc => ?DESC("config_request_timeout") + } + )} + ]. + %%====================================================================================== type_field() -> diff --git a/apps/emqx_connector/i18n/emqx_connector_http.conf b/apps/emqx_connector/i18n/emqx_connector_http.conf index 0e29a15d3..8664d324f 100644 --- a/apps/emqx_connector/i18n/emqx_connector_http.conf +++ b/apps/emqx_connector/i18n/emqx_connector_http.conf @@ -41,17 +41,6 @@ base URL 只包含host和port。
} } - retry_interval { - desc { - en: "Interval between retries." - zh: "重试之间的间隔时间。" - } - label: { - en: "Retry Interval" - zh: "重试间隔" - } - } - pool_type { desc { en: "The type of the pool. Can be one of `random`, `hash`." diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index f9e63dc57..59b4ddffa 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -88,22 +88,6 @@ fields(config) -> desc => ?DESC("connect_timeout") } )}, - {max_retries, - sc( - non_neg_integer(), - #{ - default => 5, - desc => ?DESC("max_retries") - } - )}, - {retry_interval, - sc( - emqx_schema:duration(), - #{ - default => "1s", - desc => ?DESC("retry_interval") - } - )}, {pool_type, sc( pool_type(), @@ -147,6 +131,14 @@ fields("request") -> {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})}, {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})}, {headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})}, + {max_retries, + sc( + non_neg_integer(), + #{ + required => false, + desc => ?DESC("max_retries") + } + )}, {request_timeout, sc( emqx_schema:duration_ms(), @@ -182,8 +174,6 @@ on_start( path := BasePath }, connect_timeout := ConnectTimeout, - max_retries := MaxRetries, - retry_interval := RetryInterval, pool_type := PoolType, pool_size := PoolSize } = Config @@ -206,8 +196,6 @@ on_start( {host, Host}, {port, Port}, {connect_timeout, ConnectTimeout}, - {retry, MaxRetries}, - {retry_timeout, RetryInterval}, {keepalive, 30000}, {pool_type, PoolType}, {pool_size, PoolSize}, @@ -247,17 +235,23 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) -> path := Path, body := Body, headers := Headers, - request_timeout := Timeout + request_timeout := Timeout, + max_retries := Retry } = process_request(Request, Msg), - on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State) + on_query( + InstId, + {undefined, Method, {Path, Headers, Body}, Timeout, Retry}, + AfterQuery, + State + ) end; on_query(InstId, {Method, Request}, AfterQuery, State) -> - on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State); + on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State); on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> - on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State); + on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State); on_query( InstId, - {KeyOrNum, Method, Request, Timeout}, + {KeyOrNum, Method, Request, Timeout, Retry}, AfterQuery, #{pool_name := PoolName, base_path := BasePath} = State ) -> @@ -275,7 +269,8 @@ on_query( end, Method, NRequest, - Timeout + Timeout, + Retry ) of {error, Reason} -> @@ -368,7 +363,8 @@ preprocess_request( path => emqx_plugin_libs_rule:preproc_tmpl(Path), body => emqx_plugin_libs_rule:preproc_tmpl(Body), headers => preproc_headers(Headers), - request_timeout => maps:get(request_timeout, Req, 30000) + request_timeout => maps:get(request_timeout, Req, 30000), + max_retries => maps:get(max_retries, Req, 2) }. preproc_headers(Headers) when is_map(Headers) -> diff --git a/apps/emqx_dashboard/etc/emqx_dashboard.conf b/apps/emqx_dashboard/etc/emqx_dashboard.conf index 2d54431fa..856779500 100644 --- a/apps/emqx_dashboard/etc/emqx_dashboard.conf +++ b/apps/emqx_dashboard/etc/emqx_dashboard.conf @@ -1,7 +1,7 @@ dashboard { listeners.http { - bind: 18083 + bind = 18083 } - default_username: "admin" - default_password: "public" + default_username = "admin" + default_password = "public" } diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index 042c9d5d8..9bf981323 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -92,7 +92,7 @@ start_listeners(Listeners) -> case minirest:start(Name, RanchOptions, Minirest) of {ok, _} -> ?ULOG("Listener ~ts on ~ts started.~n", [ - Name, emqx_listeners:format_addr(Bind) + Name, emqx_listeners:format_bind(Bind) ]), Acc; {error, _Reason} -> @@ -114,7 +114,7 @@ stop_listeners(Listeners) -> case minirest:stop(Name) of ok -> ?ULOG("Stop listener ~ts on ~ts successfully.~n", [ - Name, emqx_listeners:format_addr(Port) + Name, emqx_listeners:format_bind(Port) ]); {error, not_found} -> ?SLOG(warning, #{msg => "stop_listener_failed", name => Name, port => Port}) diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf index e4f7413d0..9c5de67c3 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf @@ -112,6 +112,13 @@ emqx_gateway_api_listeners { } } + listener_status { + desc { + en: """listener status """ + zh: """监听器状态""" + } + } + listener_node_status { desc { en: """listener status of each node in the cluster""" diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 0f0ec8606..79734bfc0 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -81,7 +81,7 @@ paths() -> listeners(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> - Result = get_cluster_listeners_info(GwName), + Result = lists:map(fun bind2str/1, get_cluster_listeners_info(GwName)), {200, Result} end); listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> @@ -119,7 +119,7 @@ listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) -> with_gateway(Name0, fun(_GwName, _) -> case emqx_gateway_conf:listener(ListenerId) of {ok, Listener} -> - {200, Listener}; + {200, bind2str(Listener)}; {error, not_found} -> return_http_error(404, "Listener not found"); {error, Reason} -> @@ -266,11 +266,14 @@ get_cluster_listeners_info(GwName) -> ClusterStatus ), - {MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus), + {MaxCons, CurrCons, Running} = aggregate_listener_status(NodeStatus), Listener#{ - max_connections => MaxCons, - current_connections => CurrCons, + status => #{ + running => Running, + max_connections => MaxCons, + current_connections => CurrCons + }, node_status => NodeStatus } end, @@ -292,20 +295,23 @@ do_listeners_cluster_status(Listeners) -> fun({Id, ListenOn}, Acc) -> BinId = erlang:atom_to_binary(Id), {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), - Curr = + {Running, Curr} = try esockd:get_current_connections({Id, ListenOn}) of - Int -> Int + Int -> {true, Int} catch %% not started error:not_found -> - 0 + {false, 0} end, Acc#{ Id => #{ node => Node, - current_connections => Curr, - %% XXX: Since it is taken from raw-conf, it is possible a string - max_connections => int(Max) + status => #{ + running => Running, + current_connections => Curr, + %% XXX: Since it is taken from raw-conf, it is possible a string + max_connections => int(Max) + } } } end, @@ -317,6 +323,31 @@ int(B) when is_binary(B) -> binary_to_integer(B); int(I) when is_integer(I) -> I. +aggregate_listener_status(NodeStatus) -> + aggregate_listener_status(NodeStatus, 0, 0, undefined). + +aggregate_listener_status( + [ + #{status := #{running := Running, max_connections := Max, current_connections := Current}} + | T + ], + MaxAcc, + CurrAcc, + RunningAcc +) -> + NRunning = aggregate_running(Running, RunningAcc), + aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning); +aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) -> + {MaxAcc, CurrAcc, RunningAcc}. + +aggregate_running(R, R) -> R; +aggregate_running(R, undefined) -> R; +aggregate_running(_, _) -> inconsistent. + +bind2str(Listener = #{bind := Bind}) -> + Listener#{bind := iolist_to_binary(emqx_listeners:format_bind(Bind))}; +bind2str(Listener = #{<<"bind">> := Bind}) -> + Listener#{<<"bind">> := iolist_to_binary(emqx_listeners:format_bind(Bind))}. %%-------------------------------------------------------------------- %% Swagger defines @@ -590,22 +621,25 @@ params_paging_in_qs() -> roots() -> [listener]. -fields(listener_node_status) -> +fields(listener_status) -> [ - {current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})}, + {status, + mk(ref(emqx_mgmt_api_listeners, status), #{ + desc => ?DESC(listener_status) + })}, {node_status, mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{ desc => ?DESC(listener_node_status) })} ]; fields(tcp_listener) -> - emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status); + emqx_gateway_api:fields(tcp_listener) ++ fields(listener_status); fields(ssl_listener) -> - emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status); + emqx_gateway_api:fields(ssl_listener) ++ fields(listener_status); fields(udp_listener) -> - emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status); + emqx_gateway_api:fields(udp_listener) ++ fields(listener_status); fields(dtls_listener) -> - emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status); + emqx_gateway_api:fields(dtls_listener) ++ fields(listener_status); fields(_) -> []. @@ -623,12 +657,19 @@ listener_node_status_schema() -> examples_listener_list() -> Convert = fun(Cfg) -> Cfg#{ - current_connections => 0, + status => #{ + running => true, + max_connections => 1024000, + current_connections => 10 + }, node_status => [ #{ - node => <<"127.0.0.1">>, - current_connections => 0, - max_connections => 1024000 + node => <<"emqx@127.0.0.1">>, + status => #{ + running => true, + current_connections => 10, + max_connections => 1024000 + } } ] } diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 50fc069fc..5fe858fa9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -181,24 +181,11 @@ do_convert_listener(GwName, LType, Conf) -> do_convert_listener2(GwName, LType, LName, LConf) -> ListenerId = emqx_gateway_utils:listener_id(GwName, LType, LName), - Running = emqx_gateway_utils:is_running(ListenerId, LConf), - bind2str( - LConf#{ - id => ListenerId, - type => LType, - name => LName, - running => Running - } - ). - -bind2str(LConf = #{bind := Bind}) when is_integer(Bind) -> - maps:put(bind, integer_to_binary(Bind), LConf); -bind2str(LConf = #{<<"bind">> := Bind}) when is_integer(Bind) -> - maps:put(<<"bind">>, integer_to_binary(Bind), LConf); -bind2str(LConf = #{bind := Bind}) when is_binary(Bind) -> - LConf; -bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) -> - LConf. + LConf#{ + id => ListenerId, + type => LType, + name => LName + }. get_bind(#{bind := Bind}) -> emqx_gateway_utils:parse_listenon(Bind); diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 6a491de3d..15359dea6 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -37,7 +37,6 @@ -export([ apply/2, - format_listenon/1, parse_listenon/1, unix_ts_to_rfc3339/1, unix_ts_to_rfc3339/2, @@ -165,7 +164,7 @@ start_listener( {Type, LisName, ListenOn, SocketOpts, Cfg}, ModCfg ) -> - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + ListenOnStr = emqx_listeners:format_bind(ListenOn), ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName), NCfg = maps:merge(Cfg, ModCfg), @@ -243,7 +242,7 @@ stop_listeners(GwName, Listeners) -> -spec stop_listener(GwName :: atom(), Listener :: tuple()) -> ok. stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + ListenOnStr = emqx_listeners:format_bind(ListenOn), case StopRet of ok -> console_print( @@ -287,13 +286,6 @@ apply(F, A2) when -> erlang:apply(F, A2). -format_listenon(Port) when is_integer(Port) -> - io_lib:format("0.0.0.0:~w", [Port]); -format_listenon({Addr, Port}) when is_list(Addr) -> - io_lib:format("~ts:~w", [Addr, Port]); -format_listenon({Addr, Port}) when is_tuple(Addr) -> - io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]). - parse_listenon(Port) when is_integer(Port) -> Port; parse_listenon(IpPort) when is_tuple(IpPort) -> diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 9965956f7..78e70def0 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -167,7 +167,7 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) -> )} ] end, - ListenOnStr = emqx_listeners:format_addr(ListenOn), + ListenOnStr = emqx_listeners:format_bind(ListenOn), case grpc:start_server(GwName, ListenOn, Services, SvrOptions) of {ok, _SvrPid} -> console_print( diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl index 90a060cd7..fb3207944 100644 --- a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl @@ -340,7 +340,7 @@ t_listeners_tcp(_) -> LisConf = #{ name => <<"def">>, type => <<"tcp">>, - bind => <<"61613">> + bind => <<"127.0.0.1:61613">> }, {201, _} = request(post, "/gateway/stomp/listeners", LisConf), {200, ConfResp} = request(get, "/gateway/stomp/listeners"), @@ -348,7 +348,7 @@ t_listeners_tcp(_) -> {200, ConfResp1} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"), assert_confs(LisConf, ConfResp1), - LisConf2 = maps:merge(LisConf, #{bind => <<"61614">>}), + LisConf2 = maps:merge(LisConf, #{bind => <<"127.0.0.1:61614">>}), {200, _} = request( put, "/gateway/stomp/listeners/stomp:tcp:def", @@ -369,7 +369,7 @@ t_listeners_authn(_) -> #{ name => <<"def">>, type => <<"tcp">>, - bind => <<"61613">> + bind => <<"127.0.0.1:61613">> } ] }, @@ -405,7 +405,7 @@ t_listeners_authn_data_mgmt(_) -> #{ name => <<"def">>, type => <<"tcp">>, - bind => <<"61613">> + bind => <<"127.0.0.1:61613">> } ] }, diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 842ac4bfb..24fe710c7 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -218,11 +218,13 @@ fields(listener_type_status) -> fields(listener_id_status) -> fields(listener_id) ++ [ + {type, ?HOCON(?ENUM(listeners_type()), #{desc => "Listener type", required => true})}, + {name, ?HOCON(string(), #{desc => "Listener name", required => true})}, {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})}, {number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})}, {bind, ?HOCON( - hoconsc:union([emqx_schema:ip_port(), integer()]), + emqx_schema:ip_port(), #{desc => "Listener bind addr", required => true} )}, {acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})}, @@ -231,12 +233,24 @@ fields(listener_id_status) -> ]; fields(status) -> [ + {running, + ?HOCON( + hoconsc:union([inconsistent, boolean()]), + #{desc => "Listener running status", required => true} + )}, {max_connections, ?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})}, {current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})} ]; fields(node_status) -> - fields(node) ++ fields(status); + [ + {"node", + ?HOCON(atom(), #{ + desc => "Node name", + example => "emqx@127.0.0.1" + })}, + {status, ?HOCON(?R_REF(status))} + ]; fields(Type) -> Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}), [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], @@ -311,7 +325,7 @@ listener_type_status(get, _Request) -> Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})), List = lists:map( fun({Type, L}) -> - L1 = maps:without([bind, acceptors], L), + L1 = maps:without([bind, acceptors, name], L), L1#{type => Type} end, Listeners @@ -453,7 +467,7 @@ listener_status_by_id(NodeL) -> fun({Id, L}) -> L1 = maps:remove(ids, L), #{node_status := Nodes} = L1, - L1#{number => maps:size(Nodes), id => Id} + L1#{number => length(Nodes), id => Id} end, Listeners ). @@ -510,67 +524,75 @@ wrap_rpc(Res) -> format_status(Key, Node, Listener, Acc) -> #{ <<"id">> := Id, + <<"type">> := Type, + <<"enabled">> := Enabled, <<"running">> := Running, <<"max_connections">> := MaxConnections, <<"current_connections">> := CurrentConnections, <<"acceptors">> := Acceptors, <<"bind">> := Bind } = Listener, + {ok, #{name := Name}} = emqx_listeners:parse_listener_id(Id), GroupKey = maps:get(Key, Listener), case maps:find(GroupKey, Acc) of error -> Acc#{ GroupKey => #{ - enable => Running, + name => Name, + type => Type, + enable => Enabled, ids => [Id], acceptors => Acceptors, - bind => Bind, + bind => iolist_to_binary(emqx_listeners:format_bind(Bind)), status => #{ + running => Running, max_connections => MaxConnections, current_connections => CurrentConnections }, - node_status => #{ - Node => #{ - max_connections => MaxConnections, - current_connections => CurrentConnections + node_status => [ + #{ + node => Node, + status => #{ + running => Running, + max_connections => MaxConnections, + current_connections => CurrentConnections + } } - } + ] } }; {ok, GroupValue} -> #{ ids := Ids, status := #{ + running := Running0, max_connections := MaxConnections0, current_connections := CurrentConnections0 }, node_status := NodeStatus0 } = GroupValue, - NodeStatus = - case maps:find(Node, NodeStatus0) of - error -> - NodeStatus0#{ - Node => #{ - max_connections => MaxConnections, - current_connections => CurrentConnections - } - }; - {ok, #{ - max_connections := PrevMax, - current_connections := PrevCurr - }} -> - NodeStatus0#{ - Node => #{ - max_connections => max_conn(MaxConnections, PrevMax), - current_connections => CurrentConnections + PrevCurr - } - } + NodeStatus = [ + #{ + node => Node, + status => #{ + running => Running, + max_connections => MaxConnections, + current_connections => CurrentConnections + } + } + | NodeStatus0 + ], + NRunning = + case Running == Running0 of + true -> Running0; + _ -> inconsistent end, Acc#{ GroupKey => GroupValue#{ ids => lists:usort([Id | Ids]), status => #{ + running => NRunning, max_connections => max_conn(MaxConnections0, MaxConnections), current_connections => CurrentConnections0 + CurrentConnections }, @@ -605,17 +627,27 @@ listener_type_status_example() -> #{ enable => false, ids => ["tcp:demo"], - node_status => #{ - 'emqx@127.0.0.1' => #{ - current_connections => 11, - max_connections => 1024000 - }, - 'emqx@127.0.0.2' => #{ - current_connections => 10, - max_connections => 1024000 - } - }, + node_status => + [ + #{ + node => 'emqx@127.0.0.1', + status => #{ + running => true, + current_connections => 11, + max_connections => 1024000 + } + }, + #{ + node => 'emqx@127.0.0.1', + status => #{ + running => true, + current_connections => 10, + max_connections => 1024000 + } + } + ], status => #{ + running => true, current_connections => 21, max_connections => 2048000 }, @@ -624,17 +656,28 @@ listener_type_status_example() -> #{ enable => false, ids => ["ssl:default"], - node_status => #{ - 'emqx@127.0.0.1' => #{ - current_connections => 31, - max_connections => infinity - }, - 'emqx@127.0.0.2' => #{ - current_connections => 40, - max_connections => infinity - } - }, + node_status => + [ + #{ + node => 'emqx@127.0.0.1', + status => #{ + running => true, + current_connections => 31, + max_connections => infinity + } + }, + #{ + node => 'emqx@127.0.0.1', + status => #{ + running => true, + current_connections => 40, + max_connections => infinity + } + } + ], + status => #{ + running => true, current_connections => 71, max_connections => infinity }, @@ -649,18 +692,30 @@ listener_id_status_example() -> bind => <<"0.0.0.0:1884">>, enable => true, id => <<"tcp:demo">>, - node_status => #{ - 'emqx@127.0.0.1' => #{ - current_connections => 100, - max_connections => 1024000 - }, - 'emqx@127.0.0.2' => #{ - current_connections => 101, - max_connections => 1024000 - } - }, + type => <<"tcp">>, + name => <<"demo">>, + node_status => + [ + #{ + node => 'emqx@127.0.0.1', + status => #{ + running => true, + current_connections => 100, + max_connections => 1024000 + } + }, + #{ + node => 'emqx@127.0.0.1', + status => #{ + running => true, + current_connections => 101, + max_connections => 1024000 + } + } + ], number => 2, status => #{ + running => true, current_connections => 201, max_connections => 2048000 } @@ -670,18 +725,30 @@ listener_id_status_example() -> bind => <<"0.0.0.0:1883">>, enable => true, id => <<"tcp:default">>, - node_status => #{ - 'emqx@127.0.0.1' => #{ - current_connections => 300, - max_connections => infinity - }, - 'emqx@127.0.0.2' => #{ - current_connections => 201, - max_connections => infinity - } - }, + type => <<"tcp">>, + name => <<"default">>, + node_status => + [ + #{ + node => 'emqx@127.0.0.1', + status => #{ + running => true, + current_connections => 200, + max_connections => infinity + } + }, + #{ + node => 'emqx@127.0.0.1', + status => #{ + running => true, + current_connections => 301, + max_connections => infinity + } + } + ], number => 2, status => #{ + running => true, current_connections => 501, max_connections => infinity } diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index c846c9d58..60b2f3b15 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -582,7 +582,7 @@ listeners([]) -> end, Info = [ - {listen_on, {string, format_listen_on(Bind)}}, + {listen_on, {string, emqx_listeners:format_bind(Bind)}}, {acceptors, Acceptors}, {proxy_protocol, ProxyProtocol}, {running, Running} @@ -802,15 +802,6 @@ indent_print({Key, {string, Val}}) -> indent_print({Key, Val}) -> emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]). -format_listen_on(Port) when is_integer(Port) -> - io_lib:format("0.0.0.0:~w", [Port]); -format_listen_on({Addr, Port}) when is_list(Addr) -> - io_lib:format("~ts:~w", [Addr, Port]); -format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 4 -> - io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]); -format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 8 -> - io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]). - name(Filter) -> iolist_to_binary(["CLI-", Filter]). diff --git a/mix.exs b/mix.exs index d67868ff8..d49051ab7 100644 --- a/mix.exs +++ b/mix.exs @@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do {:lc, github: "emqx/lc", tag: "0.3.1"}, {:redbug, "2.0.7"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.3.0"}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, diff --git a/rebar.config b/rebar.config index 289d261ba..3d3d5968f 100644 --- a/rebar.config +++ b/rebar.config @@ -49,7 +49,7 @@ , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} diff --git a/scripts/relup-test/relup.lux b/scripts/relup-test/relup.lux index 8db8169f8..c05425e71 100644 --- a/scripts/relup-test/relup.lux +++ b/scripts/relup-test/relup.lux @@ -15,7 +15,7 @@ ?SH-PROMPT ## create a webhook data bridge with id "my_webhook" - !curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","request_timeout":"5s","max_retries":3,"type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status' + !curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status' ?connected ?SH-PROMPT