Merge pull request #9702 from lafirest/feat/timescale_matrix

feat(bridges): add timescale && matrix bridges
This commit is contained in:
lafirest 2023-01-10 00:06:17 +08:00 committed by GitHub
commit 20f49b903f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 144 additions and 28 deletions

View File

@ -26,7 +26,9 @@ api_schemas(Method) ->
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2"),
ref(emqx_ee_bridge_redis, Method ++ "_single"), ref(emqx_ee_bridge_redis, Method ++ "_single"),
ref(emqx_ee_bridge_redis, Method ++ "_sentinel"), ref(emqx_ee_bridge_redis, Method ++ "_sentinel"),
ref(emqx_ee_bridge_redis, Method ++ "_cluster") ref(emqx_ee_bridge_redis, Method ++ "_cluster"),
ref(emqx_ee_bridge_timescale, Method),
ref(emqx_ee_bridge_matrix, Method)
]. ].
schema_modules() -> schema_modules() ->
@ -38,7 +40,9 @@ schema_modules() ->
emqx_ee_bridge_mongodb, emqx_ee_bridge_mongodb,
emqx_ee_bridge_mysql, emqx_ee_bridge_mysql,
emqx_ee_bridge_redis, emqx_ee_bridge_redis,
emqx_ee_bridge_pgsql emqx_ee_bridge_pgsql,
emqx_ee_bridge_timescale,
emqx_ee_bridge_matrix
]. ].
examples(Method) -> examples(Method) ->
@ -66,7 +70,9 @@ resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb;
resource_type(redis_single) -> emqx_ee_connector_redis; resource_type(redis_single) -> emqx_ee_connector_redis;
resource_type(redis_sentinel) -> emqx_ee_connector_redis; resource_type(redis_sentinel) -> emqx_ee_connector_redis;
resource_type(redis_cluster) -> emqx_ee_connector_redis; resource_type(redis_cluster) -> emqx_ee_connector_redis;
resource_type(pgsql) -> emqx_connector_pgsql. resource_type(pgsql) -> emqx_connector_pgsql;
resource_type(timescale) -> emqx_connector_pgsql;
resource_type(matrix) -> emqx_connector_pgsql.
fields(bridges) -> fields(bridges) ->
[ [
@ -101,16 +107,8 @@ fields(bridges) ->
desc => <<"MySQL Bridge Config">>, desc => <<"MySQL Bridge Config">>,
required => false required => false
} }
)},
{pgsql,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_pgsql, "config")),
#{
desc => <<"PostgreSQL Bridge Config">>,
required => false
}
)} )}
] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs(). ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs().
mongodb_structs() -> mongodb_structs() ->
[ [
@ -157,3 +155,20 @@ redis_structs() ->
redis_cluster redis_cluster
] ]
]. ].
pgsql_structs() ->
[
{Type,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_pgsql, "config")),
#{
desc => <<Name/binary, " Bridge Config">>,
required => false
}
)}
|| {Type, Name} <- [
{pgsql, <<"PostgreSQL">>},
{timescale, <<"Timescale">>},
{matrix, <<"Matrix">>}
]
].

View File

@ -0,0 +1,42 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_matrix).
-export([
conn_bridge_examples/1
]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% -------------------------------------------------------------------------------------------------
%% api
conn_bridge_examples(Method) ->
[
#{
<<"matrix">> => #{
summary => <<"Matrix Bridge">>,
value => emqx_ee_bridge_pgsql:values(Method, matrix)
}
}
].
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_matrix".
roots() -> [].
fields("post") ->
emqx_ee_bridge_pgsql:fields("post", matrix);
fields(Method) ->
emqx_ee_bridge_pgsql:fields(Method).
desc(_) ->
undefined.

View File

