feat: port azure event hub to bridge_v2

This commit is contained in:
Stefan Strigler 2023-10-24 20:55:48 +02:00 committed by Zaiming (Stone) Shi
parent f760f0a5c5
commit 2228a0d477
9 changed files with 235 additions and 26 deletions

View File

@ -620,7 +620,9 @@ id(BridgeType, BridgeName, ConnectorName) ->
bridge_v2_type_to_connector_type(Bin) when is_binary(Bin) ->
?MODULE:bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin));
bridge_v2_type_to_connector_type(kafka) ->
kafka.
kafka;
bridge_v2_type_to_connector_type(azure_event_hub) ->
azure_event_hub.
%%====================================================================
%% Config Update Handler API
@ -799,12 +801,16 @@ is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) ->
?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin));
bridge_v1_type_to_bridge_v2_type(kafka) ->
kafka.
kafka;
bridge_v1_type_to_bridge_v2_type(azure_event_hub) ->
azure_event_hub.
is_bridge_v2_type(Atom) when is_atom(Atom) ->
is_bridge_v2_type(atom_to_binary(Atom, utf8));
is_bridge_v2_type(<<"kafka">>) ->
true;
is_bridge_v2_type(<<"azure_event_hub">>) ->
true;
is_bridge_v2_type(_) ->
false.

View File

@ -27,13 +27,14 @@ examples(Method) ->
schema_modules() ->
[
emqx_bridge_kafka
emqx_bridge_kafka,
emqx_bridge_azure_event_hub
].
fields(bridges_v2) ->
kafka_structs().
bridge_v2_structs().
kafka_structs() ->
bridge_v2_structs() ->
[
{kafka,
mk(
@ -42,6 +43,14 @@ kafka_structs() ->
desc => <<"Kafka Producer Bridge V2 Config">>,
required => false
}
)},
{azure_event_hub,
mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, bridge_v2)),
#{
desc => <<"Azure Event Hub Bridge V2 Config">>,
required => false
}
)}
].
@ -51,7 +60,8 @@ api_schemas(Method) ->
%% connector schema module.
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_bridge_v2")
api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_bridge_v2"),
api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_bridge_v2")
].
api_ref(Module, Type, Method) ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,

View File

