emqx/apps/emqx_bridge/src/emqx_action_info.erl

434 lines
16 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The module which knows everything about actions.
%% NOTE: it does not cover the V1 bridges.
-module(emqx_action_info).
-export([
action_type_to_connector_type/1,
action_type_to_bridge_v1_type/2,
bridge_v1_type_to_action_type/1,
bridge_v1_type_name/1,
is_action_type/1,
is_source/1,
is_action/1,
registered_schema_modules_actions/0,
registered_schema_modules_sources/0,
connector_action_config_to_bridge_v1_config/2,
connector_action_config_to_bridge_v1_config/3,
bridge_v1_config_to_connector_config/2,
has_custom_bridge_v1_config_to_connector_config/1,
bridge_v1_config_to_action_config/3,
has_custom_bridge_v1_config_to_action_config/1,
transform_bridge_v1_config_to_action_config/4,
action_convert_from_connector/3
]).
-export([clean_cache/0]).
-callback bridge_v1_type_name() ->
atom()
| {
fun(({ActionConfig :: map(), ConnectorConfig :: map()}) -> Type :: atom()),
TypeList :: [atom()]
}.
-callback action_type_name() -> atom().
-callback connector_type_name() -> atom().
-callback schema_module() -> atom().
%% Define this if the automatic config downgrade is not enough for the bridge.
-callback connector_action_config_to_bridge_v1_config(
ConnectorConfig :: map(), ActionConfig :: map()
) -> map().
%% Define this if the automatic config upgrade is not enough for the connector.
-callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) ->
map() | {ConnectorTypeName :: atom(), map()}.
%% Define this if the automatic config upgrade is not enough for the bridge.
%% If you want to make use of the automatic config upgrade, you can call
%% emqx_action_info:transform_bridge_v1_config_to_action_config/4 in your
%% implementation and do some adjustments on the result.
-callback bridge_v1_config_to_action_config(BridgeV1Config :: map(), ConnectorName :: binary()) ->
map() | {source | action, ActionTypeName :: atom(), map()} | 'none'.
-callback is_source() ->
boolean().
-callback is_action() ->
boolean().
-optional_callbacks([
bridge_v1_type_name/0,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_connector_config/1,
bridge_v1_config_to_action_config/2,
is_source/0,
is_action/0
]).
%% ====================================================================
%% HardCoded list of info modules for actions
%% TODO: Remove this list once we have made sure that all relevants
%% apps are loaded before this module is called.
%% ====================================================================
-if(?EMQX_RELEASE_EDITION == ee).
hard_coded_action_info_modules_ee() ->
[
emqx_bridge_azure_event_hub_action_info,
emqx_bridge_confluent_producer_action_info,
emqx_bridge_dynamo_action_info,
emqx_bridge_gcp_pubsub_consumer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_kafka_consumer_action_info,
emqx_bridge_kinesis_action_info,
emqx_bridge_hstreamdb_action_info,
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_oracle_action_info,
emqx_bridge_rocketmq_action_info,
emqx_bridge_influxdb_action_info,
emqx_bridge_cassandra_action_info,
emqx_bridge_clickhouse_action_info,
emqx_bridge_mysql_action_info,
emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info,
emqx_bridge_sqlserver_action_info,
emqx_bridge_timescale_action_info,
emqx_bridge_redis_action_info,
emqx_bridge_iotdb_action_info,
emqx_bridge_es_action_info,
emqx_bridge_opents_action_info,
emqx_bridge_rabbitmq_action_info,
emqx_bridge_pulsar_action_info,
emqx_bridge_greptimedb_action_info,
emqx_bridge_tdengine_action_info,
emqx_bridge_s3_action_info
].
-else.
hard_coded_action_info_modules_ee() ->
[].
-endif.
hard_coded_action_info_modules_common() ->
[
emqx_bridge_http_action_info,
emqx_bridge_mqtt_pubsub_action_info
].
hard_coded_action_info_modules() ->
hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
%% ====================================================================
%% API
%% ====================================================================
action_type_to_connector_type(Type) when not is_atom(Type) ->
action_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type)));
action_type_to_connector_type(Type) ->
ActionInfoMap = info_map(),
ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap),
case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of
undefined -> Type;
ConnectorType -> ConnectorType
end.
bridge_v1_type_to_action_type(Bin) when is_binary(Bin) ->
bridge_v1_type_to_action_type(binary_to_existing_atom(Bin));
bridge_v1_type_to_action_type(Type) ->
ActionInfoMap = info_map(),
BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap),
case maps:get(Type, BridgeV1TypeToActionType, undefined) of
undefined -> Type;
ActionType -> ActionType
end.
action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) ->
action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf);
action_type_to_bridge_v1_type(ActionType, ActionConf) ->
ActionInfoMap = info_map(),
ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of
undefined ->
ActionType;
BridgeV1TypeFun when is_function(BridgeV1TypeFun) ->
case get_confs(ActionType, ActionConf) of
{ConnectorConfig, ActionConfig} -> BridgeV1TypeFun({ConnectorConfig, ActionConfig});
undefined -> ActionType
end;
BridgeV1Type ->
BridgeV1Type
end.
get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
ConnectorType = action_type_to_connector_type(ActionType),
case emqx_conf:get_raw([connectors, ConnectorType, ConnectorName], undefined) of
undefined -> undefined;
ConnectorConfig -> {ConnectorConfig, ActionConfig}
end;
get_confs(_, _) ->
undefined.
%% We need this hack because of the bugs introduced by associating v2/action/source types
%% with v1 types unconditionally, like `mongodb' being a "valid" V1 bridge type, or
%% `confluent_producer', which has no v1 equivalent....
bridge_v1_type_name(ActionTypeBin) when is_binary(ActionTypeBin) ->
bridge_v1_type_name(binary_to_existing_atom(ActionTypeBin));
bridge_v1_type_name(ActionType) ->
Module = get_action_info_module(ActionType),
case erlang:function_exported(Module, bridge_v1_type_name, 0) of
true ->
{ok, Module:bridge_v1_type_name()};
false ->
{error, no_v1_equivalent}
end.
%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
%% types. For everything else the function should return false.
is_action_type(Bin) when is_binary(Bin) ->
is_action_type(binary_to_existing_atom(Bin));
is_action_type(Type) ->
ActionInfoMap = info_map(),
ActionTypes = maps:get(action_type_names, ActionInfoMap),
case maps:get(Type, ActionTypes, undefined) of
undefined -> false;
_ -> true
end.
%% Returns true if the action is an ingress action, false otherwise.
is_source(Bin) when is_binary(Bin) ->
is_source(binary_to_existing_atom(Bin));
is_source(Type) ->
ActionInfoMap = info_map(),
IsSourceMap = maps:get(is_source, ActionInfoMap),
maps:get(Type, IsSourceMap, false).
%% Returns true if the action is an egress action, false otherwise.
is_action(Bin) when is_binary(Bin) ->
is_action(binary_to_existing_atom(Bin));
is_action(Type) ->
ActionInfoMap = info_map(),
IsActionMap = maps:get(is_action, ActionInfoMap),
maps:get(Type, IsActionMap, true).
registered_schema_modules_actions() ->
InfoMap = info_map(),
Schemas = maps:get(action_type_to_schema_module, InfoMap),
All = maps:to_list(Schemas),
[{Type, SchemaMod} || {Type, SchemaMod} <- All, is_action(Type)].
registered_schema_modules_sources() ->
InfoMap = info_map(),
Schemas = maps:get(action_type_to_schema_module, InfoMap),
All = maps:to_list(Schemas),
[{Type, SchemaMod} || {Type, SchemaMod} <- All, is_source(Type)].
connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, ActionConfig) ->
Module = get_action_info_module(ActionOrBridgeType),
case erlang:function_exported(Module, connector_action_config_to_bridge_v1_config, 2) of
true ->
Module:connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig);
false ->
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig)
end.
action_convert_from_connector(ActionOrBridgeType, ConnectorConfig, ActionConfig) ->
Module = get_action_info_module(ActionOrBridgeType),
case erlang:function_exported(Module, action_convert_from_connector, 2) of
true ->
Module:action_convert_from_connector(ConnectorConfig, ActionConfig);
false ->
ActionConfig
end.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
Merged = emqx_utils_maps:deep_merge(
maps:without(
[<<"connector">>],
emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
),
emqx_utils_maps:unindent(<<"parameters">>, ConnectorConfig)
),
maps:without([<<"description">>], Merged).
has_custom_bridge_v1_config_to_connector_config(ActionOrBridgeType) ->
Module = get_action_info_module(ActionOrBridgeType),
erlang:function_exported(Module, bridge_v1_config_to_connector_config, 1).
bridge_v1_config_to_connector_config(ActionOrBridgeType, BridgeV1Config) ->
Module = get_action_info_module(ActionOrBridgeType),
%% should only be called if defined
Module:bridge_v1_config_to_connector_config(BridgeV1Config).
has_custom_bridge_v1_config_to_action_config(ActionOrBridgeType) ->
Module = get_action_info_module(ActionOrBridgeType),
erlang:function_exported(Module, bridge_v1_config_to_action_config, 2).
bridge_v1_config_to_action_config(ActionOrBridgeType, BridgeV1Config, ConnectorName) ->
Module = get_action_info_module(ActionOrBridgeType),
%% should only be called if defined
Module:bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName).
transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
) ->
emqx_connector_schema:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
).
%% ====================================================================
%% Internal functions for building the info map and accessing it
%% ====================================================================
get_action_info_module(ActionOrBridgeType) ->
InfoMap = info_map(),
ActionInfoModuleMap = maps:get(action_type_to_info_module, InfoMap),
maps:get(ActionOrBridgeType, ActionInfoModuleMap, undefined).
internal_emqx_action_persistent_term_info_key() ->
?FUNCTION_NAME.
info_map() ->
case persistent_term:get(internal_emqx_action_persistent_term_info_key(), not_found) of
not_found ->
build_cache();
ActionInfoMap ->
ActionInfoMap
end.
build_cache() ->
ActionInfoModules = action_info_modules(),
ActionInfoMap =
lists:foldl(
fun(Module, InfoMapSoFar) ->
ModuleInfoMap = get_info_map(Module),
emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap)
end,
initial_info_map(),
ActionInfoModules
),
%% Update the persistent term with the new info map
persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap),
ActionInfoMap.
clean_cache() ->
persistent_term:erase(internal_emqx_action_persistent_term_info_key()).
action_info_modules() ->
ActionInfoModules = [
action_info_modules(App)
|| {App, _, _} <- application:loaded_applications()
],
lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()).
action_info_modules(App) ->
case application:get_env(App, emqx_action_info_modules) of
{ok, Modules} ->
Modules;
_ ->
[]
end.
initial_info_map() ->
#{
action_type_names => #{},
bridge_v1_type_to_action_type => #{},
action_type_to_bridge_v1_type => #{},
action_type_to_connector_type => #{},
action_type_to_schema_module => #{},
action_type_to_info_module => #{},
is_source => #{},
is_action => #{}
}.
get_info_map(Module) ->
%% Force the module to get loaded
_ = code:ensure_loaded(Module),
ActionType = Module:action_type_name(),
{BridgeV1TypeOrFun, BridgeV1Types} =
case erlang:function_exported(Module, bridge_v1_type_name, 0) of
true ->
case Module:bridge_v1_type_name() of
{_BridgeV1TypeFun, _BridgeV1Types} = BridgeV1TypeTuple ->
BridgeV1TypeTuple;
BridgeV1Type0 ->
{BridgeV1Type0, [BridgeV1Type0]}
end;
false ->
{ActionType, [ActionType]}
end,
IsIngress =
case erlang:function_exported(Module, is_source, 0) of
true ->
Module:is_source();
false ->
false
end,
IsEgress =
case erlang:function_exported(Module, is_action, 0) of
true ->
Module:is_action();
false ->
true
end,
#{
action_type_names =>
lists:foldl(
fun(BridgeV1Type, M) ->
M#{BridgeV1Type => true}
end,
#{ActionType => true},
BridgeV1Types
),
bridge_v1_type_to_action_type =>
lists:foldl(
fun(BridgeV1Type, M) ->
%% Alias the bridge V1 type to the action type
M#{BridgeV1Type => ActionType}
end,
#{ActionType => ActionType},
BridgeV1Types
),
action_type_to_bridge_v1_type => #{
ActionType => BridgeV1TypeOrFun
},
action_type_to_connector_type =>
lists:foldl(
fun(BridgeV1Type, M) ->
M#{BridgeV1Type => Module:connector_type_name()}
end,
#{ActionType => Module:connector_type_name()},
BridgeV1Types
),
action_type_to_schema_module => #{
ActionType => Module:schema_module()
},
action_type_to_info_module =>
lists:foldl(
fun(BridgeV1Type, M) ->
M#{BridgeV1Type => Module}
end,
#{ActionType => Module},
BridgeV1Types
),
is_source => #{
ActionType => IsIngress
},
is_action => #{
ActionType => IsEgress
}
}.