refactor(emqx_bridge_v2): make independent of Kafka

This removes the Kafka specific knowledge from emqx_bridge_v2 and
makes it possible to add new Bridge V2 bridges without modifying
the emqx_bridge application.
This commit is contained in:
Kjell Winblad 2023-11-05 20:27:27 +01:00 committed by Ivan Dyachkov
parent cd5b1f9b96
commit 9eaee8f333
7 changed files with 321 additions and 54 deletions

View File

@ -0,0 +1,136 @@
%% This function is called to register a bridge V2. It should be called before
%% the system boots in a function triggered by an -on_load() directive
%% since it should be called before the system boots because the config
%% system depends on that.
%%
%% It is placed in an hrl file instead of in emqx_bridge_v2.erl because emqx_bridge_v2
%% might not be loaded when the bridge module is loaded.
-spec emqx_bridge_v2_register_bridge_type(#{
%% Should be provided by all bridges. Even if the bridge_v2_type_name is
%% the same as the bridge_v1_type_named.
'bridge_v1_type_name' := atom(),
'bridge_v2_type_name' := atom(),
'connector_type' := atom(),
'schema_module' := atom(),
'schema_struct_field' := atom() | binary()
}) -> ok.
emqx_bridge_v2_register_bridge_type(BridgeTypeInfo) ->
try
%% We must prevent overwriting so we take a lock when writing to persistent_term
global:set_lock(
{
internal_emqx_bridge_v2_persistent_term_info_key(),
internal_emqx_bridge_v2_persistent_term_info_key()
},
[node()],
infinity
),
internal_maybe_create_initial_bridge_v2_info_map(),
internal_register_bridge_type_with_lock(BridgeTypeInfo)
catch
ErrorType:Reason:Stacktrace ->
%% Print the error on standard output as logger might not be
%% started yet
io:format("~p~n", [
#{
'error_type' => ErrorType,
'reason' => Reason,
'stacktrace' => Stacktrace,
'msg' => "Failed to register bridge V2 type"
}
]),
erlang:raise(ErrorType, Reason, Stacktrace)
after
global:del_lock(
{
internal_emqx_bridge_v2_persistent_term_info_key(),
internal_emqx_bridge_v2_persistent_term_info_key()
},
[node()]
)
end,
ok.
internal_register_bridge_type_with_lock(BridgeTypeInfo) ->
InfoMap0 = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
%% The Bridge V1 type is also a bridge V2 type due to backwards compatibility
InfoMap1 = emqx_utils_maps:deep_force_put(
[
bridge_v2_type_names,
maps:get(bridge_v1_type_name, BridgeTypeInfo)
],
InfoMap0,
true
),
InfoMap2 = emqx_utils_maps:deep_force_put(
[
bridge_v2_type_names,
maps:get(bridge_v2_type_name, BridgeTypeInfo)
],
InfoMap1,
true
),
InfoMap3 = emqx_utils_maps:deep_force_put(
[
bridge_v1_type_to_bridge_v2_type,
maps:get(bridge_v1_type_name, BridgeTypeInfo)
],
InfoMap2,
maps:get(bridge_v2_type_name, BridgeTypeInfo)
),
InfoMap4 = emqx_utils_maps:deep_force_put(
[
bridge_v2_type_to_connector_type,
maps:get(bridge_v2_type_name, BridgeTypeInfo)
],
InfoMap3,
maps:get(connector_type, BridgeTypeInfo)
),
%% Backwards compatibility
InfoMap5 = emqx_utils_maps:deep_force_put(
[
bridge_v2_type_to_connector_type,
maps:get(bridge_v1_type_name, BridgeTypeInfo)
],
InfoMap4,
maps:get(connector_type, BridgeTypeInfo)
),
InfoMap6 = emqx_utils_maps:deep_force_put(
[
bridge_v2_type_to_schema_module,
maps:get(bridge_v2_type_name, BridgeTypeInfo)
],
InfoMap5,
maps:get(schema_module, BridgeTypeInfo)
),
InfoMap7 = emqx_utils_maps:deep_force_put(
[
bridge_v2_type_to_schema_struct_field,
maps:get(bridge_v2_type_name, BridgeTypeInfo)
],
InfoMap6,
maps:get(schema_struct_field, BridgeTypeInfo)
),
ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap7).
internal_maybe_create_initial_bridge_v2_info_map() ->
case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of
undefined ->
ok = persistent_term:put(
internal_emqx_bridge_v2_persistent_term_info_key(),
#{
bridge_v2_type_names => #{},
bridge_v1_type_to_bridge_v2_type => #{},
bridge_v2_type_to_connector_type => #{},
bridge_v2_type_to_schema_module => #{},
bridge_v2_type_to_schema_struct_field => #{}
}
),
ok;
_ ->
ok
end.
internal_emqx_bridge_v2_persistent_term_info_key() ->
?FUNCTION_NAME.

