feat: support batch_size on iotdb

This commit is contained in:
zhongwencool 2024-04-19 14:52:56 +08:00
parent c0521fd250
commit b163a87386
4 changed files with 167 additions and 67 deletions

View File

@ -66,12 +66,7 @@ fields(action_config) ->
] ]
); );
fields(action_resource_opts) -> fields(action_resource_opts) ->
lists:filter( emqx_bridge_v2_schema:action_resource_opts_fields();
fun({K, _V}) ->
not lists:member(K, unsupported_opts())
end,
emqx_bridge_v2_schema:action_resource_opts_fields()
);
fields(action_parameters) -> fields(action_parameters) ->
[ [
{is_aligned, {is_aligned,
@ -150,7 +145,7 @@ fields("get_bridge_v2") ->
fields("config") -> fields("config") ->
basic_config() ++ request_config(); basic_config() ++ request_config();
fields("creation_opts") -> fields("creation_opts") ->
proplists_without(unsupported_opts(), emqx_resource_schema:fields("creation_opts")); emqx_resource_schema:fields("creation_opts");
fields(auth_basic) -> fields(auth_basic) ->
[ [
{username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
@ -268,12 +263,6 @@ resource_creation_opts() ->
)} )}
]. ].
unsupported_opts() ->
[
batch_size,
batch_time
].
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% v2 examples %% v2 examples
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------

View File

@ -21,6 +21,8 @@
on_get_status/2, on_get_status/2,
on_query/3, on_query/3,
on_query_async/4, on_query_async/4,
on_batch_query/3,
on_batch_query_async/4,
on_add_channel/4, on_add_channel/4,
on_remove_channel/3, on_remove_channel/3,
on_get_channels/1, on_get_channels/1,
@ -280,8 +282,8 @@ on_query(
state => emqx_utils:redact(State) state => emqx_utils:redact(State)
}), }),
case try_render_message(Req, IoTDBVsn, Channels) of case try_render_messages([Req], IoTDBVsn, Channels) of
{ok, IoTDBPayload} -> {ok, [IoTDBPayload]} ->
handle_response( handle_response(
emqx_bridge_http_connector:on_query( emqx_bridge_http_connector:on_query(
InstanceId, {ChannelId, IoTDBPayload}, State InstanceId, {ChannelId, IoTDBPayload}, State
@ -306,8 +308,8 @@ on_query_async(
send_message => Req, send_message => Req,
state => emqx_utils:redact(State) state => emqx_utils:redact(State)
}), }),
case try_render_message(Req, IoTDBVsn, Channels) of case try_render_messages([Req], IoTDBVsn, Channels) of
{ok, IoTDBPayload} -> {ok, [IoTDBPayload]} ->
ReplyFunAndArgs = ReplyFunAndArgs =
{ {
fun(Result) -> fun(Result) ->
@ -323,6 +325,71 @@ on_query_async(
Error Error
end. end.
on_batch_query_async(
InstId,
Requests,
Callback,
#{iotdb_version := IoTDBVsn, channels := Channels} = State
) ->
?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}),
[{ChannelId, _Message} | _] = Requests,
?SLOG(debug, #{
msg => "iotdb_bridge_on_query_batch_async_called",
instance_id => InstId,
send_message => Requests,
state => emqx_utils:redact(State)
}),
case try_render_messages(Requests, IoTDBVsn, Channels) of
{ok, IoTDBPayloads} ->
ReplyFunAndArgs =
{
fun(Result) ->
Response = handle_response(Result),
emqx_resource:apply_reply_fun(Callback, Response)
end,
[]
},
lists:map(
fun(IoTDBPayload) ->
emqx_bridge_http_connector:on_query_async(
InstId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State
)
end,
IoTDBPayloads
);
Error ->
Error
end.
on_batch_query(
InstId,
[{ChannelId, _Message}] = Requests,
#{iotdb_version := IoTDBVsn, channels := Channels} = State
) ->
?tp(iotdb_bridge_on_batch_query, #{instance_id => InstId}),
?SLOG(debug, #{
msg => "iotdb_bridge_on_batch_query_called",
instance_id => InstId,
send_message => Requests,
state => emqx_utils:redact(State)
}),
case try_render_messages(Requests, IoTDBVsn, Channels) of
{ok, IoTDBPayloads} ->
lists:map(
fun(IoTDBPayload) ->
handle_response(
emqx_bridge_http_connector:on_query(
InstId, {ChannelId, IoTDBPayload}, State
)
)
end,
IoTDBPayloads
);
Error ->
Error
end.
on_add_channel( on_add_channel(
InstanceId, InstanceId,
#{iotdb_version := Version, channels := Channels} = OldState0, #{iotdb_version := Version, channels := Channels} = OldState0,
@ -576,11 +643,10 @@ convert_float(undefined) ->
make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) -> make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) ->
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn), Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn),
{ok,
maps:merge(Rows, #{ maps:merge(Rows, #{
iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned, iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned,
iotdb_field_key(device_id, IoTDBVsn) => DeviceId iotdb_field_key(device_id, IoTDBVsn) => DeviceId
})}. }).
replace_dtypes(Rows0, IoTDBVsn) -> replace_dtypes(Rows0, IoTDBVsn) ->
{Types, Rows} = maps:take(dtypes, Rows0), {Types, Rows} = maps:take(dtypes, Rows0),
@ -720,14 +786,37 @@ preproc_data_template(DataList) ->
DataList DataList
). ).
try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) -> try_render_messages([{ChannelId, _} | _] = Msgs, IoTDBVsn, Channels) ->
case maps:find(ChannelId, Channels) of case maps:find(ChannelId, Channels) of
{ok, Channel} -> {ok, Channel} ->
render_channel_message(Channel, IoTDBVsn, Msg); case do_render_message(Msgs, Channel, IoTDBVsn, #{}) of
RenderMsgs when is_map(RenderMsgs) ->
{ok,
lists:map(
fun({{DeviceId, IsAligned}, DataList}) ->
make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn)
end,
maps:to_list(RenderMsgs)
)};
Error ->
Error
end;
_ -> _ ->
{error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
end. end.
do_render_message([], _Channel, _IoTDBVsn, Acc) ->
Acc;
do_render_message([{_, Msg} | Msgs], Channel, IoTDBVsn, Acc) ->
case render_channel_message(Channel, IoTDBVsn, Msg) of
{ok, NewDataList, DeviceId, IsAligned} ->
Fun = fun(V) -> NewDataList ++ V end,
Acc1 = maps:update_with({DeviceId, IsAligned}, Fun, NewDataList, Acc),
do_render_message(Msgs, Channel, IoTDBVsn, Acc1);
Error ->
Error
end.
render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) -> render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) ->
Payloads = to_list(parse_payload(get_payload(Message))), Payloads = to_list(parse_payload(get_payload(Message))),
case device_id(Message, Payloads, Channel) of case device_id(Message, Payloads, Channel) of
@ -740,7 +829,7 @@ render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message)
DataTemplate -> DataTemplate ->
case proc_data(DataTemplate, Message, IoTDBVsn) of case proc_data(DataTemplate, Message, IoTDBVsn) of
{ok, DataList} -> {ok, DataList} ->
make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn); {ok, DataList, DeviceId, IsAligned};
Error -> Error ->
Error Error
end end

