diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf index 3930825e5..701608721 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -18,45 +18,31 @@ will be forwarded. zh: "本地 Topic" } } - measurement { + write_syntax { desc { - en: """The measurement name to be forwarded to the InfluxDB. Placeholders supported.""" - zh: """要转发到 InfluxDB 的 Measurement 名称,支持占位符""" + en: """ +Conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeHolder supported. +See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and +[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
+TLDR: +``` +[,=[,=]] =[,=] [] +``` +""" + zh: """ +使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符
+参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及 +[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
+TLDR: +``` +[,=[,=]] =[,=] [] +``` +""" } label { - en: "Measurement" - zh: "Measurement" - } - } - timestamp { - desc { - en: """The timestamp to be forwarded to the InfluxDB. Placeholders supported. Default is message timestamp""" - zh: """要转发到 InfluxDB 的时间戳,支持占位符。默认使用消息的时间戳""" + en: "write_syntax" + zh: "写语句" } - label { - en: "Timestamp" - zh: "Timestamp" - } - } - tags { - desc { - en: """The tags to be forwarded to the InfluxDB. Placeholders supported.""" - zh: """要转发到 InfluxDB 的 Tags 数据内容,支持占位符""" - } - label { - en: "Tags" - zh: "Tags" - } - } - fields { - desc { - en: """The fields to be forwarded to the InfluxDB. Placeholders supported.""" - zh: """要转发到 InfluxDB 的 fields 数据内容,支持占位符""" - } - label { - en: "Fields" - zh: "Fields" - } } config_enable { desc { diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index a55d9d47a..3a6787af6 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -3,9 +3,10 @@ %%-------------------------------------------------------------------- -module(emqx_ee_bridge_influxdb). +-include("emqx_ee_bridge.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include("emqx_ee_bridge.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -55,13 +56,10 @@ values(Protocol, post) -> enable => true, direction => egress, local_topic => <<"local/topic/#">>, - measurement => <<"${topic}">>, - tags => #{<<"clientid">> => <<"${clientid}">>}, - fields => #{ - <<"payload">> => <<"${payload}">>, - <<"int_value">> => [int, <<"${payload.int_key}">>], - <<"uint_value">> => [uint, <<"${payload.uint_key}">>] - } + write_syntax => + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", "uint_value=${payload.uint_key}u,", + "bool=${payload.bool}">> }; values(Protocol, put) -> values(Protocol, post). @@ -77,13 +75,7 @@ fields(basic) -> {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {measurement, mk(binary(), #{desc => ?DESC("measurement"), required => true})}, - {timestamp, - mk(binary(), #{ - desc => ?DESC("timestamp"), default => <<"${timestamp}">>, required => false - })}, - {tags, mk(map(), #{desc => ?DESC("tags"), required => false})}, - {fields, mk(map(), #{desc => ?DESC("fields"), required => true})} + {write_syntax, fun write_syntax/1} ]; fields("post_udp") -> method_fileds(post, influxdb_udp); @@ -142,3 +134,80 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for HStream using `", string:to_upper(Method), "` method."]; desc(_) -> undefined. + +write_syntax(type) -> + list(); +write_syntax(required) -> + true; +write_syntax(validator) -> + [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")]; +write_syntax(converter) -> + fun converter_influx_lines/1; +write_syntax(desc) -> + ?DESC("write_syntax"); +write_syntax(_) -> + undefined. + +converter_influx_lines(RawLines) -> + Lines = string:tokens(str(RawLines), "\n"), + lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)). + +converter_influx_line(Line, AccIn) -> + case string:tokens(str(Line), " ") of + [MeasurementAndTags, Fields, Timestamp] -> + {Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), + [ + #{ + measurement => Measurement, + tags => kv_pairs(Tags), + fields => kv_pairs(string:tokens(Fields, ",")), + timestamp => Timestamp + } + | AccIn + ]; + [MeasurementAndTags, Fields] -> + {Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), + %% TODO: fix here both here and influxdb driver. + %% Default value should evaluated by InfluxDB. + [ + #{ + measurement => Measurement, + tags => kv_pairs(Tags), + fields => kv_pairs(string:tokens(Fields, ",")), + timestamp => "${timestamp}" + } + | AccIn + ]; + _ -> + throw("Bad InfluxDB Line Protocol schema") + end. + +split_measurement_and_tags(Subject) -> + case string:tokens(Subject, ",") of + [] -> + throw("Bad Measurement schema"); + [Measurement] -> + {Measurement, []}; + [Measurement | Tags] -> + {Measurement, Tags} + end. + +kv_pairs(Pairs) -> + kv_pairs(Pairs, []). +kv_pairs([], Acc) -> + lists:reverse(Acc); +kv_pairs([Pair | Rest], Acc) -> + case string:tokens(Pair, "=") of + [K, V] -> + %% Reduplicated keys will be overwritten. Follows InfluxDB Line Protocol. + kv_pairs(Rest, [{K, V} | Acc]); + _ -> + throw(io_lib:format("Bad InfluxDB Line Protocol Key Value pair: ~p", Pair)) + end. + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index f0e5cd333..aca92e791 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -191,10 +191,7 @@ do_start_client( ClientConfig, Config = #{ egress := #{ - measurement := Measurement, - timestamp := Timestamp, - tags := Tags, - fields := Fields + write_syntax := Lines } } ) -> @@ -204,10 +201,7 @@ do_start_client( true -> State = #{ client => Client, - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), - timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp), - tags => to_tags_config(Tags), - fields => to_fields_config(Fields) + write_syntax => to_config(Lines) }, ?SLOG(info, #{ msg => "starting influxdb connector success", @@ -306,46 +300,65 @@ ssl_config(SSL = #{enable := true}) -> %% Query do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) -> - case data_to_point(Data, State) of - {ok, Point} -> - case influxdb:write(Client, [Point]) of - ok -> - ?SLOG(debug, #{ - msg => "influxdb write point success", - connector => InstId, - point => Point - }), - emqx_resource:query_success(AfterQuery); - {error, Reason} -> - ?SLOG(error, #{ - msg => "influxdb write point failed", - connector => InstId, - reason => Reason - }), - emqx_resource:query_failed(AfterQuery) - end; - {error, Reason} -> + {Points, Errs} = data_to_points(Data, State), + lists:foreach( + fun({error, Reason}) -> ?SLOG(error, #{ msg => "influxdb trans point failed", connector => InstId, reason => Reason + }) + end, + Errs + ), + case influxdb:write(Client, Points) of + ok -> + ?SLOG(debug, #{ + msg => "influxdb write point success", + connector => InstId, + points => Points }), - {error, Reason} + emqx_resource:query_success(AfterQuery); + {error, Reason} -> + ?SLOG(error, #{ + msg => "influxdb write point failed", + connector => InstId, + reason => Reason + }), + emqx_resource:query_failed(AfterQuery) end. %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans -to_tags_config(Tags) -> - maps:fold(fun to_maps_config/3, #{}, Tags). +to_config(Lines) -> + to_config(Lines, []). -to_fields_config(Fields) -> - maps:fold(fun to_maps_config/3, #{}, Fields). +to_config([], Acc) -> + lists:reverse(Acc); +to_config( + [ + #{ + measurement := Measurement, + timestamp := Timestamp, + tags := Tags, + fields := Fields + } + | Rest + ], + Acc +) -> + Res = #{ + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp), + tags => to_kv_config(Tags), + fields => to_kv_config(Fields) + }, + to_config(Rest, [Res | Acc]). + +to_kv_config(KVfields) -> + maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). -to_maps_config(K, [IntType, V], Res) when IntType == <<"int">> orelse IntType == <<"uint">> -> - NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), - NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), - Res#{NK => {binary_to_atom(IntType, utf8), NV}}; to_maps_config(K, V, Res) -> NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), @@ -353,14 +366,24 @@ to_maps_config(K, V, Res) -> %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Data Trans -data_to_point( +data_to_points(Data, #{write_syntax := Lines}) -> + lines_to_points(Data, Lines, [], []). + +lines_to_points(_, [], Points, Errs) -> + {Points, Errs}; +lines_to_points( Data, - #{ - measurement := Measurement, - timestamp := Timestamp, - tags := Tags, - fields := Fields - } + [ + #{ + measurement := Measurement, + timestamp := Timestamp, + tags := Tags, + fields := Fields + } + | Rest + ], + ResAcc, + ErrAcc ) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of @@ -373,24 +396,11 @@ data_to_point( tags => EncodeTags, fields => EncodeFields }, - {ok, Point}; + lines_to_points(Data, Rest, [Point | ResAcc], ErrAcc); BadTimestamp -> - {error, {bad_timestamp, BadTimestamp}} + lines_to_points(Data, Rest, ResAcc, [{error, {bad_timestamp, BadTimestamp}} | ErrAcc]) end. -maps_config_to_data(K, {IntType, V}, {Data, Res}) when IntType == int orelse IntType == uint -> - KTransOptions = #{return => full_binary}, - VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions), - NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions), - case {NK, NV} of - {[undefined], _} -> - {Data, Res}; - {_, [undefined]} -> - {Data, Res}; - {_, [IntV]} when is_integer(IntV) -> - {Data, Res#{NK => {IntType, IntV}}} - end; maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => full_binary}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, @@ -402,9 +412,38 @@ maps_config_to_data(K, V, {Data, Res}) -> {_, [undefined]} -> {Data, Res}; _ -> - {Data, Res#{NK => NV}} + {Data, Res#{NK => value_type(NV)}} end. +value_type([Int, <<"i">>]) when + is_integer(Int) +-> + {int, Int}; +value_type([UInt, <<"u">>]) -> + {uint, UInt}; +value_type([<<"t">>]) -> + 't'; +value_type([<<"T">>]) -> + 'T'; +value_type([<<"true">>]) -> + 'true'; +value_type([<<"TRUE">>]) -> + 'TRUE'; +value_type([<<"True">>]) -> + 'True'; +value_type([<<"f">>]) -> + 'f'; +value_type([<<"F">>]) -> + 'F'; +value_type([<<"false">>]) -> + 'false'; +value_type([<<"FALSE">>]) -> + 'FALSE'; +value_type([<<"False">>]) -> + 'False'; +value_type(Val) -> + Val. + data_filter(undefined) -> undefined; data_filter(Int) when is_integer(Int) -> Int; data_filter(Number) when is_number(Number) -> Number;