diff --git a/.github/workflows/elixir_deps_check.yaml b/.github/workflows/elixir_deps_check.yaml index 511639a3c..d6449f563 100644 --- a/.github/workflows/elixir_deps_check.yaml +++ b/.github/workflows/elixir_deps_check.yaml @@ -23,7 +23,18 @@ jobs: mix local.hex --force mix local.rebar --force mix deps.get + # we check only enterprise because `rebar3 tree`, even if an + # enterprise app is excluded from `project_app_dirs` in + # `rebar.config.erl`, will still list dependencies from it. + # Since the enterprise profile is a superset of the + # community one and thus more complete, we use the former. + env: + MIX_ENV: emqx-enterprise + PROFILE: emqx-enterprise - name: check elixir deps run: ./scripts/check-elixir-deps-discrepancies.exs + env: + MIX_ENV: emqx-enterprise + PROFILE: emqx-enterprise ... diff --git a/apps/emqx_bridge_kafka/BSL.txt b/apps/emqx_bridge_kafka/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_kafka/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_kafka/README.md b/apps/emqx_bridge_kafka/README.md new file mode 100644 index 000000000..72cbeecc6 --- /dev/null +++ b/apps/emqx_bridge_kafka/README.md @@ -0,0 +1,19 @@ +# Kafka Data Integration Bridge + +This application houses the Kafka Producer and Consumer data +integration bridges for EMQX Enterprise Edition. It provides the +means to connect to Kafka and publish/consume messages to/from it. + +Currently, our Kafka Producer library (`wolff`) has its own `replayq` +buffering implementation, so this bridge does not require buffer +workers from `emqx_resource`. It implements the connection management +and interaction without need for a separate connector app, since it's +not used by authentication and authorization applications. + +## Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +## License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_bridge_kafka/docker-ct b/apps/emqx_bridge_kafka/docker-ct new file mode 100644 index 000000000..5288ee246 --- /dev/null +++ b/apps/emqx_bridge_kafka/docker-ct @@ -0,0 +1,2 @@ +toxiproxy +kafka diff --git a/apps/emqx_bridge_kafka/etc/emqx_bridge_kafka.conf b/apps/emqx_bridge_kafka/etc/emqx_bridge_kafka.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config new file mode 100644 index 000000000..fd21fd15b --- /dev/null +++ b/apps/emqx_bridge_kafka/rebar.config @@ -0,0 +1,14 @@ +%% -*- mode: erlang; -*- +{erl_opts, [debug_info]}. +{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}} + , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}} + , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} + , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} + , {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + , {emqx_bridge, {path, "../../apps/emqx_bridge"}} + ]}. + +{shell, [ + {apps, [emqx_bridge_kafka]} +]}. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src new file mode 100644 index 000000000..a4fbe5673 --- /dev/null +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -0,0 +1,16 @@ +{application, emqx_bridge_kafka, [ + {description, "EMQX Enterprise Kafka Bridge"}, + {vsn, "0.1.0"}, + {registered, [emqx_bridge_kafka_consumer_sup]}, + {applications, [ + kernel, + stdlib, + telemetry, + wolff, + brod + ]}, + {env, []}, + {modules, []}, + + {links, []} +]}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl similarity index 99% rename from lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl rename to apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index f3dfa5964..30f6cd60d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ee_bridge_kafka). +-module(emqx_bridge_kafka). -include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_ee_bridge_kafka_consumer_sup.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_sup.erl similarity index 98% rename from lib-ee/emqx_ee_bridge/src/kafka/emqx_ee_bridge_kafka_consumer_sup.erl rename to apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_sup.erl index feec8c09b..638c1def6 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_ee_bridge_kafka_consumer_sup.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_sup.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ee_bridge_kafka_consumer_sup). +-module(emqx_bridge_kafka_consumer_sup). -behaviour(supervisor). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl similarity index 97% rename from lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl rename to apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl index c9dcce9a2..22a67c551 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl @@ -3,7 +3,7 @@ %%-------------------------------------------------------------------- %% Kafka connection configuration --module(emqx_bridge_impl_kafka). +-module(emqx_bridge_kafka_impl). -export([ hosts/1, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl similarity index 94% rename from lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl rename to apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index f4dc3456e..2dc43a130 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_impl_kafka_consumer). +-module(emqx_bridge_kafka_impl_consumer). -behaviour(emqx_resource). @@ -52,8 +52,9 @@ ssl := _, any() => term() }. --type subscriber_id() :: emqx_ee_bridge_kafka_consumer_sup:child_id(). +-type subscriber_id() :: emqx_bridge_kafka_consumer_sup:child_id(). -type kafka_topic() :: brod:topic(). +-type kafka_message() :: #kafka_message{}. -type state() :: #{ kafka_topics := nonempty_list(kafka_topic()), subscriber_id := subscriber_id(), @@ -129,14 +130,14 @@ on_start(InstanceId, Config) -> ssl := SSL, topic_mapping := _ } = Config, - BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0), + BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0), KafkaType = kafka_consumer, %% Note: this is distinct per node. ClientID = make_client_id(InstanceId, KafkaType, BridgeName), ClientOpts0 = case Auth of none -> []; - Auth -> [{sasl, emqx_bridge_impl_kafka:sasl(Auth)}] + Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}] end, ClientOpts = add_ssl_opts(ClientOpts0, SSL), case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of @@ -191,7 +192,7 @@ init(GroupData, State0) -> State = State0#{kafka_topic => KafkaTopic}, {ok, State}. --spec handle_message(#kafka_message{}, consumer_state()) -> {ok, commit, consumer_state()}. +-spec handle_message(kafka_message(), consumer_state()) -> {ok, commit, consumer_state()}. handle_message(Message, State) -> ?tp_span( kafka_consumer_handle_message, @@ -240,13 +241,13 @@ add_ssl_opts(ClientOpts, #{enable := false}) -> add_ssl_opts(ClientOpts, SSL) -> [{ssl, emqx_tls_lib:to_client_opts(SSL)} | ClientOpts]. --spec make_subscriber_id(atom() | binary()) -> emqx_ee_bridge_kafka_consumer_sup:child_id(). +-spec make_subscriber_id(atom() | binary()) -> emqx_bridge_kafka_consumer_sup:child_id(). make_subscriber_id(BridgeName) -> BridgeNameBin = to_bin(BridgeName), <<"kafka_subscriber:", BridgeNameBin/binary>>. ensure_consumer_supervisor_started() -> - Mod = emqx_ee_bridge_kafka_consumer_sup, + Mod = emqx_bridge_kafka_consumer_sup, ChildSpec = #{ id => Mod, @@ -327,7 +328,7 @@ start_consumer(Config, InstanceId, ClientID) -> %% spawns one worker for each assigned topic-partition %% automatically, so we should not spawn duplicate workers. SubscriberId = make_subscriber_id(BridgeName), - case emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of + case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of {ok, _ConsumerPid} -> ?tp( kafka_consumer_subscriber_started, @@ -342,18 +343,18 @@ start_consumer(Config, InstanceId, ClientID) -> ?SLOG(error, #{ msg => "failed_to_start_kafka_consumer", instance_id => InstanceId, - kafka_hosts => emqx_bridge_impl_kafka:hosts(BootstrapHosts0), + kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0), reason => emqx_misc:redact(Reason2) }), stop_client(ClientID), throw(failed_to_start_kafka_consumer) end. --spec stop_subscriber(emqx_ee_bridge_kafka_consumer_sup:child_id()) -> ok. +-spec stop_subscriber(emqx_bridge_kafka_consumer_sup:child_id()) -> ok. stop_subscriber(SubscriberId) -> _ = log_when_error( fun() -> - emqx_ee_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) + emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) end, #{ msg => "failed_to_delete_kafka_subscriber", @@ -437,7 +438,7 @@ do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) -> end. are_subscriber_workers_alive(SubscriberId) -> - Children = supervisor:which_children(emqx_ee_bridge_kafka_consumer_sup), + Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup), case lists:keyfind(SubscriberId, 1, Children) of false -> false; @@ -479,7 +480,7 @@ is_dry_run(InstanceId) -> make_client_id(InstanceId, KafkaType, KafkaName) -> case is_dry_run(InstanceId) of false -> - ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, KafkaName), + ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName), binary_to_atom(ClientID0); true -> %% It is a dry run and we don't want to leak too many diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl similarity index 98% rename from lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl rename to apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 09713a431..7bee2c70d 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_impl_kafka_producer). +-module(emqx_bridge_kafka_impl_producer). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -47,15 +47,15 @@ on_start(InstId, Config) -> BridgeType = ?BRIDGE_TYPE, ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), _ = maybe_install_wolff_telemetry_handlers(ResourceId), - Hosts = emqx_bridge_impl_kafka:hosts(Hosts0), - ClientId = emqx_bridge_impl_kafka:make_client_id(BridgeType, BridgeName), + Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), + ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, connect_timeout => ConnTimeout, client_id => ClientId, request_timeout => MetaReqTimeout, extra_sock_opts => socket_opts(SocketOpts), - sasl => emqx_bridge_impl_kafka:sasl(Auth), + sasl => emqx_bridge_kafka_impl:sasl(Auth), ssl => ssl(SSL) }, case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl similarity index 98% rename from lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl rename to apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 4019a9c42..fb7cf524c 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_impl_kafka_consumer_SUITE). +-module(emqx_bridge_kafka_impl_consumer_SUITE). -compile(nowarn_export_all). -compile(export_all). @@ -15,6 +15,7 @@ -import(emqx_common_test_helpers, [on_exit/1]). -define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>). +-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]). %%------------------------------------------------------------------------------ %% CT boilerplate @@ -67,7 +68,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), + ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)), _ = application:stop(emqx_connector), ok. @@ -228,7 +229,7 @@ common_init_per_group() -> emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), application:load(emqx_bridge), ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), + ok = emqx_connector_test_helpers:start_apps(?APPS), {ok, _} = application:ensure_all_started(emqx_connector), emqx_mgmt_api_test_util:init_suite(), UniqueNum = integer_to_binary(erlang:unique_integer()), @@ -408,7 +409,7 @@ start_producers(TestCase, Config) -> DirectKafkaPort = ?config(direct_kafka_port, Config), UseTLS = ?config(use_tls, Config), UseSASL = ?config(use_sasl, Config), - Hosts = emqx_bridge_impl_kafka:hosts( + Hosts = emqx_bridge_kafka_impl:hosts( DirectKafkaHost ++ ":" ++ integer_to_list(DirectKafkaPort) ), SSL = @@ -876,7 +877,7 @@ ensure_connected(Config) -> consumer_clientid(Config) -> KafkaName = ?config(kafka_name, Config), - binary_to_atom(emqx_bridge_impl_kafka:make_client_id(kafka_consumer, KafkaName)). + binary_to_atom(emqx_bridge_kafka_impl:make_client_id(kafka_consumer, KafkaName)). get_client_connection(Config) -> KafkaHost = ?config(kafka_host, Config), @@ -885,7 +886,7 @@ get_client_connection(Config) -> brod_client:get_connection(ClientID, KafkaHost, KafkaPort). get_subscriber_workers() -> - [{_, SubscriberPid, _, _}] = supervisor:which_children(emqx_ee_bridge_kafka_consumer_sup), + [{_, SubscriberPid, _, _}] = supervisor:which_children(emqx_bridge_kafka_consumer_sup), brod_group_subscriber_v2:get_workers(SubscriberPid). wait_downs(Refs, _Timeout) when map_size(Refs) =:= 0 -> @@ -1069,7 +1070,7 @@ cluster(Config) -> Cluster = emqx_common_test_helpers:emqx_cluster( [core, core], [ - {apps, [emqx_conf, emqx_bridge, emqx_rule_engine]}, + {apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]}, {listener_ports, []}, {peer_mod, PeerModule}, {priv_data_dir, PrivDataDir}, @@ -1504,7 +1505,7 @@ do_t_receive_after_recovery(Config) -> _Interval = 500, _NAttempts = 20, begin - GroupId = emqx_bridge_impl_kafka_consumer:consumer_group_id(KafkaNameA), + GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA), {ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets( KafkaClientId, GroupId ), @@ -1745,8 +1746,12 @@ t_node_joins_existing_cluster(Config) -> ?check_trace( begin [{Name1, Opts1}, {Name2, Opts2} | _] = Cluster, + ct:pal("starting ~p", [Name1]), N1 = emqx_common_test_helpers:start_slave(Name1, Opts1), - on_exit(fun() -> ok = emqx_common_test_helpers:stop_slave(N1) end), + on_exit(fun() -> + ct:pal("stopping ~p", [N1]), + ok = emqx_common_test_helpers:stop_slave(N1) + end), setup_group_subscriber_spy(N1), {{ok, _}, {ok, _}} = ?wait_async_action( @@ -1785,8 +1790,12 @@ t_node_joins_existing_cluster(Config) -> 1, 30_000 ), + ct:pal("starting ~p", [Name2]), N2 = emqx_common_test_helpers:start_slave(Name2, Opts2), - on_exit(fun() -> ok = emqx_common_test_helpers:stop_slave(N2) end), + on_exit(fun() -> + ct:pal("stopping ~p", [N2]), + ok = emqx_common_test_helpers:stop_slave(N2) + end), setup_group_subscriber_spy(N2), Nodes = [N1, N2], wait_for_cluster_rpc(N2), @@ -1873,7 +1882,10 @@ t_cluster_node_down(Config) -> Nodes = [N1, N2 | _] = lists:map( - fun({Name, Opts}) -> emqx_common_test_helpers:start_slave(Name, Opts) end, + fun({Name, Opts}) -> + ct:pal("starting ~p", [Name]), + emqx_common_test_helpers:start_slave(Name, Opts) + end, Cluster ), on_exit(fun() -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl similarity index 98% rename from lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl rename to apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 9b6ac05a7..6e3ddf5bb 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_impl_kafka_producer_SUITE). +-module(emqx_bridge_kafka_impl_producer_SUITE). -compile(nowarn_export_all). -compile(export_all). @@ -12,7 +12,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). --define(PRODUCER, emqx_bridge_impl_kafka_producer). +-define(PRODUCER, emqx_bridge_kafka_impl_producer). %%------------------------------------------------------------------------------ %% Things for REST API tests @@ -41,6 +41,8 @@ %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, "kafka"). +-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -76,7 +78,7 @@ init_per_suite(Config) -> _ = emqx_ee_bridge:module_info(), application:load(emqx_bridge), ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), + ok = emqx_connector_test_helpers:start_apps(?APPS), {ok, _} = application:ensure_all_started(emqx_connector), emqx_mgmt_api_test_util:init_suite(), wait_until_kafka_is_up(), @@ -96,7 +98,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), + ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)), _ = application:stop(emqx_connector), ok. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl similarity index 99% rename from lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl rename to apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 1b32f856d..fa352ce89 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ee_bridge_kafka_tests). +-module(emqx_bridge_kafka_tests). -include_lib("eunit/include/eunit.hrl"). diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index 963122082..0e947d89e 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -1,6 +1,5 @@ toxiproxy influxdb -kafka mongo mongo_rs_sharded mysql diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index 1c7d130ae..afd90f622 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,9 +1,5 @@ {erl_opts, [debug_info]}. -{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}} - , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}} - , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} - , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} - , {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.1"}}} +{deps, [ {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.1"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 8316545a3..f1793d2e0 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,12 +1,13 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, {vsn, "0.1.9"}, - {registered, [emqx_ee_bridge_kafka_consumer_sup]}, + {registered, []}, {applications, [ kernel, stdlib, emqx_ee_connector, - telemetry + telemetry, + emqx_bridge_kafka ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 1ddc1a110..84b0b98b0 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -15,8 +15,8 @@ api_schemas(Method) -> [ ref(emqx_ee_bridge_gcp_pubsub, Method), - ref(emqx_ee_bridge_kafka, Method ++ "_consumer"), - ref(emqx_ee_bridge_kafka, Method ++ "_producer"), + ref(emqx_bridge_kafka, Method ++ "_consumer"), + ref(emqx_bridge_kafka, Method ++ "_producer"), ref(emqx_ee_bridge_mysql, Method), ref(emqx_ee_bridge_pgsql, Method), ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), @@ -39,7 +39,7 @@ api_schemas(Method) -> schema_modules() -> [ - emqx_ee_bridge_kafka, + emqx_bridge_kafka, emqx_ee_bridge_hstreamdb, emqx_ee_bridge_gcp_pubsub, emqx_ee_bridge_influxdb, @@ -69,10 +69,10 @@ examples(Method) -> lists:foldl(Fun, #{}, schema_modules()). resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); -resource_type(kafka_consumer) -> emqx_bridge_impl_kafka_consumer; +resource_type(kafka_consumer) -> emqx_bridge_kafka_impl_consumer; %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. -resource_type(kafka) -> emqx_bridge_impl_kafka_producer; +resource_type(kafka) -> emqx_bridge_kafka_impl_producer; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub; resource_type(mongodb_rs) -> emqx_ee_connector_mongodb; @@ -174,16 +174,16 @@ kafka_structs() -> %% backwards compatibility. {kafka, mk( - hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_producer)), + hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer)), #{ desc => <<"Kafka Producer Bridge Config">>, required => false, - converter => fun emqx_ee_bridge_kafka:kafka_producer_converter/2 + converter => fun emqx_bridge_kafka:kafka_producer_converter/2 } )}, {kafka_consumer, mk( - hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_consumer)), + hoconsc:map(name, ref(emqx_bridge_kafka, kafka_consumer)), #{desc => <<"Kafka Consumer Bridge Config">>, required => false} )} ]. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index cd176081b..2e6406d70 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -8,8 +8,6 @@ hstreamdb_erl, influxdb, tdengine, - wolff, - brod, clickhouse, erlcloud, rocketmq, diff --git a/mix.exs b/mix.exs index 56840579d..6709f1774 100644 --- a/mix.exs +++ b/mix.exs @@ -107,6 +107,8 @@ defmodule EMQXUmbrella.MixProject do end defp umbrella_apps() do + enterprise_apps = enterprise_umbrella_apps() + "apps/*" |> Path.wildcard() |> Enum.map(fn path -> @@ -117,9 +119,20 @@ defmodule EMQXUmbrella.MixProject do {app, path: path, manager: :rebar3, override: true} end) + |> Enum.reject(fn dep_spec -> + dep_spec + |> elem(0) + |> then(&MapSet.member?(enterprise_apps, &1)) + end) end defp enterprise_apps(_profile_info = %{edition_type: :enterprise}) do + umbrella_apps = + Enum.map(enterprise_umbrella_apps(), fn app_name -> + path = "apps/#{app_name}" + {app_name, path: path, manager: :rebar3, override: true} + end) + "lib-ee/*" |> Path.wildcard() |> Enum.filter(&File.dir?/1) @@ -131,12 +144,20 @@ defmodule EMQXUmbrella.MixProject do {app, path: path, manager: :rebar3, override: true} end) + |> Enum.concat(umbrella_apps) end defp enterprise_apps(_profile_info) do [] end + # need to remove those when listing `/apps/`... + defp enterprise_umbrella_apps() do + MapSet.new([ + :emqx_bridge_kafka + ]) + end + defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, @@ -146,7 +167,8 @@ defmodule EMQXUmbrella.MixProject do {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"}, {:snappyer, "1.2.8", override: true}, - {:supervisor3, "1.1.11", override: true} + {:crc32cer, "0.1.8", override: true}, + {:supervisor3, "1.1.12", override: true} ] end @@ -320,6 +342,7 @@ defmodule EMQXUmbrella.MixProject do emqx_ee_conf: :load, emqx_ee_connector: :permanent, emqx_ee_bridge: :permanent, + emqx_bridge_kafka: :permanent, emqx_ee_schema_registry: :permanent ], else: [] diff --git a/rebar.config.erl b/rebar.config.erl index cdd628664..39c336422 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -78,6 +78,9 @@ is_cover_enabled() -> is_enterprise(ce) -> false; is_enterprise(ee) -> true. +is_community_umbrella_app("apps/emqx_bridge_kafka") -> false; +is_community_umbrella_app(_) -> true. + is_jq_supported() -> not (false =/= os:getenv("BUILD_WITHOUT_JQ") orelse is_win32()) orelse @@ -122,8 +125,14 @@ project_app_dirs() -> project_app_dirs(get_edition_from_profile_env()). project_app_dirs(Edition) -> - ["apps/*"] ++ - case is_enterprise(Edition) of + IsEnterprise = is_enterprise(Edition), + UmbrellaApps = [ + Path + || Path <- filelib:wildcard("apps/*"), + is_community_umbrella_app(Path) orelse IsEnterprise + ], + UmbrellaApps ++ + case IsEnterprise of true -> ["lib-ee/*"]; false -> [] end. @@ -428,6 +437,7 @@ relx_apps_per_edition(ee) -> {emqx_ee_conf, load}, emqx_ee_connector, emqx_ee_bridge, + emqx_bridge_kafka, emqx_ee_schema_registry ]; relx_apps_per_edition(ce) -> diff --git a/rel/i18n/emqx_ee_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon similarity index 99% rename from rel/i18n/emqx_ee_bridge_kafka.hocon rename to rel/i18n/emqx_bridge_kafka.hocon index 1638eb89f..2f1811269 100644 --- a/rel/i18n/emqx_ee_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -1,4 +1,4 @@ -emqx_ee_bridge_kafka { +emqx_bridge_kafka { config_enable { desc { en: "Enable (true) or disable (false) this Kafka bridge." diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 82823720d..83e69d1ad 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -107,6 +107,10 @@ case "${WHICH_APP}" in ## ensure enterprise profile when testing lib-ee applications export PROFILE='emqx-enterprise' ;; + apps/emqx_bridge_kafka) + ## ensure enterprise profile when testing ee applications + export PROFILE='emqx-enterprise' + ;; *) export PROFILE="${PROFILE:-emqx}" ;; @@ -172,7 +176,7 @@ for dep in ${CT_DEPS}; do ;; rocketmq) FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' ) - ;; + ;; cassandra) FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' ) ;; diff --git a/scripts/find-apps.sh b/scripts/find-apps.sh index f07cd2f7d..66990ae12 100755 --- a/scripts/find-apps.sh +++ b/scripts/find-apps.sh @@ -72,6 +72,9 @@ describe_app() { runner="docker" fi case "${app}" in + apps/emqx_bridge_kafka) + profile='emqx-enterprise' + ;; apps/*) profile='emqx' ;;