@ -7,7 +7,7 @@
-include_lib("hocon/include/hoconsc.hrl").
-behaviour(hocon_schema).
-behaviour(emqx_bridge_resource).
-behaviour(emqx_connector_resource).
%% `hocon_schema' API
-export([
@ -18,14 +18,22 @@
]).
%% emqx_bridge_enterprise "unofficial" API
-export([conn_bridge_examples/1]).
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).
%% emqx_connector_resource behaviour callbacks
-export([connector_config/1]).
-export([producer_converter/2, host_opts/0]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-define(AEH_CONNECTOR_TYPE, azure_event_hub).
-define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub">>).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
@ -34,12 +42,50 @@ namespace() -> "bridge_azure_event_hub".
roots() -> ["config_producer"].
fields("put_connector") ->
Fields = override(
emqx_bridge_kafka:fields("put_connector"),
connector_overrides()
),
override_documentations(Fields);
fields("get_connector") ->
emqx_bridge_schema:status_fields() ++
fields("post_connector");
fields("post_connector") ->
Fields = override(
emqx_bridge_kafka:fields("post_connector"),
connector_overrides()
),
override_documentations(Fields);
fields("put_bridge_v2") ->
Fields = override(
emqx_bridge_kafka:fields("put_bridge_v2"),
bridge_v2_overrides()
),
override_documentations(Fields);
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++
fields("post_bridge_v2");
fields("post_bridge_v2") ->
Fields = override(
emqx_bridge_kafka:fields("post_bridge_v2"),
bridge_v2_overrides()
),
override_documentations(Fields);
fields("post_producer") ->
Fields = override(
emqx_bridge_kafka:fields("post_producer"),
producer_overrides()
),
override_documentations(Fields);
fields("config_bridge_v2") ->
fields(bridge_v2);
fields("config_connector") ->
Fields = override(
emqx_bridge_kafka:fields(kafka_connector),
connector_overrides()
),
override_documentations(Fields);
fields("config_producer") ->
Fields = override(
emqx_bridge_kafka:fields(kafka_producer),
@ -68,19 +114,37 @@ fields(kafka_message) ->
Fields0 = emqx_bridge_kafka:fields(kafka_message),
Fields = proplists:delete(timestamp, Fields0),
override_documentations(Fields);
fields(bridge_v2) ->
Fields =
override(
emqx_bridge_kafka:fields(producer_opts),
bridge_v2_overrides()
) ++
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})}
],
override_documentations(Fields);
fields(Method) ->
Fields = emqx_bridge_kafka:fields(Method),
override_documentations(Fields).
desc("config") ->
?DESC("desc_config");
desc("config_connector") ->
?DESC("desc_config");
desc("config_producer") ->
?DESC("desc_config");
desc("ssl_client_opts") ->
emqx_schema:desc("ssl_client_opts");
desc("get_producer") ->
desc("get_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" ->
["Configuration for Azure Event Hub using `GET` method."];
desc("put_producer") ->
desc("put_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" ->
["Configuration for Azure Event Hub using `PUT` method."];
desc("post_producer") ->
desc("post_" ++ Type) when Type == "producer"; Type == "connector"; Type == "bridge_v2" ->
["Configuration for Azure Event Hub using `POST` method."];
desc(Name) ->
lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
@ -90,7 +154,28 @@ struct_names() ->
[
auth_username_password,
kafka_message,
producer_kafka_opts
producer_kafka_opts,
bridge_v2
].
bridge_v2_examples(Method) ->
[
#{
?AEH_CONNECTOR_TYPE_BIN => #{
summary => <<"Azure Event Hub Bridge v2">>,
value => values({Method, bridge_v2})
}
}
].
connector_examples(Method) ->
[
#{
?AEH_CONNECTOR_TYPE_BIN => #{
summary => <<"Azure Event Hub Connector">>,
value => values({Method, connector})
}
}
].
conn_bridge_examples(Method) ->
@ -104,11 +189,40 @@ conn_bridge_examples(Method) ->
].
values({get, AEHType}) ->
values({post, AEHType});
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, AEHType})
);
values({post, bridge_v2}) ->
maps:merge(
values(producer),
#{
enable => true,
connector => <<"my_azure_event_hub_connector">>,
name => <<"my_azure_event_hub_bridge">>,
type => ?AEH_CONNECTOR_TYPE_BIN
}
);
values({post, AEHType}) ->
maps:merge(values(common_config), values(AEHType));
values({put, AEHType}) ->
values({post, AEHType});
values(connector) ->
maps:merge(
values(common_config),
#{
name => <<"my_azure_event_hub_connector">>,
type => ?AEH_CONNECTOR_TYPE_BIN
}
);
values(common_config) ->
#{
authentication => #{
@ -119,12 +233,14 @@ values(common_config) ->
enable => true,
metadata_request_timeout => <<"4s">>,
min_metadata_refresh_interval => <<"3s">>,
name => <<"my_azure_event_hub_bridge">>,
socket_opts => #{
sndbuf => <<"1024KB">>,
recbuf => <<"1024KB">>,
nodelay => true,
tcp_keepalive => <<"none">>
}
},
type => <<"azure_event_hub_producer">>
};
values(producer) ->
#{
@ -163,7 +279,7 @@ values(producer) ->
}.
%%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_resource' API
%% `emqx_connector_resource' API
%%-------------------------------------------------------------------------------------------------
connector_config(Config) ->
@ -182,6 +298,37 @@ connector_config(Config) ->
ref(Name) ->
hoconsc:ref(?MODULE, Name).
connector_overrides() ->
#{
authentication =>
mk(
ref(auth_username_password),
#{
default => #{},
required => true,
desc => ?DESC("authentication")
}
),
bootstrap_hosts =>
mk(
binary(),
#{
required => true,
validator => emqx_schema:servers_validator(
host_opts(), _Required = true
)
}
),
ssl => mk(ref("ssl_client_opts"), #{default => #{<<"enable">> => true}}),
type => mk(
?AEH_CONNECTOR_TYPE,
#{
required => true,
desc => ?DESC("connector_type")
}
)
}.
producer_overrides() ->
#{
authentication =>
@ -212,6 +359,22 @@ producer_overrides() ->
type => mk(azure_event_hub_producer, #{required => true})
}.
bridge_v2_overrides() ->
#{
kafka =>
mk(ref(producer_kafka_opts), #{
required => true,
validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
}),
ssl => mk(ref("ssl_client_opts"), #{default => #{<<"enable">> => true}}),
type => mk(
?AEH_CONNECTOR_TYPE,
#{
required => true,
desc => ?DESC("bridge_v2_type")
}
)
}.
auth_overrides() ->
#{
mechanism =>

