diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl index 7e490576f..119de1978 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -3,6 +3,7 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_opents). +-include_lib("emqx/include/logger.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -156,12 +157,25 @@ fields(action_parameters_data) -> binary(), #{ required => true, - desc => ?DESC("config_parameters_tags") + desc => ?DESC("config_parameters_tags"), + validator => fun(Tmpl) -> + case emqx_placeholder:preproc_tmpl(Tmpl) of + [{var, _}] -> + true; + _ -> + ?SLOG(warning, #{ + msg => "invalid_tags_template", + path => "opents.parameters.data.tags", + data => Tmpl + }), + false + end + end } )}, {value, mk( - binary(), + hoconsc:union([integer(), float(), binary()]), #{ required => true, desc => ?DESC("config_parameters_value") diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index e3fe9d6b4..d71468d82 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -304,9 +304,14 @@ render_channel_message(Msg, #{data := DataList}, Acc) -> ValueVal = case ValueTk of [_] -> + %% just one element, maybe is a variable or a plain text + %% we should keep it as it is erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts)); - _ -> - emqx_placeholder:proc_tmpl(ValueTk, Msg) + Tks when is_list(Tks) -> + emqx_placeholder:proc_tmpl(ValueTk, Msg); + Raw -> + %% not a token list, just a raw value + Raw end, Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal}, [ @@ -328,12 +333,20 @@ preproc_data_template([]) -> preproc_data_template(DataList) -> lists:map( fun(Data) -> - maps:map( - fun(_Key, Value) -> - emqx_placeholder:preproc_tmpl(Value) + {Value, Data2} = maps:take(value, Data), + Template = maps:map( + fun(_Key, Val) -> + emqx_placeholder:preproc_tmpl(Val) end, - Data - ) + Data2 + ), + + case Value of + Text when is_binary(Text) -> + Template#{value => emqx_placeholder:preproc_tmpl(Text)}; + Raw -> + Template#{value => Raw} + end end, DataList ). diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index f86ae6986..e3e89d563 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -83,7 +83,7 @@ init_per_testcase(TestCase, Config0) -> {bridge_config, ActionConfig} | Config0 ], - %% iotdb_reset(Config), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), ok = snabbkaffe:start_trace(), Config. @@ -253,3 +253,89 @@ t_query_invalid_data(Config) -> ok = emqx_bridge_v2_testlib:t_sync_query( Config, MakeMessageFun, fun is_error_check/1, opents_bridge_on_query ). + +t_tags_validator(Config) -> + %% Create without data configured + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), + + ?assertMatch( + {ok, _}, + emqx_bridge_v2_testlib:update_bridge_api(Config, #{ + <<"parameters">> => #{ + <<"data">> => [ + #{ + <<"metric">> => <<"${metric}">>, + <<"tags">> => <<"${tags}">>, + <<"value">> => <<"${payload.value}">> + } + ] + } + }) + ), + + ?assertMatch( + {error, _}, + emqx_bridge_v2_testlib:update_bridge_api(Config, #{ + <<"parameters">> => #{ + <<"data">> => [ + #{ + <<"metric">> => <<"${metric}">>, + <<"tags">> => <<"text">>, + <<"value">> => <<"${payload.value}">> + } + ] + } + }) + ). + +t_raw_int_value(Config) -> + raw_value_test(<<"t_raw_int_value">>, 42, Config). + +t_raw_float_value(Config) -> + raw_value_test(<<"t_raw_float_value">>, 42.5, Config). + +raw_value_test(Metric, RawValue, Config) -> + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + ?assertMatch( + {ok, _}, + emqx_bridge_v2_testlib:update_bridge_api(Config, #{ + <<"parameters">> => #{ + <<"data">> => [ + #{ + <<"metric">> => <<"${metric}">>, + <<"tags">> => <<"${tags}">>, + <<"value">> => RawValue + } + ] + } + }) + ), + + Value = 12, + MakeMessageFun = fun() -> make_data(Metric, Value) end, + + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()}) + ), + + {ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric), + QResult = emqx_utils_json:decode(IoTDBResult), + ?assertMatch( + [ + #{ + <<"metric">> := Metric, + <<"dps">> := _ + } + ], + QResult + ), + [#{<<"dps">> := Dps}] = QResult, + ?assertMatch([RawValue | _], maps:values(Dps)). diff --git a/rel/i18n/emqx_bridge_opents.hocon b/rel/i18n/emqx_bridge_opents.hocon index f5d2ade85..ab2e82180 100644 --- a/rel/i18n/emqx_bridge_opents.hocon +++ b/rel/i18n/emqx_bridge_opents.hocon @@ -49,7 +49,7 @@ config_parameters_metric.label: """Metric""" config_parameters_tags.desc: -"""Data Type, Placeholders in format of ${var} is supported""" +"""Tags. Only supports with placeholder to extract tags from a variable""" config_parameters_tags.label: """Tags"""