%%-------------------------------------------------------------------- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- %% @doc The module which knows everything about actions. %% NOTE: it does not cover the V1 bridges. -module(emqx_action_info). -export([ action_type_to_connector_type/1, action_type_to_bridge_v1_type/2, bridge_v1_type_to_action_type/1, bridge_v1_type_name/1, is_action_type/1, is_source/1, is_action/1, registered_schema_modules_actions/0, registered_schema_modules_sources/0, connector_action_config_to_bridge_v1_config/2, connector_action_config_to_bridge_v1_config/3, bridge_v1_config_to_connector_config/2, has_custom_bridge_v1_config_to_connector_config/1, bridge_v1_config_to_action_config/3, has_custom_bridge_v1_config_to_action_config/1, transform_bridge_v1_config_to_action_config/4, action_convert_from_connector/3 ]). -export([clean_cache/0]). -callback bridge_v1_type_name() -> atom() | { fun(({ActionConfig :: map(), ConnectorConfig :: map()}) -> Type :: atom()), TypeList :: [atom()] }. -callback action_type_name() -> atom(). -callback connector_type_name() -> atom(). -callback schema_module() -> atom(). %% Define this if the automatic config downgrade is not enough for the bridge. -callback connector_action_config_to_bridge_v1_config( ConnectorConfig :: map(), ActionConfig :: map() ) -> map(). %% Define this if the automatic config upgrade is not enough for the connector. -callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) -> map() | {ConnectorTypeName :: atom(), map()}. %% Define this if the automatic config upgrade is not enough for the bridge. %% If you want to make use of the automatic config upgrade, you can call %% emqx_action_info:transform_bridge_v1_config_to_action_config/4 in your %% implementation and do some adjustments on the result. -callback bridge_v1_config_to_action_config(BridgeV1Config :: map(), ConnectorName :: binary()) -> map() | {source | action, ActionTypeName :: atom(), map()} | 'none'. -callback is_source() -> boolean(). -callback is_action() -> boolean(). -optional_callbacks([ bridge_v1_type_name/0, connector_action_config_to_bridge_v1_config/2, bridge_v1_config_to_connector_config/1, bridge_v1_config_to_action_config/2, is_source/0, is_action/0 ]). %% ==================================================================== %% HardCoded list of info modules for actions %% TODO: Remove this list once we have made sure that all relevants %% apps are loaded before this module is called. %% ==================================================================== -if(?EMQX_RELEASE_EDITION == ee). hard_coded_action_info_modules_ee() -> [ emqx_bridge_azure_event_hub_action_info, emqx_bridge_confluent_producer_action_info, emqx_bridge_dynamo_action_info, emqx_bridge_gcp_pubsub_consumer_action_info, emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, emqx_bridge_kafka_consumer_action_info, emqx_bridge_kinesis_action_info, emqx_bridge_hstreamdb_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, emqx_bridge_oracle_action_info, emqx_bridge_rocketmq_action_info, emqx_bridge_influxdb_action_info, emqx_bridge_cassandra_action_info, emqx_bridge_clickhouse_action_info, emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, emqx_bridge_sqlserver_action_info, emqx_bridge_timescale_action_info, emqx_bridge_redis_action_info, emqx_bridge_iotdb_action_info, emqx_bridge_es_action_info, emqx_bridge_opents_action_info, emqx_bridge_rabbitmq_action_info, emqx_bridge_pulsar_action_info, emqx_bridge_greptimedb_action_info, emqx_bridge_tdengine_action_info, emqx_bridge_s3_action_info ]. -else. hard_coded_action_info_modules_ee() -> []. -endif. hard_coded_action_info_modules_common() -> [ emqx_bridge_http_action_info, emqx_bridge_mqtt_pubsub_action_info ]. hard_coded_action_info_modules() -> hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). %% ==================================================================== %% API %% ==================================================================== action_type_to_connector_type(Type) when not is_atom(Type) -> action_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); action_type_to_connector_type(Type) -> ActionInfoMap = info_map(), ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap), case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of undefined -> Type; ConnectorType -> ConnectorType end. bridge_v1_type_to_action_type(Bin) when is_binary(Bin) -> bridge_v1_type_to_action_type(binary_to_existing_atom(Bin)); bridge_v1_type_to_action_type(Type) -> ActionInfoMap = info_map(), BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap), case maps:get(Type, BridgeV1TypeToActionType, undefined) of undefined -> Type; ActionType -> ActionType end. action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) -> action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf); action_type_to_bridge_v1_type(ActionType, ActionConf) -> ActionInfoMap = info_map(), ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap), case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of undefined -> ActionType; BridgeV1TypeFun when is_function(BridgeV1TypeFun) -> case get_confs(ActionType, ActionConf) of {ConnectorConfig, ActionConfig} -> BridgeV1TypeFun({ConnectorConfig, ActionConfig}); undefined -> ActionType end; BridgeV1Type -> BridgeV1Type end. get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) -> ConnectorType = action_type_to_connector_type(ActionType), case emqx_conf:get_raw([connectors, ConnectorType, ConnectorName], undefined) of undefined -> undefined; ConnectorConfig -> {ConnectorConfig, ActionConfig} end; get_confs(_, _) -> undefined. %% We need this hack because of the bugs introduced by associating v2/action/source types %% with v1 types unconditionally, like `mongodb' being a "valid" V1 bridge type, or %% `confluent_producer', which has no v1 equivalent.... bridge_v1_type_name(ActionTypeBin) when is_binary(ActionTypeBin) -> bridge_v1_type_name(binary_to_existing_atom(ActionTypeBin)); bridge_v1_type_name(ActionType) -> Module = get_action_info_module(ActionType), case erlang:function_exported(Module, bridge_v1_type_name, 0) of true -> {ok, Module:bridge_v1_type_name()}; false -> {error, no_v1_equivalent} end. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 %% types. For everything else the function should return false. is_action_type(Bin) when is_binary(Bin) -> is_action_type(binary_to_existing_atom(Bin)); is_action_type(Type) -> ActionInfoMap = info_map(), ActionTypes = maps:get(action_type_names, ActionInfoMap), case maps:get(Type, ActionTypes, undefined) of undefined -> false; _ -> true end. %% Returns true if the action is an ingress action, false otherwise. is_source(Bin) when is_binary(Bin) -> is_source(binary_to_existing_atom(Bin)); is_source(Type) -> ActionInfoMap = info_map(), IsSourceMap = maps:get(is_source, ActionInfoMap), maps:get(Type, IsSourceMap, false). %% Returns true if the action is an egress action, false otherwise. is_action(Bin) when is_binary(Bin) -> is_action(binary_to_existing_atom(Bin)); is_action(Type) -> ActionInfoMap = info_map(), IsActionMap = maps:get(is_action, ActionInfoMap), maps:get(Type, IsActionMap, true). registered_schema_modules_actions() -> InfoMap = info_map(), Schemas = maps:get(action_type_to_schema_module, InfoMap), All = maps:to_list(Schemas), [{Type, SchemaMod} || {Type, SchemaMod} <- All, is_action(Type)]. registered_schema_modules_sources() -> InfoMap = info_map(), Schemas = maps:get(action_type_to_schema_module, InfoMap), All = maps:to_list(Schemas), [{Type, SchemaMod} || {Type, SchemaMod} <- All, is_source(Type)]. connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, ActionConfig) -> Module = get_action_info_module(ActionOrBridgeType), case erlang:function_exported(Module, connector_action_config_to_bridge_v1_config, 2) of true -> Module:connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig); false -> connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) end. action_convert_from_connector(ActionOrBridgeType, ConnectorConfig, ActionConfig) -> Module = get_action_info_module(ActionOrBridgeType), case erlang:function_exported(Module, action_convert_from_connector, 2) of true -> Module:action_convert_from_connector(ConnectorConfig, ActionConfig); false -> ActionConfig end. connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> Merged = emqx_utils_maps:deep_merge( maps:without( [<<"connector">>], emqx_utils_maps:unindent(<<"parameters">>, ActionConfig) ), emqx_utils_maps:unindent(<<"parameters">>, ConnectorConfig) ), maps:without([<<"description">>], Merged). has_custom_bridge_v1_config_to_connector_config(ActionOrBridgeType) -> Module = get_action_info_module(ActionOrBridgeType), erlang:function_exported(Module, bridge_v1_config_to_connector_config, 1). bridge_v1_config_to_connector_config(ActionOrBridgeType, BridgeV1Config) -> Module = get_action_info_module(ActionOrBridgeType), %% should only be called if defined Module:bridge_v1_config_to_connector_config(BridgeV1Config). has_custom_bridge_v1_config_to_action_config(ActionOrBridgeType) -> Module = get_action_info_module(ActionOrBridgeType), erlang:function_exported(Module, bridge_v1_config_to_action_config, 2). bridge_v1_config_to_action_config(ActionOrBridgeType, BridgeV1Config, ConnectorName) -> Module = get_action_info_module(ActionOrBridgeType), %% should only be called if defined Module:bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName). transform_bridge_v1_config_to_action_config( BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName ) -> emqx_connector_schema:transform_bridge_v1_config_to_action_config( BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName ). %% ==================================================================== %% Internal functions for building the info map and accessing it %% ==================================================================== get_action_info_module(ActionOrBridgeType) -> InfoMap = info_map(), ActionInfoModuleMap = maps:get(action_type_to_info_module, InfoMap), maps:get(ActionOrBridgeType, ActionInfoModuleMap, undefined). internal_emqx_action_persistent_term_info_key() -> ?FUNCTION_NAME. info_map() -> case persistent_term:get(internal_emqx_action_persistent_term_info_key(), not_found) of not_found -> build_cache(); ActionInfoMap -> ActionInfoMap end. build_cache() -> ActionInfoModules = action_info_modules(), ActionInfoMap = lists:foldl( fun(Module, InfoMapSoFar) -> ModuleInfoMap = get_info_map(Module), emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap) end, initial_info_map(), ActionInfoModules ), %% Update the persistent term with the new info map persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap), ActionInfoMap. clean_cache() -> persistent_term:erase(internal_emqx_action_persistent_term_info_key()). action_info_modules() -> ActionInfoModules = [ action_info_modules(App) || {App, _, _} <- application:loaded_applications() ], lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()). action_info_modules(App) -> case application:get_env(App, emqx_action_info_modules) of {ok, Modules} -> Modules; _ -> [] end. initial_info_map() -> #{ action_type_names => #{}, bridge_v1_type_to_action_type => #{}, action_type_to_bridge_v1_type => #{}, action_type_to_connector_type => #{}, action_type_to_schema_module => #{}, action_type_to_info_module => #{}, is_source => #{}, is_action => #{} }. get_info_map(Module) -> %% Force the module to get loaded _ = code:ensure_loaded(Module), ActionType = Module:action_type_name(), {BridgeV1TypeOrFun, BridgeV1Types} = case erlang:function_exported(Module, bridge_v1_type_name, 0) of true -> case Module:bridge_v1_type_name() of {_BridgeV1TypeFun, _BridgeV1Types} = BridgeV1TypeTuple -> BridgeV1TypeTuple; BridgeV1Type0 -> {BridgeV1Type0, [BridgeV1Type0]} end; false -> {ActionType, [ActionType]} end, IsIngress = case erlang:function_exported(Module, is_source, 0) of true -> Module:is_source(); false -> false end, IsEgress = case erlang:function_exported(Module, is_action, 0) of true -> Module:is_action(); false -> true end, #{ action_type_names => lists:foldl( fun(BridgeV1Type, M) -> M#{BridgeV1Type => true} end, #{ActionType => true}, BridgeV1Types ), bridge_v1_type_to_action_type => lists:foldl( fun(BridgeV1Type, M) -> %% Alias the bridge V1 type to the action type M#{BridgeV1Type => ActionType} end, #{ActionType => ActionType}, BridgeV1Types ), action_type_to_bridge_v1_type => #{ ActionType => BridgeV1TypeOrFun }, action_type_to_connector_type => lists:foldl( fun(BridgeV1Type, M) -> M#{BridgeV1Type => Module:connector_type_name()} end, #{ActionType => Module:connector_type_name()}, BridgeV1Types ), action_type_to_schema_module => #{ ActionType => Module:schema_module() }, action_type_to_info_module => lists:foldl( fun(BridgeV1Type, M) -> M#{BridgeV1Type => Module} end, #{ActionType => Module}, BridgeV1Types ), is_source => #{ ActionType => IsIngress }, is_action => #{ ActionType => IsEgress } }.