feat: refine influxdb bridge conf
Consistent influxdb line protocol config to raw syntax format. See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and [InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
This commit is contained in:
parent
06f246a896
commit
35e347aec8
|
@ -18,44 +18,30 @@ will be forwarded.
|
|||
zh: "本地 Topic"
|
||||
}
|
||||
}
|
||||
measurement {
|
||||
write_syntax {
|
||||
desc {
|
||||
en: """The measurement name to be forwarded to the InfluxDB. Placeholders supported."""
|
||||
zh: """要转发到 InfluxDB 的 Measurement 名称,支持占位符"""
|
||||
en: """
|
||||
Conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeHolder supported.
|
||||
See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and
|
||||
[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
|
||||
TLDR:
|
||||
```
|
||||
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
|
||||
```
|
||||
"""
|
||||
zh: """
|
||||
使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符</br>
|
||||
参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及
|
||||
[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
|
||||
TLDR:
|
||||
```
|
||||
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
|
||||
```
|
||||
"""
|
||||
}
|
||||
label {
|
||||
en: "Measurement"
|
||||
zh: "Measurement"
|
||||
}
|
||||
}
|
||||
timestamp {
|
||||
desc {
|
||||
en: """The timestamp to be forwarded to the InfluxDB. Placeholders supported. Default is message timestamp"""
|
||||
zh: """要转发到 InfluxDB 的时间戳,支持占位符。默认使用消息的时间戳"""
|
||||
}
|
||||
label {
|
||||
en: "Timestamp"
|
||||
zh: "Timestamp"
|
||||
}
|
||||
}
|
||||
tags {
|
||||
desc {
|
||||
en: """The tags to be forwarded to the InfluxDB. Placeholders supported."""
|
||||
zh: """要转发到 InfluxDB 的 Tags 数据内容,支持占位符"""
|
||||
}
|
||||
label {
|
||||
en: "Tags"
|
||||
zh: "Tags"
|
||||
}
|
||||
}
|
||||
fields {
|
||||
desc {
|
||||
en: """The fields to be forwarded to the InfluxDB. Placeholders supported."""
|
||||
zh: """要转发到 InfluxDB 的 fields 数据内容,支持占位符"""
|
||||
}
|
||||
label {
|
||||
en: "Fields"
|
||||
zh: "Fields"
|
||||
en: "write_syntax"
|
||||
zh: "写语句"
|
||||
}
|
||||
}
|
||||
config_enable {
|
||||
|
|
|
@ -3,9 +3,10 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-module(emqx_ee_bridge_influxdb).
|
||||
|
||||
-include("emqx_ee_bridge.hrl").
|
||||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include("emqx_ee_bridge.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
|
@ -55,13 +56,10 @@ values(Protocol, post) ->
|
|||
enable => true,
|
||||
direction => egress,
|
||||
local_topic => <<"local/topic/#">>,
|
||||
measurement => <<"${topic}">>,
|
||||
tags => #{<<"clientid">> => <<"${clientid}">>},
|
||||
fields => #{
|
||||
<<"payload">> => <<"${payload}">>,
|
||||
<<"int_value">> => [int, <<"${payload.int_key}">>],
|
||||
<<"uint_value">> => [uint, <<"${payload.uint_key}">>]
|
||||
}
|
||||
write_syntax =>
|
||||
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
|
||||
"${clientid}_int_value=${payload.int_key}i,", "uint_value=${payload.uint_key}u,",
|
||||
"bool=${payload.bool}">>
|
||||
};
|
||||
values(Protocol, put) ->
|
||||
values(Protocol, post).
|
||||
|
@ -77,13 +75,7 @@ fields(basic) ->
|
|||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
||||
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
|
||||
{measurement, mk(binary(), #{desc => ?DESC("measurement"), required => true})},
|
||||
{timestamp,
|
||||
mk(binary(), #{
|
||||
desc => ?DESC("timestamp"), default => <<"${timestamp}">>, required => false
|
||||
})},
|
||||
{tags, mk(map(), #{desc => ?DESC("tags"), required => false})},
|
||||
{fields, mk(map(), #{desc => ?DESC("fields"), required => true})}
|
||||
{write_syntax, fun write_syntax/1}
|
||||
];
|
||||
fields("post_udp") ->
|
||||
method_fileds(post, influxdb_udp);
|
||||
|
@ -142,3 +134,80 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
|||
["Configuration for HStream using `", string:to_upper(Method), "` method."];
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
write_syntax(type) ->
|
||||
list();
|
||||
write_syntax(required) ->
|
||||
true;
|
||||
write_syntax(validator) ->
|
||||
[?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")];
|
||||
write_syntax(converter) ->
|
||||
fun converter_influx_lines/1;
|
||||
write_syntax(desc) ->
|
||||
?DESC("write_syntax");
|
||||
write_syntax(_) ->
|
||||
undefined.
|
||||
|
||||
converter_influx_lines(RawLines) ->
|
||||
Lines = string:tokens(str(RawLines), "\n"),
|
||||
lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)).
|
||||
|
||||
converter_influx_line(Line, AccIn) ->
|
||||
case string:tokens(str(Line), " ") of
|
||||
[MeasurementAndTags, Fields, Timestamp] ->
|
||||
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags),
|
||||
[
|
||||
#{
|
||||
measurement => Measurement,
|
||||
tags => kv_pairs(Tags),
|
||||
fields => kv_pairs(string:tokens(Fields, ",")),
|
||||
timestamp => Timestamp
|
||||
}
|
||||
| AccIn
|
||||
];
|
||||
[MeasurementAndTags, Fields] ->
|
||||
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags),
|
||||
%% TODO: fix here both here and influxdb driver.
|
||||
%% Default value should evaluated by InfluxDB.
|
||||
[
|
||||
#{
|
||||
measurement => Measurement,
|
||||
tags => kv_pairs(Tags),
|
||||
fields => kv_pairs(string:tokens(Fields, ",")),
|
||||
timestamp => "${timestamp}"
|
||||
}
|
||||
| AccIn
|
||||
];
|
||||
_ ->
|
||||
throw("Bad InfluxDB Line Protocol schema")
|
||||
end.
|
||||
|
||||
split_measurement_and_tags(Subject) ->
|
||||
case string:tokens(Subject, ",") of
|
||||
[] ->
|
||||
throw("Bad Measurement schema");
|
||||
[Measurement] ->
|
||||
{Measurement, []};
|
||||
[Measurement | Tags] ->
|
||||
{Measurement, Tags}
|
||||
end.
|
||||
|
||||
kv_pairs(Pairs) ->
|
||||
kv_pairs(Pairs, []).
|
||||
kv_pairs([], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
kv_pairs([Pair | Rest], Acc) ->
|
||||
case string:tokens(Pair, "=") of
|
||||
[K, V] ->
|
||||
%% Reduplicated keys will be overwritten. Follows InfluxDB Line Protocol.
|
||||
kv_pairs(Rest, [{K, V} | Acc]);
|
||||
_ ->
|
||||
throw(io_lib:format("Bad InfluxDB Line Protocol Key Value pair: ~p", Pair))
|
||||
end.
|
||||
|
||||
str(A) when is_atom(A) ->
|
||||
atom_to_list(A);
|
||||
str(B) when is_binary(B) ->
|
||||
binary_to_list(B);
|
||||
str(S) when is_list(S) ->
|
||||
S.
|
||||
|
|
|
@ -191,10 +191,7 @@ do_start_client(
|
|||
ClientConfig,
|
||||
Config = #{
|
||||
egress := #{
|
||||
measurement := Measurement,
|
||||
timestamp := Timestamp,
|
||||
tags := Tags,
|
||||
fields := Fields
|
||||
write_syntax := Lines
|
||||
}
|
||||
}
|
||||
) ->
|
||||
|
@ -204,10 +201,7 @@ do_start_client(
|
|||
true ->
|
||||
State = #{
|
||||
client => Client,
|
||||
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
||||
timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp),
|
||||
tags => to_tags_config(Tags),
|
||||
fields => to_fields_config(Fields)
|
||||
write_syntax => to_config(Lines)
|
||||
},
|
||||
?SLOG(info, #{
|
||||
msg => "starting influxdb connector success",
|
||||
|
@ -306,14 +300,23 @@ ssl_config(SSL = #{enable := true}) ->
|
|||
%% Query
|
||||
|
||||
do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) ->
|
||||
case data_to_point(Data, State) of
|
||||
{ok, Point} ->
|
||||
case influxdb:write(Client, [Point]) of
|
||||
{Points, Errs} = data_to_points(Data, State),
|
||||
lists:foreach(
|
||||
fun({error, Reason}) ->
|
||||
?SLOG(error, #{
|
||||
msg => "influxdb trans point failed",
|
||||
connector => InstId,
|
||||
reason => Reason
|
||||
})
|
||||
end,
|
||||
Errs
|
||||
),
|
||||
case influxdb:write(Client, Points) of
|
||||
ok ->
|
||||
?SLOG(debug, #{
|
||||
msg => "influxdb write point success",
|
||||
connector => InstId,
|
||||
point => Point
|
||||
points => Points
|
||||
}),
|
||||
emqx_resource:query_success(AfterQuery);
|
||||
{error, Reason} ->
|
||||
|
@ -323,29 +326,39 @@ do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client})
|
|||
reason => Reason
|
||||
}),
|
||||
emqx_resource:query_failed(AfterQuery)
|
||||
end;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "influxdb trans point failed",
|
||||
connector => InstId,
|
||||
reason => Reason
|
||||
}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Tags & Fields Config Trans
|
||||
|
||||
to_tags_config(Tags) ->
|
||||
maps:fold(fun to_maps_config/3, #{}, Tags).
|
||||
to_config(Lines) ->
|
||||
to_config(Lines, []).
|
||||
|
||||
to_fields_config(Fields) ->
|
||||
maps:fold(fun to_maps_config/3, #{}, Fields).
|
||||
to_config([], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
to_config(
|
||||
[
|
||||
#{
|
||||
measurement := Measurement,
|
||||
timestamp := Timestamp,
|
||||
tags := Tags,
|
||||
fields := Fields
|
||||
}
|
||||
| Rest
|
||||
],
|
||||
Acc
|
||||
) ->
|
||||
Res = #{
|
||||
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
||||
timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp),
|
||||
tags => to_kv_config(Tags),
|
||||
fields => to_kv_config(Fields)
|
||||
},
|
||||
to_config(Rest, [Res | Acc]).
|
||||
|
||||
to_kv_config(KVfields) ->
|
||||
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
|
||||
|
||||
to_maps_config(K, [IntType, V], Res) when IntType == <<"int">> orelse IntType == <<"uint">> ->
|
||||
NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
|
||||
NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)),
|
||||
Res#{NK => {binary_to_atom(IntType, utf8), NV}};
|
||||
to_maps_config(K, V, Res) ->
|
||||
NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
|
||||
NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)),
|
||||
|
@ -353,14 +366,24 @@ to_maps_config(K, V, Res) ->
|
|||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Tags & Fields Data Trans
|
||||
data_to_point(
|
||||
data_to_points(Data, #{write_syntax := Lines}) ->
|
||||
lines_to_points(Data, Lines, [], []).
|
||||
|
||||
lines_to_points(_, [], Points, Errs) ->
|
||||
{Points, Errs};
|
||||
lines_to_points(
|
||||
Data,
|
||||
[
|
||||
#{
|
||||
measurement := Measurement,
|
||||
timestamp := Timestamp,
|
||||
tags := Tags,
|
||||
fields := Fields
|
||||
}
|
||||
| Rest
|
||||
],
|
||||
ResAcc,
|
||||
ErrAcc
|
||||
) ->
|
||||
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
||||
case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of
|
||||
|
@ -373,24 +396,11 @@ data_to_point(
|
|||
tags => EncodeTags,
|
||||
fields => EncodeFields
|
||||
},
|
||||
{ok, Point};
|
||||
lines_to_points(Data, Rest, [Point | ResAcc], ErrAcc);
|
||||
BadTimestamp ->
|
||||
{error, {bad_timestamp, BadTimestamp}}
|
||||
lines_to_points(Data, Rest, ResAcc, [{error, {bad_timestamp, BadTimestamp}} | ErrAcc])
|
||||
end.
|
||||
|
||||
maps_config_to_data(K, {IntType, V}, {Data, Res}) when IntType == int orelse IntType == uint ->
|
||||
KTransOptions = #{return => full_binary},
|
||||
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
||||
NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions),
|
||||
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions),
|
||||
case {NK, NV} of
|
||||
{[undefined], _} ->
|
||||
{Data, Res};
|
||||
{_, [undefined]} ->
|
||||
{Data, Res};
|
||||
{_, [IntV]} when is_integer(IntV) ->
|
||||
{Data, Res#{NK => {IntType, IntV}}}
|
||||
end;
|
||||
maps_config_to_data(K, V, {Data, Res}) ->
|
||||
KTransOptions = #{return => full_binary},
|
||||
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
||||
|
@ -402,9 +412,38 @@ maps_config_to_data(K, V, {Data, Res}) ->
|
|||
{_, [undefined]} ->
|
||||
{Data, Res};
|
||||
_ ->
|
||||
{Data, Res#{NK => NV}}
|
||||
{Data, Res#{NK => value_type(NV)}}
|
||||
end.
|
||||
|
||||
value_type([Int, <<"i">>]) when
|
||||
is_integer(Int)
|
||||
->
|
||||
{int, Int};
|
||||
value_type([UInt, <<"u">>]) ->
|
||||
{uint, UInt};
|
||||
value_type([<<"t">>]) ->
|
||||
't';
|
||||
value_type([<<"T">>]) ->
|
||||
'T';
|
||||
value_type([<<"true">>]) ->
|
||||
'true';
|
||||
value_type([<<"TRUE">>]) ->
|
||||
'TRUE';
|
||||
value_type([<<"True">>]) ->
|
||||
'True';
|
||||
value_type([<<"f">>]) ->
|
||||
'f';
|
||||
value_type([<<"F">>]) ->
|
||||
'F';
|
||||
value_type([<<"false">>]) ->
|
||||
'false';
|
||||
value_type([<<"FALSE">>]) ->
|
||||
'FALSE';
|
||||
value_type([<<"False">>]) ->
|
||||
'False';
|
||||
value_type(Val) ->
|
||||
Val.
|
||||
|
||||
data_filter(undefined) -> undefined;
|
||||
data_filter(Int) when is_integer(Int) -> Int;
|
||||
data_filter(Number) when is_number(Number) -> Number;
|
||||
|
|
Loading…
Reference in New Issue