refactor(kinesis,pgsql,timescale,matrix connectors): emqx_connector_info

This commit refactors the kinesis, pgsql, timescale and matrix
connectors to use the `emqx_connector_info` behavior.
This commit is contained in:
Kjell Winblad 2024-03-19 14:43:27 +01:00
parent a3e631cda2
commit c3d7d68cfc
11 changed files with 193 additions and 63 deletions

View File

@ -8,6 +8,7 @@
erlcloud erlcloud
]}, ]},
{env, [{emqx_action_info_modules, [emqx_bridge_kinesis_action_info]}]}, {env, [{emqx_action_info_modules, [emqx_bridge_kinesis_action_info]}]},
{env, [{emqx_connector_info_modules, [emqx_bridge_kinesis_connector_info]}]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kinesis_connector_info).
-behaviour(emqx_connector_info).
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
type_name() ->
kinesis.
bridge_types() ->
[kinesis, kinesis_producer].
resource_callback_module() ->
emqx_bridge_kinesis_impl_producer.
config_schema() ->
{kinesis,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_kinesis, "config_connector")),
#{
desc => <<"Kinesis Connector Config">>,
required => false
}
)}.
schema_module() ->
emqx_bridge_kinesis.
api_schema(Method) ->
emqx_connector_schema:api_ref(
emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"
).

View File

@ -1,13 +1,16 @@
{application, emqx_bridge_matrix, [ {application, emqx_bridge_matrix, [
{description, "EMQX Enterprise MatrixDB Bridge"}, {description, "EMQX Enterprise MatrixDB Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
stdlib, stdlib,
emqx_resource emqx_resource
]}, ]},
{env, [{emqx_action_info_modules, [emqx_bridge_matrix_action_info]}]}, {env, [
{emqx_action_info_modules, [emqx_bridge_matrix_action_info]},
{emqx_connector_info_modules, [emqx_bridge_matrix_connector_info]}
]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_matrix_connector_info).
-behaviour(emqx_connector_info).
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
type_name() ->
matrix.
bridge_types() ->
[matrix].
resource_callback_module() ->
emqx_postgresql.
config_schema() ->
{matrix,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_matrix, "config_connector")),
#{
desc => <<"Matrix Connector Config">>,
required => false
}
)}.
schema_module() ->
emqx_bridge_matrix.
api_schema(Method) ->
emqx_connector_schema:api_ref(
emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"
).

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pgsql, [ {application, emqx_bridge_pgsql, [
{description, "EMQX Enterprise PostgreSQL Bridge"}, {description, "EMQX Enterprise PostgreSQL Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
@ -8,7 +8,10 @@
emqx_resource, emqx_resource,
emqx_postgresql emqx_postgresql
]}, ]},
{env, [{emqx_action_info_modules, [emqx_bridge_pgsql_action_info]}]}, {env, [
{emqx_action_info_modules, [emqx_bridge_pgsql_action_info]},
{emqx_connector_info_modules, [emqx_bridge_pgsql_connector_info]}
]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pgsql_connector_info).
-behaviour(emqx_connector_info).
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
type_name() ->
pgsql.
bridge_types() ->
[pgsql].
resource_callback_module() ->
emqx_postgresql.
config_schema() ->
{pgsql,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, "config_connector")),
#{
desc => <<"PostgreSQL Connector Config">>,
required => false
}
)}.
schema_module() ->
emqx_postgresql_connector_schema.
api_schema(Method) ->
emqx_connector_schema:api_ref(
emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"
).

View File

