From 594d071c05ed243e1c6fb016fdd2e73e06f5dbf2 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 12 Aug 2022 17:45:20 +0800 Subject: [PATCH] feat(influxdb): add async callback --- apps/emqx_resource/src/emqx_resource.erl | 8 +++++++ .../src/emqx_ee_connector_influxdb.erl | 24 ++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 60f0dd360..b79650904 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -114,6 +114,7 @@ -optional_callbacks([ on_query/3, on_batch_query/3, + on_query_async/4, on_get_status/2 ]). @@ -130,6 +131,13 @@ %% when calling emqx_resource:on_batch_query/3 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). +-callback on_query_async( + resource_id(), + Request :: term(), + {ReplyFun :: function(), Args :: list()}, + resource_state() +) -> query_result(). + %% when calling emqx_resource:health_check/2 -callback on_get_status(resource_id(), resource_state()) -> resource_status() diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 0a2bbb638..d0c17b6d5 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -18,6 +18,7 @@ on_stop/2, on_query/3, on_batch_query/3, + on_query_async/4, on_get_status/2 ]). @@ -62,6 +63,20 @@ on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client {resource_down, disconnected} end. +on_query_async( + InstId, + {send_message, Data}, + {ReplayFun, Args}, + _State = #{write_syntax := SyntaxLines, client := Client} +) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + do_async_query(InstId, Client, Points, {ReplayFun, Args}); + {error, ErrorPoints} = Err -> + log_error_points(InstId, ErrorPoints), + Err + end. + on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) of true -> @@ -331,7 +346,6 @@ ssl_config(SSL = #{enable := true}) -> %% ------------------------------------------------------------------------------------------------- %% Query - do_query(InstId, Client, Points) -> case influxdb:write(Client, Points) of ok -> @@ -349,6 +363,14 @@ do_query(InstId, Client, Points) -> Err end. +do_async_query(InstId, Client, Points, ReplayFunAndArgs) -> + ?SLOG(info, #{ + msg => "influxdb write point async", + connector => InstId, + points => Points + }), + ok = influxdb:write_async(Client, Points, ReplayFunAndArgs). + %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans