diff --git a/.ci/docker-compose-file/docker-compose-greptimedb.yaml b/.ci/docker-compose-file/docker-compose-greptimedb.yaml index 8980c946d..6813b4983 100644 --- a/.ci/docker-compose-file/docker-compose-greptimedb.yaml +++ b/.ci/docker-compose-file/docker-compose-greptimedb.yaml @@ -4,7 +4,7 @@ services: greptimedb: container_name: greptimedb hostname: greptimedb - image: greptime/greptimedb:0.3.2 + image: greptime/greptimedb:v0.4.4 expose: - "4000" - "4001" diff --git a/apps/emqx_bridge_greptimedb/rebar.config b/apps/emqx_bridge_greptimedb/rebar.config index f0942f910..985299c3d 100644 --- a/apps/emqx_bridge_greptimedb/rebar.config +++ b/apps/emqx_bridge_greptimedb/rebar.config @@ -6,7 +6,7 @@ {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}}, - {greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.2"}}} + {greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.6"}}} ]}. {plugins, [rebar3_path_deps]}. {project_plugins, [erlfmt]}. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index d588f7f8c..af42dac52 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -21,8 +21,11 @@ on_stop/2, on_query/3, on_batch_query/3, + on_query_async/4, + on_batch_query_async/4, on_get_status/2 ]). +-export([reply_callback/2]). -export([ roots/0, @@ -57,7 +60,7 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback -callback_mode() -> always_sync. +callback_mode() -> async_if_possible. on_start(InstId, Config) -> %% InstID as pool would be handled by greptimedb client @@ -110,6 +113,49 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client {error, {unrecoverable_error, Reason}} end. +on_query_async( + InstId, + {send_message, Data}, + {ReplyFun, Args}, + _State = #{write_syntax := SyntaxLines, client := Client} +) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + ?tp( + greptimedb_connector_send_query, + #{points => Points, batch => false, mode => async} + ), + do_async_query(InstId, Client, Points, {ReplyFun, Args}); + {error, ErrorPoints} = Err -> + ?tp( + greptimedb_connector_send_query_error, + #{batch => false, mode => async, error => ErrorPoints} + ), + log_error_points(InstId, ErrorPoints), + Err + end. + +on_batch_query_async( + InstId, + BatchData, + {ReplyFun, Args}, + #{write_syntax := SyntaxLines, client := Client} +) -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + ?tp( + greptimedb_connector_send_query, + #{points => Points, batch => true, mode => async} + ), + do_async_query(InstId, Client, Points, {ReplyFun, Args}); + {error, Reason} -> + ?tp( + greptimedb_connector_send_query_error, + #{batch => true, mode => async, error => Reason} + ), + {error, {unrecoverable_error, Reason}} + end. + on_get_status(_InstId, #{client := Client}) -> case greptimedb:is_alive(Client) of true -> @@ -344,6 +390,31 @@ do_query(InstId, Client, Points) -> end end. +do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> + ?SLOG(info, #{ + msg => "greptimedb_write_point_async", + connector => InstId, + points => Points + }), + WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]}, + ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs). + +reply_callback(ReplyFunAndArgs, {error, {unauth, _, _}}) -> + ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}), + Result = {error, {unrecoverable_error, <<"authorization failure">>}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); +reply_callback(ReplyFunAndArgs, {error, Reason} = Error) -> + case is_unrecoverable_error(Error) of + true -> + Result = {error, {unrecoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); + false -> + Result = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) + end; +reply_callback(ReplyFunAndArgs, Result) -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). + %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl index f9d8854ac..73223892d 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl @@ -25,16 +25,23 @@ groups() -> TCs = emqx_common_test_helpers:all(?MODULE), [ {with_batch, [ - {group, sync_query} + {group, sync_query}, + {group, async_query} ]}, {without_batch, [ - {group, sync_query} + {group, sync_query}, + {group, async_query} ]}, {sync_query, [ {group, grpcv1_tcp} %% uncomment tls when we are ready %% {group, grpcv1_tls} ]}, + {async_query, [ + {group, grpcv1_tcp} + %% uncomment tls when we are ready + %% {group, grpcv1_tls} + ]}, {grpcv1_tcp, TCs} %%{grpcv1_tls, TCs} ]. @@ -130,6 +137,8 @@ init_per_group(GreptimedbType, Config0) when end; init_per_group(sync_query, Config) -> [{query_mode, sync} | Config]; +init_per_group(async_query, Config) -> + [{query_mode, async} | Config]; init_per_group(with_batch, Config) -> [{batch_size, 100} | Config]; init_per_group(without_batch, Config) -> @@ -420,6 +429,9 @@ t_start_ok(Config) -> ?check_trace( begin case QueryMode of + async -> + ?assertMatch(ok, send_message(Config, SentData)), + ct:sleep(500); sync -> ?assertMatch({ok, _}, send_message(Config, SentData)) end, @@ -666,6 +678,9 @@ t_const_timestamp(Config) -> <<"timestamp">> => erlang:system_time(millisecond) }, case QueryMode of + async -> + ?assertMatch(ok, send_message(Config, SentData)), + ct:sleep(500); sync -> ?assertMatch({ok, _}, send_message(Config, SentData)) end, @@ -709,9 +724,12 @@ t_boolean_variants(Config) -> }, case QueryMode of sync -> - ?assertMatch({ok, _}, send_message(Config, SentData)) + ?assertMatch({ok, _}, send_message(Config, SentData)); + async -> + ?assertMatch(ok, send_message(Config, SentData)) end, case QueryMode of + async -> ct:sleep(500); sync -> ok end, PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config), @@ -779,11 +797,29 @@ t_bad_timestamp(Config) -> #{?snk_kind := greptimedb_connector_send_query_error}, 10_000 ), - fun(Result, _Trace) -> + fun(Result, Trace) -> ?assertMatch({_, {ok, _}}, Result), {Return, {ok, _}} = Result, IsBatch = BatchSize > 1, case {QueryMode, IsBatch} of + {async, true} -> + ?assertEqual(ok, Return), + ?assertMatch( + [#{error := points_trans_failed}], + ?of_kind(greptimedb_connector_send_query_error, Trace) + ); + {async, false} -> + ?assertEqual(ok, Return), + ?assertMatch( + [ + #{ + error := [ + {error, {bad_timestamp, <<"bad_timestamp">>}} + ] + } + ], + ?of_kind(greptimedb_connector_send_query_error, Trace) + ); {sync, false} -> ?assertEqual( {error, [ @@ -907,17 +943,34 @@ t_write_failure(Config) -> {error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData) ), - #{?snk_kind := greptimedb_connector_do_query_failure, action := nack}, - 16_000 + #{?snk_kind := handle_async_reply, action := nack}, + 1_000 + ); + async -> + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := handle_async_reply}, + 1_000 ) end end), - fun(Trace) -> + fun(Trace0) -> case QueryMode of sync -> - ?assertMatch( - [#{error := _} | _], - ?of_kind(greptimedb_connector_do_query_failure, Trace) + Trace = ?of_kind(handle_async_reply, Trace0), + ?assertMatch([_ | _], Trace), + [#{result := Result} | _] = Trace, + ?assert( + not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result), + #{got => Result} + ); + async -> + Trace = ?of_kind(handle_async_reply, Trace0), + ?assertMatch([_ | _], Trace), + [#{result := Result} | _] = Trace, + ?assert( + not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result), + #{got => Result} ) end, ok @@ -1029,6 +1082,23 @@ t_authentication_error_on_send_message(Config0) -> ?assertMatch( {error, {unrecoverable_error, <<"authorization failure">>}}, send_message(Config, SentData) + ); + async -> + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := handle_async_reply}, + 1_000 + ) + end, + fun(Trace) -> + ?assertMatch( + [#{error := <<"authorization failure">>} | _], + ?of_kind(greptimedb_connector_do_query_failure, Trace) + ), + ok + end ) end, ok. diff --git a/changes/ee/feat-12072.en.md b/changes/ee/feat-12072.en.md new file mode 100644 index 000000000..0b91b5c45 --- /dev/null +++ b/changes/ee/feat-12072.en.md @@ -0,0 +1,3 @@ +Supports async query mode for GreptimeDB data bridge. It provides better performance. + + diff --git a/mix.exs b/mix.exs index 567cd9e80..9c06e20af 100644 --- a/mix.exs +++ b/mix.exs @@ -236,7 +236,7 @@ defmodule EMQXUmbrella.MixProject do {:crc32cer, "0.1.8", override: true}, {:supervisor3, "1.1.12", override: true}, {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}, - {:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.2", override: true}, + {:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.6", override: true}, # The following two are dependencies of rabbit_common. They are needed here to # make mix not complain about conflicting versions {:thoas, github: "emqx/thoas", tag: "v1.0.0", override: true},