test: make test passed 21/29

This commit is contained in:
Dennis Zhuang 2023-07-16 17:46:04 +08:00 committed by firest
parent 975795a6e0
commit c6a7f3e2ad
4 changed files with 69 additions and 145 deletions

View File

@ -3,13 +3,20 @@ version: '3.9'
services: services:
greptimedb: greptimedb:
container_name: greptimedb container_name: greptimedb
hostname: greptimedb
image: greptime/greptimedb:0.3.2 image: greptime/greptimedb:0.3.2
expose: expose:
- "4000" - "4000"
- "4001" - "4001"
# uncomment for local testing
# ports:
# - "4000:4000"
# - "4001:4001"
restart: always restart: always
networks: networks:
- emqx_bridge - emqx_bridge
command: command:
standalone start standalone start
--user-provider=static_user_provider:cmd:greptime_user=greptime_pwd --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd
--http-addr="0.0.0.0:4000"
--rpc-addr="0.0.0.0:4001"

View File

@ -160,17 +160,18 @@
"name": "hstreamdb", "name": "hstreamdb",
"listen": "0.0.0.0:6570", "listen": "0.0.0.0:6570",
"upstream": "hstreamdb:6570", "upstream": "hstreamdb:6570",
"enabled": true
}, },
{ {
"name": "greptimedb_http", "name": "greptimedb_http",
"listen": "0.0.0.0:4000", "listen": "0.0.0.0:4000",
"upstream": "iotdb:4000", "upstream": "greptimedb:4000",
"enabled": true "enabled": true
}, },
{ {
"name": "greptimedb_grpc", "name": "greptimedb_grpc",
"listen": "0.0.0.0:4001", "listen": "0.0.0.0:4001",
"upstream": "iotdb:4001", "upstream": "greptimedb:4001",
"enabled": true "enabled": true
}, },
{ {

View File

@ -312,7 +312,7 @@ do_query(InstId, Client, Points) ->
connector => InstId, connector => InstId,
points => Points points => Points
}); });
{error, {401, _, _}} -> {error, {unauth, _, _}} ->
?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}), ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
?SLOG(error, #{ ?SLOG(error, #{
msg => "greptimedb_authorization_failed", msg => "greptimedb_authorization_failed",

View File

@ -57,6 +57,7 @@ init_per_group(GreptimedbType, Config0) when
#{ #{
host := GreptimedbHost, host := GreptimedbHost,
port := GreptimedbPort, port := GreptimedbPort,
http_port := GreptimedbHttpPort,
use_tls := UseTLS, use_tls := UseTLS,
proxy_name := ProxyName proxy_name := ProxyName
} = } =
@ -65,13 +66,15 @@ init_per_group(GreptimedbType, Config0) when
#{ #{
host => os:getenv("GREPTIMEDB_GRPCV1_TCP_HOST", "toxiproxy"), host => os:getenv("GREPTIMEDB_GRPCV1_TCP_HOST", "toxiproxy"),
port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TCP_PORT", "4001")), port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TCP_PORT", "4001")),
http_port => list_to_integer(os:getenv("GREPTIMEDB_HTTP_PORT", "4000")),
use_tls => false, use_tls => false,
proxy_name => "greptimedb_tcp" proxy_name => "greptimedb_grpc"
}; };
grpcv1_tls -> grpcv1_tls ->
#{ #{
host => os:getenv("GREPTIMEDB_GRPCV1_TLS_HOST", "toxiproxy"), host => os:getenv("GREPTIMEDB_GRPCV1_TLS_HOST", "toxiproxy"),
port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TLS_PORT", "4001")), port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TLS_PORT", "4001")),
http_port => list_to_integer(os:getenv("GREPTIMEDB_HTTP_PORT", "4000")),
use_tls => true, use_tls => true,
proxy_name => "greptimedb_tls" proxy_name => "greptimedb_tls"
} }
@ -98,7 +101,7 @@ init_per_group(GreptimedbType, Config0) when
end, end,
EHttpcPoolOpts = [ EHttpcPoolOpts = [
{host, GreptimedbHost}, {host, GreptimedbHost},
{port, GreptimedbPort}, {port, GreptimedbHttpPort},
{pool_size, 1}, {pool_size, 1},
{transport, EHttpcTransport}, {transport, EHttpcTransport},
{transport_opts, EHttpcTransportOpts} {transport_opts, EHttpcTransportOpts}
@ -110,6 +113,7 @@ init_per_group(GreptimedbType, Config0) when
{proxy_name, ProxyName}, {proxy_name, ProxyName},
{greptimedb_host, GreptimedbHost}, {greptimedb_host, GreptimedbHost},
{greptimedb_port, GreptimedbPort}, {greptimedb_port, GreptimedbPort},
{greptimedb_http_port, GreptimedbHttpPort},
{greptimedb_type, grpcv1}, {greptimedb_type, grpcv1},
{greptimedb_config, GreptimedbConfig}, {greptimedb_config, GreptimedbConfig},
{greptimedb_config_string, ConfigString}, {greptimedb_config_string, ConfigString},
@ -282,12 +286,12 @@ send_message(Config, Payload) ->
BridgeId = emqx_bridge_resource:bridge_id(Type, Name), BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
emqx_bridge:send_message(BridgeId, Payload). emqx_bridge:send_message(BridgeId, Payload).
query_by_clientid(ClientId, Config) -> query_by_clientid(Topic, ClientId, Config) ->
GreptimedbHost = ?config(greptimedb_host, Config), GreptimedbHost = ?config(greptimedb_host, Config),
GreptimedbPort = ?config(greptimedb_port, Config), GreptimedbPort = ?config(greptimedb_http_port, Config),
EHttpcPoolName = ?config(ehttpc_pool_name, Config), EHttpcPoolName = ?config(ehttpc_pool_name, Config),
UseTLS = ?config(use_tls, Config), UseTLS = ?config(use_tls, Config),
Path = <<"/api/v2/query?org=emqx">>, Path = <<"/v1/sql?db=public">>,
Scheme = Scheme =
case UseTLS of case UseTLS of
true -> <<"https://">>; true -> <<"https://">>;
@ -300,26 +304,11 @@ query_by_clientid(ClientId, Config) ->
integer_to_binary(GreptimedbPort), integer_to_binary(GreptimedbPort),
Path Path
]), ]),
Query =
<<
"from(bucket: \"mqtt\")\n"
" |> range(start: -12h)\n"
" |> filter(fn: (r) => r.clientid == \"",
ClientId/binary,
"\")"
>>,
Headers = [ Headers = [
{"Authorization", "Token abcdefg"}, {"Authorization", "Basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q="},
{"Content-Type", "application/json"} {"Content-Type", "application/x-www-form-urlencoded"}
], ],
Body = Body = <<"sql=select * from ", Topic/binary, " where clientid='", ClientId/binary, "'">>,
emqx_utils_json:encode(#{
query => Query,
dialect => #{
header => true,
delimiter => <<";">>
}
}),
{ok, 200, _Headers, RawBody0} = {ok, 200, _Headers, RawBody0} =
ehttpc:request( ehttpc:request(
EHttpcPoolName, EHttpcPoolName,
@ -328,32 +317,30 @@ query_by_clientid(ClientId, Config) ->
_Timeout = 10_000, _Timeout = 10_000,
_Retry = 0 _Retry = 0
), ),
RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)), #{
{ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}), <<"code">> := 0,
DecodedCSV1 = [ <<"output">> := [
[Field || Field <- Line, Field =/= <<>>] #{
|| Line <- DecodedCSV0, <<"records">> := #{
Line =/= [<<>>] <<"rows">> := Rows,
], <<"schema">> := Schema
DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []), }
index_by_field(DecodedCSV2). }
]
} = emqx_utils_json:decode(RawBody0, [return_maps]),
decode_csv(RawBody) -> case Schema of
Lines = null ->
[ #{};
binary:split(Line, [<<";">>], [global, trim_all]) #{<<"column_schemas">> := ColumnsSchemas} ->
|| Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all]) Columns = lists:map(fun(#{<<"name">> := Name}) -> Name end, ColumnsSchemas),
], index_by_field(Rows, Columns)
csv_lines_to_maps(Lines, []). end.
csv_lines_to_maps([Fields, Data | Rest], Acc) -> index_by_field([], Columns) ->
Map = maps:from_list(lists:zip(Fields, Data)), #{};
csv_lines_to_maps(Rest, [Map | Acc]); index_by_field([Row], Columns) ->
csv_lines_to_maps(_Data, Acc) -> maps:from_list(lists:zip(Columns, Row)).
lists:reverse(Acc).
index_by_field(DecodedCSV) ->
maps:from_list([{Field, Data} || Data = #{<<"_field">> := Field} <- DecodedCSV]).
assert_persisted_data(ClientId, Expected, PersistedData) -> assert_persisted_data(ClientId, Expected, PersistedData) ->
ClientIdIntKey = <<ClientId/binary, "_int_value">>, ClientIdIntKey = <<ClientId/binary, "_int_value">>,
@ -361,12 +348,12 @@ assert_persisted_data(ClientId, Expected, PersistedData) ->
fun fun
(int_value, ExpectedValue) -> (int_value, ExpectedValue) ->
?assertMatch( ?assertMatch(
#{<<"_value">> := ExpectedValue}, ExpectedValue,
maps:get(ClientIdIntKey, PersistedData) maps:get(ClientIdIntKey, PersistedData)
); );
(Key, ExpectedValue) -> (Key, ExpectedValue) ->
?assertMatch( ?assertMatch(
#{<<"_value">> := ExpectedValue}, ExpectedValue,
maps:get(atom_to_binary(Key), PersistedData), maps:get(atom_to_binary(Key), PersistedData),
#{expected => ExpectedValue} #{expected => ExpectedValue}
) )
@ -409,12 +396,12 @@ t_start_ok(Config) ->
sync -> sync ->
?assertMatch(ok, send_message(Config, SentData)) ?assertMatch(ok, send_message(Config, SentData))
end, end,
PersistedData = query_by_clientid(ClientId, Config), PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
Expected = #{ Expected = #{
bool => <<"true">>, bool => true,
int_value => <<"-123">>, int_value => -123,
uint_value => <<"123">>, uint_value => 123,
float_value => <<"24.5">>, float_value => 24.5,
payload => emqx_utils_json:encode(Payload) payload => emqx_utils_json:encode(Payload)
}, },
assert_persisted_data(ClientId, Expected, PersistedData), assert_persisted_data(ClientId, Expected, PersistedData),
@ -423,12 +410,16 @@ t_start_ok(Config) ->
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(greptimedb_connector_send_query, Trace0), Trace = ?of_kind(greptimedb_connector_send_query, Trace0),
?assertMatch([#{points := [_]}], Trace), ?assertMatch([#{points := [_]}], Trace),
[#{points := [Point]}] = Trace, [#{points := [Point0]}] = Trace,
{Measurement, [Point]} = Point0,
ct:pal("sent point: ~p", [Point]), ct:pal("sent point: ~p", [Point]),
?assertMatch(
<<_/binary>>,
Measurement
),
?assertMatch( ?assertMatch(
#{ #{
fields := #{}, fields := #{},
measurement := <<_/binary>>,
tags := #{}, tags := #{},
timestamp := TS timestamp := TS
} when is_integer(TS), } when is_integer(TS),
@ -538,9 +529,6 @@ t_const_timestamp(Config) ->
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
Const = erlang:system_time(nanosecond), Const = erlang:system_time(nanosecond),
ConstBin = integer_to_binary(Const), ConstBin = integer_to_binary(Const),
TsStr = iolist_to_binary(
calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
create_bridge( create_bridge(
@ -563,24 +551,11 @@ t_const_timestamp(Config) ->
sync -> sync ->
?assertMatch(ok, send_message(Config, SentData)) ?assertMatch(ok, send_message(Config, SentData))
end, end,
PersistedData = query_by_clientid(ClientId, Config), PersistedData = query_by_clientid(<<"mqtt">>, ClientId, Config),
Expected = #{foo => <<"123">>}, Expected = #{foo => 123},
assert_persisted_data(ClientId, Expected, PersistedData), assert_persisted_data(ClientId, Expected, PersistedData),
TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)), TimeReturned = maps:get(<<"greptime_timestamp">>, PersistedData),
TimeReturned = pad_zero(TimeReturned0), ?assertEqual(Const, TimeReturned).
?assertEqual(TsStr, TimeReturned).
%% greptimedb returns timestamps without trailing zeros such as
%% "2023-02-28T17:21:51.63678163Z"
%% while the standard should be
%% "2023-02-28T17:21:51.636781630Z"
pad_zero(BinTs) ->
StrTs = binary_to_list(BinTs),
[Nano | Rest] = lists:reverse(string:tokens(StrTs, ".")),
[$Z | NanoNum] = lists:reverse(Nano),
Padding = lists:duplicate(10 - length(Nano), $0),
NewNano = lists:reverse(NanoNum) ++ Padding ++ "Z",
iolist_to_binary(string:join(lists:reverse([NewNano | Rest]), ".")).
t_boolean_variants(Config) -> t_boolean_variants(Config) ->
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
@ -621,11 +596,11 @@ t_boolean_variants(Config) ->
case QueryMode of case QueryMode of
sync -> ok sync -> ok
end, end,
PersistedData = query_by_clientid(ClientId, Config), PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
Expected = #{ Expected = #{
bool => atom_to_binary(Translation), bool => Translation,
int_value => <<"-123">>, int_value => -123,
uint_value => <<"123">>, uint_value => 123,
payload => emqx_utils_json:encode(Payload) payload => emqx_utils_json:encode(Payload)
}, },
assert_persisted_data(ClientId, Expected, PersistedData), assert_persisted_data(ClientId, Expected, PersistedData),
@ -728,7 +703,7 @@ t_create_disconnected(Config) ->
end), end),
fun(Trace) -> fun(Trace) ->
?assertMatch( ?assertMatch(
[#{error := greptimedb_client_not_alive, reason := econnrefused}], [#{error := greptimedb_client_not_alive, reason := _SomeReason}],
?of_kind(greptimedb_connector_start_failed, Trace) ?of_kind(greptimedb_connector_start_failed, Trace)
), ),
ok ok
@ -871,8 +846,8 @@ t_missing_field(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
PersistedData0 = query_by_clientid(ClientId0, Config), PersistedData0 = query_by_clientid(ClientId0, ClientId0, Config),
PersistedData1 = query_by_clientid(ClientId1, Config), PersistedData1 = query_by_clientid(ClientId1, ClientId1, Config),
case IsBatch of case IsBatch of
true -> true ->
?assertMatch( ?assertMatch(
@ -893,65 +868,6 @@ t_missing_field(Config) ->
), ),
ok. ok.
t_authentication_error(Config0) ->
GreptimedbType = ?config(greptimedb_type, Config0),
GreptimeConfig0 = proplists:get_value(greptimedb_config, Config0),
GreptimeConfig =
case GreptimedbType of
grpcv1 -> GreptimeConfig0#{<<"password">> => <<"wrong_password">>}
end,
Config = lists:keyreplace(greptimedb_config, 1, Config0, {greptimedb_config, GreptimeConfig}),
?check_trace(
begin
?wait_async_action(
create_bridge(Config),
#{?snk_kind := greptimedb_connector_start_failed},
10_000
)
end,
fun(Trace) ->
?assertMatch(
[#{error := auth_error} | _],
?of_kind(greptimedb_connector_start_failed, Trace)
),
ok
end
),
ok.
t_authentication_error_on_get_status(Config0) ->
ResourceId = resource_id(Config0),
% Fake initialization to simulate credential update after bridge was created.
emqx_common_test_helpers:with_mock(
greptimedb,
check_auth,
fun(_) ->
ok
end,
fun() ->
GreptimedbType = ?config(greptimedb_type, Config0),
GreptimeConfig0 = proplists:get_value(greptimedb_config, Config0),
GreptimeConfig =
case GreptimedbType of
grpcv1 -> GreptimeConfig0#{<<"password">> => <<"wrong_password">>}
end,
Config = lists:keyreplace(
greptimedb_config, 1, Config0, {greptimedb_config, GreptimeConfig}
),
{ok, _} = create_bridge(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
)
end
),
% Now back to wrong credentials
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
ok.
t_authentication_error_on_send_message(Config0) -> t_authentication_error_on_send_message(Config0) ->
ResourceId = resource_id(Config0), ResourceId = resource_id(Config0),
QueryMode = proplists:get_value(query_mode, Config0, sync), QueryMode = proplists:get_value(query_mode, Config0, sync),