fix(iotdb): fix iotdb testcases

This commit is contained in:
firest 2024-01-06 13:01:43 +08:00
parent d16458ccd0
commit e20b024b6e
6 changed files with 148 additions and 43 deletions

View File

@ -124,7 +124,7 @@ resource_type(sqlserver) -> emqx_bridge_sqlserver_connector;
resource_type(opents) -> emqx_bridge_opents_connector; resource_type(opents) -> emqx_bridge_opents_connector;
resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
resource_type(oracle) -> emqx_oracle; resource_type(oracle) -> emqx_oracle;
resource_type(iotdb) -> emqx_bridge_iotdb_impl; resource_type(iotdb) -> emqx_bridge_iotdb_connector;
resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;
resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer; resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer;
resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; resource_type(greptimedb) -> emqx_bridge_greptimedb_connector;

View File

@ -478,11 +478,13 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
), ),
BridgeId = bridge_id(Config), BridgeId = bridge_id(Config),
BridgeType = ?config(bridge_type, Config),
BridgeName = ?config(bridge_name, Config),
Message = {BridgeId, MakeMessageFun()}, Message = {BridgeId, MakeMessageFun()},
?assertMatch( ?assertMatch(
{ok, {ok, _}}, {ok, {ok, _}},
?wait_async_action( ?wait_async_action(
emqx_resource:query(ResourceId, Message, #{ emqx_bridge_v2:query(BridgeType, BridgeName, Message, #{
async_reply_fun => {ReplyFun, [self()]} async_reply_fun => {ReplyFun, [self()]}
}), }),
#{?snk_kind := TracePoint, instance_id := ResourceId}, #{?snk_kind := TracePoint, instance_id := ResourceId},

View File

@ -4,7 +4,7 @@
{vsn, "0.1.4"}, {vsn, "0.1.4"},
{modules, [ {modules, [
emqx_bridge_iotdb, emqx_bridge_iotdb,
emqx_bridge_iotdb_impl emqx_bridge_iotdb_connector
]}, ]},
{registered, []}, {registered, []},
{applications, [ {applications, [

View File

@ -316,7 +316,7 @@ action_values() ->
} }
], ],
is_aligned => false, is_aligned => false,
device_id => <<"${clientid}">>, device_id => <<"my_device">>,
iotdb_version => ?VSN_1_1_X iotdb_version => ?VSN_1_1_X
} }
}. }.

View File

@ -495,10 +495,11 @@ convert_float(undefined) ->
make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) -> make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) ->
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
{ok,
maps:merge(Rows, #{ maps:merge(Rows, #{
iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
iotdb_field_key(device_id, IotDBVsn) => DeviceId iotdb_field_key(device_id, IotDBVsn) => DeviceId
}). })}.
replace_dtypes(Rows0, IotDBVsn) -> replace_dtypes(Rows0, IotDBVsn) ->
{Types, Rows} = maps:take(dtypes, Rows0), {Types, Rows} = maps:take(dtypes, Rows0),
@ -582,8 +583,8 @@ to_list(Data) -> [Data].
%% If device_id is missing from the channel data, try to find it from the payload %% If device_id is missing from the channel data, try to find it from the payload
device_id(Message, Payloads, Channel) -> device_id(Message, Payloads, Channel) ->
case maps:get(device_id, Channel, <<>>) of case maps:get(device_id, Channel, []) of
<<>> -> [] ->
maps:get(<<"device_id">>, hd(Payloads), undefined); maps:get(<<"device_id">>, hd(Payloads), undefined);
DeviceIdTkn -> DeviceIdTkn ->
emqx_placeholder:proc_tmpl(DeviceIdTkn, Message) emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
@ -629,17 +630,26 @@ preproc_data_template(DataList) ->
try_render_message({ChannelId, Msg}, Channels) -> try_render_message({ChannelId, Msg}, Channels) ->
case maps:find(ChannelId, Channels) of case maps:find(ChannelId, Channels) of
{ok, Channel} -> {ok, Channel} ->
{ok, render_channel_message(Channel, Msg)}; render_channel_message(Channel, Msg);
_ -> _ ->
{error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
end. end.
render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) -> render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) ->
Payloads = to_list(parse_payload(get_payload(Message))), Payloads = to_list(parse_payload(get_payload(Message))),
DataTemplate = get_data_template(Channel, Payloads), case device_id(Message, Payloads, Channel) of
undefined ->
{error, device_id_missing};
DeviceId ->
case get_data_template(Channel, Payloads) of
[] ->
{error, invalid_data};
DataTemplate ->
DataList = proc_data(DataTemplate, Message), DataList = proc_data(DataTemplate, Message),
DeviceId = device_id(Message, Payloads, Channel),
make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn). make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn)
end
end.
%% Get the message template. %% Get the message template.
%% In order to be compatible with 4.4, the template version has higher priority %% In order to be compatible with 4.4, the template version has higher priority

