diff --git a/.ci/docker-compose-file/docker-compose-greptimedb.yaml b/.ci/docker-compose-file/docker-compose-greptimedb.yaml index f379969bd..8980c946d 100644 --- a/.ci/docker-compose-file/docker-compose-greptimedb.yaml +++ b/.ci/docker-compose-file/docker-compose-greptimedb.yaml @@ -3,13 +3,20 @@ version: '3.9' services: greptimedb: container_name: greptimedb + hostname: greptimedb image: greptime/greptimedb:0.3.2 expose: - "4000" - "4001" + # uncomment for local testing + # ports: + # - "4000:4000" + # - "4001:4001" restart: always networks: - emqx_bridge command: standalone start --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd + --http-addr="0.0.0.0:4000" + --rpc-addr="0.0.0.0:4001" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index a8e2f086c..f5df5a853 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -160,17 +160,18 @@ "name": "hstreamdb", "listen": "0.0.0.0:6570", "upstream": "hstreamdb:6570", + "enabled": true }, { "name": "greptimedb_http", "listen": "0.0.0.0:4000", - "upstream": "iotdb:4000", + "upstream": "greptimedb:4000", "enabled": true }, { "name": "greptimedb_grpc", "listen": "0.0.0.0:4001", - "upstream": "iotdb:4001", + "upstream": "greptimedb:4001", "enabled": true }, { diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index bc4eacbab..43455d5d2 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -312,7 +312,7 @@ do_query(InstId, Client, Points) -> connector => InstId, points => Points }); - {error, {401, _, _}} -> + {error, {unauth, _, _}} -> ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}), ?SLOG(error, #{ msg => "greptimedb_authorization_failed", diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl index 57ffed926..044d6a2bd 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl @@ -57,6 +57,7 @@ init_per_group(GreptimedbType, Config0) when #{ host := GreptimedbHost, port := GreptimedbPort, + http_port := GreptimedbHttpPort, use_tls := UseTLS, proxy_name := ProxyName } = @@ -65,13 +66,15 @@ init_per_group(GreptimedbType, Config0) when #{ host => os:getenv("GREPTIMEDB_GRPCV1_TCP_HOST", "toxiproxy"), port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TCP_PORT", "4001")), + http_port => list_to_integer(os:getenv("GREPTIMEDB_HTTP_PORT", "4000")), use_tls => false, - proxy_name => "greptimedb_tcp" + proxy_name => "greptimedb_grpc" }; grpcv1_tls -> #{ host => os:getenv("GREPTIMEDB_GRPCV1_TLS_HOST", "toxiproxy"), port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TLS_PORT", "4001")), + http_port => list_to_integer(os:getenv("GREPTIMEDB_HTTP_PORT", "4000")), use_tls => true, proxy_name => "greptimedb_tls" } @@ -98,7 +101,7 @@ init_per_group(GreptimedbType, Config0) when end, EHttpcPoolOpts = [ {host, GreptimedbHost}, - {port, GreptimedbPort}, + {port, GreptimedbHttpPort}, {pool_size, 1}, {transport, EHttpcTransport}, {transport_opts, EHttpcTransportOpts} @@ -110,6 +113,7 @@ init_per_group(GreptimedbType, Config0) when {proxy_name, ProxyName}, {greptimedb_host, GreptimedbHost}, {greptimedb_port, GreptimedbPort}, + {greptimedb_http_port, GreptimedbHttpPort}, {greptimedb_type, grpcv1}, {greptimedb_config, GreptimedbConfig}, {greptimedb_config_string, ConfigString}, @@ -282,12 +286,12 @@ send_message(Config, Payload) -> BridgeId = emqx_bridge_resource:bridge_id(Type, Name), emqx_bridge:send_message(BridgeId, Payload). -query_by_clientid(ClientId, Config) -> +query_by_clientid(Topic, ClientId, Config) -> GreptimedbHost = ?config(greptimedb_host, Config), - GreptimedbPort = ?config(greptimedb_port, Config), + GreptimedbPort = ?config(greptimedb_http_port, Config), EHttpcPoolName = ?config(ehttpc_pool_name, Config), UseTLS = ?config(use_tls, Config), - Path = <<"/api/v2/query?org=emqx">>, + Path = <<"/v1/sql?db=public">>, Scheme = case UseTLS of true -> <<"https://">>; @@ -300,26 +304,11 @@ query_by_clientid(ClientId, Config) -> integer_to_binary(GreptimedbPort), Path ]), - Query = - << - "from(bucket: \"mqtt\")\n" - " |> range(start: -12h)\n" - " |> filter(fn: (r) => r.clientid == \"", - ClientId/binary, - "\")" - >>, Headers = [ - {"Authorization", "Token abcdefg"}, - {"Content-Type", "application/json"} + {"Authorization", "Basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q="}, + {"Content-Type", "application/x-www-form-urlencoded"} ], - Body = - emqx_utils_json:encode(#{ - query => Query, - dialect => #{ - header => true, - delimiter => <<";">> - } - }), + Body = <<"sql=select * from ", Topic/binary, " where clientid='", ClientId/binary, "'">>, {ok, 200, _Headers, RawBody0} = ehttpc:request( EHttpcPoolName, @@ -328,32 +317,30 @@ query_by_clientid(ClientId, Config) -> _Timeout = 10_000, _Retry = 0 ), - RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)), - {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}), - DecodedCSV1 = [ - [Field || Field <- Line, Field =/= <<>>] - || Line <- DecodedCSV0, - Line =/= [<<>>] - ], - DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []), - index_by_field(DecodedCSV2). + #{ + <<"code">> := 0, + <<"output">> := [ + #{ + <<"records">> := #{ + <<"rows">> := Rows, + <<"schema">> := Schema + } + } + ] + } = emqx_utils_json:decode(RawBody0, [return_maps]), -decode_csv(RawBody) -> - Lines = - [ - binary:split(Line, [<<";">>], [global, trim_all]) - || Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all]) - ], - csv_lines_to_maps(Lines, []). + case Schema of + null -> + #{}; + #{<<"column_schemas">> := ColumnsSchemas} -> + Columns = lists:map(fun(#{<<"name">> := Name}) -> Name end, ColumnsSchemas), + index_by_field(Rows, Columns) + end. -csv_lines_to_maps([Fields, Data | Rest], Acc) -> - Map = maps:from_list(lists:zip(Fields, Data)), - csv_lines_to_maps(Rest, [Map | Acc]); -csv_lines_to_maps(_Data, Acc) -> - lists:reverse(Acc). - -index_by_field(DecodedCSV) -> - maps:from_list([{Field, Data} || Data = #{<<"_field">> := Field} <- DecodedCSV]). +index_by_field([], Columns) -> + #{}; +index_by_field([Row], Columns) -> + maps:from_list(lists:zip(Columns, Row)). assert_persisted_data(ClientId, Expected, PersistedData) -> ClientIdIntKey = <>, @@ -361,12 +348,12 @@ assert_persisted_data(ClientId, Expected, PersistedData) -> fun (int_value, ExpectedValue) -> ?assertMatch( - #{<<"_value">> := ExpectedValue}, + ExpectedValue, maps:get(ClientIdIntKey, PersistedData) ); (Key, ExpectedValue) -> ?assertMatch( - #{<<"_value">> := ExpectedValue}, + ExpectedValue, maps:get(atom_to_binary(Key), PersistedData), #{expected => ExpectedValue} ) @@ -409,12 +396,12 @@ t_start_ok(Config) -> sync -> ?assertMatch(ok, send_message(Config, SentData)) end, - PersistedData = query_by_clientid(ClientId, Config), + PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config), Expected = #{ - bool => <<"true">>, - int_value => <<"-123">>, - uint_value => <<"123">>, - float_value => <<"24.5">>, + bool => true, + int_value => -123, + uint_value => 123, + float_value => 24.5, payload => emqx_utils_json:encode(Payload) }, assert_persisted_data(ClientId, Expected, PersistedData), @@ -423,12 +410,16 @@ t_start_ok(Config) -> fun(Trace0) -> Trace = ?of_kind(greptimedb_connector_send_query, Trace0), ?assertMatch([#{points := [_]}], Trace), - [#{points := [Point]}] = Trace, + [#{points := [Point0]}] = Trace, + {Measurement, [Point]} = Point0, ct:pal("sent point: ~p", [Point]), + ?assertMatch( + <<_/binary>>, + Measurement + ), ?assertMatch( #{ fields := #{}, - measurement := <<_/binary>>, tags := #{}, timestamp := TS } when is_integer(TS), @@ -538,9 +529,6 @@ t_const_timestamp(Config) -> QueryMode = ?config(query_mode, Config), Const = erlang:system_time(nanosecond), ConstBin = integer_to_binary(Const), - TsStr = iolist_to_binary( - calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}]) - ), ?assertMatch( {ok, _}, create_bridge( @@ -563,24 +551,11 @@ t_const_timestamp(Config) -> sync -> ?assertMatch(ok, send_message(Config, SentData)) end, - PersistedData = query_by_clientid(ClientId, Config), - Expected = #{foo => <<"123">>}, + PersistedData = query_by_clientid(<<"mqtt">>, ClientId, Config), + Expected = #{foo => 123}, assert_persisted_data(ClientId, Expected, PersistedData), - TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)), - TimeReturned = pad_zero(TimeReturned0), - ?assertEqual(TsStr, TimeReturned). - -%% greptimedb returns timestamps without trailing zeros such as -%% "2023-02-28T17:21:51.63678163Z" -%% while the standard should be -%% "2023-02-28T17:21:51.636781630Z" -pad_zero(BinTs) -> - StrTs = binary_to_list(BinTs), - [Nano | Rest] = lists:reverse(string:tokens(StrTs, ".")), - [$Z | NanoNum] = lists:reverse(Nano), - Padding = lists:duplicate(10 - length(Nano), $0), - NewNano = lists:reverse(NanoNum) ++ Padding ++ "Z", - iolist_to_binary(string:join(lists:reverse([NewNano | Rest]), ".")). + TimeReturned = maps:get(<<"greptime_timestamp">>, PersistedData), + ?assertEqual(Const, TimeReturned). t_boolean_variants(Config) -> QueryMode = ?config(query_mode, Config), @@ -621,11 +596,11 @@ t_boolean_variants(Config) -> case QueryMode of sync -> ok end, - PersistedData = query_by_clientid(ClientId, Config), + PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config), Expected = #{ - bool => atom_to_binary(Translation), - int_value => <<"-123">>, - uint_value => <<"123">>, + bool => Translation, + int_value => -123, + uint_value => 123, payload => emqx_utils_json:encode(Payload) }, assert_persisted_data(ClientId, Expected, PersistedData), @@ -728,7 +703,7 @@ t_create_disconnected(Config) -> end), fun(Trace) -> ?assertMatch( - [#{error := greptimedb_client_not_alive, reason := econnrefused}], + [#{error := greptimedb_client_not_alive, reason := _SomeReason}], ?of_kind(greptimedb_connector_start_failed, Trace) ), ok @@ -871,8 +846,8 @@ t_missing_field(Config) -> ok end, fun(Trace) -> - PersistedData0 = query_by_clientid(ClientId0, Config), - PersistedData1 = query_by_clientid(ClientId1, Config), + PersistedData0 = query_by_clientid(ClientId0, ClientId0, Config), + PersistedData1 = query_by_clientid(ClientId1, ClientId1, Config), case IsBatch of true -> ?assertMatch( @@ -893,65 +868,6 @@ t_missing_field(Config) -> ), ok. -t_authentication_error(Config0) -> - GreptimedbType = ?config(greptimedb_type, Config0), - GreptimeConfig0 = proplists:get_value(greptimedb_config, Config0), - GreptimeConfig = - case GreptimedbType of - grpcv1 -> GreptimeConfig0#{<<"password">> => <<"wrong_password">>} - end, - Config = lists:keyreplace(greptimedb_config, 1, Config0, {greptimedb_config, GreptimeConfig}), - ?check_trace( - begin - ?wait_async_action( - create_bridge(Config), - #{?snk_kind := greptimedb_connector_start_failed}, - 10_000 - ) - end, - fun(Trace) -> - ?assertMatch( - [#{error := auth_error} | _], - ?of_kind(greptimedb_connector_start_failed, Trace) - ), - ok - end - ), - ok. - -t_authentication_error_on_get_status(Config0) -> - ResourceId = resource_id(Config0), - - % Fake initialization to simulate credential update after bridge was created. - emqx_common_test_helpers:with_mock( - greptimedb, - check_auth, - fun(_) -> - ok - end, - fun() -> - GreptimedbType = ?config(greptimedb_type, Config0), - GreptimeConfig0 = proplists:get_value(greptimedb_config, Config0), - GreptimeConfig = - case GreptimedbType of - grpcv1 -> GreptimeConfig0#{<<"password">> => <<"wrong_password">>} - end, - Config = lists:keyreplace( - greptimedb_config, 1, Config0, {greptimedb_config, GreptimeConfig} - ), - {ok, _} = create_bridge(Config), - ?retry( - _Sleep = 1_000, - _Attempts = 10, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) - ) - end - ), - - % Now back to wrong credentials - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), - ok. - t_authentication_error_on_send_message(Config0) -> ResourceId = resource_id(Config0), QueryMode = proplists:get_value(query_mode, Config0, sync),