Merge pull request #12695 from lafirest/fix/opents
perf(opents): Improve the message processing efficiency of opentsdb
This commit is contained in:
commit
afb7075f82
|
@ -292,50 +292,20 @@ try_render_messages([{ChannelId, _} | _] = BatchReq, Channels) ->
|
|||
render_channel_message(Msg, #{data := DataList}, Acc) ->
|
||||
RawOpts = #{return => rawlist, var_trans => fun(X) -> X end},
|
||||
lists:foldl(
|
||||
fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) ->
|
||||
fun(
|
||||
#{
|
||||
metric := MetricTk,
|
||||
tags := TagsProcer,
|
||||
value := ValueProcer,
|
||||
timestamp := TimeProcer
|
||||
},
|
||||
InAcc
|
||||
) ->
|
||||
MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg),
|
||||
|
||||
TagsVal =
|
||||
case TagsTk of
|
||||
[tags | TagTkList] ->
|
||||
maps:from_list([
|
||||
{
|
||||
emqx_placeholder:proc_tmpl(TagName, Msg),
|
||||
emqx_placeholder:proc_tmpl(TagValue, Msg)
|
||||
}
|
||||
|| {TagName, TagValue} <- TagTkList
|
||||
]);
|
||||
TagsTks ->
|
||||
case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of
|
||||
[undefined] ->
|
||||
#{};
|
||||
[Any] ->
|
||||
Any
|
||||
end
|
||||
end,
|
||||
|
||||
ValueVal =
|
||||
case ValueTk of
|
||||
[_] ->
|
||||
%% just one element, maybe is a variable or a plain text
|
||||
%% we should keep it as it is
|
||||
erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
|
||||
Tks when is_list(Tks) ->
|
||||
emqx_placeholder:proc_tmpl(Tks, Msg);
|
||||
Raw ->
|
||||
%% not a token list, just a raw value
|
||||
Raw
|
||||
end,
|
||||
Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal},
|
||||
[
|
||||
case maps:get(timestamp, Data, undefined) of
|
||||
undefined ->
|
||||
Base;
|
||||
TimestampTk ->
|
||||
Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)}
|
||||
end
|
||||
| InAcc
|
||||
]
|
||||
TagsVal = TagsProcer(Msg, RawOpts),
|
||||
ValueVal = ValueProcer(Msg, RawOpts),
|
||||
Result = TimeProcer(Msg, #{metric => MetricVal, tags => TagsVal, value => ValueVal}),
|
||||
[Result | InAcc]
|
||||
end,
|
||||
Acc,
|
||||
DataList
|
||||
|
@ -345,41 +315,72 @@ preproc_data_template([]) ->
|
|||
preproc_data_template(emqx_bridge_opents:default_data_template());
|
||||
preproc_data_template(DataList) ->
|
||||
lists:map(
|
||||
fun(#{tags := Tags, value := Value} = Data) ->
|
||||
Data2 = maps:without([tags, value], Data),
|
||||
Template = maps:map(
|
||||
fun(_Key, Val) ->
|
||||
emqx_placeholder:preproc_tmpl(Val)
|
||||
end,
|
||||
Data2
|
||||
),
|
||||
|
||||
TagsTk =
|
||||
case Tags of
|
||||
Tmpl when is_binary(Tmpl) ->
|
||||
emqx_placeholder:preproc_tmpl(Tmpl);
|
||||
Map when is_map(Map) ->
|
||||
[
|
||||
tags
|
||||
| [
|
||||
{
|
||||
emqx_placeholder:preproc_tmpl(emqx_utils_conv:bin(TagName)),
|
||||
emqx_placeholder:preproc_tmpl(TagValue)
|
||||
}
|
||||
|| {TagName, TagValue} <- maps:to_list(Map)
|
||||
]
|
||||
]
|
||||
end,
|
||||
|
||||
ValueTk =
|
||||
case Value of
|
||||
Text when is_binary(Text) ->
|
||||
emqx_placeholder:preproc_tmpl(Text);
|
||||
Raw ->
|
||||
Raw
|
||||
end,
|
||||
|
||||
Template#{tags => TagsTk, value => ValueTk}
|
||||
fun(#{metric := Metric, tags := Tags, value := Value} = Data) ->
|
||||
TagsProcer = mk_tags_procer(Tags),
|
||||
ValueProcer = mk_value_procer(Value),
|
||||
#{
|
||||
metric => emqx_placeholder:preproc_tmpl(Metric),
|
||||
tags => TagsProcer,
|
||||
value => ValueProcer,
|
||||
timestamp => mk_timestamp_procer(Data)
|
||||
}
|
||||
end,
|
||||
DataList
|
||||
).
|
||||
|
||||
mk_tags_procer(Tmpl) when is_binary(Tmpl) ->
|
||||
TagsTks = emqx_placeholder:preproc_tmpl(Tmpl),
|
||||
fun(Msg, RawOpts) ->
|
||||
case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of
|
||||
[undefined] ->
|
||||
#{};
|
||||
[Any] ->
|
||||
Any
|
||||
end
|
||||
end;
|
||||
mk_tags_procer(Map) when is_map(Map) ->
|
||||
TagTkList = [
|
||||
{
|
||||
emqx_placeholder:preproc_tmpl(emqx_utils_conv:bin(TagName)),
|
||||
emqx_placeholder:preproc_tmpl(TagValue)
|
||||
}
|
||||
|| {TagName, TagValue} <- maps:to_list(Map)
|
||||
],
|
||||
fun(Msg, _RawOpts) ->
|
||||
maps:from_list([
|
||||
{
|
||||
emqx_placeholder:proc_tmpl(TagName, Msg),
|
||||
emqx_placeholder:proc_tmpl(TagValue, Msg)
|
||||
}
|
||||
|| {TagName, TagValue} <- TagTkList
|
||||
])
|
||||
end.
|
||||
|
||||
mk_value_procer(Text) when is_binary(Text) ->
|
||||
ValueTk = emqx_placeholder:preproc_tmpl(Text),
|
||||
case ValueTk of
|
||||
[_] ->
|
||||
%% just one element, maybe is a variable or a plain text
|
||||
%% we should keep it as it is
|
||||
fun(Msg, RawOpts) ->
|
||||
erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts))
|
||||
end;
|
||||
Tks when is_list(Tks) ->
|
||||
fun(Msg, _RawOpts) ->
|
||||
emqx_placeholder:proc_tmpl(Tks, Msg)
|
||||
end
|
||||
end;
|
||||
mk_value_procer(Raw) ->
|
||||
fun(_, _) ->
|
||||
Raw
|
||||
end.
|
||||
|
||||
mk_timestamp_procer(#{timestamp := Timestamp}) ->
|
||||
TimestampTk = emqx_placeholder:preproc_tmpl(Timestamp),
|
||||
fun(Msg, Base) ->
|
||||
Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)}
|
||||
end;
|
||||
mk_timestamp_procer(_) ->
|
||||
fun(_Msg, Base) ->
|
||||
Base
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue