From 5e8e40701735838a84ef11987b624d7eb45b118e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 9 Nov 2023 13:37:53 +0100 Subject: [PATCH] refactor: action schema retrival after PR feedback --- .../include/emqx_bridge_v2_register.hrl | 169 ------------- apps/emqx_bridge/src/emqx_action_info.erl | 225 ++++++++++++++++++ apps/emqx_bridge/src/emqx_bridge_v2.erl | 135 +---------- .../src/schema/emqx_bridge_v2_schema.erl | 8 +- .../src/emqx_bridge_kafka.app.src | 2 +- .../src/emqx_bridge_kafka.erl | 20 +- .../src/emqx_bridge_kafka_action_info.erl | 18 ++ apps/emqx_conf/src/emqx_conf.erl | 3 + 8 files changed, 258 insertions(+), 322 deletions(-) delete mode 100644 apps/emqx_bridge/include/emqx_bridge_v2_register.hrl create mode 100644 apps/emqx_bridge/src/emqx_action_info.erl create mode 100644 apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl deleted file mode 100644 index 9c0360c56..000000000 --- a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl +++ /dev/null @@ -1,169 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% This function is called to register a bridge V2. It should be called before -%% the system boots in a function triggered by an -on_load() directive -%% since it should be called before the system boots because the config -%% system depends on that. -%% -%% It is placed in an hrl file instead of in emqx_bridge_v2.erl because emqx_bridge_v2 -%% might not be loaded when the bridge module is loaded. --spec emqx_bridge_v2_register_bridge_type(#{ - %% Should be provided by all bridges. Even if the bridge_v2_type_name is - %% the same as the bridge_v1_type_named. - 'bridge_v1_type_name' := atom(), - 'bridge_v2_type_name' := atom(), - 'connector_type' := atom(), - 'schema_module' := atom(), - 'schema_struct_field' := atom() | binary() -}) -> ok. -emqx_bridge_v2_register_bridge_type(BridgeTypeInfo) -> - try - %% We must prevent overwriting so we take a lock when writing to persistent_term - global:set_lock( - { - internal_emqx_bridge_v2_persistent_term_info_key(), - internal_emqx_bridge_v2_persistent_term_info_key() - }, - [node()], - infinity - ), - internal_maybe_create_initial_bridge_v2_info_map(), - internal_register_bridge_type_with_lock(BridgeTypeInfo) - catch - ErrorType:Reason:Stacktrace -> - %% Print the error on standard output as logger might not be - %% started yet - io:format("~p~n", [ - #{ - 'error_type' => ErrorType, - 'reason' => Reason, - 'stacktrace' => Stacktrace, - 'msg' => "Failed to register bridge V2 type" - } - ]), - erlang:raise(ErrorType, Reason, Stacktrace) - after - global:del_lock( - { - internal_emqx_bridge_v2_persistent_term_info_key(), - internal_emqx_bridge_v2_persistent_term_info_key() - }, - [node()] - ) - end, - ok. - -internal_register_bridge_type_with_lock(BridgeTypeInfo) -> - InfoMap0 = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - %% The Bridge V1 type is also a bridge V2 type due to backwards compatibility - InfoMap1 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_names, - maps:get(bridge_v1_type_name, BridgeTypeInfo) - ], - InfoMap0, - true - ), - InfoMap2 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_names, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap1, - true - ), - InfoMap3 = emqx_utils_maps:deep_force_put( - [ - bridge_v1_type_to_bridge_v2_type, - maps:get(bridge_v1_type_name, BridgeTypeInfo) - ], - InfoMap2, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ), - %% Backwards compatibility - InfoMap4 = emqx_utils_maps:deep_force_put( - [ - bridge_v1_type_to_bridge_v2_type, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap3, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ), - InfoMap5 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_connector_type, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap4, - maps:get(connector_type, BridgeTypeInfo) - ), - %% Backwards compatibility - InfoMap6 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_connector_type, - maps:get(bridge_v1_type_name, BridgeTypeInfo) - ], - InfoMap5, - maps:get(connector_type, BridgeTypeInfo) - ), - InfoMap7 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_schema_module, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap6, - maps:get(schema_module, BridgeTypeInfo) - ), - InfoMap8 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_schema_struct_field, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap7, - maps:get(schema_struct_field, BridgeTypeInfo) - ), - InfoMap9 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_bridge_v1_type, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap8, - maps:get(bridge_v1_type_name, BridgeTypeInfo) - ), - ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap9). - -internal_maybe_create_initial_bridge_v2_info_map() -> - case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of - undefined -> - ok = persistent_term:put( - internal_emqx_bridge_v2_persistent_term_info_key(), - #{ - bridge_v2_type_names => #{}, - bridge_v1_type_to_bridge_v2_type => #{}, - bridge_v2_type_to_bridge_v1_type => #{}, - bridge_v2_type_to_connector_type => #{}, - bridge_v2_type_to_schema_module => #{}, - bridge_v2_type_to_schema_struct_field => #{} - } - ), - ok; - _ -> - ok - end. - -internal_emqx_bridge_v2_persistent_term_info_key() -> - ?FUNCTION_NAME. diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl new file mode 100644 index 000000000..b4b6ed88e --- /dev/null +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -0,0 +1,225 @@ +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The module which knows everything about actions. + +%% NOTE: it does not cover the V1 bridges. + +-module(emqx_action_info). + +-export([ + action_type_to_connector_type/1, + action_type_to_bridge_v1_type/1, + bridge_v1_type_to_action_type/1, + is_action_type/1, + registered_schema_modules/0 +]). + +-callback bridge_v1_type_name() -> atom(). +-callback action_type_name() -> atom(). +-callback connector_type_name() -> atom(). +-callback schema_module() -> atom(). + +-optional_callbacks([bridge_v1_type_name/0]). + +%% ==================================================================== +%% Hadcoded list of info modules for actions +%% TODO: Remove this list once we have made sure that all relevants +%% apps are loaded before this module is called. +%% ==================================================================== + +-if(?EMQX_RELEASE_EDITION == ee). +hard_coded_action_info_modules_ee() -> + [emqx_bridge_kafka_action_info]. +-else. +hard_coded_action_info_modules_ee() -> + []. +-endif. + +hard_coded_action_info_modules_common() -> + []. + +hard_coded_action_info_modules() -> + hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). + +%% ==================================================================== +%% API +%% ==================================================================== + +action_type_to_connector_type(Type) when not is_atom(Type) -> + action_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); +action_type_to_connector_type(Type) -> + ActionInfoMap = info_map(), + ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap), + case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of + undefined -> action_type_to_connector_type_old(Type); + ConnectorType -> ConnectorType + end. + +% action_type_to_connector_type_old(kafka) -> +% %% backward compatible +% kafka_producer; +% action_type_to_connector_type_old(kafka_producer) -> +% kafka_producer; +action_type_to_connector_type_old(azure_event_hub_producer) -> + azure_event_hub_producer. + +bridge_v1_type_to_action_type(Bin) when is_binary(Bin) -> + bridge_v1_type_to_action_type(binary_to_existing_atom(Bin)); +bridge_v1_type_to_action_type(Type) -> + ActionInfoMap = info_map(), + BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap), + case maps:get(Type, BridgeV1TypeToActionType, undefined) of + undefined -> bridge_v1_type_to_action_type_old(Type); + ActionType -> ActionType + end. + +% bridge_v1_type_to_action_type_old(kafka) -> +% kafka_producer; +% bridge_v1_type_to_action_type_old(kafka_producer) -> +% kafka_producer; +bridge_v1_type_to_action_type_old(azure_event_hub_producer) -> + azure_event_hub_producer; +bridge_v1_type_to_action_type_old(Type) -> + Type. + +action_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> + action_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); +action_type_to_bridge_v1_type(Type) -> + ActionInfoMap = info_map(), + ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap), + case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of + undefined -> action_type_to_bridge_v1_type_old(Type); + BridgeV1Type -> BridgeV1Type + end. + +action_type_to_bridge_v1_type_old(azure_event_hub_producer) -> + azure_event_hub_producer; +action_type_to_bridge_v1_type_old(Type) -> + Type. + +%% This function should return true for all inputs that are bridge V1 types for +%% bridges that have been refactored to bridge V2s, and for all all bridge V2 +%% types. For everything else the function should return false. +is_action_type(Bin) when is_binary(Bin) -> + is_action_type(binary_to_existing_atom(Bin)); +is_action_type(Type) -> + ActionInfoMap = info_map(), + ActionTypes = maps:get(action_type_names, ActionInfoMap), + case maps:get(Type, ActionTypes, undefined) of + undefined -> is_action_type_old(Type); + _ -> true + end. + +% is_action_type_old(kafka_producer) -> +% true; +% is_action_type_old(kafka) -> +% true; +is_action_type_old(azure_event_hub_producer) -> + true; +is_action_type_old(_) -> + false. + +registered_schema_modules() -> + InfoMap = info_map(), + Schemas = maps:get(action_type_to_schema_module, InfoMap), + maps:to_list(Schemas). + +%% ==================================================================== +%% Internal functions for building the info map and accessing it +%% ==================================================================== + +internal_emqx_action_persistent_term_info_key() -> + ?FUNCTION_NAME. + +info_map() -> + case persistent_term:get(internal_emqx_action_persistent_term_info_key(), not_found) of + not_found -> + build_cache(); + ActionInfoMap -> + ActionInfoMap + end. + +build_cache() -> + ActionInfoModules = action_info_modules(), + ActionInfoMap = + lists:foldl( + fun(Module, InfoMapSoFar) -> + ModuleInfoMap = get_info_map(Module), + emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap) + end, + initial_info_map(), + ActionInfoModules + ), + %% Update the persistent term with the new info map + persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap), + ActionInfoMap. + +action_info_modules() -> + ActionInfoModules = [ + action_info_modules(App) + || {App, _, _} <- application:loaded_applications() + ], + lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()). + +action_info_modules(App) -> + case application:get_env(App, emqx, action_info_module) of + {ok, Module} -> + [Module]; + _ -> + [] + end. + +initial_info_map() -> + #{ + action_type_names => #{}, + bridge_v1_type_to_action_type => #{}, + action_type_to_bridge_v1_type => #{}, + action_type_to_connector_type => #{}, + action_type_to_schema_module => #{} + }. + +get_info_map(Module) -> + %% Force the module to get loaded + _ = code:ensure_loaded(Module), + ActionType = Module:action_type_name(), + BridgeV1Type = + case erlang:function_exported(Module, bridge_v1_type_name, 0) of + true -> + Module:bridge_v1_type_name(); + false -> + Module:action_type_name() + end, + #{ + action_type_names => #{ + ActionType => true, + BridgeV1Type => true + }, + bridge_v1_type_to_action_type => #{ + BridgeV1Type => ActionType, + %% Alias the bridge V1 type to the action type + ActionType => ActionType + }, + action_type_to_bridge_v1_type => #{ + ActionType => BridgeV1Type + }, + action_type_to_connector_type => #{ + ActionType => Module:connector_type_name(), + %% Alias the bridge V1 type to the action type + BridgeV1Type => Module:connector_type_name() + }, + action_type_to_schema_module => #{ + ActionType => Module:schema_module() + } + }. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 6406f7df2..77ef8bef3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -18,15 +18,6 @@ -behaviour(emqx_config_handler). -behaviour(emqx_config_backup). --compile([ - {nowarn_unused_function, [ - {internal_register_bridge_type_with_lock, 1}, - {emqx_bridge_v2_register_bridge_type, 1} - ]} -]). --ignore_xref(emqx_bridge_v2_register_bridge_type/1). --ignore_xref(internal_register_bridge_type_with_lock/1). - -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -37,15 +28,6 @@ %% refactored into a new module/application with appropriate name. -define(ROOT_KEY, actions). --on_load(on_load/0). - -%% Getting registered bridge schemas: - --export([ - bridge_v2_type_to_schame_struct_field/1, - registered_schema_modules/0 -]). - %% Loading and unloading config when EMQX starts and stops -export([ load/0, @@ -158,53 +140,6 @@ %%==================================================================== --include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl"). - -on_load() -> - try - %% We must prevent overwriting so we take a lock when writing to persistent_term - global:set_lock( - { - internal_emqx_bridge_v2_persistent_term_info_key(), - internal_emqx_bridge_v2_persistent_term_info_key() - }, - [node()], - infinity - ), - internal_maybe_create_initial_bridge_v2_info_map() - catch - ErrorType:Reason:Stacktrace -> - %% Logger may not be started so print to stdout - io:format("~p~n", [ - #{ - 'error_type' => ErrorType, - 'reason' => Reason, - 'stacktrace' => Stacktrace, - 'msg' => "Failed to register bridge V2 type" - } - ]), - erlang:raise(ErrorType, Reason, Stacktrace) - after - global:del_lock( - { - internal_emqx_bridge_v2_persistent_term_info_key(), - internal_emqx_bridge_v2_persistent_term_info_key() - }, - [node()] - ) - end, - ok. - -bridge_v2_type_to_schame_struct_field(BridgeV2Type) -> - InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - Map = maps:get(bridge_v2_type_to_schema_struct_field, InfoMap), - maps:get(BridgeV2Type, Map). - -registered_schema_modules() -> - InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - Schemas = maps:get(bridge_v2_type_to_schema_module, InfoMap), - maps:to_list(Schemas). - %%==================================================================== %% Loading and unloading config when EMQX starts and stops %%==================================================================== @@ -891,23 +826,8 @@ connector_type(Type) -> %% remote call so it can be mocked ?MODULE:bridge_v2_type_to_connector_type(Type). -bridge_v2_type_to_connector_type(Type) when not is_atom(Type) -> - bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); bridge_v2_type_to_connector_type(Type) -> - BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - BridgeV2TypeToConnectorTypeMap = maps:get(bridge_v2_type_to_connector_type, BridgeV2InfoMap), - case maps:get(Type, BridgeV2TypeToConnectorTypeMap, undefined) of - undefined -> bridge_v2_type_to_connector_type_old(Type); - ConnectorType -> ConnectorType - end. - -% bridge_v2_type_to_connector_type_old(kafka) -> -% %% backward compatible -% kafka_producer; -% bridge_v2_type_to_connector_type_old(kafka_producer) -> -% kafka_producer; -bridge_v2_type_to_connector_type_old(azure_event_hub_producer) -> - azure_event_hub_producer. + emqx_action_info:action_type_to_connector_type(Type). %%==================================================================== %% Data backup API @@ -1127,61 +1047,14 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) -> end end. -bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) -> - ?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin)); bridge_v1_type_to_bridge_v2_type(Type) -> - BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - BridgeV1TypeToBridgeV2Type = maps:get(bridge_v1_type_to_bridge_v2_type, BridgeV2InfoMap), - case maps:get(Type, BridgeV1TypeToBridgeV2Type, undefined) of - undefined -> bridge_v1_type_to_bridge_v2_type_old(Type); - BridgeV2Type -> BridgeV2Type - end. + emqx_action_info:bridge_v1_type_to_action_type(Type). -% bridge_v1_type_to_bridge_v2_type_old(kafka) -> -% kafka_producer; -% bridge_v1_type_to_bridge_v2_type_old(kafka_producer) -> -% kafka_producer; -bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) -> - azure_event_hub_producer; -bridge_v1_type_to_bridge_v2_type_old(Type) -> - Type. - -bridge_v2_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> - ?MODULE:bridge_v2_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); bridge_v2_type_to_bridge_v1_type(Type) -> - BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - BridgeV2TypeToBridgeV1Type = maps:get(bridge_v2_type_to_bridge_v1_type, BridgeV2InfoMap), - case maps:get(Type, BridgeV2TypeToBridgeV1Type, undefined) of - undefined -> bridge_v2_type_to_bridge_v1_type_old(Type); - BridgeV1Type -> BridgeV1Type - end. + emqx_action_info:action_type_to_bridge_v1_type(Type). -bridge_v2_type_to_bridge_v1_type_old(azure_event_hub_producer) -> - azure_event_hub_producer; -bridge_v2_type_to_bridge_v1_type_old(Type) -> - Type. - -%% This function should return true for all inputs that are bridge V1 types for -%% bridges that have been refactored to bridge V2s, and for all all bridge V2 -%% types. For everything else the function should return false. -is_bridge_v2_type(Bin) when is_binary(Bin) -> - is_bridge_v2_type(binary_to_existing_atom(Bin)); is_bridge_v2_type(Type) -> - BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - BridgeV2Types = maps:get(bridge_v2_type_names, BridgeV2InfoMap), - case maps:get(Type, BridgeV2Types, undefined) of - undefined -> is_bridge_v2_type_old(Type); - _ -> true - end. - -% is_bridge_v2_type_old(kafka_producer) -> -% true; -% is_bridge_v2_type_old(kafka) -> -% true; -is_bridge_v2_type_old(azure_event_hub_producer) -> - true; -is_bridge_v2_type_old(_) -> - false. + emqx_action_info:is_action_type(Type). bridge_v1_list_and_transform() -> Bridges = list_with_lookup_fun(fun bridge_v1_lookup_and_transform/2), diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index c380ac933..65250ad40 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -93,7 +93,7 @@ registered_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use %% `call_hocon' from `nodetool' to generate initial configurations. _ = emqx_bridge_v2:module_info(), - RegistredSchmeas = emqx_bridge_v2:registered_schema_modules(), + RegistredSchmeas = emqx_action_info:registered_schema_modules(), [ api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2") || {BridgeV2Type, SchemaModule} <- RegistredSchmeas @@ -156,8 +156,8 @@ fields(actions) -> registered_schema_fields() -> [ - Module:fields(emqx_bridge_v2:bridge_v2_type_to_schame_struct_field(BridgeV2Type)) - || {BridgeV2Type, Module} <- emqx_bridge_v2:registered_schema_modules() + Module:fields(action) + || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules() ]. desc(actions) -> @@ -183,7 +183,7 @@ examples(Method) -> ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), lists:foldl(MergeFun, Examples, ConnectorExamples) end, - SchemaModules = [Mod || {_, Mod} <- emqx_bridge_v2:registered_schema_modules()], + SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()], lists:foldl(Fun, #{}, SchemaModules). -ifdef(TEST). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 88fa6b7bd..ee468249f 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -12,7 +12,7 @@ brod, brod_gssapi ]}, - {env, []}, + {env, [{emqx_action_info_module, emqx_bridge_kafka_action_info}]}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 83ecf7b05..3f270d921 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -16,8 +16,6 @@ ]). -import(hoconsc, [mk/2, enum/1, ref/2]). --on_load(register_bridge_v2/0). - -export([ bridge_v2_examples/1, conn_bridge_examples/1, @@ -537,7 +535,9 @@ fields(bridge_v2_field) -> desc => <<"Kafka Producer Bridge V2 Config">>, required => false } - )}. + )}; +fields(action) -> + fields(bridge_v2_field). desc("config_connector") -> ?DESC("desc_config"); @@ -725,17 +725,3 @@ kafka_ext_header_value_validator(Value) -> "placeholder like ${foo}, or a simple string." } end. - --include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl"). - -register_bridge_v2() -> - emqx_bridge_v2_register_bridge_type(#{ - %% Should be provided by all bridges. Even if the bridge_v2_type_name is - %% the same as the bridge_v1_type_named. - 'bridge_v1_type_name' => kafka, - 'bridge_v2_type_name' => kafka_producer, - 'connector_type' => kafka_producer, - 'schema_module' => ?MODULE, - 'schema_struct_field' => bridge_v2_field - }), - ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl new file mode 100644 index 000000000..82fe73978 --- /dev/null +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl @@ -0,0 +1,18 @@ +-module(emqx_bridge_kafka_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> kafka. + +action_type_name() -> kafka_producer. + +connector_type_name() -> kafka_producer. + +schema_module() -> emqx_bridge_kafka. diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 78a39f5dd..4e00c1c57 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -151,6 +151,9 @@ reset(Node, KeyPath, Opts) -> %% @doc Called from build script. %% TODO: move to a external escript after all refactoring is done dump_schema(Dir, SchemaModule) -> + %% TODO: Load all apps instead of only emqx_dashboard + %% as this will help schemas that searches for apps with + %% relevant schema definitions _ = application:load(emqx_dashboard), ok = emqx_dashboard_desc_cache:init(), lists:foreach(