diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index ed3a93212..4229112d3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -30,9 +30,12 @@ drop_db/1, store_batch/3, get_streams/3, + get_delete_streams/3, make_iterator/4, + make_delete_iterator/4, update_iterator/3, next/3, + delete_next/4, node_of_shard/2, shard_of_message/3, maybe_set_myself_as_leader/2 @@ -50,11 +53,22 @@ do_next_v1/4, do_add_generation_v2/1, do_list_generations_with_lifetimes_v3/2, - do_drop_generation_v3/3 + do_drop_generation_v3/3, + do_get_delete_streams_v4/4, + do_make_delete_iterator_v4/5, + do_delete_next_v4/5 ]). -export_type([ - shard_id/0, builtin_db_opts/0, stream_v1/0, stream/0, iterator/0, message_id/0, batch/0 + shard_id/0, + builtin_db_opts/0, + stream_v1/0, + stream/0, + delete_stream/0, + iterator/0, + delete_iterator/0, + message_id/0, + batch/0 ]). -include_lib("emqx_utils/include/emqx_message.hrl"). @@ -86,9 +100,12 @@ }. -define(stream_v2(SHARD, INNER), [2, SHARD | INNER]). +-define(delete_stream(SHARD, INNER), [3, SHARD | INNER]). -opaque stream() :: nonempty_maybe_improper_list(). +-opaque delete_stream() :: nonempty_maybe_improper_list(). + -opaque iterator() :: #{ ?tag := ?IT, @@ -96,6 +113,13 @@ ?enc := emqx_ds_storage_layer:iterator() }. +-opaque delete_iterator() :: + #{ + ?tag := ?DELETE_IT, + ?shard := emqx_ds_replication_layer:shard_id(), + ?enc := emqx_ds_storage_layer:delete_iterator() + }. + -type message_id() :: emqx_ds:message_id(). -type batch() :: #{ @@ -193,6 +217,24 @@ get_streams(DB, TopicFilter, StartTime) -> Shards ). +-spec get_delete_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> + [delete_stream()]. +get_delete_streams(DB, TopicFilter, StartTime) -> + Shards = list_shards(DB), + lists:flatmap( + fun(Shard) -> + Node = node_of_shard(DB, Shard), + Streams = emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, StartTime), + lists:map( + fun(StorageLayerStream) -> + ?delete_stream(Shard, StorageLayerStream) + end, + Streams + ) + end, + Shards + ). + -spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). make_iterator(DB, Stream, TopicFilter, StartTime) -> @@ -205,6 +247,22 @@ make_iterator(DB, Stream, TopicFilter, StartTime) -> Err end. +-spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> + emqx_ds:make_delete_iterator_result(delete_iterator()). +make_delete_iterator(DB, Stream, TopicFilter, StartTime) -> + ?delete_stream(Shard, StorageStream) = Stream, + Node = node_of_shard(DB, Shard), + case + emqx_ds_proto_v4:make_delete_iterator( + Node, DB, Shard, StorageStream, TopicFilter, StartTime + ) + of + {ok, Iter} -> + {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}}; + Err = {error, _} -> + Err + end. + -spec update_iterator( emqx_ds:db(), iterator(), @@ -249,6 +307,19 @@ next(DB, Iter0, BatchSize) -> Other end. +-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> + emqx_ds:delete_next_result(delete_iterator()). 
+delete_next(DB, Iter0, Selector, BatchSize) -> + #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, + Node = node_of_shard(DB, Shard), + case emqx_ds_proto_v4:delete_next(Node, DB, Shard, StorageIter0, Selector, BatchSize) of + {ok, StorageIter, NumDeleted} -> + Iter = Iter0#{?enc := StorageIter}, + {ok, Iter, NumDeleted}; + Other -> + Other + end. + -spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). node_of_shard(DB, Shard) -> case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of @@ -352,6 +423,17 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). +-spec do_make_delete_iterator_v4( + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:delete_stream(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + {ok, emqx_ds_storage_layer:delete_iterator()} | {error, _}. +do_make_delete_iterator_v4(DB, Shard, Stream, TopicFilter, StartTime) -> + emqx_ds_storage_layer:make_delete_iterator({DB, Shard}, Stream, TopicFilter, StartTime). + -spec do_update_iterator_v2( emqx_ds:db(), emqx_ds_replication_layer:shard_id(), @@ -374,6 +456,17 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> do_next_v1(DB, Shard, Iter, BatchSize) -> emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize). +-spec do_delete_next_v4( + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:delete_iterator(), + emqx_ds:delete_selector(), + pos_integer() +) -> + emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). +do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> + emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize). + -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}. do_add_generation_v2(DB) -> MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB), @@ -394,6 +487,13 @@ do_list_generations_with_lifetimes_v3(DB, ShardId) -> do_drop_generation_v3(DB, ShardId, GenId) -> emqx_ds_storage_layer:drop_generation({DB, ShardId}, GenId). +-spec do_get_delete_streams_v4( + emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() +) -> + [emqx_ds_storage_layer:delete_stream()]. +do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) -> + emqx_ds_storage_layer:get_delete_streams({DB, Shard}, TopicFilter, StartTime). + %%================================================================================ %% Internal functions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 42e72f258..b3a57d442 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -23,6 +23,7 @@ -define(STREAM, 1). -define(IT, 2). -define(BATCH, 3). +-define(DELETE_IT, 4). %% keys: -define(tag, 1). 
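
For orientation before the storage-level changes below, here is a minimal caller-side sketch of how the new delete API is meant to be driven end to end. The emqx_ds front-end functions are the ones exercised by the test suite later in this patch; the module name, the batch size, and the stop-on-empty-batch convention (mirroring the suite's delete/5 helper) are illustrative only, not part of the change:

-module(ds_delete_sketch).

-export([delete_matching/5]).

%% Walk every delete stream of DB and remove the messages accepted by Selector,
%% BatchSize at a time. Example:
%%   delete_matching(my_db, ['#'], 0, fun(Msg) -> byte_size(Msg#message.payload) =:= 0 end, 100).
delete_matching(DB, TopicFilter, StartTime, Selector, BatchSize) ->
    lists:foreach(
        fun(Stream) ->
            {ok, It} = emqx_ds:make_delete_iterator(DB, Stream, TopicFilter, StartTime),
            delete_loop(DB, It, Selector, BatchSize)
        end,
        emqx_ds:get_delete_streams(DB, TopicFilter, StartTime)
    ).

delete_loop(DB, It0, Selector, BatchSize) ->
    case emqx_ds:delete_next(DB, It0, Selector, BatchSize) of
        {ok, end_of_stream} ->
            %% A past generation has been fully traversed.
            ok;
        {ok, _It, 0} ->
            %% Nothing was deleted in this batch; stop here, following the
            %% convention of the delete/5 helper in emqx_ds_SUITE below.
            ok;
        {ok, It, _NumDeleted} ->
            delete_loop(DB, It, Selector, BatchSize);
        {error, _} = Error ->
            Error
    end.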
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d265d8fec..1f4b3f6ca 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -30,9 +30,12 @@ drop/5, store_batch/4, get_streams/4, + get_delete_streams/4, make_iterator/5, + make_delete_iterator/5, update_iterator/4, next/4, + delete_next/5, post_creation_actions/1 ]). @@ -54,6 +57,7 @@ %% tags: -define(STREAM, 1). -define(IT, 2). +-define(DELETE_IT, 3). %% keys: -define(tag, 1). @@ -91,6 +95,8 @@ -type stream() :: emqx_ds_lts:msg_storage_key(). +-type delete_stream() :: emqx_ds_lts:msg_storage_key(). + -type iterator() :: #{ ?tag := ?IT, @@ -100,6 +106,15 @@ ?last_seen_key := binary() }. +-type delete_iterator() :: + #{ + ?tag := ?DELETE_IT, + ?topic_filter := emqx_ds:topic_filter(), + ?start_time := emqx_ds:time(), + ?storage_key := emqx_ds_lts:msg_storage_key(), + ?last_seen_key := binary() + }. + -define(COUNTER, emqx_ds_storage_bitfield_lts_counter). %% Limit on the number of wildcard levels in the learned topic trie: @@ -262,6 +277,15 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> emqx_ds_lts:match_topics(Trie, TopicFilter). +-spec get_delete_streams( + emqx_ds_storage_layer:shard_id(), + s(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> [delete_stream()]. +get_delete_streams(Shard, State, TopicFilter, StartTime) -> + get_streams(Shard, State, TopicFilter, StartTime). + -spec make_iterator( emqx_ds_storage_layer:shard_id(), s(), @@ -283,6 +307,27 @@ make_iterator( ?last_seen_key => <<>> }}. +-spec make_delete_iterator( + emqx_ds_storage_layer:shard_id(), + s(), + delete_stream(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> {ok, delete_iterator()}. +make_delete_iterator( + _Shard, _Data, StorageKey, TopicFilter, StartTime +) -> + %% Note: it's a good idea to keep the iterator structure lean, + %% since it can be stored on a remote node that could update its + %% code independently from us. + {ok, #{ + ?tag => ?DELETE_IT, + ?topic_filter => TopicFilter, + ?start_time => StartTime, + ?storage_key => StorageKey, + ?last_seen_key => <<>> + }}. + -spec update_iterator( emqx_ds_storage_layer:shard_id(), s(), @@ -319,6 +364,76 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, ?start_time := StartTime, ?storage_key := {TopicIndex, Varying} } = It, + #{ + it_handle := ITHandle, + keymapper := Keymapper, + filter := Filter + } = prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers), + try + put(?COUNTER, 0), + next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize) + after + rocksdb:iterator_close(ITHandle), + erase(?COUNTER) + end. + +delete_next(_Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> + %% Compute safe cutoff time. + %% It's the point in time where the last complete epoch ends, so we need to know + %% the current time to compute it. + Now = emqx_message:timestamp_now(), + SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, + delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize). + +delete_next_until( + _Schema, + It = #{?tag := ?DELETE_IT, ?start_time := StartTime}, + SafeCutoffTime, + _Selector, + _BatchSize +) when + StartTime >= SafeCutoffTime +-> + %% We're in the middle of the current epoch, so we can't yet iterate over it. 
+ %% It would be unsafe otherwise: messages can be stored in the current epoch + %% concurrently with iterating over it. They can end up earlier (in the iteration + %% order) due to the nature of keymapping, potentially causing us to miss them. + {ok, It, 0, 0}; +delete_next_until( + #s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, Selector, BatchSize +) -> + #{ + ?tag := ?DELETE_IT, + ?start_time := StartTime, + ?storage_key := {TopicIndex, Varying} + } = It, + #{it_handle := ITHandle} = + LoopContext0 = prepare_loop_context( + DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers + ), + try + put(?COUNTER, 0), + LoopContext = LoopContext0#{ + db => DB, + cf => CF, + safe_cutoff_time => SafeCutoffTime, + storage_iter => It, + deleted => 0, + iterated_over => 0, + selector => Selector, + remaining => BatchSize + }, + delete_next_loop(LoopContext) + after + rocksdb:iterator_close(ITHandle), + erase(?COUNTER) + end. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers) -> %% Make filter: Inequations = [ {'=', TopicIndex}, @@ -350,17 +465,11 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, {iterate_lower_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, LowerBound)}, {iterate_upper_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, UpperBound + 1)} ]), - try - put(?COUNTER, 0), - next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize) - after - rocksdb:iterator_close(ITHandle), - erase(?COUNTER) - end. - -%%================================================================================ -%% Internal functions -%%================================================================================ + #{ + it_handle => ITHandle, + keymapper => Keymapper, + filter => Filter + }. next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; @@ -412,7 +521,108 @@ traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) -> {0, It, Acc} end. --spec check_message(emqx_ds:time(), iterator(), emqx_types:message()) -> +delete_next_loop( + #{deleted := AccDel, iterated_over := AccIter, storage_iter := It, remaining := 0} +) -> + {ok, It, AccDel, AccIter}; +delete_next_loop(LoopContext0) -> + #{ + storage_iter := It0, + filter := Filter, + deleted := AccDel0, + iterated_over := AccIter0, + it_handle := ITHandle + } = LoopContext0, + inc_counter(), + #{?tag := ?DELETE_IT, ?last_seen_key := Key0} = It0, + case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of + overflow -> + {ok, It0, AccDel0, AccIter0}; + Key1 -> + %% assert + true = Key1 > Key0, + case rocksdb:iterator_move(ITHandle, {seek, Key1}) of + {ok, Key, Val} -> + {N, It, AccDel, AccIter} = + delete_traverse_interval(LoopContext0#{ + iterated_over := AccIter0 + 1, + current_key => Key, + current_val => Val + }), + delete_next_loop(LoopContext0#{ + iterated_over := AccIter, + deleted := AccDel, + remaining := N, + storage_iter := It + }); + {error, invalid_iterator} -> + {ok, It0, AccDel0, AccIter0} + end + end. 
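%% Illustration (not part of the patch): the safe cutoff computed in
%% delete_next/5 above is simply `Now` truncated to the start of the current
%% epoch. Assuming a TSOffset of 10 bits:
%%
%%     (16#ABCDEF bsr 10) bsl 10 = 16#ABCC00
%%
%% so delete_next_until/5 only traverses keys with timestamps below 16#ABCC00.
%% Keys inside the current epoch are skipped because concurrent writers may
%% still insert messages there that would sort before keys the iterator has
%% already passed.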
+ +delete_traverse_interval(LoopContext0) -> + #{ + storage_iter := It0, + current_key := Key, + current_val := Val, + filter := Filter, + safe_cutoff_time := Cutoff, + selector := Selector, + db := DB, + cf := CF, + deleted := AccDel0, + iterated_over := AccIter0, + remaining := Remaining0 + } = LoopContext0, + It = It0#{?last_seen_key := Key}, + case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of + true -> + Msg = deserialize(Val), + case check_message(Cutoff, It, Msg) of + true -> + case Selector(Msg) of + true -> + ok = rocksdb:delete(DB, CF, Key, _WriteOpts = []), + delete_traverse_interval1(LoopContext0#{ + deleted := AccDel0 + 1, + remaining := Remaining0 - 1 + }); + false -> + delete_traverse_interval1(LoopContext0#{remaining := Remaining0 - 1}) + end; + false -> + delete_traverse_interval1(LoopContext0); + overflow -> + {0, It0, AccDel0, AccIter0} + end; + false -> + {Remaining0, It, AccDel0, AccIter0} + end. + +delete_traverse_interval1(#{ + storage_iter := It, deleted := AccDel, iterated_over := AccIter, remaining := 0 +}) -> + {0, It, AccDel, AccIter}; +delete_traverse_interval1(LoopContext0) -> + #{ + it_handle := ITHandle, + deleted := AccDel, + iterated_over := AccIter, + storage_iter := It + } = LoopContext0, + inc_counter(), + case rocksdb:iterator_move(ITHandle, next) of + {ok, Key, Val} -> + delete_traverse_interval(LoopContext0#{ + iterated_over := AccIter + 1, + current_key := Key, + current_val := Val + }); + {error, invalid_iterator} -> + {0, It, AccDel, AccIter} + end. + +-spec check_message(emqx_ds:time(), iterator() | delete_iterator(), emqx_types:message()) -> true | false | overflow. check_message( Cutoff, @@ -430,6 +640,12 @@ check_message( #message{timestamp = Timestamp, topic = Topic} ) when Timestamp >= StartTime -> emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); +check_message( + _Cutoff, + #{?tag := ?DELETE_IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, + #message{timestamp = Timestamp, topic = Topic} +) when Timestamp >= StartTime -> + emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); check_message(_Cutoff, _It, _Msg) -> false. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e0bf1fa1b..0f38629d4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -23,9 +23,12 @@ drop_shard/1, store_batch/3, get_streams/3, + get_delete_streams/3, make_iterator/4, + make_delete_iterator/4, update_iterator/3, next/3, + delete_next/4, update_config/2, add_generation/1, list_generations_with_lifetimes/1, @@ -43,8 +46,10 @@ generation/0, cf_refs/0, stream/0, + delete_stream/0, stream_v1/0, iterator/0, + delete_iterator/0, shard_id/0, options/0, prototype/0, @@ -56,6 +61,7 @@ -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). -define(stream_v2(GENERATION, INNER), [GENERATION | INNER]). +-define(delete_stream(GENERATION, INNER), [GENERATION | INNER]). %%================================================================================ %% Type declarations @@ -67,6 +73,7 @@ %% tags: -define(STREAM, 1). -define(IT, 2). +-define(DELETE_IT, 3). %% keys: -define(tag, 1). @@ -94,7 +101,10 @@ %% Note: this might be stored permanently on a remote node. -opaque stream() :: nonempty_maybe_improper_list(gen_id(), term()). -%% Note: this might be stred permanently on a remote node. +%% Note: this might be stored permanently on a remote node. 
+-opaque delete_stream() :: stream(). + +%% Note: this might be stored permanently on a remote node. -opaque iterator() :: #{ ?tag := ?IT, @@ -102,6 +112,14 @@ ?enc := term() }. +%% Note: this might be stored permanently on a remote node. +-opaque delete_iterator() :: + #{ + ?tag := ?DELETE_IT, + ?generation := gen_id(), + ?enc := term() + }. + %%%% Generation: -define(GEN_KEY(GEN_ID), {generation, GEN_ID}). @@ -185,6 +203,11 @@ -callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(_Iterator). +-callback make_delete_iterator( + shard_id(), _Data, _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time() +) -> + emqx_ds:make_delete_iterator_result(_Iterator). + -callback next(shard_id(), _Data, Iter, pos_integer()) -> {ok, Iter, [emqx_types:message()]} | {error, _}. @@ -238,6 +261,29 @@ get_streams(Shard, TopicFilter, StartTime) -> Gens ). +-spec get_delete_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> + [delete_stream()]. +get_delete_streams(Shard, TopicFilter, StartTime) -> + Gens = generations_since(Shard, StartTime), + ?tp(get_streams_all_gens, #{gens => Gens}), + lists:flatmap( + fun(GenId) -> + ?tp(get_streams_get_gen, #{gen_id => GenId}), + case generation_get_safe(Shard, GenId) of + {ok, #{module := Mod, data := GenData}} -> + Streams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime), + [ + ?delete_stream(GenId, InnerStream) + || InnerStream <- Streams + ]; + {error, not_found} -> + %% race condition: generation was dropped before getting its streams? + [] + end + end, + Gens + ). + -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). make_iterator( @@ -259,6 +305,27 @@ make_iterator( {error, end_of_stream} end. +-spec make_delete_iterator(shard_id(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> + emqx_ds:make_delete_iterator_result(delete_iterator()). +make_delete_iterator( + Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime +) -> + case generation_get_safe(Shard, GenId) of + {ok, #{module := Mod, data := GenData}} -> + case Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of + {ok, Iter} -> + {ok, #{ + ?tag => ?DELETE_IT, + ?generation => GenId, + ?enc => Iter + }}; + {error, _} = Err -> + Err + end; + {error, not_found} -> + {error, end_of_stream} + end. + -spec update_iterator( shard_id(), iterator(), emqx_ds:message_key() ) -> @@ -306,6 +373,33 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch {ok, end_of_stream} end. +-spec delete_next(shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> + emqx_ds:delete_next_result(delete_iterator()). +delete_next( + Shard, + Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0}, + Selector, + BatchSize +) -> + case generation_get_safe(Shard, GenId) of + {ok, #{module := Mod, data := GenData}} -> + Current = generation_current(Shard), + case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of + {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> + %% This is a past generation. Storage layer won't write + %% any more messages here. The iterator reached the end: + %% the stream has been fully replayed. 
+ {ok, end_of_stream}; + {ok, GenIter, NumDeleted, _IteratedOver} -> + {ok, Iter#{?enc := GenIter}, NumDeleted}; + Error = {error, _} -> + Error + end; + {error, not_found} -> + %% generation was possibly dropped by GC + {ok, end_of_stream} + end. + -spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok. update_config(ShardId, Options) -> gen_server:call(?REF(ShardId), {?FUNCTION_NAME, Options}, infinity). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 92918bb13..c9b5bad60 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -33,9 +33,12 @@ drop/5, store_batch/4, get_streams/4, + get_delete_streams/4, make_iterator/5, + make_delete_iterator/5, update_iterator/4, - next/4 + next/4, + delete_next/5 ]). %% internal exports: @@ -62,12 +65,20 @@ -record(stream, {}). +-record(delete_stream, {}). + -record(it, { topic_filter :: emqx_ds:topic_filter(), start_time :: emqx_ds:time(), last_seen_message_key = first :: binary() | first }). +-record(delete_it, { + topic_filter :: emqx_ds:topic_filter(), + start_time :: emqx_ds:time(), + last_seen_message_key = first :: binary() | first +}). + %%================================================================================ %% API funcions %%================================================================================ @@ -118,12 +129,21 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> get_streams(_Shard, _Data, _TopicFilter, _StartTime) -> [#stream{}]. +get_delete_streams(_Shard, _Data, _TopicFilter, _StartTime) -> + [#delete_stream{}]. + make_iterator(_Shard, _Data, #stream{}, TopicFilter, StartTime) -> {ok, #it{ topic_filter = TopicFilter, start_time = StartTime }}. +make_delete_iterator(_Shard, _Data, #delete_stream{}, TopicFilter, StartTime) -> + {ok, #delete_it{ + topic_filter = TopicFilter, + start_time = StartTime + }}. + update_iterator(_Shard, _Data, OldIter, DSKey) -> #it{ topic_filter = TopicFilter, @@ -151,6 +171,37 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> It = It0#it{last_seen_message_key = Key}, {ok, It, lists:reverse(Messages)}. +delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize) -> + #delete_it{ + topic_filter = TopicFilter, + start_time = StartTime, + last_seen_message_key = Key0 + } = It0, + {ok, ITHandle} = rocksdb:iterator(DB, CF, []), + Action = + case Key0 of + first -> + first; + _ -> + _ = rocksdb:iterator_move(ITHandle, Key0), + next + end, + {Key, {NumDeleted, NumIterated}} = do_delete_next( + TopicFilter, + StartTime, + DB, + CF, + ITHandle, + Action, + Selector, + BatchSize, + Key0, + {0, 0} + ), + rocksdb:iterator_close(ITHandle), + It = It0#delete_it{last_seen_message_key = Key}, + {ok, It, NumDeleted, NumIterated}. + %%================================================================================ %% Internal functions %%================================================================================ @@ -172,6 +223,65 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> {Key0, Acc} end. +%% TODO: use a context map... 
+do_delete_next(_, _, _, _, _, _, _, 0, Key, Acc) -> + {Key, Acc}; +do_delete_next( + TopicFilter, StartTime, DB, CF, IT, Action, Selector, NLeft, Key0, {AccDel, AccIter} +) -> + case rocksdb:iterator_move(IT, Action) of + {ok, Key, Blob} -> + Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), + TopicWords = emqx_topic:words(Topic), + case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of + true -> + case Selector(Msg) of + true -> + ok = rocksdb:delete(DB, CF, Key, _WriteOpts = []), + do_delete_next( + TopicFilter, + StartTime, + DB, + CF, + IT, + next, + Selector, + NLeft - 1, + Key, + {AccDel + 1, AccIter + 1} + ); + false -> + do_delete_next( + TopicFilter, + StartTime, + DB, + CF, + IT, + next, + Selector, + NLeft - 1, + Key, + {AccDel, AccIter + 1} + ) + end; + false -> + do_delete_next( + TopicFilter, + StartTime, + DB, + CF, + IT, + next, + Selector, + NLeft, + Key, + {AccDel, AccIter + 1} + ) + end; + {error, invalid_iterator} -> + {Key0, {AccDel, AccIter}} + end. + %% @doc Generate a column family ID for the MQTT messages -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. data_cf(GenId) -> diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl index fcab12507..12612dace 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl @@ -27,10 +27,13 @@ next/5, update_iterator/5, add_generation/2, - - %% introduced in v3 list_generations_with_lifetimes/3, - drop_generation/4 + drop_generation/4, + + %% introduced in v4 + get_delete_streams/5, + make_delete_iterator/6, + delete_next/6 ]). %% behavior callbacks: @@ -114,10 +117,6 @@ update_iterator(Node, DB, Shard, OldIter, DSKey) -> add_generation(Node, DB) -> erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]). -%%-------------------------------------------------------------------------------- -%% Introduced in V3 -%%-------------------------------------------------------------------------------- - -spec list_generations_with_lifetimes( node(), emqx_ds:db(), @@ -139,9 +138,60 @@ list_generations_with_lifetimes(Node, DB, Shard) -> drop_generation(Node, DB, Shard, GenId) -> erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]). +%%-------------------------------------------------------------------------------- +%% Introduced in V4 +%%-------------------------------------------------------------------------------- + +-spec get_delete_streams( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + [emqx_ds_storage_layer:delete_stream()]. +get_delete_streams(Node, DB, Shard, TopicFilter, Time) -> + erpc:call(Node, emqx_ds_replication_layer, do_get_delete_streams_v4, [ + DB, Shard, TopicFilter, Time + ]). + +-spec make_delete_iterator( + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:delete_stream(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + {ok, emqx_ds_storage_layer:delete_iterator()} | {error, _}. +make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_replication_layer, do_make_delete_iterator_v4, [ + DB, Shard, Stream, TopicFilter, StartTime + ]). 
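%% For orientation: a single delete batch issued against the API flows through
%% the layers added in this patch roughly as follows:
%%
%%   emqx_ds:delete_next/4
%%     -> emqx_ds_replication_layer:delete_next/4         (looks up the shard leader)
%%        -> emqx_ds_proto_v4:delete_next/6               (RPC to the leader node)
%%           -> emqx_ds_replication_layer:do_delete_next_v4/5
%%              -> emqx_ds_storage_layer:delete_next/4    (dispatches to the generation)
%%                 -> emqx_ds_storage_bitfield_lts:delete_next/5
%%                    (or emqx_ds_storage_reference:delete_next/5)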
+
+-spec delete_next(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:delete_iterator(),
+    emqx_ds:delete_selector(),
+    pos_integer()
+) ->
+    {ok, emqx_ds_storage_layer:delete_iterator(), non_neg_integer()}
+    | {ok, end_of_stream}
+    | {error, _}.
+delete_next(Node, DB, Shard, Iter, Selector, BatchSize) ->
+    emqx_rpc:call(
+        Shard,
+        Node,
+        emqx_ds_replication_layer,
+        do_delete_next_v4,
+        [DB, Shard, Iter, Selector, BatchSize]
+    ).
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================
 
 introduced_in() ->
-    "5.5.1".
+    "5.6.0".
diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
index a0dae0e6f..607545347 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
@@ -372,6 +372,48 @@ t_10_non_atomic_store_batch(_Config) ->
     ),
     ok.
 
+t_smoke_delete_next(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            StartTime = 0,
+            TopicFilter = [<<"foo">>, '#'],
+            Msgs =
+                [Msg1, _Msg2, Msg3] = [
+                    message(<<"foo/bar">>, <<"1">>, 0),
+                    message(<<"foo">>, <<"2">>, 1),
+                    message(<<"bar/bar">>, <<"3">>, 2)
+                ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+
+            [DStream] = emqx_ds:get_delete_streams(DB, TopicFilter, StartTime),
+            {ok, DIter0} = emqx_ds:make_delete_iterator(DB, DStream, TopicFilter, StartTime),
+
+            Selector = fun(#message{topic = Topic}) ->
+                Topic == <<"foo">>
+            end,
+            {ok, DIter1, NumDeleted1} = delete(DB, DIter0, Selector, 1),
+            ?assertEqual(0, NumDeleted1),
+            {ok, DIter2, NumDeleted2} = delete(DB, DIter1, Selector, 1),
+            ?assertEqual(1, NumDeleted2),
+
+            TopicFilterHash = ['#'],
+            [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilterHash, StartTime),
+            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilterHash, StartTime),
+            {ok, _Iter, Batch} = iterate(DB, Iter0, 1),
+            ?assertEqual([Msg1, Msg3], [Msg || {_Key, Msg} <- Batch]),
+
+            ok = emqx_ds:add_generation(DB),
+
+            ?assertMatch({ok, end_of_stream}, emqx_ds:delete_next(DB, DIter2, Selector, 1)),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_drop_generation_with_never_used_iterator(_Config) ->
     %% This test checks how the iterator behaves when:
     %%   1) it's created at generation 1 and not consumed from.
@@ -609,6 +651,21 @@ iterate(DB, It0, BatchSize, Acc) ->
             Ret
     end.
 
+delete(DB, It, Selector, BatchSize) ->
+    delete(DB, It, Selector, BatchSize, 0).
+
+delete(DB, It0, Selector, BatchSize, Acc) ->
+    case emqx_ds:delete_next(DB, It0, Selector, BatchSize) of
+        {ok, It, 0} ->
+            {ok, It, Acc};
+        {ok, It, NumDeleted} ->
+            delete(DB, It, Selector, BatchSize, Acc + NumDeleted);
+        {ok, end_of_stream} ->
+            {ok, end_of_stream, Acc};
+        Ret ->
+            Ret
+    end.
+
 %% CT callbacks
 
 all() -> emqx_common_test_helpers:all(?MODULE).
diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
index 173669919..bb4faa3bc 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
@@ -83,6 +83,35 @@ t_iterate(_Config) ->
     ],
     ok.
 
+%% Smoke test for deleting messages.
+t_delete(_Config) ->
+    %% Prepare data:
+    TopicToDelete = <<"foo/bar/baz">>,
+    Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>],
+    Timestamps = lists:seq(1, 10),
+    Batch = [
+        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+     || Topic <- Topics, PublishedAt <- Timestamps
+    ],
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+
+    %% Delete messages from one of the topics:
+    StartTime = 0,
+    TopicFilter = parse_topic(<<"#">>),
+    Selector = fun(#message{topic = T}) ->
+        T == TopicToDelete
+    end,
+    NumDeleted = delete(?SHARD, TopicFilter, StartTime, Selector),
+    ?assertEqual(10, NumDeleted),
+
+    %% Read surviving messages.
+    Messages = [Msg || {_DSKey, Msg} <- replay(?SHARD, TopicFilter, StartTime)],
+    MessagesByTopic = maps:groups_from_list(fun emqx_message:topic/1, Messages),
+    ?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}),
+    ?assertEqual(20, length(Messages)),
+
+    ok.
+
 -define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
 
 %% Smoke test that verifies that concrete topics are mapped to
@@ -509,3 +538,71 @@ keyspace(TC) ->
 
 set_keyspace_config(Keyspace, Config) ->
     ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).
+
+delete(Shard, TopicFilter, Time, Selector) ->
+    Streams = emqx_ds_storage_layer:get_delete_streams(Shard, TopicFilter, Time),
+    Iterators = lists:map(
+        fun(Stream) ->
+            {ok, Iterator} = emqx_ds_storage_layer:make_delete_iterator(
+                Shard,
+                Stream,
+                TopicFilter,
+                Time
+            ),
+            Iterator
+        end,
+        Streams
+    ),
+    delete(Shard, Iterators, Selector).
+
+delete(_Shard, [], _Selector) ->
+    0;
+delete(Shard, Iterators, Selector) ->
+    {NewIterators0, N} = lists:foldl(
+        fun(Iterator0, {AccIterators, NAcc}) ->
+            case emqx_ds_storage_layer:delete_next(Shard, Iterator0, Selector, 10) of
+                {ok, end_of_stream} ->
+                    {AccIterators, NAcc};
+                {ok, _Iterator1, 0} ->
+                    {AccIterators, NAcc};
+                {ok, Iterator1, NDeleted} ->
+                    {[Iterator1 | AccIterators], NDeleted + NAcc}
+            end
+        end,
+        {[], 0},
+        Iterators
+    ),
+    NewIterators1 = lists:reverse(NewIterators0),
+    N + delete(Shard, NewIterators1, Selector).
+
+replay(Shard, TopicFilter, Time) ->
+    StreamsByRank = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, Time),
+    Iterators = lists:map(
+        fun({_Rank, Stream}) ->
+            {ok, Iterator} = emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, Time),
+            Iterator
+        end,
+        StreamsByRank
+    ),
+    replay(Shard, Iterators).
+
+replay(_Shard, []) ->
+    [];
+replay(Shard, Iterators) ->
+    {NewIterators0, Messages0} = lists:foldl(
+        fun(Iterator0, {AccIterators, AccMessages}) ->
+            case emqx_ds_storage_layer:next(Shard, Iterator0, 10) of
+                {ok, end_of_stream} ->
+                    {AccIterators, AccMessages};
+                {ok, _Iterator1, []} ->
+                    {AccIterators, AccMessages};
+                {ok, Iterator1, NewMessages} ->
+                    {[Iterator1 | AccIterators], [NewMessages | AccMessages]}
+            end
+        end,
+        {[], []},
+        Iterators
+    ),
+    Messages1 = lists:flatten(lists:reverse(Messages0)),
+    NewIterators1 = lists:reverse(NewIterators0),
+    Messages1 ++ replay(Shard, NewIterators1).
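
The t_delete case above selects messages for deletion by topic, but the selector receives the fully decoded #message{} record, so any field can be used as the criterion. A minimal sketch of a retention-style variant, reusing the suite's make_message/3, parse_topic/1 and the delete/4 helper defined above (the cutoff value, topic and payloads are illustrative):

%% Sketch only: remove everything published at or before CutoffTs.
delete_by_timestamp_sketch(Shard) ->
    CutoffTs = 5,
    Batch = [
        make_message(PublishedAt, <<"foo/bar">>, integer_to_binary(PublishedAt))
     || PublishedAt <- lists:seq(1, 10)
    ],
    ok = emqx_ds_storage_layer:store_batch(Shard, Batch, []),
    Selector = fun(#message{timestamp = TS}) -> TS =< CutoffTs end,
    %% Timestamps 1..5 satisfy the selector, so exactly five messages are removed:
    ?assertEqual(5, delete(Shard, parse_topic(<<"#">>), _StartTime = 0, Selector)).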