refactor: add emqx_connector_info behavior
This commit adds the behavior `emqx_connector_info`. The `emqx_connector_info` behavior should be implement when creating a new connector to provide information about the connector (such as connector schema etc) to the `emqx_connetor` application. The connector in the `emqx_bridge_dynamo` application has also been refactored to use the new behavior (as a test to see that the behavior is working as intended). Fixes: https://emqx.atlassian.net/browse/EMQX-11427
This commit is contained in:
parent
bb050d9767
commit
af3a604354
|
@ -8,7 +8,10 @@
|
||||||
emqx_resource,
|
emqx_resource,
|
||||||
erlcloud
|
erlcloud
|
||||||
]},
|
]},
|
||||||
{env, [{emqx_action_info_modules, [emqx_bridge_dynamo_action_info]}]},
|
{env, [
|
||||||
|
{emqx_action_info_modules, [emqx_bridge_dynamo_action_info]},
|
||||||
|
{emqx_connector_info_modules, [emqx_bridge_dynamo_connector_info]}
|
||||||
|
]},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{links, []}
|
{links, []}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_dynamo_connector_info).
|
||||||
|
|
||||||
|
-behaviour(emqx_connector_info).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
type_name/0,
|
||||||
|
bridge_types/0,
|
||||||
|
resource_callback_module/0,
|
||||||
|
config_schema/0,
|
||||||
|
schema_module/0,
|
||||||
|
api_schema/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
type_name() ->
|
||||||
|
dynamo.
|
||||||
|
|
||||||
|
bridge_types() ->
|
||||||
|
[dynamo].
|
||||||
|
|
||||||
|
resource_callback_module() ->
|
||||||
|
emqx_bridge_dynamo_connector.
|
||||||
|
|
||||||
|
config_schema() ->
|
||||||
|
{dynamo,
|
||||||
|
hoconsc:mk(
|
||||||
|
hoconsc:map(name, hoconsc:ref(emqx_bridge_dynamo, "config_connector")),
|
||||||
|
#{
|
||||||
|
desc => <<"DynamoDB Connector Config">>,
|
||||||
|
required => false
|
||||||
|
}
|
||||||
|
)}.
|
||||||
|
|
||||||
|
schema_module() ->
|
||||||
|
emqx_bridge_dynamo.
|
||||||
|
|
||||||
|
api_schema(Method) ->
|
||||||
|
emqx_connector_schema:api_ref(
|
||||||
|
emqx_bridge_dynamo, <<"dynamo">>, Method ++ "_connector"
|
||||||
|
).
|
|
@ -0,0 +1,193 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc The module which knows everything about connectors.
|
||||||
|
|
||||||
|
-module(emqx_connector_info).
|
||||||
|
|
||||||
|
%% Public API
|
||||||
|
-export([
|
||||||
|
connector_types/0,
|
||||||
|
bridge_types/1,
|
||||||
|
resource_callback_module/1,
|
||||||
|
schema_module/1,
|
||||||
|
config_schema/1,
|
||||||
|
api_schema/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([clean_cache/0]).
|
||||||
|
|
||||||
|
-callback type_name() -> atom().
|
||||||
|
-callback bridge_types() -> [atom()].
|
||||||
|
-callback resource_callback_module() -> atom().
|
||||||
|
-callback schema_module() -> atom().
|
||||||
|
-callback config_schema() -> term().
|
||||||
|
-callback api_schema([char()]) -> term().
|
||||||
|
|
||||||
|
%% ====================================================================
|
||||||
|
%% HardCoded list of info modules for connectors
|
||||||
|
%% TODO: Remove this list once we have made sure that all relevants
|
||||||
|
%% apps are loaded before this module is called.
|
||||||
|
%% ====================================================================
|
||||||
|
|
||||||
|
-if(?EMQX_RELEASE_EDITION == ee).
|
||||||
|
hard_coded_connector_info_modules_ee() ->
|
||||||
|
[
|
||||||
|
emqx_bridge_dynamo_connector_info
|
||||||
|
].
|
||||||
|
-else.
|
||||||
|
hard_coded_connector_info_modules_ee() ->
|
||||||
|
[].
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
hard_coded_connector_info_modules_common() ->
|
||||||
|
[].
|
||||||
|
|
||||||
|
hard_coded_connector_info_modules() ->
|
||||||
|
hard_coded_connector_info_modules_common() ++ hard_coded_connector_info_modules_ee().
|
||||||
|
|
||||||
|
%% --------------------------------------------------------------------
|
||||||
|
%% Atom macros to avoid typos
|
||||||
|
%% --------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(emqx_connector_info_modules, emqx_connector_info_modules).
|
||||||
|
-define(connector_type_to_info_module, connector_type_to_info_module).
|
||||||
|
-define(connector_type_names, connector_type_names).
|
||||||
|
-define(connector_type_to_bridge_types, connector_type_to_bridge_types).
|
||||||
|
-define(connector_type_to_resource_callback_module, connector_type_to_resource_callback_module).
|
||||||
|
-define(connector_type_to_schema_module, connector_type_to_schema_module).
|
||||||
|
-define(connector_type_to_config_schema, connector_type_to_config_schema).
|
||||||
|
|
||||||
|
%% ====================================================================
|
||||||
|
%% API
|
||||||
|
%% ====================================================================
|
||||||
|
|
||||||
|
connector_types() ->
|
||||||
|
InfoMap = info_map(),
|
||||||
|
TypeNamesMap = maps:get(?connector_type_names, InfoMap),
|
||||||
|
maps:keys(TypeNamesMap).
|
||||||
|
|
||||||
|
bridge_types(ConnectorType) ->
|
||||||
|
InfoMap = info_map(),
|
||||||
|
ConToBridges = maps:get(?connector_type_to_bridge_types, InfoMap),
|
||||||
|
maps:get(ConnectorType, ConToBridges).
|
||||||
|
|
||||||
|
resource_callback_module(ConnectorType) ->
|
||||||
|
InfoMap = info_map(),
|
||||||
|
ConToCallbackMod = maps:get(?connector_type_to_resource_callback_module, InfoMap),
|
||||||
|
maps:get(ConnectorType, ConToCallbackMod).
|
||||||
|
|
||||||
|
schema_module(ConnectorType) ->
|
||||||
|
InfoMap = info_map(),
|
||||||
|
ConToSchemaMod = maps:get(?connector_type_to_schema_module, InfoMap),
|
||||||
|
maps:get(ConnectorType, ConToSchemaMod).
|
||||||
|
|
||||||
|
config_schema(ConnectorType) ->
|
||||||
|
InfoMap = info_map(),
|
||||||
|
ConToConfSchema = maps:get(?connector_type_to_config_schema, InfoMap),
|
||||||
|
maps:get(ConnectorType, ConToConfSchema).
|
||||||
|
|
||||||
|
api_schema(ConnectorType, Method) ->
|
||||||
|
InfoMod = get_info_module(ConnectorType),
|
||||||
|
InfoMod:api_schema(Method).
|
||||||
|
|
||||||
|
%% ====================================================================
|
||||||
|
%% Internal functions for building the info map and accessing it
|
||||||
|
%% ====================================================================
|
||||||
|
|
||||||
|
get_info_module(ConnectorType) ->
|
||||||
|
InfoMap = info_map(),
|
||||||
|
ConToInfoMod = maps:get(?connector_type_to_info_module, InfoMap),
|
||||||
|
maps:get(ConnectorType, ConToInfoMod).
|
||||||
|
|
||||||
|
internal_emqx_connector_persistent_term_info_key() ->
|
||||||
|
?FUNCTION_NAME.
|
||||||
|
|
||||||
|
info_map() ->
|
||||||
|
case persistent_term:get(internal_emqx_connector_persistent_term_info_key(), not_found) of
|
||||||
|
not_found ->
|
||||||
|
build_cache();
|
||||||
|
InfoMap ->
|
||||||
|
InfoMap
|
||||||
|
end.
|
||||||
|
|
||||||
|
build_cache() ->
|
||||||
|
InfoModules = connector_info_modules(),
|
||||||
|
InfoMap =
|
||||||
|
lists:foldl(
|
||||||
|
fun(Module, InfoMapSoFar) ->
|
||||||
|
ModuleInfoMap = get_info_map(Module),
|
||||||
|
emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap)
|
||||||
|
end,
|
||||||
|
initial_info_map(),
|
||||||
|
InfoModules
|
||||||
|
),
|
||||||
|
%% Update the persistent term with the new info map
|
||||||
|
persistent_term:put(internal_emqx_connector_persistent_term_info_key(), InfoMap),
|
||||||
|
InfoMap.
|
||||||
|
|
||||||
|
clean_cache() ->
|
||||||
|
persistent_term:erase(internal_emqx_connector_persistent_term_info_key()).
|
||||||
|
|
||||||
|
connector_info_modules() ->
|
||||||
|
InfoModules = [
|
||||||
|
connector_info_modules(App)
|
||||||
|
|| {App, _, _} <- application:loaded_applications()
|
||||||
|
],
|
||||||
|
lists:usort(lists:flatten(InfoModules) ++ hard_coded_connector_info_modules()).
|
||||||
|
|
||||||
|
connector_info_modules(App) ->
|
||||||
|
case application:get_env(App, ?emqx_connector_info_modules) of
|
||||||
|
{ok, Modules} ->
|
||||||
|
Modules;
|
||||||
|
_ ->
|
||||||
|
[]
|
||||||
|
end.
|
||||||
|
|
||||||
|
initial_info_map() ->
|
||||||
|
#{
|
||||||
|
?connector_type_names => #{},
|
||||||
|
?connector_type_to_info_module => #{},
|
||||||
|
?connector_type_to_bridge_types => #{},
|
||||||
|
?connector_type_to_resource_callback_module => #{},
|
||||||
|
?connector_type_to_schema_module => #{},
|
||||||
|
?connector_type_to_config_schema => #{}
|
||||||
|
}.
|
||||||
|
|
||||||
|
get_info_map(Module) ->
|
||||||
|
%% Force the module to get loaded
|
||||||
|
_ = code:ensure_loaded(Module),
|
||||||
|
Type = Module:type_name(),
|
||||||
|
#{
|
||||||
|
?connector_type_names => #{
|
||||||
|
Type => true
|
||||||
|
},
|
||||||
|
?connector_type_to_info_module => #{
|
||||||
|
Type => Module
|
||||||
|
},
|
||||||
|
?connector_type_to_bridge_types => #{
|
||||||
|
Type => Module:bridge_types()
|
||||||
|
},
|
||||||
|
?connector_type_to_resource_callback_module => #{
|
||||||
|
Type => Module:resource_callback_module()
|
||||||
|
},
|
||||||
|
?connector_type_to_schema_module => #{
|
||||||
|
Type => Module:schema_module()
|
||||||
|
},
|
||||||
|
?connector_type_to_config_schema => #{
|
||||||
|
Type => Module:config_schema()
|
||||||
|
}
|
||||||
|
}.
|
|
@ -26,8 +26,6 @@ resource_type(azure_event_hub_producer) ->
|
||||||
emqx_bridge_kafka_impl_producer;
|
emqx_bridge_kafka_impl_producer;
|
||||||
resource_type(confluent_producer) ->
|
resource_type(confluent_producer) ->
|
||||||
emqx_bridge_kafka_impl_producer;
|
emqx_bridge_kafka_impl_producer;
|
||||||
resource_type(dynamo) ->
|
|
||||||
emqx_bridge_dynamo_connector;
|
|
||||||
resource_type(gcp_pubsub_consumer) ->
|
resource_type(gcp_pubsub_consumer) ->
|
||||||
emqx_bridge_gcp_pubsub_impl_consumer;
|
emqx_bridge_gcp_pubsub_impl_consumer;
|
||||||
resource_type(gcp_pubsub_producer) ->
|
resource_type(gcp_pubsub_producer) ->
|
||||||
|
@ -85,7 +83,12 @@ resource_type(rabbitmq) ->
|
||||||
resource_type(s3) ->
|
resource_type(s3) ->
|
||||||
emqx_bridge_s3_connector;
|
emqx_bridge_s3_connector;
|
||||||
resource_type(Type) ->
|
resource_type(Type) ->
|
||||||
error({unknown_connector_type, Type}).
|
try
|
||||||
|
emqx_connector_info:resource_callback_module(Type)
|
||||||
|
catch
|
||||||
|
_:_ ->
|
||||||
|
error({unknown_connector_type, Type})
|
||||||
|
end.
|
||||||
|
|
||||||
%% For connectors that need to override connector configurations.
|
%% For connectors that need to override connector configurations.
|
||||||
connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
|
connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
|
||||||
|
@ -132,14 +135,6 @@ connector_structs() ->
|
||||||
required => false
|
required => false
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{dynamo,
|
|
||||||
mk(
|
|
||||||
hoconsc:map(name, ref(emqx_bridge_dynamo, "config_connector")),
|
|
||||||
#{
|
|
||||||
desc => <<"DynamoDB Connector Config">>,
|
|
||||||
required => false
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{gcp_pubsub_consumer,
|
{gcp_pubsub_consumer,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_consumer_schema, "config_connector")),
|
hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_consumer_schema, "config_connector")),
|
||||||
|
@ -371,7 +366,6 @@ schema_modules() ->
|
||||||
[
|
[
|
||||||
emqx_bridge_azure_event_hub,
|
emqx_bridge_azure_event_hub,
|
||||||
emqx_bridge_confluent_producer,
|
emqx_bridge_confluent_producer,
|
||||||
emqx_bridge_dynamo,
|
|
||||||
emqx_bridge_gcp_pubsub_consumer_schema,
|
emqx_bridge_gcp_pubsub_consumer_schema,
|
||||||
emqx_bridge_gcp_pubsub_producer_schema,
|
emqx_bridge_gcp_pubsub_producer_schema,
|
||||||
emqx_bridge_hstreamdb,
|
emqx_bridge_hstreamdb,
|
||||||
|
@ -412,9 +406,6 @@ api_schemas(Method) ->
|
||||||
api_ref(
|
api_ref(
|
||||||
emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector"
|
emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector"
|
||||||
),
|
),
|
||||||
api_ref(
|
|
||||||
emqx_bridge_dynamo, <<"dynamo">>, Method ++ "_connector"
|
|
||||||
),
|
|
||||||
api_ref(
|
api_ref(
|
||||||
emqx_bridge_gcp_pubsub_consumer_schema,
|
emqx_bridge_gcp_pubsub_consumer_schema,
|
||||||
<<"gcp_pubsub_consumer">>,
|
<<"gcp_pubsub_consumer">>,
|
||||||
|
|
|
@ -29,7 +29,8 @@
|
||||||
transform_bridges_v1_to_connectors_and_bridges_v2/1,
|
transform_bridges_v1_to_connectors_and_bridges_v2/1,
|
||||||
transform_bridge_v1_config_to_action_config/4,
|
transform_bridge_v1_config_to_action_config/4,
|
||||||
top_level_common_connector_keys/0,
|
top_level_common_connector_keys/0,
|
||||||
project_to_connector_resource_opts/1
|
project_to_connector_resource_opts/1,
|
||||||
|
api_ref/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
|
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
|
||||||
|
@ -112,12 +113,19 @@ examples(Method) ->
|
||||||
-if(?EMQX_RELEASE_EDITION == ee).
|
-if(?EMQX_RELEASE_EDITION == ee).
|
||||||
schema_modules() ->
|
schema_modules() ->
|
||||||
[emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema] ++
|
[emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema] ++
|
||||||
emqx_connector_ee_schema:schema_modules().
|
emqx_connector_ee_schema:schema_modules() ++ connector_info_schema_modules().
|
||||||
-else.
|
-else.
|
||||||
schema_modules() ->
|
schema_modules() ->
|
||||||
[emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema].
|
[emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema] ++ connector_info_schema_modules().
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
connector_info_schema_modules() ->
|
||||||
|
ConnectorTypes = emqx_connector_info:connector_types(),
|
||||||
|
[
|
||||||
|
emqx_connector_info:schema_module(Type)
|
||||||
|
|| Type <- ConnectorTypes
|
||||||
|
].
|
||||||
|
|
||||||
%% @doc Return old bridge(v1) and/or connector(v2) type
|
%% @doc Return old bridge(v1) and/or connector(v2) type
|
||||||
%% from the latest connector type name.
|
%% from the latest connector type name.
|
||||||
connector_type_to_bridge_types(http) ->
|
connector_type_to_bridge_types(http) ->
|
||||||
|
@ -126,8 +134,6 @@ connector_type_to_bridge_types(azure_event_hub_producer) ->
|
||||||
[azure_event_hub_producer];
|
[azure_event_hub_producer];
|
||||||
connector_type_to_bridge_types(confluent_producer) ->
|
connector_type_to_bridge_types(confluent_producer) ->
|
||||||
[confluent_producer];
|
[confluent_producer];
|
||||||
connector_type_to_bridge_types(dynamo) ->
|
|
||||||
[dynamo];
|
|
||||||
connector_type_to_bridge_types(gcp_pubsub_consumer) ->
|
connector_type_to_bridge_types(gcp_pubsub_consumer) ->
|
||||||
[gcp_pubsub_consumer];
|
[gcp_pubsub_consumer];
|
||||||
connector_type_to_bridge_types(gcp_pubsub_producer) ->
|
connector_type_to_bridge_types(gcp_pubsub_producer) ->
|
||||||
|
@ -185,7 +191,9 @@ connector_type_to_bridge_types(tdengine) ->
|
||||||
connector_type_to_bridge_types(rabbitmq) ->
|
connector_type_to_bridge_types(rabbitmq) ->
|
||||||
[rabbitmq];
|
[rabbitmq];
|
||||||
connector_type_to_bridge_types(s3) ->
|
connector_type_to_bridge_types(s3) ->
|
||||||
[s3].
|
[s3];
|
||||||
|
connector_type_to_bridge_types(Type) ->
|
||||||
|
emqx_connector_info:bridge_types(Type).
|
||||||
|
|
||||||
actions_config_name(action) -> <<"actions">>;
|
actions_config_name(action) -> <<"actions">>;
|
||||||
actions_config_name(source) -> <<"sources">>.
|
actions_config_name(source) -> <<"sources">>.
|
||||||
|
@ -481,7 +489,15 @@ post_request() ->
|
||||||
api_schema(Method) ->
|
api_schema(Method) ->
|
||||||
CE = api_schemas(Method),
|
CE = api_schemas(Method),
|
||||||
EE = enterprise_api_schemas(Method),
|
EE = enterprise_api_schemas(Method),
|
||||||
hoconsc:union(connector_api_union(CE ++ EE)).
|
InfoModSchemas = emqx_connector_info_api_schemas(Method),
|
||||||
|
hoconsc:union(connector_api_union(CE ++ EE ++ InfoModSchemas)).
|
||||||
|
|
||||||
|
emqx_connector_info_api_schemas(Method) ->
|
||||||
|
ConnectorTypes = emqx_connector_info:connector_types(),
|
||||||
|
[
|
||||||
|
emqx_connector_info:api_schema(Type, Method)
|
||||||
|
|| Type <- ConnectorTypes
|
||||||
|
].
|
||||||
|
|
||||||
connector_api_union(Refs) ->
|
connector_api_union(Refs) ->
|
||||||
Index = maps:from_list(Refs),
|
Index = maps:from_list(Refs),
|
||||||
|
@ -544,7 +560,7 @@ fields(connectors) ->
|
||||||
required => false
|
required => false
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
] ++ enterprise_fields_connectors();
|
] ++ enterprise_fields_connectors() ++ connector_info_fields_connectors();
|
||||||
fields("node_status") ->
|
fields("node_status") ->
|
||||||
[
|
[
|
||||||
node_name(),
|
node_name(),
|
||||||
|
@ -557,6 +573,13 @@ fields("node_status") ->
|
||||||
})}
|
})}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
connector_info_fields_connectors() ->
|
||||||
|
ConnectorTypes = emqx_connector_info:connector_types(),
|
||||||
|
[
|
||||||
|
emqx_connector_info:config_schema(Type)
|
||||||
|
|| Type <- ConnectorTypes
|
||||||
|
].
|
||||||
|
|
||||||
desc(connectors) ->
|
desc(connectors) ->
|
||||||
?DESC("desc_connectors");
|
?DESC("desc_connectors");
|
||||||
desc("node_status") ->
|
desc("node_status") ->
|
||||||
|
|
Loading…
Reference in New Issue