@ -1,10 +1,12 @@
{application, emqx_bridge_timescale, [ {application, emqx_bridge_timescale, [
{description, "EMQX Enterprise TimescaleDB Bridge"}, {description, "EMQX Enterprise TimescaleDB Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource]}, {applications, [kernel, stdlib, emqx_resource]},
{env, [{emqx_action_info_module, emqx_bridge_timescale_action_info}]}, {env, [
{env, []}, {emqx_action_info_modules, emqx_bridge_timescale_action_info},
{emqx_connector_info_modules, emqx_bridge_timescale_connector_info}
]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_timescale_connector_info).
-behaviour(emqx_connector_info).
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
type_name() ->
timescale.
bridge_types() ->
[timescale].
resource_callback_module() ->
emqx_postgresql.
config_schema() ->
{timescale,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_timescale, "config_connector")),
#{
desc => <<"Timescale Connector Config">>,
required => false
}
)}.
schema_module() ->
emqx_bridge_timescale.
api_schema(Method) ->
emqx_connector_schema:api_ref(
emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"
).

View File

@ -63,7 +63,11 @@ hard_coded_connector_info_modules_ee() ->
emqx_bridge_gcp_pubsub_producer_connector_info, emqx_bridge_gcp_pubsub_producer_connector_info,
emqx_bridge_hstreamdb_connector_info, emqx_bridge_hstreamdb_connector_info,
emqx_bridge_kafka_consumer_connector_info, emqx_bridge_kafka_consumer_connector_info,
emqx_bridge_kafka_connector_info emqx_bridge_kafka_connector_info,
emqx_bridge_kinesis_connector_info,
emqx_bridge_matrix_connector_info,
emqx_bridge_pgsql_connector_info,
emqx_bridge_timescale_connector_info
]. ].
-else. -else.
hard_coded_connector_info_modules_ee() -> hard_coded_connector_info_modules_ee() ->

View File

