From af3a6043541e2721d6f078bb526fd31bab11888f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 18 Mar 2024 13:36:27 +0100 Subject: [PATCH] refactor: add emqx_connector_info behavior This commit adds the behavior `emqx_connector_info`. The `emqx_connector_info` behavior should be implement when creating a new connector to provide information about the connector (such as connector schema etc) to the `emqx_connetor` application. The connector in the `emqx_bridge_dynamo` application has also been refactored to use the new behavior (as a test to see that the behavior is working as intended). Fixes: https://emqx.atlassian.net/browse/EMQX-11427 --- .../src/emqx_bridge_dynamo.app.src | 5 +- .../src/emqx_bridge_dynamo_connector_info.erl | 43 ++++ .../src/emqx_connector_info.erl | 193 ++++++++++++++++++ .../src/schema/emqx_connector_ee_schema.erl | 21 +- .../src/schema/emqx_connector_schema.erl | 39 +++- 5 files changed, 277 insertions(+), 24 deletions(-) create mode 100644 apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_info.erl create mode 100644 apps/emqx_connector/src/emqx_connector_info.erl diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index 580f4eebc..81e7fc128 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -8,7 +8,10 @@ emqx_resource, erlcloud ]}, - {env, [{emqx_action_info_modules, [emqx_bridge_dynamo_action_info]}]}, + {env, [ + {emqx_action_info_modules, [emqx_bridge_dynamo_action_info]}, + {emqx_connector_info_modules, [emqx_bridge_dynamo_connector_info]} + ]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_info.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_info.erl new file mode 100644 index 000000000..f82420338 --- /dev/null +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_info.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_dynamo_connector_info). + +-behaviour(emqx_connector_info). + +-export([ + type_name/0, + bridge_types/0, + resource_callback_module/0, + config_schema/0, + schema_module/0, + api_schema/1 +]). + +type_name() -> + dynamo. + +bridge_types() -> + [dynamo]. + +resource_callback_module() -> + emqx_bridge_dynamo_connector. + +config_schema() -> + {dynamo, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_bridge_dynamo, "config_connector")), + #{ + desc => <<"DynamoDB Connector Config">>, + required => false + } + )}. + +schema_module() -> + emqx_bridge_dynamo. + +api_schema(Method) -> + emqx_connector_schema:api_ref( + emqx_bridge_dynamo, <<"dynamo">>, Method ++ "_connector" + ). diff --git a/apps/emqx_connector/src/emqx_connector_info.erl b/apps/emqx_connector/src/emqx_connector_info.erl new file mode 100644 index 000000000..d3e7dd27e --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_info.erl @@ -0,0 +1,193 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The module which knows everything about connectors. + +-module(emqx_connector_info). + +%% Public API +-export([ + connector_types/0, + bridge_types/1, + resource_callback_module/1, + schema_module/1, + config_schema/1, + api_schema/2 +]). + +-export([clean_cache/0]). + +-callback type_name() -> atom(). +-callback bridge_types() -> [atom()]. +-callback resource_callback_module() -> atom(). +-callback schema_module() -> atom(). +-callback config_schema() -> term(). +-callback api_schema([char()]) -> term(). + +%% ==================================================================== +%% HardCoded list of info modules for connectors +%% TODO: Remove this list once we have made sure that all relevants +%% apps are loaded before this module is called. +%% ==================================================================== + +-if(?EMQX_RELEASE_EDITION == ee). +hard_coded_connector_info_modules_ee() -> + [ + emqx_bridge_dynamo_connector_info + ]. +-else. +hard_coded_connector_info_modules_ee() -> + []. +-endif. + +hard_coded_connector_info_modules_common() -> + []. + +hard_coded_connector_info_modules() -> + hard_coded_connector_info_modules_common() ++ hard_coded_connector_info_modules_ee(). + +%% -------------------------------------------------------------------- +%% Atom macros to avoid typos +%% -------------------------------------------------------------------- + +-define(emqx_connector_info_modules, emqx_connector_info_modules). +-define(connector_type_to_info_module, connector_type_to_info_module). +-define(connector_type_names, connector_type_names). +-define(connector_type_to_bridge_types, connector_type_to_bridge_types). +-define(connector_type_to_resource_callback_module, connector_type_to_resource_callback_module). +-define(connector_type_to_schema_module, connector_type_to_schema_module). +-define(connector_type_to_config_schema, connector_type_to_config_schema). + +%% ==================================================================== +%% API +%% ==================================================================== + +connector_types() -> + InfoMap = info_map(), + TypeNamesMap = maps:get(?connector_type_names, InfoMap), + maps:keys(TypeNamesMap). + +bridge_types(ConnectorType) -> + InfoMap = info_map(), + ConToBridges = maps:get(?connector_type_to_bridge_types, InfoMap), + maps:get(ConnectorType, ConToBridges). + +resource_callback_module(ConnectorType) -> + InfoMap = info_map(), + ConToCallbackMod = maps:get(?connector_type_to_resource_callback_module, InfoMap), + maps:get(ConnectorType, ConToCallbackMod). + +schema_module(ConnectorType) -> + InfoMap = info_map(), + ConToSchemaMod = maps:get(?connector_type_to_schema_module, InfoMap), + maps:get(ConnectorType, ConToSchemaMod). + +config_schema(ConnectorType) -> + InfoMap = info_map(), + ConToConfSchema = maps:get(?connector_type_to_config_schema, InfoMap), + maps:get(ConnectorType, ConToConfSchema). + +api_schema(ConnectorType, Method) -> + InfoMod = get_info_module(ConnectorType), + InfoMod:api_schema(Method). + +%% ==================================================================== +%% Internal functions for building the info map and accessing it +%% ==================================================================== + +get_info_module(ConnectorType) -> + InfoMap = info_map(), + ConToInfoMod = maps:get(?connector_type_to_info_module, InfoMap), + maps:get(ConnectorType, ConToInfoMod). + +internal_emqx_connector_persistent_term_info_key() -> + ?FUNCTION_NAME. + +info_map() -> + case persistent_term:get(internal_emqx_connector_persistent_term_info_key(), not_found) of + not_found -> + build_cache(); + InfoMap -> + InfoMap + end. + +build_cache() -> + InfoModules = connector_info_modules(), + InfoMap = + lists:foldl( + fun(Module, InfoMapSoFar) -> + ModuleInfoMap = get_info_map(Module), + emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap) + end, + initial_info_map(), + InfoModules + ), + %% Update the persistent term with the new info map + persistent_term:put(internal_emqx_connector_persistent_term_info_key(), InfoMap), + InfoMap. + +clean_cache() -> + persistent_term:erase(internal_emqx_connector_persistent_term_info_key()). + +connector_info_modules() -> + InfoModules = [ + connector_info_modules(App) + || {App, _, _} <- application:loaded_applications() + ], + lists:usort(lists:flatten(InfoModules) ++ hard_coded_connector_info_modules()). + +connector_info_modules(App) -> + case application:get_env(App, ?emqx_connector_info_modules) of + {ok, Modules} -> + Modules; + _ -> + [] + end. + +initial_info_map() -> + #{ + ?connector_type_names => #{}, + ?connector_type_to_info_module => #{}, + ?connector_type_to_bridge_types => #{}, + ?connector_type_to_resource_callback_module => #{}, + ?connector_type_to_schema_module => #{}, + ?connector_type_to_config_schema => #{} + }. + +get_info_map(Module) -> + %% Force the module to get loaded + _ = code:ensure_loaded(Module), + Type = Module:type_name(), + #{ + ?connector_type_names => #{ + Type => true + }, + ?connector_type_to_info_module => #{ + Type => Module + }, + ?connector_type_to_bridge_types => #{ + Type => Module:bridge_types() + }, + ?connector_type_to_resource_callback_module => #{ + Type => Module:resource_callback_module() + }, + ?connector_type_to_schema_module => #{ + Type => Module:schema_module() + }, + ?connector_type_to_config_schema => #{ + Type => Module:config_schema() + } + }. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index d45d5de35..bf5d9d619 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -26,8 +26,6 @@ resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; -resource_type(dynamo) -> - emqx_bridge_dynamo_connector; resource_type(gcp_pubsub_consumer) -> emqx_bridge_gcp_pubsub_impl_consumer; resource_type(gcp_pubsub_producer) -> @@ -85,7 +83,12 @@ resource_type(rabbitmq) -> resource_type(s3) -> emqx_bridge_s3_connector; resource_type(Type) -> - error({unknown_connector_type, Type}). + try + emqx_connector_info:resource_callback_module(Type) + catch + _:_ -> + error({unknown_connector_type, Type}) + end. %% For connectors that need to override connector configurations. connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> @@ -132,14 +135,6 @@ connector_structs() -> required => false } )}, - {dynamo, - mk( - hoconsc:map(name, ref(emqx_bridge_dynamo, "config_connector")), - #{ - desc => <<"DynamoDB Connector Config">>, - required => false - } - )}, {gcp_pubsub_consumer, mk( hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_consumer_schema, "config_connector")), @@ -371,7 +366,6 @@ schema_modules() -> [ emqx_bridge_azure_event_hub, emqx_bridge_confluent_producer, - emqx_bridge_dynamo, emqx_bridge_gcp_pubsub_consumer_schema, emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_hstreamdb, @@ -412,9 +406,6 @@ api_schemas(Method) -> api_ref( emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector" ), - api_ref( - emqx_bridge_dynamo, <<"dynamo">>, Method ++ "_connector" - ), api_ref( emqx_bridge_gcp_pubsub_consumer_schema, <<"gcp_pubsub_consumer">>, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index d68514c41..4ce386534 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -29,7 +29,8 @@ transform_bridges_v1_to_connectors_and_bridges_v2/1, transform_bridge_v1_config_to_action_config/4, top_level_common_connector_keys/0, - project_to_connector_resource_opts/1 + project_to_connector_resource_opts/1, + api_ref/3 ]). -export([roots/0, fields/1, desc/1, namespace/0, tags/0]). @@ -112,12 +113,19 @@ examples(Method) -> -if(?EMQX_RELEASE_EDITION == ee). schema_modules() -> [emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema] ++ - emqx_connector_ee_schema:schema_modules(). + emqx_connector_ee_schema:schema_modules() ++ connector_info_schema_modules(). -else. schema_modules() -> - [emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema]. + [emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema] ++ connector_info_schema_modules(). -endif. +connector_info_schema_modules() -> + ConnectorTypes = emqx_connector_info:connector_types(), + [ + emqx_connector_info:schema_module(Type) + || Type <- ConnectorTypes + ]. + %% @doc Return old bridge(v1) and/or connector(v2) type %% from the latest connector type name. connector_type_to_bridge_types(http) -> @@ -126,8 +134,6 @@ connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; -connector_type_to_bridge_types(dynamo) -> - [dynamo]; connector_type_to_bridge_types(gcp_pubsub_consumer) -> [gcp_pubsub_consumer]; connector_type_to_bridge_types(gcp_pubsub_producer) -> @@ -185,7 +191,9 @@ connector_type_to_bridge_types(tdengine) -> connector_type_to_bridge_types(rabbitmq) -> [rabbitmq]; connector_type_to_bridge_types(s3) -> - [s3]. + [s3]; +connector_type_to_bridge_types(Type) -> + emqx_connector_info:bridge_types(Type). actions_config_name(action) -> <<"actions">>; actions_config_name(source) -> <<"sources">>. @@ -481,7 +489,15 @@ post_request() -> api_schema(Method) -> CE = api_schemas(Method), EE = enterprise_api_schemas(Method), - hoconsc:union(connector_api_union(CE ++ EE)). + InfoModSchemas = emqx_connector_info_api_schemas(Method), + hoconsc:union(connector_api_union(CE ++ EE ++ InfoModSchemas)). + +emqx_connector_info_api_schemas(Method) -> + ConnectorTypes = emqx_connector_info:connector_types(), + [ + emqx_connector_info:api_schema(Type, Method) + || Type <- ConnectorTypes + ]. connector_api_union(Refs) -> Index = maps:from_list(Refs), @@ -544,7 +560,7 @@ fields(connectors) -> required => false } )} - ] ++ enterprise_fields_connectors(); + ] ++ enterprise_fields_connectors() ++ connector_info_fields_connectors(); fields("node_status") -> [ node_name(), @@ -557,6 +573,13 @@ fields("node_status") -> })} ]. +connector_info_fields_connectors() -> + ConnectorTypes = emqx_connector_info:connector_types(), + [ + emqx_connector_info:config_schema(Type) + || Type <- ConnectorTypes + ]. + desc(connectors) -> ?DESC("desc_connectors"); desc("node_status") ->