From d1b574a67e8814ec3dcade7199eb3e4ce647dd69 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 4 Jul 2024 01:47:30 +0200 Subject: [PATCH] perf(dslocal): Run heavy-duty operations in a temporary process --- .../src/emqx_ds_builtin_local.erl | 76 ++++++++++++++----- 1 file changed, 55 insertions(+), 21 deletions(-) diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl index 5fe6eb559..28e307832 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl @@ -46,6 +46,12 @@ shard_of_message/4 ]). +%% Internal exports: +-export([ + do_next/3, + do_delete_next/4 +]). + -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]). -include_lib("emqx_utils/include/emqx_message.hrl"). @@ -295,19 +301,8 @@ update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0 end. -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) -> - ShardId = {DB, Shard}, - T0 = erlang:monotonic_time(microsecond), - Result = emqx_ds_storage_layer:next(ShardId, StorageIter0, N, current_timestamp(ShardId)), - T1 = erlang:monotonic_time(microsecond), - emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0), - case Result of - {ok, StorageIter, Batch} -> - Iter = Iter0#{?enc := StorageIter}, - {ok, Iter, Batch}; - Other -> - Other - end. +next(DB, Iter, N) -> + with_worker(do_next, [DB, Iter, N]). -spec get_delete_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [emqx_ds:ds_specific_delete_stream()]. @@ -347,7 +342,36 @@ make_delete_iterator(DB, ?delete_stream(Shard, InnerStream), TopicFilter, StartT -spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> emqx_ds:delete_next_result(emqx_ds:delete_iterator()). -delete_next(DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N) -> +delete_next(DB, Iter, Selector, N) -> + with_worker(do_delete_next, [DB, Iter, Selector, N]). + +%%================================================================================ +%% Internal exports +%%================================================================================ + +current_timestamp(ShardId) -> + emqx_ds_builtin_local_meta:current_timestamp(ShardId). + +-spec do_next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). +do_next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) -> + ShardId = {DB, Shard}, + T0 = erlang:monotonic_time(microsecond), + Result = emqx_ds_storage_layer:next(ShardId, StorageIter0, N, current_timestamp(ShardId)), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0), + case Result of + {ok, StorageIter, Batch} -> + Iter = Iter0#{?enc := StorageIter}, + {ok, Iter, Batch}; + Other -> + Other + end. + +-spec do_delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> + emqx_ds:delete_next_result(emqx_ds:delete_iterator()). +do_delete_next( + DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N +) -> ShardId = {DB, Shard}, case emqx_ds_storage_layer:delete_next( @@ -362,13 +386,6 @@ delete_next(DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIte Error end. -%%================================================================================ -%% Internal exports -%%================================================================================ - -current_timestamp(ShardId) -> - emqx_ds_builtin_local_meta:current_timestamp(ShardId). - %%================================================================================ %% Internal functions %%================================================================================ @@ -380,3 +397,20 @@ timeus_to_timestamp(undefined) -> undefined; timeus_to_timestamp(TimestampUs) -> TimestampUs div 1000. + +with_worker(F, A) -> + Parent = self(), + Ref = make_ref(), + {_Pid, MRef} = spawn_opt( + fun() -> + Parent ! {Ref, apply(?MODULE, F, A)} + end, + [monitor, {min_heap_size, 10000}] + ), + receive + {Ref, Result} -> + erlang:demonitor(MRef, [flush]), + Result; + {'DOWN', MRef, _, _, _, Info} -> + {error, unrecoverable, Info} + end.