feat: influxdb support `async`/`batch_async` query

This commit is contained in:
JimMoen 2022-08-15 11:57:37 +08:00
parent b01ae8ece6
commit 68946f1f6c
2 changed files with 37 additions and 13 deletions

View File

@ -115,6 +115,7 @@
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_query_async/4, on_query_async/4,
on_batch_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -131,6 +132,7 @@
%% when calling emqx_resource:on_batch_query/3 %% when calling emqx_resource:on_batch_query/3
-callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
%% when calling emqx_resource:on_query_async/4
-callback on_query_async( -callback on_query_async(
resource_id(), resource_id(),
Request :: term(), Request :: term(),
@ -138,6 +140,14 @@
resource_state() resource_state()
) -> query_result(). ) -> query_result().
%% when calling emqx_resource:on_batch_query_async/4
-callback on_batch_query_async(
resource_id(),
Request :: term(),
{ReplyFun :: function(), Args :: list()},
resource_state()
) -> query_result().
%% when calling emqx_resource:health_check/2 %% when calling emqx_resource:health_check/2
-callback on_get_status(resource_id(), resource_state()) -> -callback on_get_status(resource_id(), resource_state()) ->
resource_status() resource_status()

View File

@ -19,6 +19,7 @@
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_query_async/4, on_query_async/4,
on_batch_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -31,7 +32,7 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
callback_mode() -> always_sync. callback_mode() -> async_if_possible.
on_start(InstId, Config) -> on_start(InstId, Config) ->
start_client(InstId, Config). start_client(InstId, Config).
@ -50,17 +51,12 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
%% Once a Batched Data trans to points failed. %% Once a Batched Data trans to points failed.
%% This batch query failed %% This batch query failed
on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client := Client}) -> on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
case on_get_status(InstId, State) of case parse_batch_data(InstId, BatchData, SyntaxLines) of
connected -> {ok, Points} ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of do_query(InstId, Client, Points);
{ok, Points} -> {error, Reason} ->
do_query(InstId, Client, Points); {error, Reason}
{error, Reason} ->
{error, Reason}
end;
disconnected ->
{resource_down, disconnected}
end. end.
on_query_async( on_query_async(
@ -77,6 +73,24 @@ on_query_async(
Err Err
end. end.
on_batch_query_async(
InstId,
BatchData,
{ReplayFun, Args},
State = #{write_syntax := SyntaxLines, client := Client}
) ->
case on_get_status(InstId, State) of
connected ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
do_async_query(InstId, Client, Points, {ReplayFun, Args});
{error, Reason} ->
{error, Reason}
end;
disconnected ->
{resource_down, disconnected}
end.
on_get_status(_InstId, #{client := Client}) -> on_get_status(_InstId, #{client := Client}) ->
case influxdb:is_alive(Client) of case influxdb:is_alive(Client) of
true -> true ->
@ -122,7 +136,7 @@ fields(basic) ->
mk(enum([ns, us, ms, s, m, h]), #{ mk(enum([ns, us, ms, s, m, h]), #{
required => false, default => ms, desc => ?DESC("precision") required => false, default => ms, desc => ?DESC("precision")
})}, })},
{pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} {pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})}
]; ];
fields(influxdb_udp) -> fields(influxdb_udp) ->
fields(basic); fields(basic);