From dfc9141da66ddf520f78d23b430eba24a2b91776 Mon Sep 17 00:00:00 2001 From: Meggielqk <126552073+Meggielqk@users.noreply.github.com> Date: Tue, 28 Mar 2023 11:18:55 +0800 Subject: [PATCH 01/13] docs: Refined Chinese UI Also updated the English hints according to the refined Chinese according to Guowei's comments. Co-authored-by: Kjell Winblad --- .../i18n/emqx_ee_bridge_clickhouse.conf | 36 +++---------------- 1 file changed, 5 insertions(+), 31 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf index 6a28b371a..5c6ae05f2 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf @@ -19,17 +19,8 @@ will be forwarded. } sql_template { desc { - en: """SQL Template. The template string can contain placeholders -for message metadata and payload field. The placeholders are inserted -without any checking and special formatting, so it is important to -ensure that the inserted values are formatted and escaped correctly.""" - zh: - """SQL模板。模板字符串可以包含消息元数据和有效载荷字段的占位符。占位符 -的插入不需要任何检查和特殊格式化,因此必须确保插入的数值格式化和转义正确。模板字符串可以包含占位符 -模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入 -所以必须确保插入的值的格式正确。因此,确保插入的值格式化和转义正确是非常重要的。模板字符串可以包含占位符 -模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入 -所以必须确保插入的值的格式正确。确保插入的值被正确地格式化和转义。""" + en: """The template string can contain ${field} placeholders for message metadata and payload field. Make sure that the inserted values are formatted and escaped correctly. [Prepared Statement](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridges.html#Prepared-Statement) is not supported.""" + zh: """可以使用 ${field} 占位符来引用消息与客户端上下文中的变量,请确保对应字段存在且数据格式符合预期。此处不支持 [SQL 预处理](https://docs.emqx.com/zh/enterprise/v5.0/data-integration/data-bridges.html#sql-预处理)。""" } label { en: "SQL Template" @@ -38,29 +29,12 @@ ensure that the inserted values are formatted and escaped correctly.""" } batch_value_separator { desc { - en: """The bridge repeats what comes after the VALUES or FORMAT FormatType in the -SQL template to form a batch request. The value specified with -this parameter will be inserted between the values. The default -value ',' works for the VALUES format, but other values -might be needed if you specify some other format with the -clickhouse FORMAT syntax. - -See https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ and -https://clickhouse.com/docs/en/interfaces/formats#formats for more information about -the format syntax and the available formats.""" - zh: """桥接会重复 VALUES 或 FORMAT 格式类型之后的内容。中 VALUES 或 -FORMAT FormatType 后面的内容,以形成一个批处理请求。用这个参数指定的值 -这个参数指定的值将被插入到这些值之间。默认的 -默认值','适用于VALUES格式,但是如果你指定了其他的格式,可能需要其他的值。可能需要其他值,如果你用 -"clickhouse FORMAT "语法指定其他格式。语法指定其他格式。 - -参见https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ 和 -https://clickhouse.com/docs/en/interfaces/formats#formats 了解更多关于 -格式语法和可用的格式。""" + en: """The default value ',' works for the VALUES format. You can also use other separator if other format is specified. See [INSERT INTO Statement](https://clickhouse.com/docs/en/sql-reference/statements/insert-into).""" + zh: """默认为逗号 ',',适用于 VALUE 格式。您也可以使用其他分隔符, 请参考 [INSERT INTO 语句](https://clickhouse.com/docs/en/sql-reference/statements/insert-into)。""" } label { en: "Batch Value Separator" - zh: "批量值分离器" + zh: "批量值分隔符" } } config_enable { From 6f6d12f930cfbff084b4ce62184e9c9d20cb0919 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 31 Mar 2023 09:39:04 +0200 Subject: [PATCH 02/13] docs: better Chinese labels Co-authored-by: LenaLenaPan <120552185+LenaLenaPan@users.noreply.github.com> --- lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf index 5c6ae05f2..4cfb4df00 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf @@ -34,7 +34,7 @@ will be forwarded. } label { en: "Batch Value Separator" - zh: "批量值分隔符" + zh: "分隔符" } } config_enable { From 53712e61462e8e617783f2886c2a17387f460ae3 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 30 Mar 2023 15:11:06 +0800 Subject: [PATCH 03/13] fix: running nodes should not include replica nodes --- apps/emqx_conf/src/emqx_conf.app.src | 2 +- apps/emqx_conf/src/emqx_conf_app.erl | 10 +++++----- changes/ce/fix-10313.en.md | 2 ++ changes/ce/fix-10313.zh.md | 2 ++ 4 files changed, 10 insertions(+), 6 deletions(-) create mode 100644 changes/ce/fix-10313.en.md create mode 100644 changes/ce/fix-10313.zh.md diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index fbbffba1f..37707431a 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.14"}, + {vsn, "0.1.15"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 34224c3f2..af42f0e1a 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -90,13 +90,13 @@ init_conf() -> emqx_app:set_init_config_load_done(). cluster_nodes() -> - maps:get(running_nodes, ekka_cluster:info()) -- [node()]. + mria_mnesia:running_nodes() -- [node()]. copy_override_conf_from_core_node() -> case cluster_nodes() of %% The first core nodes is self. [] -> - ?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}), + ?SLOG(debug, #{msg => "skip_copy_override_conf_from_core_node"}), {ok, ?DEFAULT_INIT_TXN_ID}; Nodes -> {Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes), @@ -130,7 +130,7 @@ copy_override_conf_from_core_node() -> %% finish the boot sequence and load the %% config for other nodes to copy it. ?SLOG(info, #{ - msg => "skip_copy_overide_conf_from_core_node", + msg => "skip_copy_override_conf_from_core_node", loading_from_disk => true, nodes => Nodes, failed => Failed, @@ -142,7 +142,7 @@ copy_override_conf_from_core_node() -> Jitter = rand:uniform(2_000), Timeout = 10_000 + Jitter, ?SLOG(info, #{ - msg => "copy_overide_conf_from_core_node_retry", + msg => "copy_override_conf_from_core_node_retry", timeout => Timeout, nodes => Nodes, failed => Failed, @@ -155,7 +155,7 @@ copy_override_conf_from_core_node() -> [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, ?SLOG(debug, #{ - msg => "copy_overide_conf_from_core_node_success", + msg => "copy_override_conf_from_core_node_success", node => Node, cluster_override_conf_file => application:get_env( emqx, cluster_override_conf_file diff --git a/changes/ce/fix-10313.en.md b/changes/ce/fix-10313.en.md new file mode 100644 index 000000000..ca1a3b391 --- /dev/null +++ b/changes/ce/fix-10313.en.md @@ -0,0 +1,2 @@ +Ensure that when the core or replicant node starting, the `cluster-override.conf` file is only copied from the core node. +Previously, when sorting nodes by startup time, the core node may have copied this file from the replicant node. diff --git a/changes/ce/fix-10313.zh.md b/changes/ce/fix-10313.zh.md new file mode 100644 index 000000000..94f118ece --- /dev/null +++ b/changes/ce/fix-10313.zh.md @@ -0,0 +1,2 @@ +确保当 core 或 replicant 节点启动时,仅从 core 节点复制 `cluster-override.conf` 文件。 +此前按照节点启动时间排序时,core 节点可能从 replicant 节点复制该文件。 From 2f2e8b8218a315560d3f7c1582a8b339da3bdbc2 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Apr 2023 20:23:31 +0200 Subject: [PATCH 04/13] chore: bump version to e5.0.2-rc.3 --- apps/emqx/include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index d246b4639..b1cd5f98e 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.0.21"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.2-rc.2"). +-define(EMQX_RELEASE_EE, "5.0.2-rc.3"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). From e1d589420933f55f528b9b2cccc07fda99a4593b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Apr 2023 20:28:29 +0200 Subject: [PATCH 05/13] docs: delete clickhous bridge doc fix change log because the feature is not released yet (new to e5.0.2) --- changes/ee/fix-10289.en.md | 1 - changes/ee/fix-10289.zh.md | 1 - 2 files changed, 2 deletions(-) delete mode 100644 changes/ee/fix-10289.en.md delete mode 100644 changes/ee/fix-10289.zh.md diff --git a/changes/ee/fix-10289.en.md b/changes/ee/fix-10289.en.md deleted file mode 100644 index 65eed7b5d..000000000 --- a/changes/ee/fix-10289.en.md +++ /dev/null @@ -1 +0,0 @@ -Clickhouse has got a fix that makes the error message better when users click the test button in the settings dialog. diff --git a/changes/ee/fix-10289.zh.md b/changes/ee/fix-10289.zh.md deleted file mode 100644 index d47278c16..000000000 --- a/changes/ee/fix-10289.zh.md +++ /dev/null @@ -1 +0,0 @@ -Clickhouse 已经修复了一个问题,当用户在设置对话框中点击测试按钮时,错误信息会更清晰。 From b16c516e6b8eccf7deced1e36908965ba963e98c Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 4 Apr 2023 10:28:47 +0200 Subject: [PATCH 06/13] refactor: rename cluster_rpc_handler to cluster_rpc_cleaner this reflects what is actually does --- ...x_cluster_rpc_handler.erl => emqx_cluster_rpc_cleaner.erl} | 4 +++- apps/emqx_conf/src/emqx_conf_sup.erl | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) rename apps/emqx_conf/src/{emqx_cluster_rpc_handler.erl => emqx_cluster_rpc_cleaner.erl} (97%) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl similarity index 97% rename from apps/emqx_conf/src/emqx_cluster_rpc_handler.erl rename to apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl index c3d946a91..bce866c2d 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl @@ -13,7 +13,9 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_cluster_rpc_handler). + +%% @doc This module is responsible for cleaning up the cluster RPC MFA. +-module(emqx_cluster_rpc_cleaner). -behaviour(gen_server). diff --git a/apps/emqx_conf/src/emqx_conf_sup.erl b/apps/emqx_conf/src/emqx_conf_sup.erl index d4411af4b..6a3d795ae 100644 --- a/apps/emqx_conf/src/emqx_conf_sup.erl +++ b/apps/emqx_conf/src/emqx_conf_sup.erl @@ -36,7 +36,7 @@ init([]) -> ChildSpecs = [ child_spec(emqx_cluster_rpc, []), - child_spec(emqx_cluster_rpc_handler, []) + child_spec(emqx_cluster_rpc_cleaner, []) ], {ok, {SupFlags, ChildSpecs}}. From 974b180da8a3670bc75bf9428822ae165c50f52c Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 4 Apr 2023 12:39:33 +0200 Subject: [PATCH 07/13] build: fix buildx.sh with git config --- scripts/buildx.sh | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/scripts/buildx.sh b/scripts/buildx.sh index 4f12e0abc..5c3a65369 100755 --- a/scripts/buildx.sh +++ b/scripts/buildx.sh @@ -27,6 +27,13 @@ help() { echo " E.g. ghcr.io/emqx/emqx-builder/5.0-28:1.13.4-24.3.4.2-2-debian11" } +die() { + msg="$1" + echo "$msg" >&2 + help + exit 1 +} + while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -81,13 +88,23 @@ while [ "$#" -gt 0 ]; do esac done -if [ -z "${PROFILE:-}" ] || - [ -z "${PKGTYPE:-}" ] || - [ -z "${BUILDER:-}" ] || - [ -z "${ARCH:-}" ]; then - help - exit 1 +## we have a different naming for them +if [[ $(uname -m) == "x86_64" ]]; then + NATIVE_ARCH='amd64' +elif [[ $(uname -m) == "aarch64" ]]; then + NATIVE_ARCH='arm64' +elif [[ $(uname -m) == "arm64" ]]; then + NATIVE_ARCH='arm64' +elif [[ $(uname -m) == "armv7l" ]]; then + # CHECKME: really ? + NATIVE_ARCH='arm64' fi +ARCH="${ARCH:-${NATIVE_ARCH:-}}" + +[ -z "${PROFILE:-}" ] && die "missing --prifile" +[ -z "${PKGTYPE:-}" ] && die "missing --pkgtyp" +[ -z "${BUILDER:-}" ] && die "missing --builder" +[ -z "${ARCH:-}" ] && die "missing --arch" # ensure dir cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." @@ -128,13 +145,7 @@ if [[ "$HOST_SYSTEM" = "$BUILDER_SYSTEM" ]]; then fi IS_NATIVE_ARCH='no' -if [[ $(uname -m) == "x86_64" && "$ARCH" == "amd64" ]]; then - IS_NATIVE_ARCH='yes' -elif [[ $(uname -m) == "aarch64" && "$ARCH" == "arm64" ]]; then - IS_NATIVE_ARCH='yes' -elif [[ $(uname -m) == "arm64" && "$ARCH" == "arm64" ]]; then - IS_NATIVE_ARCH='yes' -elif [[ $(uname -m) == "armv7l" && "$ARCH" == "arm64" ]]; then +if [[ "$NATIVE_ARCH" == "$ARCH" ]]; then IS_NATIVE_ARCH='yes' fi @@ -151,7 +162,7 @@ elif docker info; then --platform="linux/$ARCH" \ --env ACLOCAL_PATH="/usr/share/aclocal:/usr/local/share/aclocal" \ "$BUILDER" \ - bash -euc "$CMD_RUN" + bash -euc "git config --global --add safe.directory /emqx && $CMD_RUN" else echo "Error: Docker not available on unsupported platform" exit 1; From 8fd9dd741e4eaff402398b0e93aaf8336c47e584 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 4 Apr 2023 12:39:55 +0200 Subject: [PATCH 08/13] fix(emqx_conf_app): wait for tables ready beofre starting apps --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 38 +++++++++++++++++++++++-- apps/emqx_conf/src/emqx_conf_app.erl | 3 ++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 89f678554..d03b57b03 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -270,9 +270,6 @@ fast_forward_to_commit(Node, ToTnxId) -> %% @private init([Node, RetryMs]) -> - %% Workaround for https://github.com/emqx/mria/issues/94: - _ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000), - _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), State = #{node => Node, retry_interval => RetryMs}, TnxId = emqx_app:get_init_tnx_id(), @@ -281,6 +278,7 @@ init([Node, RetryMs]) -> %% @private handle_continue(?CATCH_UP, State) -> + ok = wait_for_emqx_ready(), {noreply, State, catch_up(State)}. handle_call(reset, _From, State) -> @@ -566,3 +564,37 @@ maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok; maybe_init_tnx_id(Node, TnxId) -> {atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]), ok. + +%% @priv Cannot proceed until emqx app is ready. +%% Otherwise the committed transaction catch up may fail. +wait_for_emqx_ready() -> + %% wait 10 seconds for emqx to start + ok = do_wait_for_emqx_ready(10). + +%% Wait for emqx app to be ready, +%% write a log message every 1 second +do_wait_for_emqx_ready(0) -> + timeout; +do_wait_for_emqx_ready(N) -> + %% check interval is 100ms + %% makes the total wait time 1 second + case do_wait_for_emqx_ready2(10) of + ok -> + ok; + timeout -> + ?SLOG(warning, #{msg => "stil_waiting_for_emqx_app_to_be_ready"}), + do_wait_for_emqx_ready(N - 1) + end. + +%% Wait for emqx app to be ready, +%% check interval is 100ms +do_wait_for_emqx_ready2(0) -> + timeout; +do_wait_for_emqx_ready2(N) -> + case emqx:is_running() of + true -> + ok; + false -> + timer:sleep(100), + do_wait_for_emqx_ready2(N - 1) + end. diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index af42f0e1a..e342436dc 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -84,6 +84,9 @@ init_load() -> -endif. init_conf() -> + %% Workaround for https://github.com/emqx/mria/issues/94: + _ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000), + _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]), {ok, TnxId} = copy_override_conf_from_core_node(), emqx_app:set_init_tnx_id(TnxId), init_load(), From 0b6fd7fe145aaee2c39ae12998c6706cf5e3daec Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 16 Mar 2023 13:33:52 -0300 Subject: [PATCH 09/13] fix(buffer_worker): check request timeout and health check interval Port of https://github.com/emqx/emqx/pull/10154 for `release-50` Fixes https://emqx.atlassian.net/browse/EMQX-9099 Originally, the `resume_interval`, which is what defines how often a buffer worker will attempt to retry its inflight window, was set to the same as the `health_check_interval`. This had the problem that, with default values, `health_check_interval = request_timeout`. This meant that, if a buffer worker with those configs were ever blocked, all requests would have timed out by the time it retried them. Here we change the default `resume_interval` to a reasonable value dependent on `health_check_interval` and `request_timeout`, and also expose that as a hidden parameter for fine tuning if necessary. --- .../i18n/emqx_resource_schema_i18n.conf | 11 +++++++++++ .../src/emqx_resource_buffer_worker.erl | 17 ++++++++++++++++- .../src/schema/emqx_resource_schema.erl | 7 +++++++ changes/ce/fix-10154.en.md | 8 ++++++++ .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 14 ++++++++++++-- 5 files changed, 54 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-10154.en.md diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 600289b1d..57b109497 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -45,6 +45,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise } } + resume_interval { + desc { + en: """The interval at which the buffer worker attempts to resend failed requests in the inflight window.""" + zh: """在发送失败后尝试重传飞行窗口中的请求的时间间隔。""" + } + label { + en: """Resume Interval""" + zh: """重试时间间隔""" + } + } + start_after_created { desc { en: """Whether start the resource right after created.""" diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8bfd77e61..648587c25 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -88,6 +88,8 @@ -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). +-type request_timeout() :: infinity | timer:time(). +-type health_check_interval() :: timer:time(). -type state() :: blocked | running. -type inflight_key() :: integer(). -type data() :: #{ @@ -199,6 +201,8 @@ init({Id, Index, Opts}) -> RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), + DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), + ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), Data = #{ id => Id, index => Index, @@ -207,7 +211,7 @@ init({Id, Index, Opts}) -> batch_size => BatchSize, batch_time => BatchTime, queue => Queue, - resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), + resume_interval => ResumeInterval, tref => undefined }, ?tp(buffer_worker_init, #{id => Id, index => Index}), @@ -1679,6 +1683,17 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> end, BatchTime. +%% The request timeout should be greater than the resume interval, as +%% it defines how often the buffer worker tries to unblock. If request +%% timeout is <= resume interval and the buffer worker is ever +%% blocked, than all queued requests will basically fail without being +%% attempted. +-spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time(). +default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) -> + max(1, HealthCheckInterval); +default_resume_interval(RequestTimeout, HealthCheckInterval) -> + max(1, min(HealthCheckInterval, RequestTimeout div 3)). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). adjust_batch_time_test_() -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index fdd65bc3c..b9ed176fe 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -55,6 +55,7 @@ fields("creation_opts") -> [ {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, + {resume_interval, fun resume_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, @@ -81,6 +82,12 @@ worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(required) -> false; worker_pool_size(_) -> undefined. +resume_interval(type) -> emqx_schema:duration_ms(); +resume_interval(hidden) -> true; +resume_interval(desc) -> ?DESC("resume_interval"); +resume_interval(required) -> false; +resume_interval(_) -> undefined. + health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; diff --git a/changes/ce/fix-10154.en.md b/changes/ce/fix-10154.en.md new file mode 100644 index 000000000..24bc4bae1 --- /dev/null +++ b/changes/ce/fix-10154.en.md @@ -0,0 +1,8 @@ +Change the default `resume_interval` for bridges and connectors to be +the minimum of `health_check_interval` and `request_timeout / 3`. +Also exposes it as a hidden configuration to allow fine tuning. + +Before this change, the default values for `resume_interval` meant +that, if a buffer ever got blocked due to resource errors or high +message volumes, then, by the time the buffer would try to resume its +normal operations, almost all requests would have timed out. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 8424ddff0..f9968ee96 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -520,6 +520,7 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> #{measurements := #{gauge_set := ExpectedValue}} -> ok; #{measurements := #{gauge_set := Value}} -> + ct:pal("events: ~p", [Events]), ct:fail( "gauge ~p didn't reach expected value ~p; last value: ~p", [GaugeName, ExpectedValue, Value] @@ -972,7 +973,13 @@ t_publish_econnrefused(Config) -> ResourceId = ?config(resource_id, Config), %% set pipelining to 1 so that one of the 2 requests is `pending' %% in ehttpc. - {ok, _} = create_bridge(Config, #{<<"pipelining">> => 1}), + {ok, _} = create_bridge( + Config, + #{ + <<"pipelining">> => 1, + <<"resource_opts">> => #{<<"resume_interval">> => <<"15s">>} + } + ), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), assert_empty_metrics(ResourceId), @@ -986,7 +993,10 @@ t_publish_timeout(Config) -> %% requests are done separately. {ok, _} = create_bridge(Config, #{ <<"pipelining">> => 1, - <<"resource_opts">> => #{<<"batch_size">> => 1} + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"resume_interval">> => <<"15s">> + } }), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), From 196ca43fbb88afa78c246ce5e8ea048faafa567f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 4 Apr 2023 14:19:48 +0200 Subject: [PATCH 10/13] fix(emqx_conf_app): call the right API to retrieve core nodes --- apps/emqx_conf/src/emqx_conf_app.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index e342436dc..b88fa1947 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -93,7 +93,7 @@ init_conf() -> emqx_app:set_init_config_load_done(). cluster_nodes() -> - mria_mnesia:running_nodes() -- [node()]. + mria:cluster_nodes(cores) -- [node()]. copy_override_conf_from_core_node() -> case cluster_nodes() of From aca65ca2d495152d3809ed4d9ba0f33a18e1cae4 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 4 Apr 2023 15:27:29 +0300 Subject: [PATCH 11/13] fix(rule_engine): don't increment unknown counter on unrecoverable errors Closes: EMQX-8786 --- apps/emqx_rule_engine/src/emqx_rule_runtime.erl | 6 +++--- changes/ce/fix-10327.en.md | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-10327.en.md diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index ed6cd22de..153832246 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -514,6 +514,8 @@ inc_action_metrics({error, {recoverable_error, _}}, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); +inc_action_metrics({error, {unrecoverable_error, _}}, RuleId) -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'); inc_action_metrics(R, RuleId) -> case is_ok_result(R) of false -> @@ -523,9 +525,7 @@ inc_action_metrics(R, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success') end. -is_ok_result(ok) -> - true; is_ok_result(R) when is_tuple(R) -> ok == erlang:element(1, R); -is_ok_result(ok) -> +is_ok_result(_) -> false. diff --git a/changes/ce/fix-10327.en.md b/changes/ce/fix-10327.en.md new file mode 100644 index 000000000..4fa561779 --- /dev/null +++ b/changes/ce/fix-10327.en.md @@ -0,0 +1,4 @@ +Don't increment 'actions.failed.unknown' rule metrics counter upon receiving unrecoverable bridge errors. +This counter is displayed on the dashboard's rule overview tab ('Action statistics' - 'Unknown'). +The fix is only applicable for synchronous bridges, as all rule actions for asynchronous bridges +are counted as successful (they increment 'actions.success' which is displayed as 'Action statistics' - 'Success'). From 5925ff07c271a9dd108eb46cdceff2b422c01d1d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 4 Apr 2023 15:20:30 +0200 Subject: [PATCH 12/13] test(emqx_cluster_rpc): fix test cases --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 2 ++ .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 22 ++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index d03b57b03..92c8794cd 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -278,6 +278,8 @@ init([Node, RetryMs]) -> %% @private handle_continue(?CATCH_UP, State) -> + %% emqx app must be started before + %% trying to catch up the rpc commit logs ok = wait_for_emqx_ready(), {noreply, State, catch_up(State)}. diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index f7d3c76fd..8cdfcaeea 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -43,6 +43,7 @@ groups() -> []. init_per_suite(Config) -> application:load(emqx_conf), ok = ekka:start(), + ok = emqx_common_test_helpers:start_apps([]), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), ok = emqx_config:put([node, cluster_call, retry_interval], 1000), meck:new(emqx_alarm, [non_strict, passthrough, no_link]), @@ -53,6 +54,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([]), ekka:stop(), mria:stop(), meck:unload(mria), @@ -255,13 +257,13 @@ t_fast_forward_commit(_Config) -> ), ok. -t_handler_unexpected_msg(_Config) -> - Handler = emqx_cluster_rpc_handler, - OldPid = erlang:whereis(Handler), - ok = gen_server:cast(Handler, unexpected_cast_msg), - ignore = gen_server:call(Handler, unexpected_cast_msg), - erlang:send(Handler, unexpected_info_msg), - NewPid = erlang:whereis(Handler), +t_cleaner_unexpected_msg(_Config) -> + Cleaner = emqx_cluster_cleaner, + OldPid = erlang:whereis(Cleaner), + ok = gen_server:cast(Cleaner, unexpected_cast_msg), + ignore = gen_server:call(Cleaner, unexpected_cast_msg), + erlang:send(Cleaner, unexpected_info_msg), + NewPid = erlang:whereis(Cleaner), ?assertEqual(OldPid, NewPid), ok. @@ -279,8 +281,8 @@ start() -> {ok, Pid1} = emqx_cluster_rpc:start_link(), {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), - {ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500), - true = erlang:register(emqx_cluster_rpc_handler, Pid4), + {ok, Pid4} = emqx_cluster_rpc_cleaner:start_link(100, 500), + true = erlang:register(emqx_cluster_rpc_cleaner, Pid4), {ok, [Pid1, Pid2, Pid3, Pid4]}. stop() -> @@ -296,7 +298,7 @@ stop() -> end || N <- [?NODE1, ?NODE2, ?NODE3] ], - gen_server:stop(emqx_cluster_rpc_handler, normal, 5000). + gen_server:stop(emqx_cluster_rpc_cleaner, normal, 5000). receive_msg(0, _Msg) -> ok; From d25db3ace4b90fdcd19b86c54ba6780b14e30da4 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 4 Apr 2023 16:38:25 +0200 Subject: [PATCH 13/13] chore: bump version to e5.0.2-rc.4 --- apps/emqx/include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index b1cd5f98e..76920928b 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.0.21"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.2-rc.3"). +-define(EMQX_RELEASE_EE, "5.0.2-rc.4"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0").