test: greptimedb data brige

This commit is contained in:
Dennis Zhuang 2023-07-06 19:11:20 +08:00 committed by firest
parent 91ebd90442
commit 89bce99870
8 changed files with 1043 additions and 15 deletions

View File

@ -1,2 +1,2 @@
erlang 25.3.2-1 erlang 25.3.2.3
elixir 1.14.5-otp-25 elixir 1.14.5-otp-25

View File

@ -90,7 +90,7 @@
T == oracle; T == oracle;
T == iotdb; T == iotdb;
T == kinesis_producer; T == kinesis_producer;
T == greptimedb T == greptimedb_grpc_v1
). ).
-define(ROOT_KEY, bridges). -define(ROOT_KEY, bridges).

View File

@ -50,7 +50,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method), api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method),
api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method), api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method),
api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"), api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"),
api_ref(emqx_bridge_greptimedb, Method) api_ref(emqx_bridge_greptimedb, <<"greptimedb_grpc_v1">>, Method ++ "_grpc_v1")
]. ].
schema_modules() -> schema_modules() ->
@ -124,8 +124,7 @@ resource_type(oracle) -> emqx_oracle;
resource_type(iotdb) -> emqx_bridge_iotdb_impl; resource_type(iotdb) -> emqx_bridge_iotdb_impl;
resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;
resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer. resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer.
resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector. resource_type(greptimedb_grpc_v1) -> emqx_bridge_greptimedb_connector.
resource_type(greptimedb) -> emqx_bridge_greptimedb_connector.
fields(bridges) -> fields(bridges) ->
[ [
@ -214,7 +213,8 @@ fields(bridges) ->
influxdb_structs() ++ influxdb_structs() ++
redis_structs() ++ redis_structs() ++
pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++
kinesis_structs(). kinesis_structs() ++
greptimedb_structs().
mongodb_structs() -> mongodb_structs() ->
[ [
@ -299,6 +299,21 @@ influxdb_structs() ->
] ]
]. ].
greptimedb_structs() ->
[
{Protocol,
mk(
hoconsc:map(name, ref(emqx_bridge_greptimedb, Protocol)),
#{
desc => <<"GreptimeDB Bridge Config">>,
required => false
}
)}
|| Protocol <- [
greptimedb_grpc_v1
]
].
redis_structs() -> redis_structs() ->
[ [
{Type, {Type,

View File

@ -0,0 +1,2 @@
toxiproxy
greptimedb

View File

@ -6,7 +6,7 @@
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}},
{greptimedb_client_erl, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.1"}}} {greptimedb_client_erl, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {branch, "feature/check-auth"}}}
]}. ]}.
{plugins, [rebar3_path_deps]}. {plugins, [rebar3_path_deps]}.
{project_plugins, [erlfmt]}. {project_plugins, [erlfmt]}.

View File

@ -47,7 +47,7 @@ values("greptimedb_grpc_v1", post) ->
bucket => <<"example_bucket">>, bucket => <<"example_bucket">>,
org => <<"examlpe_org">>, org => <<"examlpe_org">>,
token => <<"example_token">>, token => <<"example_token">>,
server => <<"127.0.0.1:4000">> server => <<"127.0.0.1:4001">>
}, },
values(common, "greptimedb_grpc_v1", SupportUint, TypeOpts); values(common, "greptimedb_grpc_v1", SupportUint, TypeOpts);
values(Protocol, put) -> values(Protocol, put) ->
@ -68,7 +68,7 @@ values(common, Protocol, SupportUint, TypeOpts) ->
batch_size => 100, batch_size => 100,
batch_time => <<"20ms">> batch_time => <<"20ms">>
}, },
server => <<"127.0.0.1:4000">>, server => <<"127.0.0.1:4001">>,
ssl => #{enable => false} ssl => #{enable => false}
}, },
maps:merge(TypeOpts, CommonConfigs). maps:merge(TypeOpts, CommonConfigs).

View File

@ -39,7 +39,7 @@
%% Allocatable resources %% Allocatable resources
-define(greptime_client, greptime_client). -define(greptime_client, greptime_client).
-define(GREPTIMEDB_DEFAULT_PORT, 4000). -define(GREPTIMEDB_DEFAULT_PORT, 4001).
-define(DEFAULT_DB, <<"public">>). -define(DEFAULT_DB, <<"public">>).
@ -81,7 +81,7 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
#{batch => false, mode => sync, error => ErrorPoints} #{batch => false, mode => sync, error => ErrorPoints}
), ),
log_error_points(InstId, ErrorPoints), log_error_points(InstId, ErrorPoints),
{error, {unrecoverable_error, ErrorPoints}} ErrorPoints
end. end.
%% Once a Batched Data trans to points failed. %% Once a Batched Data trans to points failed.
@ -140,13 +140,21 @@ fields(common) ->
fields(greptimedb_grpc_v1) -> fields(greptimedb_grpc_v1) ->
fields(common) ++ fields(common) ++
[ [
{dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})} {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})},
{username, mk(binary(), #{desc => ?DESC("username")})},
{password,
mk(binary(), #{
desc => ?DESC("password"),
format => <<"password">>,
sensitive => true,
converter => fun emqx_schema:password_converter/2
})}
] ++ emqx_connector_schema_lib:ssl_fields(). ] ++ emqx_connector_schema_lib:ssl_fields().
server() -> server() ->
Meta = #{ Meta = #{
required => false, required => false,
default => <<"127.0.0.1:4000">>, default => <<"127.0.0.1:4001">>,
desc => ?DESC("server"), desc => ?DESC("server"),
converter => fun convert_server/2 converter => fun convert_server/2
}, },
@ -477,7 +485,7 @@ line_to_point(
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
TableName = emqx_placeholder:proc_tmpl(Measurement, Data), TableName = emqx_placeholder:proc_tmpl(Measurement, Data),
{TableName, [ {TableName, [
maps:without([precision], Item#{ maps:without([precision, measurement], Item#{
tags => EncodedTags, tags => EncodedTags,
fields => EncodedFields, fields => EncodedFields,
timestamp => maybe_convert_time_unit(Ts, Precision) timestamp => maybe_convert_time_unit(Ts, Precision)
@ -539,7 +547,7 @@ value_type([<<"FALSE">>]) ->
value_type([<<"False">>]) -> value_type([<<"False">>]) ->
greptimedb_values:boolean_value(false); greptimedb_values:boolean_value(false);
value_type(Val) -> value_type(Val) ->
#{values => #{string_values => Val, datatype => 'STRING'}}. #{values => #{string_values => Val}, datatype => 'STRING'}.
key_filter(undefined) -> undefined; key_filter(undefined) -> undefined;
key_filter(Value) -> emqx_utils_conv:bin(Value). key_filter(Value) -> emqx_utils_conv:bin(Value).

File diff suppressed because it is too large Load Diff