perf(opents): Improve the message processing efficiency of opentsdb

This commit is contained in:
firest 2024-03-13 21:06:52 +08:00
parent 20cd47ac89
commit b156e55430
1 changed files with 79 additions and 78 deletions

View File

@ -292,50 +292,20 @@ try_render_messages([{ChannelId, _} | _] = BatchReq, Channels) ->
render_channel_message(Msg, #{data := DataList}, Acc) ->
RawOpts = #{return => rawlist, var_trans => fun(X) -> X end},
lists:foldl(
fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) ->
fun(
#{
metric := MetricTk,
tags := TagsProcer,
value := ValueProcer,
timestamp := TimeProcer
},
InAcc
) ->
MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg),
TagsVal =
case TagsTk of
[tags | TagTkList] ->
maps:from_list([
{
emqx_placeholder:proc_tmpl(TagName, Msg),
emqx_placeholder:proc_tmpl(TagValue, Msg)
}
|| {TagName, TagValue} <- TagTkList
]);
TagsTks ->
case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of
[undefined] ->
#{};
[Any] ->
Any
end
end,
ValueVal =
case ValueTk of
[_] ->
%% just one element, maybe is a variable or a plain text
%% we should keep it as it is
erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
Tks when is_list(Tks) ->
emqx_placeholder:proc_tmpl(Tks, Msg);
Raw ->
%% not a token list, just a raw value
Raw
end,
Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal},
[
case maps:get(timestamp, Data, undefined) of
undefined ->
Base;
TimestampTk ->
Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)}
end
| InAcc
]
TagsVal = TagsProcer(Msg, RawOpts),
ValueVal = ValueProcer(Msg, RawOpts),
Result = TimeProcer(Msg, #{metric => MetricVal, tags => TagsVal, value => ValueVal}),
[Result | InAcc]
end,
Acc,
DataList
@ -345,41 +315,72 @@ preproc_data_template([]) ->
preproc_data_template(emqx_bridge_opents:default_data_template());
preproc_data_template(DataList) ->
lists:map(
fun(#{tags := Tags, value := Value} = Data) ->
Data2 = maps:without([tags, value], Data),
Template = maps:map(
fun(_Key, Val) ->
emqx_placeholder:preproc_tmpl(Val)
end,
Data2
),
TagsTk =
case Tags of
Tmpl when is_binary(Tmpl) ->
emqx_placeholder:preproc_tmpl(Tmpl);
Map when is_map(Map) ->
[
tags
| [
{
emqx_placeholder:preproc_tmpl(emqx_utils_conv:bin(TagName)),
emqx_placeholder:preproc_tmpl(TagValue)
}
|| {TagName, TagValue} <- maps:to_list(Map)
]
]
end,
ValueTk =
case Value of
Text when is_binary(Text) ->
emqx_placeholder:preproc_tmpl(Text);
Raw ->
Raw
end,
Template#{tags => TagsTk, value => ValueTk}
fun(#{metric := Metric, tags := Tags, value := Value} = Data) ->
TagsProcer = mk_tags_procer(Tags),
ValueProcer = mk_value_procer(Value),
#{
metric => emqx_placeholder:preproc_tmpl(Metric),
tags => TagsProcer,
value => ValueProcer,
timestamp => mk_timestamp_procer(Data)
}
end,
DataList
).
mk_tags_procer(Tmpl) when is_binary(Tmpl) ->
TagsTks = emqx_placeholder:preproc_tmpl(Tmpl),
fun(Msg, RawOpts) ->
case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of
[undefined] ->
#{};
[Any] ->
Any
end
end;
mk_tags_procer(Map) when is_map(Map) ->
TagTkList = [
{
emqx_placeholder:preproc_tmpl(emqx_utils_conv:bin(TagName)),
emqx_placeholder:preproc_tmpl(TagValue)
}
|| {TagName, TagValue} <- maps:to_list(Map)
],
fun(Msg, _RawOpts) ->
maps:from_list([
{
emqx_placeholder:proc_tmpl(TagName, Msg),
emqx_placeholder:proc_tmpl(TagValue, Msg)
}
|| {TagName, TagValue} <- TagTkList
])
end.
mk_value_procer(Text) when is_binary(Text) ->
ValueTk = emqx_placeholder:preproc_tmpl(Text),
case ValueTk of
[_] ->
%% just one element, maybe is a variable or a plain text
%% we should keep it as it is
fun(Msg, RawOpts) ->
erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts))
end;
Tks when is_list(Tks) ->
fun(Msg, _RawOpts) ->
emqx_placeholder:proc_tmpl(Tks, Msg)
end
end;
mk_value_procer(Raw) ->
fun(_, _) ->
Raw
end.
mk_timestamp_procer(#{timestamp := Timestamp}) ->
TimestampTk = emqx_placeholder:preproc_tmpl(Timestamp),
fun(Msg, Base) ->
Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)}
end;
mk_timestamp_procer(_) ->
fun(_Msg, Base) ->
Base
end.