refactor(azure_event_hub connector): to use emqx_connector_info

This commit refactors the emqx_bridge_azure_event_hub to use the
emqx_connector_info behavior. The emqx_bridge_azure_event_hub related
information can thus be removed from emqx_connector_ee_schema and
emqx_connector_schema.
This commit is contained in:
Kjell Winblad 2024-03-18 16:51:32 +01:00
parent af3a604354
commit 795b668c4f
6 changed files with 108 additions and 29 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [ {application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"}, {description, "EMQX Enterprise Azure Event Hub Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
@ -9,7 +9,10 @@
telemetry, telemetry,
wolff wolff
]}, ]},
{env, [{emqx_action_info_modules, [emqx_bridge_azure_event_hub_action_info]}]}, {env, [
{emqx_action_info_modules, [emqx_bridge_azure_event_hub_action_info]},
{emqx_connector_info_modules, [emqx_bridge_azure_event_hub_connector_info]}
]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @doc
%% Implementation of emqx_connector_info for Azure Event Hub connector.
%% This module provides connector-specific information and configurations
%% required by the emqx_connector application.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_event_hub_connector_info).
-behaviour(emqx_connector_info).
%% API exports.
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_transform_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
%%--------------------------------------------------------------------
%% API Functions
%%--------------------------------------------------------------------
type_name() ->
azure_event_hub_producer.
bridge_types() ->
[azure_event_hub_producer].
resource_callback_module() ->
emqx_bridge_kafka_impl_producer.
config_transform_module() ->
emqx_bridge_azure_event_hub.
config_schema() ->
{azure_event_hub_producer,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_azure_event_hub, "config_connector")),
#{
desc => <<"Azure Event Hub Connector Config">>,
required => false
}
)}.
schema_module() ->
emqx_bridge_azure_event_hub.
api_schema(Method) ->
emqx_connector_schema:api_ref(
emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector"
).

View File

@ -25,7 +25,8 @@
resource_callback_module/1, resource_callback_module/1,
schema_module/1, schema_module/1,
config_schema/1, config_schema/1,
api_schema/2 api_schema/2,
config_transform_module/1
]). ]).
-export([clean_cache/0]). -export([clean_cache/0]).
@ -36,6 +37,15 @@
-callback schema_module() -> atom(). -callback schema_module() -> atom().
-callback config_schema() -> term(). -callback config_schema() -> term().
-callback api_schema([char()]) -> term(). -callback api_schema([char()]) -> term().
%% Optional callback that should return a module with an exported
%% connector_config/2 function. If present this function will be used to
%% transfrom the connector configuration. See the callback connector_config/2
%% in emqx_connector_resource for more information.
-callback config_transform_module() -> atom().
-optional_callbacks([
config_transform_module/0
]).
%% ==================================================================== %% ====================================================================
%% HardCoded list of info modules for connectors %% HardCoded list of info modules for connectors
@ -46,7 +56,8 @@
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
hard_coded_connector_info_modules_ee() -> hard_coded_connector_info_modules_ee() ->
[ [
emqx_bridge_dynamo_connector_info emqx_bridge_dynamo_connector_info,
emqx_bridge_azure_event_hub_connector_info
]. ].
-else. -else.
hard_coded_connector_info_modules_ee() -> hard_coded_connector_info_modules_ee() ->
@ -70,6 +81,7 @@ hard_coded_connector_info_modules() ->
-define(connector_type_to_resource_callback_module, connector_type_to_resource_callback_module). -define(connector_type_to_resource_callback_module, connector_type_to_resource_callback_module).
-define(connector_type_to_schema_module, connector_type_to_schema_module). -define(connector_type_to_schema_module, connector_type_to_schema_module).
-define(connector_type_to_config_schema, connector_type_to_config_schema). -define(connector_type_to_config_schema, connector_type_to_config_schema).
-define(connector_type_to_config_transform_module, connector_type_to_config_transform_module).
%% ==================================================================== %% ====================================================================
%% API %% API
@ -101,14 +113,19 @@ config_schema(ConnectorType) ->
maps:get(ConnectorType, ConToConfSchema). maps:get(ConnectorType, ConToConfSchema).
api_schema(ConnectorType, Method) -> api_schema(ConnectorType, Method) ->
InfoMod = get_info_module(ConnectorType), InfoMod = info_module(ConnectorType),
InfoMod:api_schema(Method). InfoMod:api_schema(Method).
config_transform_module(ConnectorType) ->
InfoMap = info_map(),
ConToConfTransMod = maps:get(?connector_type_to_config_transform_module, InfoMap),
maps:get(ConnectorType, ConToConfTransMod, undefined).
%% ==================================================================== %% ====================================================================
%% Internal functions for building the info map and accessing it %% Internal functions for building the info map and accessing it
%% ==================================================================== %% ====================================================================
get_info_module(ConnectorType) -> info_module(ConnectorType) ->
InfoMap = info_map(), InfoMap = info_map(),
ConToInfoMod = maps:get(?connector_type_to_info_module, InfoMap), ConToInfoMod = maps:get(?connector_type_to_info_module, InfoMap),
maps:get(ConnectorType, ConToInfoMod). maps:get(ConnectorType, ConToInfoMod).
@ -164,13 +181,21 @@ initial_info_map() ->
?connector_type_to_bridge_types => #{}, ?connector_type_to_bridge_types => #{},
?connector_type_to_resource_callback_module => #{}, ?connector_type_to_resource_callback_module => #{},
?connector_type_to_schema_module => #{}, ?connector_type_to_schema_module => #{},
?connector_type_to_config_schema => #{} ?connector_type_to_config_schema => #{},
?connector_type_to_config_transform_module => #{}
}. }.
get_info_map(Module) -> get_info_map(Module) ->
%% Force the module to get loaded %% Force the module to get loaded
_ = code:ensure_loaded(Module), _ = code:ensure_loaded(Module),
Type = Module:type_name(), Type = Module:type_name(),
ConfigTransformModule =
case erlang:function_exported(Module, config_transform_module, 0) of
true ->
Module:config_transform_module();
false ->
undefined
end,
#{ #{
?connector_type_names => #{ ?connector_type_names => #{
Type => true Type => true
@ -189,5 +214,8 @@ get_info_map(Module) ->
}, },
?connector_type_to_config_schema => #{ ?connector_type_to_config_schema => #{
Type => Module:config_schema() Type => Module:config_schema()
},
?connector_type_to_config_transform_module => #{
Type => ConfigTransformModule
} }
}. }.