View File

@ -18,6 +18,13 @@
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-compile([
{nowarn_unused_function, [
{internal_register_bridge_type_with_lock, 1},
{emqx_bridge_v2_register_bridge_type, 1}
]}
]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
@ -28,6 +35,15 @@
%% refactored into a new module/application with appropriate name.
-define(ROOT_KEY, actions).
-on_load(on_load/0).
%% Getting registered bridge schemas:
-export([
bridge_v2_type_to_schame_stuct_field/1,
registered_schema_modules/0
]).
%% Loading and unloading config when EMQX starts and stops
-export([
load/0,
@ -134,6 +150,55 @@
-type bridge_v2_type() :: binary() | atom().
-type bridge_v2_name() :: binary() | atom().
%%====================================================================
%%====================================================================
-include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl").
on_load() ->
try
%% We must prevent overwriting so we take a lock when writing to persistent_term
global:set_lock(
{
internal_emqx_bridge_v2_persistent_term_info_key(),
internal_emqx_bridge_v2_persistent_term_info_key()
},
[node()],
infinity
),
internal_maybe_create_initial_bridge_v2_info_map()
catch
ErrorType:Reason:Stacktrace ->
%% Logger may not be started so print to stdout
io:format("~p~n", #{
'error_type' => ErrorType,
'reason' => Reason,
'stacktrace' => Stacktrace,
'msg' => "Failed to register bridge V2 type"
}),
erlang:raise(ErrorType, Reason, Stacktrace)
after
global:del_lock(
{
internal_emqx_bridge_v2_persistent_term_info_key(),
internal_emqx_bridge_v2_persistent_term_info_key()
},
[node()]
)
end,
ok.
bridge_v2_type_to_schame_stuct_field(BridgeV2Type) ->
InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
Map = maps:get(bridge_v2_type_to_schema_struct_field, InfoMap),
maps:get(BridgeV2Type, Map).
registered_schema_modules() ->
InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
Schemas = maps:get(bridge_v2_type_to_schema_module, InfoMap),
maps:to_list(Schemas).
%%====================================================================
%% Loading and unloading config when EMQX starts and stops
%%====================================================================
@ -822,12 +887,20 @@ connector_type(Type) ->
bridge_v2_type_to_connector_type(Type) when not is_atom(Type) ->
bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type)));
bridge_v2_type_to_connector_type(kafka) ->
%% backward compatible
kafka_producer;
bridge_v2_type_to_connector_type(kafka_producer) ->
kafka_producer;
bridge_v2_type_to_connector_type(azure_event_hub_producer) ->
bridge_v2_type_to_connector_type(Type) ->
BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
BridgeV2TypeToConnectorTypeMap = maps:get(bridge_v2_type_to_connector_type, BridgeV2InfoMap),
case maps:get(Type, BridgeV2TypeToConnectorTypeMap, undefined) of
undefined -> bridge_v2_type_to_connector_type_old(Type);
ConnectorType -> ConnectorType
end.
% bridge_v2_type_to_connector_type_old(kafka) ->
% %% backward compatible
% kafka_producer;
% bridge_v2_type_to_connector_type_old(kafka_producer) ->
% kafka_producer;
bridge_v2_type_to_connector_type_old(azure_event_hub_producer) ->
azure_event_hub_producer.
%%====================================================================
@ -1050,25 +1123,41 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) ->
?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin));
bridge_v1_type_to_bridge_v2_type(kafka) ->
kafka_producer;
bridge_v1_type_to_bridge_v2_type(kafka_producer) ->
kafka_producer;
bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) ->
bridge_v1_type_to_bridge_v2_type(Type) ->
BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
BridgeV1TypeToBridgeV2Type = maps:get(bridge_v1_type_to_bridge_v2_type, BridgeV2InfoMap),
case maps:get(Type, BridgeV1TypeToBridgeV2Type, undefined) of
undefined -> bridge_v1_type_to_bridge_v2_type_old(Type);
BridgeV2Type -> BridgeV2Type
end.
% bridge_v1_type_to_bridge_v2_type_old(kafka) ->
% kafka_producer;
% bridge_v1_type_to_bridge_v2_type_old(kafka_producer) ->
% kafka_producer;
bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) ->
azure_event_hub_producer.
%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
%% types. For everything else the function should return false.
is_bridge_v2_type(Atom) when is_atom(Atom) ->
is_bridge_v2_type(atom_to_binary(Atom, utf8));
is_bridge_v2_type(<<"kafka_producer">>) ->
is_bridge_v2_type(Bin) when is_binary(Bin) ->
is_bridge_v2_type(binary_to_existing_atom(Bin));
is_bridge_v2_type(Type) ->
BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
BridgeV2Types = maps:get(bridge_v2_type_names, BridgeV2InfoMap),
case maps:get(Type, BridgeV2Types, undefined) of
undefined -> is_bridge_v2_type_old(Type);
_ -> true
end.
% is_bridge_v2_type_old(kafka_producer) ->
% true;
% is_bridge_v2_type_old(kafka) ->
% true;
is_bridge_v2_type_old(azure_event_hub_producer) ->
true;
is_bridge_v2_type(<<"kafka">>) ->
true;
is_bridge_v2_type(<<"azure_event_hub_producer">>) ->
true;
is_bridge_v2_type(_) ->
is_bridge_v2_type_old(_) ->
false.
bridge_v1_list_and_transform() ->

