From 0f1aaa65bc35dc928a75f5d3b2b332a15a272400 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 12 Jan 2024 17:52:31 +0800 Subject: [PATCH 1/2] fix(iotdb): move the `iot_version` into IoTDB connector --- .../src/emqx_bridge_iotdb.erl | 11 +--- .../src/emqx_bridge_iotdb_connector.erl | 52 +++++++++++++------ .../test/emqx_bridge_iotdb_impl_SUITE.erl | 10 ++-- 3 files changed, 41 insertions(+), 32 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 781eae4b6..562678a17 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -89,14 +89,6 @@ fields(action_parameters) -> desc => ?DESC("config_device_id") } )}, - {iotdb_version, - mk( - hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), - #{ - desc => ?DESC("config_iotdb_version"), - default => ?VSN_1_1_X - } - )}, {data, mk( array(ref(?MODULE, action_parameters_data)), @@ -310,8 +302,7 @@ action_values() -> } ], is_aligned => false, - device_id => <<"my_device">>, - iotdb_version => ?VSN_1_1_X + device_id => <<"my_device">> } }. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 4286a59e4..75161bdc2 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -47,6 +47,7 @@ connect_timeout := pos_integer(), pool_type := random | hash, pool_size := pos_integer(), + iotdb_version := atom(), request => undefined | map(), atom() => _ }. @@ -57,6 +58,7 @@ connect_timeout := pos_integer(), pool_type := random | hash, channels := map(), + iotdb_version := atom(), request => undefined | map(), atom() => _ }. @@ -88,6 +90,7 @@ connector_example_values() -> name => <<"iotdb_connector">>, type => iotdb, enable => true, + iotdb_version => ?VSN_1_1_X, authentication => #{ <<"username">> => <<"root">>, <<"password">> => <<"*****">> @@ -121,6 +124,14 @@ fields("connection_fields") -> desc => ?DESC(emqx_bridge_iotdb, "config_base_url") } )}, + {iotdb_version, + mk( + hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), + #{ + desc => ?DESC(emqx_bridge_iotdb, "iotdb_version"), + default => ?VSN_1_1_X + } + )}, {authentication, mk( hoconsc:union([ref(?MODULE, auth_basic)]), @@ -190,7 +201,7 @@ proplists_without(Keys, List) -> callback_mode() -> async_if_possible. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). -on_start(InstanceId, Config) -> +on_start(InstanceId, #{iotdb_version := Version} = Config) -> %% [FIXME] The configuration passed in here is pre-processed and transformed %% in emqx_bridge_resource:parse_confs/2. case emqx_bridge_http_connector:on_start(InstanceId, Config) of @@ -201,7 +212,7 @@ on_start(InstanceId, Config) -> request => maps:get(request, State, <<>>) }), ?tp(iotdb_bridge_started, #{instance_id => InstanceId}), - {ok, State#{channels => #{}}}; + {ok, State#{iotdb_version => Version, channels => #{}}}; {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", @@ -231,7 +242,11 @@ on_get_status(InstanceId, State) -> {ok, pos_integer(), [term()], term()} | {ok, pos_integer(), [term()]} | {error, term()}. -on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) -> +on_query( + InstanceId, + {ChannelId, _Message} = Req, + #{iotdb_version := IoTDBVsn, channels := Channels} = State +) -> ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_called", @@ -240,7 +255,7 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of + case try_render_message(Req, IoTDBVsn, Channels) of {ok, IoTDBPayload} -> handle_response( emqx_bridge_http_connector:on_query( @@ -254,7 +269,10 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> {ok, pid()} | {error, empty_request}. on_query_async( - InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State + InstanceId, + {ChannelId, _Message} = Req, + ReplyFunAndArgs0, + #{iotdb_version := IoTDBVsn, channels := Channels} = State ) -> ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}), ?SLOG(debug, #{ @@ -263,7 +281,7 @@ on_query_async( send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of + case try_render_message(Req, IoTDBVsn, Channels) of {ok, IoTDBPayload} -> ReplyFunAndArgs = { @@ -282,10 +300,10 @@ on_query_async( on_add_channel( InstanceId, - #{channels := Channels} = OldState0, + #{iotdb_version := Version, channels := Channels} = OldState0, ChannelId, #{ - parameters := #{iotdb_version := Version, data := Data} = Parameter + parameters := #{data := Data} = Parameter } ) -> case maps:is_key(ChannelId, Channels) of @@ -495,18 +513,18 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. -make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) -> +make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) -> InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, - Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), + Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn), {ok, maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, - iotdb_field_key(device_id, IotDBVsn) => DeviceId + iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned, + iotdb_field_key(device_id, IoTDBVsn) => DeviceId })}. -replace_dtypes(Rows0, IotDBVsn) -> +replace_dtypes(Rows0, IoTDBVsn) -> {Types, Rows} = maps:take(dtypes, Rows0), - Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}. + Rows#{iotdb_field_key(data_types, IoTDBVsn) => Types}. aggregate_rows(DataList, InitAcc) -> lists:foldr( @@ -645,15 +663,15 @@ preproc_data_template(DataList) -> DataList ). -try_render_message({ChannelId, Msg}, Channels) -> +try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) -> case maps:find(ChannelId, Channels) of {ok, Channel} -> - render_channel_message(Channel, Msg); + render_channel_message(Channel, IoTDBVsn, Msg); _ -> {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} end. -render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) -> +render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) -> Payloads = to_list(parse_payload(get_payload(Message))), case device_id(Message, Payloads, Channel) of undefined -> diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 6b9af7b9a..d2d5760e5 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -255,7 +255,6 @@ is_error_check(Reason) -> end. action_config(Name, Config) -> - Version = ?config(iotdb_version, Config), Type = ?config(bridge_type, Config), ConfigString = io_lib:format( @@ -263,15 +262,13 @@ action_config(Name, Config) -> " enable = true\n" " connector = \"~s\"\n" " parameters = {\n" - " iotdb_version = \"~s\"\n" " data = []\n" " }\n" "}\n", [ Type, Name, - Name, - Version + Name ] ), ct:pal("ActionConfig:~ts~n", [ConfigString]), @@ -281,12 +278,14 @@ connector_config(Name, Config) -> Host = ?config(bridge_host, Config), Port = ?config(bridge_port, Config), Type = ?config(bridge_type, Config), + Version = ?config(iotdb_version, Config), ServerURL = iotdb_server_url(Host, Port), ConfigString = io_lib:format( "connectors.~s.~s {\n" " enable = true\n" " base_url = \"~s\"\n" + " iotdb_version = \"~s\"\n" " authentication = {\n" " username = \"root\"\n" " password = \"root\"\n" @@ -295,7 +294,8 @@ connector_config(Name, Config) -> [ Type, Name, - ServerURL + ServerURL, + Version ] ), ct:pal("ConnectorConfig:~ts~n", [ConfigString]), From b15106c753de3bb42f12266efda5396c4aaed8c5 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 12 Jan 2024 18:49:55 +0800 Subject: [PATCH 2/2] fix(iotdb): robustify type verification 1. let the type is not case sensitive 2. return error if type is invalid --- .../src/emqx_bridge_iotdb_connector.erl | 101 ++++++++++-------- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 23 ++++ 2 files changed, 80 insertions(+), 44 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 75161bdc2..2f078864e 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -128,7 +128,7 @@ fields("connection_fields") -> mk( hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), #{ - desc => ?DESC(emqx_bridge_iotdb, "iotdb_version"), + desc => ?DESC(emqx_bridge_iotdb, "config_iotdb_version"), default => ?VSN_1_1_X } )}, @@ -422,25 +422,41 @@ proc_data(PreProcessedData, Msg) -> now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond), now_ns => NowNS }, - lists:map( - fun( - #{ - timestamp := TimestampTkn, - measurement := Measurement, - data_type := DataType0, - value := ValueTkn - } - ) -> - DataType = emqx_placeholder:proc_tmpl(DataType0, Msg), - #{ - timestamp => iot_timestamp(TimestampTkn, Msg, Nows), - measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), - data_type => DataType, - value => proc_value(DataType, ValueTkn, Msg) - } - end, - PreProcessedData - ). + proc_data(PreProcessedData, Msg, Nows, []). + +proc_data( + [ + #{ + timestamp := TimestampTkn, + measurement := Measurement, + data_type := DataType0, + value := ValueTkn + } + | T + ], + Msg, + Nows, + Acc +) -> + DataType = list_to_binary( + string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg))) + ), + case proc_value(DataType, ValueTkn, Msg) of + {ok, Value} -> + proc_data(T, Msg, Nows, [ + #{ + timestamp => iot_timestamp(TimestampTkn, Msg, Nows), + measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), + data_type => DataType, + value => Value + } + | Acc + ]); + Error -> + Error + end; +proc_data([], _Msg, _Nows, Acc) -> + {ok, lists:reverse(Acc)}. iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> Timestamp; @@ -459,16 +475,19 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> binary_to_integer(Timestamp). proc_value(<<"TEXT">>, ValueTkn, Msg) -> - case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of - <<"undefined">> -> null; - Val -> Val - end; + {ok, + case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of + <<"undefined">> -> null; + Val -> Val + end}; proc_value(<<"BOOLEAN">>, ValueTkn, Msg) -> - convert_bool(replace_var(ValueTkn, Msg)); + {ok, convert_bool(replace_var(ValueTkn, Msg))}; proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> -> - convert_int(replace_var(ValueTkn, Msg)); + {ok, convert_int(replace_var(ValueTkn, Msg))}; proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> -> - convert_float(replace_var(ValueTkn, Msg)). + {ok, convert_float(replace_var(ValueTkn, Msg))}; +proc_value(Type, _, _) -> + {error, {invalid_type, Type}}. replace_var(Tokens, Data) when is_list(Tokens) -> [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), @@ -630,9 +649,9 @@ eval_response_body(Body, Resp) -> preproc_data_template(DataList) -> Atom2Bin = fun - (Atom, Converter) when is_atom(Atom) -> - Converter(Atom); - (Bin, _) -> + (Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom); + (Bin) -> Bin end, lists:map( @@ -645,18 +664,9 @@ preproc_data_template(DataList) -> } ) -> #{ - timestamp => emqx_placeholder:preproc_tmpl( - Atom2Bin(Timestamp, fun erlang:atom_to_binary/1) - ), + timestamp => emqx_placeholder:preproc_tmpl(Atom2Bin(Timestamp)), measurement => emqx_placeholder:preproc_tmpl(Measurement), - data_type => emqx_placeholder:preproc_tmpl( - Atom2Bin( - DataType, - fun(Atom) -> - erlang:list_to_binary(string:uppercase(erlang:atom_to_list(Atom))) - end - ) - ), + data_type => emqx_placeholder:preproc_tmpl(Atom2Bin(DataType)), value => emqx_placeholder:preproc_tmpl(Value) } end, @@ -681,9 +691,12 @@ render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) [] -> {error, invalid_data}; DataTemplate -> - DataList = proc_data(DataTemplate, Message), - - make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) + case proc_data(DataTemplate, Message) of + {ok, DataList} -> + make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn); + Error -> + Error + end end end. diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index d2d5760e5..8145faf33 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -646,6 +646,29 @@ t_template(Config) -> iotdb_reset(Config, TemplateDeviceId), ok. +t_sync_query_case(Config) -> + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", "InT32", "36"), + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query + ), + Query = <<"select temp from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), + ?assertMatch( + #{<<"values">> := [[36]]}, + emqx_utils_json:decode(IoTDBResult) + ). + +t_sync_query_invalid_type(Config) -> + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"), + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), + IsInvalidType = fun(Result) -> ?assertMatch({error, {invalid_type, _}}, Result) end, + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query + ). + is_empty(null) -> true; is_empty([]) -> true; is_empty([[]]) -> true;