fix: update influxdb testcases

This commit is contained in:
Shawn 2024-01-03 18:39:29 +08:00
parent 904bd0270f
commit 19e2ec9748
5 changed files with 61 additions and 33 deletions

View File

@ -80,6 +80,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_kafka_action_info,
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_influxdb_action_info,
emqx_bridge_mysql_action_info,
emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.1.32"},
{vsn, "0.1.33"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -61,11 +61,10 @@ connector_type_name() -> influxdb.
schema_module() -> ?SCHEMA_MODULE.
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"influxdb_type">> := Type}}, _}) ->
v1_type(Type).
v1_type(<<"influxdb_api_v1">>) -> influxdb_api_v1;
v1_type(<<"influxdb_api_v2">>) -> influxdb_api_v2.
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"database">> := _}}, _}) ->
influxdb_api_v1;
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"bucket">> := _}}, _}) ->
influxdb_api_v2.
make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config),

View File

@ -38,6 +38,8 @@
desc/1
]).
-export([transform_bridge_v1_config_to_connector_config/1]).
-export([precision_field/0, server_field/0]).
%% only for test
@ -63,7 +65,7 @@ callback_mode() -> async_if_possible.
on_add_channel(
_InstanceId,
#{channels := Channels} = OldState,
#{channels := Channels, client := Client} = OldState,
ChannelId,
#{parameters := Parameters} = ChannelConfig0
) ->
@ -72,10 +74,13 @@ on_add_channel(
ChannelConfig = maps:merge(
Parameters,
ChannelConfig0#{
channel_client => influxdb:update_precision(Client, Precision),
write_syntax => to_config(WriteSytaxTmpl, Precision)
}
),
{ok, OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)}}.
{ok, OldState#{
channels => maps:put(ChannelId, ChannelConfig, Channels)
}}.
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
NewState = State#{channels => maps:remove(ChannelId, Channels)},
@ -108,8 +113,9 @@ on_stop(InstId, _State) ->
ok
end.
on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client}) ->
on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) ->
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
#{channel_client := Client} = maps:get(Channel, ChannelConf),
case data_to_points(Message, SyntaxLines) of
{ok, Points} ->
?tp(
@ -128,9 +134,10 @@ on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client
%% Once a Batched Data trans to points failed.
%% This batch query failed
on_batch_query(InstId, BatchData, #{channels := ChannelConf, client := Client}) ->
on_batch_query(InstId, BatchData, #{channels := ChannelConf}) ->
[{Channel, _} | _] = BatchData,
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
#{channel_client := Client} = maps:get(Channel, ChannelConf),
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
?tp(
@ -150,9 +157,10 @@ on_query_async(
InstId,
{Channel, Message},
{ReplyFun, Args},
#{channels := ChannelConf, client := Client}
#{channels := ChannelConf}
) ->
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
#{channel_client := Client} = maps:get(Channel, ChannelConf),
case data_to_points(Message, SyntaxLines) of
{ok, Points} ->
?tp(
@ -173,10 +181,11 @@ on_batch_query_async(
InstId,
BatchData,
{ReplyFun, Args},
#{channels := ChannelConf, client := Client}
#{channels := ChannelConf}
) ->
[{Channel, _} | _] = BatchData,
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
#{channel_client := Client} = maps:get(Channel, ChannelConf),
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
?tp(
@ -200,6 +209,22 @@ on_get_status(_InstId, #{client := Client}) ->
disconnected
end.
transform_bridge_v1_config_to_connector_config(BridgeV1Config) ->
IndentKeys = [username, password, database, token, bucket, org],
ConnConfig0 = maps:without([write_syntax, precision], BridgeV1Config),
ConnConfig1 =
case emqx_utils_maps:indent(parameters, IndentKeys, ConnConfig0) of
#{parameters := #{database := _} = Params} = Conf ->
Conf#{parameters => Params#{influxdb_type => influxdb_api_v1}};
#{parameters := #{bucket := _} = Params} = Conf ->
Conf#{parameters => Params#{influxdb_type => influxdb_api_v2}}
end,
emqx_utils_maps:update_if_present(
resource_opts,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnConfig1
).
%% -------------------------------------------------------------------------------------------------
%% schema
namespace() -> connector_influxdb.

View File

@ -537,11 +537,11 @@ t_start_ok(Config) ->
begin
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
?assertMatch(ok, send_message(Config, SentData));
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData))
end,
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{
bool => <<"true">>,
@ -594,8 +594,11 @@ t_start_already_started(Config) ->
{ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check(
emqx_bridge_schema, InfluxDBConfigString
),
ConnConfigMap = emqx_bridge_influxdb_connector:transform_bridge_v1_config_to_connector_config(
InfluxDBConfigMap
),
?check_trace(
emqx_bridge_influxdb_connector:on_start(ResourceId, InfluxDBConfigMap),
emqx_bridge_influxdb_connector:on_start(ResourceId, ConnConfigMap),
fun(Result, Trace) ->
?assertMatch({ok, _}, Result),
?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
@ -700,11 +703,11 @@ t_const_timestamp(Config) ->
},
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
?assertMatch(ok, send_message(Config, SentData));
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData))
end,
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{foo => <<"123">>},
assert_persisted_data(ClientId, Expected, PersistedData),
@ -762,10 +765,7 @@ t_boolean_variants(Config) ->
async ->
?assertMatch(ok, send_message(Config, SentData))
end,
case QueryMode of
async -> ct:sleep(500);
sync -> ok
end,
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{
bool => atom_to_binary(Translation),
@ -817,9 +817,10 @@ t_any_num_as_float(Config) ->
?assertMatch({ok, 204, _}, send_message(Config, SentData)),
ok;
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500)
?assertMatch(ok, send_message(Config, SentData))
end,
%% sleep is still need even in sync mode, or we would get an empty result sometimes
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>},
assert_persisted_data(ClientId, Expected, PersistedData),
@ -938,10 +939,13 @@ t_create_disconnected(Config) ->
?assertMatch({ok, _}, create_bridge(Config))
end),
fun(Trace) ->
?assertMatch(
[#{error := influxdb_client_not_alive, reason := econnrefused}],
?of_kind(influxdb_connector_start_failed, Trace)
),
[#{error := influxdb_client_not_alive, reason := Reason}] =
?of_kind(influxdb_connector_start_failed, Trace),
case Reason of
econnrefused -> ok;
{closed, _} -> ok;
_ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
end,
ok
end
),
@ -1146,10 +1150,8 @@ t_authentication_error(Config0) ->
ok.
t_authentication_error_on_get_status(Config0) ->
ResourceId = resource_id(Config0),
% Fake initialization to simulate credential update after bridge was created.
emqx_common_test_helpers:with_mock(
ResourceId = emqx_common_test_helpers:with_mock(
influxdb,
check_auth,
fun(_) ->
@ -1165,20 +1167,20 @@ t_authentication_error_on_get_status(Config0) ->
end,
Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
{ok, _} = create_bridge(Config),
ResourceId = resource_id(Config0),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
)
),
ResourceId
end
),
% Now back to wrong credentials
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
ok.
t_authentication_error_on_send_message(Config0) ->
ResourceId = resource_id(Config0),
QueryMode = proplists:get_value(query_mode, Config0, sync),
InfluxDBType = ?config(influxdb_type, Config0),
InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
@ -1198,6 +1200,7 @@ t_authentication_error_on_send_message(Config0) ->
end,
fun() ->
{ok, _} = create_bridge(Config),
ResourceId = resource_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,