refactor: copy bridge api code over to emqx_connector

This commit is contained in:
Stefan Strigler 2023-10-03 18:02:23 +02:00 committed by Zaiming (Stone) Shi
parent d05f2010b3
commit ed8aa46602
3 changed files with 1120 additions and 8 deletions

File diff suppressed because it is too large Load Diff

View File

@ -13,9 +13,20 @@
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-export([ -export([
fields/1 api_schemas/1,
fields/1,
examples/1
]). ]).
resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
resource_type(kafka) -> emqx_bridge_kafka_impl_producer.
%% For connectors that need to override connector configurations.
connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
connector_impl_module(binary_to_atom(ConnectorType, utf8));
connector_impl_module(_ConnectorType) ->
undefined.
fields(connectors) -> fields(connectors) ->
kafka_structs(). kafka_structs().
@ -31,14 +42,34 @@ kafka_structs() ->
)} )}
]. ].
resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); examples(Method) ->
resource_type(kafka) -> emqx_bridge_kafka_impl_producer. MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, conn_bridge_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
%% For connectors that need to override connector configurations. schema_modules() ->
connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> [
connector_impl_module(binary_to_atom(ConnectorType, utf8)); emqx_bridge_kafka
connector_impl_module(_ConnectorType) -> ].
undefined.
api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% connector schema module.
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_producer")
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
-else. -else.

View File

@ -26,6 +26,14 @@
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]). -export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_connector_ee_schema:module_info(),
case erlang:function_exported(emqx_connector_ee_schema, api_schemas, 1) of
true -> emqx_connector_ee_schema:api_schemas(Method);
false -> []
end.
enterprise_fields_connectors() -> enterprise_fields_connectors() ->
%% We *must* do this to ensure the module is really loaded, especially when we use %% We *must* do this to ensure the module is really loaded, especially when we use
@ -199,6 +207,53 @@ transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) ->
%% HOCON Schema Callbacks %% HOCON Schema Callbacks
%%====================================================================================== %%======================================================================================
%% For HTTP APIs
get_response() ->
api_schema("get").
put_request() ->
api_schema("put").
post_request() ->
api_schema("post").
api_schema(Method) ->
Broker = [
{Type, ref(Mod, Method)}
|| {Type, Mod} <- [
{<<"webhook">>, emqx_bridge_http_schema},
{<<"mqtt">>, emqx_bridge_mqtt_schema}
]
],
EE = enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(Broker ++ EE)).
bridge_api_union(Refs) ->
Index = maps:from_list(Refs),
fun
(all_union_members) ->
maps:values(Index);
({value, V}) ->
case V of
#{<<"type">> := T} ->
case maps:get(T, Index, undefined) of
undefined ->
throw(#{
field_name => type,
reason => <<"unknown bridge type">>
});
Ref ->
[Ref]
end;
_ ->
throw(#{
field_name => type,
reason => <<"unknown bridge type">>
})
end
end.
%% general config
namespace() -> "connector". namespace() -> "connector".
tags() -> tags() ->