From 95a67f390fc82aa1c214335a322a8b879af43375 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 12 May 2023 18:39:51 +0800 Subject: [PATCH 01/16] fix(limiter): adjust type for compatibility --- apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 667a38396..b9643dae4 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -61,7 +61,7 @@ -type limiter_id() :: atom(). -type bucket_name() :: atom(). -type rate() :: infinity | float(). --type burst_rate() :: 0 | float(). +-type burst_rate() :: number(). %% this is a compatible type for the deprecated field and type `capacity`. -type burst() :: burst_rate(). %% the capacity of the token bucket From 5ef2a603a12ac7f8335dcc27eb72bc99faaa0ac8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Fri, 12 May 2023 20:27:36 +0800 Subject: [PATCH 02/16] chore: bump to v5.0.25 --- apps/emqx/include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 2bb5877f1..1b4f8a6b6 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,7 +32,7 @@ %% `apps/emqx/src/bpapi/README.md' %% Community edition --define(EMQX_RELEASE_CE, "5.0.25-rc.1"). +-define(EMQX_RELEASE_CE, "5.0.25"). %% Enterprise edition -define(EMQX_RELEASE_EE, "5.0.4-alpha.1"). From 46f05056deb3f89836c6a6afc5c24a8adfb4dedc Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 15 May 2023 16:29:52 +0200 Subject: [PATCH 03/16] docs: clarify description of bridge username and password Fixes: https://emqx.atlassian.net/browse/EMQX-9613 --- rel/i18n/emqx_connector_schema_lib.hocon | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rel/i18n/emqx_connector_schema_lib.hocon b/rel/i18n/emqx_connector_schema_lib.hocon index 0e8a2e9a3..2f923c81b 100644 --- a/rel/i18n/emqx_connector_schema_lib.hocon +++ b/rel/i18n/emqx_connector_schema_lib.hocon @@ -13,7 +13,7 @@ database_desc.label: """Database Name""" password.desc: -"""EMQX's password in the external database.""" +"""The password associated with the bridge, used for authentication with the external database.""" password.label: """Password""" @@ -37,7 +37,7 @@ ssl.label: """Enable SSL""" username.desc: -"""EMQX's username in the external database.""" +"""The username associated with the bridge in the external database used for authentication or identification purposes.""" username.label: """Username""" From b1a4f6ea05c0e95e1a2b84ce7affdb9601e982fe Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 15 May 2023 16:37:54 +0200 Subject: [PATCH 04/16] docs: add changelog entry --- changes/ce/fix-10708.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-10708.md diff --git a/changes/ce/fix-10708.md b/changes/ce/fix-10708.md new file mode 100644 index 000000000..617bda24f --- /dev/null +++ b/changes/ce/fix-10708.md @@ -0,0 +1 @@ +Enhanced clarity of the descriptions for the bridge configuration fields (username and password) to better guide users during setup. From 50e7de9db2d55357e17ab69bb928dd92e6dca659 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 16 May 2023 13:30:41 +0800 Subject: [PATCH 05/16] fix(limiter): a bucket with an infinity rate shouldn't be added to limiter server --- .../emqx_limiter/src/emqx_limiter_schema.erl | 1 + .../emqx_limiter/src/emqx_limiter_server.erl | 5 +++-- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 18 ++++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index b9643dae4..64a8bf7a7 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -36,6 +36,7 @@ calc_capacity/1, extract_with_type/2, default_client_config/0, + default_bucket_config/0, short_paths_fields/1, get_listener_opts/1, get_node_opts/1, diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 488f47851..fcb1fd66c 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -131,6 +131,9 @@ connect(Id, Type, Cfg) -> -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok. add_bucket(_Id, _Type, undefined) -> ok; +%% a bucket with an infinity rate shouldn't be added to this server, because it is always full +add_bucket(_Id, _Type, #{rate := infinity}) -> + ok; add_bucket(Id, Type, Cfg) -> ?CALL(Type, {add_bucket, Id, Cfg}). @@ -507,8 +510,6 @@ make_root(#{rate := Rate, burst := Burst}) -> correction => 0 }. -do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) -> - State; do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) -> case maps:get(Id, Buckets, undefined) of undefined -> diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 6f488eaa9..331fe1b3c 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -617,6 +617,24 @@ t_extract_with_type(_) -> ) ). +t_add_bucket(_) -> + Checker = fun(Size) -> + #{buckets := Buckets} = sys:get_state(emqx_limiter_server:whereis(bytes)), + ?assertEqual(Size, maps:size(Buckets), Buckets) + end, + DefBucket = emqx_limiter_schema:default_bucket_config(), + ?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, undefined)), + Checker(0), + ?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, DefBucket)), + Checker(0), + ?assertEqual( + ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, DefBucket#{rate := 100}) + ), + Checker(1), + ?assertEqual(ok, emqx_limiter_server:del_bucket(?FUNCTION_NAME, bytes)), + Checker(0), + ok. + %%-------------------------------------------------------------------- %% Test Cases Create Instance %%-------------------------------------------------------------------- From 7d7c069257a3f4a2ee9d74308edc75ad7dde2615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Tue, 16 May 2023 15:32:43 +0800 Subject: [PATCH 06/16] feat: update wehbook's request_timeout into resource_opts --- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_bridge/src/emqx_bridge_resource.erl | 26 +++++++--- .../src/schema/emqx_bridge_schema.erl | 49 +------------------ .../src/schema/emqx_bridge_webhook_schema.erl | 28 ++++++++--- 4 files changed, 42 insertions(+), 63 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index e408250be..d2bf0f0c2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.18"}, + {vsn, "0.1.19"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 0d2feef83..a756f535e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -165,20 +165,20 @@ create(BridgeId, Conf) -> create(Type, Name, Conf) -> create(Type, Name, Conf, #{}). -create(Type, Name, Conf, Opts0) -> +create(Type, Name, Conf, Opts) -> ?SLOG(info, #{ msg => "create bridge", type => Type, name => Name, config => emqx_utils:redact(Conf) }), - Opts = override_start_after_created(Conf, Opts0), + TypeBin = bin(Type), {ok, _Data} = emqx_resource:create_local( resource_id(Type, Name), <<"emqx_bridge">>, bridge_to_resource_type(Type), - parse_confs(bin(Type), Name, Conf), - Opts + parse_confs(TypeBin, Name, Conf), + parse_opts(TypeBin, Conf, Opts) ), ok. @@ -189,7 +189,7 @@ update(BridgeId, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}, #{}). -update(Type, Name, {OldConf, Conf}, Opts0) -> +update(Type, Name, {OldConf, Conf}, Opts) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% %% - if the connection related configs like `servers` is updated, we should restart/start @@ -198,7 +198,6 @@ update(Type, Name, {OldConf, Conf}, Opts0) -> %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated %% without restarting the bridge. %% - Opts = override_start_after_created(Conf, Opts0), case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of false -> ?SLOG(info, #{ @@ -241,11 +240,12 @@ recreate(Type, Name, Conf) -> recreate(Type, Name, Conf, #{}). recreate(Type, Name, Conf, Opts) -> + TypeBin = bin(Type), emqx_resource:recreate_local( resource_id(Type, Name), bridge_to_resource_type(Type), - parse_confs(bin(Type), Name, Conf), - Opts + parse_confs(TypeBin, Name, Conf), + parse_opts(TypeBin, Conf, Opts) ). create_dry_run(Type, Conf0) -> @@ -402,6 +402,16 @@ bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). +parse_opts(Type, Conf, Opts0) -> + Opts1 = override_start_after_created(Conf, Opts0), + override_resource_request_timeout(Type, Conf, Opts1). + +%% Put webhook's http request_timeout into the resource options +override_resource_request_timeout(<<"webhook">>, #{request_timeout := ReqTimeout}, Opts) -> + Opts#{request_timeout => ReqTimeout}; +override_resource_request_timeout(_Type, _Conf, Opts) -> + Opts. + override_start_after_created(Config, Opts) -> Enabled = maps:get(enable, Config, true), StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled), diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index f58805b6b..d1755bf73 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -223,51 +223,6 @@ node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. webhook_bridge_converter(Conf0, _HoconOpts) -> - Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee( + emqx_bridge_compatible_config:upgrade_pre_ee( Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 - ), - case Conf1 of - undefined -> - undefined; - _ -> - maps:map( - fun(_Name, Conf) -> - do_convert_webhook_config(Conf) - end, - Conf1 - ) - end. - -do_convert_webhook_config( - #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf -) -> - %% ok: same values - Conf; -do_convert_webhook_config( - #{ - <<"request_timeout">> := ReqTRootRaw, - <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw} - } = Conf0 -) -> - %% different values; we set them to the same, if they are valid - %% durations - MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw), - MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw), - case {MReqTRoot, MReqTResource} of - {{ok, ReqTRoot}, {ok, ReqTResource}} -> - {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}), - Conf1 = emqx_utils_maps:deep_merge( - Conf0, - #{ - <<"request_timeout">> => ReqTRaw, - <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw} - } - ), - Conf1; - _ -> - %% invalid values; let the type checker complain about - %% that. - Conf0 - end; -do_convert_webhook_config(Conf) -> - Conf. + ). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 1540f77bf..83a3dba9b 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -40,12 +40,15 @@ fields("put") -> fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"); fields("creation_opts") -> - lists:filter( - fun({K, _V}) -> - not lists:member(K, unsupported_opts()) - end, - emqx_resource_schema:fields("creation_opts") - ). + [ + deprecated_request_timeout() + | lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + emqx_resource_schema:fields("creation_opts") + ) + ]. desc("config") -> ?DESC("desc_config"); @@ -163,7 +166,8 @@ unsupported_opts() -> [ enable_batch, batch_size, - batch_time + batch_time, + request_timeout ]. %%====================================================================================== @@ -190,3 +194,13 @@ name_field() -> method() -> enum([post, put, get, delete]). + +deprecated_request_timeout() -> + {request_timeout, + mk( + hoconsc:union([infinity, emqx_schema:duration_ms()]), + #{ + default => <<"15s">>, + deprecated => {since, "5.0.26"} + } + )}. From 255f616d261a07c4cb0044003750e825ff5cdcb9 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 16 May 2023 16:36:36 +0800 Subject: [PATCH 07/16] chore: bump emqx app version --- apps/emqx/src/emqx.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 5ca8fc797..be68b438f 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -3,7 +3,7 @@ {id, "emqx"}, {description, "EMQX Core"}, % strict semver, bump manually! - {vsn, "5.0.25"}, + {vsn, "5.0.26"}, {modules, []}, {registered, []}, {applications, [ From a2aa6b46667630a99a3b2b07a17d85f6f473820f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Tue, 16 May 2023 20:57:57 +0800 Subject: [PATCH 08/16] chore: make ci happy again --- .../test/emqx_bridge_api_SUITE.erl | 14 ++++---- .../emqx_bridge_compatible_config_tests.erl | 34 ++++++++----------- changes/ce/feat-10713.en.md | 3 ++ 3 files changed, 23 insertions(+), 28 deletions(-) create mode 100644 changes/ce/feat-10713.en.md diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index d55b92138..1b5e51a11 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1284,19 +1284,17 @@ t_inconsistent_webhook_request_timeouts(Config) -> <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} } ), - ?assertMatch( - {ok, 201, #{ - %% note: same value on both fields - <<"request_timeout">> := <<"2s">>, - <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>} - }}, + {ok, 201, #{ + <<"request_timeout">> := <<"1s">>, + <<"resource_opts">> := ResourceOpts + }} = request_json( post, uri(["bridges"]), BadBridgeParams, Config - ) - ), + ), + ?assertNot(maps:is_key(<<"request_timeout">>, ResourceOpts)), ok. %% diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index acafb84ca..3481ac30c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -59,27 +59,21 @@ webhook_config_test() -> }, check(Conf2) ), - - %% the converter should pick the greater of the two - %% request_timeouts and place them in the root and inside - %% resource_opts. - ?assertMatch( - #{ - <<"bridges">> := #{ - <<"webhook">> := #{ - <<"the_name">> := - #{ - <<"method">> := get, - <<"request_timeout">> := 60_000, - <<"resource_opts">> := #{<<"request_timeout">> := 60_000}, - <<"body">> := <<"${payload}">> - } - } + #{ + <<"bridges">> := #{ + <<"webhook">> := #{ + <<"the_name">> := + #{ + <<"method">> := get, + <<"request_timeout">> := RequestTime, + <<"resource_opts">> := ResourceOpts, + <<"body">> := <<"${payload}">> + } } - }, - check(Conf3) - ), - + } + } = check(Conf3), + ?assertEqual(60_000, RequestTime), + ?assertNot(maps:is_key(<<"requst_timeout">>, ResourceOpts)), ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> diff --git a/changes/ce/feat-10713.en.md b/changes/ce/feat-10713.en.md new file mode 100644 index 000000000..0e28a1a12 --- /dev/null +++ b/changes/ce/feat-10713.en.md @@ -0,0 +1,3 @@ +We deprecated the request_timeout in resource_option of the webhook to keep it consistent with the http request_timeout of the webhook. +From now on, when configuring a webhook through API or configuration files, +it is no longer necessary to configure the request_timeout of the resource. Only configuring the http request_timeout is sufficient, and the request_timeout in the resource will automatically be consistent with the http request_timeout. From 657df05ad9d762fb6e4bf3c2758d02b31c7ae3cc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 16 May 2023 09:45:54 -0300 Subject: [PATCH 09/16] fix(buffer_worker): avoid setting flush timer when inflight is full MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://emqx.atlassian.net/browse/EMQX-9902 When the buffer worker inflight window is full, we don’t need to set a timer to flush the messages again because there’s no more room, and one of the inflight windows will flush the buffer worker by calling `flush_worker`. Currently, we do set the timer on such situation, and this fact combined with the default batch time of 0 yields a busy loop situation where the CPU spins a lot while inflight messages do not return. --- apps/emqx_resource/src/emqx_resource.app.src | 2 +- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 3 +-- changes/ce/fix-10717.en.md | 1 + 3 files changed, 3 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-10717.en.md diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 3e264cb3e..3b92f1200 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.15"}, + {vsn, "0.1.16"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 2dd14c46b..8c5311f2a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -495,8 +495,7 @@ flush(Data0) -> {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), - Data2 = ensure_flush_timer(Data1), - {keep_state, Data2}; + {keep_state, Data1}; {_, false} -> ?tp(buffer_worker_flush_before_pop, #{}), {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), diff --git a/changes/ce/fix-10717.en.md b/changes/ce/fix-10717.en.md new file mode 100644 index 000000000..4c33d6971 --- /dev/null +++ b/changes/ce/fix-10717.en.md @@ -0,0 +1 @@ +Fixed an issue where the buffering layer processes could use a lot of CPU when inflight window is full. From cebde87114d0a36d9b55155bfb2dcd67f1595413 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 15 May 2023 09:05:21 -0300 Subject: [PATCH 10/16] fix(pulsar): use a binary duration as default `health_check_interval` Fixes https://emqx.atlassian.net/browse/EMQX-9885 The frontend needs the default value to match the duration (binary) type to display correctly. --- apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src | 2 +- apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index b169aa2c4..1665548ae 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 18faf0e3b..5a87d8a0c 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -140,7 +140,7 @@ fields(producer_resource_opts) -> lists:filtermap( fun ({health_check_interval = Field, MetaFn}) -> - {true, {Field, override_default(MetaFn, 1_000)}}; + {true, {Field, override_default(MetaFn, <<"1s">>)}}; ({Field, _Meta}) -> lists:member(Field, SupportedOpts) end, From 77cf19c96c0a09ce76dce6215f182bf6c9c0e9f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Tue, 16 May 2023 22:59:08 +0800 Subject: [PATCH 11/16] chore: update 10340's changelog --- changes/ce/fix-10340-en.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changes/ce/fix-10340-en.md diff --git a/changes/ce/fix-10340-en.md b/changes/ce/fix-10340-en.md new file mode 100644 index 000000000..c9ae7b81b --- /dev/null +++ b/changes/ce/fix-10340-en.md @@ -0,0 +1,6 @@ +Fixed the issue that could lead to crash logs being printed when stopping EMQ X via systemd. +``` +2023-03-29T16:43:25.915761+08:00 [error] Generic server memsup terminating. Reason: {port_died,normal}. Last message: {'EXIT',<0.2117.0>,{port_died,normal}}. State: [{data,[{"Timeout",60000}]},{items,{"Memory Usage",[{"Allocated",929959936},{"Total",3832242176}]}},{items,{"Worst Memory User",[{"Pid",<0.2031.0>},{"Memory",4720472}]}}]. +2023-03-29T16:43:25.924764+08:00 [error] crasher: initial call: memsup:init/1, pid: <0.2116.0>, registered_name: memsup, exit: {{port_died,normal},[{gen_server,handle_common_reply,8,[{file,"gen_server.erl"},{line,811}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [os_mon_sup,<0.2114.0>], message_queue_len: 0, messages: [], links: [<0.2115.0>], dictionary: [], trap_exit: true, status: running, heap_size: 4185, stack_size: 29, reductions: 187637; neighbours: +2023-03-29T16:43:25.924979+08:00 [error] Supervisor: {local,os_mon_sup}. Context: child_terminated. Reason: {port_died,normal}. Offender: id=memsup,pid=<0.2116.0>. +``` From d4b60c561036db0aa0ab6580067d2bc91e290df5 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 16 May 2023 18:59:12 +0300 Subject: [PATCH 12/16] chore(rebalance): improve debug logging --- apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src | 2 +- apps/emqx_node_rebalance/src/emqx_node_rebalance.erl | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src index 381001b87..69cf91f4c 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src @@ -1,6 +1,6 @@ {application, emqx_node_rebalance, [ {description, "EMQX Node Rebalance"}, - {vsn, "5.0.0"}, + {vsn, "5.0.1"}, {registered, [ emqx_node_rebalance_sup, emqx_node_rebalance, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl index 1f2adc565..70c022308 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl @@ -267,6 +267,9 @@ evict_conns(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opts} ConnEvictRate = maps:get(conn_evict_rate, Opts), NodesToEvict = nodes_to_evict(RecipientAvg, DonorNodeCounts), ?SLOG(warning, #{ + donor_conn_avg => DonorAvg, + recipient_conn_avg => RecipientAvg, + thresholds => Thresholds, msg => "node_rebalance_evict_conns", nodes => NodesToEvict, counts => ConnEvictRate @@ -297,6 +300,9 @@ evict_sessions(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opt SessEvictRate = maps:get(sess_evict_rate, Opts), NodesToEvict = nodes_to_evict(RecipientAvg, DonorNodeCounts), ?SLOG(warning, #{ + donor_sess_avg => DonorAvg, + recipient_sess_avg => RecipientAvg, + thresholds => Thresholds, msg => "node_rebalance_evict_sessions", nodes => NodesToEvict, counts => SessEvictRate From 85089a32109bd6057181fc07e52b6fb3e1d03bdc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 16 May 2023 17:15:42 -0300 Subject: [PATCH 13/16] fix(buffer_worker): correctly flush the buffer workers when inflight table room is made The previous commit uncovered another bug that was hidden by it: `maybe_flush_after_async_reply` was sending a message to the wrong PID. It was sending a message to `self()` meaning to target a buffer worker, but `self()` in that context is never the buffer worker, it's the connector's worker. This change also revealed a race condition where the buffer workers could stop flushing messages. So we piggy-backed on the atomic update of the table size count to check if the buffer worker should be poked to continue flushing. This allows us to get rid of `maybe_flush_after_async_reply` altogether. --- .../src/emqx_resource_buffer_worker.erl | 102 +++++++++--------- .../test/emqx_resource_SUITE.erl | 6 +- 2 files changed, 57 insertions(+), 51 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8c5311f2a..6145c3d87 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -70,18 +70,6 @@ -define(RETRY_IDX, 3). -define(WORKER_MREF_IDX, 4). --define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR), - (fun() -> - IsFullBefore = is_inflight_full(InflightTID), - case (EXPR) of - blocked -> - ok; - ok -> - ok = maybe_flush_after_async_reply(IsFullBefore) - end - end)() -). - -type id() :: binary(). -type index() :: pos_integer(). -type expire_at() :: infinity | integer(). @@ -337,7 +325,8 @@ resume_from_blocked(Data) -> {next_state, running, Data} end; {expired, Ref, Batch} -> - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + WorkerPid = self(), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), ?tp(buffer_worker_retry_expired, #{expired => Batch}), resume_from_blocked(Data); @@ -389,7 +378,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> {keep_state, Data0, {state_timeout, ResumeT, unblock}}; %% Send ok or failed but the resource is working {ack, PostFn} -> - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + WorkerPid = self(), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), %% we need to defer bumping the counters after %% `inflight_drop' to avoid the race condition when an %% inflight request might get completed concurrently with @@ -595,13 +585,14 @@ do_flush( %% must ensure the async worker is being monitored for %% such requests. IsUnrecoverableError = is_unrecoverable_error(Result), + WorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, Id, Index); + ack_inflight(InflightTID, Ref, Id, Index, WorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, Id, Index) + ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) end, {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), @@ -679,13 +670,14 @@ do_flush(#{queue := Q1} = Data0, #{ %% must ensure the async worker is being monitored for %% such requests. IsUnrecoverableError = is_unrecoverable_error(Result), + WorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, Id, Index); + ack_inflight(InflightTID, Ref, Id, Index, WorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, Id, Index) + ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) end, {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), @@ -1005,7 +997,7 @@ handle_async_reply( discard -> ok; continue -> - ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result)) + handle_async_reply1(ReplyContext, Result) end. handle_async_reply1( @@ -1014,6 +1006,7 @@ handle_async_reply1( inflight_tid := InflightTID, resource_id := Id, worker_index := Index, + buffer_worker := WorkerPid, min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result @@ -1025,7 +1018,7 @@ handle_async_reply1( Now = now_(), case is_expired(ExpireAt, Now) of true -> - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), ?tp(handle_async_reply_expired, #{expired => [_Query]}), ok; @@ -1039,7 +1032,7 @@ do_handle_async_reply( resource_id := Id, request_ref := Ref, worker_index := Index, - buffer_worker := Pid, + buffer_worker := WorkerPid, inflight_tid := InflightTID, min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, @@ -1062,10 +1055,10 @@ do_handle_async_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(Pid), + ok = ?MODULE:block(WorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) end. handle_async_batch_reply( @@ -1080,7 +1073,7 @@ handle_async_batch_reply( discard -> ok; continue -> - ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result)) + handle_async_batch_reply1(ReplyContext, Result) end. handle_async_batch_reply1( @@ -1118,6 +1111,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> #{ resource_id := Id, worker_index := Index, + buffer_worker := WorkerPid, inflight_tid := InflightTID, request_ref := Ref, min_batch := Batch @@ -1140,7 +1134,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch - _ = ack_inflight(InflightTID, Ref, Id, Index), + _ = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), ok; _ -> %% some queries are not expired, put them back to the inflight batch @@ -1151,7 +1145,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> do_handle_async_batch_reply( #{ - buffer_worker := Pid, + buffer_worker := WorkerPid, resource_id := Id, worker_index := Index, inflight_tid := InflightTID, @@ -1172,14 +1166,14 @@ do_handle_async_batch_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(Pid), + ok = ?MODULE:block(WorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) end. -do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) -> - IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index), +do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) -> + IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), case maps:get(simple_query, QueryOpts, false) of true -> PostFn(); @@ -1190,18 +1184,6 @@ do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) -> end, ok. -maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) -> - %% inflight was not full before async reply is handled, - %% after it is handled, the inflight table must be even smaller - %% hance we can rely on the buffer worker's flush timer to trigger - %% the next flush - ?tp(skip_flushing_worker, #{}), - ok; -maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> - %% the inflight table was full before handling aync reply - ?tp(do_flushing_worker, #{}), - ok = ?MODULE:flush_worker(self()). - %% check if the async reply is valid. %% e.g. if a connector evaluates the callback more than once: %% 1. If the request was previously deleted from inflight table due to @@ -1428,9 +1410,9 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when ), ok. -ack_inflight(undefined, _Ref, _Id, _Index) -> +ack_inflight(undefined, _Ref, _Id, _Index, _WorkerPid) -> false; -ack_inflight(InflightTID, Ref, Id, Index) -> +ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) -> {Count, Removed} = case ets:take(InflightTID, Ref) of [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] -> @@ -1440,7 +1422,11 @@ ack_inflight(InflightTID, Ref, Id, Index) -> [] -> {0, false} end, - ok = dec_inflight_remove(InflightTID, Count, Removed), + FlushCheck = dec_inflight_remove(InflightTID, Count, Removed), + case FlushCheck of + continue -> ok; + flush -> ?MODULE:flush_worker(WorkerPid) + end, IsKnownRef = (Count > 0), case IsKnownRef of true -> @@ -1475,16 +1461,32 @@ inc_inflight(InflightTID, Count) -> _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}), ok. +-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) -> + continue | flush. dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) -> - ok; + continue; dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) -> - _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), - ok; + NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), + MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0), + %% if the new value is Max - 1, it means that we've just made room + %% in the inflight table, so we should poke the buffer worker to + %% make it continue flushing. + case NewValue =:= MaxValue - 1 of + true -> flush; + false -> continue + end; dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 -> %% If Count > 0, it must have been removed - _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), + NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), - ok. + MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0), + %% if the new value is Max - 1, it means that we've just made room + %% in the inflight table, so we should poke the buffer worker to + %% make it continue flushing. + case NewValue =:= MaxValue - 1 of + true -> flush; + false -> continue + end. dec_inflight_update(_InflightTID, _Count = 0) -> ok; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 809f101a8..fc338b512 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1627,7 +1627,11 @@ t_retry_async_inflight_full(_Config) -> end ] ), - ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)) + ), ok. %% this test case is to ensure the buffer worker will not go crazy even From 18043150bea8c294772f96949cf4fd3ae5b42d70 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 17 May 2023 10:55:47 +0800 Subject: [PATCH 14/16] fix: cannot access columns exported by FOREACH in DO clause --- apps/emqx_rule_engine/src/emqx_rule_runtime.erl | 4 ++-- .../emqx_rule_engine/test/emqx_rule_engine_SUITE.erl | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index d7412d03c..5b7f962fb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -144,14 +144,14 @@ do_apply_rule( ) of true -> - Collection2 = filter_collection(Columns, InCase, DoEach, Collection), + Collection2 = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection), case Collection2 of [] -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'); _ -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') end, - NewEnvs = maps:merge(Columns, Envs), + NewEnvs = maps:merge(ColumnsAndSelected, Envs), {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]}; false -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index eb253e516..9c3e5513a 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1735,11 +1735,12 @@ t_sqlparse_foreach_7(_Config) -> ) ). +-define(COLL, #{<<"info">> := [<<"haha">>, #{<<"name">> := <<"cmd1">>, <<"cmd">> := <<"1">>}]}). t_sqlparse_foreach_8(_Config) -> %% Verify foreach-do-incase and cascaded AS Sql = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info " - "do info.cmd as msg_type, info.name as name " + "do info.cmd as msg_type, info.name as name, s, c " "incase is_map(info) " "from \"t/#\" " "where s.page = '2' ", @@ -1748,7 +1749,14 @@ t_sqlparse_foreach_8(_Config) -> "{\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }" >>, ?assertMatch( - {ok, [#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]}, + {ok, [ + #{ + <<"name">> := <<"cmd1">>, + <<"msg_type">> := <<"1">>, + <<"s">> := #{<<"page">> := 2, <<"collection">> := ?COLL}, + <<"c">> := ?COLL + } + ]}, emqx_rule_sqltester:test( #{ sql => Sql, From 1f7ede90a429c7a568965d688de3cfe0fef75cb0 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 17 May 2023 11:01:31 +0800 Subject: [PATCH 15/16] chore: update app version && changes --- apps/emqx_rule_engine/src/emqx_rule_engine.app.src | 2 +- changes/ce/fix-10728.en.md | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 changes/ce/fix-10728.en.md diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 94a48fb35..8dc78958a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.16"}, + {vsn, "5.0.17"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]}, diff --git a/changes/ce/fix-10728.en.md b/changes/ce/fix-10728.en.md new file mode 100644 index 000000000..82bd123e9 --- /dev/null +++ b/changes/ce/fix-10728.en.md @@ -0,0 +1,11 @@ +Fixed an issue where the rule engine was unable to access variables exported by `FOREACH` in the `DO` clause. + + Given a payload: `{"date": "2023-05-06", "array": ["a"]}`, as well as the following SQL statement: + ``` + FOREACH payload.date as date, payload.array as elem + DO date, elem + FROM "t/#" + ``` + Prior to the fix, the `date` variable exported by `FOREACH` could not be accessed in the `DO` clause of the above SQL, resulting in the following output for the SQL statement: + `[{"elem": "a","date": "undefined"}]`. + After the fix, the output of the SQL statement is: `[{"elem": "a","date": "2023-05-06"}]` From 2b99a9b988c336345a1a19bfe98de583a8c01666 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Wed, 17 May 2023 13:31:45 +0800 Subject: [PATCH 16/16] feat: hide resource_opts's request_timeout --- apps/emqx_bridge/src/emqx_bridge_api.erl | 13 +++++++--- apps/emqx_bridge/src/emqx_bridge_resource.erl | 15 ++++-------- .../src/schema/emqx_bridge_schema.erl | 23 ++++++++++++++++-- .../src/schema/emqx_bridge_webhook_schema.erl | 8 +++---- .../test/emqx_bridge_api_SUITE.erl | 24 +++++++++++++++++++ .../emqx_bridge_compatible_config_tests.erl | 2 +- changes/ce/feat-10713.en.md | 2 +- 7 files changed, 65 insertions(+), 22 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index c7e48990b..9802d5fe8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -892,11 +892,18 @@ fill_defaults(Type, RawConf) -> pack_bridge_conf(Type, RawConf) -> #{<<"bridges">> => #{bin(Type) => #{<<"foo">> => RawConf}}}. -unpack_bridge_conf(Type, PackedConf) -> - #{<<"bridges">> := Bridges} = PackedConf, - #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges), +%% Hide webhook's resource_opts.request_timeout from user. +filter_raw_conf(<<"webhook">>, RawConf0) -> + emqx_utils_maps:deep_remove([<<"resource_opts">>, <<"request_timeout">>], RawConf0); +filter_raw_conf(_TypeBin, RawConf) -> RawConf. +unpack_bridge_conf(Type, PackedConf) -> + TypeBin = bin(Type), + #{<<"bridges">> := Bridges} = PackedConf, + #{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges), + filter_raw_conf(TypeBin, RawConf). + is_ok(ok) -> ok; is_ok(OkResult = {ok, _}) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index a756f535e..60ee242d1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -178,7 +178,7 @@ create(Type, Name, Conf, Opts) -> <<"emqx_bridge">>, bridge_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), - parse_opts(TypeBin, Conf, Opts) + parse_opts(Conf, Opts) ), ok. @@ -245,7 +245,7 @@ recreate(Type, Name, Conf, Opts) -> resource_id(Type, Name), bridge_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), - parse_opts(TypeBin, Conf, Opts) + parse_opts(Conf, Opts) ). create_dry_run(Type, Conf0) -> @@ -402,15 +402,8 @@ bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). -parse_opts(Type, Conf, Opts0) -> - Opts1 = override_start_after_created(Conf, Opts0), - override_resource_request_timeout(Type, Conf, Opts1). - -%% Put webhook's http request_timeout into the resource options -override_resource_request_timeout(<<"webhook">>, #{request_timeout := ReqTimeout}, Opts) -> - Opts#{request_timeout => ReqTimeout}; -override_resource_request_timeout(_Type, _Conf, Opts) -> - Opts. +parse_opts(Conf, Opts0) -> + override_start_after_created(Conf, Opts0). override_start_after_created(Config, Opts) -> Enabled = maps:get(enable, Config, true), diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index d1755bf73..b590f0cd4 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -223,6 +223,25 @@ node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. webhook_bridge_converter(Conf0, _HoconOpts) -> - emqx_bridge_compatible_config:upgrade_pre_ee( + Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee( Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 - ). + ), + case Conf1 of + undefined -> + undefined; + _ -> + maps:map( + fun(_Name, Conf) -> + do_convert_webhook_config(Conf) + end, + Conf1 + ) + end. + +%% We hide resource_opts.request_timeout from user. +do_convert_webhook_config( + #{<<"request_timeout">> := ReqT, <<"resource_opts">> := ResOpts} = Conf +) -> + Conf#{<<"resource_opts">> => ResOpts#{<<"request_timeout">> => ReqT}}; +do_convert_webhook_config(Conf) -> + Conf. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 83a3dba9b..de1e09abb 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -41,7 +41,7 @@ fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"); fields("creation_opts") -> [ - deprecated_request_timeout() + hidden_request_timeout() | lists:filter( fun({K, _V}) -> not lists:member(K, unsupported_opts()) @@ -195,12 +195,12 @@ name_field() -> method() -> enum([post, put, get, delete]). -deprecated_request_timeout() -> +hidden_request_timeout() -> {request_timeout, mk( hoconsc:union([infinity, emqx_schema:duration_ms()]), #{ - default => <<"15s">>, - deprecated => {since, "5.0.26"} + required => false, + importance => ?IMPORTANCE_HIDDEN } )}. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 1b5e51a11..288b1da29 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1295,8 +1295,32 @@ t_inconsistent_webhook_request_timeouts(Config) -> Config ), ?assertNot(maps:is_key(<<"request_timeout">>, ResourceOpts)), + validate_resource_request_timeout(proplists:get_value(group, Config), 1000, Name), ok. +validate_resource_request_timeout(single, Timeout, Name) -> + SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), + ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name), + ?check_trace( + begin + {ok, Res} = + ?wait_async_action( + emqx_bridge:send_message(BridgeID, SentData), + #{?snk_kind := async_query}, + 1000 + ), + ?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res) + end, + fun(Trace0) -> + Trace = ?of_kind(async_query, Trace0), + ?assertMatch([#{query_opts := #{timeout := Timeout}}], Trace), + ok + end + ); +validate_resource_request_timeout(_Cluster, _Timeout, _Name) -> + ignore. + %% request(Method, URL, Config) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 3481ac30c..080ca9107 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -73,7 +73,7 @@ webhook_config_test() -> } } = check(Conf3), ?assertEqual(60_000, RequestTime), - ?assertNot(maps:is_key(<<"requst_timeout">>, ResourceOpts)), + ?assertMatch(#{<<"request_timeout">> := 60_000}, ResourceOpts), ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> diff --git a/changes/ce/feat-10713.en.md b/changes/ce/feat-10713.en.md index 0e28a1a12..6de542be6 100644 --- a/changes/ce/feat-10713.en.md +++ b/changes/ce/feat-10713.en.md @@ -1,3 +1,3 @@ -We deprecated the request_timeout in resource_option of the webhook to keep it consistent with the http request_timeout of the webhook. +We hide the request_timeout in resource_option of the webhook to keep it consistent with the http request_timeout of the webhook. From now on, when configuring a webhook through API or configuration files, it is no longer necessary to configure the request_timeout of the resource. Only configuring the http request_timeout is sufficient, and the request_timeout in the resource will automatically be consistent with the http request_timeout.