From 7c7a0b217245a3f613975d58c16baab3ce3af923 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 2 Jan 2024 17:57:22 +0800 Subject: [PATCH 1/6] refactor: split influxdb bridges to actions and connectors --- .../src/emqx_bridge_influxdb.app.src | 2 +- .../src/emqx_bridge_influxdb.erl | 146 +++++++++++++++-- .../src/emqx_bridge_influxdb_action_info.erl | 77 +++++++++ .../src/emqx_bridge_influxdb_connector.erl | 152 ++++++++++++------ .../src/schema/emqx_connector_ee_schema.erl | 12 ++ .../src/schema/emqx_connector_schema.erl | 2 + rel/i18n/emqx_bridge_influxdb.hocon | 15 ++ 7 files changed, 342 insertions(+), 64 deletions(-) create mode 100644 apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index ef288368d..864d8945f 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -8,7 +8,7 @@ emqx_resource, influxdb ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_influxdb_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index acb295752..8d5360887 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -11,7 +11,6 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1, write_syntax_type/0 ]). @@ -22,17 +21,28 @@ desc/1 ]). +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + -type write_syntax() :: list(). -reflect_type([write_syntax/0]). -typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}). -export([to_influx_lines/1]). +-define(CONNECTOR_TYPE, influxdb). +-define(ACTION_TYPE, influxdb). + %% ------------------------------------------------------------------------------------------------- %% api write_syntax_type() -> typerefl:alias("string", write_syntax()). +%% Examples conn_bridge_examples(Method) -> [ #{ @@ -49,25 +59,80 @@ conn_bridge_examples(Method) -> } ]. +bridge_v2_examples(Method) -> + WriteExample = + <<"${topic},clientid=${clientid} ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", "bool=${payload.bool}">>, + ParamsExample = #{ + parameters => #{ + write_syntax => WriteExample, precision => ms + } + }, + [ + #{ + <<"influxdb">> => #{ + summary => <<"InfluxDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, influxdb, influxdb, ParamsExample + ) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"influxdb_api_v1">> => #{ + summary => <<"InfluxDB HTTP API V1 Connector">>, + value => emqx_connector_schema:connector_values( + Method, influxdb, connector_values(influxdb_api_v1) + ) + } + }, + #{ + <<"influxdb_api_v2">> => #{ + summary => <<"InfluxDB HTTP API V2 Connector">>, + value => emqx_connector_schema:connector_values( + Method, influxdb, connector_values(influxdb_api_v2) + ) + } + } + ]. + +connector_values(Type) -> + maps:merge(basic_connector_values(), #{parameters => connector_values_v(Type)}). + +connector_values_v(influxdb_api_v2) -> + #{ + influxdb_type => influxdb_api_v2, + bucket => <<"example_bucket">>, + org => <<"examlpe_org">>, + token => <<"example_token">> + }; +connector_values_v(influxdb_api_v1) -> + #{ + influxdb_type => influxdb_api_v1, + database => <<"example_database">>, + username => <<"example_username">>, + password => <<"******">> + }. + +basic_connector_values() -> + #{ + enable => true, + server => <<"127.0.0.1:8086">>, + ssl => #{enable => false} + }. + values(Protocol, get) -> values(Protocol, post); values("influxdb_api_v2", post) -> SupportUint = <<"uint_value=${payload.uint_key}u,">>, - TypeOpts = #{ - bucket => <<"example_bucket">>, - org => <<"examlpe_org">>, - token => <<"example_token">>, - server => <<"127.0.0.1:8086">> - }, + TypeOpts = connector_values_v(influxdb_api_v2), values(common, "influxdb_api_v2", SupportUint, TypeOpts); values("influxdb_api_v1", post) -> SupportUint = <<>>, - TypeOpts = #{ - database => <<"example_database">>, - username => <<"example_username">>, - password => <<"******">>, - server => <<"127.0.0.1:8086">> - }, + TypeOpts = connector_values_v(influxdb_api_v1), values(common, "influxdb_api_v1", SupportUint, TypeOpts); values(Protocol, put) -> values(Protocol, post). @@ -98,6 +163,10 @@ namespace() -> "bridge_influxdb". roots() -> []. +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + connection_fields() ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields("post_api_v1") -> method_fields(post, influxdb_api_v1); fields("post_api_v2") -> @@ -110,12 +179,59 @@ fields("get_api_v1") -> method_fields(get, influxdb_api_v1); fields("get_api_v2") -> method_fields(get, influxdb_api_v2); +fields(action) -> + {influxdb, + mk( + hoconsc:map(name, ref(?MODULE, influxdb_action)), + #{desc => <<"InfluxDB Action Config">>, required => false} + )}; +fields(influxdb_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk(ref(?MODULE, action_parameters), #{ + required => true, desc => ?DESC(action_parameters) + }) + ); +fields(action_parameters) -> + [ + {write_syntax, fun write_syntax/1}, + emqx_bridge_influxdb_connector:precision_field() + ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + connection_fields() ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(influxdb_action)); fields(Type) when Type == influxdb_api_v1 orelse Type == influxdb_api_v2 -> influxdb_bridge_common_fields() ++ connector_fields(Type). +connection_fields() -> + [ + emqx_bridge_influxdb_connector:server_field(), + {parameters, + mk( + hoconsc:union([ + ref(emqx_bridge_influxdb_connector, "connector_" ++ T) + || T <- ["influxdb_api_v1", "influxdb_api_v2"] + ]), + #{required => true, desc => ?DESC("influxdb_parameters")} + )} + ] ++ emqx_connector_schema_lib:ssl_fields(). + method_fields(post, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ @@ -154,6 +270,10 @@ desc(influxdb_api_v1) -> ?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v1"); desc(influxdb_api_v2) -> ?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v2"); +desc(influxdb_action) -> + ?DESC(influxdb_action); +desc(action_parameters) -> + ?DESC(action_parameters); desc(_) -> undefined. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl new file mode 100644 index 000000000..a74e88df6 --- /dev/null +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl @@ -0,0 +1,77 @@ +-module(emqx_bridge_influxdb_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +%% dynamic callback +-export([ + bridge_v1_type_name_fun/1 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_influxdb). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(influxdb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(influxdb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys("config_connector"), + ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), + ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys, + ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig0 + ). + +connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) -> + RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorRawConf, ActionRawConf + ), + maps:without([<<"influxdb_type">>], RawConf). + +bridge_v1_type_name() -> + {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}. + +action_type_name() -> influxdb. + +connector_type_name() -> influxdb. + +schema_module() -> ?SCHEMA_MODULE. + +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"influxdb_type">> := Type}}, _}) -> + v1_type(Type). + +v1_type(<<"influxdb_api_v1">>) -> influxdb_api_v1; +v1_type(<<"influxdb_api_v2">>) -> influxdb_api_v2. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. + +bridge_v1_type_names() -> [influxdb, influxdb_api_v1, influxdb_api_v2]. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 2b4fb8d74..6f4e21df5 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -19,6 +19,10 @@ callback_mode/0, on_start/2, on_stop/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + on_get_channels/1, on_query/3, on_batch_query/3, on_query_async/4, @@ -34,6 +38,8 @@ desc/1 ]). +-export([precision_field/0, server_field/0]). + %% only for test -export([is_unrecoverable_error/1]). @@ -55,6 +61,35 @@ %% resource callback callback_mode() -> async_if_possible. +on_add_channel( + _InstanceId, + #{channels := Channels} = OldState, + ChannelId, + #{parameters := Parameters} = ChannelConfig0 +) -> + #{write_syntax := WriteSytaxTmpl} = Parameters, + Precision = maps:get(precision, Parameters, ms), + ChannelConfig = maps:merge( + Parameters, + ChannelConfig0#{ + write_syntax => to_config(WriteSytaxTmpl, Precision) + } + ), + {ok, OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)}}. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> + NewState = State#{channels => maps:remove(ChannelId, Channels)}, + {ok, NewState}. + +on_get_channel_status(InstanceId, _ChannelId, State) -> + case on_get_status(InstanceId, State) of + connected -> connected; + _ -> connecting + end. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + on_start(InstId, Config) -> %% InstID as pool would be handled by influxdb client %% so there is no need to allocate pool_name here @@ -73,8 +108,9 @@ on_stop(InstId, _State) -> ok end. -on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> - case data_to_points(Data, SyntaxLines) of +on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client}) -> + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( influxdb_connector_send_query, @@ -92,7 +128,9 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c %% Once a Batched Data trans to points failed. %% This batch query failed -on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> +on_batch_query(InstId, BatchData, #{channels := ChannelConf, client := Client}) -> + [{Channel, _} | _] = BatchData, + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -110,11 +148,12 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client on_query_async( InstId, - {send_message, Data}, + {Channel, Message}, {ReplyFun, Args}, - _State = #{write_syntax := SyntaxLines, client := Client} + #{channels := ChannelConf, client := Client} ) -> - case data_to_points(Data, SyntaxLines) of + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( influxdb_connector_send_query, @@ -134,8 +173,10 @@ on_batch_query_async( InstId, BatchData, {ReplyFun, Args}, - #{write_syntax := SyntaxLines, client := Client} + #{channels := ChannelConf, client := Client} ) -> + [{Channel, _} | _] = BatchData, + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -177,30 +218,51 @@ roots() -> fields(common) -> [ - {server, server()}, - {precision, - %% The influxdb only supports these 4 precision: - %% See "https://github.com/influxdata/influxdb/blob/ - %% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for - %% more information. - mk(enum([ns, us, ms, s]), #{ - required => false, default => ms, desc => ?DESC("precision") - })} + server_field(), + precision_field() ]; +fields("connector_influxdb_api_v1") -> + [influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()]; +fields("connector_influxdb_api_v2") -> + [influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()]; fields(influxdb_api_v1) -> - fields(common) ++ - [ - {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, - {username, mk(binary(), #{desc => ?DESC("username")})}, - {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} - ] ++ emqx_connector_schema_lib:ssl_fields(); + fields(common) ++ influxdb_api_v1_fields() ++ emqx_connector_schema_lib:ssl_fields(); fields(influxdb_api_v2) -> - fields(common) ++ - [ - {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, - {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, - {token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})} - ] ++ emqx_connector_schema_lib:ssl_fields(). + fields(common) ++ influxdb_api_v2_fields() ++ emqx_connector_schema_lib:ssl_fields(). + +influxdb_type_field(Type) -> + {influxdb_type, #{ + required => true, + type => Type, + default => Type, + desc => ?DESC(atom_to_list(Type)) + }}. +server_field() -> + {server, server()}. + +precision_field() -> + {precision, + %% The influxdb only supports these 4 precision: + %% See "https://github.com/influxdata/influxdb/blob/ + %% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for + %% more information. + mk(enum([ns, us, ms, s]), #{ + required => false, default => ms, desc => ?DESC("precision") + })}. + +influxdb_api_v1_fields() -> + [ + {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, + {username, mk(binary(), #{desc => ?DESC("username")})}, + {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} + ]. + +influxdb_api_v2_fields() -> + [ + {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, + {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, + {token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})} + ]. server() -> Meta = #{ @@ -216,6 +278,10 @@ desc(common) -> desc(influxdb_api_v1) -> ?DESC("influxdb_api_v1"); desc(influxdb_api_v2) -> + ?DESC("influxdb_api_v2"); +desc("connector_influxdb_api_v1") -> + ?DESC("influxdb_api_v1"); +desc("connector_influxdb_api_v2") -> ?DESC("influxdb_api_v2"). %% ------------------------------------------------------------------------------------------------- @@ -248,22 +314,14 @@ start_client(InstId, Config) -> {error, R} end. -do_start_client( - InstId, - ClientConfig, - Config = #{write_syntax := Lines} -) -> - Precision = maps:get(precision, Config, ms), +do_start_client(InstId, ClientConfig, Config) -> case influxdb:start_client(ClientConfig) of {ok, Client} -> case influxdb:is_alive(Client, true) of true -> case influxdb:check_auth(Client) of ok -> - State = #{ - client => Client, - write_syntax => to_config(Lines, Precision) - }, + State = #{client => Client, channels => #{}}, ?SLOG(info, #{ msg => "starting_influxdb_connector_success", connector => InstId, @@ -333,23 +391,17 @@ client_config( ] ++ protocol_config(Config). %% api v1 config -protocol_config( - #{ - database := DB, - ssl := SSL - } = Config -) -> +protocol_config(#{ + parameters := #{influxdb_type := influxdb_api_v1, database := DB} = Params, ssl := SSL +}) -> [ {protocol, http}, {version, v1}, {database, str(DB)} - ] ++ username(Config) ++ - password(Config) ++ ssl_config(SSL); + ] ++ username(Params) ++ password(Params) ++ ssl_config(SSL); %% api v2 config protocol_config(#{ - bucket := Bucket, - org := Org, - token := Token, + parameters := #{influxdb_type := influxdb_api_v2, bucket := Bucket, org := Org, token := Token}, ssl := SSL }) -> [ @@ -501,7 +553,7 @@ to_maps_config(K, V, Res) -> %% Tags & Fields Data Trans parse_batch_data(InstId, BatchData, SyntaxLines) -> {Points, Errors} = lists:foldl( - fun({send_message, Data}, {ListOfPoints, ErrAccIn}) -> + fun({_, Data}, {ListOfPoints, ErrAccIn}) -> case data_to_points(Data, SyntaxLines) of {ok, Points} -> {[Points | ListOfPoints], ErrAccIn}; diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 4e8618915..6fe08ba4a 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -34,6 +34,8 @@ resource_type(matrix) -> emqx_postgresql; resource_type(mongodb) -> emqx_bridge_mongodb_connector; +resource_type(influxdb) -> + emqx_bridge_influxdb_connector; resource_type(mysql) -> emqx_bridge_mysql_connector; resource_type(pgsql) -> @@ -112,6 +114,14 @@ connector_structs() -> required => false } )}, + {influxdb, + mk( + hoconsc:map(name, ref(emqx_bridge_influxdb, "config_connector")), + #{ + desc => <<"InfluxDB Connector Config">>, + required => false + } + )}, {mysql, mk( hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")), @@ -170,6 +180,7 @@ schema_modules() -> emqx_bridge_kafka, emqx_bridge_matrix, emqx_bridge_mongodb, + emqx_bridge_influxdb, emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, @@ -196,6 +207,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), + api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index f67cd6991..c491fbed0 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -131,6 +131,8 @@ connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; +connector_type_to_bridge_types(influxdb) -> + [influxdb, influxdb_api_v1, influxdb_api_v2]; connector_type_to_bridge_types(mysql) -> [mysql]; connector_type_to_bridge_types(pgsql) -> diff --git a/rel/i18n/emqx_bridge_influxdb.hocon b/rel/i18n/emqx_bridge_influxdb.hocon index 48454bbd3..ed628b325 100644 --- a/rel/i18n/emqx_bridge_influxdb.hocon +++ b/rel/i18n/emqx_bridge_influxdb.hocon @@ -47,4 +47,19 @@ Please note that a placeholder for an integer value must be annotated with a suf write_syntax.label: """Write Syntax""" +action_parameters.label: +"""Action Parameters""" +action_parameters.desc: +"""Additional parameters specific to this action type""" + +influxdb_action.label: +"""InfluxDB Action""" +influxdb_action.desc: +"""Action to interact with a InfluxDB connector""" + +influxdb_parameters.label: +"""InfluxdbDB Type Specific Parameters""" +influxdb_parameters.desc: +"""Set of parameters specific for the given type of this InfluxdbDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`.""" + } From 904bd0270fa2de2968346146b45873648c46d688 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 2 Jan 2024 18:04:06 +0800 Subject: [PATCH 2/6] chore: bump app versions --- apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src | 2 +- apps/emqx_connector/src/emqx_connector.app.src | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index 864d8945f..de58c2170 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_influxdb, [ {description, "EMQX Enterprise InfluxDB Bridge"}, - {vsn, "0.1.7"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 3e781dae0..261c70d34 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.36"}, + {vsn, "0.1.37"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ From 19e2ec974810b7fdcc15d8c5446d98e109e64eba Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 3 Jan 2024 18:39:29 +0800 Subject: [PATCH 3/6] fix: update influxdb testcases --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- .../src/emqx_bridge_influxdb_action_info.erl | 9 ++-- .../src/emqx_bridge_influxdb_connector.erl | 37 ++++++++++++--- .../test/emqx_bridge_influxdb_SUITE.erl | 45 ++++++++++--------- 5 files changed, 61 insertions(+), 33 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e9c51edfa..411ab54f1 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -80,6 +80,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_kafka_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, + emqx_bridge_influxdb_action_info, emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index d9bded51b..cadbf35a0 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.32"}, + {vsn, "0.1.33"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl index a74e88df6..00a6c5510 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl @@ -61,11 +61,10 @@ connector_type_name() -> influxdb. schema_module() -> ?SCHEMA_MODULE. -bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"influxdb_type">> := Type}}, _}) -> - v1_type(Type). - -v1_type(<<"influxdb_api_v1">>) -> influxdb_api_v1; -v1_type(<<"influxdb_api_v2">>) -> influxdb_api_v2. +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"database">> := _}}, _}) -> + influxdb_api_v1; +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"bucket">> := _}}, _}) -> + influxdb_api_v2. make_config_map(PickKeys, IndentKeys, Config) -> Conf0 = maps:with(PickKeys, Config), diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 6f4e21df5..8e7ca8a73 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -38,6 +38,8 @@ desc/1 ]). +-export([transform_bridge_v1_config_to_connector_config/1]). + -export([precision_field/0, server_field/0]). %% only for test @@ -63,7 +65,7 @@ callback_mode() -> async_if_possible. on_add_channel( _InstanceId, - #{channels := Channels} = OldState, + #{channels := Channels, client := Client} = OldState, ChannelId, #{parameters := Parameters} = ChannelConfig0 ) -> @@ -72,10 +74,13 @@ on_add_channel( ChannelConfig = maps:merge( Parameters, ChannelConfig0#{ + channel_client => influxdb:update_precision(Client, Precision), write_syntax => to_config(WriteSytaxTmpl, Precision) } ), - {ok, OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)}}. + {ok, OldState#{ + channels => maps:put(ChannelId, ChannelConfig, Channels) + }}. on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> NewState = State#{channels => maps:remove(ChannelId, Channels)}, @@ -108,8 +113,9 @@ on_stop(InstId, _State) -> ok end. -on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client}) -> +on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) -> #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( @@ -128,9 +134,10 @@ on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client %% Once a Batched Data trans to points failed. %% This batch query failed -on_batch_query(InstId, BatchData, #{channels := ChannelConf, client := Client}) -> +on_batch_query(InstId, BatchData, #{channels := ChannelConf}) -> [{Channel, _} | _] = BatchData, #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -150,9 +157,10 @@ on_query_async( InstId, {Channel, Message}, {ReplyFun, Args}, - #{channels := ChannelConf, client := Client} + #{channels := ChannelConf} ) -> #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( @@ -173,10 +181,11 @@ on_batch_query_async( InstId, BatchData, {ReplyFun, Args}, - #{channels := ChannelConf, client := Client} + #{channels := ChannelConf} ) -> [{Channel, _} | _] = BatchData, #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -200,6 +209,22 @@ on_get_status(_InstId, #{client := Client}) -> disconnected end. +transform_bridge_v1_config_to_connector_config(BridgeV1Config) -> + IndentKeys = [username, password, database, token, bucket, org], + ConnConfig0 = maps:without([write_syntax, precision], BridgeV1Config), + ConnConfig1 = + case emqx_utils_maps:indent(parameters, IndentKeys, ConnConfig0) of + #{parameters := #{database := _} = Params} = Conf -> + Conf#{parameters => Params#{influxdb_type => influxdb_api_v1}}; + #{parameters := #{bucket := _} = Params} = Conf -> + Conf#{parameters => Params#{influxdb_type => influxdb_api_v2}} + end, + emqx_utils_maps:update_if_present( + resource_opts, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig1 + ). + %% ------------------------------------------------------------------------------------------------- %% schema namespace() -> connector_influxdb. diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index c0d63002b..02bfa60fa 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -537,11 +537,11 @@ t_start_ok(Config) -> begin case QueryMode of async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500); + ?assertMatch(ok, send_message(Config, SentData)); sync -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{ bool => <<"true">>, @@ -594,8 +594,11 @@ t_start_already_started(Config) -> {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check( emqx_bridge_schema, InfluxDBConfigString ), + ConnConfigMap = emqx_bridge_influxdb_connector:transform_bridge_v1_config_to_connector_config( + InfluxDBConfigMap + ), ?check_trace( - emqx_bridge_influxdb_connector:on_start(ResourceId, InfluxDBConfigMap), + emqx_bridge_influxdb_connector:on_start(ResourceId, ConnConfigMap), fun(Result, Trace) -> ?assertMatch({ok, _}, Result), ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)), @@ -700,11 +703,11 @@ t_const_timestamp(Config) -> }, case QueryMode of async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500); + ?assertMatch(ok, send_message(Config, SentData)); sync -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{foo => <<"123">>}, assert_persisted_data(ClientId, Expected, PersistedData), @@ -762,10 +765,7 @@ t_boolean_variants(Config) -> async -> ?assertMatch(ok, send_message(Config, SentData)) end, - case QueryMode of - async -> ct:sleep(500); - sync -> ok - end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{ bool => atom_to_binary(Translation), @@ -817,9 +817,10 @@ t_any_num_as_float(Config) -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)), ok; async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500) + ?assertMatch(ok, send_message(Config, SentData)) end, + %% sleep is still need even in sync mode, or we would get an empty result sometimes + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>}, assert_persisted_data(ClientId, Expected, PersistedData), @@ -938,10 +939,13 @@ t_create_disconnected(Config) -> ?assertMatch({ok, _}, create_bridge(Config)) end), fun(Trace) -> - ?assertMatch( - [#{error := influxdb_client_not_alive, reason := econnrefused}], - ?of_kind(influxdb_connector_start_failed, Trace) - ), + [#{error := influxdb_client_not_alive, reason := Reason}] = + ?of_kind(influxdb_connector_start_failed, Trace), + case Reason of + econnrefused -> ok; + {closed, _} -> ok; + _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) + end, ok end ), @@ -1146,10 +1150,8 @@ t_authentication_error(Config0) -> ok. t_authentication_error_on_get_status(Config0) -> - ResourceId = resource_id(Config0), - % Fake initialization to simulate credential update after bridge was created. - emqx_common_test_helpers:with_mock( + ResourceId = emqx_common_test_helpers:with_mock( influxdb, check_auth, fun(_) -> @@ -1165,20 +1167,20 @@ t_authentication_error_on_get_status(Config0) -> end, Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}), {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config0), ?retry( _Sleep = 1_000, _Attempts = 10, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) - ) + ), + ResourceId end ), - % Now back to wrong credentials ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), ok. t_authentication_error_on_send_message(Config0) -> - ResourceId = resource_id(Config0), QueryMode = proplists:get_value(query_mode, Config0, sync), InfluxDBType = ?config(influxdb_type, Config0), InfluxConfig0 = proplists:get_value(influxdb_config, Config0), @@ -1198,6 +1200,7 @@ t_authentication_error_on_send_message(Config0) -> end, fun() -> {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 10, From 8ebd233a4652a852c8b99d7616c169862c253ea9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 4 Jan 2024 10:20:52 +0800 Subject: [PATCH 4/6] chore: update influxdb-erl to 1.1.12 --- apps/emqx_bridge_influxdb/rebar.config | 2 +- apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl | 4 ++++ mix.exs | 2 +- rel/i18n/emqx_bridge_influxdb.hocon | 4 ++-- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_influxdb/rebar.config b/apps/emqx_bridge_influxdb/rebar.config index 8acb242eb..c6ad26ac1 100644 --- a/apps/emqx_bridge_influxdb/rebar.config +++ b/apps/emqx_bridge_influxdb/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.11"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.12"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index 8d5360887..8f5a552a7 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -274,6 +274,10 @@ desc(influxdb_action) -> ?DESC(influxdb_action); desc(action_parameters) -> ?DESC(action_parameters); +desc("config_connector") -> + ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/mix.exs b/mix.exs index 26617d54b..2be11e957 100644 --- a/mix.exs +++ b/mix.exs @@ -199,7 +199,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.12", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, diff --git a/rel/i18n/emqx_bridge_influxdb.hocon b/rel/i18n/emqx_bridge_influxdb.hocon index ed628b325..44226214f 100644 --- a/rel/i18n/emqx_bridge_influxdb.hocon +++ b/rel/i18n/emqx_bridge_influxdb.hocon @@ -58,8 +58,8 @@ influxdb_action.desc: """Action to interact with a InfluxDB connector""" influxdb_parameters.label: -"""InfluxdbDB Type Specific Parameters""" +"""InfluxDB Type Specific Parameters""" influxdb_parameters.desc: -"""Set of parameters specific for the given type of this InfluxdbDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`.""" +"""Set of parameters specific for the given type of this InfluxDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`.""" } From 9714092525f00d3df6d7031a891b099e6312b245 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 4 Jan 2024 17:48:23 +0800 Subject: [PATCH 5/6] fix: fix: update more influxdb testcases --- .../src/emqx_bridge_influxdb.erl | 19 ++----- .../src/emqx_bridge_influxdb_connector.erl | 44 ++++++++++----- .../test/emqx_bridge_influxdb_SUITE.erl | 1 + .../emqx_bridge_influxdb_connector_SUITE.erl | 56 ++++++++++++++----- rel/i18n/emqx_bridge_influxdb.hocon | 5 ++ 5 files changed, 83 insertions(+), 42 deletions(-) diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index 8f5a552a7..aea2df858 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -165,7 +165,7 @@ roots() -> []. fields("config_connector") -> emqx_connector_schema:common_fields() ++ - connection_fields() ++ + emqx_bridge_influxdb_connector:fields("connector") ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields("post_api_v1") -> method_fields(post, influxdb_api_v1); @@ -204,7 +204,7 @@ fields(Field) when Field == "post_connector" -> Fields = - connection_fields() ++ + emqx_bridge_influxdb_connector:fields("connector") ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); fields(Field) when @@ -219,19 +219,6 @@ fields(Type) when influxdb_bridge_common_fields() ++ connector_fields(Type). -connection_fields() -> - [ - emqx_bridge_influxdb_connector:server_field(), - {parameters, - mk( - hoconsc:union([ - ref(emqx_bridge_influxdb_connector, "connector_" ++ T) - || T <- ["influxdb_api_v1", "influxdb_api_v2"] - ]), - #{required => true, desc => ?DESC("influxdb_parameters")} - )} - ] ++ emqx_connector_schema_lib:ssl_fields(). - method_fields(post, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ @@ -274,6 +261,8 @@ desc(influxdb_action) -> ?DESC(influxdb_action); desc(action_parameters) -> ?DESC(action_parameters); +desc(parameters) -> + ?DESC("influxdb_parameters"); desc("config_connector") -> ?DESC("desc_config"); desc(connector_resource_opts) -> diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 8e7ca8a73..89310405b 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -40,7 +40,7 @@ -export([transform_bridge_v1_config_to_connector_config/1]). --export([precision_field/0, server_field/0]). +-export([precision_field/0]). %% only for test -export([is_unrecoverable_error/1]). @@ -232,28 +232,30 @@ namespace() -> connector_influxdb. roots() -> [ {config, #{ - type => hoconsc:union( - [ - hoconsc:ref(?MODULE, influxdb_api_v1), - hoconsc:ref(?MODULE, influxdb_api_v2) - ] - ) + type => hoconsc:ref(?MODULE, "connector") }} ]. -fields(common) -> +fields("connector") -> [ server_field(), - precision_field() - ]; + parameter_field() + ] ++ emqx_connector_schema_lib:ssl_fields(); fields("connector_influxdb_api_v1") -> [influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()]; fields("connector_influxdb_api_v2") -> [influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()]; +%% ============ begin: schema for old bridge configs ============ fields(influxdb_api_v1) -> - fields(common) ++ influxdb_api_v1_fields() ++ emqx_connector_schema_lib:ssl_fields(); + fields(common) ++ influxdb_api_v1_fields(); fields(influxdb_api_v2) -> - fields(common) ++ influxdb_api_v2_fields() ++ emqx_connector_schema_lib:ssl_fields(). + fields(common) ++ influxdb_api_v2_fields(); +fields(common) -> + [ + server_field(), + precision_field() + ] ++ emqx_connector_schema_lib:ssl_fields(). +%% ============ end: schema for old bridge configs ============ influxdb_type_field(Type) -> {influxdb_type, #{ @@ -262,6 +264,7 @@ influxdb_type_field(Type) -> default => Type, desc => ?DESC(atom_to_list(Type)) }}. + server_field() -> {server, server()}. @@ -275,6 +278,16 @@ precision_field() -> required => false, default => ms, desc => ?DESC("precision") })}. +parameter_field() -> + {parameters, + mk( + hoconsc:union([ + ref(?MODULE, "connector_" ++ T) + || T <- ["influxdb_api_v1", "influxdb_api_v2"] + ]), + #{required => true, desc => ?DESC("influxdb_parameters")} + )}. + influxdb_api_v1_fields() -> [ {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, @@ -300,10 +313,14 @@ server() -> desc(common) -> ?DESC("common"); +desc(parameters) -> + ?DESC("influxdb_parameters"); desc(influxdb_api_v1) -> ?DESC("influxdb_api_v1"); desc(influxdb_api_v2) -> ?DESC("influxdb_api_v2"); +desc("connector") -> + ?DESC("connector"); desc("connector_influxdb_api_v1") -> ?DESC("influxdb_api_v1"); desc("connector_influxdb_api_v2") -> @@ -411,8 +428,7 @@ client_config( {host, str(Host)}, {port, Port}, {pool_size, erlang:system_info(schedulers)}, - {pool, InstId}, - {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} + {pool, InstId} ] ++ protocol_config(Config). %% api v1 config diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index 02bfa60fa..d79139f17 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -943,6 +943,7 @@ t_create_disconnected(Config) -> ?of_kind(influxdb_connector_start_failed, Trace), case Reason of econnrefused -> ok; + closed -> ok; {closed, _} -> ok; _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) end, diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl index 94e8e3fad..915662b24 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl @@ -66,7 +66,7 @@ t_lifecycle(Config) -> Port = ?config(influxdb_tcp_port, Config), perform_lifecycle_check( <<"emqx_bridge_influxdb_connector_SUITE">>, - influxdb_config(Host, Port, false, <<"verify_none">>) + influxdb_connector_config(Host, Port, false, <<"verify_none">>) ). perform_lifecycle_check(PoolName, InitialConfig) -> @@ -76,6 +76,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> % expects this FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()}, {ok, #{ + id := ResourceId, state := #{client := #{pool := ReturnedPoolName}} = State, status := InitialStatus }} = emqx_resource:create_local( @@ -93,8 +94,18 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + %% install actions to the connector + ActionConfig = influxdb_action_config(), + ChannelId = <<"test_channel">>, + ?assertEqual( + ok, + emqx_resource_manager:add_channel( + ResourceId, ChannelId, ActionConfig + ) + ), + ?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)), % % Perform query as further check that the resource is working as expected - ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. @@ -116,7 +127,15 @@ perform_lifecycle_check(PoolName, InitialConfig) -> {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), + ChannelId = <<"test_channel">>, + ?assertEqual( + ok, + emqx_resource_manager:add_channel( + ResourceId, ChannelId, ActionConfig + ) + ), + ?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), @@ -127,7 +146,7 @@ t_tls_verify_none(Config) -> PoolName = <<"testpool-1">>, Host = ?config(influxdb_tls_host, Config), Port = ?config(influxdb_tls_port, Config), - InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>), + InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_none">>), ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid), ?assertEqual(connected, ValidStatus), InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail), @@ -138,7 +157,7 @@ t_tls_verify_peer(Config) -> PoolName = <<"testpool-2">>, Host = ?config(influxdb_tls_host, Config), Port = ?config(influxdb_tls_port, Config), - InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>), + InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_peer">>), %% This works without a CA-cert & friends since we are using a mock ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid), ?assertEqual(connected, ValidStatus), @@ -191,19 +210,30 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) -> % %% Helpers % %%------------------------------------------------------------------------------ -influxdb_config(Host, Port, SslEnabled, Verify) -> +influxdb_connector_config(Host, Port, SslEnabled, Verify) -> Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])), - ResourceConfig = #{ - <<"bucket">> => <<"mqtt">>, - <<"org">> => <<"emqx">>, - <<"token">> => <<"abcdefg">>, + ConnectorConf = #{ + <<"parameters">> => #{ + <<"influxdb_type">> => <<"influxdb_api_v2">>, + <<"bucket">> => <<"mqtt">>, + <<"org">> => <<"emqx">>, + <<"token">> => <<"abcdefg">> + }, <<"server">> => Server, <<"ssl">> => #{ <<"enable">> => SslEnabled, <<"verify">> => Verify } }, - #{<<"config">> => ResourceConfig}. + #{<<"config">> => ConnectorConf}. + +influxdb_action_config() -> + #{ + parameters => #{ + write_syntax => influxdb_write_syntax(), + precision => ms + } + }. custom_verify() -> fun @@ -227,8 +257,8 @@ influxdb_write_syntax() -> } ]. -test_query() -> - {send_message, #{ +test_query(ChannelId) -> + {ChannelId, #{ <<"clientid">> => <<"something">>, <<"payload">> => #{bool => true}, <<"topic">> => <<"connector_test">>, diff --git a/rel/i18n/emqx_bridge_influxdb.hocon b/rel/i18n/emqx_bridge_influxdb.hocon index 44226214f..412203cf9 100644 --- a/rel/i18n/emqx_bridge_influxdb.hocon +++ b/rel/i18n/emqx_bridge_influxdb.hocon @@ -52,6 +52,11 @@ action_parameters.label: action_parameters.desc: """Additional parameters specific to this action type""" +connector.label: +"""InfluxDB Connector""" +connector.desc: +"""InfluxDB Connector Configs""" + influxdb_action.label: """InfluxDB Action""" influxdb_action.desc: From 2f08471303c9ca579e31afd2b293eae403e35698 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 5 Jan 2024 11:59:13 +0800 Subject: [PATCH 6/6] fix: cannot found desc for influxdb_parameters --- apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl | 2 -- .../src/emqx_bridge_influxdb_connector.erl | 2 ++ rel/i18n/emqx_bridge_influxdb.hocon | 5 ----- rel/i18n/emqx_bridge_influxdb_connector.hocon | 5 +++++ 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index aea2df858..4228d23d5 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -261,8 +261,6 @@ desc(influxdb_action) -> ?DESC(influxdb_action); desc(action_parameters) -> ?DESC(action_parameters); -desc(parameters) -> - ?DESC("influxdb_parameters"); desc("config_connector") -> ?DESC("desc_config"); desc(connector_resource_opts) -> diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 89310405b..7a84bc440 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -315,6 +315,8 @@ desc(common) -> ?DESC("common"); desc(parameters) -> ?DESC("influxdb_parameters"); +desc("influxdb_parameters") -> + ?DESC("influxdb_parameters"); desc(influxdb_api_v1) -> ?DESC("influxdb_api_v1"); desc(influxdb_api_v2) -> diff --git a/rel/i18n/emqx_bridge_influxdb.hocon b/rel/i18n/emqx_bridge_influxdb.hocon index 412203cf9..5a81334db 100644 --- a/rel/i18n/emqx_bridge_influxdb.hocon +++ b/rel/i18n/emqx_bridge_influxdb.hocon @@ -62,9 +62,4 @@ influxdb_action.label: influxdb_action.desc: """Action to interact with a InfluxDB connector""" -influxdb_parameters.label: -"""InfluxDB Type Specific Parameters""" -influxdb_parameters.desc: -"""Set of parameters specific for the given type of this InfluxDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`.""" - } diff --git a/rel/i18n/emqx_bridge_influxdb_connector.hocon b/rel/i18n/emqx_bridge_influxdb_connector.hocon index ce79c2a93..4020afd6f 100644 --- a/rel/i18n/emqx_bridge_influxdb_connector.hocon +++ b/rel/i18n/emqx_bridge_influxdb_connector.hocon @@ -68,4 +68,9 @@ username.desc: username.label: """Username""" +influxdb_parameters.label: +"""InfluxDB Type Specific Parameters""" +influxdb_parameters.desc: +"""Set of parameters specific for the given type of this InfluxDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`.""" + }