From 68946f1f6c5638d5c41272f69484858706fdb293 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 15 Aug 2022 11:57:37 +0800 Subject: [PATCH] feat: influxdb support `async`/`batch_async` query --- apps/emqx_resource/src/emqx_resource.erl | 10 +++++ .../src/emqx_ee_connector_influxdb.erl | 40 +++++++++++++------ 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b79650904..99e1f6057 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -115,6 +115,7 @@ on_query/3, on_batch_query/3, on_query_async/4, + on_batch_query_async/4, on_get_status/2 ]). @@ -131,6 +132,7 @@ %% when calling emqx_resource:on_batch_query/3 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). +%% when calling emqx_resource:on_query_async/4 -callback on_query_async( resource_id(), Request :: term(), @@ -138,6 +140,14 @@ resource_state() ) -> query_result(). +%% when calling emqx_resource:on_batch_query_async/4 +-callback on_batch_query_async( + resource_id(), + Request :: term(), + {ReplyFun :: function(), Args :: list()}, + resource_state() +) -> query_result(). + %% when calling emqx_resource:health_check/2 -callback on_get_status(resource_id(), resource_state()) -> resource_status() diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index d0c17b6d5..2c2de9a99 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -19,6 +19,7 @@ on_query/3, on_batch_query/3, on_query_async/4, + on_batch_query_async/4, on_get_status/2 ]). @@ -31,7 +32,7 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback -callback_mode() -> always_sync. +callback_mode() -> async_if_possible. on_start(InstId, Config) -> start_client(InstId, Config). @@ -50,17 +51,12 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c %% Once a Batched Data trans to points failed. %% This batch query failed -on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client := Client}) -> - case on_get_status(InstId, State) of - connected -> - case parse_batch_data(InstId, BatchData, SyntaxLines) of - {ok, Points} -> - do_query(InstId, Client, Points); - {error, Reason} -> - {error, Reason} - end; - disconnected -> - {resource_down, disconnected} +on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + do_query(InstId, Client, Points); + {error, Reason} -> + {error, Reason} end. on_query_async( @@ -77,6 +73,24 @@ on_query_async( Err end. +on_batch_query_async( + InstId, + BatchData, + {ReplayFun, Args}, + State = #{write_syntax := SyntaxLines, client := Client} +) -> + case on_get_status(InstId, State) of + connected -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + do_async_query(InstId, Client, Points, {ReplayFun, Args}); + {error, Reason} -> + {error, Reason} + end; + disconnected -> + {resource_down, disconnected} + end. + on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) of true -> @@ -122,7 +136,7 @@ fields(basic) -> mk(enum([ns, us, ms, s, m, h]), #{ required => false, default => ms, desc => ?DESC("precision") })}, - {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} + {pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})} ]; fields(influxdb_udp) -> fields(basic);