diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e9c51edfa..411ab54f1 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -80,6 +80,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_kafka_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, + emqx_bridge_influxdb_action_info, emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index d9bded51b..cadbf35a0 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.32"}, + {vsn, "0.1.33"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl index a74e88df6..00a6c5510 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl @@ -61,11 +61,10 @@ connector_type_name() -> influxdb. schema_module() -> ?SCHEMA_MODULE. -bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"influxdb_type">> := Type}}, _}) -> - v1_type(Type). - -v1_type(<<"influxdb_api_v1">>) -> influxdb_api_v1; -v1_type(<<"influxdb_api_v2">>) -> influxdb_api_v2. +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"database">> := _}}, _}) -> + influxdb_api_v1; +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"bucket">> := _}}, _}) -> + influxdb_api_v2. make_config_map(PickKeys, IndentKeys, Config) -> Conf0 = maps:with(PickKeys, Config), diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 6f4e21df5..8e7ca8a73 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -38,6 +38,8 @@ desc/1 ]). +-export([transform_bridge_v1_config_to_connector_config/1]). + -export([precision_field/0, server_field/0]). %% only for test @@ -63,7 +65,7 @@ callback_mode() -> async_if_possible. on_add_channel( _InstanceId, - #{channels := Channels} = OldState, + #{channels := Channels, client := Client} = OldState, ChannelId, #{parameters := Parameters} = ChannelConfig0 ) -> @@ -72,10 +74,13 @@ on_add_channel( ChannelConfig = maps:merge( Parameters, ChannelConfig0#{ + channel_client => influxdb:update_precision(Client, Precision), write_syntax => to_config(WriteSytaxTmpl, Precision) } ), - {ok, OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)}}. + {ok, OldState#{ + channels => maps:put(ChannelId, ChannelConfig, Channels) + }}. on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> NewState = State#{channels => maps:remove(ChannelId, Channels)}, @@ -108,8 +113,9 @@ on_stop(InstId, _State) -> ok end. -on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client}) -> +on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) -> #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( @@ -128,9 +134,10 @@ on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client %% Once a Batched Data trans to points failed. %% This batch query failed -on_batch_query(InstId, BatchData, #{channels := ChannelConf, client := Client}) -> +on_batch_query(InstId, BatchData, #{channels := ChannelConf}) -> [{Channel, _} | _] = BatchData, #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -150,9 +157,10 @@ on_query_async( InstId, {Channel, Message}, {ReplyFun, Args}, - #{channels := ChannelConf, client := Client} + #{channels := ChannelConf} ) -> #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( @@ -173,10 +181,11 @@ on_batch_query_async( InstId, BatchData, {ReplyFun, Args}, - #{channels := ChannelConf, client := Client} + #{channels := ChannelConf} ) -> [{Channel, _} | _] = BatchData, #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -200,6 +209,22 @@ on_get_status(_InstId, #{client := Client}) -> disconnected end. +transform_bridge_v1_config_to_connector_config(BridgeV1Config) -> + IndentKeys = [username, password, database, token, bucket, org], + ConnConfig0 = maps:without([write_syntax, precision], BridgeV1Config), + ConnConfig1 = + case emqx_utils_maps:indent(parameters, IndentKeys, ConnConfig0) of + #{parameters := #{database := _} = Params} = Conf -> + Conf#{parameters => Params#{influxdb_type => influxdb_api_v1}}; + #{parameters := #{bucket := _} = Params} = Conf -> + Conf#{parameters => Params#{influxdb_type => influxdb_api_v2}} + end, + emqx_utils_maps:update_if_present( + resource_opts, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig1 + ). + %% ------------------------------------------------------------------------------------------------- %% schema namespace() -> connector_influxdb. diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index c0d63002b..02bfa60fa 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -537,11 +537,11 @@ t_start_ok(Config) -> begin case QueryMode of async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500); + ?assertMatch(ok, send_message(Config, SentData)); sync -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{ bool => <<"true">>, @@ -594,8 +594,11 @@ t_start_already_started(Config) -> {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check( emqx_bridge_schema, InfluxDBConfigString ), + ConnConfigMap = emqx_bridge_influxdb_connector:transform_bridge_v1_config_to_connector_config( + InfluxDBConfigMap + ), ?check_trace( - emqx_bridge_influxdb_connector:on_start(ResourceId, InfluxDBConfigMap), + emqx_bridge_influxdb_connector:on_start(ResourceId, ConnConfigMap), fun(Result, Trace) -> ?assertMatch({ok, _}, Result), ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)), @@ -700,11 +703,11 @@ t_const_timestamp(Config) -> }, case QueryMode of async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500); + ?assertMatch(ok, send_message(Config, SentData)); sync -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{foo => <<"123">>}, assert_persisted_data(ClientId, Expected, PersistedData), @@ -762,10 +765,7 @@ t_boolean_variants(Config) -> async -> ?assertMatch(ok, send_message(Config, SentData)) end, - case QueryMode of - async -> ct:sleep(500); - sync -> ok - end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{ bool => atom_to_binary(Translation), @@ -817,9 +817,10 @@ t_any_num_as_float(Config) -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)), ok; async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500) + ?assertMatch(ok, send_message(Config, SentData)) end, + %% sleep is still need even in sync mode, or we would get an empty result sometimes + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>}, assert_persisted_data(ClientId, Expected, PersistedData), @@ -938,10 +939,13 @@ t_create_disconnected(Config) -> ?assertMatch({ok, _}, create_bridge(Config)) end), fun(Trace) -> - ?assertMatch( - [#{error := influxdb_client_not_alive, reason := econnrefused}], - ?of_kind(influxdb_connector_start_failed, Trace) - ), + [#{error := influxdb_client_not_alive, reason := Reason}] = + ?of_kind(influxdb_connector_start_failed, Trace), + case Reason of + econnrefused -> ok; + {closed, _} -> ok; + _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) + end, ok end ), @@ -1146,10 +1150,8 @@ t_authentication_error(Config0) -> ok. t_authentication_error_on_get_status(Config0) -> - ResourceId = resource_id(Config0), - % Fake initialization to simulate credential update after bridge was created. - emqx_common_test_helpers:with_mock( + ResourceId = emqx_common_test_helpers:with_mock( influxdb, check_auth, fun(_) -> @@ -1165,20 +1167,20 @@ t_authentication_error_on_get_status(Config0) -> end, Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}), {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config0), ?retry( _Sleep = 1_000, _Attempts = 10, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) - ) + ), + ResourceId end ), - % Now back to wrong credentials ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), ok. t_authentication_error_on_send_message(Config0) -> - ResourceId = resource_id(Config0), QueryMode = proplists:get_value(query_mode, Config0, sync), InfluxDBType = ?config(influxdb_type, Config0), InfluxConfig0 = proplists:get_value(influxdb_config, Config0), @@ -1198,6 +1200,7 @@ t_authentication_error_on_send_message(Config0) -> end, fun() -> {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 10,