diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl new file mode 100644 index 000000000..79e009fe9 --- /dev/null +++ b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl @@ -0,0 +1,136 @@ +%% This function is called to register a bridge V2. It should be called before +%% the system boots in a function triggered by an -on_load() directive +%% since it should be called before the system boots because the config +%% system depends on that. +%% +%% It is placed in an hrl file instead of in emqx_bridge_v2.erl because emqx_bridge_v2 +%% might not be loaded when the bridge module is loaded. +-spec emqx_bridge_v2_register_bridge_type(#{ + %% Should be provided by all bridges. Even if the bridge_v2_type_name is + %% the same as the bridge_v1_type_named. + 'bridge_v1_type_name' := atom(), + 'bridge_v2_type_name' := atom(), + 'connector_type' := atom(), + 'schema_module' := atom(), + 'schema_struct_field' := atom() | binary() +}) -> ok. +emqx_bridge_v2_register_bridge_type(BridgeTypeInfo) -> + try + %% We must prevent overwriting so we take a lock when writing to persistent_term + global:set_lock( + { + internal_emqx_bridge_v2_persistent_term_info_key(), + internal_emqx_bridge_v2_persistent_term_info_key() + }, + [node()], + infinity + ), + internal_maybe_create_initial_bridge_v2_info_map(), + internal_register_bridge_type_with_lock(BridgeTypeInfo) + catch + ErrorType:Reason:Stacktrace -> + %% Print the error on standard output as logger might not be + %% started yet + io:format("~p~n", [ + #{ + 'error_type' => ErrorType, + 'reason' => Reason, + 'stacktrace' => Stacktrace, + 'msg' => "Failed to register bridge V2 type" + } + ]), + erlang:raise(ErrorType, Reason, Stacktrace) + after + global:del_lock( + { + internal_emqx_bridge_v2_persistent_term_info_key(), + internal_emqx_bridge_v2_persistent_term_info_key() + }, + [node()] + ) + end, + ok. + +internal_register_bridge_type_with_lock(BridgeTypeInfo) -> + InfoMap0 = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + %% The Bridge V1 type is also a bridge V2 type due to backwards compatibility + InfoMap1 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_names, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ], + InfoMap0, + true + ), + InfoMap2 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_names, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap1, + true + ), + InfoMap3 = emqx_utils_maps:deep_force_put( + [ + bridge_v1_type_to_bridge_v2_type, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ], + InfoMap2, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ), + InfoMap4 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_connector_type, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap3, + maps:get(connector_type, BridgeTypeInfo) + ), + %% Backwards compatibility + InfoMap5 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_connector_type, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ], + InfoMap4, + maps:get(connector_type, BridgeTypeInfo) + ), + InfoMap6 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_schema_module, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap5, + maps:get(schema_module, BridgeTypeInfo) + ), + InfoMap7 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_schema_struct_field, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap6, + maps:get(schema_struct_field, BridgeTypeInfo) + ), + + ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap7). + +internal_maybe_create_initial_bridge_v2_info_map() -> + case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of + undefined -> + ok = persistent_term:put( + internal_emqx_bridge_v2_persistent_term_info_key(), + #{ + bridge_v2_type_names => #{}, + bridge_v1_type_to_bridge_v2_type => #{}, + bridge_v2_type_to_connector_type => #{}, + bridge_v2_type_to_schema_module => #{}, + bridge_v2_type_to_schema_struct_field => #{} + } + ), + ok; + _ -> + ok + end. + +internal_emqx_bridge_v2_persistent_term_info_key() -> + ?FUNCTION_NAME. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 7a74ab438..4989cbae2 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -18,6 +18,13 @@ -behaviour(emqx_config_handler). -behaviour(emqx_config_backup). +-compile([ + {nowarn_unused_function, [ + {internal_register_bridge_type_with_lock, 1}, + {emqx_bridge_v2_register_bridge_type, 1} + ]} +]). + -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -28,6 +35,15 @@ %% refactored into a new module/application with appropriate name. -define(ROOT_KEY, actions). +-on_load(on_load/0). + +%% Getting registered bridge schemas: + +-export([ + bridge_v2_type_to_schame_stuct_field/1, + registered_schema_modules/0 +]). + %% Loading and unloading config when EMQX starts and stops -export([ load/0, @@ -134,6 +150,55 @@ -type bridge_v2_type() :: binary() | atom(). -type bridge_v2_name() :: binary() | atom(). +%%==================================================================== + +%%==================================================================== + +-include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl"). + +on_load() -> + try + %% We must prevent overwriting so we take a lock when writing to persistent_term + global:set_lock( + { + internal_emqx_bridge_v2_persistent_term_info_key(), + internal_emqx_bridge_v2_persistent_term_info_key() + }, + [node()], + infinity + ), + internal_maybe_create_initial_bridge_v2_info_map() + catch + ErrorType:Reason:Stacktrace -> + %% Logger may not be started so print to stdout + io:format("~p~n", #{ + 'error_type' => ErrorType, + 'reason' => Reason, + 'stacktrace' => Stacktrace, + 'msg' => "Failed to register bridge V2 type" + }), + erlang:raise(ErrorType, Reason, Stacktrace) + after + global:del_lock( + { + internal_emqx_bridge_v2_persistent_term_info_key(), + internal_emqx_bridge_v2_persistent_term_info_key() + }, + [node()] + ) + end, + ok. + +bridge_v2_type_to_schame_stuct_field(BridgeV2Type) -> + InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + Map = maps:get(bridge_v2_type_to_schema_struct_field, InfoMap), + maps:get(BridgeV2Type, Map). + +registered_schema_modules() -> + InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + Schemas = maps:get(bridge_v2_type_to_schema_module, InfoMap), + maps:to_list(Schemas). + %%==================================================================== %% Loading and unloading config when EMQX starts and stops %%==================================================================== @@ -822,12 +887,20 @@ connector_type(Type) -> bridge_v2_type_to_connector_type(Type) when not is_atom(Type) -> bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); -bridge_v2_type_to_connector_type(kafka) -> - %% backward compatible - kafka_producer; -bridge_v2_type_to_connector_type(kafka_producer) -> - kafka_producer; -bridge_v2_type_to_connector_type(azure_event_hub_producer) -> +bridge_v2_type_to_connector_type(Type) -> + BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + BridgeV2TypeToConnectorTypeMap = maps:get(bridge_v2_type_to_connector_type, BridgeV2InfoMap), + case maps:get(Type, BridgeV2TypeToConnectorTypeMap, undefined) of + undefined -> bridge_v2_type_to_connector_type_old(Type); + ConnectorType -> ConnectorType + end. + +% bridge_v2_type_to_connector_type_old(kafka) -> +% %% backward compatible +% kafka_producer; +% bridge_v2_type_to_connector_type_old(kafka_producer) -> +% kafka_producer; +bridge_v2_type_to_connector_type_old(azure_event_hub_producer) -> azure_event_hub_producer. %%==================================================================== @@ -1050,25 +1123,41 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) -> bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) -> ?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin)); -bridge_v1_type_to_bridge_v2_type(kafka) -> - kafka_producer; -bridge_v1_type_to_bridge_v2_type(kafka_producer) -> - kafka_producer; -bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) -> +bridge_v1_type_to_bridge_v2_type(Type) -> + BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + BridgeV1TypeToBridgeV2Type = maps:get(bridge_v1_type_to_bridge_v2_type, BridgeV2InfoMap), + case maps:get(Type, BridgeV1TypeToBridgeV2Type, undefined) of + undefined -> bridge_v1_type_to_bridge_v2_type_old(Type); + BridgeV2Type -> BridgeV2Type + end. + +% bridge_v1_type_to_bridge_v2_type_old(kafka) -> +% kafka_producer; +% bridge_v1_type_to_bridge_v2_type_old(kafka_producer) -> +% kafka_producer; +bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) -> azure_event_hub_producer. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 %% types. For everything else the function should return false. -is_bridge_v2_type(Atom) when is_atom(Atom) -> - is_bridge_v2_type(atom_to_binary(Atom, utf8)); -is_bridge_v2_type(<<"kafka_producer">>) -> +is_bridge_v2_type(Bin) when is_binary(Bin) -> + is_bridge_v2_type(binary_to_existing_atom(Bin)); +is_bridge_v2_type(Type) -> + BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + BridgeV2Types = maps:get(bridge_v2_type_names, BridgeV2InfoMap), + case maps:get(Type, BridgeV2Types, undefined) of + undefined -> is_bridge_v2_type_old(Type); + _ -> true + end. + +% is_bridge_v2_type_old(kafka_producer) -> +% true; +% is_bridge_v2_type_old(kafka) -> +% true; +is_bridge_v2_type_old(azure_event_hub_producer) -> true; -is_bridge_v2_type(<<"kafka">>) -> - true; -is_bridge_v2_type(<<"azure_event_hub_producer">>) -> - true; -is_bridge_v2_type(_) -> +is_bridge_v2_type_old(_) -> false. bridge_v1_list_and_transform() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 634337d99..b28d5ec01 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -99,7 +99,7 @@ get_response_body_schema() -> bridge_info_examples(Method) -> maps:merge( - #{}, + emqx_bridge_v2_schema:examples(Method), emqx_enterprise_bridge_examples(Method) ). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 5cbc709a5..926cf296c 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -82,6 +82,11 @@ schema_modules() -> ]. examples(Method) -> + EnterpriseExamples = emqx_bridge_v2_enterprise:examples(Method), + RegisteredExamples = registered_examples(Method), + maps:merge(EnterpriseExamples, RegisteredExamples). + +registered_examples(Method) -> MergeFun = fun(Example, Examples) -> maps:merge(Examples, Example) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl index 54448f07d..29dc71b69 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl @@ -26,42 +26,16 @@ examples(Method) -> lists:foldl(Fun, #{}, schema_modules()). schema_modules() -> - [ - emqx_bridge_kafka, - emqx_bridge_azure_event_hub - ]. + []. fields(actions) -> action_structs(). action_structs() -> - [ - {kafka_producer, - mk( - hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), - #{ - desc => <<"Kafka Producer Actions Config">>, - required => false - } - )}, - {azure_event_hub_producer, - mk( - hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)), - #{ - desc => <<"Azure Event Hub Actions Config">>, - required => false - } - )} - ]. + []. -api_schemas(Method) -> - [ - api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2") - ]. - -api_ref(Module, Type, Method) -> - {Type, ref(Module, Method)}. +api_schemas(_Method) -> + []. -else. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 1d059903a..5fe86ec51 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -27,7 +27,8 @@ -export([ get_response/0, put_request/0, - post_request/0 + post_request/0, + examples/1 ]). -export([types/0, types_sc/0]). @@ -85,7 +86,21 @@ post_request() -> api_schema(Method) -> EE = ?MODULE:enterprise_api_schemas(Method), - hoconsc:union(bridge_api_union(EE)). + APISchemas = registered_api_schemas(Method), + hoconsc:union(bridge_api_union(EE ++ APISchemas)). + +registered_api_schemas(Method) -> + %% We *must* do this to ensure the module is really loaded, especially when we use + %% `call_hocon' from `nodetool' to generate initial configurations. + _ = emqx_bridge_v2:module_info(), + RegistredSchmeas = emqx_bridge_v2:registered_schema_modules(), + [ + api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2") + || {BridgeV2Type, SchemaModule} <- RegistredSchmeas + ]. + +api_ref(Module, Type, Method) -> + {Type, ref(Module, Method)}. bridge_api_union(Refs) -> Index = maps:from_list(Refs), @@ -133,7 +148,17 @@ roots() -> end. fields(actions) -> - [] ++ enterprise_fields_actions(). + %% We *must* do this to ensure the module is really loaded, especially when we use + %% `call_hocon' from `nodetool' to generate initial configurations. + _ = emqx_bridge_v2:module_info(), + enterprise_fields_actions() ++ + registered_schema_fields(). + +registered_schema_fields() -> + [ + Module:fields(emqx_bridge_v2:bridge_v2_type_to_schame_stuct_field(BridgeV2Type)) + || {BridgeV2Type, Module} <- emqx_bridge_v2:registered_schema_modules() + ]. desc(actions) -> ?DESC("desc_bridges_v2"); @@ -148,6 +173,19 @@ types() -> types_sc() -> hoconsc:enum(types()). +examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, + Fun = + fun(Module, Examples) -> + ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) + end, + SchemaModules = [Mod || {_, Mod} <- emqx_bridge_v2:registered_schema_modules()], + lists:foldl(Fun, #{}, SchemaModules). + -ifdef(TEST). -include_lib("hocon/include/hocon_types.hrl"). schema_homogeneous_test() -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 55ac36d1a..83ecf7b05 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -16,6 +16,8 @@ ]). -import(hoconsc, [mk/2, enum/1, ref/2]). +-on_load(register_bridge_v2/0). + -export([ bridge_v2_examples/1, conn_bridge_examples/1, @@ -526,7 +528,16 @@ fields(consumer_kafka_opts) -> fields(resource_opts) -> SupportedFields = [health_check_interval], CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), - lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts). + lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); +fields(bridge_v2_field) -> + {kafka_producer, + mk( + hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), + #{ + desc => <<"Kafka Producer Bridge V2 Config">>, + required => false + } + )}. desc("config_connector") -> ?DESC("desc_config"); @@ -714,3 +725,17 @@ kafka_ext_header_value_validator(Value) -> "placeholder like ${foo}, or a simple string." } end. + +-include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl"). + +register_bridge_v2() -> + emqx_bridge_v2_register_bridge_type(#{ + %% Should be provided by all bridges. Even if the bridge_v2_type_name is + %% the same as the bridge_v1_type_named. + 'bridge_v1_type_name' => kafka, + 'bridge_v2_type_name' => kafka_producer, + 'connector_type' => kafka_producer, + 'schema_module' => ?MODULE, + 'schema_struct_field' => bridge_v2_field + }), + ok.