@ -21,10 +21,6 @@
resource_type(Type) when is_binary(Type) -> resource_type(Type) when is_binary(Type) ->
resource_type(binary_to_atom(Type, utf8)); resource_type(binary_to_atom(Type, utf8));
resource_type(kinesis) ->
emqx_bridge_kinesis_impl_producer;
resource_type(matrix) ->
emqx_postgresql;
resource_type(mongodb) -> resource_type(mongodb) ->
emqx_bridge_mongodb_connector; emqx_bridge_mongodb_connector;
resource_type(oracle) -> resource_type(oracle) ->
@ -37,16 +33,12 @@ resource_type(clickhouse) ->
emqx_bridge_clickhouse_connector; emqx_bridge_clickhouse_connector;
resource_type(mysql) -> resource_type(mysql) ->
emqx_bridge_mysql_connector; emqx_bridge_mysql_connector;
resource_type(pgsql) ->
emqx_postgresql;
resource_type(syskeeper_forwarder) -> resource_type(syskeeper_forwarder) ->
emqx_bridge_syskeeper_connector; emqx_bridge_syskeeper_connector;
resource_type(syskeeper_proxy) -> resource_type(syskeeper_proxy) ->
emqx_bridge_syskeeper_proxy_server; emqx_bridge_syskeeper_proxy_server;
resource_type(sqlserver) -> resource_type(sqlserver) ->
emqx_bridge_sqlserver_connector; emqx_bridge_sqlserver_connector;
resource_type(timescale) ->
emqx_postgresql;
resource_type(redis) -> resource_type(redis) ->
emqx_bridge_redis_connector; emqx_bridge_redis_connector;
resource_type(rocketmq) -> resource_type(rocketmq) ->
@ -95,22 +87,6 @@ fields(connectors) ->
connector_structs() -> connector_structs() ->
[ [
{kinesis,
mk(
hoconsc:map(name, ref(emqx_bridge_kinesis, "config_connector")),
#{
desc => <<"Kinesis Connector Config">>,
required => false
}
)},
{matrix,
mk(
hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
#{
desc => <<"Matrix Connector Config">>,
required => false
}
)},
{mongodb, {mongodb,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")), hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")),
@ -160,14 +136,6 @@ connector_structs() ->
required => false required => false
} }
)}, )},
{pgsql,
mk(
hoconsc:map(name, ref(emqx_bridge_pgsql, "config_connector")),
#{
desc => <<"PostgreSQL Connector Config">>,
required => false
}
)},
{redis, {redis,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")), hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
@ -208,14 +176,6 @@ connector_structs() ->
required => false required => false
} }
)}, )},
{timescale,
mk(
hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
#{
desc => <<"Timescale Connector Config">>,
required => false
}
)},
{iotdb, {iotdb,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_iotdb_connector, config)), hoconsc:map(name, ref(emqx_bridge_iotdb_connector, config)),
@ -284,8 +244,6 @@ connector_structs() ->
schema_modules() -> schema_modules() ->
[ [
emqx_bridge_kinesis,
emqx_bridge_matrix,
emqx_bridge_mongodb, emqx_bridge_mongodb,
emqx_bridge_oracle, emqx_bridge_oracle,
emqx_bridge_influxdb, emqx_bridge_influxdb,
@ -295,7 +253,6 @@ schema_modules() ->
emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy, emqx_bridge_syskeeper_proxy,
emqx_bridge_sqlserver, emqx_bridge_sqlserver,
emqx_bridge_timescale,
emqx_postgresql_connector_schema, emqx_postgresql_connector_schema,
emqx_bridge_redis_schema, emqx_bridge_redis_schema,
emqx_bridge_rocketmq, emqx_bridge_rocketmq,
@ -313,8 +270,6 @@ api_schemas(Method) ->
[ [
%% We need to map the `type' field of a request (binary) to a %% We need to map the `type' field of a request (binary) to a
%% connector schema module. %% connector schema module.
api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
api_ref(emqx_bridge_oracle, <<"oracle">>, Method ++ "_connector"), api_ref(emqx_bridge_oracle, <<"oracle">>, Method ++ "_connector"),
api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
@ -324,8 +279,6 @@ api_schemas(Method) ->
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
api_ref(emqx_bridge_sqlserver, <<"sqlserver">>, Method ++ "_connector"), api_ref(emqx_bridge_sqlserver, <<"sqlserver">>, Method ++ "_connector"),
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
api_ref(emqx_bridge_rocketmq, <<"rocketmq">>, Method ++ "_connector"), api_ref(emqx_bridge_rocketmq, <<"rocketmq">>, Method ++ "_connector"),
api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),

View File

@ -127,10 +127,6 @@ connector_info_schema_modules() ->
%% @doc Return old bridge(v1) and/or connector(v2) type %% @doc Return old bridge(v1) and/or connector(v2) type
%% from the latest connector type name. %% from the latest connector type name.
connector_type_to_bridge_types(kinesis) ->
[kinesis, kinesis_producer];
connector_type_to_bridge_types(matrix) ->
[matrix];
connector_type_to_bridge_types(mongodb) -> connector_type_to_bridge_types(mongodb) ->
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(oracle) -> connector_type_to_bridge_types(oracle) ->
@ -145,8 +141,6 @@ connector_type_to_bridge_types(mysql) ->
[mysql]; [mysql];
connector_type_to_bridge_types(mqtt) -> connector_type_to_bridge_types(mqtt) ->
[mqtt]; [mqtt];
connector_type_to_bridge_types(pgsql) ->
[pgsql];
connector_type_to_bridge_types(redis) -> connector_type_to_bridge_types(redis) ->
[redis, redis_single, redis_sentinel, redis_cluster]; [redis, redis_single, redis_sentinel, redis_cluster];
connector_type_to_bridge_types(rocketmq) -> connector_type_to_bridge_types(rocketmq) ->
@ -157,8 +151,6 @@ connector_type_to_bridge_types(syskeeper_proxy) ->
[]; [];
connector_type_to_bridge_types(sqlserver) -> connector_type_to_bridge_types(sqlserver) ->
[sqlserver]; [sqlserver];
connector_type_to_bridge_types(timescale) ->
[timescale];
connector_type_to_bridge_types(iotdb) -> connector_type_to_bridge_types(iotdb) ->
[iotdb]; [iotdb];
connector_type_to_bridge_types(elasticsearch) -> connector_type_to_bridge_types(elasticsearch) ->