fix(opentsdb): Enhanced the type support for template data
This commit is contained in:
parent
83a8822798
commit
b44420c14f
|
@ -3,6 +3,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_bridge_opents).
|
-module(emqx_bridge_opents).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("hocon/include/hoconsc.hrl").
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
@ -156,12 +157,25 @@ fields(action_parameters_data) ->
|
||||||
binary(),
|
binary(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("config_parameters_tags")
|
desc => ?DESC("config_parameters_tags"),
|
||||||
|
validator => fun(Tmpl) ->
|
||||||
|
case emqx_placeholder:preproc_tmpl(Tmpl) of
|
||||||
|
[{var, _}] ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "invalid_tags_template",
|
||||||
|
path => "opents.parameters.data.tags",
|
||||||
|
data => Tmpl
|
||||||
|
}),
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{value,
|
{value,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
hoconsc:union([integer(), float(), binary()]),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("config_parameters_value")
|
desc => ?DESC("config_parameters_value")
|
||||||
|
|
|
@ -304,9 +304,14 @@ render_channel_message(Msg, #{data := DataList}, Acc) ->
|
||||||
ValueVal =
|
ValueVal =
|
||||||
case ValueTk of
|
case ValueTk of
|
||||||
[_] ->
|
[_] ->
|
||||||
|
%% just one element, maybe is a variable or a plain text
|
||||||
|
%% we should keep it as it is
|
||||||
erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
|
erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
|
||||||
_ ->
|
Tks when is_list(Tks) ->
|
||||||
emqx_placeholder:proc_tmpl(ValueTk, Msg)
|
emqx_placeholder:proc_tmpl(ValueTk, Msg);
|
||||||
|
Raw ->
|
||||||
|
%% not a token list, just a raw value
|
||||||
|
Raw
|
||||||
end,
|
end,
|
||||||
Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal},
|
Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal},
|
||||||
[
|
[
|
||||||
|
@ -328,12 +333,20 @@ preproc_data_template([]) ->
|
||||||
preproc_data_template(DataList) ->
|
preproc_data_template(DataList) ->
|
||||||
lists:map(
|
lists:map(
|
||||||
fun(Data) ->
|
fun(Data) ->
|
||||||
maps:map(
|
{Value, Data2} = maps:take(value, Data),
|
||||||
fun(_Key, Value) ->
|
Template = maps:map(
|
||||||
emqx_placeholder:preproc_tmpl(Value)
|
fun(_Key, Val) ->
|
||||||
|
emqx_placeholder:preproc_tmpl(Val)
|
||||||
end,
|
end,
|
||||||
Data
|
Data2
|
||||||
)
|
),
|
||||||
|
|
||||||
|
case Value of
|
||||||
|
Text when is_binary(Text) ->
|
||||||
|
Template#{value => emqx_placeholder:preproc_tmpl(Text)};
|
||||||
|
Raw ->
|
||||||
|
Template#{value => Raw}
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
DataList
|
DataList
|
||||||
).
|
).
|
||||||
|
|
|
@ -83,7 +83,7 @@ init_per_testcase(TestCase, Config0) ->
|
||||||
{bridge_config, ActionConfig}
|
{bridge_config, ActionConfig}
|
||||||
| Config0
|
| Config0
|
||||||
],
|
],
|
||||||
%% iotdb_reset(Config),
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
@ -253,3 +253,89 @@ t_query_invalid_data(Config) ->
|
||||||
ok = emqx_bridge_v2_testlib:t_sync_query(
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
||||||
Config, MakeMessageFun, fun is_error_check/1, opents_bridge_on_query
|
Config, MakeMessageFun, fun is_error_check/1, opents_bridge_on_query
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_tags_validator(Config) ->
|
||||||
|
%% Create without data configured
|
||||||
|
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
|
||||||
|
<<"parameters">> => #{
|
||||||
|
<<"data">> => [
|
||||||
|
#{
|
||||||
|
<<"metric">> => <<"${metric}">>,
|
||||||
|
<<"tags">> => <<"${tags}">>,
|
||||||
|
<<"value">> => <<"${payload.value}">>
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{error, _},
|
||||||
|
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
|
||||||
|
<<"parameters">> => #{
|
||||||
|
<<"data">> => [
|
||||||
|
#{
|
||||||
|
<<"metric">> => <<"${metric}">>,
|
||||||
|
<<"tags">> => <<"text">>,
|
||||||
|
<<"value">> => <<"${payload.value}">>
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
).
|
||||||
|
|
||||||
|
t_raw_int_value(Config) ->
|
||||||
|
raw_value_test(<<"t_raw_int_value">>, 42, Config).
|
||||||
|
|
||||||
|
t_raw_float_value(Config) ->
|
||||||
|
raw_value_test(<<"t_raw_float_value">>, 42.5, Config).
|
||||||
|
|
||||||
|
raw_value_test(Metric, RawValue, Config) ->
|
||||||
|
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||||
|
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
|
||||||
|
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 10,
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
|
||||||
|
<<"parameters">> => #{
|
||||||
|
<<"data">> => [
|
||||||
|
#{
|
||||||
|
<<"metric">> => <<"${metric}">>,
|
||||||
|
<<"tags">> => <<"${tags}">>,
|
||||||
|
<<"value">> => RawValue
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
),
|
||||||
|
|
||||||
|
Value = 12,
|
||||||
|
MakeMessageFun = fun() -> make_data(Metric, Value) end,
|
||||||
|
|
||||||
|
is_success_check(
|
||||||
|
emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
|
||||||
|
QResult = emqx_utils_json:decode(IoTDBResult),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"metric">> := Metric,
|
||||||
|
<<"dps">> := _
|
||||||
|
}
|
||||||
|
],
|
||||||
|
QResult
|
||||||
|
),
|
||||||
|
[#{<<"dps">> := Dps}] = QResult,
|
||||||
|
?assertMatch([RawValue | _], maps:values(Dps)).
|
||||||
|
|
|
@ -49,7 +49,7 @@ config_parameters_metric.label:
|
||||||
"""Metric"""
|
"""Metric"""
|
||||||
|
|
||||||
config_parameters_tags.desc:
|
config_parameters_tags.desc:
|
||||||
"""Data Type, Placeholders in format of ${var} is supported"""
|
"""Tags. Only supports with placeholder to extract tags from a variable"""
|
||||||
|
|
||||||
config_parameters_tags.label:
|
config_parameters_tags.label:
|
||||||
"""Tags"""
|
"""Tags"""
|
||||||
|
|
Loading…
Reference in New Issue