View File

@ -99,7 +99,7 @@ get_response_body_schema() ->
bridge_info_examples(Method) ->
maps:merge(
#{},
emqx_bridge_v2_schema:examples(Method),
emqx_enterprise_bridge_examples(Method)
).

View File

@ -82,6 +82,11 @@ schema_modules() ->
].
examples(Method) ->
EnterpriseExamples = emqx_bridge_v2_enterprise:examples(Method),
RegisteredExamples = registered_examples(Method),
maps:merge(EnterpriseExamples, RegisteredExamples).
registered_examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)

View File

@ -26,42 +26,16 @@ examples(Method) ->
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() ->
[
emqx_bridge_kafka,
emqx_bridge_azure_event_hub
].
[].
fields(actions) ->
action_structs().
action_structs() ->
[
{kafka_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)),
#{
desc => <<"Kafka Producer Actions Config">>,
required => false
}
)},
{azure_event_hub_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)),
#{
desc => <<"Azure Event Hub Actions Config">>,
required => false
}
)}
].
[].
api_schemas(Method) ->
[
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"),
api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2")
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
api_schemas(_Method) ->
[].
-else.

View File

@ -27,7 +27,8 @@
-export([
get_response/0,
put_request/0,
post_request/0
post_request/0,
examples/1
]).
-export([types/0, types_sc/0]).
@ -85,7 +86,21 @@ post_request() ->
api_schema(Method) ->
EE = ?MODULE:enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(EE)).
APISchemas = registered_api_schemas(Method),
hoconsc:union(bridge_api_union(EE ++ APISchemas)).
registered_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_v2:module_info(),
RegistredSchmeas = emqx_bridge_v2:registered_schema_modules(),
[
api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2")
|| {BridgeV2Type, SchemaModule} <- RegistredSchmeas
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
bridge_api_union(Refs) ->
Index = maps:from_list(Refs),
@ -133,7 +148,17 @@ roots() ->
end.
fields(actions) ->
[] ++ enterprise_fields_actions().
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_v2:module_info(),
enterprise_fields_actions() ++
registered_schema_fields().
registered_schema_fields() ->
[
Module:fields(emqx_bridge_v2:bridge_v2_type_to_schame_stuct_field(BridgeV2Type))
|| {BridgeV2Type, Module} <- emqx_bridge_v2:registered_schema_modules()
].
desc(actions) ->
?DESC("desc_bridges_v2");
@ -148,6 +173,19 @@ types() ->
types_sc() ->
hoconsc:enum(types()).
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
SchemaModules = [Mod || {_, Mod} <- emqx_bridge_v2:registered_schema_modules()],
lists:foldl(Fun, #{}, SchemaModules).
-ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl").
schema_homogeneous_test() ->

View File

@ -16,6 +16,8 @@
]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-on_load(register_bridge_v2/0).
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
@ -526,7 +528,16 @@ fields(consumer_kafka_opts) ->
fields(resource_opts) ->
SupportedFields = [health_check_interval],
CreationOpts = emqx_resource_schema:create_opts(_Overrides = []),
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts).
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts);
fields(bridge_v2_field) ->
{kafka_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)),
#{
desc => <<"Kafka Producer Bridge V2 Config">>,
required => false
}
)}.
desc("config_connector") ->
?DESC("desc_config");
@ -714,3 +725,17 @@ kafka_ext_header_value_validator(Value) ->
"placeholder like ${foo}, or a simple string."
}
end.
-include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl").
register_bridge_v2() ->
emqx_bridge_v2_register_bridge_type(#{
%% Should be provided by all bridges. Even if the bridge_v2_type_name is
%% the same as the bridge_v1_type_named.
'bridge_v1_type_name' => kafka,
'bridge_v2_type_name' => kafka_producer,
'connector_type' => kafka_producer,
'schema_module' => ?MODULE,
'schema_struct_field' => bridge_v2_field
}),
ok.