From 0f6c3717602e20f77635831ea25eaf90b3d2ff7e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 11 Aug 2022 18:12:41 +0800 Subject: [PATCH] feat(influxdb): influxdb connector add `on_batch_query/3` callback --- apps/emqx_resource/src/emqx_resource.erl | 1 + .../src/emqx_ee_bridge_influxdb.erl | 9 +- .../src/emqx_ee_connector_influxdb.erl | 106 ++++++++++++++---- 3 files changed, 91 insertions(+), 25 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 0d2289696..6601b9eea 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -126,6 +126,7 @@ %% when calling emqx_resource:query/3 -callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result(). +%% when calling emqx_resource:on_batch_query/3 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). %% when calling emqx_resource:health_check/2 diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 7de640040..4edeb786a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -70,7 +70,8 @@ values(Protocol, post) -> write_syntax => <<"${topic},clientid=${clientid}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary, - "bool=${payload.bool}">> + "bool=${payload.bool}">>, + batch => #{enable_batch => false, batch_size => 5, batch_time => <<"1m">>} }; values(Protocol, put) -> values(Protocol, post). @@ -109,7 +110,9 @@ fields("get_api_v2") -> fields(Name) when Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 -> - fields(basic) ++ connector_field(Name). + fields(basic) ++ + emqx_resource_schema:fields('batch&async&queue') ++ + connector_field(Name). method_fileds(post, ConnectorType) -> fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType); @@ -162,6 +165,8 @@ write_syntax(converter) -> fun to_influx_lines/1; write_syntax(desc) -> ?DESC("write_syntax"); +write_syntax(format) -> + <<"sql">>; write_syntax(_) -> undefined. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 09b3d7350..41f7059ec 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -17,6 +17,7 @@ on_start/2, on_stop/2, on_query/3, + on_batch_query/3, on_get_status/2 ]). @@ -37,8 +38,29 @@ on_start(InstId, Config) -> on_stop(_InstId, #{client := Client}) -> influxdb:stop_client(Client). -on_query(InstId, {send_message, Data}, State) -> - do_query(InstId, {send_message, Data}, State). +on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + do_query(InstId, Client, Points); + {error, ErrorPoints} = Err -> + log_error_points(InstId, ErrorPoints), + Err + end. + +%% Once a Batched Data trans to points failed. +%% This batch query failed +on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client := Client}) -> + case on_get_status(InstId, State) of + connected -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + do_query(InstId, Client, Points); + {error, Reason} -> + {error, Reason} + end; + disconnected -> + {resource_down, disconnected} + end. on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) of @@ -79,7 +101,7 @@ fields("api_v2_put") -> fields(basic) -> [ {host, - mk(binary(), #{required => true, default => <<"120.0.0.1">>, desc => ?DESC("host")})}, + mk(binary(), #{required => true, default => <<"127.0.0.1">>, desc => ?DESC("host")})}, {port, mk(pos_integer(), #{required => true, default => 8086, desc => ?DESC("port")})}, {precision, mk(enum([ns, us, ms, s, m, h]), #{ @@ -310,18 +332,7 @@ ssl_config(SSL = #{enable := true}) -> %% ------------------------------------------------------------------------------------------------- %% Query -do_query(InstId, {send_message, Data}, State = #{client := Client}) -> - {Points, Errs} = data_to_points(Data, State), - lists:foreach( - fun({error, Reason}) -> - ?SLOG(error, #{ - msg => "influxdb trans point failed", - connector => InstId, - reason => Reason - }) - end, - Errs - ), +do_query(InstId, Client, Points) -> case influxdb:write(Client, Points) of ok -> ?SLOG(debug, #{ @@ -376,11 +387,45 @@ to_maps_config(K, V, Res) -> %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Data Trans -data_to_points(Data, #{write_syntax := Lines}) -> - lines_to_points(Data, Lines, [], []). +parse_batch_data(InstId, BatchData, SyntaxLines) -> + {Points, Errors} = lists:foldl( + fun({send_message, Data}, {AccIn, ErrAccIn}) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + {[Points | AccIn], ErrAccIn}; + {error, ErrorPoints} -> + log_error_points(InstId, ErrorPoints), + {AccIn, ErrAccIn + 1} + end + end, + {[], 0}, + BatchData + ), + case Errors of + 0 -> + {ok, Points}; + _ -> + ?SLOG(error, #{ + msg => io_lib:format("InfluxDB trans point failed, count: ~p", [Errors]), + connector => InstId, + reason => points_trans_failed + }), + {error, points_trans_failed} + end. -lines_to_points(_, [], Points, Errs) -> - {Points, Errs}; +data_to_points(Data, SyntaxLines) -> + lines_to_points(Data, SyntaxLines, [], []). + +%% When converting multiple rows data into InfluxDB Line Protocol, they are considered to be strongly correlated. +%% And once a row fails to convert, all of them are considered to have failed. +lines_to_points(_, [], Points, ErrorPoints) -> + case ErrorPoints of + [] -> + {ok, Points}; + _ -> + %% ignore trans succeeded points + {error, ErrorPoints} + end; lines_to_points( Data, [ @@ -392,8 +437,8 @@ lines_to_points( } | Rest ], - ResAcc, - ErrAcc + ResultPointsAcc, + ErrorPointsAcc ) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of @@ -406,9 +451,11 @@ lines_to_points( tags => EncodeTags, fields => EncodeFields }, - lines_to_points(Data, Rest, [Point | ResAcc], ErrAcc); + lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc); BadTimestamp -> - lines_to_points(Data, Rest, ResAcc, [{error, {bad_timestamp, BadTimestamp}} | ErrAcc]) + lines_to_points(Data, Rest, ResultPointsAcc, [ + {error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc + ]) end. maps_config_to_data(K, V, {Data, Res}) -> @@ -461,3 +508,16 @@ data_filter(Bool) when is_boolean(Bool) -> Bool; data_filter(Data) -> bin(Data). bin(Data) -> emqx_plugin_libs_rule:bin(Data). + +%% helper funcs +log_error_points(InstId, Errs) -> + lists:foreach( + fun({error, Reason}) -> + ?SLOG(error, #{ + msg => "influxdb trans point failed", + connector => InstId, + reason => Reason + }) + end, + Errs + ).