From b163a873861c16f2f8f7a147d43d89f20290a310 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 19 Apr 2024 14:52:56 +0800 Subject: [PATCH] feat: support batch_size on iotdb --- .../src/emqx_bridge_iotdb.erl | 15 +-- .../src/emqx_bridge_iotdb_connector.erl | 113 ++++++++++++++++-- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 82 ++++++------- .../src/emqx_resource_buffer_worker.erl | 24 +++- 4 files changed, 167 insertions(+), 67 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index cbad8ca63..a2471fa3d 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -66,12 +66,7 @@ fields(action_config) -> ] ); fields(action_resource_opts) -> - lists:filter( - fun({K, _V}) -> - not lists:member(K, unsupported_opts()) - end, - emqx_bridge_v2_schema:action_resource_opts_fields() - ); + emqx_bridge_v2_schema:action_resource_opts_fields(); fields(action_parameters) -> [ {is_aligned, @@ -150,7 +145,7 @@ fields("get_bridge_v2") -> fields("config") -> basic_config() ++ request_config(); fields("creation_opts") -> - proplists_without(unsupported_opts(), emqx_resource_schema:fields("creation_opts")); + emqx_resource_schema:fields("creation_opts"); fields(auth_basic) -> [ {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, @@ -268,12 +263,6 @@ resource_creation_opts() -> )} ]. -unsupported_opts() -> - [ - batch_size, - batch_time - ]. - %%------------------------------------------------------------------------------------------------- %% v2 examples %%------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 4bde75967..d26b47f73 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -21,6 +21,8 @@ on_get_status/2, on_query/3, on_query_async/4, + on_batch_query/3, + on_batch_query_async/4, on_add_channel/4, on_remove_channel/3, on_get_channels/1, @@ -280,8 +282,8 @@ on_query( state => emqx_utils:redact(State) }), - case try_render_message(Req, IoTDBVsn, Channels) of - {ok, IoTDBPayload} -> + case try_render_messages([Req], IoTDBVsn, Channels) of + {ok, [IoTDBPayload]} -> handle_response( emqx_bridge_http_connector:on_query( InstanceId, {ChannelId, IoTDBPayload}, State @@ -306,8 +308,8 @@ on_query_async( send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, IoTDBVsn, Channels) of - {ok, IoTDBPayload} -> + case try_render_messages([Req], IoTDBVsn, Channels) of + {ok, [IoTDBPayload]} -> ReplyFunAndArgs = { fun(Result) -> @@ -323,6 +325,71 @@ on_query_async( Error end. +on_batch_query_async( + InstId, + Requests, + Callback, + #{iotdb_version := IoTDBVsn, channels := Channels} = State +) -> + ?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}), + [{ChannelId, _Message} | _] = Requests, + ?SLOG(debug, #{ + msg => "iotdb_bridge_on_query_batch_async_called", + instance_id => InstId, + send_message => Requests, + state => emqx_utils:redact(State) + }), + case try_render_messages(Requests, IoTDBVsn, Channels) of + {ok, IoTDBPayloads} -> + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(Callback, Response) + end, + [] + }, + lists:map( + fun(IoTDBPayload) -> + emqx_bridge_http_connector:on_query_async( + InstId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State + ) + end, + IoTDBPayloads + ); + Error -> + Error + end. + +on_batch_query( + InstId, + [{ChannelId, _Message}] = Requests, + #{iotdb_version := IoTDBVsn, channels := Channels} = State +) -> + ?tp(iotdb_bridge_on_batch_query, #{instance_id => InstId}), + ?SLOG(debug, #{ + msg => "iotdb_bridge_on_batch_query_called", + instance_id => InstId, + send_message => Requests, + state => emqx_utils:redact(State) + }), + + case try_render_messages(Requests, IoTDBVsn, Channels) of + {ok, IoTDBPayloads} -> + lists:map( + fun(IoTDBPayload) -> + handle_response( + emqx_bridge_http_connector:on_query( + InstId, {ChannelId, IoTDBPayload}, State + ) + ) + end, + IoTDBPayloads + ); + Error -> + Error + end. + on_add_channel( InstanceId, #{iotdb_version := Version, channels := Channels} = OldState0, @@ -576,11 +643,10 @@ convert_float(undefined) -> make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) -> InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn), - {ok, - maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned, - iotdb_field_key(device_id, IoTDBVsn) => DeviceId - })}. + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned, + iotdb_field_key(device_id, IoTDBVsn) => DeviceId + }). replace_dtypes(Rows0, IoTDBVsn) -> {Types, Rows} = maps:take(dtypes, Rows0), @@ -720,14 +786,37 @@ preproc_data_template(DataList) -> DataList ). -try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) -> +try_render_messages([{ChannelId, _} | _] = Msgs, IoTDBVsn, Channels) -> case maps:find(ChannelId, Channels) of {ok, Channel} -> - render_channel_message(Channel, IoTDBVsn, Msg); + case do_render_message(Msgs, Channel, IoTDBVsn, #{}) of + RenderMsgs when is_map(RenderMsgs) -> + {ok, + lists:map( + fun({{DeviceId, IsAligned}, DataList}) -> + make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) + end, + maps:to_list(RenderMsgs) + )}; + Error -> + Error + end; _ -> {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} end. +do_render_message([], _Channel, _IoTDBVsn, Acc) -> + Acc; +do_render_message([{_, Msg} | Msgs], Channel, IoTDBVsn, Acc) -> + case render_channel_message(Channel, IoTDBVsn, Msg) of + {ok, NewDataList, DeviceId, IsAligned} -> + Fun = fun(V) -> NewDataList ++ V end, + Acc1 = maps:update_with({DeviceId, IsAligned}, Fun, NewDataList, Acc), + do_render_message(Msgs, Channel, IoTDBVsn, Acc1); + Error -> + Error + end. + render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) -> Payloads = to_list(parse_payload(get_payload(Message))), case device_id(Message, Payloads, Channel) of @@ -740,7 +829,7 @@ render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) DataTemplate -> case proc_data(DataTemplate, Message, IoTDBVsn) of {ok, DataList} -> - make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn); + {ok, DataList, DeviceId, IsAligned}; Error -> Error end diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index ce8cd01e8..d5661e2fe 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -373,9 +373,9 @@ t_sync_query_aggregated(Config) -> Payload = [ make_iotdb_payload(DeviceId, "temp", "INT32", "36", MS - 7000), make_iotdb_payload(DeviceId, "temp", "INT32", 37, MS - 6000), - make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, MS - 5000), - make_iotdb_payload(DeviceId, "temp", "INT32", "39", integer_to_binary(MS - 4000)), - make_iotdb_payload(DeviceId, "temp", "INT32", "34", MS - 3000), + make_iotdb_payload(DeviceId, "temp", "INT64", 38.7, MS - 5000), + make_iotdb_payload(DeviceId, "temp", "INT64", "39", integer_to_binary(MS - 4000)), + make_iotdb_payload(DeviceId, "temp", "INT64", "34", MS - 3000), make_iotdb_payload(DeviceId, "temp", "INT32", 33.7, MS - 2000), make_iotdb_payload(DeviceId, "temp", "INT32", 32, MS - 1000), %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB @@ -417,48 +417,48 @@ t_sync_query_aggregated(Config) -> #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]}, emqx_utils_json:decode(ResultWeight) ), + %% [FIXME] https://github.com/apache/iotdb/issues/12375 + %% null don't seem to be supported by IoTDB insertTablet when 1.3.0 case ?config(iotdb_version, Config) of ?VSN_1_3_X -> - ct:pal("waiting:~p ~p ~n, [DeviceId]", [DeviceId, MS + 1000]), - timer:sleep(3600 * 1000 * 1000); + skip; _ -> - ok - end, - %% check rest ts = MS + 1000 - CheckTime = integer_to_binary(MS + 1000), - QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>, - {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest), - #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode( - ResultRest - ), - Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)), - Exp = #{ - exp(DeviceId, "charged") => true, - exp(DeviceId, "floated") => true, - exp(DeviceId, "started") => true, - exp(DeviceId, "stoked") => true, - exp(DeviceId, "enriched") => true, - exp(DeviceId, "gutted") => true, - exp(DeviceId, "drained") => false, - exp(DeviceId, "toasted") => false, - exp(DeviceId, "uncharted") => false, - exp(DeviceId, "dazzled") => false, - exp(DeviceId, "unplugged") => false, - exp(DeviceId, "unraveled") => false, - exp(DeviceId, "undecided") => null, - exp(DeviceId, "foo") => <<"bar">>, - exp(DeviceId, "temp") => null, - exp(DeviceId, "weight") => null - }, - ?assertEqual(Exp, Results), + %% check rest ts = MS + 1000 + CheckTime = integer_to_binary(MS + 1000), + QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>, + {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest), + #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode( + ResultRest + ), + Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)), + Exp = #{ + exp(DeviceId, "charged") => true, + exp(DeviceId, "floated") => true, + exp(DeviceId, "started") => true, + exp(DeviceId, "stoked") => true, + exp(DeviceId, "enriched") => true, + exp(DeviceId, "gutted") => true, + exp(DeviceId, "drained") => false, + exp(DeviceId, "toasted") => false, + exp(DeviceId, "uncharted") => false, + exp(DeviceId, "dazzled") => false, + exp(DeviceId, "unplugged") => false, + exp(DeviceId, "unraveled") => false, + exp(DeviceId, "undecided") => null, + exp(DeviceId, "foo") => <<"bar">>, + exp(DeviceId, "temp") => null, + exp(DeviceId, "weight") => null + }, + ?assertEqual(Exp, Results), - %% check temp - QueryTemp = <<"select temp from ", DeviceId/binary, " where time > ", Time/binary>>, - {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp), - ?assertMatch( - #{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]}, - emqx_utils_json:decode(ResultTemp) - ), + %% check temp + QueryTemp = <<"select temp from ", DeviceId/binary, " where time > ", Time/binary>>, + {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp), + ?assertMatch( + #{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]}, + emqx_utils_json:decode(ResultTemp) + ) + end, ok. exp(Dev, M0) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index bc1aea734..d1fc3081a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1024,7 +1024,29 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) -> {ack, fun() -> ok end, #{}}; handle_query_async_result_pure(_Id, ok, _HasBeenSent) -> - {ack, fun() -> ok end, #{}}. + {ack, fun() -> ok end, #{}}; +handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) -> + All = fun(L) -> + case L of + {ok, Pid} -> is_pid(Pid); + _ -> false + end + end, + case lists:all(All, Results) of + true -> + {ack, fun() -> ok end, #{}}; + false -> + PostFn = fun() -> + ?SLOG(error, #{ + id => Id, + msg => "async_batch_send_error", + reason => Results, + has_been_sent => HasBeenSent + }), + ok + end, + {nack, PostFn, #{}} + end. -spec aggregate_counters(data(), counters()) -> data(). aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->