feat: supports async mode for greptimedb data bridge
This commit is contained in:
parent
90d0081df4
commit
8bc874c261
|
@ -21,8 +21,11 @@
|
||||||
on_stop/2,
|
on_stop/2,
|
||||||
on_query/3,
|
on_query/3,
|
||||||
on_batch_query/3,
|
on_batch_query/3,
|
||||||
|
on_query_async/4,
|
||||||
|
on_batch_query_async/4,
|
||||||
on_get_status/2
|
on_get_status/2
|
||||||
]).
|
]).
|
||||||
|
-export([reply_callback/2]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
roots/0,
|
roots/0,
|
||||||
|
@ -57,7 +60,7 @@
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% resource callback
|
%% resource callback
|
||||||
callback_mode() -> always_sync.
|
callback_mode() -> async_if_possible.
|
||||||
|
|
||||||
on_start(InstId, Config) ->
|
on_start(InstId, Config) ->
|
||||||
%% InstID as pool would be handled by greptimedb client
|
%% InstID as pool would be handled by greptimedb client
|
||||||
|
@ -110,6 +113,49 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
|
||||||
{error, {unrecoverable_error, Reason}}
|
{error, {unrecoverable_error, Reason}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
on_query_async(
|
||||||
|
InstId,
|
||||||
|
{send_message, Data},
|
||||||
|
{ReplyFun, Args},
|
||||||
|
_State = #{write_syntax := SyntaxLines, client := Client}
|
||||||
|
) ->
|
||||||
|
case data_to_points(Data, SyntaxLines) of
|
||||||
|
{ok, Points} ->
|
||||||
|
?tp(
|
||||||
|
greptimedb_connector_send_query,
|
||||||
|
#{points => Points, batch => false, mode => async}
|
||||||
|
),
|
||||||
|
do_async_query(InstId, Client, Points, {ReplyFun, Args});
|
||||||
|
{error, ErrorPoints} = Err ->
|
||||||
|
?tp(
|
||||||
|
greptimedb_connector_send_query_error,
|
||||||
|
#{batch => false, mode => async, error => ErrorPoints}
|
||||||
|
),
|
||||||
|
log_error_points(InstId, ErrorPoints),
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
|
on_batch_query_async(
|
||||||
|
InstId,
|
||||||
|
BatchData,
|
||||||
|
{ReplyFun, Args},
|
||||||
|
#{write_syntax := SyntaxLines, client := Client}
|
||||||
|
) ->
|
||||||
|
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
||||||
|
{ok, Points} ->
|
||||||
|
?tp(
|
||||||
|
greptimedb_connector_send_query,
|
||||||
|
#{points => Points, batch => true, mode => async}
|
||||||
|
),
|
||||||
|
do_async_query(InstId, Client, Points, {ReplyFun, Args});
|
||||||
|
{error, Reason} ->
|
||||||
|
?tp(
|
||||||
|
greptimedb_connector_send_query_error,
|
||||||
|
#{batch => true, mode => async, error => Reason}
|
||||||
|
),
|
||||||
|
{error, {unrecoverable_error, Reason}}
|
||||||
|
end.
|
||||||
|
|
||||||
on_get_status(_InstId, #{client := Client}) ->
|
on_get_status(_InstId, #{client := Client}) ->
|
||||||
case greptimedb:is_alive(Client) of
|
case greptimedb:is_alive(Client) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -344,6 +390,31 @@ do_query(InstId, Client, Points) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
|
||||||
|
?SLOG(info, #{
|
||||||
|
msg => "greptimedb_write_point_async",
|
||||||
|
connector => InstId,
|
||||||
|
points => Points
|
||||||
|
}),
|
||||||
|
WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
|
||||||
|
ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).
|
||||||
|
|
||||||
|
reply_callback(ReplyFunAndArgs, {error, {unauth, _, _}}) ->
|
||||||
|
?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
|
||||||
|
Result = {error, {unrecoverable_error, <<"authorization failure">>}},
|
||||||
|
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
|
||||||
|
reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
|
||||||
|
case is_unrecoverable_error(Error) of
|
||||||
|
true ->
|
||||||
|
Result = {error, {unrecoverable_error, Reason}},
|
||||||
|
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
|
||||||
|
false ->
|
||||||
|
Result = {error, {recoverable_error, Reason}},
|
||||||
|
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
|
||||||
|
end;
|
||||||
|
reply_callback(ReplyFunAndArgs, Result) ->
|
||||||
|
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% Tags & Fields Config Trans
|
%% Tags & Fields Config Trans
|
||||||
|
|
||||||
|
|
|
@ -25,16 +25,23 @@ groups() ->
|
||||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
[
|
[
|
||||||
{with_batch, [
|
{with_batch, [
|
||||||
{group, sync_query}
|
{group, sync_query},
|
||||||
|
{group, async_query}
|
||||||
]},
|
]},
|
||||||
{without_batch, [
|
{without_batch, [
|
||||||
{group, sync_query}
|
{group, sync_query},
|
||||||
|
{group, async_query}
|
||||||
]},
|
]},
|
||||||
{sync_query, [
|
{sync_query, [
|
||||||
{group, grpcv1_tcp}
|
{group, grpcv1_tcp}
|
||||||
%% uncomment tls when we are ready
|
%% uncomment tls when we are ready
|
||||||
%% {group, grpcv1_tls}
|
%% {group, grpcv1_tls}
|
||||||
]},
|
]},
|
||||||
|
{async_query, [
|
||||||
|
{group, grpcv1_tcp}
|
||||||
|
%% uncomment tls when we are ready
|
||||||
|
%% {group, grpcv1_tls}
|
||||||
|
]},
|
||||||
{grpcv1_tcp, TCs}
|
{grpcv1_tcp, TCs}
|
||||||
%%{grpcv1_tls, TCs}
|
%%{grpcv1_tls, TCs}
|
||||||
].
|
].
|
||||||
|
@ -130,6 +137,8 @@ init_per_group(GreptimedbType, Config0) when
|
||||||
end;
|
end;
|
||||||
init_per_group(sync_query, Config) ->
|
init_per_group(sync_query, Config) ->
|
||||||
[{query_mode, sync} | Config];
|
[{query_mode, sync} | Config];
|
||||||
|
init_per_group(async_query, Config) ->
|
||||||
|
[{query_mode, async} | Config];
|
||||||
init_per_group(with_batch, Config) ->
|
init_per_group(with_batch, Config) ->
|
||||||
[{batch_size, 100} | Config];
|
[{batch_size, 100} | Config];
|
||||||
init_per_group(without_batch, Config) ->
|
init_per_group(without_batch, Config) ->
|
||||||
|
@ -420,6 +429,9 @@ t_start_ok(Config) ->
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
case QueryMode of
|
case QueryMode of
|
||||||
|
async ->
|
||||||
|
?assertMatch(ok, send_message(Config, SentData)),
|
||||||
|
ct:sleep(500);
|
||||||
sync ->
|
sync ->
|
||||||
?assertMatch({ok, _}, send_message(Config, SentData))
|
?assertMatch({ok, _}, send_message(Config, SentData))
|
||||||
end,
|
end,
|
||||||
|
@ -666,6 +678,9 @@ t_const_timestamp(Config) ->
|
||||||
<<"timestamp">> => erlang:system_time(millisecond)
|
<<"timestamp">> => erlang:system_time(millisecond)
|
||||||
},
|
},
|
||||||
case QueryMode of
|
case QueryMode of
|
||||||
|
async ->
|
||||||
|
?assertMatch(ok, send_message(Config, SentData)),
|
||||||
|
ct:sleep(500);
|
||||||
sync ->
|
sync ->
|
||||||
?assertMatch({ok, _}, send_message(Config, SentData))
|
?assertMatch({ok, _}, send_message(Config, SentData))
|
||||||
end,
|
end,
|
||||||
|
@ -709,9 +724,12 @@ t_boolean_variants(Config) ->
|
||||||
},
|
},
|
||||||
case QueryMode of
|
case QueryMode of
|
||||||
sync ->
|
sync ->
|
||||||
?assertMatch({ok, _}, send_message(Config, SentData))
|
?assertMatch({ok, _}, send_message(Config, SentData));
|
||||||
|
async ->
|
||||||
|
?assertMatch(ok, send_message(Config, SentData))
|
||||||
end,
|
end,
|
||||||
case QueryMode of
|
case QueryMode of
|
||||||
|
async -> ct:sleep(500);
|
||||||
sync -> ok
|
sync -> ok
|
||||||
end,
|
end,
|
||||||
PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
|
PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
|
||||||
|
@ -909,15 +927,29 @@ t_write_failure(Config) ->
|
||||||
),
|
),
|
||||||
#{?snk_kind := greptimedb_connector_do_query_failure, action := nack},
|
#{?snk_kind := greptimedb_connector_do_query_failure, action := nack},
|
||||||
16_000
|
16_000
|
||||||
|
);
|
||||||
|
async ->
|
||||||
|
?wait_async_action(
|
||||||
|
?assertEqual(ok, send_message(Config, SentData)),
|
||||||
|
#{?snk_kind := handle_async_reply},
|
||||||
|
1_000
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
end),
|
end),
|
||||||
fun(Trace) ->
|
fun(Trace0) ->
|
||||||
case QueryMode of
|
case QueryMode of
|
||||||
sync ->
|
sync ->
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[#{error := _} | _],
|
[#{error := _} | _],
|
||||||
?of_kind(greptimedb_connector_do_query_failure, Trace)
|
?of_kind(greptimedb_connector_do_query_failure, Trace0)
|
||||||
|
);
|
||||||
|
async ->
|
||||||
|
Trace = ?of_kind(handle_async_reply, Trace0),
|
||||||
|
?assertMatch([#{action := nack} | _], Trace),
|
||||||
|
[#{result := Result} | _] = Trace,
|
||||||
|
?assert(
|
||||||
|
not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
|
||||||
|
#{got => Result}
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
ok
|
ok
|
||||||
|
@ -1029,6 +1061,23 @@ t_authentication_error_on_send_message(Config0) ->
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{error, {unrecoverable_error, <<"authorization failure">>}},
|
{error, {unrecoverable_error, <<"authorization failure">>}},
|
||||||
send_message(Config, SentData)
|
send_message(Config, SentData)
|
||||||
|
);
|
||||||
|
async ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?wait_async_action(
|
||||||
|
?assertEqual(ok, send_message(Config, SentData)),
|
||||||
|
#{?snk_kind := handle_async_reply},
|
||||||
|
1_000
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch(
|
||||||
|
[#{error := <<"authorization failure">>} | _],
|
||||||
|
?of_kind(greptimedb_connector_do_query_failure, Trace)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
Loading…
Reference in New Issue