From ad4be08bb2bac7c4d730070dc268fc2b54afccc0 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 12 Apr 2023 10:22:36 -0300 Subject: [PATCH] feat: implement Pulsar Producer bridge (e5.0) Fixes https://emqx.atlassian.net/browse/EMQX-8398 --- .../docker-compose-pulsar-tcp.yaml | 32 + .ci/docker-compose-file/toxiproxy.json | 12 + LICENSE | 1 + apps/emqx/test/emqx_test_janitor.erl | 24 +- apps/emqx_bridge/src/emqx_bridge.erl | 3 +- apps/emqx_bridge/src/emqx_bridge_resource.erl | 2 + apps/emqx_bridge_pulsar/.gitignore | 19 + apps/emqx_bridge_pulsar/BSL.txt | 94 ++ apps/emqx_bridge_pulsar/README.md | 30 + apps/emqx_bridge_pulsar/docker-ct | 2 + .../etc/emqx_bridge_pulsar.conf | 0 .../include/emqx_bridge_pulsar.hrl | 14 + apps/emqx_bridge_pulsar/rebar.config | 13 + .../src/emqx_bridge_pulsar.app.src | 15 + .../src/emqx_bridge_pulsar.erl | 228 +++++ .../src/emqx_bridge_pulsar_app.erl | 14 + .../src/emqx_bridge_pulsar_impl_producer.erl | 396 +++++++++ .../src/emqx_bridge_pulsar_sup.erl | 33 + ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 819 ++++++++++++++++++ .../pulsar_echo_consumer.erl | 25 + .../src/emqx_resource_manager.erl | 7 +- changes/ee/feat-10378.en.md | 1 + .../emqx_ee_bridge/src/emqx_ee_bridge.app.src | 4 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 24 +- mix.exs | 4 +- rebar.config.erl | 1 + rel/i18n/emqx_bridge_pulsar.hocon | 176 ++++ rel/i18n/zh/emqx_bridge_pulsar.hocon | 173 ++++ scripts/ct/run.sh | 5 +- 29 files changed, 2160 insertions(+), 11 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-pulsar-tcp.yaml create mode 100644 apps/emqx_bridge_pulsar/.gitignore create mode 100644 apps/emqx_bridge_pulsar/BSL.txt create mode 100644 apps/emqx_bridge_pulsar/README.md create mode 100644 apps/emqx_bridge_pulsar/docker-ct create mode 100644 apps/emqx_bridge_pulsar/etc/emqx_bridge_pulsar.conf create mode 100644 apps/emqx_bridge_pulsar/include/emqx_bridge_pulsar.hrl create mode 100644 apps/emqx_bridge_pulsar/rebar.config create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_app.erl create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_sup.erl create mode 100644 apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl create mode 100644 apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE_data/pulsar_echo_consumer.erl create mode 100644 changes/ee/feat-10378.en.md create mode 100644 rel/i18n/emqx_bridge_pulsar.hocon create mode 100644 rel/i18n/zh/emqx_bridge_pulsar.hocon diff --git a/.ci/docker-compose-file/docker-compose-pulsar-tcp.yaml b/.ci/docker-compose-file/docker-compose-pulsar-tcp.yaml new file mode 100644 index 000000000..926000ae4 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-pulsar-tcp.yaml @@ -0,0 +1,32 @@ +version: '3' + +services: + pulsar: + container_name: pulsar + image: apachepulsar/pulsar:2.11.0 + # ports: + # - 6650:6650 + # - 8080:8080 + networks: + emqx_bridge: + volumes: + - ../../apps/emqx/etc/certs/cert.pem:/etc/certs/server.pem + - ../../apps/emqx/etc/certs/key.pem:/etc/certs/key.pem + - ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/ca.pem + restart: always + command: + - bash + - "-c" + - | + sed -i 's/^advertisedAddress=/#advertisedAddress=/' conf/standalone.conf + sed -ie 's/^brokerServicePort=.*/brokerServicePort=6649/' conf/standalone.conf + sed -i 's/^bindAddress=/#bindAddress=/' conf/standalone.conf + sed -i 's#^bindAddresses=#bindAddresses=plain:pulsar://0.0.0.0:6650,ssl:pulsar+ssl://0.0.0.0:6651,toxiproxy:pulsar://0.0.0.0:6652,toxiproxy_ssl:pulsar+ssl://0.0.0.0:6653#' conf/standalone.conf + sed -i 's#^advertisedAddress=#advertisedAddress=plain:pulsar://pulsar:6650,ssl:pulsar+ssl://pulsar:6651,toxiproxy:pulsar://toxiproxy:6652,toxiproxy_ssl:pulsar+ssl://toxiproxy:6653#' conf/standalone.conf + sed -i 's#^tlsCertificateFilePath=#tlsCertificateFilePath=/etc/certs/server.pem#' conf/standalone.conf + sed -i 's#^tlsTrustCertsFilePath=#tlsTrustCertsFilePath=/etc/certs/ca.pem#' conf/standalone.conf + sed -i 's#^tlsKeyFilePath=#tlsKeyFilePath=/etc/certs/key.pem#' conf/standalone.conf + sed -i 's#^tlsProtocols=#tlsProtocols=TLSv1.3,TLSv1.2#' conf/standalone.conf + sed -i 's#^tlsCiphers=#tlsCiphers=TLS_AES_256_GCM_SHA384#' conf/standalone.conf + echo 'advertisedListeners=plain:pulsar://pulsar:6650,ssl:pulsar+ssl://pulsar:6651,toxiproxy:pulsar://toxiproxy:6652,toxiproxy_ssl:pulsar+ssl://toxiproxy:6653' >> conf/standalone.conf + bin/pulsar standalone -nfw -nss diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index f6b31da4c..9cefcb808 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -107,5 +107,17 @@ "listen": "0.0.0.0:4242", "upstream": "opents:4242", "enabled": true + }, + { + "name": "pulsar_plain", + "listen": "0.0.0.0:6652", + "upstream": "pulsar:6652", + "enabled": true + }, + { + "name": "pulsar_tls", + "listen": "0.0.0.0:6653", + "upstream": "pulsar:6653", + "enabled": true } ] diff --git a/LICENSE b/LICENSE index 4ed190bda..68bb18ce3 100644 --- a/LICENSE +++ b/LICENSE @@ -9,3 +9,4 @@ sub-directory and some of the apps under the apps directory. Source code under apps that uses BSL License: - apps/emqx_bridge_kafka +- apps/emqx_bridge_pulsar diff --git a/apps/emqx/test/emqx_test_janitor.erl b/apps/emqx/test/emqx_test_janitor.erl index c3f82a3e1..041b03fa7 100644 --- a/apps/emqx/test/emqx_test_janitor.erl +++ b/apps/emqx/test/emqx_test_janitor.erl @@ -60,12 +60,12 @@ init(Parent) -> {ok, #{callbacks => [], owner => Parent}}. terminate(_Reason, #{callbacks := Callbacks}) -> - lists:foreach(fun(Fun) -> catch Fun() end, Callbacks). + do_terminate(Callbacks). handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) -> {reply, ok, State#{callbacks := [Callback | Callbacks]}}; handle_call(terminate, _From, State = #{callbacks := Callbacks}) -> - lists:foreach(fun(Fun) -> catch Fun() end, Callbacks), + do_terminate(Callbacks), {stop, normal, ok, State}; handle_call(_Req, _From, State) -> {reply, error, State}. @@ -77,3 +77,23 @@ handle_info({'EXIT', Parent, _Reason}, State = #{owner := Parent}) -> {stop, normal, State}; handle_info(_Msg, State) -> {noreply, State}. + +%%---------------------------------------------------------------------------------- +%% Internal fns +%%---------------------------------------------------------------------------------- + +do_terminate(Callbacks) -> + lists:foreach( + fun(Fun) -> + try + Fun() + catch + K:E:S -> + ct:pal("error executing callback ~p: ~p", [Fun, {K, E}]), + ct:pal("stacktrace: ~p", [S]), + ok + end + end, + Callbacks + ), + ok. diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 08b8222f2..fd4e16263 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -70,7 +70,8 @@ T == dynamo; T == rocketmq; T == cassandra; - T == sqlserver + T == sqlserver; + T == pulsar_producer ). load() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 1ad024c40..da98b073e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -340,6 +340,8 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) -> %% to hocon; keeping this as just `kafka' for backwards compatibility. parse_confs(<<"kafka">> = _Type, Name, Conf) -> Conf#{bridge_name => Name}; +parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) -> + Conf#{bridge_name => Name}; parse_confs(_Type, _Name, Conf) -> Conf. diff --git a/apps/emqx_bridge_pulsar/.gitignore b/apps/emqx_bridge_pulsar/.gitignore new file mode 100644 index 000000000..f1c455451 --- /dev/null +++ b/apps/emqx_bridge_pulsar/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/apps/emqx_bridge_pulsar/BSL.txt b/apps/emqx_bridge_pulsar/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_pulsar/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_pulsar/README.md b/apps/emqx_bridge_pulsar/README.md new file mode 100644 index 000000000..09e17d8bb --- /dev/null +++ b/apps/emqx_bridge_pulsar/README.md @@ -0,0 +1,30 @@ +# Pulsar Data Integration Bridge + +This application houses the Pulsar Producer data integration bridge +for EMQX Enterprise Edition. It provides the means to connect to +Pulsar and publish messages to it. + +Currently, our Pulsar Producer library has its own `replayq` buffering +implementation, so this bridge does not require buffer workers from +`emqx_resource`. It implements the connection management and +interaction without need for a separate connector app, since it's not +used by authentication and authorization applications. + +# Documentation links + +For more information on Apache Pulsar, please see its [official +site](https://pulsar.apache.org/). + +# Configurations + +Please see [our official +documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-pulsar.html) +for more detailed info. + +# Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_bridge_pulsar/docker-ct b/apps/emqx_bridge_pulsar/docker-ct new file mode 100644 index 000000000..6324bb4f7 --- /dev/null +++ b/apps/emqx_bridge_pulsar/docker-ct @@ -0,0 +1,2 @@ +toxiproxy +pulsar diff --git a/apps/emqx_bridge_pulsar/etc/emqx_bridge_pulsar.conf b/apps/emqx_bridge_pulsar/etc/emqx_bridge_pulsar.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_pulsar/include/emqx_bridge_pulsar.hrl b/apps/emqx_bridge_pulsar/include/emqx_bridge_pulsar.hrl new file mode 100644 index 000000000..5ee87e48f --- /dev/null +++ b/apps/emqx_bridge_pulsar/include/emqx_bridge_pulsar.hrl @@ -0,0 +1,14 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_BRIDGE_PULSAR_HRL). +-define(EMQX_BRIDGE_PULSAR_HRL, true). + +-define(PULSAR_HOST_OPTIONS, #{ + default_port => 6650, + default_scheme => "pulsar", + supported_schemes => ["pulsar", "pulsar+ssl"] +}). + +-endif. diff --git a/apps/emqx_bridge_pulsar/rebar.config b/apps/emqx_bridge_pulsar/rebar.config new file mode 100644 index 000000000..3b9ae417d --- /dev/null +++ b/apps/emqx_bridge_pulsar/rebar.config @@ -0,0 +1,13 @@ +%% -*- mode: erlang; -*- + +{erl_opts, [debug_info]}. +{deps, [ {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.0"}}} + , {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + , {emqx_bridge, {path, "../../apps/emqx_bridge"}} + ]}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [emqx_bridge_pulsar]} +]}. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src new file mode 100644 index 000000000..cd89f6867 --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -0,0 +1,15 @@ +{application, emqx_bridge_pulsar, [ + {description, "EMQX Pulsar Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_bridge_pulsar_app, []}}, + {applications, [ + kernel, + stdlib, + pulsar + ]}, + {env, []}, + {modules, []}, + + {links, []} +]}. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl new file mode 100644 index 000000000..a3e50054e --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -0,0 +1,228 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar). + +-include("emqx_bridge_pulsar.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +%% hocon_schema API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). +%% emqx_ee_bridge "unofficial" API +-export([conn_bridge_examples/1]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "bridge_pulsar". + +roots() -> + []. + +fields(pulsar_producer) -> + fields(config) ++ fields(producer_opts); +fields(config) -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {servers, + mk( + binary(), + #{ + required => true, + desc => ?DESC("servers"), + validator => emqx_schema:servers_validator( + ?PULSAR_HOST_OPTIONS, _Required = true + ) + } + )}, + {authentication, + mk(hoconsc:union([none, ref(auth_basic), ref(auth_token)]), #{ + default => none, desc => ?DESC("authentication") + })} + ] ++ emqx_connector_schema_lib:ssl_fields(); +fields(producer_opts) -> + [ + {batch_size, + mk( + pos_integer(), + #{default => 100, desc => ?DESC("producer_batch_size")} + )}, + {compression, + mk( + hoconsc:enum([no_compression, snappy, zlib]), + #{default => no_compression, desc => ?DESC("producer_compression")} + )}, + {send_buffer, + mk(emqx_schema:bytesize(), #{ + default => <<"1MB">>, desc => ?DESC("producer_send_buffer") + })}, + {sync_timeout, + mk(emqx_schema:duration_ms(), #{ + default => <<"3s">>, desc => ?DESC("producer_sync_timeout") + })}, + {retention_period, + mk( + hoconsc:union([infinity, emqx_schema:duration_ms()]), + #{default => infinity, desc => ?DESC("producer_retention_period")} + )}, + {max_batch_bytes, + mk( + emqx_schema:bytesize(), + #{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")} + )}, + {local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})}, + {pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})}, + {strategy, + mk( + hoconsc:enum([random, roundrobin, first_key_dispatch]), + #{default => random, desc => ?DESC("producer_strategy")} + )}, + {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})}, + {message, + mk(ref(producer_pulsar_message), #{ + required => false, desc => ?DESC("producer_message_opts") + })}, + {resource_opts, + mk( + ref(producer_resource_opts), + #{ + required => false, + desc => ?DESC(emqx_resource_schema, "creation_opts") + } + )} + ]; +fields(producer_buffer) -> + [ + {mode, + mk( + hoconsc:enum([memory, disk, hybrid]), + #{default => memory, desc => ?DESC("buffer_mode")} + )}, + {per_partition_limit, + mk( + emqx_schema:bytesize(), + #{default => <<"2GB">>, desc => ?DESC("buffer_per_partition_limit")} + )}, + {segment_bytes, + mk( + emqx_schema:bytesize(), + #{default => <<"100MB">>, desc => ?DESC("buffer_segment_bytes")} + )}, + {memory_overload_protection, + mk(boolean(), #{ + default => false, + desc => ?DESC("buffer_memory_overload_protection") + })} + ]; +fields(producer_pulsar_message) -> + [ + {key, + mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC("producer_key_template")})}, + {value, mk(string(), #{default => <<"${.}">>, desc => ?DESC("producer_value_template")})} + ]; +fields(producer_resource_opts) -> + SupportedOpts = [ + health_check_interval, + resume_interval, + start_after_created, + start_timeout, + auto_restart_interval + ], + lists:filtermap( + fun + ({health_check_interval = Field, MetaFn}) -> + {true, {Field, override_default(MetaFn, 1_000)}}; + ({Field, _Meta}) -> + lists:member(Field, SupportedOpts) + end, + emqx_resource_schema:fields("creation_opts") + ); +fields(auth_basic) -> + [ + {username, mk(binary(), #{required => true, desc => ?DESC("auth_basic_username")})}, + {password, + mk(binary(), #{ + required => true, + desc => ?DESC("auth_basic_password"), + sensitive => true, + converter => fun emqx_schema:password_converter/2 + })} + ]; +fields(auth_token) -> + [ + {jwt, + mk(binary(), #{ + required => true, + desc => ?DESC("auth_token_jwt"), + sensitive => true, + converter => fun emqx_schema:password_converter/2 + })} + ]; +fields("get_" ++ Type) -> + emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type); +fields("put_" ++ Type) -> + fields("config_" ++ Type); +fields("post_" ++ Type) -> + [type_field(), name_field() | fields("config_" ++ Type)]; +fields("config_producer") -> + fields(pulsar_producer). + +desc(pulsar_producer) -> + ?DESC(pulsar_producer_struct); +desc(producer_resource_opts) -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc("get_" ++ Type) when Type =:= "producer" -> + ["Configuration for Pulsar using `GET` method."]; +desc("put_" ++ Type) when Type =:= "producer" -> + ["Configuration for Pulsar using `PUT` method."]; +desc("post_" ++ Type) when Type =:= "producer" -> + ["Configuration for Pulsar using `POST` method."]; +desc(Name) -> + lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), + ?DESC(Name). + +conn_bridge_examples(_Method) -> + [ + #{ + <<"pulsar_producer">> => #{ + summary => <<"Pulsar Producer Bridge">>, + value => #{todo => true} + } + } + ]. + +%%------------------------------------------------------------------------------------------------- +%% Internal fns +%%------------------------------------------------------------------------------------------------- + +mk(Type, Meta) -> hoconsc:mk(Type, Meta). +ref(Name) -> hoconsc:ref(?MODULE, Name). + +type_field() -> + {type, mk(hoconsc:enum([pulsar_producer]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + +struct_names() -> + [ + auth_basic, + auth_token, + producer_buffer, + producer_pulsar_message + ]. + +override_default(OriginalFn, NewDefault) -> + fun + (default) -> NewDefault; + (Field) -> OriginalFn(Field) + end. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_app.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_app.erl new file mode 100644 index 000000000..bedf42cf6 --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_app.erl @@ -0,0 +1,14 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + emqx_bridge_pulsar_sup:start_link(). + +stop(_State) -> + ok. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl new file mode 100644 index 000000000..4bc390b91 --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -0,0 +1,396 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_impl_producer). + +-include("emqx_bridge_pulsar.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_get_status/2, + on_query/3, + on_query_async/4 +]). + +-type pulsar_client_id() :: atom(). +-type state() :: #{ + pulsar_client_id := pulsar_client_id(), + producers := pulsar_producers:producers(), + sync_timeout := infinity | time:time(), + message_template := message_template() +}. +-type buffer_mode() :: memory | disk | hybrid. +-type compression_mode() :: no_compression | snappy | zlib. +-type partition_strategy() :: random | roundrobin | first_key_dispatch. +-type message_template_raw() :: #{ + key := binary(), + value := binary() +}. +-type message_template() :: #{ + key := emqx_plugin_libs_rule:tmpl_token(), + value := emqx_plugin_libs_rule:tmpl_token() +}. +-type config() :: #{ + authentication := _, + batch_size := pos_integer(), + bridge_name := atom(), + buffer := #{ + mode := buffer_mode(), + per_partition_limit := emqx_schema:byte_size(), + segment_bytes := emqx_schema:byte_size(), + memory_overload_protection := boolean() + }, + compression := compression_mode(), + max_batch_bytes := emqx_schema:bytesize(), + message := message_template_raw(), + pulsar_topic := binary(), + retention_period := infinity | emqx_schema:duration_ms(), + send_buffer := emqx_schema:bytesize(), + servers := binary(), + ssl := _, + strategy := partition_strategy(), + sync_timeout := emqx_schema:duration_ms() +}. + +%%------------------------------------------------------------------------------------- +%% `emqx_resource' API +%%------------------------------------------------------------------------------------- + +callback_mode() -> async_if_possible. + +%% there are no queries to be made to this bridge, so we say that +%% buffer is supported so we don't spawn unused resource buffer +%% workers. +is_buffer_supported() -> true. + +-spec on_start(manager_id(), config()) -> {ok, state()}. +on_start(InstanceId, Config) -> + #{ + authentication := _Auth, + bridge_name := BridgeName, + servers := Servers0, + ssl := SSL + } = Config, + Servers = format_servers(Servers0), + ClientId = make_client_id(InstanceId, BridgeName), + SSLOpts = emqx_tls_lib:to_client_opts(SSL), + ClientOpts = #{ + ssl_opts => SSLOpts, + conn_opts => conn_opts(Config) + }, + case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of + {ok, _Pid} -> + ?SLOG(info, #{ + msg => "pulsar_client_started", + instance_id => InstanceId, + pulsar_hosts => Servers + }); + {error, Error} -> + ?SLOG(error, #{ + msg => "failed_to_start_kafka_client", + instance_id => InstanceId, + pulsar_hosts => Servers, + reason => Error + }), + throw(failed_to_start_pulsar_client) + end, + start_producer(Config, InstanceId, ClientId, ClientOpts). + +-spec on_stop(manager_id(), state()) -> ok. +on_stop(_InstanceId, State) -> + #{ + pulsar_client_id := ClientId, + producers := Producers + } = State, + stop_producers(ClientId, Producers), + stop_client(ClientId), + ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}), + ok. + +-spec on_get_status(manager_id(), state()) -> connected | disconnected. +on_get_status(_InstanceId, State) -> + #{ + pulsar_client_id := ClientId, + producers := Producers + } = State, + case pulsar_client_sup:find_client(ClientId) of + {ok, Pid} -> + try pulsar_client:get_status(Pid) of + true -> + get_producer_status(Producers); + false -> + disconnected + catch + error:timeout -> + disconnected; + exit:{noproc, _} -> + disconnected + end; + {error, _} -> + disconnected + end. + +-spec on_query(manager_id(), {send_message, map()}, state()) -> ok | {error, timeout}. +on_query(_InstanceId, {send_message, Message}, State) -> + #{ + producers := Producers, + sync_timeout := SyncTimeout, + message_template := MessageTemplate + } = State, + PulsarMessage = render_message(Message, MessageTemplate), + try + pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) + catch + error:timeout -> + {error, timeout} + end. + +-spec on_query_async( + manager_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() +) -> + {ok, pid()}. +on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) -> + #{ + producers := Producers, + message_template := MessageTemplate + } = State, + PulsarMessage = render_message(Message, MessageTemplate), + pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). + +%%------------------------------------------------------------------------------------- +%% Internal fns +%%------------------------------------------------------------------------------------- + +-spec to_bin(atom() | string() | binary()) -> binary(). +to_bin(A) when is_atom(A) -> + atom_to_binary(A); +to_bin(L) when is_list(L) -> + list_to_binary(L); +to_bin(B) when is_binary(B) -> + B. + +-spec format_servers(binary()) -> [string()]. +format_servers(Servers0) -> + Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS), + lists:map( + fun({Scheme, Host, Port}) -> + Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port) + end, + Servers1 + ). + +-spec make_client_id(manager_id(), atom() | binary()) -> pulsar_client_id(). +make_client_id(InstanceId, BridgeName) -> + case is_dry_run(InstanceId) of + true -> + pulsar_producer_probe; + false -> + ClientIdBin = iolist_to_binary([ + <<"pulsar_producer:">>, + to_bin(BridgeName), + <<":">>, + to_bin(node()) + ]), + binary_to_atom(ClientIdBin) + end. + +-spec is_dry_run(manager_id()) -> boolean(). +is_dry_run(InstanceId) -> + TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), + case TestIdStart of + nomatch -> + false; + _ -> + string:equal(TestIdStart, InstanceId) + end. + +conn_opts(#{authentication := none}) -> + #{}; +conn_opts(#{authentication := #{username := Username, password := Password}}) -> + #{ + auth_data => iolist_to_binary([Username, <<":">>, Password]), + auth_method_name => <<"basic">> + }; +conn_opts(#{authentication := #{jwt := JWT}}) -> + #{ + auth_data => JWT, + auth_method_name => <<"token">> + }. + +-spec replayq_dir(pulsar_client_id()) -> string(). +replayq_dir(ClientId) -> + filename:join([emqx:data_dir(), "pulsar", to_bin(ClientId)]). + +-spec producer_name(pulsar_client_id()) -> atom(). +producer_name(ClientId) -> + ClientIdBin = to_bin(ClientId), + binary_to_atom( + iolist_to_binary([ + <<"producer-">>, + ClientIdBin + ]) + ). + +-spec start_producer(config(), manager_id(), pulsar_client_id(), map()) -> {ok, state()}. +start_producer(Config, InstanceId, ClientId, ClientOpts) -> + #{ + conn_opts := ConnOpts, + ssl_opts := SSLOpts + } = ClientOpts, + #{ + batch_size := BatchSize, + buffer := #{ + mode := BufferMode, + per_partition_limit := PerPartitionLimit, + segment_bytes := SegmentBytes, + memory_overload_protection := MemOLP0 + }, + compression := Compression, + max_batch_bytes := MaxBatchBytes, + message := MessageTemplateOpts, + pulsar_topic := PulsarTopic0, + retention_period := RetentionPeriod, + send_buffer := SendBuffer, + strategy := Strategy, + sync_timeout := SyncTimeout + } = Config, + {OffloadMode, ReplayQDir} = + case BufferMode of + memory -> {false, false}; + disk -> {false, replayq_dir(ClientId)}; + hybrid -> {true, replayq_dir(ClientId)} + end, + MemOLP = + case os:type() of + {unix, linux} -> MemOLP0; + _ -> false + end, + ReplayQOpts = #{ + replayq_dir => ReplayQDir, + replayq_offload_mode => OffloadMode, + replayq_max_total_bytes => PerPartitionLimit, + replayq_seg_bytes => SegmentBytes, + drop_if_highmem => MemOLP + }, + ProducerName = producer_name(ClientId), + MessageTemplate = compile_message_template(MessageTemplateOpts), + ProducerOpts0 = + #{ + batch_size => BatchSize, + compression => Compression, + conn_opts => ConnOpts, + max_batch_bytes => MaxBatchBytes, + name => ProducerName, + retention_period => RetentionPeriod, + ssl_opts => SSLOpts, + strategy => Strategy, + tcp_opts => [{sndbuf, SendBuffer}] + }, + ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0), + PulsarTopic = binary_to_list(PulsarTopic0), + try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of + {ok, Producers} -> + State = #{ + pulsar_client_id => ClientId, + producers => Producers, + sync_timeout => SyncTimeout, + message_template => MessageTemplate + }, + ?tp(pulsar_producer_bridge_started, #{}), + {ok, State} + catch + Kind:Error:Stacktrace -> + ?SLOG(error, #{ + msg => "failed_to_start_pulsar_producer", + instance_id => InstanceId, + kind => Kind, + reason => Error, + stacktrace => Stacktrace + }), + stop_client(ClientId), + throw(failed_to_start_pulsar_producer) + end. + +-spec stop_client(pulsar_client_id()) -> ok. +stop_client(ClientId) -> + _ = log_when_error( + fun() -> + ok = pulsar:stop_and_delete_supervised_client(ClientId), + ?tp(pulsar_bridge_client_stopped, #{pulsar_client_id => ClientId}), + ok + end, + #{ + msg => "failed_to_delete_pulsar_client", + pulsar_client_id => ClientId + } + ), + ok. + +-spec stop_producers(pulsar_client_id(), pulsar_producers:producers()) -> ok. +stop_producers(ClientId, Producers) -> + _ = log_when_error( + fun() -> + ok = pulsar:stop_and_delete_supervised_producers(Producers), + ?tp(pulsar_bridge_producer_stopped, #{pulsar_client_id => ClientId}), + ok + end, + #{ + msg => "failed_to_delete_pulsar_producer", + pulsar_client_id => ClientId + } + ), + ok. + +log_when_error(Fun, Log) -> + try + Fun() + catch + C:E -> + ?SLOG(error, Log#{ + exception => C, + reason => E + }) + end. + +-spec compile_message_template(message_template_raw()) -> message_template(). +compile_message_template(TemplateOpts) -> + KeyTemplate = maps:get(key, TemplateOpts, <<"${.clientid}">>), + ValueTemplate = maps:get(value, TemplateOpts, <<"${.}">>), + #{ + key => preproc_tmpl(KeyTemplate), + value => preproc_tmpl(ValueTemplate) + }. + +preproc_tmpl(Template) -> + emqx_plugin_libs_rule:preproc_tmpl(Template). + +render_message( + Message, #{key := KeyTemplate, value := ValueTemplate} +) -> + #{ + key => render(Message, KeyTemplate), + value => render(Message, ValueTemplate) + }. + +render(Message, Template) -> + Opts = #{ + var_trans => fun + (undefined) -> <<"">>; + (X) -> emqx_plugin_libs_rule:bin(X) + end, + return => full_binary + }, + emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts). + +get_producer_status(Producers) -> + case pulsar_producers:all_connected(Producers) of + true -> connected; + false -> connecting + end. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_sup.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_sup.erl new file mode 100644 index 000000000..17121beab --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_sup.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + SupFlags = #{ + strategy => one_for_all, + intensity => 0, + period => 1 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl new file mode 100644 index 000000000..f86dbc65d --- /dev/null +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -0,0 +1,819 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_impl_producer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>). +-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_pulsar]). +-define(RULE_TOPIC, "mqtt/rule"). +-define(RULE_TOPIC_BIN, <>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, plain}, + {group, tls} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + OnlyOnceTCs = only_once_tests(), + TCs = AllTCs -- OnlyOnceTCs, + [ + {plain, AllTCs}, + {tls, TCs} + ]. + +only_once_tests() -> + [t_create_via_http]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)), + _ = application:stop(emqx_connector), + ok. + +init_per_group(plain = Type, Config) -> + PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"), + PulsarPort = list_to_integer(os:getenv("PULSAR_PLAIN_PORT", "6652")), + ProxyName = "pulsar_plain", + case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of + true -> + Config1 = common_init_per_group(), + [ + {proxy_name, ProxyName}, + {pulsar_host, PulsarHost}, + {pulsar_port, PulsarPort}, + {pulsar_type, Type}, + {use_tls, false} + | Config1 ++ Config + ]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_pulsar); + _ -> + {skip, no_pulsar} + end + end; +init_per_group(tls = Type, Config) -> + PulsarHost = os:getenv("PULSAR_TLS_HOST", "toxiproxy"), + PulsarPort = list_to_integer(os:getenv("PULSAR_TLS_PORT", "6653")), + ProxyName = "pulsar_tls", + case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of + true -> + Config1 = common_init_per_group(), + [ + {proxy_name, ProxyName}, + {pulsar_host, PulsarHost}, + {pulsar_port, PulsarPort}, + {pulsar_type, Type}, + {use_tls, true} + | Config1 ++ Config + ]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_pulsar); + _ -> + {skip, no_pulsar} + end + end; +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when + Group =:= plain +-> + common_end_per_group(Config), + ok; +end_per_group(_Group, _Config) -> + ok. + +common_init_per_group() -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + application:load(emqx_bridge), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps(?APPS), + {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {mqtt_topic, MQTTTopic} + ]. + +common_end_per_group(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_bridges(), + ok. + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +end_per_testcase(_Testcase, Config) -> + case proplists:get_bool(skip_does_not_apply, Config) of + true -> + ok; + false -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_bridges(), + stop_consumer(Config), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok + end. + +common_init_per_testcase(TestCase, Config0) -> + ct:timetrap(timer:seconds(60)), + delete_all_bridges(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + PulsarTopic = + << + (atom_to_binary(TestCase))/binary, + UniqueNum/binary + >>, + PulsarType = ?config(pulsar_type, Config0), + Config1 = [{pulsar_topic, PulsarTopic} | Config0], + {Name, ConfigString, PulsarConfig} = pulsar_config( + TestCase, PulsarType, Config1 + ), + ConsumerConfig = start_consumer(TestCase, Config1), + Config = ConsumerConfig ++ Config1, + ok = snabbkaffe:start_trace(), + [ + {pulsar_name, Name}, + {pulsar_config_string, ConfigString}, + {pulsar_config, PulsarConfig} + | Config + ]. + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +pulsar_config(TestCase, _PulsarType, Config) -> + UniqueNum = integer_to_binary(erlang:unique_integer()), + PulsarHost = ?config(pulsar_host, Config), + PulsarPort = ?config(pulsar_port, Config), + PulsarTopic = ?config(pulsar_topic, Config), + AuthType = proplists:get_value(sasl_auth_mechanism, Config, none), + UseTLS = proplists:get_value(use_tls, Config, false), + Name = << + (atom_to_binary(TestCase))/binary, UniqueNum/binary + >>, + MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>), + Prefix = + case UseTLS of + true -> <<"pulsar+ssl://">>; + false -> <<"pulsar://">> + end, + ServerURL = iolist_to_binary([ + Prefix, + PulsarHost, + ":", + integer_to_binary(PulsarPort) + ]), + ConfigString = + io_lib:format( + "bridges.pulsar_producer.~s {\n" + " enable = true\n" + " servers = \"~s\"\n" + " sync_timeout = 5s\n" + " compression = no_compression\n" + " send_buffer = 1MB\n" + " retention_period = infinity\n" + " max_batch_bytes = 900KB\n" + " batch_size = 1\n" + " strategy = random\n" + " buffer {\n" + " mode = memory\n" + " per_partition_limit = 10MB\n" + " segment_bytes = 5MB\n" + " memory_overload_protection = true\n" + " }\n" + " message {\n" + " key = \"${.clientid}\"\n" + " value = \"${.}\"\n" + " }\n" + "~s" + " ssl {\n" + " enable = ~p\n" + " verify = verify_none\n" + " server_name_indication = \"auto\"\n" + " }\n" + " pulsar_topic = \"~s\"\n" + " local_topic = \"~s\"\n" + "}\n", + [ + Name, + ServerURL, + authentication(AuthType), + UseTLS, + PulsarTopic, + MQTTTopic + ] + ), + {Name, ConfigString, parse_and_check(ConfigString, Name)}. + +parse_and_check(ConfigString, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + TypeBin = ?BRIDGE_TYPE_BIN, + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + Config. + +authentication(_) -> + " authentication = none\n". + +resource_id(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(pulsar_name, Config), + emqx_bridge_resource:resource_id(Type, Name). + +create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(pulsar_name, Config), + PulsarConfig0 = ?config(pulsar_config, Config), + PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides), + emqx_bridge:create(Type, Name, PulsarConfig). + +create_bridge_api(Config) -> + create_bridge_api(Config, _Overrides = #{}). + +create_bridge_api(Config, Overrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(pulsar_name, Config), + PulsarConfig0 = ?config(pulsar_config, Config), + PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides), + Params = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("creating bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + Error -> + Error + end, + ct:pal("bridge create result: ~p", [Res]), + Res. + +update_bridge_api(Config) -> + update_bridge_api(Config, _Overrides = #{}). + +update_bridge_api(Config, Overrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(pulsar_name, Config), + PulsarConfig0 = ?config(pulsar_config, Config), + PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides), + BridgeId = emqx_bridge_resource:bridge_id(TypeBin, Name), + Params = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("updating bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of + {ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])}; + Error -> Error + end, + ct:pal("bridge update result: ~p", [Res]), + Res. + +probe_bridge_api(Config) -> + probe_bridge_api(Config, _Overrides = #{}). + +probe_bridge_api(Config, Overrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(pulsar_name, Config), + PulsarConfig = ?config(pulsar_config, Config), + Params0 = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Params = maps:merge(Params0, Overrides), + Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("probing bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; + Error -> Error + end, + ct:pal("bridge probe result: ~p", [Res]), + Res. + +start_consumer(TestCase, Config) -> + PulsarHost = ?config(pulsar_host, Config), + PulsarPort = ?config(pulsar_port, Config), + PulsarTopic = ?config(pulsar_topic, Config), + UseTLS = ?config(use_tls, Config), + %% FIXME: patch pulsar to accept binary urls... + Scheme = + case UseTLS of + true -> <<"pulsar+ssl://">>; + false -> <<"pulsar://">> + end, + URL = + binary_to_list( + <> + ), + ConnOpts = #{}, + ConsumerClientId = TestCase, + CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"), + SSLOpts = #{ + enable => UseTLS, + keyfile => filename:join([CertsPath, "key.pem"]), + certfile => filename:join([CertsPath, "cert.pem"]), + cacertfile => filename:join([CertsPath, "cacert.pem"]) + }, + {ok, _ClientPid} = pulsar:ensure_supervised_client( + ConsumerClientId, + [URL], + #{ + conn_opts => ConnOpts, + ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts) + } + ), + ConsumerOpts = #{ + cb_init_args => #{send_to => self()}, + cb_module => pulsar_echo_consumer, + sub_type => 'Shared', + subscription => atom_to_list(TestCase), + max_consumer_num => 1, + %% Note! This must not coincide with the client + %% id, or else weird bugs will happen, like the + %% consumer never starts... + name => test_consumer, + consumer_id => 1, + conn_opts => ConnOpts + }, + {ok, Consumer} = pulsar:ensure_supervised_consumers( + ConsumerClientId, + PulsarTopic, + ConsumerOpts + ), + %% since connection is async, and there's currently no way to + %% specify the subscription initial position as `Earliest', we + %% need to wait until the consumer is connected to avoid + %% flakiness. + ok = wait_until_consumer_connected(Consumer), + [ + {consumer_client_id, ConsumerClientId}, + {pulsar_consumer, Consumer} + ]. + +stop_consumer(Config) -> + ConsumerClientId = ?config(consumer_client_id, Config), + Consumer = ?config(pulsar_consumer, Config), + ok = pulsar:stop_and_delete_supervised_consumers(Consumer), + ok = pulsar:stop_and_delete_supervised_client(ConsumerClientId), + ok. + +wait_until_consumer_connected(Consumer) -> + ?retry( + _Sleep = 300, + _Attempts0 = 20, + true = pulsar_consumers:all_connected(Consumer) + ), + ok. + +wait_until_producer_connected() -> + wait_until_connected(pulsar_producers_sup, pulsar_producer). + +wait_until_connected(SupMod, Mod) -> + Pids = [ + P + || {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod), + P <- element(2, process_info(SupPid, links)), + case proc_lib:initial_call(P) of + {Mod, init, _} -> true; + _ -> false + end + ], + ?retry( + _Sleep = 300, + _Attempts0 = 20, + lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) + ), + ok. + +create_rule_and_action_http(Config) -> + PulsarName = ?config(pulsar_name, Config), + BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, PulsarName), + Params = #{ + enable => true, + sql => <<"SELECT * FROM \"", ?RULE_TOPIC, "\"">>, + actions => [BridgeId] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ct:pal("rule action params: ~p", [Params]), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +receive_consumed(Timeout) -> + receive + {pulsar_message, #{payloads := Payloads}} -> + lists:map(fun try_decode_json/1, Payloads) + after Timeout -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("no message consumed") + end. + +try_decode_json(Payload) -> + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {error, _} -> + Payload; + {ok, JSON} -> + JSON + end. + +cluster(Config) -> + PrivDataDir = ?config(priv_dir, Config), + PeerModule = + case os:getenv("IS_CI") of + false -> + slave; + _ -> + ct_slave + end, + Cluster = emqx_common_test_helpers:emqx_cluster( + [core, core], + [ + {apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]}, + {listener_ports, []}, + {peer_mod, PeerModule}, + {priv_data_dir, PrivDataDir}, + {load_schema, true}, + {start_autocluster, true}, + {schema_mod, emqx_ee_conf_schema}, + {env_handler, fun + (emqx) -> + application:set_env(emqx, boot_modules, [broker, router]), + ok; + (emqx_conf) -> + ok; + (_) -> + ok + end} + ] + ), + ct:pal("cluster: ~p", [Cluster]), + Cluster. + +start_cluster(Cluster) -> + Nodes = + [ + emqx_common_test_helpers:start_slave(Name, Opts) + || {Name, Opts} <- Cluster + ], + on_exit(fun() -> + emqx_utils:pmap( + fun(N) -> + ct:pal("stopping ~p", [N]), + ok = emqx_common_test_helpers:stop_slave(N) + end, + Nodes + ) + end), + Nodes. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_and_produce_ok(Config) -> + MQTTTopic = ?config(mqtt_topic, Config), + ResourceId = resource_id(Config), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + QoS = 0, + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + ?check_trace( + begin + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + %% Publish using local topic. + Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload), + emqx:publish(Message0), + %% Publish using rule engine. + Message1 = emqx_message:make(ClientId, QoS, ?RULE_TOPIC_BIN, Payload), + emqx:publish(Message1), + + #{rule_id => RuleId} + end, + fun(#{rule_id := RuleId}, _Trace) -> + Data0 = receive_consumed(5_000), + ?assertMatch( + [ + #{ + <<"clientid">> := ClientId, + <<"event">> := <<"message.publish">>, + <<"payload">> := Payload, + <<"topic">> := MQTTTopic + } + ], + Data0 + ), + Data1 = receive_consumed(5_000), + ?assertMatch( + [ + #{ + <<"clientid">> := ClientId, + <<"event">> := <<"message.publish">>, + <<"payload">> := Payload, + <<"topic">> := ?RULE_TOPIC_BIN + } + ], + Data1 + ), + ?retry( + _Sleep = 100, + _Attempts0 = 20, + begin + ?assertMatch( + #{ + counters := #{ + dropped := 0, + failed := 0, + late_reply := 0, + matched := 2, + received := 0, + retried := 0, + success := 2 + } + }, + emqx_resource_manager:get_metrics(ResourceId) + ), + ?assertEqual( + 1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success') + ), + ?assertEqual( + 0, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed') + ), + ok + end + ), + ok + end + ), + ok. + +%% Under normal operations, the bridge will be called async via +%% `simple_async_query'. +t_sync_query(Config) -> + ResourceId = resource_id(Config), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + Message = {send_message, #{payload => Payload}}, + ?assertMatch( + {ok, #{sequence_id := _}}, emqx_resource:simple_sync_query(ResourceId, Message) + ), + ok + end, + [] + ), + ok. + +t_create_via_http(Config) -> + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + + %% lightweight matrix testing some configs + ?assertMatch( + {ok, _}, + update_bridge_api( + Config, + #{ + <<"buffer">> => + #{<<"mode">> => <<"disk">>} + } + ) + ), + ?assertMatch( + {ok, _}, + update_bridge_api( + Config, + #{ + <<"buffer">> => + #{ + <<"mode">> => <<"hybrid">>, + <<"memory_overload_protection">> => true + } + } + ) + ), + ok + end, + [] + ), + ok. + +t_start_stop(Config) -> + PulsarName = ?config(pulsar_name, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + %% Since the connection process is async, we give it some time to + %% stabilize and avoid flakiness. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% Check that the bridge probe API doesn't leak atoms. + redbug:start( + [ + "emqx_resource_manager:health_check_interval -> return", + "emqx_resource_manager:with_health_check -> return" + ], + [{msgs, 100}, {time, 30_000}] + ), + ProbeRes0 = probe_bridge_api( + Config, + #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ProbeRes1 = probe_bridge_api( + Config, + #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + + %% Now stop the bridge. + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE_BIN, PulsarName), + #{?snk_kind := pulsar_bridge_stopped}, + 5_000 + ) + ), + + ok + end, + fun(Trace) -> + %% one for each probe, one for real + ?assertMatch([_, _, _], ?of_kind(pulsar_bridge_producer_stopped, Trace)), + ?assertMatch([_, _, _], ?of_kind(pulsar_bridge_client_stopped, Trace)), + ?assertMatch([_, _, _], ?of_kind(pulsar_bridge_stopped, Trace)), + ok + end + ), + ok. + +t_on_get_status(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ResourceId = resource_id(Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + %% Since the connection process is async, we give it some time to + %% stabilize and avoid flakiness. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ct:sleep(500), + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + end), + %% Check that it recovers itself. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ok. + +t_cluster(Config) -> + MQTTTopic = ?config(mqtt_topic, Config), + ResourceId = resource_id(Config), + Cluster = cluster(Config), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + QoS = 0, + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + ?check_trace( + begin + Nodes = [N1, N2 | _] = start_cluster(Cluster), + {ok, SRef0} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := pulsar_producer_bridge_started}), + length(Nodes), + 15_000 + ), + {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end), + {ok, _} = snabbkaffe:receive_events(SRef0), + lists:foreach( + fun(N) -> + ?retry( + _Sleep = 1_000, + _Attempts0 = 20, + ?assertEqual( + {ok, connected}, + erpc:call(N, emqx_resource_manager, health_check, [ResourceId]), + #{node => N} + ) + ) + end, + Nodes + ), + erpc:multicall(Nodes, fun wait_until_producer_connected/0), + Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload), + erpc:call(N2, emqx, publish, [Message0]), + + lists:foreach( + fun(N) -> + ?assertEqual( + {ok, connected}, + erpc:call(N, emqx_resource_manager, health_check, [ResourceId]), + #{node => N} + ) + end, + Nodes + ), + + ok + end, + fun(_Trace) -> + Data0 = receive_consumed(10_000), + ?assertMatch( + [ + #{ + <<"clientid">> := ClientId, + <<"event">> := <<"message.publish">>, + <<"payload">> := Payload, + <<"topic">> := MQTTTopic + } + ], + Data0 + ), + ok + end + ), + ok. diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE_data/pulsar_echo_consumer.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE_data/pulsar_echo_consumer.erl new file mode 100644 index 000000000..834978851 --- /dev/null +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE_data/pulsar_echo_consumer.erl @@ -0,0 +1,25 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(pulsar_echo_consumer). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% pulsar consumer API +-export([init/2, handle_message/3]). + +init(Topic, Args) -> + ct:pal("consumer init: ~p", [#{topic => Topic, args => Args}]), + SendTo = maps:get(send_to, Args), + ?tp(pulsar_echo_consumer_init, #{topic => Topic}), + {ok, #{topic => Topic, send_to => SendTo}}. + +handle_message(Message, Payloads, State) -> + #{send_to := SendTo, topic := Topic} = State, + ct:pal( + "pulsar consumer received:\n ~p", + [#{message => Message, payloads => Payloads}] + ), + SendTo ! {pulsar_message, #{topic => Topic, message => Message, payloads => Payloads}}, + ?tp(pulsar_echo_consumer_message, #{topic => Topic, message => Message, payloads => Payloads}), + {ok, 'Individual', State}. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 877b35fff..f6a0ebebf 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -165,8 +165,13 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> create_dry_run(ResourceType, Config) -> ResId = make_test_id(), MgrId = set_new_owner(ResId), + Opts = + case is_map(Config) of + true -> maps:get(resource_opts, Config, #{}); + false -> #{} + end, ok = emqx_resource_manager_sup:ensure_child( - MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{} + MgrId, ResId, <<"dry_run">>, ResourceType, Config, Opts ), case wait_for_ready(ResId, 5000) of ok -> diff --git a/changes/ee/feat-10378.en.md b/changes/ee/feat-10378.en.md new file mode 100644 index 000000000..ebdd299c8 --- /dev/null +++ b/changes/ee/feat-10378.en.md @@ -0,0 +1 @@ +Implement Pulsar Producer Bridge, which supports publishing messages to Pulsar from MQTT topics. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 7dc8882b3..5544825f8 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -9,7 +9,9 @@ telemetry, emqx_bridge_kafka, emqx_bridge_gcp_pubsub, - emqx_bridge_opents + emqx_bridge_cassandra, + emqx_bridge_opents, + emqx_bridge_pulsar ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 4b83fda3f..38f471ca2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -36,7 +36,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_dynamo, Method), ref(emqx_ee_bridge_rocketmq, Method), ref(emqx_ee_bridge_sqlserver, Method), - ref(emqx_bridge_opents, Method) + ref(emqx_bridge_opents, Method), + ref(emqx_bridge_pulsar, Method ++ "_producer") ]. schema_modules() -> @@ -57,7 +58,8 @@ schema_modules() -> emqx_ee_bridge_dynamo, emqx_ee_bridge_rocketmq, emqx_ee_bridge_sqlserver, - emqx_bridge_opents + emqx_bridge_opents, + emqx_bridge_pulsar ]. examples(Method) -> @@ -97,7 +99,8 @@ resource_type(clickhouse) -> emqx_ee_connector_clickhouse; resource_type(dynamo) -> emqx_ee_connector_dynamo; resource_type(rocketmq) -> emqx_ee_connector_rocketmq; resource_type(sqlserver) -> emqx_ee_connector_sqlserver; -resource_type(opents) -> emqx_bridge_opents_connector. +resource_type(opents) -> emqx_bridge_opents_connector; +resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer. fields(bridges) -> [ @@ -165,7 +168,8 @@ fields(bridges) -> required => false } )} - ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ + ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++ + redis_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs(). mongodb_structs() -> @@ -202,6 +206,18 @@ kafka_structs() -> )} ]. +pulsar_structs() -> + [ + {pulsar_producer, + mk( + hoconsc:map(name, ref(emqx_bridge_pulsar, pulsar_producer)), + #{ + desc => <<"Pulsar Producer Bridge Config">>, + required => false + } + )} + ]. + influxdb_structs() -> [ {Protocol, diff --git a/mix.exs b/mix.exs index e2230d55d..97fdd732a 100644 --- a/mix.exs +++ b/mix.exs @@ -169,7 +169,8 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_redis, :emqx_bridge_rocketmq, :emqx_bridge_tdengine, - :emqx_bridge_timescale + :emqx_bridge_timescale, + :emqx_bridge_pulsar ]) end @@ -360,6 +361,7 @@ defmodule EMQXUmbrella.MixProject do emqx_ee_connector: :permanent, emqx_ee_bridge: :permanent, emqx_bridge_kafka: :permanent, + emqx_bridge_pulsar: :permanent, emqx_bridge_gcp_pubsub: :permanent, emqx_bridge_cassandra: :permanent, emqx_bridge_opents: :permanent, diff --git a/rebar.config.erl b/rebar.config.erl index 3c863046f..020285a44 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -454,6 +454,7 @@ relx_apps_per_edition(ee) -> emqx_ee_connector, emqx_ee_bridge, emqx_bridge_kafka, + emqx_bridge_pulsar, emqx_bridge_gcp_pubsub, emqx_bridge_cassandra, emqx_bridge_opents, diff --git a/rel/i18n/emqx_bridge_pulsar.hocon b/rel/i18n/emqx_bridge_pulsar.hocon new file mode 100644 index 000000000..1d2bb4599 --- /dev/null +++ b/rel/i18n/emqx_bridge_pulsar.hocon @@ -0,0 +1,176 @@ +emqx_bridge_pulsar { + auth_basic { + desc = "Parameters for basic authentication." + label = "Basic auth params" + } + + auth_basic_password { + desc = "Basic authentication password." + label = "Password" + } + + auth_basic_username { + desc = "Basic authentication username." + label = "Username" + } + + auth_token { + desc = "Parameters for token authentication." + label = "Token auth params" + } + + auth_token_jwt { + desc = "JWT authentication token." + label = "JWT" + } + + authentication { + desc = "Authentication configs." + label = "Authentication" + } + + buffer_memory_overload_protection { + desc = "Applicable when buffer mode is set to memory\n" + "EMQX will drop old buffered messages under high memory pressure." + " The high memory threshold is defined in config sysmon.os.sysmem_high_watermark." + " NOTE: This config only works on Linux." + label = "Memory Overload Protection" + } + + buffer_mode { + desc = "Message buffer mode.\n" + "\n" + "memory: Buffer all messages in memory. The messages will be lost" + " in case of EMQX node restart\ndisk: Buffer all messages on disk." + " The messages on disk are able to survive EMQX node restart.\n" + "hybrid: Buffer message in memory first, when up to certain limit" + " (see segment_bytes config for more information), then start offloading" + " messages to disk, Like memory mode, the messages will be lost in" + " case of EMQX node restart." + label = "Buffer Mode" + } + + buffer_per_partition_limit { + desc = "Number of bytes allowed to buffer for each Pulsar partition." + " When this limit is exceeded, old messages will be dropped in a trade for credits" + " for new messages to be buffered." + label = "Per-partition Buffer Limit" + } + + buffer_segment_bytes { + desc = "Applicable when buffer mode is set to disk or hybrid.\n" + "This value is to specify the size of each on-disk buffer file." + label = "Segment File Bytes" + } + + config_enable { + desc = "Enable (true) or disable (false) this Pulsar bridge." + label = "Enable or Disable" + } + + desc_name { + desc = "Bridge name, used as a human-readable description of the bridge." + label = "Bridge Name" + } + + desc_type { + desc = "The Bridge Type" + label = "Bridge Type" + } + + producer_batch_size { + desc = "Maximum number of individual requests to batch in a Pulsar message." + label = "Batch size" + } + + producer_buffer { + desc = "Configure producer message buffer.\n\n" + "Tell Pulsar producer how to buffer messages when EMQX has more messages to" + " send than Pulsar can keep up, or when Pulsar is down." + label = "Message Buffer" + } + + producer_compression { + desc = "Compression method." + label = "Compression" + } + + producer_key_template { + desc = "Template to render Pulsar message key." + label = "Message Key" + } + + producer_local_topic { + desc = "MQTT topic or topic filter as data source (bridge input)." + " If rule action is used as data source, this config should be left empty," + " otherwise messages will be duplicated in Pulsar." + label = "Source MQTT Topic" + } + + producer_max_batch_bytes { + desc = "Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers" + " default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in" + " order to compensate Pulsar message encoding overheads (especially when each individual" + " message is very small). When a single message is over the limit, it is still" + " sent (as a single element batch)." + label = "Max Batch Bytes" + } + + producer_message_opts { + desc = "Template to render a Pulsar message." + label = "Pulsar Message Template" + } + + producer_pulsar_message { + desc = "Template to render a Pulsar message." + label = "Pulsar Message Template" + } + + producer_pulsar_topic { + desc = "Pulsar topic name" + label = "Pulsar topic name" + } + + producer_retention_period { + desc = "The amount of time messages will be buffered while there is no connection to" + " the Pulsar broker. Longer times mean that more memory/disk will be used" + label = "Retention Period" + } + + producer_send_buffer { + desc = "Fine tune the socket send buffer. The default value is tuned for high throughput." + label = "Socket Send Buffer Size" + } + + producer_strategy { + desc = "Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.\n" + "\n" + "random: Randomly pick a partition for each message.\n" + "roundrobin: Pick each available producer in turn for each message.\n" + "first_key_dispatch: Hash Pulsar message key of the first message in a batch" + " to a partition number." + label = "Partition Strategy" + } + + producer_sync_timeout { + desc = "Maximum wait time for receiving a receipt from Pulsar when publishing synchronously." + label = "Sync publish timeout" + } + + producer_value_template { + desc = "Template to render Pulsar message value." + label = "Message Value" + } + + pulsar_producer_struct { + desc = "Configuration for a Pulsar bridge." + label = "Pulsar Bridge Configuration" + } + + servers { + desc = "A comma separated list of Pulsar URLs in the form scheme://host[:port]" + " for the client to connect to. The supported schemes are pulsar:// (default)" + " and pulsar+ssl://. The default port is 6650." + label = "Servers" + } +} diff --git a/rel/i18n/zh/emqx_bridge_pulsar.hocon b/rel/i18n/zh/emqx_bridge_pulsar.hocon new file mode 100644 index 000000000..3af8652a4 --- /dev/null +++ b/rel/i18n/zh/emqx_bridge_pulsar.hocon @@ -0,0 +1,173 @@ +emqx_bridge_pulsar { + + pulsar_producer_struct { + desc = "Pulsar 桥接配置" + label = "Pulsar 桥接配置" + } + + desc_type { + desc = "桥接类型" + label = "桥接类型" + } + + desc_name { + desc = "桥接名字,可读描述" + label = "桥接名字" + } + + config_enable { + desc = "启用(true)或停用该(false)Pulsar 数据桥接。" + label = "启用或停用" + } + + servers { + desc = "以scheme://host[:port]形式分隔的Pulsar URL列表," + "供客户端连接使用。支持的方案是 pulsar:// (默认)" + "和pulsar+ssl://。默认的端口是6650。" + label = "服务员" + } + + authentication { + desc = "认证参数。" + label = "认证" + } + + producer_batch_size { + desc = "在一个Pulsar消息中批处理的单个请求的最大数量。" + label = "批量大小" + } + + producer_compression { + desc = "压缩方法。" + label = "压缩" + } + + producer_send_buffer { + desc = "TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。" + label = "Socket 发送缓存大小" + } + + producer_sync_timeout { + desc = "同步发布时,从Pulsar接收发送回执的最长等待时间。" + label = "同步发布超时" + } + + auth_basic_username { + desc = "基本认证用户名。" + label = "用户名" + } + + auth_basic_password { + desc = "基本认证密码。" + label = "密码" + } + + auth_token_jwt { + desc = "JWT认证令牌。" + label = "JWT" + } + + producer_max_batch_bytes { + desc = "最大消息批量字节数。" + "大多数 Pulsar 环境的默认最低值是 5 MB,EMQX 的默认值比 5 MB 更小是因为需要" + "补偿 Pulsar 消息编码所需要的额外字节(尤其是当每条消息都很小的情况下)。" + "当单个消息的大小超过该限制时,它仍然会被发送,(相当于该批量中只有单个消息)。" + label = "最大批量字节数" + } + + producer_retention_period { + desc = "当没有连接到Pulsar代理时,信息将被缓冲的时间。 较长的时间意味着将使用更多的内存/磁盘" + label = "保留期" + } + + producer_local_topic { + desc = "MQTT 主题数据源由桥接指定,或留空由规则动作指定。" + label = "源 MQTT 主题" + } + + producer_pulsar_topic { + desc = "Pulsar 主题名称" + label = "Pulsar 主题名称" + } + + producer_strategy { + desc = "设置消息发布时应该如何选择 Pulsar 分区。\n\n" + "random: 为每个消息随机选择一个分区。\n" + "roundrobin: 依次为每条信息挑选可用的生产商。\n" + "first_key_dispatch: 将一批信息中的第一条信息的Pulsar信息密钥哈希到一个分区编号。" + label = "分区选择策略" + } + + producer_buffer { + desc = "配置消息缓存的相关参数。\n\n" + "当 EMQX 需要发送的消息超过 Pulsar 处理能力,或者当 Pulsar 临时下线时,EMQX 内部会将消息缓存起来。" + label = "消息缓存" + } + + buffer_mode { + desc = "消息缓存模式。\n" + "memory: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。\n" + "disk: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。\n" + "hybrid: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制" + "(配置项 segment_bytes 描述了该限制)后,后续的消息会缓存到磁盘上。" + "与 memory 模式一样,如果 EMQX 服务重启,缓存的消息会丢失。" + label = "缓存模式" + } + + buffer_per_partition_limit { + desc = "为每个 Pulsar 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃," + "为新的消息腾出空间。" + label = "Pulsar 分区缓存上限" + } + + buffer_segment_bytes { + desc = "当缓存模式是 diskhybrid 时适用。" + "该配置用于指定缓存到磁盘上的文件的大小。" + label = "缓存文件大小" + } + + buffer_memory_overload_protection { + desc = "缓存模式是 memoryhybrid 时适用。" + "当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。" + "内存压力值由配置项 sysmon.os.sysmem_high_watermark 决定。" + "注意,该配置仅在 Linux 系统中有效。" + label = "内存过载保护" + } + + producer_message_opts { + desc = "用于生成 Pulsar 消息的模版。" + label = "Pulsar 消息模版" + } + + producer_key_template { + desc = "生成 Pulsar 消息 Key 的模版。" + label = "消息的 Key" + } + + producer_value_template { + desc = "生成 Pulsar 消息 Value 的模版。" + label = "消息的 Value" + } + + auth_basic { + desc = "基本认证的参数。" + label = "基本认证参数" + } + + auth_token { + desc = "令牌认证的参数。" + label = "Token auth params" + } + + producer_buffer { + desc = "配置消息缓存的相关参数。\n\n" + "当 EMQX 需要发送的消息超过 Pulsar 处理能力,或者当 Pulsar 临时下线时,EMQX 内部会将消息缓存起来。" + label = "消息缓存" + } + + producer_pulsar_message { + desc = "用于生成 Pulsar 消息的模版。" + label = "Pulsar 消息模版" + } + +} diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index c153669f4..307063e84 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -190,7 +190,10 @@ for dep in ${CT_DEPS}; do ;; opents) FILES+=( '.ci/docker-compose-file/docker-compose-opents.yaml' ) - ;; + ;; + pulsar) + FILES+=( '.ci/docker-compose-file/docker-compose-pulsar-tcp.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1