From 0d36b179c06e8b314acae175412c4ece93623970 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Mar 2023 11:09:58 -0300 Subject: [PATCH 1/9] docs: fix kafka offset reset policy config description --- lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index 39b9d48f4..af0afb72b 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -600,9 +600,12 @@ emqx_ee_bridge_kafka { consumer_offset_reset_policy { desc { en: "Defines how the consumers should reset the start offset when " - "a topic partition has and invalid or no initial offset." + "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs)." + " Note that this is not the same as the `begin_offset`, which defines where to start" + " consumption when there is no offset committed yet." zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时," - "消费者应如何重置开始偏移量。" + "消费者应如何重置开始偏移量。(即当发生 \"OffsetOutOfRange\" 时)。" + " 请注意,这与`begin_offset'不同,后者定义了在还没有提交偏移量的情况下从哪里开始消费。" } label { en: "Offset Reset Policy" From da5e6f3d0aba123d7ec5ed92ff5b5d95145701ab Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 27 Mar 2023 17:38:34 +0200 Subject: [PATCH 2/9] test: test with only one Kafka partition for bad config recover test --- .../test/emqx_bridge_impl_kafka_consumer_SUITE.erl | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 15b4fbe40..e5c4a31e7 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -274,7 +274,18 @@ init_per_testcase(TestCase, Config) when [{skip_does_not_apply, true}] end; init_per_testcase(TestCase, Config) when - TestCase =:= t_failed_creation_then_fixed; + TestCase =:= t_failed_creation_then_fixed +-> + %% test with one partiton only for this case because + %% the wait probe may not be always sent to the same partition + HasProxy = proplists:get_value(has_proxy, Config, true), + case HasProxy of + false -> + [{skip_does_not_apply, true}]; + true -> + common_init_per_testcase(TestCase, [{num_partitions, 1} | Config]) + end; +init_per_testcase(TestCase, Config) when TestCase =:= t_on_get_status; TestCase =:= t_receive_after_recovery -> From 5cf09209cd7485ff82142ef244cca244907f18af Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Mar 2023 14:18:00 -0300 Subject: [PATCH 3/9] feat: tie `offset_reset_policy` and `begin_offset` together MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit To make the configuration more intuitive and avoid exposing more parameters to the user, we should: 1) Remove reset_by_subscriber as an enum constructor for `offset_reset_policy`, as that might make the consumer hang indefinitely without manual action. 2) Set the `begin_offset` `brod_consumer` parameter to `earliest` or `latest` depending on the value of `offset_reset_policy`, as that’s probably the user’s intention. --- .../i18n/emqx_ee_bridge_kafka.conf | 7 +-- .../emqx_ee_bridge/src/emqx_ee_bridge.app.src | 2 +- .../src/emqx_ee_bridge_kafka.erl | 2 +- .../kafka/emqx_bridge_impl_kafka_consumer.erl | 6 ++ .../emqx_bridge_impl_kafka_consumer_SUITE.erl | 55 +++++++++++++++++++ 5 files changed, 66 insertions(+), 6 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index af0afb72b..5f6a31b39 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -600,12 +600,11 @@ emqx_ee_bridge_kafka { consumer_offset_reset_policy { desc { en: "Defines how the consumers should reset the start offset when " - "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs)." - " Note that this is not the same as the `begin_offset`, which defines where to start" - " consumption when there is no offset committed yet." + "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs) or" + " when there is no committed offset for the topic-partition yet." zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时," "消费者应如何重置开始偏移量。(即当发生 \"OffsetOutOfRange\" 时)。" - " 请注意,这与`begin_offset'不同,后者定义了在还没有提交偏移量的情况下从哪里开始消费。" + " 或者当主题分区还没有承诺的偏移量时。" } label { en: "Offset Reset Policy" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 6647ec212..156c3eeac 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, [emqx_ee_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 3db8dd5f1..61124f1c1 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -370,7 +370,7 @@ fields(consumer_kafka_opts) -> })}, {offset_reset_policy, mk( - enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]), + enum([reset_to_latest, reset_to_earliest]), #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)} )}, {offset_commit_interval_seconds, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 44633213c..5407bebf7 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -290,7 +290,13 @@ start_consumer(Config, InstanceId, ClientID) -> %% cluster, so that the load gets distributed between all %% consumers and we don't repeat messages in the same cluster. GroupID = consumer_group_id(BridgeName), + BeginOffset = + case OffsetResetPolicy of + reset_to_latest -> latest; + reset_to_earliest -> earliest + end, ConsumerConfig = [ + {begin_offset, BeginOffset}, {max_bytes, MaxBatchBytes}, {offset_reset_policy, OffsetResetPolicy} ], diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 15b4fbe40..37e697add 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -53,6 +53,7 @@ sasl_only_tests() -> %% tests that do not need to be run on all groups only_once_tests() -> [ + t_begin_offset_earliest, t_bridge_rule_action_source, t_cluster_group, t_node_joins_existing_cluster, @@ -1915,3 +1916,57 @@ t_cluster_node_down(Config) -> end ), ok. + +t_begin_offset_earliest(Config) -> + MQTTTopic = ?config(mqtt_topic, Config), + ResourceId = resource_id(Config), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + {ok, C} = emqtt:start_link([{proto_ver, v5}]), + on_exit(fun() -> emqtt:stop(C) end), + {ok, _} = emqtt:connect(C), + {ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, 2), + + ?check_trace( + begin + %% publish a message before the bridge is started. + NumMessages = 5, + lists:foreach( + fun(N) -> + publish(Config, [ + #{ + key => <<"mykey", (integer_to_binary(N))/binary>>, + value => Payload, + headers => [{<<"hkey">>, <<"hvalue">>}] + } + ]) + end, + lists:seq(1, NumMessages) + ), + + {ok, _} = create_bridge(Config, #{ + <<"kafka">> => #{<<"offset_reset_policy">> => <<"reset_to_earliest">>} + }), + + #{num_published => NumMessages} + end, + fun(Res, _Trace) -> + #{num_published := NumMessages} = Res, + %% we should receive messages published before starting + %% the consumers + Published = receive_published(#{n => NumMessages}), + Payloads = lists:map( + fun(#{payload := P}) -> emqx_json:decode(P, [return_maps]) end, + Published + ), + ?assert( + lists:all( + fun(#{<<"value">> := V}) -> V =:= Payload end, + Payloads + ), + #{payloads => Payloads} + ), + ?assertEqual(NumMessages, emqx_resource_metrics:received_get(ResourceId)), + ok + end + ), + ok. From 69fc1123eeb02e53073912eccfaf73e3851c888e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Mar 2023 15:11:06 -0300 Subject: [PATCH 4/9] refactor: change enum constructors and improve docs --- deploy/docker/Dockerfile | 2 +- .../emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf | 10 ++++------ .../emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl | 6 +++--- .../src/kafka/emqx_bridge_impl_kafka_consumer.erl | 15 ++++++++------- .../emqx_bridge_impl_kafka_consumer_SUITE.erl | 4 ++-- .../test/emqx_ee_bridge_kafka_tests.erl | 2 +- 6 files changed, 19 insertions(+), 20 deletions(-) diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index f26926bce..bb5c23ea4 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -53,7 +53,7 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data"] # - 11883 port for internal MQTT/TCP # - 18083 for dashboard and API # - 4370 default Erlang distribution port -# - 5369 for backplain gen_rpc +# - 5369 for backplane gen_rpc EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369 ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"] diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index 5f6a31b39..787a39fdb 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -599,12 +599,10 @@ emqx_ee_bridge_kafka { } consumer_offset_reset_policy { desc { - en: "Defines how the consumers should reset the start offset when " - "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs) or" - " when there is no committed offset for the topic-partition yet." - zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时," - "消费者应如何重置开始偏移量。(即当发生 \"OffsetOutOfRange\" 时)。" - " 或者当主题分区还没有承诺的偏移量时。" + en: "Defines from which offset a consumer should start fetching when there" + " is no commit history or when the commit history becomes invalid." + zh: "当没有主题分区没有偏移量的历史记录,或则历史记录失效后," + "消费者应该使用哪个偏移量重新开始消费" } label { en: "Offset Reset Policy" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 61124f1c1..f623417b2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -105,7 +105,7 @@ values(consumer) -> #{ kafka => #{ max_batch_bytes => <<"896KB">>, - offset_reset_policy => <<"reset_to_latest">>, + offset_reset_policy => <<"latest">>, offset_commit_interval_seconds => 5 }, key_encoding_mode => <<"none">>, @@ -370,8 +370,8 @@ fields(consumer_kafka_opts) -> })}, {offset_reset_policy, mk( - enum([reset_to_latest, reset_to_earliest]), - #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)} + enum([latest, earliest]), + #{default => latest, desc => ?DESC(consumer_offset_reset_policy)} )}, {offset_commit_interval_seconds, mk( diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 5407bebf7..a05f6ec13 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -59,8 +59,7 @@ subscriber_id := subscriber_id(), kafka_client_id := brod:client_id() }. --type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber. -%% -type mqtt_payload() :: full_message | message_value. +-type offset_reset_policy() :: latest | earliest. -type encoding_mode() :: none | base64. -type consumer_init_data() :: #{ hookpoint := binary(), @@ -271,7 +270,7 @@ start_consumer(Config, InstanceId, ClientID) -> max_batch_bytes := MaxBatchBytes, max_rejoin_attempts := MaxRejoinAttempts, offset_commit_interval_seconds := OffsetCommitInterval, - offset_reset_policy := OffsetResetPolicy + offset_reset_policy := OffsetResetPolicy0 }, key_encoding_mode := KeyEncodingMode, topic_mapping := TopicMapping0, @@ -290,10 +289,12 @@ start_consumer(Config, InstanceId, ClientID) -> %% cluster, so that the load gets distributed between all %% consumers and we don't repeat messages in the same cluster. GroupID = consumer_group_id(BridgeName), - BeginOffset = - case OffsetResetPolicy of - reset_to_latest -> latest; - reset_to_earliest -> earliest + %% earliest or latest + BeginOffset = OffsetResetPolicy0, + OffsetResetPolicy = + case OffsetResetPolicy0 of + latest -> reset_to_latest; + earliest -> reset_to_earliest end, ConsumerConfig = [ {begin_offset, BeginOffset}, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 37e697add..8c2ff4e66 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -575,7 +575,7 @@ kafka_config(TestCase, _KafkaType, Config) -> " max_rejoin_attempts = 5\n" " offset_commit_interval_seconds = 3\n" %% todo: matrix this - " offset_reset_policy = reset_to_latest\n" + " offset_reset_policy = latest\n" " }\n" "~s" " key_encoding_mode = none\n" @@ -1944,7 +1944,7 @@ t_begin_offset_earliest(Config) -> ), {ok, _} = create_bridge(Config, #{ - <<"kafka">> => #{<<"offset_reset_policy">> => <<"reset_to_earliest">>} + <<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>} }), #{num_published => NumMessages} diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl index 72096c7b1..1b32f856d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl @@ -260,7 +260,7 @@ bridges.kafka_consumer.my_consumer { max_batch_bytes = 896KB max_rejoin_attempts = 5 offset_commit_interval_seconds = 3 - offset_reset_policy = reset_to_latest + offset_reset_policy = latest } topic_mapping = [ { From 1c8333030c2b0b82574a0477515fdba3a3501354 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 28 Mar 2023 09:17:46 -0300 Subject: [PATCH 5/9] fix(kafka_producer): add back `is_buffer_supported` callback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://emqx.atlassian.net/browse/EMQX-9366 This callback was accidentally removed while adding another feature, which made the buffer workers to be used for this bridge while they shouldn’t be. --- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src | 2 +- .../src/kafka/emqx_bridge_impl_kafka_producer.erl | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 6647ec212..156c3eeac 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, [emqx_ee_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index d46f687dd..5703c69f5 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -7,6 +7,7 @@ %% callbacks of behaviour emqx_resource -export([ + is_buffer_supported/0, callback_mode/0, on_start/2, on_stop/2, @@ -26,6 +27,8 @@ %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, kafka). +is_buffer_supported() -> true. + callback_mode() -> async_if_possible. %% @doc Config schema is defined in emqx_ee_bridge_kafka. From 1824e7efcc4bcf485111f9643cbe3feb79df811e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Mar 2023 13:49:35 -0300 Subject: [PATCH 6/9] fix(rule_engine): count referenced bridges in `from` clauses as dependencies (rv5.0) Fixes https://emqx.atlassian.net/browse/EMQX-9325 Currently, ingress bridges referenced in the `FROM` clause of rules are not being accounted as dependencies. When we try to delete an ingress bridge that's referenced in a rule like `select * from "$bridges/mqtt:ingress"`, that bridge does not trigger an UI warning about dependent actions. --- .../include/emqx_bridge_resource.hrl | 22 +++++++++++++ apps/emqx_bridge/src/emqx_bridge_resource.erl | 16 +++++++-- .../test/emqx_bridge_resource_tests.erl | 33 +++++++++++++++++++ .../emqx_rule_engine/src/emqx_rule_engine.erl | 33 ++++++++++++++++--- .../emqx_rule_engine/src/emqx_rule_events.erl | 7 ++-- .../test/emqx_rule_engine_SUITE.erl | 25 +++++++++++++- changes/ce/fix-10251.en.md | 3 ++ 7 files changed, 128 insertions(+), 11 deletions(-) create mode 100644 apps/emqx_bridge/include/emqx_bridge_resource.hrl create mode 100644 apps/emqx_bridge/test/emqx_bridge_resource_tests.erl create mode 100644 changes/ce/fix-10251.en.md diff --git a/apps/emqx_bridge/include/emqx_bridge_resource.hrl b/apps/emqx_bridge/include/emqx_bridge_resource.hrl new file mode 100644 index 000000000..fcf1c41a4 --- /dev/null +++ b/apps/emqx_bridge/include/emqx_bridge_resource.hrl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_BRIDGE_RESOURCE_HRL). +-define(EMQX_BRIDGE_RESOURCE_HRL, true). + +-define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>). + +-endif. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 6426a46b7..b43cbe0ec 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -15,6 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_resource). +-include("emqx_bridge_resource.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -23,7 +24,9 @@ resource_id/1, resource_id/2, bridge_id/2, - parse_bridge_id/1 + parse_bridge_id/1, + bridge_hookpoint/1, + bridge_hookpoint_to_bridge_id/1 ]). -export([ @@ -89,6 +92,14 @@ parse_bridge_id(BridgeId) -> ) end. +bridge_hookpoint(BridgeId) -> + <<"$bridges/", (bin(BridgeId))/binary>>. + +bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) -> + {ok, BridgeId}; +bridge_hookpoint_to_bridge_id(_) -> + {error, bad_bridge_hookpoint}. + validate_name(Name0) -> Name = unicode:characters_to_list(Name0, utf8), case is_list(Name) andalso Name =/= [] of @@ -308,7 +319,8 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) -> %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it %% receives a message from the external database. BId = bridge_id(Type, Name), - Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name}; + BridgeHookpoint = bridge_hookpoint(BId), + Conf#{hookpoint => BridgeHookpoint, bridge_name => Name}; %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. parse_confs(<<"kafka">> = _Type, Name, Conf) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl b/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl new file mode 100644 index 000000000..a8a83ff6a --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_resource_tests). + +-include_lib("eunit/include/eunit.hrl"). + +bridge_hookpoint_test_() -> + BridgeId = emqx_bridge_resource:bridge_id(type, name), + BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BridgeId), + [ + ?_assertEqual(<<"$bridges/type:name">>, BridgeHookpoint), + ?_assertEqual( + {ok, BridgeId}, + emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeHookpoint) + ), + ?_assertEqual( + {error, bad_bridge_hookpoint}, + emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeId) + ) + ]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f15290547..d494a4740 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -213,11 +213,12 @@ get_rules_with_same_event(Topic) -> ]. -spec get_rule_ids_by_action(action_name()) -> [rule_id()]. -get_rule_ids_by_action(ActionName) when is_binary(ActionName) -> +get_rule_ids_by_action(BridgeId) when is_binary(BridgeId) -> [ Id - || #{actions := Acts, id := Id} <- get_rules(), - lists:any(fun(A) -> A =:= ActionName end, Acts) + || #{actions := Acts, id := Id, from := Froms} <- get_rules(), + forwards_to_bridge(Acts, BridgeId) orelse + references_ingress_bridge(Froms, BridgeId) ]; get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) -> {Mod, Fun} = @@ -317,8 +318,14 @@ get_basic_usage_info() -> NumRules = length(EnabledRules), ReferencedBridges = lists:foldl( - fun(#{actions := Actions, from := From}, Acc) -> - BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From], + fun(#{actions := Actions, from := Froms}, Acc) -> + BridgeIDs0 = + [ + BridgeID + || From <- Froms, + {ok, BridgeID} <- + [emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)] + ], BridgeIDs1 = lists:filter(fun is_binary/1, Actions), tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc) end, @@ -478,3 +485,19 @@ contains_actions(Actions, Mod0, Func0) -> end, Actions ). + +forwards_to_bridge(Actions, BridgeId) -> + lists:any(fun(A) -> A =:= BridgeId end, Actions). + +references_ingress_bridge(Froms, BridgeId) -> + lists:any( + fun(ReferenceBridgeId) -> + BridgeId =:= ReferenceBridgeId + end, + [ + RefBridgeId + || From <- Froms, + {ok, RefBridgeId} <- + [emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)] + ] + ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 0c962f1fa..7f14f6d8b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge_resource.hrl"). -export([ reload/0, @@ -1011,7 +1012,7 @@ hook_fun_name(HookPoint) -> HookFunName. %% return static function references to help static code checks -hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2; +hook_fun(?BRIDGE_HOOKPOINT(_)) -> fun ?MODULE:on_bridge_message_received/2; hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3; hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4; hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4; @@ -1034,7 +1035,7 @@ ntoa(undefined) -> undefined; ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). -event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge; +event_name(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge; event_name(<<"$events/client_connected">>) -> 'client.connected'; event_name(<<"$events/client_disconnected">>) -> 'client.disconnected'; event_name(<<"$events/client_connack">>) -> 'client.connack'; @@ -1047,7 +1048,7 @@ event_name(<<"$events/message_dropped">>) -> 'message.dropped'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. -event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge; +event_topic(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge; event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; event_topic('client.connack') -> <<"$events/client_connack">>; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c986cd365..2de013975 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -25,6 +25,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + %%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())). -define(TMP_RULEID, atom_to_binary(?FUNCTION_NAME)). @@ -198,8 +200,11 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(t_events, Config) -> ets:delete(events_record_tab), - ok = delete_rule(?config(hook_points_rules, Config)); + ok = delete_rule(?config(hook_points_rules, Config)), + emqx_common_test_helpers:call_janitor(), + ok; end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ @@ -2683,6 +2688,24 @@ t_get_basic_usage_info_1(_Config) -> ), ok. +t_get_rule_ids_by_action_reference_ingress_bridge(_Config) -> + BridgeId = <<"mqtt:ingress">>, + RuleId = <<"rule:ingress_bridge_referenced">>, + {ok, _} = + emqx_rule_engine:create_rule( + #{ + id => RuleId, + sql => <<"select 1 from \"$bridges/", BridgeId/binary, "\"">>, + actions => [#{function => console}] + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + ?assertMatch( + [RuleId], + emqx_rule_engine:get_rule_ids_by_action(BridgeId) + ), + ok. + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-10251.en.md b/changes/ce/fix-10251.en.md new file mode 100644 index 000000000..84102f952 --- /dev/null +++ b/changes/ce/fix-10251.en.md @@ -0,0 +1,3 @@ +Consider bridges referenced in `FROM` rule clauses as dependencies. + +Before this fix, when one tried to delete an ingress rule referenced in an action like `select * from "$bridges/mqtt:ingress"`, the UI would not trigger a warning about dependent rule actions. From 64faccf50bcd24c6ef3d9c6d0070db1ff7a309ca Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 28 Mar 2023 13:43:22 -0300 Subject: [PATCH 7/9] test: fix flaky kafka consumer test --- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- .../src/emqx_rule_engine.app.src | 2 +- .../emqx_bridge_impl_kafka_consumer_SUITE.erl | 26 +++++++++++++++---- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 99a49f8fd..f5bcb23e2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.13"}, + {vsn, "0.1.14"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 1681297ec..8d50f60e3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.11"}, + {vsn, "5.0.12"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]}, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 1211fd5e9..01f62e04c 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -1744,7 +1744,18 @@ t_node_joins_existing_cluster(Config) -> setup_group_subscriber_spy(N1), {{ok, _}, {ok, _}} = ?wait_async_action( - erpc:call(N1, fun() -> {ok, _} = create_bridge(Config) end), + erpc:call(N1, fun() -> + {ok, _} = create_bridge( + Config, + #{ + <<"kafka">> => + #{ + <<"offset_reset_policy">> => + <<"earliest">> + } + } + ) + end), #{?snk_kind := kafka_consumer_subscriber_started}, 15_000 ), @@ -1775,14 +1786,19 @@ t_node_joins_existing_cluster(Config) -> wait_for_cluster_rpc(N2), {ok, _} = snabbkaffe:receive_events(SRef0), - ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])), + ?retry( + _Sleep1 = 100, + _Attempts1 = 50, + ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])) + ), + %% Give some time for the consumers in both nodes to %% rebalance. {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000), %% Publish some messages so we can check they came from each node. ?retry( - _Sleep1 = 100, - _Attempts1 = 50, + _Sleep2 = 100, + _Attempts2 = 50, true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic]) ), {ok, SRef1} = @@ -1792,7 +1808,7 @@ t_node_joins_existing_cluster(Config) -> ?snk_span := {complete, _} }), NPartitions, - 10_000 + 20_000 ), lists:foreach( fun(N) -> From 4b428f7a294b7d7139d4ad9f3843302dbf1972fe Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 28 Mar 2023 21:18:21 +0200 Subject: [PATCH 8/9] chore: bump version to e5.0.2-rc.2 --- apps/emqx/include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index fb8b19b69..d246b4639 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.0.21"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.2-rc.1"). +-define(EMQX_RELEASE_EE, "5.0.2-rc.2"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). From 11c120f87c18b87b7fbaa3fcf1c32ec7b04ee7ad Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 28 Mar 2023 21:19:03 +0200 Subject: [PATCH 9/9] chore: bump emqx app vsn --- apps/emqx/src/emqx.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index d2831b74d..1cecd7b61 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -3,7 +3,7 @@ {id, "emqx"}, {description, "EMQX Core"}, % strict semver, bump manually! - {vsn, "5.0.20"}, + {vsn, "5.0.21"}, {modules, []}, {registered, []}, {applications, [