View File

@ -105,7 +105,7 @@ values(bridge_v2_producer) ->
maps:merge(
#{
enable => true,
connector => <<"my_connector">>,
connector => <<"my_kafka_connector">>,
resource_opts => #{
health_check_interval => "32s"
}

View File

@ -258,7 +258,7 @@ create_dry_run(Type, Conf0, Callback) ->
%% Already typechecked, no need to catch errors
TypeBin = bin(Type),
TypeAtom = safe_atom(Type),
%% We use a fixed name here to avoid createing an atom
%% We use a fixed name here to avoid creating an atom
TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", <<"probedryrun">>]),
TmpPath = emqx_utils:safe_filename(TmpName),
Conf1 = maps:without([<<"name">>], Conf0),

View File

@ -19,18 +19,22 @@
]).
resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
resource_type(kafka) -> emqx_bridge_kafka_impl_producer.
resource_type(kafka) -> emqx_bridge_kafka_impl_producer;
%% We use AEH's Kafka interface.
resource_type(azure_event_hub) -> emqx_bridge_kafka_impl_producer.
%% For connectors that need to override connector configurations.
connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
connector_impl_module(binary_to_atom(ConnectorType, utf8));
connector_impl_module(azure_event_hub) ->
emqx_bridge_azure_event_hub;
connector_impl_module(_ConnectorType) ->
undefined.
fields(connectors) ->
kafka_structs().
connector_structs().
kafka_structs() ->
connector_structs() ->
[
{kafka,
mk(
@ -39,6 +43,14 @@ kafka_structs() ->
desc => <<"Kafka Connector Config">>,
required => false
}
)},
{azure_event_hub,
mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")),
#{
desc => <<"Azure Event Hub Connector Config">>,
required => false
}
)}
].
@ -56,16 +68,16 @@ examples(Method) ->
schema_modules() ->
[
emqx_bridge_kafka
emqx_bridge_kafka,
emqx_bridge_azure_event_hub
].
api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% connector schema module.
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_connector")
api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_connector"),
api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->

View File

@ -56,7 +56,8 @@ enterprise_fields_connectors() -> [].
-endif.
connector_type_to_bridge_types(kafka) -> [kafka].
connector_type_to_bridge_types(kafka) -> [kafka];
connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub].
actions_config_name() -> <<"bridges_v2">>.

View File

@ -183,6 +183,23 @@ authentication.desc:
authentication.label:
"""Authentication"""
connector_type.label:
"""Connector Type"""
connector_type.desc:
"""The type of the connector."""
bridge_v2_type.label:
"""Bridge Type"""
bridge_v2_type.desc:
"""The type of the bridge."""
bridge_v2.label:
"""Bridge v2 Config"""
bridge_v2.desc:
"""The configuration for a bridge v2."""
buffer_memory_overload_protection.desc:
"""Applicable when buffer mode is set to <code>memory</code>
EMQX will drop old buffered messages under high memory pressure. The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>. NOTE: This config only works on Linux."""