View File

@ -373,9 +373,9 @@ t_sync_query_aggregated(Config) ->
Payload = [ Payload = [
make_iotdb_payload(DeviceId, "temp", "INT32", "36", MS - 7000), make_iotdb_payload(DeviceId, "temp", "INT32", "36", MS - 7000),
make_iotdb_payload(DeviceId, "temp", "INT32", 37, MS - 6000), make_iotdb_payload(DeviceId, "temp", "INT32", 37, MS - 6000),
make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, MS - 5000), make_iotdb_payload(DeviceId, "temp", "INT64", 38.7, MS - 5000),
make_iotdb_payload(DeviceId, "temp", "INT32", "39", integer_to_binary(MS - 4000)), make_iotdb_payload(DeviceId, "temp", "INT64", "39", integer_to_binary(MS - 4000)),
make_iotdb_payload(DeviceId, "temp", "INT32", "34", MS - 3000), make_iotdb_payload(DeviceId, "temp", "INT64", "34", MS - 3000),
make_iotdb_payload(DeviceId, "temp", "INT32", 33.7, MS - 2000), make_iotdb_payload(DeviceId, "temp", "INT32", 33.7, MS - 2000),
make_iotdb_payload(DeviceId, "temp", "INT32", 32, MS - 1000), make_iotdb_payload(DeviceId, "temp", "INT32", 32, MS - 1000),
%% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB
@ -417,13 +417,12 @@ t_sync_query_aggregated(Config) ->
#{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]}, #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
emqx_utils_json:decode(ResultWeight) emqx_utils_json:decode(ResultWeight)
), ),
%% [FIXME] https://github.com/apache/iotdb/issues/12375
%% null don't seem to be supported by IoTDB insertTablet when 1.3.0
case ?config(iotdb_version, Config) of case ?config(iotdb_version, Config) of
?VSN_1_3_X -> ?VSN_1_3_X ->
ct:pal("waiting:~p ~p ~n, [DeviceId]", [DeviceId, MS + 1000]), skip;
timer:sleep(3600 * 1000 * 1000);
_ -> _ ->
ok
end,
%% check rest ts = MS + 1000 %% check rest ts = MS + 1000
CheckTime = integer_to_binary(MS + 1000), CheckTime = integer_to_binary(MS + 1000),
QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>, QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>,
@ -458,7 +457,8 @@ t_sync_query_aggregated(Config) ->
?assertMatch( ?assertMatch(
#{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]}, #{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]},
emqx_utils_json:decode(ResultTemp) emqx_utils_json:decode(ResultTemp)
), )
end,
ok. ok.
exp(Dev, M0) -> exp(Dev, M0) ->

View File

@ -1024,7 +1024,29 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) -> handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
{ack, fun() -> ok end, #{}}; {ack, fun() -> ok end, #{}};
handle_query_async_result_pure(_Id, ok, _HasBeenSent) -> handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
{ack, fun() -> ok end, #{}}. {ack, fun() -> ok end, #{}};
handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) ->
All = fun(L) ->
case L of
{ok, Pid} -> is_pid(Pid);
_ -> false
end
end,
case lists:all(All, Results) of
true ->
{ack, fun() -> ok end, #{}};
false ->
PostFn = fun() ->
?SLOG(error, #{
id => Id,
msg => "async_batch_send_error",
reason => Results,
has_been_sent => HasBeenSent
}),
ok
end,
{nack, PostFn, #{}}
end.
-spec aggregate_counters(data(), counters()) -> data(). -spec aggregate_counters(data(), counters()) -> data().
aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) -> aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->