diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl index 119de1978..16513ac11 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -130,7 +130,7 @@ fields(action_parameters) -> array(ref(?MODULE, action_parameters_data)), #{ desc => ?DESC("action_parameters_data"), - default => <<"[]">> + default => [] } )} ]; @@ -154,22 +154,27 @@ fields(action_parameters_data) -> )}, {tags, mk( - binary(), + hoconsc:union([array(ref(?MODULE, action_parameters_data_tags)), binary()]), #{ required => true, desc => ?DESC("config_parameters_tags"), - validator => fun(Tmpl) -> - case emqx_placeholder:preproc_tmpl(Tmpl) of - [{var, _}] -> - true; - _ -> - ?SLOG(warning, #{ - msg => "invalid_tags_template", - path => "opents.parameters.data.tags", - data => Tmpl - }), - false - end + validator => fun + (Tmpl) when is_binary(Tmpl) -> + case emqx_placeholder:preproc_tmpl(Tmpl) of + [{var, _}] -> + true; + _ -> + ?SLOG(warning, #{ + msg => "invalid_tags_template", + path => "opents.parameters.data.tags", + data => Tmpl + }), + false + end; + ([_ | _] = Tags) when is_list(Tags) -> + true; + (_) -> + false end } )}, @@ -182,6 +187,25 @@ fields(action_parameters_data) -> } )} ]; +fields(action_parameters_data_tags) -> + [ + {tag, + mk( + binary(), + #{ + required => true, + desc => ?DESC("tags_tag") + } + )}, + {value, + mk( + binary(), + #{ + required => true, + desc => ?DESC("tags_value") + } + )} + ]; fields("post_bridge_v2") -> emqx_bridge_schema:type_and_name_fields(enum([opents])) ++ fields(action_config); fields("put_bridge_v2") -> @@ -197,6 +221,8 @@ desc(action_parameters) -> ?DESC("action_parameters"); desc(action_parameters_data) -> ?DESC("action_parameters_data"); +desc(action_parameters_data_tags) -> + ?DESC("action_parameters_data_tags"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."]; desc(_) -> diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index d71468d82..faa8c769c 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -294,13 +294,26 @@ render_channel_message(Msg, #{data := DataList}, Acc) -> lists:foldl( fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) -> MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg), + TagsVal = - case emqx_placeholder:proc_tmpl(TagsTk, Msg, RawOpts) of - [undefined] -> - #{}; - [Any] -> - Any + case TagsTk of + [tags | TagTkList] -> + maps:from_list([ + { + emqx_placeholder:proc_tmpl(TagName, Msg), + emqx_placeholder:proc_tmpl(TagValue, Msg) + } + || {TagName, TagValue} <- TagTkList + ]); + TagsTks -> + case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of + [undefined] -> + #{}; + [Any] -> + Any + end end, + ValueVal = case ValueTk of [_] -> @@ -308,7 +321,7 @@ render_channel_message(Msg, #{data := DataList}, Acc) -> %% we should keep it as it is erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts)); Tks when is_list(Tks) -> - emqx_placeholder:proc_tmpl(ValueTk, Msg); + emqx_placeholder:proc_tmpl(Tks, Msg); Raw -> %% not a token list, just a raw value Raw @@ -332,8 +345,8 @@ preproc_data_template([]) -> preproc_data_template(emqx_bridge_opents:default_data_template()); preproc_data_template(DataList) -> lists:map( - fun(Data) -> - {Value, Data2} = maps:take(value, Data), + fun(#{tags := Tags, value := Value} = Data) -> + Data2 = maps:without([tags, value], Data), Template = maps:map( fun(_Key, Val) -> emqx_placeholder:preproc_tmpl(Val) @@ -341,12 +354,32 @@ preproc_data_template(DataList) -> Data2 ), - case Value of - Text when is_binary(Text) -> - Template#{value => emqx_placeholder:preproc_tmpl(Text)}; - Raw -> - Template#{value => Raw} - end + TagsTk = + case Tags of + Tmpl when is_binary(Tmpl) -> + emqx_placeholder:preproc_tmpl(Tmpl); + List -> + [ + tags + | [ + { + emqx_placeholder:preproc_tmpl(TagName), + emqx_placeholder:preproc_tmpl(TagValue) + } + || #{tag := TagName, value := TagValue} <- List + ] + ] + end, + + ValueTk = + case Value of + Text when is_binary(Text) -> + emqx_placeholder:preproc_tmpl(Text); + Raw -> + Raw + end, + + Template#{tags => TagsTk, value => ValueTk} end, DataList ). diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index e3e89d563..34b7901cc 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -294,6 +294,96 @@ t_raw_int_value(Config) -> t_raw_float_value(Config) -> raw_value_test(<<"t_raw_float_value">>, 42.5, Config). +t_list_tags(Config) -> + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + ?assertMatch( + {ok, _}, + emqx_bridge_v2_testlib:update_bridge_api(Config, #{ + <<"parameters">> => #{ + <<"data">> => [ + #{ + <<"metric">> => <<"${metric}">>, + <<"tags">> => [#{<<"tag">> => <<"host">>, <<"value">> => <<"valueA">>}], + value => <<"${value}">> + } + ] + } + }) + ), + + Metric = <<"t_list_tags">>, + Value = 12, + MakeMessageFun = fun() -> make_data(Metric, Value) end, + + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()}) + ), + + {ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric), + QResult = emqx_utils_json:decode(IoTDBResult), + ?assertMatch( + [ + #{ + <<"metric">> := Metric, + <<"tags">> := #{<<"host">> := <<"valueA">>} + } + ], + QResult + ). + +t_list_tags_with_var(Config) -> + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + ?assertMatch( + {ok, _}, + emqx_bridge_v2_testlib:update_bridge_api(Config, #{ + <<"parameters">> => #{ + <<"data">> => [ + #{ + <<"metric">> => <<"${metric}">>, + <<"tags">> => [#{<<"tag">> => <<"host">>, <<"value">> => <<"${value}">>}], + value => <<"${value}">> + } + ] + } + }) + ), + + Metric = <<"t_list_tags_with_var">>, + Value = 12, + MakeMessageFun = fun() -> make_data(Metric, Value) end, + + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()}) + ), + + {ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric), + QResult = emqx_utils_json:decode(IoTDBResult), + ?assertMatch( + [ + #{ + <<"metric">> := Metric, + <<"tags">> := #{<<"host">> := <<"12">>} + } + ], + QResult + ). + raw_value_test(Metric, RawValue, Config) -> ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), ResourceId = emqx_bridge_v2_testlib:resource_id(Config), diff --git a/rel/i18n/emqx_bridge_opents.hocon b/rel/i18n/emqx_bridge_opents.hocon index ab2e82180..3a37e104c 100644 --- a/rel/i18n/emqx_bridge_opents.hocon +++ b/rel/i18n/emqx_bridge_opents.hocon @@ -49,7 +49,7 @@ config_parameters_metric.label: """Metric""" config_parameters_tags.desc: -"""Tags. Only supports with placeholder to extract tags from a variable""" +"""Tags. Only supports with placeholder to extract tags from a variable or a list of tags""" config_parameters_tags.label: """Tags""" @@ -60,4 +60,22 @@ config_parameters_value.desc: config_parameters_value.label: """Value""" +action_parameters_data_tags.desc: +"""OpenTSDB data tags""" + +action_parameters_data_tags.label: +"""Tags""" + +tags_tag.desc: +"""The name of this tag. Placeholders in format of ${var} is supported""" + +tags_tag.label: +"""Tag""" + +tags_value.desc: +"""The value of this tag. Placeholders in format of ${var} is supported""" + +tags_value.label: +"""Value""" + }