diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index af4cba951..e8bd30471 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -292,50 +292,20 @@ try_render_messages([{ChannelId, _} | _] = BatchReq, Channels) -> render_channel_message(Msg, #{data := DataList}, Acc) -> RawOpts = #{return => rawlist, var_trans => fun(X) -> X end}, lists:foldl( - fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) -> + fun( + #{ + metric := MetricTk, + tags := TagsProcer, + value := ValueProcer, + timestamp := TimeProcer + }, + InAcc + ) -> MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg), - - TagsVal = - case TagsTk of - [tags | TagTkList] -> - maps:from_list([ - { - emqx_placeholder:proc_tmpl(TagName, Msg), - emqx_placeholder:proc_tmpl(TagValue, Msg) - } - || {TagName, TagValue} <- TagTkList - ]); - TagsTks -> - case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of - [undefined] -> - #{}; - [Any] -> - Any - end - end, - - ValueVal = - case ValueTk of - [_] -> - %% just one element, maybe is a variable or a plain text - %% we should keep it as it is - erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts)); - Tks when is_list(Tks) -> - emqx_placeholder:proc_tmpl(Tks, Msg); - Raw -> - %% not a token list, just a raw value - Raw - end, - Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal}, - [ - case maps:get(timestamp, Data, undefined) of - undefined -> - Base; - TimestampTk -> - Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)} - end - | InAcc - ] + TagsVal = TagsProcer(Msg, RawOpts), + ValueVal = ValueProcer(Msg, RawOpts), + Result = TimeProcer(Msg, #{metric => MetricVal, tags => TagsVal, value => ValueVal}), + [Result | InAcc] end, Acc, DataList @@ -345,41 +315,72 @@ preproc_data_template([]) -> preproc_data_template(emqx_bridge_opents:default_data_template()); preproc_data_template(DataList) -> lists:map( - fun(#{tags := Tags, value := Value} = Data) -> - Data2 = maps:without([tags, value], Data), - Template = maps:map( - fun(_Key, Val) -> - emqx_placeholder:preproc_tmpl(Val) - end, - Data2 - ), - - TagsTk = - case Tags of - Tmpl when is_binary(Tmpl) -> - emqx_placeholder:preproc_tmpl(Tmpl); - Map when is_map(Map) -> - [ - tags - | [ - { - emqx_placeholder:preproc_tmpl(emqx_utils_conv:bin(TagName)), - emqx_placeholder:preproc_tmpl(TagValue) - } - || {TagName, TagValue} <- maps:to_list(Map) - ] - ] - end, - - ValueTk = - case Value of - Text when is_binary(Text) -> - emqx_placeholder:preproc_tmpl(Text); - Raw -> - Raw - end, - - Template#{tags => TagsTk, value => ValueTk} + fun(#{metric := Metric, tags := Tags, value := Value} = Data) -> + TagsProcer = mk_tags_procer(Tags), + ValueProcer = mk_value_procer(Value), + #{ + metric => emqx_placeholder:preproc_tmpl(Metric), + tags => TagsProcer, + value => ValueProcer, + timestamp => mk_timestamp_procer(Data) + } end, DataList ). + +mk_tags_procer(Tmpl) when is_binary(Tmpl) -> + TagsTks = emqx_placeholder:preproc_tmpl(Tmpl), + fun(Msg, RawOpts) -> + case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of + [undefined] -> + #{}; + [Any] -> + Any + end + end; +mk_tags_procer(Map) when is_map(Map) -> + TagTkList = [ + { + emqx_placeholder:preproc_tmpl(emqx_utils_conv:bin(TagName)), + emqx_placeholder:preproc_tmpl(TagValue) + } + || {TagName, TagValue} <- maps:to_list(Map) + ], + fun(Msg, _RawOpts) -> + maps:from_list([ + { + emqx_placeholder:proc_tmpl(TagName, Msg), + emqx_placeholder:proc_tmpl(TagValue, Msg) + } + || {TagName, TagValue} <- TagTkList + ]) + end. + +mk_value_procer(Text) when is_binary(Text) -> + ValueTk = emqx_placeholder:preproc_tmpl(Text), + case ValueTk of + [_] -> + %% just one element, maybe is a variable or a plain text + %% we should keep it as it is + fun(Msg, RawOpts) -> + erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts)) + end; + Tks when is_list(Tks) -> + fun(Msg, _RawOpts) -> + emqx_placeholder:proc_tmpl(Tks, Msg) + end + end; +mk_value_procer(Raw) -> + fun(_, _) -> + Raw + end. + +mk_timestamp_procer(#{timestamp := Timestamp}) -> + TimestampTk = emqx_placeholder:preproc_tmpl(Timestamp), + fun(Msg, Base) -> + Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)} + end; +mk_timestamp_procer(_) -> + fun(_Msg, Base) -> + Base + end.