diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index e8a080808..57dc2cb45 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -43,9 +43,10 @@ jobs: - name: Get profiles to build id: get_profile + env: + INPUTS_PROFILE: ${{ github.event.inputs.profile }} run: | cd source - tag=${{ github.ref }} # tag docker-latest-ce or docker-latest-ee if git describe --tags --exact --match 'docker-latest-*' 2>/dev/null; then echo 'is_latest=true due to docker-latest-* tag' @@ -57,38 +58,33 @@ jobs: echo 'is_latest=false' is_latest=false fi - if git describe --tags --match "[v|e]*" --exact; then + # resolve profile + if git describe --tags --match "v*" --exact; then echo "This is an exact git tag, will publish images" is_exact='true' + PROFILE=emqx + elif git describe --tags --match "e*" --exact; then + echo "This is an exact git tag, will publish images" + is_exact='true' + PROFILE=emqx-enterprise else echo "This is NOT an exact git tag, will not publish images" is_exact='false' fi - case $tag in - refs/tags/v*) - PROFILE='emqx' + + case "${PROFILE:-$INPUTS_PROFILE}" in + emqx) EDITION='Opensource' ;; - refs/tags/e*) - PROFILE=emqx-enterprise + emqx-enterprise) EDITION='Enterprise' ;; *) - PROFILE=${{ github.event.inputs.profile }} - case "$PROFILE" in - emqx) - EDITION='Opensource' - ;; - emqx-enterprise) - EDITION='Enterprise' - ;; - *) - echo "ERROR: Failed to resolve build profile" - exit 1 - ;; - esac + echo "ERROR: Failed to resolve build profile" + exit 1 ;; esac + VSN="$(./pkg-vsn.sh "$PROFILE")" echo "Building emqx/$PROFILE:$VSN image (latest=$is_latest)" echo "Push = $is_exact" diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 9bd6b7e16..273b63b8f 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -8,6 +8,7 @@ on: push: branches: - master + - release-50 pull_request: # GitHub pull_request action is by default triggered when # opened reopened or synchronize, diff --git a/Makefile b/Makefile index 3d582fafd..2f7ab5244 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) export EMQX_DASHBOARD_VERSION ?= v1.1.8 -export EMQX_EE_DASHBOARD_VERSION ?= e1.0.4-beta.3 +export EMQX_EE_DASHBOARD_VERSION ?= e1.0.4 export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 ifeq ($(OS),Windows_NT) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index ffaa4aa36..c2326ac95 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,10 +32,10 @@ %% `apps/emqx/src/bpapi/README.md' %% Community edition --define(EMQX_RELEASE_CE, "5.0.19"). +-define(EMQX_RELEASE_CE, "5.0.20"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.1-beta.1"). +-define(EMQX_RELEASE_EE, "5.0.1-rc.1"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 798153a57..0ecbbfc1a 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.36.0"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index 2025f5ad5..0827570ff 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -24,7 +24,7 @@ IsQuicSupp = fun() -> end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.111"}}}. +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.113"}}}. Dialyzer = fun(Config) -> {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config), diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 3030ccb06..e195107ed 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -3,7 +3,7 @@ {id, "emqx"}, {description, "EMQX Core"}, % strict semver, bump manually! - {vsn, "5.0.18"}, + {vsn, "5.0.19"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index bb4520aa9..a673fa898 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1280,7 +1280,18 @@ fields("listener_wss_opts") -> true ); fields("listener_quic_ssl_opts") -> - server_ssl_opts_schema(#{}, false); + %% Mark unsupported TLS options deprecated. + lists:map( + fun({Name, Schema}) -> + case is_quic_ssl_opts(Name) of + true -> + {Name, Schema}; + false -> + {Name, Schema#{deprecated => {since, "5.0.20"}}} + end + end, + server_ssl_opts_schema(#{}, false) + ); fields("ssl_client_opts") -> client_ssl_opts_schema(#{}); fields("deflate_opts") -> @@ -2841,3 +2852,18 @@ quic_lowlevel_settings_uint(Low, High, Desc) -> desc => Desc } ). + +-spec is_quic_ssl_opts(string()) -> boolean(). +is_quic_ssl_opts(Name) -> + lists:member(Name, [ + "cacertfile", + "certfile", + "keyfile", + "verify" + %% Followings are planned + %% , "password" + %% , "hibernate_after" + %% , "fail_if_no_peer_cert" + %% , "handshake_timeout" + %% , "gc_after_handshake" + ]). diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index dd88b013d..ce3a39dcf 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -537,10 +537,12 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) -> mountpoint => <<>>, zone => default }, - emqx_config:put([listeners, quic, Name], maps:merge(Conf, ExtraSettings)), - case emqx_listeners:start_listener(quic, Name, Conf) of + Conf2 = maps:merge(Conf, ExtraSettings), + emqx_config:put([listeners, quic, Name], Conf2), + case emqx_listeners:start_listener(emqx_listeners:listener_id(quic, Name)) of ok -> ok; - {error, {already_started, _Pid}} -> ok + {error, {already_started, _Pid}} -> ok; + Other -> throw(Other) end. %% diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index b0121314c..b0eefba0d 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -33,7 +33,8 @@ all() -> {group, mstream}, {group, shutdown}, {group, misc}, - t_listener_with_lowlevel_settings + t_listener_with_lowlevel_settings, + t_listener_inval_settings ]. groups() -> @@ -1885,8 +1886,17 @@ t_multi_streams_sub_0_rtt_stream_data_cont(Config) -> ok = emqtt:disconnect(C), ok = emqtt:disconnect(C0). +t_listener_inval_settings(_Config) -> + LPort = select_port(), + %% too small + LowLevelTunings = #{stream_recv_buffer_default => 1024}, + ?assertThrow( + {error, {failed_to_start, _}}, + emqx_common_test_helpers:ensure_quic_listener(?FUNCTION_NAME, LPort, LowLevelTunings) + ). + t_listener_with_lowlevel_settings(_Config) -> - LPort = 24567, + LPort = select_port(), LowLevelTunings = #{ max_bytes_per_key => 274877906, %% In conf schema we use handshake_idle_timeout @@ -1897,7 +1907,7 @@ t_listener_with_lowlevel_settings(_Config) -> %% tls_client_max_send_buffer, tls_server_max_send_buffer => 10240, stream_recv_window_default => 1024, - stream_recv_buffer_default => 1024, + stream_recv_buffer_default => 10240, conn_flow_control_window => 1024, max_stateless_operations => 16, initial_window_packets => 1300, @@ -1936,8 +1946,7 @@ t_listener_with_lowlevel_settings(_Config) -> {ok, _, [_SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ {<<"test/1/3">>, [{qos, 2}]} ]), - ok = emqtt:disconnect(C), - emqx_listeners:stop_listener(emqx_listeners:listener_id(quic, ?FUNCTION_NAME)). + ok = emqtt:disconnect(C). %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index f016db09a..943978519 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.14"}, + {vsn, "0.1.15"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index dbc94c943..e46ebb5a1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -558,7 +558,8 @@ schema("/bridges_probe") -> RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"}, case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of {ok, #{body := #{<<"type">> := ConnType} = Params}} -> - case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of + Params1 = maybe_deobfuscate_bridge_probe(Params), + case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of ok -> {204}; {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> @@ -568,6 +569,18 @@ schema("/bridges_probe") -> BadRequest end. +maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) -> + case emqx_bridge:lookup(BridgeType, BridgeName) of + {ok, _} -> + RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}), + deobfuscate(Params, RawConf); + _ -> + %% A bridge may be probed before it's created, so not finding it here is fine + Params + end; +maybe_deobfuscate_bridge_probe(Params) -> + Params. + lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> FormatFun = fun format_bridge_info_without_metrics/1, do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index ed2baec8f..74d2a5ca1 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). -import(hoconsc, [mk/2, ref/2]). @@ -140,11 +141,7 @@ fields(bridges) -> #{ desc => ?DESC("bridges_webhook"), required => false, - converter => fun(X, _HoconOpts) -> - emqx_bridge_compatible_config:upgrade_pre_ee( - X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 - ) - end + converter => fun webhook_bridge_converter/2 } )}, {mqtt, @@ -212,3 +209,48 @@ status() -> node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. + +webhook_bridge_converter(Conf0, _HoconOpts) -> + Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee( + Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 + ), + case Conf1 of + undefined -> + undefined; + _ -> + do_convert_webhook_config(Conf1) + end. + +do_convert_webhook_config( + #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf +) -> + %% ok: same values + Conf; +do_convert_webhook_config( + #{ + <<"request_timeout">> := ReqTRootRaw, + <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw} + } = Conf0 +) -> + %% different values; we set them to the same, if they are valid + %% durations + MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw), + MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw), + case {MReqTRoot, MReqTResource} of + {{ok, ReqTRoot}, {ok, ReqTResource}} -> + {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}), + Conf1 = emqx_map_lib:deep_merge( + Conf0, + #{ + <<"request_timeout">> => ReqTRaw, + <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw} + } + ), + Conf1; + _ -> + %% invalid values; let the type checker complain about + %% that. + Conf0 + end; +do_convert_webhook_config(Conf) -> + Conf. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 02d0b7cd8..d30a9bff8 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -891,6 +891,35 @@ t_metrics(Config) -> ), ok. +%% request_timeout in bridge root should match request_timeout in +%% resource_opts. +t_inconsistent_webhook_request_timeouts(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "path1"), + Name = ?BRIDGE_NAME, + BadBridgeParams = + emqx_map_lib:deep_merge( + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name), + #{ + <<"request_timeout">> => <<"1s">>, + <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} + } + ), + {ok, 201, RawResponse} = request( + post, + uri(["bridges"]), + BadBridgeParams + ), + %% note: same value on both fields + ?assertMatch( + #{ + <<"request_timeout">> := <<"2s">>, + <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>} + }, + emqx_json:decode(RawResponse, [return_maps]) + ), + ok. + operation_path(node, Oper, BridgeID) -> uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 5e0b4912f..acafb84ca 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -28,6 +28,7 @@ empty_config_test() -> webhook_config_test() -> Conf1 = parse(webhook_v5011_hocon()), Conf2 = parse(full_webhook_v5011_hocon()), + Conf3 = parse(full_webhook_v5019_hocon()), ?assertMatch( #{ @@ -59,6 +60,26 @@ webhook_config_test() -> check(Conf2) ), + %% the converter should pick the greater of the two + %% request_timeouts and place them in the root and inside + %% resource_opts. + ?assertMatch( + #{ + <<"bridges">> := #{ + <<"webhook">> := #{ + <<"the_name">> := + #{ + <<"method">> := get, + <<"request_timeout">> := 60_000, + <<"resource_opts">> := #{<<"request_timeout">> := 60_000}, + <<"body">> := <<"${payload}">> + } + } + } + }, + check(Conf3) + ), + ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> @@ -124,7 +145,7 @@ bridges{ max_retries = 3 method = \"get\" pool_size = 4 - request_timeout = \"5s\" + request_timeout = \"15s\" ssl {enable = false, verify = \"verify_peer\"} url = \"http://localhost:8080\" } @@ -164,6 +185,41 @@ full_webhook_v5011_hocon() -> "}\n" "". +%% does not contain direction +full_webhook_v5019_hocon() -> + "" + "\n" + "bridges{\n" + " webhook {\n" + " the_name{\n" + " body = \"${payload}\"\n" + " connect_timeout = \"5s\"\n" + " enable_pipelining = 100\n" + " headers {\"content-type\" = \"application/json\"}\n" + " max_retries = 3\n" + " method = \"get\"\n" + " pool_size = 4\n" + " pool_type = \"random\"\n" + " request_timeout = \"1m\"\n" + " resource_opts = {\n" + " request_timeout = \"7s\"\n" + " }\n" + " ssl {\n" + " ciphers = \"\"\n" + " depth = 10\n" + " enable = false\n" + " reuse_sessions = true\n" + " secure_renegotiate = true\n" + " user_lookup_fun = \"emqx_tls_psk:lookup\"\n" + " verify = \"verify_peer\"\n" + " versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n" + " }\n" + " url = \"http://localhost:8080\"\n" + " }\n" + " }\n" + "}\n" + "". + %% erlfmt-ignore %% this is a generated from v5.0.11 mqtt_v5011_hocon() -> diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl new file mode 100644 index 000000000..4c349c7a0 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -0,0 +1,267 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_webhook_SUITE). + +%% This suite should contains testcases that are specific for the webhook +%% bridge. There are also some test cases that implicitly tests the webhook +%% bridge in emqx_bridge_api_SUITE + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_mgmt_api_test_util, [request/3, uri/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +groups() -> + []. + +init_per_suite(_Config) -> + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + snabbkaffe:fix_ct_logging(), + []. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(emqx_bridge), + ok. + +suite() -> + [{timetrap, {seconds, 60}}]. + +%%------------------------------------------------------------------------------ +%% HTTP server for testing +%% (Orginally copied from emqx_bridge_api_SUITE) +%%------------------------------------------------------------------------------ +start_http_server(HTTPServerConfig) -> + ct:pal("Start server\n"), + process_flag(trap_exit, true), + Parent = self(), + {ok, {Port, Sock}} = listen_on_random_port(), + Acceptor = spawn(fun() -> + accept_loop(Sock, Parent, HTTPServerConfig) + end), + timer:sleep(100), + #{port => Port, sock => Sock, acceptor => Acceptor}. + +stop_http_server(#{sock := Sock, acceptor := Acceptor}) -> + ct:pal("Stop server\n"), + exit(Acceptor, kill), + gen_tcp:close(Sock). + +listen_on_random_port() -> + SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], + case gen_tcp:listen(0, SockOpts) of + {ok, Sock} -> + {ok, Port} = inet:port(Sock), + {ok, {Port, Sock}}; + {error, Reason} when Reason =/= eaddrinuse -> + {error, Reason} + end. + +accept_loop(Sock, Parent, HTTPServerConfig) -> + process_flag(trap_exit, true), + case gen_tcp:accept(Sock) of + {ok, Conn} -> + spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig, <<>>) end), + %%gen_tcp:controlling_process(Conn, Handler), + accept_loop(Sock, Parent, HTTPServerConfig); + {error, closed} -> + %% socket owner died + ok + end. + +make_response(CodeStr, Str) -> + B = iolist_to_binary(Str), + iolist_to_binary( + io_lib:fwrite( + "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s", + [CodeStr, size(B), B] + ) + ). + +handle_fun_200_ok(Conn, Parent, HTTPServerConfig, Acc) -> + ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0), + ct:pal("Waiting for request~n"), + case gen_tcp:recv(Conn, 0) of + {ok, ReqStr} -> + ct:pal("The http handler got request: ~p", [ReqStr]), + case parse_http_request(<>) of + {ok, incomplete, NewAcc} -> + handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc); + {ok, Req, NewAcc} -> + timer:sleep(ResponseDelayMS), + Parent ! {http_server, received, Req}, + gen_tcp:send(Conn, make_response("200 OK", "Request OK")), + handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc) + end; + {error, closed} -> + ct:pal("http connection closed"); + {error, Reason} -> + ct:pal("the http handler recv error: ~p", [Reason]), + timer:sleep(100), + gen_tcp:close(Conn) + end. + +parse_http_request(ReqStr) -> + try + parse_http_request_assertive(ReqStr) + catch + _:_ -> + {ok, incomplete, ReqStr} + end. + +parse_http_request_assertive(ReqStr0) -> + %% find body length + [_, LengthStr0] = string:split(ReqStr0, "content-length:"), + [LengthStr, _] = string:split(LengthStr0, "\r\n"), + Length = binary_to_integer(string:trim(LengthStr, both)), + %% split between multiple requests + [Method, ReqStr1] = string:split(ReqStr0, " ", leading), + [Path, ReqStr2] = string:split(ReqStr1, " ", leading), + [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), + [_HeaderStr, Rest] = string:split(ReqStr3, "\r\n\r\n", leading), + <> = Rest, + {ok, #{method => Method, path => Path, body => Body}, Remain}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper functions +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +bridge_async_config(#{port := Port} = Config) -> + Type = maps:get(type, Config, <<"webhook">>), + Name = maps:get(name, Config, atom_to_binary(?MODULE)), + PoolSize = maps:get(pool_size, Config, 1), + QueryMode = maps:get(query_mode, Config, "async"), + ConnectTimeout = maps:get(connect_timeout, Config, 1), + RequestTimeout = maps:get(request_timeout, Config, 10000), + ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"), + ConfigString = io_lib:format( + "bridges.~s.~s {\n" + " url = \"http://localhost:~p\"\n" + " connect_timeout = \"~ps\"\n" + " enable = true\n" + " enable_pipelining = 100\n" + " max_retries = 2\n" + " method = \"post\"\n" + " pool_size = ~p\n" + " pool_type = \"random\"\n" + " request_timeout = \"~ps\"\n" + " body = \"${id}\"" + " resource_opts {\n" + " async_inflight_window = 100\n" + " auto_restart_interval = \"60s\"\n" + " health_check_interval = \"15s\"\n" + " max_queue_bytes = \"1GB\"\n" + " query_mode = \"~s\"\n" + " request_timeout = \"~s\"\n" + " start_after_created = \"true\"\n" + " start_timeout = \"5s\"\n" + " worker_pool_size = \"1\"\n" + " }\n" + " ssl {\n" + " enable = false\n" + " }\n" + "}\n", + [ + Type, + Name, + Port, + ConnectTimeout, + PoolSize, + RequestTimeout, + QueryMode, + ResourceRequestTimeout + ] + ), + ct:pal(ConfigString), + parse_and_check(ConfigString, Type, Name). + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, + RetConfig. + +make_bridge(Config) -> + Type = <<"webhook">>, + Name = atom_to_binary(?MODULE), + BridgeConfig = bridge_async_config(Config#{ + name => Name, + type => Type + }), + {ok, _} = emqx_bridge:create( + Type, + Name, + BridgeConfig + ), + emqx_bridge_resource:bridge_id(Type, Name). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. +%% When the connection time out all the queued requests where dropped in +t_send_async_connection_timeout(_Config) -> + ResponseDelayMS = 90, + #{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), + % Port = 9000, + BridgeID = make_bridge(#{ + port => Port, + pool_size => 1, + query_mode => "async", + connect_timeout => ResponseDelayMS * 2, + request_timeout => 10000, + resouce_request_timeout => "infinity" + }), + NumberOfMessagesToSend = 10, + [ + emqx_bridge:send_message(BridgeID, #{<<"id">> => Id}) + || Id <- lists:seq(1, NumberOfMessagesToSend) + ], + %% Make sure server recive all messages + ct:pal("Sent messages\n"), + MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), + receive_request_notifications(MessageIDs, ResponseDelayMS), + stop_http_server(Server), + ok. + +receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 -> + ok; +receive_request_notifications(MessageIDs, ResponseDelay) -> + receive + {http_server, received, Req} -> + RemainingMessageIDs = remove_message_id(MessageIDs, Req), + receive_request_notifications(RemainingMessageIDs, ResponseDelay) + after (30 * 1000) -> + ct:pal("Waited to long time but did not get any message\n"), + ct:fail("All requests did not reach server at least once") + end. + +remove_message_id(MessageIDs, #{body := IDBin}) -> + ID = erlang:binary_to_integer(IDBin), + %% It is acceptable to get the same message more than once + maps:without([ID], MessageIDs). diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index dfcf52902..f0d51a9ce 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.15"}, + {vsn, "0.1.16"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 29d5136bb..401fc8812 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -566,7 +566,9 @@ bin(Atom) when is_atom(Atom) -> reply_delegator(ReplyFunAndArgs, Result) -> case Result of - {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> + %% The normal reason happens when the HTTP connection times out before + %% the request has been fully processed + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout; Reason =:= normal -> Result1 = {error, {recoverable_error, Reason}}, emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); _ -> diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 787af7429..59eed7f3f 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway, [ {description, "The Gateway management application"}, - {vsn, "0.1.12"}, + {vsn, "0.1.13"}, {registered, []}, {mod, {emqx_gateway_app, []}}, {applications, [kernel, stdlib, grpc, emqx, emqx_authn, emqx_ctl]}, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 7c62b0685..0bee30e35 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index f3ac2fd97..fb6b2eb06 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -102,12 +102,12 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise request_timeout { desc { - en: """Timeout for requests. If query_mode is sync, calls to the resource will be blocked for this amount of time before timing out.""" - zh: """请求的超时。 如果query_modesync,对资源的调用将在超时前被阻断这一时间。""" + en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" + zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。""" } label { - en: """Request timeout""" - zh: """请求超时""" + en: """Request Expiry""" + zh: """请求超期""" } } @@ -159,12 +159,12 @@ When disabled the messages are buffered in RAM only.""" batch_time { desc { - en: """Maximum batch waiting interval.""" - zh: """最大批量请求等待时间。""" + en: """Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage.""" + zh: """在较低消息率情况下尝试累积批量输出时的最大等待间隔,以提高资源的利用率。""" } label { - en: """Batch time""" - zh: """批量等待间隔""" + en: """Max Batch Wait Time""" + zh: """批量等待最大间隔""" } } diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index fa7f2eb38..be570e694 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -97,8 +97,8 @@ -define(DEFAULT_BATCH_SIZE, 1). %% milliseconds --define(DEFAULT_BATCH_TIME, 20). --define(DEFAULT_BATCH_TIME_RAW, <<"20ms">>). +-define(DEFAULT_BATCH_TIME, 0). +-define(DEFAULT_BATCH_TIME_RAW, <<"0ms">>). %% count -define(DEFAULT_INFLIGHT, 100). diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index cb26c7f09..0cc013099 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 6ddfb5af2..711833963 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -196,13 +196,16 @@ init({Id, Index, Opts}) -> InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), InflightTID = inflight_new(InflightWinSize, Id, Index), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), + RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), + BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), + BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), Data = #{ id => Id, index => Index, inflight_tid => InflightTID, async_workers => #{}, batch_size => BatchSize, - batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), + batch_time => BatchTime, queue => Queue, resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), tref => undefined @@ -1546,6 +1549,12 @@ clear_disk_queue_dir(Id, Index) -> ensure_flush_timer(Data = #{batch_time := T}) -> ensure_flush_timer(Data, T). +ensure_flush_timer(Data = #{tref := undefined}, 0) -> + %% if the batch_time is 0, we don't need to start a timer, which + %% can be costly at high rates. + Ref = make_ref(), + self() ! {flush, Ref}, + Data#{tref => {Ref, Ref}}; ensure_flush_timer(Data = #{tref := undefined}, T) -> Ref = make_ref(), TRef = erlang:send_after(T, self(), {flush, Ref}), @@ -1648,3 +1657,53 @@ do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query. -else. do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt). -endif. + +%% To avoid message loss due to misconfigurations, we adjust +%% `batch_time' based on `request_timeout'. If `batch_time' > +%% `request_timeout', all requests will timeout before being sent if +%% the message rate is low. Even worse if `pool_size' is high. +%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. +adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) -> + BatchTime0; +adjust_batch_time(Id, RequestTimeout, BatchTime0) -> + BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), + case BatchTime =:= BatchTime0 of + false -> + ?SLOG(info, #{ + id => Id, + msg => adjusting_buffer_worker_batch_time, + new_batch_time => BatchTime + }); + true -> + ok + end, + BatchTime. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +adjust_batch_time_test_() -> + %% just for logging + Id = some_id, + [ + {"batch time smaller than request_time/2", + ?_assertEqual( + 100, + adjust_batch_time(Id, 500, 100) + )}, + {"batch time equal to request_time/2", + ?_assertEqual( + 100, + adjust_batch_time(Id, 200, 100) + )}, + {"batch time greater than request_time/2", + ?_assertEqual( + 50, + adjust_batch_time(Id, 100, 100) + )}, + {"batch time smaller than request_time/2 (request_time = infinity)", + ?_assertEqual( + 100, + adjust_batch_time(Id, infinity, 100) + )} + ]. +-endif. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 06ed059a4..4c924b824 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.9"}, + {vsn, "5.0.10"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 95c028a1e..62e1553d2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -423,7 +423,22 @@ param_path_id() -> %% Internal functions %%------------------------------------------------------------------------------ -err_msg(Msg) -> emqx_misc:readable_error_msg(Msg). +err_msg({RuleError, {_E, Reason, _S}}) -> + emqx_misc:readable_error_msg(encode_nested_error(RuleError, Reason)); +err_msg({Reason, _Details}) -> + emqx_misc:readable_error_msg(Reason); +err_msg(Msg) -> + emqx_misc:readable_error_msg(Msg). + +encode_nested_error(RuleError, Reason) when is_tuple(Reason) -> + encode_nested_error(RuleError, element(1, Reason)); +encode_nested_error(RuleError, Reason) -> + case emqx_json:safe_encode([{RuleError, Reason}]) of + {ok, Json} -> + Json; + _ -> + {RuleError, Reason} + end. format_rule_resp(Rules) when is_list(Rules) -> [format_rule_resp(R) || R <- Rules]; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index 82a305009..d89bc2651 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -40,6 +40,9 @@ end_per_suite(_Config) -> init_per_testcase(_, Config) -> Config. +end_per_testcase(t_crud_rule_api, Config) -> + meck:unload(emqx_json), + end_per_testcase(common, Config); end_per_testcase(_, _Config) -> {200, #{data := Rules}} = emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}), @@ -119,12 +122,54 @@ t_crud_rule_api(_Config) -> emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}) ), + {400, #{ + code := 'BAD_REQUEST', + message := SelectAndTransformJsonError + }} = + emqx_rule_engine_api:'/rule_test'( + post, + test_rule_params(<<"SELECT\n payload.msg\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hel">>) + ), + ?assertMatch( + #{<<"select_and_transform_error">> := <<"decode_json_failed">>}, + emqx_json:decode(SelectAndTransformJsonError, [return_maps]) + ), + {400, #{ + code := 'BAD_REQUEST', + message := SelectAndTransformBadArgError + }} = + emqx_rule_engine_api:'/rule_test'( + post, + test_rule_params( + <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">> + ) + ), + ?assertMatch( + #{<<"select_and_transform_error">> := <<"badarg">>}, + emqx_json:decode(SelectAndTransformBadArgError, [return_maps]) + ), + {400, #{ + code := 'BAD_REQUEST', + message := BadSqlMessage + }} = emqx_rule_engine_api:'/rule_test'( + post, + test_rule_params( + <<"BAD_SQL">>, <<"{\"msg\": \"hello\"}">> + ) + ), + ?assertMatch({match, _}, re:run(BadSqlMessage, "syntax error")), + meck:expect(emqx_json, safe_encode, 1, {error, foo}), ?assertMatch( {400, #{ code := 'BAD_REQUEST', - message := <<"{select_and_transform_error,{error,{decode_json_failed,", _/binary>> + message := <<"{select_and_transform_error,badarg}">> }}, - emqx_rule_engine_api:'/rule_test'(post, test_rule_params()) + emqx_rule_engine_api:'/rule_test'( + post, + test_rule_params( + <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">> + ) + ) ), ok. @@ -221,19 +266,18 @@ t_reset_metrics_on_disable(_Config) -> ?assertMatch(#{passed := 0, matched := 0}, Metrics1), ok. -test_rule_params() -> +test_rule_params(Sql, Payload) -> #{ body => #{ <<"context">> => #{ <<"clientid">> => <<"c_emqx">>, <<"event_type">> => <<"message_publish">>, - <<"payload">> => <<"{\"msg\": \"hel">>, + <<"payload">> => Payload, <<"qos">> => 1, <<"topic">> => <<"t/a">>, <<"username">> => <<"u_emqx">> }, - <<"sql">> => - <<"SELECT\n payload.msg\nFROM\n \"t/#\"">> + <<"sql">> => Sql } }. diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 27f842ce2..67825162e 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_statsd, [ {description, "EMQX Statsd"}, - {vsn, "5.0.5"}, + {vsn, "5.0.6"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, [ diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 770320ddd..75c15fa9e 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -38,7 +38,7 @@ ]). %% Interface --export([start_link/0]). +-export([start_link/1]). %% Internal Exports -export([ @@ -68,17 +68,18 @@ do_restart() -> ok = do_start(), ok. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(Conf) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []). -init([]) -> +init(Conf) -> process_flag(trap_exit, true), #{ tags := TagsRaw, server := Server, sample_time_interval := SampleTimeInterval, flush_time_interval := FlushTimeInterval - } = emqx_conf:get([statsd]), + } = Conf, + FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval), {Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS), Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw), Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}], @@ -86,7 +87,7 @@ init([]) -> {ok, ensure_timer(#{ sample_time_interval => SampleTimeInterval, - flush_time_interval => FlushTimeInterval, + flush_time_interval => FlushTimeInterval1, estatsd_pid => Pid })}. @@ -129,6 +130,19 @@ terminate(_Reason, #{estatsd_pid := Pid}) -> %% Internal function %%------------------------------------------------------------------------------ +flush_interval(FlushInterval, SampleInterval) when FlushInterval >= SampleInterval -> + FlushInterval; +flush_interval(_FlushInterval, SampleInterval) -> + ?SLOG( + warning, + #{ + msg => + "Configured flush_time_interval is lower than sample_time_interval, " + "setting: flush_time_interval = sample_time_interval." + } + ), + SampleInterval. + ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) -> State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}. diff --git a/apps/emqx_statsd/src/emqx_statsd_config.erl b/apps/emqx_statsd/src/emqx_statsd_config.erl index b818d2691..6bc430956 100644 --- a/apps/emqx_statsd/src/emqx_statsd_config.erl +++ b/apps/emqx_statsd/src/emqx_statsd_config.erl @@ -45,9 +45,9 @@ remove_handler() -> ok = emqx_config_handler:remove_handler(?STATSD), ok. -post_config_update(?STATSD, _Req, #{enable := true}, _Old, _AppEnvs) -> +post_config_update(?STATSD, _Req, #{enable := true} = New, _Old, _AppEnvs) -> emqx_statsd_sup:ensure_child_stopped(?APP), - emqx_statsd_sup:ensure_child_started(?APP); + emqx_statsd_sup:ensure_child_started(?APP, New); post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) -> emqx_statsd_sup:ensure_child_stopped(?APP); post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 2845fb505..35c1d332c 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -25,6 +25,7 @@ -export([ start_link/0, ensure_child_started/1, + ensure_child_started/2, ensure_child_stopped/1 ]). @@ -45,7 +46,11 @@ start_link() -> -spec ensure_child_started(atom()) -> ok. ensure_child_started(Mod) when is_atom(Mod) -> - assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))). + ensure_child_started(Mod, emqx_conf:get([statsd], #{})). + +-spec ensure_child_started(atom(), map()) -> ok. +ensure_child_started(Mod, Conf) when is_atom(Mod) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, [Conf]))). %% @doc Stop the child worker process. -spec ensure_child_stopped(any()) -> ok. @@ -61,9 +66,9 @@ ensure_child_stopped(ChildId) -> init([]) -> Children = - case emqx_conf:get([statsd, enable], false) of - true -> [?CHILD(emqx_statsd, [])]; - false -> [] + case emqx_conf:get([statsd], #{}) of + #{enable := true} = Conf -> [?CHILD(emqx_statsd, [Conf])]; + _ -> [] end, {ok, {{one_for_one, 100, 3600}, Children}}. diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index 2f8fa5a69..a203ef7d5 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -113,6 +113,32 @@ t_kill_exit(_) -> ?assertNotEqual(Estatsd, Estatsd1), ok. +t_config_update(_) -> + OldRawConf = emqx_conf:get_raw([statsd]), + {ok, _} = emqx_statsd_config:update(OldRawConf#{<<"enable">> => true}), + CommonKeys = [flush_time_interval, sample_time_interval], + OldConf = emqx_conf:get([statsd]), + OldStatsDState = sys:get_state(emqx_statsd), + OldPid = erlang:whereis(emqx_statsd), + ?assertEqual(maps:with(CommonKeys, OldConf), maps:with(CommonKeys, OldStatsDState)), + NewRawConfExpect = OldRawConf#{ + <<"flush_time_interval">> := <<"42s">>, + <<"sample_time_interval">> := <<"42s">> + }, + try + {ok, _} = emqx_statsd_config:update(NewRawConfExpect), + NewRawConf = emqx_conf:get_raw([statsd]), + NewConf = emqx_conf:get([statsd]), + NewStatsDState = sys:get_state(emqx_statsd), + NewPid = erlang:whereis(emqx_statsd), + ?assertNotEqual(OldRawConf, NewRawConf), + ?assertEqual(NewRawConfExpect, NewRawConf), + ?assertEqual(maps:with(CommonKeys, NewConf), maps:with(CommonKeys, NewStatsDState)), + ?assertNotEqual(OldPid, NewPid) + after + {ok, _} = emqx_statsd_config:update(OldRawConf) + end. + request(Method) -> request(Method, []). request(Method, Body) -> diff --git a/changes/ce/feat-10059.en.md b/changes/ce/feat-10059.en.md new file mode 100644 index 000000000..2c4de015c --- /dev/null +++ b/changes/ce/feat-10059.en.md @@ -0,0 +1 @@ +Errors returned by rule engine API are formatted in a more human readable way rather than dumping the raw error including the stacktrace. diff --git a/changes/ce/feat-10059.zh.md b/changes/ce/feat-10059.zh.md new file mode 100644 index 000000000..99f8fe8ee --- /dev/null +++ b/changes/ce/feat-10059.zh.md @@ -0,0 +1 @@ +规则引擎 API 返回用户可读的错误信息而不是原始的栈追踪信息。 diff --git a/changes/ce/fix-10058.en.md b/changes/ce/fix-10058.en.md new file mode 100644 index 000000000..337ac5d47 --- /dev/null +++ b/changes/ce/fix-10058.en.md @@ -0,0 +1,7 @@ +Deprecate unused QUIC TLS options. +Only following TLS options are kept for the QUIC listeners: + +- cacertfile +- certfile +- keyfile +- verify diff --git a/changes/ce/fix-10058.zh.md b/changes/ce/fix-10058.zh.md new file mode 100644 index 000000000..d1dea37c3 --- /dev/null +++ b/changes/ce/fix-10058.zh.md @@ -0,0 +1,8 @@ +废弃未使用的 QUIC TLS 选项。 +QUIC 监听器只保留以下 TLS 选项: + +- cacertfile +- certfile +- keyfile +- verify + diff --git a/changes/ce/fix-10076.en.md b/changes/ce/fix-10076.en.md new file mode 100644 index 000000000..5bbbffa32 --- /dev/null +++ b/changes/ce/fix-10076.en.md @@ -0,0 +1,2 @@ +Fix webhook bridge error handling: connection timeout should be a retriable error. +Prior to this fix, connection timeout was classified as unrecoverable error and led to request being dropped. diff --git a/changes/ce/fix-10076.zh.md b/changes/ce/fix-10076.zh.md new file mode 100644 index 000000000..516345f92 --- /dev/null +++ b/changes/ce/fix-10076.zh.md @@ -0,0 +1,2 @@ +修复 HTTP 桥接的一个异常处理:连接超时错误发生后,发生错误的请求可以被重试。 +在此修复前,连接超时后,被当作不可重试类型的错误处理,导致请求被丢弃。 diff --git a/changes/ce/fix-10078.en.md b/changes/ce/fix-10078.en.md new file mode 100644 index 000000000..afb7bcbe0 --- /dev/null +++ b/changes/ce/fix-10078.en.md @@ -0,0 +1,2 @@ +Fix an issue that invalid QUIC listener setting could casue segfault. + diff --git a/changes/ce/fix-10078.zh.md b/changes/ce/fix-10078.zh.md new file mode 100644 index 000000000..47a774d1e --- /dev/null +++ b/changes/ce/fix-10078.zh.md @@ -0,0 +1,2 @@ +修复了无效的 QUIC 监听器设置可能导致 segfault 的问题。 + diff --git a/changes/ce/fix-10084.en.md b/changes/ce/fix-10084.en.md new file mode 100644 index 000000000..90da7d660 --- /dev/null +++ b/changes/ce/fix-10084.en.md @@ -0,0 +1,3 @@ +Fix problem when joining core nodes running different EMQX versions into a cluster. + +[Mria PR](https://github.com/emqx/mria/pull/127) diff --git a/changes/ce/fix-10084.zh.md b/changes/ce/fix-10084.zh.md new file mode 100644 index 000000000..dd44533cf --- /dev/null +++ b/changes/ce/fix-10084.zh.md @@ -0,0 +1,3 @@ +修正将运行不同 EMQX 版本的核心节点加入集群的问题。 + +[Mria PR](https://github.com/emqx/mria/pull/127) diff --git a/changes/ce/fix-10086.en.md b/changes/ce/fix-10086.en.md new file mode 100644 index 000000000..31e8b6453 --- /dev/null +++ b/changes/ce/fix-10086.en.md @@ -0,0 +1,4 @@ +Upgrade HTTP client ehttpc to `0.4.7`. +Prior to this upgrade, HTTP clients for authentication, authorization and webhook may crash +if `body` is empty but content-type HTTP header is set. +For more details see [ehttpc PR#44](https://github.com/emqx/ehttpc/pull/44). diff --git a/changes/ce/fix-10086.zh.md b/changes/ce/fix-10086.zh.md new file mode 100644 index 000000000..b7c110ea4 --- /dev/null +++ b/changes/ce/fix-10086.zh.md @@ -0,0 +1,3 @@ +HTTP 客户端库 `ehttpc` 升级到 0.4.7。 +在升级前,如果 HTTP 客户端,例如 认证,授权,webhook 等配置中使用了content-type HTTP 头,但是没有配置 body,则可能会发生异常。 +详情见 [ehttpc PR#44](https://github.com/emqx/ehttpc/pull/44)。 diff --git a/changes/ee/fix-10087.en.md b/changes/ee/fix-10087.en.md new file mode 100644 index 000000000..fd6e10b7b --- /dev/null +++ b/changes/ee/fix-10087.en.md @@ -0,0 +1,2 @@ +Use default template `${timestamp}` if the `timestamp` config is empty (undefined) when inserting data in InfluxDB. +Prior to this change, InfluxDB bridge inserted a wrong timestamp when template is not provided. diff --git a/changes/ee/fix-10087.zh.md b/changes/ee/fix-10087.zh.md new file mode 100644 index 000000000..e08e61f37 --- /dev/null +++ b/changes/ee/fix-10087.zh.md @@ -0,0 +1,2 @@ +在 InfluxDB 中插入数据时,如果时间戳为空(未定义),则使用默认的占位符 `${timestamp}`。 +在此修复前,如果时间戳字段没有设置,InfluxDB 桥接使用了一个错误的时间戳。 diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index 71c2122b1..bccccb0c0 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.0.19 +version: 5.0.20 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.0.19 +appVersion: 5.0.20 diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf index af2a93f82..b8fa3b43a 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf @@ -77,8 +77,8 @@ emqx_ee_bridge_gcp_pubsub { request_timeout { desc { - en: "HTTP request timeout." - zh: "HTTP 请求超时。" + en: "Deprecated: Configure the request timeout in the buffer settings." + zh: "废弃的。在缓冲区设置中配置请求超时。" } label: { en: "Request Timeout" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index c30c927f2..05d893a79 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index e00483839..352a7163a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -84,6 +84,7 @@ fields(bridge_config) -> emqx_schema:duration_ms(), #{ required => false, + deprecated => {since, "e5.0.1"}, default => <<"15s">>, desc => ?DESC("request_timeout") } diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 222acb77b..452b7a4d2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -282,7 +282,6 @@ gcp_pubsub_config(Config) -> "bridges.gcp_pubsub.~s {\n" " enable = true\n" " connect_timeout = 1s\n" - " request_timeout = 1s\n" " service_account_json = ~s\n" " payload_template = ~p\n" " pubsub_topic = ~s\n" diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index c9ef38330..2b2214df0 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -527,7 +527,8 @@ t_start_ok(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"payload">> => Payload + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) }, ?check_trace( begin @@ -685,7 +686,8 @@ t_const_timestamp(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"payload">> => Payload + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) }, ?assertEqual(ok, send_message(Config, SentData)), case QueryMode of @@ -740,7 +742,7 @@ t_boolean_variants(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?assertEqual(ok, send_message(Config, SentData)), @@ -805,7 +807,7 @@ t_bad_timestamp(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?check_trace( @@ -949,7 +951,7 @@ t_write_failure(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?check_trace( diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 67a9b4a05..4eeebfaf8 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -540,7 +540,9 @@ resource_configs() -> <<"query_mode">> => <<"sync">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), - <<"start_timeout">> => <<"15s">> + <<"start_timeout">> => <<"15s">>, + <<"batch_time">> => <<"4s">>, + <<"request_timeout">> => <<"30s">> } }. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 5fcb83baa..6f6bc57c8 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 898c36fe0..f07cbceab 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -33,7 +33,7 @@ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), pubsub_topic := binary(), - request_timeout := emqx_schema:duration_ms(), + resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()}, service_account_json := service_account_json(), any() => term() }. @@ -71,7 +71,7 @@ on_start( payload_template := PayloadTemplate, pool_size := PoolSize, pubsub_topic := PubSubTopic, - request_timeout := RequestTimeout + resource_opts := #{request_timeout := RequestTimeout} } = Config ) -> ?SLOG(info, #{ diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index a1496cabd..5c99a23a8 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -35,11 +35,15 @@ desc/1 ]). +-type ts_precision() :: ns | us | ms | s. + %% influxdb servers don't need parse -define(INFLUXDB_HOST_OPTIONS, #{ default_port => ?INFLUXDB_DEFAULT_PORT }). +-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). + %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> async_if_possible. @@ -232,15 +236,14 @@ do_start_client( ClientConfig, Config = #{write_syntax := Lines} ) -> + Precision = maps:get(precision, Config, ms), case influxdb:start_client(ClientConfig) of {ok, Client} -> case influxdb:is_alive(Client, true) of true -> State = #{ client => Client, - write_syntax => to_config( - Lines, proplists:get_value(precision, ClientConfig) - ) + write_syntax => to_config(Lines, Precision) }, ?SLOG(info, #{ msg => "starting influxdb connector success", @@ -409,27 +412,36 @@ to_config(Lines, Precision) -> to_config([], Acc, _Precision) -> lists:reverse(Acc); to_config([Item0 | Rest], Acc, Precision) -> - Ts = maps:get(timestamp, Item0, undefined), + Ts0 = maps:get(timestamp, Item0, undefined), + {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), Item = #{ measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)), - timestamp => preproc_tmpl_timestamp(Ts, Precision), + timestamp => Ts, + precision => {FromPrecision, ToPrecision}, tags => to_kv_config(maps:get(tags, Item0)), fields => to_kv_config(maps:get(fields, Item0)) }, to_config(Rest, [Item | Acc], Precision). -preproc_tmpl_timestamp(undefined, <<"ns">>) -> - erlang:system_time(nanosecond); -preproc_tmpl_timestamp(undefined, <<"us">>) -> - erlang:system_time(microsecond); -preproc_tmpl_timestamp(undefined, <<"ms">>) -> - erlang:system_time(millisecond); -preproc_tmpl_timestamp(undefined, <<"s">>) -> - erlang:system_time(second); -preproc_tmpl_timestamp(Ts, _) when is_integer(Ts) -> - Ts; -preproc_tmpl_timestamp(Ts, _) when is_binary(Ts); is_list(Ts) -> - emqx_plugin_libs_rule:preproc_tmpl(Ts). +%% pre-process the timestamp template +%% returns a tuple of three elements: +%% 1. The timestamp template itself. +%% 2. The source timestamp precision (ms if the template ${timestamp} is used). +%% 3. The target timestamp precision (configured for the client). +preproc_tmpl_timestamp(undefined, Precision) -> + %% not configured, we default it to the message timestamp + preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision); +preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) -> + %% a const value is used which is very much unusual, but we have to add a special handling + {Ts, Precision, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) -> + preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision); +preproc_tmpl_timestamp(<> = Ts, Precision) -> + {emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) -> + %% a placehold is in use. e.g. ${payload.my_timestamp} + %% we can only hope it the value will be of the same precision in the configs + {emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}. to_kv_config(KVfields) -> maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). @@ -472,7 +484,8 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> fields := [{binary(), binary()}], measurement := binary(), tags := [{binary(), binary()}], - timestamp := emqx_plugin_libs_rule:tmpl_token() | integer() + timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(), + precision := {From :: ts_precision(), To :: ts_precision()} } ]) -> {ok, [map()]} | {error, term()}. data_to_points(Data, SyntaxLines) -> @@ -531,16 +544,27 @@ line_to_point( #{ measurement := Measurement, tags := Tags, - fields := Fields + fields := Fields, + timestamp := Ts, + precision := Precision } = Item ) -> {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), - Item#{ + maps:without([precision], Item#{ measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), tags => EncodedTags, - fields => EncodedFields - }. + fields => EncodedFields, + timestamp => maybe_convert_time_unit(Ts, Precision) + }). + +maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) -> + erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)). + +time_unit(s) -> second; +time_unit(ms) -> millisecond; +time_unit(us) -> microsecond; +time_unit(ns) -> nanosecond. maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl index f5e43c0bb..72fc11a67 100644 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl @@ -227,5 +227,6 @@ test_query() -> {send_message, #{ <<"clientid">> => <<"something">>, <<"payload">> => #{bool => true}, - <<"topic">> => <<"connector_test">> + <<"topic">> => <<"connector_test">>, + <<"timestamp">> => 1678220316257 }}. diff --git a/mix.exs b/mix.exs index 33828d11f..26411e983 100644 --- a/mix.exs +++ b/mix.exs @@ -48,13 +48,13 @@ defmodule EMQXUmbrella.MixProject do {:redbug, "2.0.8"}, {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.4.6", override: true}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.4.7", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.14.2", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.14.3", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.8", override: true}, @@ -649,7 +649,7 @@ defmodule EMQXUmbrella.MixProject do defp quicer_dep() do if enable_quicer?(), # in conflict with emqx and emqtt - do: [{:quicer, github: "emqx/quic", tag: "0.0.111", override: true}], + do: [{:quicer, github: "emqx/quic", tag: "0.0.113", override: true}], else: [] end diff --git a/rebar.config b/rebar.config index c774a4f95..6309f9233 100644 --- a/rebar.config +++ b/rebar.config @@ -50,13 +50,13 @@ , {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.6"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.7"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.3"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.8"}}} diff --git a/rebar.config.erl b/rebar.config.erl index 9d9b0f874..349770487 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -39,7 +39,7 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}. quicer() -> - {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.111"}}}. + {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.113"}}}. jq() -> {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.9"}}}. diff --git a/scripts/apps-version-check.sh b/scripts/apps-version-check.sh index 473005c9c..797204cc8 100755 --- a/scripts/apps-version-check.sh +++ b/scripts/apps-version-check.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash set -euo pipefail -exit 0 + latest_release=$(git describe --abbrev=0 --tags --exclude '*rc*' --exclude '*alpha*' --exclude '*beta*' --exclude '*docker*') echo "Compare base: $latest_release" diff --git a/scripts/rel/cut.sh b/scripts/rel/cut.sh index 60fe2f28d..2a1c213fe 100755 --- a/scripts/rel/cut.sh +++ b/scripts/rel/cut.sh @@ -19,15 +19,23 @@ RELEASE_GIT_TAG is a 'v*' or 'e*' tag for example: e5.0.0-beta.6 options: - -h|--help: Print this usage. - -b|--base: Specify the current release base branch, can be one of - release-50 - NOTE: this option should be used when --dryrun. - --dryrun: Do not actually create the git tag. - --skip-appup: Skip checking appup - Useful when you are sure that appup is already updated' - --prev-tag: Provide the prev tag to automatically generate changelogs - If this option is absent, the tag found by git describe will be used + -h|--help: Print this usage. + + -b|--base: Specify the current release base branch, can be one of + release-50 + NOTE: this option should be used when --dryrun. + + --dryrun: Do not actually create the git tag. + + --skip-appup: Skip checking appup + Useful when you are sure that appup is already updated' + + --prev-tag : Provide the prev tag to automatically generate changelogs + If this option is absent, the tag found by git describe will be used + + --docker-latest: Set this option to assign :latest tag on the corresponding docker image + in addition to regular : one + NOTE: For 5.0 series the current working branch must be 'release-50' for opensource edition and 'release-e50' for enterprise edition. @@ -40,23 +48,31 @@ EOF logerr() { echo "$(tput setaf 1)ERROR: $1$(tput sgr0)" } + +logwarn() { + echo "$(tput setaf 3)WARNING: $1$(tput sgr0)" +} + logmsg() { echo "INFO: $1" } TAG="${1:-}" +DOCKER_LATEST_TAG= case "$TAG" in v*) TAG_PREFIX='v' PROFILE='emqx' SKIP_APPUP='yes' + DOCKER_LATEST_TAG='docker-latest-ce' ;; e*) TAG_PREFIX='e' PROFILE='emqx-enterprise' #TODO change to no when we are ready to support hot-upgrade SKIP_APPUP='yes' + DOCKER_LATEST_TAG='docker-latest-ee' ;; -h|--help) usage @@ -72,6 +88,7 @@ esac shift 1 DRYRUN='no' +DOCKER_LATEST='no' while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -99,6 +116,10 @@ while [ "$#" -gt 0 ]; do PREV_TAG="$1" shift ;; + --docker-latest) + DOCKER_LATEST='yes' + shift + ;; *) logerr "Unknown option $1" exit 1 @@ -180,11 +201,11 @@ assert_release_version() { assert_release_version "$TAG" ## Check if all upstream branches are merged -if [ -z "${BASE_BR:-}" ]; then - ./scripts/rel/sync-remotes.sh -else - ./scripts/rel/sync-remotes.sh --base "$BASE_BR" -fi +SYNC_REMOTES_ARGS= +[ -n "${BASE_BR:-}" ] && SYNC_REMOTES_ARGS="--base $BASE_BR $SYNC_REMOTES_ARGS" +[ "$DRYRUN" = 'yes' ] && SYNC_REMOTES_ARGS="--dryrun $SYNC_REMOTES_ARGS" +# shellcheck disable=SC2086 +./scripts/rel/sync-remotes.sh $SYNC_REMOTES_ARGS ## Check if the Chart versions are in sync ./scripts/rel/check-chart-vsn.sh "$PROFILE" @@ -231,6 +252,9 @@ generate_changelog () { if [ "$DRYRUN" = 'yes' ]; then logmsg "Release tag is ready to be created with command: git tag $TAG" + if [ "$DOCKER_LATEST" = 'yes' ]; then + logmsg "Docker latest tag is ready to be created with command: git tag --force $DOCKER_LATEST_TAG" + fi else case "$TAG" in *rc*) @@ -252,4 +276,14 @@ else esac git tag "$TAG" logmsg "$TAG is created OK." + if [ "$DOCKER_LATEST" = 'yes' ]; then + git tag --force "$DOCKER_LATEST_TAG" + logmsg "$DOCKER_LATEST_TAG is created OK." + fi + logwarn "Don't forget to push the tags!" + if [ "$DOCKER_LATEST" = 'yes' ]; then + echo "git push --atomic --force origin $TAG $DOCKER_LATEST_TAG" + else + echo "git push origin $TAG" + fi fi diff --git a/scripts/rel/sync-remotes.sh b/scripts/rel/sync-remotes.sh index dda910785..eddce0cd7 100755 --- a/scripts/rel/sync-remotes.sh +++ b/scripts/rel/sync-remotes.sh @@ -33,12 +33,17 @@ options: Without this option, the script executes 'git merge' command with '--ff-only' option which conveniently pulls remote updates if there is any, and fails when fast-forward is not possible + + --dryrun: + Do not perform merge. Run the checks, fetch from remote, + and show what's going to happen. EOF } logerr() { echo "$(tput setaf 1)ERROR: $1$(tput sgr0)" } + logwarn() { echo "$(tput setaf 3)WARNING: $1$(tput sgr0)" } @@ -48,6 +53,7 @@ logmsg() { } INTERACTIVE='no' +DRYRUN='no' while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -63,6 +69,10 @@ while [ "$#" -gt 0 ]; do BASE_BRANCH="$1" shift ;; + --dryrun) + shift + DRYRUN='yes' + ;; *) logerr "Unknown option $1" exit 1 @@ -151,6 +161,10 @@ upstream_branches() { } for remote_ref in $(upstream_branches "$BASE_BRANCH"); do - logmsg "Merging $remote_ref" - git merge $MERGE_OPTS "$remote_ref" + if [ "$DRYRUN" = 'yes' ]; then + logmsg "Merge with this command: git merge $MERGE_OPTS $remote_ref" + else + logmsg "Merging $remote_ref" + git merge $MERGE_OPTS "$remote_ref" + fi done