Merge pull request #10679 from lafirest/refactor/influxdb_dir

refactor(influxdb): move influxdb bridge into its own app
This commit is contained in:
lafirest 2023-05-12 14:31:29 +08:00 committed by GitHub
commit 33d1872e8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 64 additions and 47 deletions

View File

@ -0,0 +1,2 @@
toxiproxy
influxdb

View File

@ -0,0 +1,8 @@
{erl_opts, [debug_info]}.
{deps, [
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
]}.

View File

@ -1,8 +1,8 @@
{application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib]},
{applications, [kernel, stdlib, influxdb]},
{env, []},
{modules, []},
{links, []}

View File

@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_influxdb).
-module(emqx_bridge_influxdb).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
@ -134,7 +134,7 @@ influxdb_bridge_common_fields() ->
emqx_resource_schema:fields("resource_opts").
connector_fields(Type) ->
emqx_ee_connector_influxdb:fields(Type).
emqx_bridge_influxdb_connector:fields(Type).
type_name_fields(Type) ->
[
@ -147,9 +147,9 @@ desc("config") ->
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for InfluxDB using `", string:to_upper(Method), "` method."];
desc(influxdb_api_v1) ->
?DESC(emqx_ee_connector_influxdb, "influxdb_api_v1");
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v1");
desc(influxdb_api_v2) ->
?DESC(emqx_ee_connector_influxdb, "influxdb_api_v2");
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v2");
desc(_) ->
undefined.

View File

@ -1,9 +1,8 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_influxdb).
-module(emqx_bridge_influxdb_connector).
-include("emqx_ee_connector.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -40,6 +39,8 @@
-type ts_precision() :: ns | us | ms | s.
-define(INFLUXDB_DEFAULT_PORT, 8086).
%% influxdb servers don't need parse
-define(INFLUXDB_HOST_OPTIONS, #{
default_port => ?INFLUXDB_DEFAULT_PORT

View File

@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_influxdb_SUITE).
-module(emqx_bridge_influxdb_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
@ -583,7 +583,7 @@ t_start_already_started(Config) ->
emqx_bridge_schema, InfluxDBConfigString
),
?check_trace(
emqx_ee_connector_influxdb:on_start(ResourceId, InfluxDBConfigMap),
emqx_bridge_influxdb_connector:on_start(ResourceId, InfluxDBConfigMap),
fun(Result, Trace) ->
?assertMatch({ok, _}, Result),
?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
@ -985,7 +985,7 @@ t_write_failure(Config) ->
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
not emqx_ee_connector_influxdb:is_unrecoverable_error(Result),
not emqx_bridge_influxdb_connector:is_unrecoverable_error(Result),
#{got => Result}
);
async ->
@ -993,7 +993,7 @@ t_write_failure(Config) ->
?assertMatch([#{action := nack} | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
not emqx_ee_connector_influxdb:is_unrecoverable_error(Result),
not emqx_bridge_influxdb_connector:is_unrecoverable_error(Result),
#{got => Result}
)
end,

View File

@ -2,16 +2,16 @@
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_influxdb_SUITE).
-module(emqx_bridge_influxdb_connector_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_connector.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(INFLUXDB_RESOURCE_MOD, emqx_ee_connector_influxdb).
-define(INFLUXDB_RESOURCE_MOD, emqx_bridge_influxdb_connector).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -65,7 +65,7 @@ t_lifecycle(Config) ->
Host = ?config(influxdb_tcp_host, Config),
Port = ?config(influxdb_tcp_port, Config),
perform_lifecycle_check(
<<"emqx_ee_connector_influxdb_SUITE">>,
<<"emqx_bridge_influxdb_connector_SUITE">>,
influxdb_config(Host, Port, false, <<"verify_none">>)
).
@ -124,7 +124,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
t_tls_verify_none(Config) ->
PoolName = <<"emqx_ee_connector_influxdb_SUITE">>,
PoolName = <<"emqx_bridge_influxdb_connector_SUITE">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>),
@ -135,7 +135,7 @@ t_tls_verify_none(Config) ->
ok.
t_tls_verify_peer(Config) ->
PoolName = <<"emqx_ee_connector_influxdb_SUITE">>,
PoolName = <<"emqx_bridge_influxdb_connector_SUITE">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>),

View File

