refactor(kafka connectors): to use emqx_connector_info

This commit refactors the `emqx_bridge_kafka` to use the
`emqx_connector_info` behavior. The `emqx_bridge_kafka` related
information can thus be removed from `emqx_connector_chema` and
`emqx_connector_resource`.
This commit is contained in:
Kjell Winblad 2024-03-19 13:46:03 +01:00
parent b8ef357fef
commit a3e631cda2
6 changed files with 93 additions and 29 deletions

View File

@ -16,6 +16,10 @@
{emqx_action_info_modules, [
emqx_bridge_kafka_action_info,
emqx_bridge_kafka_consumer_action_info
]},
{emqx_connector_info_modules, [
emqx_bridge_kafka_consumer_connector_info,
emqx_bridge_kafka_connector_info
]}
]},
{modules, []},

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_connector_info).
-behaviour(emqx_connector_info).
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
type_name() ->
kafka_producer.
bridge_types() ->
[kafka, kafka_producer].
resource_callback_module() ->
emqx_bridge_kafka_impl_producer.
config_schema() ->
{kafka_producer,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_kafka, "config_connector")),
#{
desc => <<"Kafka Producer Connector Config">>,
required => false
}
)}.
schema_module() ->
emqx_bridge_kafka.
api_schema(Method) ->
emqx_connector_schema:api_ref(
emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"
).

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_consumer_connector_info).
-behaviour(emqx_connector_info).
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
type_name() ->
kafka_consumer.
bridge_types() ->
[kafka_consumer].
resource_callback_module() ->
emqx_bridge_kafka_impl_consumer.
config_schema() ->
{kafka_consumer,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_kafka_consumer_schema, "config_connector")),
#{
desc => <<"Kafka Consumer Connector Config">>,
required => false
}
)}.
schema_module() ->
emqx_bridge_kafka_consumer_schema.
api_schema(Method) ->
emqx_connector_schema:api_ref(
emqx_bridge_kafka_consumer_schema, <<"kafka_consumer">>, Method ++ "_connector"
).

View File

@ -61,7 +61,9 @@ hard_coded_connector_info_modules_ee() ->
emqx_bridge_confluent_producer_connector_info,
emqx_bridge_gcp_pubsub_consumer_connector_info,
emqx_bridge_gcp_pubsub_producer_connector_info,
emqx_bridge_hstreamdb_connector_info
emqx_bridge_hstreamdb_connector_info,
emqx_bridge_kafka_consumer_connector_info,
emqx_bridge_kafka_connector_info
].
-else.
hard_coded_connector_info_modules_ee() ->

View File

@ -21,10 +21,6 @@
resource_type(Type) when is_binary(Type) ->
resource_type(binary_to_atom(Type, utf8));
resource_type(kafka_consumer) ->
emqx_bridge_kafka_impl_consumer;
resource_type(kafka_producer) ->
emqx_bridge_kafka_impl_producer;
resource_type(kinesis) ->
emqx_bridge_kinesis_impl_producer;
resource_type(matrix) ->
@ -99,22 +95,6 @@ fields(connectors) ->
connector_structs() ->
[
{kafka_consumer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka_consumer_schema, "config_connector")),
#{
desc => <<"Kafka Consumer Connector Config">>,
required => false
}
)},
{kafka_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
#{
desc => <<"Kafka Producer Connector Config">>,
required => false
}
)},
{kinesis,
mk(
hoconsc:map(name, ref(emqx_bridge_kinesis, "config_connector")),
@ -304,8 +284,6 @@ connector_structs() ->
schema_modules() ->
[
emqx_bridge_kafka,
emqx_bridge_kafka_consumer_schema,
emqx_bridge_kinesis,
emqx_bridge_matrix,
emqx_bridge_mongodb,
@ -335,8 +313,6 @@ api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% connector schema module.
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(emqx_bridge_kafka_consumer_schema, <<"kafka_consumer">>, Method ++ "_connector"),
api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),

View File

@ -127,10 +127,6 @@ connector_info_schema_modules() ->
%% @doc Return old bridge(v1) and/or connector(v2) type
%% from the latest connector type name.
connector_type_to_bridge_types(kafka_consumer) ->
[kafka_consumer];
connector_type_to_bridge_types(kafka_producer) ->
[kafka, kafka_producer];
connector_type_to_bridge_types(kinesis) ->
[kinesis, kinesis_producer];
connector_type_to_bridge_types(matrix) ->