@ -11,7 +11,9 @@
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-export([ -export([
conn_bridge_examples/1 conn_bridge_examples/1,
values/2,
fields/2
]). ]).
-export([ -export([
@ -34,17 +36,17 @@ conn_bridge_examples(Method) ->
#{ #{
<<"pgsql">> => #{ <<"pgsql">> => #{
summary => <<"PostgreSQL Bridge">>, summary => <<"PostgreSQL Bridge">>,
value => values(Method) value => values(Method, pgsql)
} }
} }
]. ].
values(get) -> values(get, Type) ->
maps:merge(values(post), ?METRICS_EXAMPLE); maps:merge(values(post, Type), ?METRICS_EXAMPLE);
values(post) -> values(post, Type) ->
#{ #{
enable => true, enable => true,
type => pgsql, type => Type,
name => <<"foo">>, name => <<"foo">>,
server => <<"127.0.0.1:5432">>, server => <<"127.0.0.1:5432">>,
database => <<"mqtt">>, database => <<"mqtt">>,
@ -64,8 +66,8 @@ values(post) ->
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_queue_bytes => ?DEFAULT_QUEUE_SIZE
} }
}; };
values(put) -> values(put, Type) ->
values(post). values(post, Type).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% Hocon Schema Definitions
@ -96,17 +98,20 @@ fields("config") ->
} }
)} )}
] ++ ] ++
emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); emqx_connector_pgsql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields();
fields("creation_opts") -> fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"), Opts = emqx_resource_schema:fields("creation_opts"),
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; fields("post", pgsql);
fields("put") -> fields("put") ->
fields("config"); fields("config");
fields("get") -> fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post"). emqx_bridge_schema:metrics_status_fields() ++ fields("post").
fields("post", Type) ->
[type_field(Type), name_field() | fields("config")].
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
@ -123,8 +128,8 @@ is_hidden_opts(Field) ->
async_inflight_window async_inflight_window
]). ]).
type_field() -> type_field(Type) ->
{type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}. {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() -> name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

View File

@ -0,0 +1,42 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_timescale).
-export([
conn_bridge_examples/1
]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% -------------------------------------------------------------------------------------------------
%% api
conn_bridge_examples(Method) ->
[
#{
<<"timescale">> => #{
summary => <<"Timescale Bridge">>,
value => emqx_ee_bridge_pgsql:values(Method, timescale)
}
}
].
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_timescale".
roots() -> [].
fields("post") ->
emqx_ee_bridge_pgsql:fields("post", timescale);
fields(Method) ->
emqx_ee_bridge_pgsql:fields(Method).
desc(_) ->
undefined.

View File

@ -45,14 +45,20 @@ groups() ->
[ [
{tcp, [ {tcp, [
{group, with_batch}, {group, with_batch},
{group, without_batch} {group, without_batch},
{group, matrix},
{group, timescale}
]}, ]},
{tls, [ {tls, [
{group, with_batch}, {group, with_batch},
{group, without_batch} {group, without_batch},
{group, matrix},
{group, timescale}
]}, ]},
{with_batch, TCs -- NonBatchCases}, {with_batch, TCs -- NonBatchCases},
{without_batch, TCs} {without_batch, TCs},
{matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
{timescale, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}
]. ].
init_per_group(tcp, Config) -> init_per_group(tcp, Config) ->
@ -83,6 +89,12 @@ init_per_group(with_batch, Config0) ->
init_per_group(without_batch, Config0) -> init_per_group(without_batch, Config0) ->
Config = [{enable_batch, false} | Config0], Config = [{enable_batch, false} | Config0],
common_init(Config); common_init(Config);
init_per_group(matrix, Config0) ->
Config = [{bridge_type, <<"matrix">>}, {enable_batch, true} | Config0],
common_init(Config);
init_per_group(timescale, Config0) ->
Config = [{bridge_type, <<"timescale">>}, {enable_batch, true} | Config0],
common_init(Config);
init_per_group(_Group, Config) -> init_per_group(_Group, Config) ->
Config. Config.
@ -122,7 +134,7 @@ end_per_testcase(_Testcase, Config) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
common_init(Config0) -> common_init(Config0) ->
BridgeType = <<"pgsql">>, BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
Host = ?config(pgsql_host, Config0), Host = ?config(pgsql_host, Config0),
Port = ?config(pgsql_port, Config0), Port = ?config(pgsql_port, Config0),
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of