@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_influxdb_tests).
-module(emqx_bridge_influxdb_tests).
-include_lib("eunit/include/eunit.hrl").
@ -192,7 +192,9 @@
fields => [{"field", "\"field\\4\""}],
timestamp => undefined
}},
{"m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
{
"m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
"field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
#{
measurement => "m5,mA",
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
@ -200,7 +202,8 @@
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
],
timestamp => "${timestamp5}"
}},
}
},
{"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"",
#{
measurement => "m6",
@ -208,20 +211,26 @@
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
timestamp => undefined
}},
{"\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\\\\\n\"",
{
"\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\","
"field_a=field7a,field_b=\"field7b\\\\\n\"",
#{
measurement => " m7 ",
tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
timestamp => undefined
}},
{"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
}
},
{
"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,"
"field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
#{
measurement => "m8",
tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
timestamp => "${timestamp8}"
}},
}
},
{"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
#{
measurement => "m\\9",
@ -263,7 +272,9 @@
fields => [{"field", "\"field\\4\""}],
timestamp => undefined
}},
{" m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ",
{
" m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
"field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ",
#{
measurement => "m5,mA",
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
@ -271,7 +282,8 @@
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
],
timestamp => "${timestamp5}"
}},
}
},
{" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\" ",
#{
measurement => "m6",
@ -330,7 +342,7 @@ to_influx_lines(RawLines) ->
try
%% mute error logs from this call
emqx_logger:set_primary_log_level(none),
emqx_ee_bridge_influxdb:to_influx_lines(RawLines)
emqx_bridge_influxdb:to_influx_lines(RawLines)
after
emqx_logger:set_primary_log_level(OldLevel)
end.

View File

@ -0,0 +1 @@
Refactor the directory structure of the InfluxDB data bridge.

View File

@ -1,5 +1,4 @@
toxiproxy
influxdb
mongo
mongo_rs_sharded
mysql

View File

@ -16,7 +16,8 @@
emqx_bridge_sqlserver,
emqx_bridge_rocketmq,
emqx_bridge_rabbitmq,
emqx_bridge_tdengine
emqx_bridge_tdengine,
emqx_bridge_influxdb
]},
{env, []},
{modules, []},

View File

@ -24,8 +24,8 @@ api_schemas(Method) ->
ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
ref(emqx_ee_bridge_mongodb, Method ++ "_single"),
ref(emqx_ee_bridge_hstreamdb, Method),
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"),
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2"),
ref(emqx_bridge_influxdb, Method ++ "_api_v1"),
ref(emqx_bridge_influxdb, Method ++ "_api_v2"),
ref(emqx_ee_bridge_redis, Method ++ "_single"),
ref(emqx_ee_bridge_redis, Method ++ "_sentinel"),
ref(emqx_ee_bridge_redis, Method ++ "_cluster"),
@ -49,7 +49,7 @@ schema_modules() ->
emqx_bridge_cassandra,
emqx_ee_bridge_hstreamdb,
emqx_bridge_gcp_pubsub,
emqx_ee_bridge_influxdb,
emqx_bridge_influxdb,
emqx_ee_bridge_mongodb,
emqx_ee_bridge_mysql,
emqx_ee_bridge_redis,
@ -92,8 +92,8 @@ resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
resource_type(mysql) -> emqx_connector_mysql;
resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb;
resource_type(influxdb_api_v1) -> emqx_bridge_influxdb_connector;
resource_type(influxdb_api_v2) -> emqx_bridge_influxdb_connector;
resource_type(redis_single) -> emqx_ee_connector_redis;
resource_type(redis_sentinel) -> emqx_ee_connector_redis;
resource_type(redis_cluster) -> emqx_ee_connector_redis;
@ -247,7 +247,7 @@ influxdb_structs() ->
[
{Protocol,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_influxdb, Protocol)),
hoconsc:map(name, ref(emqx_bridge_influxdb, Protocol)),
#{
desc => <<"InfluxDB Bridge Config">>,
required => false

View File

@ -1,5 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------
-define(INFLUXDB_DEFAULT_PORT, 8086).

View File

@ -2,7 +2,6 @@
{erl_opts, [debug_info]}.
{deps, [
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
{clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.3"}}},
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../../apps/emqx_utils"}}

View File

@ -7,7 +7,6 @@
stdlib,
ecpool,
hstreamdb_erl,
influxdb,
clickhouse
]},
{env, []},

View File

@ -1,4 +1,4 @@
emqx_ee_bridge_influxdb {
emqx_bridge_influxdb {
config_enable.desc:
"""Enable or disable this bridge."""

View File

@ -1,4 +1,4 @@
emqx_ee_connector_influxdb {
emqx_bridge_influxdb_connector {
bucket.desc:
"""InfluxDB bucket name."""

View File

@ -1,4 +1,4 @@
emqx_ee_bridge_influxdb {
emqx_bridge_influxdb {
config_enable.desc:
"""启用/禁用桥接。"""

View File

@ -1,4 +1,4 @@
emqx_ee_connector_influxdb {
emqx_bridge_influxdb_connector {
bucket.desc:
"""InfluxDB bucket 名称。"""