From 2228a0d47716d4d95a7860872f8bc1d80b60c25f Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 24 Oct 2023 20:55:48 +0200 Subject: [PATCH] feat: port azure event hub to bridge_v2 --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 10 +- .../src/schema/emqx_bridge_v2_enterprise.erl | 18 +- .../src/emqx_bridge_azure_event_hub.app.src | 2 +- .../src/emqx_bridge_azure_event_hub.erl | 181 +++++++++++++++++- .../src/emqx_bridge_kafka.erl | 2 +- .../src/emqx_connector_resource.erl | 2 +- .../src/schema/emqx_connector_ee_schema.erl | 26 ++- .../src/schema/emqx_connector_schema.erl | 3 +- rel/i18n/emqx_bridge_azure_event_hub.hocon | 17 ++ 9 files changed, 235 insertions(+), 26 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 1ca2f8b18..9f5600cfa 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -620,7 +620,9 @@ id(BridgeType, BridgeName, ConnectorName) -> bridge_v2_type_to_connector_type(Bin) when is_binary(Bin) -> ?MODULE:bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin)); bridge_v2_type_to_connector_type(kafka) -> - kafka. + kafka; +bridge_v2_type_to_connector_type(azure_event_hub) -> + azure_event_hub. %%==================================================================== %% Config Update Handler API @@ -799,12 +801,16 @@ is_valid_bridge_v1(BridgeV1Type, BridgeName) -> bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) -> ?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin)); bridge_v1_type_to_bridge_v2_type(kafka) -> - kafka. + kafka; +bridge_v1_type_to_bridge_v2_type(azure_event_hub) -> + azure_event_hub. is_bridge_v2_type(Atom) when is_atom(Atom) -> is_bridge_v2_type(atom_to_binary(Atom, utf8)); is_bridge_v2_type(<<"kafka">>) -> true; +is_bridge_v2_type(<<"azure_event_hub">>) -> + true; is_bridge_v2_type(_) -> false. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl index 3614b8695..48fb08911 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl @@ -27,13 +27,14 @@ examples(Method) -> schema_modules() -> [ - emqx_bridge_kafka + emqx_bridge_kafka, + emqx_bridge_azure_event_hub ]. fields(bridges_v2) -> - kafka_structs(). + bridge_v2_structs(). -kafka_structs() -> +bridge_v2_structs() -> [ {kafka, mk( @@ -42,6 +43,14 @@ kafka_structs() -> desc => <<"Kafka Producer Bridge V2 Config">>, required => false } + )}, + {azure_event_hub, + mk( + hoconsc:map(name, ref(emqx_bridge_azure_event_hub, bridge_v2)), + #{ + desc => <<"Azure Event Hub Bridge V2 Config">>, + required => false + } )} ]. @@ -51,7 +60,8 @@ api_schemas(Method) -> %% connector schema module. %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. - api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_bridge_v2") + api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_bridge_v2"), + api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_bridge_v2") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index 43033b657..ece0495f9 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_azure_event_hub, [ {description, "EMQX Enterprise Azure Event Hub Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index abdc6a265..9ffbd9862 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -7,7 +7,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -behaviour(hocon_schema). --behaviour(emqx_bridge_resource). +-behaviour(emqx_connector_resource). %% `hocon_schema' API -export([ @@ -18,14 +18,22 @@ ]). %% emqx_bridge_enterprise "unofficial" API --export([conn_bridge_examples/1]). +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). +%% emqx_connector_resource behaviour callbacks -export([connector_config/1]). -export([producer_converter/2, host_opts/0]). -import(hoconsc, [mk/2, enum/1, ref/2]). +-define(AEH_CONNECTOR_TYPE, azure_event_hub). +-define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub">>). + %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API %%------------------------------------------------------------------------------------------------- @@ -34,12 +42,50 @@ namespace() -> "bridge_azure_event_hub". roots() -> ["config_producer"]. +fields("put_connector") -> + Fields = override( + emqx_bridge_kafka:fields("put_connector"), + connector_overrides() + ), + override_documentations(Fields); +fields("get_connector") -> + emqx_bridge_schema:status_fields() ++ + fields("post_connector"); +fields("post_connector") -> + Fields = override( + emqx_bridge_kafka:fields("post_connector"), + connector_overrides() + ), + override_documentations(Fields); +fields("put_bridge_v2") -> + Fields = override( + emqx_bridge_kafka:fields("put_bridge_v2"), + bridge_v2_overrides() + ), + override_documentations(Fields); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ + fields("post_bridge_v2"); +fields("post_bridge_v2") -> + Fields = override( + emqx_bridge_kafka:fields("post_bridge_v2"), + bridge_v2_overrides() + ), + override_documentations(Fields); fields("post_producer") -> Fields = override( emqx_bridge_kafka:fields("post_producer"), producer_overrides() ), override_documentations(Fields); +fields("config_bridge_v2") -> + fields(bridge_v2); +fields("config_connector") -> + Fields = override( + emqx_bridge_kafka:fields(kafka_connector), + connector_overrides() + ), + override_documentations(Fields); fields("config_producer") -> Fields = override( emqx_bridge_kafka:fields(kafka_producer), @@ -68,19 +114,37 @@ fields(kafka_message) -> Fields0 = emqx_bridge_kafka:fields(kafka_message), Fields = proplists:delete(timestamp, Fields0), override_documentations(Fields); +fields(bridge_v2) -> + Fields = + override( + emqx_bridge_kafka:fields(producer_opts), + bridge_v2_overrides() + ) ++ + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {connector, + mk(binary(), #{ + desc => ?DESC(emqx_connector_schema, "connector_field"), required => true + })} + ], + override_documentations(Fields); fields(Method) -> Fields = emqx_bridge_kafka:fields(Method), override_documentations(Fields). +desc("config") -> + ?DESC("desc_config"); +desc("config_connector") -> + ?DESC("desc_config"); desc("config_producer") -> ?DESC("desc_config"); desc("ssl_client_opts") -> emqx_schema:desc("ssl_client_opts"); -desc("get_producer") -> +desc("get_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" -> ["Configuration for Azure Event Hub using `GET` method."]; -desc("put_producer") -> +desc("put_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" -> ["Configuration for Azure Event Hub using `PUT` method."]; -desc("post_producer") -> +desc("post_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" -> ["Configuration for Azure Event Hub using `POST` method."]; desc(Name) -> lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), @@ -90,7 +154,28 @@ struct_names() -> [ auth_username_password, kafka_message, - producer_kafka_opts + producer_kafka_opts, + bridge_v2 + ]. + +bridge_v2_examples(Method) -> + [ + #{ + ?AEH_CONNECTOR_TYPE_BIN => #{ + summary => <<"Azure Event Hub Bridge v2">>, + value => values({Method, bridge_v2}) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + ?AEH_CONNECTOR_TYPE_BIN => #{ + summary => <<"Azure Event Hub Connector">>, + value => values({Method, connector}) + } + } ]. conn_bridge_examples(Method) -> @@ -104,11 +189,40 @@ conn_bridge_examples(Method) -> ]. values({get, AEHType}) -> - values({post, AEHType}); + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, AEHType}) + ); +values({post, bridge_v2}) -> + maps:merge( + values(producer), + #{ + enable => true, + connector => <<"my_azure_event_hub_connector">>, + name => <<"my_azure_event_hub_bridge">>, + type => ?AEH_CONNECTOR_TYPE_BIN + } + ); values({post, AEHType}) -> maps:merge(values(common_config), values(AEHType)); values({put, AEHType}) -> values({post, AEHType}); +values(connector) -> + maps:merge( + values(common_config), + #{ + name => <<"my_azure_event_hub_connector">>, + type => ?AEH_CONNECTOR_TYPE_BIN + } + ); values(common_config) -> #{ authentication => #{ @@ -119,12 +233,14 @@ values(common_config) -> enable => true, metadata_request_timeout => <<"4s">>, min_metadata_refresh_interval => <<"3s">>, + name => <<"my_azure_event_hub_bridge">>, socket_opts => #{ sndbuf => <<"1024KB">>, recbuf => <<"1024KB">>, nodelay => true, tcp_keepalive => <<"none">> - } + }, + type => <<"azure_event_hub_producer">> }; values(producer) -> #{ @@ -163,7 +279,7 @@ values(producer) -> }. %%------------------------------------------------------------------------------------------------- -%% `emqx_bridge_resource' API +%% `emqx_connector_resource' API %%------------------------------------------------------------------------------------------------- connector_config(Config) -> @@ -182,6 +298,37 @@ connector_config(Config) -> ref(Name) -> hoconsc:ref(?MODULE, Name). +connector_overrides() -> + #{ + authentication => + mk( + ref(auth_username_password), + #{ + default => #{}, + required => true, + desc => ?DESC("authentication") + } + ), + bootstrap_hosts => + mk( + binary(), + #{ + required => true, + validator => emqx_schema:servers_validator( + host_opts(), _Required = true + ) + } + ), + ssl => mk(ref("ssl_client_opts"), #{default => #{<<"enable">> => true}}), + type => mk( + ?AEH_CONNECTOR_TYPE, + #{ + required => true, + desc => ?DESC("connector_type") + } + ) + }. + producer_overrides() -> #{ authentication => @@ -212,6 +359,22 @@ producer_overrides() -> type => mk(azure_event_hub_producer, #{required => true}) }. +bridge_v2_overrides() -> + #{ + kafka => + mk(ref(producer_kafka_opts), #{ + required => true, + validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1 + }), + ssl => mk(ref("ssl_client_opts"), #{default => #{<<"enable">> => true}}), + type => mk( + ?AEH_CONNECTOR_TYPE, + #{ + required => true, + desc => ?DESC("bridge_v2_type") + } + ) + }. auth_overrides() -> #{ mechanism => diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index d6ac057f2..071ddbe18 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -105,7 +105,7 @@ values(bridge_v2_producer) -> maps:merge( #{ enable => true, - connector => <<"my_connector">>, + connector => <<"my_kafka_connector">>, resource_opts => #{ health_check_interval => "32s" } diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 082198309..74e1a4c72 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -258,7 +258,7 @@ create_dry_run(Type, Conf0, Callback) -> %% Already typechecked, no need to catch errors TypeBin = bin(Type), TypeAtom = safe_atom(Type), - %% We use a fixed name here to avoid createing an atom + %% We use a fixed name here to avoid creating an atom TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", <<"probedryrun">>]), TmpPath = emqx_utils:safe_filename(TmpName), Conf1 = maps:without([<<"name">>], Conf0), diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 458c696fa..3d0b94674 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -19,18 +19,22 @@ ]). resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); -resource_type(kafka) -> emqx_bridge_kafka_impl_producer. +resource_type(kafka) -> emqx_bridge_kafka_impl_producer; +%% We use AEH's Kafka interface. +resource_type(azure_event_hub) -> emqx_bridge_kafka_impl_producer. %% For connectors that need to override connector configurations. connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> connector_impl_module(binary_to_atom(ConnectorType, utf8)); +connector_impl_module(azure_event_hub) -> + emqx_bridge_azure_event_hub; connector_impl_module(_ConnectorType) -> undefined. fields(connectors) -> - kafka_structs(). + connector_structs(). -kafka_structs() -> +connector_structs() -> [ {kafka, mk( @@ -39,6 +43,14 @@ kafka_structs() -> desc => <<"Kafka Connector Config">>, required => false } + )}, + {azure_event_hub, + mk( + hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")), + #{ + desc => <<"Azure Event Hub Connector Config">>, + required => false + } )} ]. @@ -56,16 +68,16 @@ examples(Method) -> schema_modules() -> [ - emqx_bridge_kafka + emqx_bridge_kafka, + emqx_bridge_azure_event_hub ]. api_schemas(Method) -> [ %% We need to map the `type' field of a request (binary) to a %% connector schema module. - %% TODO: rename this to `kafka_producer' after alias support is added - %% to hocon; keeping this as just `kafka' for backwards compatibility. - api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_connector") + api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_connector"), + api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index dce1eed88..276539604 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -56,7 +56,8 @@ enterprise_fields_connectors() -> []. -endif. -connector_type_to_bridge_types(kafka) -> [kafka]. +connector_type_to_bridge_types(kafka) -> [kafka]; +connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub]. actions_config_name() -> <<"bridges_v2">>. diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index a0ccb0f2f..6534e7471 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -183,6 +183,23 @@ authentication.desc: authentication.label: """Authentication""" +connector_type.label: +"""Connector Type""" + +connector_type.desc: +"""The type of the connector.""" + +bridge_v2_type.label: +"""Bridge Type""" + +bridge_v2_type.desc: +"""The type of the bridge.""" + +bridge_v2.label: +"""Bridge v2 Config""" +bridge_v2.desc: +"""The configuration for a bridge v2.""" + buffer_memory_overload_protection.desc: """Applicable when buffer mode is set to memory EMQX will drop old buffered messages under high memory pressure. The high memory threshold is defined in config sysmon.os.sysmem_high_watermark. NOTE: This config only works on Linux."""