feat(opentsdb): supports more flexible tags schema

This commit is contained in:
firest 2024-01-30 17:34:42 +08:00
parent 35c4ef2ee2
commit 81f96f1a68
4 changed files with 196 additions and 29 deletions

View File

@ -130,7 +130,7 @@ fields(action_parameters) ->
array(ref(?MODULE, action_parameters_data)),
#{
desc => ?DESC("action_parameters_data"),
default => <<"[]">>
default => []
}
)}
];
@ -154,22 +154,27 @@ fields(action_parameters_data) ->
)},
{tags,
mk(
binary(),
hoconsc:union([array(ref(?MODULE, action_parameters_data_tags)), binary()]),
#{
required => true,
desc => ?DESC("config_parameters_tags"),
validator => fun(Tmpl) ->
case emqx_placeholder:preproc_tmpl(Tmpl) of
[{var, _}] ->
true;
_ ->
?SLOG(warning, #{
msg => "invalid_tags_template",
path => "opents.parameters.data.tags",
data => Tmpl
}),
false
end
validator => fun
(Tmpl) when is_binary(Tmpl) ->
case emqx_placeholder:preproc_tmpl(Tmpl) of
[{var, _}] ->
true;
_ ->
?SLOG(warning, #{
msg => "invalid_tags_template",
path => "opents.parameters.data.tags",
data => Tmpl
}),
false
end;
([_ | _] = Tags) when is_list(Tags) ->
true;
(_) ->
false
end
}
)},
@ -182,6 +187,25 @@ fields(action_parameters_data) ->
}
)}
];
fields(action_parameters_data_tags) ->
[
{tag,
mk(
binary(),
#{
required => true,
desc => ?DESC("tags_tag")
}
)},
{value,
mk(
binary(),
#{
required => true,
desc => ?DESC("tags_value")
}
)}
];
fields("post_bridge_v2") ->
emqx_bridge_schema:type_and_name_fields(enum([opents])) ++ fields(action_config);
fields("put_bridge_v2") ->
@ -197,6 +221,8 @@ desc(action_parameters) ->
?DESC("action_parameters");
desc(action_parameters_data) ->
?DESC("action_parameters_data");
desc(action_parameters_data_tags) ->
?DESC("action_parameters_data_tags");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."];
desc(_) ->

View File

@ -294,13 +294,26 @@ render_channel_message(Msg, #{data := DataList}, Acc) ->
lists:foldl(
fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) ->
MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg),
TagsVal =
case emqx_placeholder:proc_tmpl(TagsTk, Msg, RawOpts) of
[undefined] ->
#{};
[Any] ->
Any
case TagsTk of
[tags | TagTkList] ->
maps:from_list([
{
emqx_placeholder:proc_tmpl(TagName, Msg),
emqx_placeholder:proc_tmpl(TagValue, Msg)
}
|| {TagName, TagValue} <- TagTkList
]);
TagsTks ->
case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of
[undefined] ->
#{};
[Any] ->
Any
end
end,
ValueVal =
case ValueTk of
[_] ->
@ -308,7 +321,7 @@ render_channel_message(Msg, #{data := DataList}, Acc) ->
%% we should keep it as it is
erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
Tks when is_list(Tks) ->
emqx_placeholder:proc_tmpl(ValueTk, Msg);
emqx_placeholder:proc_tmpl(Tks, Msg);
Raw ->
%% not a token list, just a raw value
Raw
@ -332,8 +345,8 @@ preproc_data_template([]) ->
preproc_data_template(emqx_bridge_opents:default_data_template());
preproc_data_template(DataList) ->
lists:map(
fun(Data) ->
{Value, Data2} = maps:take(value, Data),
fun(#{tags := Tags, value := Value} = Data) ->
Data2 = maps:without([tags, value], Data),
Template = maps:map(
fun(_Key, Val) ->
emqx_placeholder:preproc_tmpl(Val)
@ -341,12 +354,32 @@ preproc_data_template(DataList) ->
Data2
),
case Value of
Text when is_binary(Text) ->
Template#{value => emqx_placeholder:preproc_tmpl(Text)};
Raw ->
Template#{value => Raw}
end
TagsTk =
case Tags of
Tmpl when is_binary(Tmpl) ->
emqx_placeholder:preproc_tmpl(Tmpl);
List ->
[
tags
| [
{
emqx_placeholder:preproc_tmpl(TagName),
emqx_placeholder:preproc_tmpl(TagValue)
}
|| #{tag := TagName, value := TagValue} <- List
]
]
end,
ValueTk =
case Value of
Text when is_binary(Text) ->
emqx_placeholder:preproc_tmpl(Text);
Raw ->
Raw
end,
Template#{tags => TagsTk, value => ValueTk}
end,
DataList
).

View File

@ -294,6 +294,96 @@ t_raw_int_value(Config) ->
t_raw_float_value(Config) ->
raw_value_test(<<"t_raw_float_value">>, 42.5, Config).
t_list_tags(Config) ->
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
?assertMatch(
{ok, _},
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
<<"parameters">> => #{
<<"data">> => [
#{
<<"metric">> => <<"${metric}">>,
<<"tags">> => [#{<<"tag">> => <<"host">>, <<"value">> => <<"valueA">>}],
value => <<"${value}">>
}
]
}
})
),
Metric = <<"t_list_tags">>,
Value = 12,
MakeMessageFun = fun() -> make_data(Metric, Value) end,
is_success_check(
emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
),
{ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
QResult = emqx_utils_json:decode(IoTDBResult),
?assertMatch(
[
#{
<<"metric">> := Metric,
<<"tags">> := #{<<"host">> := <<"valueA">>}
}
],
QResult
).
t_list_tags_with_var(Config) ->
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
?assertMatch(
{ok, _},
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
<<"parameters">> => #{
<<"data">> => [
#{
<<"metric">> => <<"${metric}">>,
<<"tags">> => [#{<<"tag">> => <<"host">>, <<"value">> => <<"${value}">>}],
value => <<"${value}">>
}
]
}
})
),
Metric = <<"t_list_tags_with_var">>,
Value = 12,
MakeMessageFun = fun() -> make_data(Metric, Value) end,
is_success_check(
emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
),
{ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
QResult = emqx_utils_json:decode(IoTDBResult),
?assertMatch(
[
#{
<<"metric">> := Metric,
<<"tags">> := #{<<"host">> := <<"12">>}
}
],
QResult
).
raw_value_test(Metric, RawValue, Config) ->
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),

View File

@ -49,7 +49,7 @@ config_parameters_metric.label:
"""Metric"""
config_parameters_tags.desc:
"""Tags. Only supports with placeholder to extract tags from a variable"""
"""Tags. Only supports with placeholder to extract tags from a variable or a list of tags"""
config_parameters_tags.label:
"""Tags"""
@ -60,4 +60,22 @@ config_parameters_value.desc:
config_parameters_value.label:
"""Value"""
action_parameters_data_tags.desc:
"""OpenTSDB data tags"""
action_parameters_data_tags.label:
"""Tags"""
tags_tag.desc:
"""The name of this tag. Placeholders in format of ${var} is supported"""
tags_tag.label:
"""Tag"""
tags_value.desc:
"""The value of this tag. Placeholders in format of ${var} is supported"""
tags_value.label:
"""Value"""
}