diff --git a/.ci/docker-compose-file/docker-compose-iotdb.yaml b/.ci/docker-compose-file/docker-compose-iotdb.yaml index 2a2b0e603..f5448a1ef 100644 --- a/.ci/docker-compose-file/docker-compose-iotdb.yaml +++ b/.ci/docker-compose-file/docker-compose-iotdb.yaml @@ -1,24 +1,53 @@ version: '3.9' services: - iotdb: - container_name: iotdb - hostname: iotdb - image: apache/iotdb:1.1.0-standalone + iotdb_1_3_0: + container_name: iotdb130 + hostname: iotdb130 + image: apache/iotdb:1.3.0-standalone restart: always environment: - enable_rest_service=true - - cn_internal_address=iotdb + - cn_internal_address=iotdb130 - cn_internal_port=10710 - cn_consensus_port=10720 - - cn_target_config_node_list=iotdb:10710 - - dn_rpc_address=iotdb - - dn_internal_address=iotdb + - cn_seed_config_node=iotdb130:10710 + - dn_rpc_address=iotdb130 + - dn_internal_address=iotdb130 - dn_rpc_port=6667 - dn_mpp_data_exchange_port=10740 - dn_schema_region_consensus_port=10750 - dn_data_region_consensus_port=10760 - - dn_target_config_node_list=iotdb:10710 + - dn_seed_config_node=iotdb130:10710 + # volumes: + # - ./data:/iotdb/data + # - ./logs:/iotdb/logs + expose: + - "18080" + # IoTDB's REST interface, uncomment for local testing + # ports: + # - "18080:18080" + networks: + - emqx_bridge + + iotdb_1_1_0: + container_name: iotdb110 + hostname: iotdb110 + image: apache/iotdb:1.1.0-standalone + restart: always + environment: + - enable_rest_service=true + - cn_internal_address=iotdb110 + - cn_internal_port=10710 + - cn_consensus_port=10720 + - cn_target_config_node_list=iotdb110:10710 + - dn_rpc_address=iotdb110 + - dn_internal_address=iotdb110 + - dn_rpc_port=6667 + - dn_mpp_data_exchange_port=10740 + - dn_schema_region_consensus_port=10750 + - dn_data_region_consensus_port=10760 + - dn_target_config_node_list=iotdb110:10710 # volumes: # - ./data:/iotdb/data # - ./logs:/iotdb/logs diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 103bae924..a3c1dfbf4 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -139,9 +139,15 @@ "enabled": true }, { - "name": "iotdb", + "name": "iotdb110", "listen": "0.0.0.0:18080", - "upstream": "iotdb:18080", + "upstream": "iotdb110:18080", + "enabled": true + }, + { + "name": "iotdb130", + "listen": "0.0.0.0:28080", + "upstream": "iotdb130:18080", "enabled": true }, { diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 8a781d6e7..a3316e39d 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -705,7 +705,7 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ), receive {result, Result} -> IsSuccessCheck(Result) - after 5_000 -> + after 8_000 -> throw(timeout) end, ok. diff --git a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl index 8ce7bce6d..6cf0c5508 100644 --- a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl +++ b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl @@ -5,6 +5,8 @@ -ifndef(EMQX_BRIDGE_IOTDB_HRL). -define(EMQX_BRIDGE_IOTDB_HRL, true). +-define(VSN_1_3_X, 'v1.3.x'). +-define(VSN_1_2_X, 'v1.2.x'). -define(VSN_1_1_X, 'v1.1.x'). -define(VSN_1_0_X, 'v1.0.x'). -define(VSN_0_13_X, 'v0.13.x'). diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 599be842a..19c1b6320 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -66,12 +66,7 @@ fields(action_config) -> ] ); fields(action_resource_opts) -> - lists:filter( - fun({K, _V}) -> - not lists:member(K, unsupported_opts()) - end, - emqx_bridge_v2_schema:action_resource_opts_fields() - ); + emqx_bridge_v2_schema:action_resource_opts_fields(); fields(action_parameters) -> [ {is_aligned, @@ -152,7 +147,7 @@ fields("get_bridge_v2") -> fields("config") -> basic_config() ++ request_config(); fields("creation_opts") -> - proplists_without(unsupported_opts(), emqx_resource_schema:fields("creation_opts")); + emqx_resource_schema:fields("creation_opts"); fields(auth_basic) -> [ {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, @@ -222,10 +217,10 @@ basic_config() -> )}, {iotdb_version, mk( - hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), + hoconsc:enum([?VSN_1_3_X, ?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), #{ desc => ?DESC("config_iotdb_version"), - default => ?VSN_1_1_X + default => ?VSN_1_3_X } )} ] ++ resource_creation_opts() ++ @@ -270,12 +265,6 @@ resource_creation_opts() -> )} ]. -unsupported_opts() -> - [ - batch_size, - batch_time - ]. - %%------------------------------------------------------------------------------------------------- %% v2 examples %%------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 92316f0cf..d26b47f73 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -21,6 +21,8 @@ on_get_status/2, on_query/3, on_query_async/4, + on_batch_query/3, + on_batch_query_async/4, on_add_channel/4, on_remove_channel/3, on_get_channels/1, @@ -94,7 +96,7 @@ connector_example_values() -> name => <<"iotdb_connector">>, type => iotdb, enable => true, - iotdb_version => ?VSN_1_1_X, + iotdb_version => ?VSN_1_3_X, authentication => #{ <<"username">> => <<"root">>, <<"password">> => <<"******">> @@ -133,10 +135,10 @@ fields("connection_fields") -> )}, {iotdb_version, mk( - hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), + hoconsc:enum([?VSN_1_3_X, ?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), #{ desc => ?DESC(emqx_bridge_iotdb, "config_iotdb_version"), - default => ?VSN_1_1_X + default => ?VSN_1_3_X } )}, {authentication, @@ -280,8 +282,8 @@ on_query( state => emqx_utils:redact(State) }), - case try_render_message(Req, IoTDBVsn, Channels) of - {ok, IoTDBPayload} -> + case try_render_messages([Req], IoTDBVsn, Channels) of + {ok, [IoTDBPayload]} -> handle_response( emqx_bridge_http_connector:on_query( InstanceId, {ChannelId, IoTDBPayload}, State @@ -306,8 +308,8 @@ on_query_async( send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, IoTDBVsn, Channels) of - {ok, IoTDBPayload} -> + case try_render_messages([Req], IoTDBVsn, Channels) of + {ok, [IoTDBPayload]} -> ReplyFunAndArgs = { fun(Result) -> @@ -323,6 +325,71 @@ on_query_async( Error end. +on_batch_query_async( + InstId, + Requests, + Callback, + #{iotdb_version := IoTDBVsn, channels := Channels} = State +) -> + ?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}), + [{ChannelId, _Message} | _] = Requests, + ?SLOG(debug, #{ + msg => "iotdb_bridge_on_query_batch_async_called", + instance_id => InstId, + send_message => Requests, + state => emqx_utils:redact(State) + }), + case try_render_messages(Requests, IoTDBVsn, Channels) of + {ok, IoTDBPayloads} -> + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(Callback, Response) + end, + [] + }, + lists:map( + fun(IoTDBPayload) -> + emqx_bridge_http_connector:on_query_async( + InstId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State + ) + end, + IoTDBPayloads + ); + Error -> + Error + end. + +on_batch_query( + InstId, + [{ChannelId, _Message}] = Requests, + #{iotdb_version := IoTDBVsn, channels := Channels} = State +) -> + ?tp(iotdb_bridge_on_batch_query, #{instance_id => InstId}), + ?SLOG(debug, #{ + msg => "iotdb_bridge_on_batch_query_called", + instance_id => InstId, + send_message => Requests, + state => emqx_utils:redact(State) + }), + + case try_render_messages(Requests, IoTDBVsn, Channels) of + {ok, IoTDBPayloads} -> + lists:map( + fun(IoTDBPayload) -> + handle_response( + emqx_bridge_http_connector:on_query( + InstId, {ChannelId, IoTDBPayload}, State + ) + ) + end, + IoTDBPayloads + ); + Error -> + Error + end. + on_add_channel( InstanceId, #{iotdb_version := Version, channels := Channels} = OldState0, @@ -342,6 +409,7 @@ on_add_channel( Path = case Version of ?VSN_1_1_X -> InsertTabletPathV2; + ?VSN_1_3_X -> InsertTabletPathV2; _ -> InsertTabletPathV1 end, @@ -442,14 +510,14 @@ maybe_preproc_tmpl(Value) when is_binary(Value) -> maybe_preproc_tmpl(Value) -> Value. -proc_data(PreProcessedData, Msg) -> +proc_data(PreProcessedData, Msg, IoTDBVsn) -> NowNS = erlang:system_time(nanosecond), Nows = #{ now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond), now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond), now_ns => NowNS }, - proc_data(PreProcessedData, Msg, Nows, []). + proc_data(PreProcessedData, Msg, Nows, IoTDBVsn, []). proc_data( [ @@ -463,15 +531,16 @@ proc_data( ], Msg, Nows, + IotDbVsn, Acc ) -> DataType = list_to_binary( string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg))) ), try - proc_data(T, Msg, Nows, [ + proc_data(T, Msg, Nows, IotDbVsn, [ #{ - timestamp => iot_timestamp(TimestampTkn, Msg, Nows), + timestamp => iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows), measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), data_type => DataType, value => proc_value(DataType, ValueTkn, Msg) @@ -485,23 +554,28 @@ proc_data( ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}), {error, invalid_data} end; -proc_data([], _Msg, _Nows, Acc) -> +proc_data([], _Msg, _Nows, _IotDbVsn, Acc) -> {ok, lists:reverse(Acc)}. -iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> +iot_timestamp(_IotDbVsn, Timestamp, _, _) when is_integer(Timestamp) -> Timestamp; -iot_timestamp(TimestampTkn, Msg, Nows) -> - iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows). +iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows) -> + iot_timestamp(IotDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows). -iot_timestamp(<<"now_us">>, #{now_us := NowUs}) -> +%% > v1.3.0 don't allow write nanoseconds nor microseconds +iot_timestamp(?VSN_1_3_X, <<"now_us">>, #{now_ms := NowMs}) -> + NowMs; +iot_timestamp(?VSN_1_3_X, <<"now_ns">>, #{now_ms := NowMs}) -> + NowMs; +iot_timestamp(_IotDbVsn, <<"now_us">>, #{now_us := NowUs}) -> NowUs; -iot_timestamp(<<"now_ns">>, #{now_ns := NowNs}) -> +iot_timestamp(_IotDbVsn, <<"now_ns">>, #{now_ns := NowNs}) -> NowNs; -iot_timestamp(Timestamp, #{now_ms := NowMs}) when +iot_timestamp(_IotDbVsn, Timestamp, #{now_ms := NowMs}) when Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> -> NowMs; -iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> +iot_timestamp(_IotDbVsn, Timestamp, _) when is_binary(Timestamp) -> binary_to_integer(Timestamp). proc_value(<<"TEXT">>, ValueTkn, Msg) -> @@ -569,11 +643,10 @@ convert_float(undefined) -> make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) -> InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn), - {ok, - maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned, - iotdb_field_key(device_id, IoTDBVsn) => DeviceId - })}. + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned, + iotdb_field_key(device_id, IoTDBVsn) => DeviceId + }). replace_dtypes(Rows0, IoTDBVsn) -> {Types, Rows} = maps:take(dtypes, Rows0), @@ -633,18 +706,24 @@ insert_value(1, Data, [Value | Values]) -> insert_value(Index, Data, [Value | Values]) -> [[null | Value] | insert_value(Index - 1, Data, Values)]. +iotdb_field_key(is_aligned, ?VSN_1_3_X) -> + <<"is_aligned">>; iotdb_field_key(is_aligned, ?VSN_1_1_X) -> <<"is_aligned">>; iotdb_field_key(is_aligned, ?VSN_1_0_X) -> <<"is_aligned">>; iotdb_field_key(is_aligned, ?VSN_0_13_X) -> <<"isAligned">>; +iotdb_field_key(device_id, ?VSN_1_3_X) -> + <<"device">>; iotdb_field_key(device_id, ?VSN_1_1_X) -> <<"device">>; iotdb_field_key(device_id, ?VSN_1_0_X) -> <<"device">>; iotdb_field_key(device_id, ?VSN_0_13_X) -> <<"deviceId">>; +iotdb_field_key(data_types, ?VSN_1_3_X) -> + <<"data_types">>; iotdb_field_key(data_types, ?VSN_1_1_X) -> <<"data_types">>; iotdb_field_key(data_types, ?VSN_1_0_X) -> @@ -707,14 +786,37 @@ preproc_data_template(DataList) -> DataList ). -try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) -> +try_render_messages([{ChannelId, _} | _] = Msgs, IoTDBVsn, Channels) -> case maps:find(ChannelId, Channels) of {ok, Channel} -> - render_channel_message(Channel, IoTDBVsn, Msg); + case do_render_message(Msgs, Channel, IoTDBVsn, #{}) of + RenderMsgs when is_map(RenderMsgs) -> + {ok, + lists:map( + fun({{DeviceId, IsAligned}, DataList}) -> + make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) + end, + maps:to_list(RenderMsgs) + )}; + Error -> + Error + end; _ -> {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} end. +do_render_message([], _Channel, _IoTDBVsn, Acc) -> + Acc; +do_render_message([{_, Msg} | Msgs], Channel, IoTDBVsn, Acc) -> + case render_channel_message(Channel, IoTDBVsn, Msg) of + {ok, NewDataList, DeviceId, IsAligned} -> + Fun = fun(V) -> NewDataList ++ V end, + Acc1 = maps:update_with({DeviceId, IsAligned}, Fun, NewDataList, Acc), + do_render_message(Msgs, Channel, IoTDBVsn, Acc1); + Error -> + Error + end. + render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) -> Payloads = to_list(parse_payload(get_payload(Message))), case device_id(Message, Payloads, Channel) of @@ -725,9 +827,9 @@ render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) [] -> {error, invalid_template}; DataTemplate -> - case proc_data(DataTemplate, Message) of + case proc_data(DataTemplate, Message, IoTDBVsn) of {ok, DataList} -> - make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn); + {ok, DataList, DeviceId, IsAligned}; Error -> Error end diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 693f16d05..d5661e2fe 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -20,14 +20,16 @@ all() -> [ - {group, plain}, + {group, iotdb110}, + {group, iotdb130}, {group, legacy} ]. groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), [ - {plain, AllTCs}, + {iotdb110, AllTCs}, + {iotdb130, AllTCs}, {legacy, AllTCs} ]. @@ -37,10 +39,15 @@ init_per_suite(Config) -> end_per_suite(Config) -> emqx_bridge_v2_testlib:end_per_suite(Config). -init_per_group(plain = Type, Config0) -> +init_per_group(Type, Config0) when Type =:= iotdb110 orelse Type =:= iotdb130 -> Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"), - Port = list_to_integer(os:getenv("IOTDB_PLAIN_PORT", "18080")), - ProxyName = "iotdb", + ProxyName = atom_to_list(Type), + {IotDbVersion, DefaultPort} = + case Type of + iotdb110 -> {?VSN_1_1_X, "18080"}; + iotdb130 -> {?VSN_1_3_X, "28080"} + end, + Port = list_to_integer(os:getenv("IOTDB_PLAIN_PORT", DefaultPort)), case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), @@ -48,7 +55,7 @@ init_per_group(plain = Type, Config0) -> {bridge_host, Host}, {bridge_port, Port}, {proxy_name, ProxyName}, - {iotdb_version, ?VSN_1_1_X}, + {iotdb_version, IotDbVersion}, {iotdb_rest_prefix, <<"/rest/v2/">>} | Config ]; @@ -87,7 +94,8 @@ init_per_group(_Group, Config) -> Config. end_per_group(Group, Config) when - Group =:= plain; + Group =:= iotdb110; + Group =:= iotdb130; Group =:= legacy -> emqx_bridge_v2_testlib:end_per_group(Config), @@ -245,7 +253,9 @@ iotdb_query(Config, Query) -> iotdb_request(Config, Path, Body, Opts). is_success_check({ok, 200, _, Body}) -> - ?assert(is_code(200, emqx_utils_json:decode(Body))). + ?assert(is_code(200, emqx_utils_json:decode(Body))); +is_success_check(Other) -> + throw(Other). is_code(Code, #{<<"code">> := Code}) -> true; is_code(_, _) -> false. @@ -359,89 +369,96 @@ t_async_query(Config) -> t_sync_query_aggregated(Config) -> DeviceId = iotdb_device(Config), + MS = erlang:system_time(millisecond) - 5000, Payload = [ - make_iotdb_payload(DeviceId, "temp", "INT32", "36", 1685112026290), - make_iotdb_payload(DeviceId, "temp", "INT32", 37, 1685112026291), - make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, 1685112026292), - make_iotdb_payload(DeviceId, "temp", "INT32", "39", <<"1685112026293">>), - make_iotdb_payload(DeviceId, "temp", "INT64", "36", 1685112026294), - make_iotdb_payload(DeviceId, "temp", "INT64", 36, 1685112026295), - make_iotdb_payload(DeviceId, "temp", "INT64", 36.7, 1685112026296), - %% implicit 'now()' timestamp - make_iotdb_payload(DeviceId, "temp", "INT32", "40"), + make_iotdb_payload(DeviceId, "temp", "INT32", "36", MS - 7000), + make_iotdb_payload(DeviceId, "temp", "INT32", 37, MS - 6000), + make_iotdb_payload(DeviceId, "temp", "INT64", 38.7, MS - 5000), + make_iotdb_payload(DeviceId, "temp", "INT64", "39", integer_to_binary(MS - 4000)), + make_iotdb_payload(DeviceId, "temp", "INT64", "34", MS - 3000), + make_iotdb_payload(DeviceId, "temp", "INT32", 33.7, MS - 2000), + make_iotdb_payload(DeviceId, "temp", "INT32", 32, MS - 1000), %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>}, - (make_iotdb_payload(DeviceId, "temp", "INT32", "42"))#{timestamp => <<"now_ns">>}, - make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", 1685112026290), - make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, 1685112026291), - make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, 1685112026292), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", 1685112026293), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, 1685112026294), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, 1685112026295), + make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", MS - 6000), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, MS - 5000), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, MS - 4000), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", MS - 3000), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, MS - 2000), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, MS - 1000), - make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", 1685112026300), - make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, 1685112026300), - make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, 1685112026300), - make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", 1685112026300), - make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", 1685112026300), - make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", 1685112026300), - make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", 1685112026300), - make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, 1685112026300), - make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, 1685112026300), - make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", 1685112026300), - make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", 1685112026300), - make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", 1685112026300), - make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, 1685112026300), + make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", MS + 1000), + make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, MS + 1000), + make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, MS + 1000), + make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", MS + 1000), + make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", MS + 1000), + make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", MS + 1000), + make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", MS + 1000), + make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, MS + 1000), + make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, MS + 1000), + make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", MS + 1000), + make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", MS + 1000), + make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", MS + 1000), + make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, MS + 1000), - make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300) + make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", MS + 1000) ], MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), ok = emqx_bridge_v2_testlib:t_sync_query( Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), - %% check temp - QueryTemp = <<"select temp from ", DeviceId/binary>>, - {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp), - ?assertMatch( - #{<<"values">> := [[36, 37, 38, 39, 36, 36, 36, 40, 41, 42]]}, - emqx_utils_json:decode(ResultTemp) - ), + Time = integer_to_binary(MS - 20000), %% check weight - QueryWeight = <<"select weight from ", DeviceId/binary>>, + QueryWeight = <<"select weight from ", DeviceId/binary, " where time > ", Time/binary>>, {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight), ?assertMatch( #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]}, emqx_utils_json:decode(ResultWeight) ), - %% check rest ts = 1685112026300 - QueryRest = <<"select * from ", DeviceId/binary, " where time = 1685112026300">>, - {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest), - #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode( - ResultRest - ), - Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)), - Exp = #{ - exp(DeviceId, "charged") => true, - exp(DeviceId, "floated") => true, - exp(DeviceId, "started") => true, - exp(DeviceId, "stoked") => true, - exp(DeviceId, "enriched") => true, - exp(DeviceId, "gutted") => true, - exp(DeviceId, "drained") => false, - exp(DeviceId, "toasted") => false, - exp(DeviceId, "uncharted") => false, - exp(DeviceId, "dazzled") => false, - exp(DeviceId, "unplugged") => false, - exp(DeviceId, "unraveled") => false, - exp(DeviceId, "undecided") => null, - exp(DeviceId, "foo") => <<"bar">>, - exp(DeviceId, "temp") => null, - exp(DeviceId, "weight") => null - }, - ?assertEqual(Exp, Results), + %% [FIXME] https://github.com/apache/iotdb/issues/12375 + %% null don't seem to be supported by IoTDB insertTablet when 1.3.0 + case ?config(iotdb_version, Config) of + ?VSN_1_3_X -> + skip; + _ -> + %% check rest ts = MS + 1000 + CheckTime = integer_to_binary(MS + 1000), + QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>, + {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest), + #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode( + ResultRest + ), + Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)), + Exp = #{ + exp(DeviceId, "charged") => true, + exp(DeviceId, "floated") => true, + exp(DeviceId, "started") => true, + exp(DeviceId, "stoked") => true, + exp(DeviceId, "enriched") => true, + exp(DeviceId, "gutted") => true, + exp(DeviceId, "drained") => false, + exp(DeviceId, "toasted") => false, + exp(DeviceId, "uncharted") => false, + exp(DeviceId, "dazzled") => false, + exp(DeviceId, "unplugged") => false, + exp(DeviceId, "unraveled") => false, + exp(DeviceId, "undecided") => null, + exp(DeviceId, "foo") => <<"bar">>, + exp(DeviceId, "temp") => null, + exp(DeviceId, "weight") => null + }, + ?assertEqual(Exp, Results), + %% check temp + QueryTemp = <<"select temp from ", DeviceId/binary, " where time > ", Time/binary>>, + {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp), + ?assertMatch( + #{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]}, + emqx_utils_json:decode(ResultTemp) + ) + end, ok. exp(Dev, M0) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index bc1aea734..d1fc3081a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1024,7 +1024,29 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) -> {ack, fun() -> ok end, #{}}; handle_query_async_result_pure(_Id, ok, _HasBeenSent) -> - {ack, fun() -> ok end, #{}}. + {ack, fun() -> ok end, #{}}; +handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) -> + All = fun(L) -> + case L of + {ok, Pid} -> is_pid(Pid); + _ -> false + end + end, + case lists:all(All, Results) of + true -> + {ack, fun() -> ok end, #{}}; + false -> + PostFn = fun() -> + ?SLOG(error, #{ + id => Id, + msg => "async_batch_send_error", + reason => Results, + has_been_sent => HasBeenSent + }), + ok + end, + {nack, PostFn, #{}} + end. -spec aggregate_counters(data(), counters()) -> data(). aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->