diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index b588d1e18..7e70fffff 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_azure_event_hub, [ {description, "EMQX Enterprise Azure Event Hub Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, @@ -9,7 +9,10 @@ telemetry, wolff ]}, - {env, [{emqx_action_info_modules, [emqx_bridge_azure_event_hub_action_info]}]}, + {env, [ + {emqx_action_info_modules, [emqx_bridge_azure_event_hub_action_info]}, + {emqx_connector_info_modules, [emqx_bridge_azure_event_hub_connector_info]} + ]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_connector_info.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_connector_info.erl new file mode 100644 index 000000000..932df54fa --- /dev/null +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_connector_info.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% @doc +%% Implementation of emqx_connector_info for Azure Event Hub connector. +%% This module provides connector-specific information and configurations +%% required by the emqx_connector application. +%%-------------------------------------------------------------------- +-module(emqx_bridge_azure_event_hub_connector_info). + +-behaviour(emqx_connector_info). + +%% API exports. +-export([ + type_name/0, + bridge_types/0, + resource_callback_module/0, + config_transform_module/0, + config_schema/0, + schema_module/0, + api_schema/1 +]). + +%%-------------------------------------------------------------------- +%% API Functions +%%-------------------------------------------------------------------- + +type_name() -> + azure_event_hub_producer. + +bridge_types() -> + [azure_event_hub_producer]. + +resource_callback_module() -> + emqx_bridge_kafka_impl_producer. + +config_transform_module() -> + emqx_bridge_azure_event_hub. + +config_schema() -> + {azure_event_hub_producer, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_azure_event_hub, "config_connector")), + #{ + desc => <<"Azure Event Hub Connector Config">>, + required => false + } + )}. + +schema_module() -> + emqx_bridge_azure_event_hub. + +api_schema(Method) -> + emqx_connector_schema:api_ref( + emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector" + ). diff --git a/apps/emqx_connector/src/emqx_connector_info.erl b/apps/emqx_connector/src/emqx_connector_info.erl index d3e7dd27e..2589726d4 100644 --- a/apps/emqx_connector/src/emqx_connector_info.erl +++ b/apps/emqx_connector/src/emqx_connector_info.erl @@ -25,7 +25,8 @@ resource_callback_module/1, schema_module/1, config_schema/1, - api_schema/2 + api_schema/2, + config_transform_module/1 ]). -export([clean_cache/0]). @@ -36,6 +37,15 @@ -callback schema_module() -> atom(). -callback config_schema() -> term(). -callback api_schema([char()]) -> term(). +%% Optional callback that should return a module with an exported +%% connector_config/2 function. If present this function will be used to +%% transfrom the connector configuration. See the callback connector_config/2 +%% in emqx_connector_resource for more information. +-callback config_transform_module() -> atom(). + +-optional_callbacks([ + config_transform_module/0 +]). %% ==================================================================== %% HardCoded list of info modules for connectors @@ -46,7 +56,8 @@ -if(?EMQX_RELEASE_EDITION == ee). hard_coded_connector_info_modules_ee() -> [ - emqx_bridge_dynamo_connector_info + emqx_bridge_dynamo_connector_info, + emqx_bridge_azure_event_hub_connector_info ]. -else. hard_coded_connector_info_modules_ee() -> @@ -70,6 +81,7 @@ hard_coded_connector_info_modules() -> -define(connector_type_to_resource_callback_module, connector_type_to_resource_callback_module). -define(connector_type_to_schema_module, connector_type_to_schema_module). -define(connector_type_to_config_schema, connector_type_to_config_schema). +-define(connector_type_to_config_transform_module, connector_type_to_config_transform_module). %% ==================================================================== %% API @@ -101,14 +113,19 @@ config_schema(ConnectorType) -> maps:get(ConnectorType, ConToConfSchema). api_schema(ConnectorType, Method) -> - InfoMod = get_info_module(ConnectorType), + InfoMod = info_module(ConnectorType), InfoMod:api_schema(Method). +config_transform_module(ConnectorType) -> + InfoMap = info_map(), + ConToConfTransMod = maps:get(?connector_type_to_config_transform_module, InfoMap), + maps:get(ConnectorType, ConToConfTransMod, undefined). + %% ==================================================================== %% Internal functions for building the info map and accessing it %% ==================================================================== -get_info_module(ConnectorType) -> +info_module(ConnectorType) -> InfoMap = info_map(), ConToInfoMod = maps:get(?connector_type_to_info_module, InfoMap), maps:get(ConnectorType, ConToInfoMod). @@ -164,13 +181,21 @@ initial_info_map() -> ?connector_type_to_bridge_types => #{}, ?connector_type_to_resource_callback_module => #{}, ?connector_type_to_schema_module => #{}, - ?connector_type_to_config_schema => #{} + ?connector_type_to_config_schema => #{}, + ?connector_type_to_config_transform_module => #{} }. get_info_map(Module) -> %% Force the module to get loaded _ = code:ensure_loaded(Module), Type = Module:type_name(), + ConfigTransformModule = + case erlang:function_exported(Module, config_transform_module, 0) of + true -> + Module:config_transform_module(); + false -> + undefined + end, #{ ?connector_type_names => #{ Type => true @@ -189,5 +214,8 @@ get_info_map(Module) -> }, ?connector_type_to_config_schema => #{ Type => Module:config_schema() + }, + ?connector_type_to_config_transform_module => #{ + Type => ConfigTransformModule } }. diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index fbebc20f5..1b434496a 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -67,15 +67,24 @@ connector_to_resource_type(ConnectorType) -> connector_to_resource_type_ce(ConnectorType) end. +connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> + connector_impl_module(binary_to_atom(ConnectorType, utf8)); connector_impl_module(ConnectorType) -> - emqx_connector_ee_schema:connector_impl_module(ConnectorType). + case emqx_connector_ee_schema:connector_impl_module(ConnectorType) of + undefined -> + emqx_connector_info:config_transform_module(ConnectorType); + Module -> + Module + end. -else. connector_to_resource_type(ConnectorType) -> connector_to_resource_type_ce(ConnectorType). -connector_impl_module(_ConnectorType) -> - undefined. +connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> + connector_impl_module(binary_to_atom(ConnectorType, utf8)); +connector_impl_module(ConnectorType) -> + emqx_connector_info:config_transform_module(ConnectorType). -endif. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index bf5d9d619..f7fd3db7a 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -21,9 +21,6 @@ resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); -resource_type(azure_event_hub_producer) -> - %% We use AEH's Kafka interface. - emqx_bridge_kafka_impl_producer; resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; resource_type(gcp_pubsub_consumer) -> @@ -93,8 +90,6 @@ resource_type(Type) -> %% For connectors that need to override connector configurations. connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> connector_impl_module(binary_to_atom(ConnectorType, utf8)); -connector_impl_module(azure_event_hub_producer) -> - emqx_bridge_azure_event_hub; connector_impl_module(confluent_producer) -> emqx_bridge_confluent_producer; connector_impl_module(iotdb) -> @@ -119,14 +114,6 @@ fields(connectors) -> connector_structs() -> [ - {azure_event_hub_producer, - mk( - hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")), - #{ - desc => <<"Azure Event Hub Connector Config">>, - required => false - } - )}, {confluent_producer, mk( hoconsc:map(name, ref(emqx_bridge_confluent_producer, "config_connector")), @@ -364,7 +351,6 @@ connector_structs() -> schema_modules() -> [ - emqx_bridge_azure_event_hub, emqx_bridge_confluent_producer, emqx_bridge_gcp_pubsub_consumer_schema, emqx_bridge_gcp_pubsub_producer_schema, @@ -400,9 +386,6 @@ api_schemas(Method) -> [ %% We need to map the `type' field of a request (binary) to a %% connector schema module. - api_ref( - emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector" - ), api_ref( emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector" ), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 4ce386534..b68dc194c 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -130,8 +130,6 @@ connector_info_schema_modules() -> %% from the latest connector type name. connector_type_to_bridge_types(http) -> [webhook, http]; -connector_type_to_bridge_types(azure_event_hub_producer) -> - [azure_event_hub_producer]; connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; connector_type_to_bridge_types(gcp_pubsub_consumer) ->