View File

@ -67,15 +67,24 @@ connector_to_resource_type(ConnectorType) ->
connector_to_resource_type_ce(ConnectorType) connector_to_resource_type_ce(ConnectorType)
end. end.
connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
connector_impl_module(binary_to_atom(ConnectorType, utf8));
connector_impl_module(ConnectorType) -> connector_impl_module(ConnectorType) ->
emqx_connector_ee_schema:connector_impl_module(ConnectorType). case emqx_connector_ee_schema:connector_impl_module(ConnectorType) of
undefined ->
emqx_connector_info:config_transform_module(ConnectorType);
Module ->
Module
end.
-else. -else.
connector_to_resource_type(ConnectorType) -> connector_to_resource_type(ConnectorType) ->
connector_to_resource_type_ce(ConnectorType). connector_to_resource_type_ce(ConnectorType).
connector_impl_module(_ConnectorType) -> connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
undefined. connector_impl_module(binary_to_atom(ConnectorType, utf8));
connector_impl_module(ConnectorType) ->
emqx_connector_info:config_transform_module(ConnectorType).
-endif. -endif.

View File

@ -21,9 +21,6 @@
resource_type(Type) when is_binary(Type) -> resource_type(Type) when is_binary(Type) ->
resource_type(binary_to_atom(Type, utf8)); resource_type(binary_to_atom(Type, utf8));
resource_type(azure_event_hub_producer) ->
%% We use AEH's Kafka interface.
emqx_bridge_kafka_impl_producer;
resource_type(confluent_producer) -> resource_type(confluent_producer) ->
emqx_bridge_kafka_impl_producer; emqx_bridge_kafka_impl_producer;
resource_type(gcp_pubsub_consumer) -> resource_type(gcp_pubsub_consumer) ->
@ -93,8 +90,6 @@ resource_type(Type) ->
%% For connectors that need to override connector configurations. %% For connectors that need to override connector configurations.
connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
connector_impl_module(binary_to_atom(ConnectorType, utf8)); connector_impl_module(binary_to_atom(ConnectorType, utf8));
connector_impl_module(azure_event_hub_producer) ->
emqx_bridge_azure_event_hub;
connector_impl_module(confluent_producer) -> connector_impl_module(confluent_producer) ->
emqx_bridge_confluent_producer; emqx_bridge_confluent_producer;
connector_impl_module(iotdb) -> connector_impl_module(iotdb) ->
@ -119,14 +114,6 @@ fields(connectors) ->
connector_structs() -> connector_structs() ->
[ [
{azure_event_hub_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")),
#{
desc => <<"Azure Event Hub Connector Config">>,
required => false
}
)},
{confluent_producer, {confluent_producer,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_confluent_producer, "config_connector")), hoconsc:map(name, ref(emqx_bridge_confluent_producer, "config_connector")),
@ -364,7 +351,6 @@ connector_structs() ->
schema_modules() -> schema_modules() ->
[ [
emqx_bridge_azure_event_hub,
emqx_bridge_confluent_producer, emqx_bridge_confluent_producer,
emqx_bridge_gcp_pubsub_consumer_schema, emqx_bridge_gcp_pubsub_consumer_schema,
emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_gcp_pubsub_producer_schema,
@ -400,9 +386,6 @@ api_schemas(Method) ->
[ [
%% We need to map the `type' field of a request (binary) to a %% We need to map the `type' field of a request (binary) to a
%% connector schema module. %% connector schema module.
api_ref(
emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector"
),
api_ref( api_ref(
emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector" emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector"
), ),

View File

@ -130,8 +130,6 @@ connector_info_schema_modules() ->
%% from the latest connector type name. %% from the latest connector type name.
connector_type_to_bridge_types(http) -> connector_type_to_bridge_types(http) ->
[webhook, http]; [webhook, http];
connector_type_to_bridge_types(azure_event_hub_producer) ->
[azure_event_hub_producer];
connector_type_to_bridge_types(confluent_producer) -> connector_type_to_bridge_types(confluent_producer) ->
[confluent_producer]; [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_consumer) -> connector_type_to_bridge_types(gcp_pubsub_consumer) ->