View File

@ -32,10 +32,10 @@ groups() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_bridge_testlib:init_per_suite(Config, ?APPS). emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS).
end_per_suite(Config) -> end_per_suite(Config) ->
emqx_bridge_testlib:end_per_suite(Config). emqx_bridge_v2_testlib:end_per_suite(Config).
init_per_group(plain = Type, Config0) -> init_per_group(plain = Type, Config0) ->
Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"), Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"),
@ -43,7 +43,7 @@ init_per_group(plain = Type, Config0) ->
ProxyName = "iotdb", ProxyName = "iotdb",
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true -> true ->
Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
[ [
{bridge_host, Host}, {bridge_host, Host},
{bridge_port, Port}, {bridge_port, Port},
@ -66,7 +66,7 @@ init_per_group(legacy = Type, Config0) ->
ProxyName = "iotdb013", ProxyName = "iotdb013",
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true -> true ->
Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
[ [
{bridge_host, Host}, {bridge_host, Host},
{bridge_port, Port}, {bridge_port, Port},
@ -90,18 +90,34 @@ end_per_group(Group, Config) when
Group =:= plain; Group =:= plain;
Group =:= legacy Group =:= legacy
-> ->
emqx_bridge_testlib:end_per_group(Config), emqx_bridge_v2_testlib:end_per_group(Config),
ok; ok;
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
init_per_testcase(TestCase, Config0) -> init_per_testcase(TestCase, Config0) ->
Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3), Type = ?config(bridge_type, Config0),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<
(atom_to_binary(TestCase))/binary, UniqueNum/binary
>>,
{_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
{_, ActionConfig} = action_config(Name, Config0),
Config = [
{connector_type, Type},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, Type},
{bridge_name, Name},
{bridge_config, ActionConfig}
| Config0
],
iotdb_reset(Config), iotdb_reset(Config),
ok = snabbkaffe:start_trace(),
Config. Config.
end_per_testcase(TestCase, Config) -> end_per_testcase(TestCase, Config) ->
emqx_bridge_testlib:end_per_testcase(TestCase, Config). emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
@ -114,7 +130,7 @@ iotdb_server_url(Host, Port) ->
integer_to_binary(Port) integer_to_binary(Port)
]). ]).
bridge_config(TestCase, _TestGroup, Config) -> bridge_config(TestCase, Config) ->
UniqueNum = integer_to_binary(erlang:unique_integer()), UniqueNum = integer_to_binary(erlang:unique_integer()),
Host = ?config(bridge_host, Config), Host = ?config(bridge_host, Config),
Port = ?config(bridge_port, Config), Port = ?config(bridge_port, Config),
@ -149,7 +165,7 @@ bridge_config(TestCase, _TestGroup, Config) ->
Version Version
] ]
), ),
{Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}. {Name, ConfigString, emqx_bridge_v2_testlib:parse_and_check(Type, Name, ConfigString)}.
make_iotdb_payload(DeviceId, Measurement, Type, Value) -> make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
#{ #{
@ -201,7 +217,7 @@ iotdb_request(Config, Path, Body, Opts) ->
<<"password">> := Password <<"password">> := Password
} }
} = } =
?config(bridge_config, Config), ?config(connector_config, Config),
ct:pal("bridge config: ~p", [_BridgeConfig]), ct:pal("bridge config: ~p", [_BridgeConfig]),
URL = <<BaseURL/binary, Path/binary>>, URL = <<BaseURL/binary, Path/binary>>,
BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>), BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
@ -238,6 +254,76 @@ is_error_check(Reason) ->
?assertEqual({error, Reason}, Result) ?assertEqual({error, Reason}, Result)
end. end.
action_config(Name, Config) ->
Version = ?config(iotdb_version, Config),
Type = ?config(bridge_type, Config),
ConfigString =
io_lib:format(
"actions.~s.~s {\n"
" enable = true\n"
" connector = \"~s\"\n"
" parameters = {\n"
" iotdb_version = \"~s\"\n"
" data = []\n"
" }\n"
"}\n",
[
Type,
Name,
Name,
Version
]
),
ct:pal("ActionConfig:~ts~n", [ConfigString]),
{ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
connector_config(Name, Config) ->
Host = ?config(bridge_host, Config),
Port = ?config(bridge_port, Config),
Type = ?config(bridge_type, Config),
ServerURL = iotdb_server_url(Host, Port),
ConfigString =
io_lib:format(
"connectors.~s.~s {\n"
" enable = true\n"
" base_url = \"~s\"\n"
" authentication = {\n"
" username = \"root\"\n"
" password = \"root\"\n"
" }\n"
"}\n",
[
Type,
Name,
ServerURL
]
),
ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
{ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
parse_action_and_check(ConfigString, BridgeType, Name) ->
parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name).
parse_connector_and_check(ConfigString, ConnectorType, Name) ->
parse_and_check(
ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name
).
%% emqx_utils_maps:safe_atom_key_map(Config).
parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) ->
Type = to_bin(Type0),
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}),
#{RootKey := #{Type := #{Name := Config}}} = RawConf,
Config.
to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8);
to_bin(Atom) when is_atom(Atom) ->
erlang:atom_to_binary(Atom);
to_bin(Bin) when is_binary(Bin) ->
Bin.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -246,7 +332,7 @@ t_sync_query_simple(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),
Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
ok = emqx_bridge_testlib:t_sync_query( ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
), ),
Query = <<"select temp from ", DeviceId/binary>>, Query = <<"select temp from ", DeviceId/binary>>,
@ -260,7 +346,7 @@ t_async_query(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),
Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
ok = emqx_bridge_testlib:t_async_query( ok = emqx_bridge_v2_testlib:t_async_query(
Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async
), ),
Query = <<"select temp from ", DeviceId/binary>>, Query = <<"select temp from ", DeviceId/binary>>,
@ -310,7 +396,7 @@ t_sync_query_aggregated(Config) ->
make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300) make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300)
], ],
MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
ok = emqx_bridge_testlib:t_sync_query( ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
), ),
@ -369,10 +455,12 @@ t_sync_query_fail(Config) ->
fun(Result) -> fun(Result) ->
?assertEqual(error, element(1, Result)) ?assertEqual(error, element(1, Result))
end, end,
emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query
).
t_sync_device_id_missing(Config) -> t_sync_device_id_missing(Config) ->
emqx_bridge_testlib:t_sync_query( emqx_bridge_v2_testlib:t_sync_query(
Config, Config,
make_message_fun(iotdb_topic(Config), #{foo => bar}), make_message_fun(iotdb_topic(Config), #{foo => bar}),
is_error_check(device_id_missing), is_error_check(device_id_missing),
@ -387,7 +475,7 @@ t_extract_device_id_from_rule_engine_message(Config) ->
Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)), Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)),
?check_trace( ?check_trace(
begin begin
{ok, _} = emqx_bridge_testlib:create_bridge(Config), {ok, _} = emqx_bridge_v2_testlib:create_bridge(Config),
SQL = << SQL = <<
"SELECT\n" "SELECT\n"
" payload.measurement, payload.data_type, payload.value, payload.device_id\n" " payload.measurement, payload.data_type, payload.value, payload.device_id\n"
@ -397,7 +485,7 @@ t_extract_device_id_from_rule_engine_message(Config) ->
"\"" "\""
>>, >>,
Opts = #{sql => SQL}, Opts = #{sql => SQL},
{ok, _} = emqx_bridge_testlib:create_rule_and_action_http( {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
BridgeType, RuleTopic, Config, Opts BridgeType, RuleTopic, Config, Opts
), ),
emqx:publish(Message), emqx:publish(Message),
@ -415,7 +503,7 @@ t_extract_device_id_from_rule_engine_message(Config) ->
ok. ok.
t_sync_invalid_data(Config) -> t_sync_invalid_data(Config) ->
emqx_bridge_testlib:t_sync_query( emqx_bridge_v2_testlib:t_sync_query(
Config, Config,
make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
is_error_check(invalid_data), is_error_check(invalid_data),
@ -423,7 +511,7 @@ t_sync_invalid_data(Config) ->
). ).
t_async_device_id_missing(Config) -> t_async_device_id_missing(Config) ->
emqx_bridge_testlib:t_async_query( emqx_bridge_v2_testlib:t_async_query(
Config, Config,
make_message_fun(iotdb_topic(Config), #{foo => bar}), make_message_fun(iotdb_topic(Config), #{foo => bar}),
is_error_check(device_id_missing), is_error_check(device_id_missing),
@ -431,7 +519,7 @@ t_async_device_id_missing(Config) ->
). ).
t_async_invalid_data(Config) -> t_async_invalid_data(Config) ->
emqx_bridge_testlib:t_async_query( emqx_bridge_v2_testlib:t_async_query(
Config, Config,
make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
is_error_check(invalid_data), is_error_check(invalid_data),
@ -439,18 +527,19 @@ t_async_invalid_data(Config) ->
). ).
t_create_via_http(Config) -> t_create_via_http(Config) ->
emqx_bridge_testlib:t_create_via_http(Config). emqx_bridge_v2_testlib:t_create_via_http(Config).
t_start_stop(Config) -> t_start_stop(Config) ->
emqx_bridge_testlib:t_start_stop(Config, iotdb_bridge_stopped). emqx_bridge_v2_testlib:t_start_stop(Config, iotdb_bridge_stopped).
t_on_get_status(Config) -> t_on_get_status(Config) ->
emqx_bridge_testlib:t_on_get_status(Config). emqx_bridge_v2_testlib:t_on_get_status(Config).
t_device_id(Config) -> t_device_id(Config) ->
ResourceId = emqx_bridge_testlib:resource_id(Config),
%% Create without device_id configured %% Create without device_id configured
?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)), ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
?retry( ?retry(
_Sleep = 1_000, _Sleep = 1_000,
_Attempts = 20, _Attempts = 20,
@ -463,9 +552,11 @@ t_device_id(Config) ->
iotdb_reset(Config, ConfiguredDevice), iotdb_reset(Config, ConfiguredDevice),
Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true), Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
MessageF1 = make_message_fun(Topic, Payload1), MessageF1 = make_message_fun(Topic, Payload1),
is_success_check( is_success_check(
emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
), ),
{ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]), ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]),
#{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1), #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1),
@ -476,10 +567,12 @@ t_device_id(Config) ->
%% reconfigure bridge with device_id %% reconfigure bridge with device_id
{ok, _} = {ok, _} =
emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}), emqx_bridge_v2_testlib:update_bridge_api(Config, #{
<<"parameters">> => #{<<"device_id">> => ConfiguredDevice}
}),
is_success_check( is_success_check(
emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
), ),
%% even though we had a device_id in the message it's not being used %% even though we had a device_id in the message it's not being used