diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl
index 1ca2f8b18..9f5600cfa 100644
--- a/apps/emqx_bridge/src/emqx_bridge_v2.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl
@@ -620,7 +620,9 @@ id(BridgeType, BridgeName, ConnectorName) ->
bridge_v2_type_to_connector_type(Bin) when is_binary(Bin) ->
?MODULE:bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin));
bridge_v2_type_to_connector_type(kafka) ->
- kafka.
+ kafka;
+bridge_v2_type_to_connector_type(azure_event_hub) ->
+ azure_event_hub.
%%====================================================================
%% Config Update Handler API
@@ -799,12 +801,16 @@ is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) ->
?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin));
bridge_v1_type_to_bridge_v2_type(kafka) ->
- kafka.
+ kafka;
+bridge_v1_type_to_bridge_v2_type(azure_event_hub) ->
+ azure_event_hub.
is_bridge_v2_type(Atom) when is_atom(Atom) ->
is_bridge_v2_type(atom_to_binary(Atom, utf8));
is_bridge_v2_type(<<"kafka">>) ->
true;
+is_bridge_v2_type(<<"azure_event_hub">>) ->
+ true;
is_bridge_v2_type(_) ->
false.
diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl
index 3614b8695..48fb08911 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl
@@ -27,13 +27,14 @@ examples(Method) ->
schema_modules() ->
[
- emqx_bridge_kafka
+ emqx_bridge_kafka,
+ emqx_bridge_azure_event_hub
].
fields(bridges_v2) ->
- kafka_structs().
+ bridge_v2_structs().
-kafka_structs() ->
+bridge_v2_structs() ->
[
{kafka,
mk(
@@ -42,6 +43,14 @@ kafka_structs() ->
desc => <<"Kafka Producer Bridge V2 Config">>,
required => false
}
+ )},
+ {azure_event_hub,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_azure_event_hub, bridge_v2)),
+ #{
+ desc => <<"Azure Event Hub Bridge V2 Config">>,
+ required => false
+ }
)}
].
@@ -51,7 +60,8 @@ api_schemas(Method) ->
%% connector schema module.
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
- api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_bridge_v2")
+ api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_bridge_v2"),
+ api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_bridge_v2")
].
api_ref(Module, Type, Method) ->
diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src
index 43033b657..ece0495f9 100644
--- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src
+++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"},
- {vsn, "0.1.2"},
+ {vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl
index abdc6a265..9ffbd9862 100644
--- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl
+++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl
@@ -7,7 +7,7 @@
-include_lib("hocon/include/hoconsc.hrl").
-behaviour(hocon_schema).
--behaviour(emqx_bridge_resource).
+-behaviour(emqx_connector_resource).
%% `hocon_schema' API
-export([
@@ -18,14 +18,22 @@
]).
%% emqx_bridge_enterprise "unofficial" API
--export([conn_bridge_examples/1]).
+-export([
+ bridge_v2_examples/1,
+ conn_bridge_examples/1,
+ connector_examples/1
+]).
+%% emqx_connector_resource behaviour callbacks
-export([connector_config/1]).
-export([producer_converter/2, host_opts/0]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
+-define(AEH_CONNECTOR_TYPE, azure_event_hub).
+-define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub">>).
+
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
@@ -34,12 +42,50 @@ namespace() -> "bridge_azure_event_hub".
roots() -> ["config_producer"].
+fields("put_connector") ->
+ Fields = override(
+ emqx_bridge_kafka:fields("put_connector"),
+ connector_overrides()
+ ),
+ override_documentations(Fields);
+fields("get_connector") ->
+ emqx_bridge_schema:status_fields() ++
+ fields("post_connector");
+fields("post_connector") ->
+ Fields = override(
+ emqx_bridge_kafka:fields("post_connector"),
+ connector_overrides()
+ ),
+ override_documentations(Fields);
+fields("put_bridge_v2") ->
+ Fields = override(
+ emqx_bridge_kafka:fields("put_bridge_v2"),
+ bridge_v2_overrides()
+ ),
+ override_documentations(Fields);
+fields("get_bridge_v2") ->
+ emqx_bridge_schema:status_fields() ++
+ fields("post_bridge_v2");
+fields("post_bridge_v2") ->
+ Fields = override(
+ emqx_bridge_kafka:fields("post_bridge_v2"),
+ bridge_v2_overrides()
+ ),
+ override_documentations(Fields);
fields("post_producer") ->
Fields = override(
emqx_bridge_kafka:fields("post_producer"),
producer_overrides()
),
override_documentations(Fields);
+fields("config_bridge_v2") ->
+ fields(bridge_v2);
+fields("config_connector") ->
+ Fields = override(
+ emqx_bridge_kafka:fields(kafka_connector),
+ connector_overrides()
+ ),
+ override_documentations(Fields);
fields("config_producer") ->
Fields = override(
emqx_bridge_kafka:fields(kafka_producer),
@@ -68,19 +114,37 @@ fields(kafka_message) ->
Fields0 = emqx_bridge_kafka:fields(kafka_message),
Fields = proplists:delete(timestamp, Fields0),
override_documentations(Fields);
+fields(bridge_v2) ->
+ Fields =
+ override(
+ emqx_bridge_kafka:fields(producer_opts),
+ bridge_v2_overrides()
+ ) ++
+ [
+ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+ {connector,
+ mk(binary(), #{
+ desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
+ })}
+ ],
+ override_documentations(Fields);
fields(Method) ->
Fields = emqx_bridge_kafka:fields(Method),
override_documentations(Fields).
+desc("config") ->
+ ?DESC("desc_config");
+desc("config_connector") ->
+ ?DESC("desc_config");
desc("config_producer") ->
?DESC("desc_config");
desc("ssl_client_opts") ->
emqx_schema:desc("ssl_client_opts");
-desc("get_producer") ->
+desc("get_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" ->
["Configuration for Azure Event Hub using `GET` method."];
-desc("put_producer") ->
+desc("put_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" ->
["Configuration for Azure Event Hub using `PUT` method."];
-desc("post_producer") ->
+desc("post_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" ->
["Configuration for Azure Event Hub using `POST` method."];
desc(Name) ->
lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
@@ -90,7 +154,28 @@ struct_names() ->
[
auth_username_password,
kafka_message,
- producer_kafka_opts
+ producer_kafka_opts,
+ bridge_v2
+ ].
+
+bridge_v2_examples(Method) ->
+ [
+ #{
+ ?AEH_CONNECTOR_TYPE_BIN => #{
+ summary => <<"Azure Event Hub Bridge v2">>,
+ value => values({Method, bridge_v2})
+ }
+ }
+ ].
+
+connector_examples(Method) ->
+ [
+ #{
+ ?AEH_CONNECTOR_TYPE_BIN => #{
+ summary => <<"Azure Event Hub Connector">>,
+ value => values({Method, connector})
+ }
+ }
].
conn_bridge_examples(Method) ->
@@ -104,11 +189,40 @@ conn_bridge_examples(Method) ->
].
values({get, AEHType}) ->
- values({post, AEHType});
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ]
+ },
+ values({post, AEHType})
+ );
+values({post, bridge_v2}) ->
+ maps:merge(
+ values(producer),
+ #{
+ enable => true,
+ connector => <<"my_azure_event_hub_connector">>,
+ name => <<"my_azure_event_hub_bridge">>,
+ type => ?AEH_CONNECTOR_TYPE_BIN
+ }
+ );
values({post, AEHType}) ->
maps:merge(values(common_config), values(AEHType));
values({put, AEHType}) ->
values({post, AEHType});
+values(connector) ->
+ maps:merge(
+ values(common_config),
+ #{
+ name => <<"my_azure_event_hub_connector">>,
+ type => ?AEH_CONNECTOR_TYPE_BIN
+ }
+ );
values(common_config) ->
#{
authentication => #{
@@ -119,12 +233,14 @@ values(common_config) ->
enable => true,
metadata_request_timeout => <<"4s">>,
min_metadata_refresh_interval => <<"3s">>,
+ name => <<"my_azure_event_hub_bridge">>,
socket_opts => #{
sndbuf => <<"1024KB">>,
recbuf => <<"1024KB">>,
nodelay => true,
tcp_keepalive => <<"none">>
- }
+ },
+ type => <<"azure_event_hub_producer">>
};
values(producer) ->
#{
@@ -163,7 +279,7 @@ values(producer) ->
}.
%%-------------------------------------------------------------------------------------------------
-%% `emqx_bridge_resource' API
+%% `emqx_connector_resource' API
%%-------------------------------------------------------------------------------------------------
connector_config(Config) ->
@@ -182,6 +298,37 @@ connector_config(Config) ->
ref(Name) ->
hoconsc:ref(?MODULE, Name).
+connector_overrides() ->
+ #{
+ authentication =>
+ mk(
+ ref(auth_username_password),
+ #{
+ default => #{},
+ required => true,
+ desc => ?DESC("authentication")
+ }
+ ),
+ bootstrap_hosts =>
+ mk(
+ binary(),
+ #{
+ required => true,
+ validator => emqx_schema:servers_validator(
+ host_opts(), _Required = true
+ )
+ }
+ ),
+ ssl => mk(ref("ssl_client_opts"), #{default => #{<<"enable">> => true}}),
+ type => mk(
+ ?AEH_CONNECTOR_TYPE,
+ #{
+ required => true,
+ desc => ?DESC("connector_type")
+ }
+ )
+ }.
+
producer_overrides() ->
#{
authentication =>
@@ -212,6 +359,22 @@ producer_overrides() ->
type => mk(azure_event_hub_producer, #{required => true})
}.
+bridge_v2_overrides() ->
+ #{
+ kafka =>
+ mk(ref(producer_kafka_opts), #{
+ required => true,
+ validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
+ }),
+ ssl => mk(ref("ssl_client_opts"), #{default => #{<<"enable">> => true}}),
+ type => mk(
+ ?AEH_CONNECTOR_TYPE,
+ #{
+ required => true,
+ desc => ?DESC("bridge_v2_type")
+ }
+ )
+ }.
auth_overrides() ->
#{
mechanism =>
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
index d6ac057f2..071ddbe18 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
@@ -105,7 +105,7 @@ values(bridge_v2_producer) ->
maps:merge(
#{
enable => true,
- connector => <<"my_connector">>,
+ connector => <<"my_kafka_connector">>,
resource_opts => #{
health_check_interval => "32s"
}
diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl
index 082198309..74e1a4c72 100644
--- a/apps/emqx_connector/src/emqx_connector_resource.erl
+++ b/apps/emqx_connector/src/emqx_connector_resource.erl
@@ -258,7 +258,7 @@ create_dry_run(Type, Conf0, Callback) ->
%% Already typechecked, no need to catch errors
TypeBin = bin(Type),
TypeAtom = safe_atom(Type),
- %% We use a fixed name here to avoid createing an atom
+ %% We use a fixed name here to avoid creating an atom
TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", <<"probedryrun">>]),
TmpPath = emqx_utils:safe_filename(TmpName),
Conf1 = maps:without([<<"name">>], Conf0),
diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
index 458c696fa..3d0b94674 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
@@ -19,18 +19,22 @@
]).
resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
-resource_type(kafka) -> emqx_bridge_kafka_impl_producer.
+resource_type(kafka) -> emqx_bridge_kafka_impl_producer;
+%% We use AEH's Kafka interface.
+resource_type(azure_event_hub) -> emqx_bridge_kafka_impl_producer.
%% For connectors that need to override connector configurations.
connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
connector_impl_module(binary_to_atom(ConnectorType, utf8));
+connector_impl_module(azure_event_hub) ->
+ emqx_bridge_azure_event_hub;
connector_impl_module(_ConnectorType) ->
undefined.
fields(connectors) ->
- kafka_structs().
+ connector_structs().
-kafka_structs() ->
+connector_structs() ->
[
{kafka,
mk(
@@ -39,6 +43,14 @@ kafka_structs() ->
desc => <<"Kafka Connector Config">>,
required => false
}
+ )},
+ {azure_event_hub,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")),
+ #{
+ desc => <<"Azure Event Hub Connector Config">>,
+ required => false
+ }
)}
].
@@ -56,16 +68,16 @@ examples(Method) ->
schema_modules() ->
[
- emqx_bridge_kafka
+ emqx_bridge_kafka,
+ emqx_bridge_azure_event_hub
].
api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% connector schema module.
- %% TODO: rename this to `kafka_producer' after alias support is added
- %% to hocon; keeping this as just `kafka' for backwards compatibility.
- api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_connector")
+ api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_connector"),
+ api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->
diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
index dce1eed88..276539604 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
@@ -56,7 +56,8 @@ enterprise_fields_connectors() -> [].
-endif.
-connector_type_to_bridge_types(kafka) -> [kafka].
+connector_type_to_bridge_types(kafka) -> [kafka];
+connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub].
actions_config_name() -> <<"bridges_v2">>.
diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon
index a0ccb0f2f..6534e7471 100644
--- a/rel/i18n/emqx_bridge_azure_event_hub.hocon
+++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon
@@ -183,6 +183,23 @@ authentication.desc:
authentication.label:
"""Authentication"""
+connector_type.label:
+"""Connector Type"""
+
+connector_type.desc:
+"""The type of the connector."""
+
+bridge_v2_type.label:
+"""Bridge Type"""
+
+bridge_v2_type.desc:
+"""The type of the bridge."""
+
+bridge_v2.label:
+"""Bridge v2 Config"""
+bridge_v2.desc:
+"""The configuration for a bridge v2."""
+
buffer_memory_overload_protection.desc:
"""Applicable when buffer mode is set to memory
EMQX will drop old buffered messages under high memory pressure. The high memory threshold is defined in config sysmon.os.sysmem_high_watermark
. NOTE: This config only works on Linux."""