diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index c6fdc69aa..9330e0b1a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -158,7 +158,7 @@ %% Command. Each command is an entry in the replication log. -type ra_command() :: #{ - ?tag := ?BATCH | add_generation | update_config | drop_generation, + ?tag := ?BATCH | add_generation | update_config | drop_generation | storage_event, _ => _ }. @@ -752,7 +752,7 @@ apply( -spec tick(integer(), ra_state()) -> ra_machine:effects(). tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), - {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), + {Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest), ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), handle_custom_event(DBShard, Timestamp, tick). @@ -794,7 +794,7 @@ handle_custom_event(DBShard, Latest, Event) -> [{append, #{?tag => storage_event, ?payload => I, ?now => Latest}} || I <- Events] catch EC:Err:Stacktrace -> - ?tp(error, ds_storage_custom_even_fail, #{ + ?tp(error, ds_storage_custom_event_fail, #{ EC => Err, stacktrace => Stacktrace, event => Event }), [] diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d05296a29..ebbcde17c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -275,7 +275,7 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> -spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), - [{emqx_ds:time(), emqx_types:message()}], + [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> {ok, cooked_batch()}. @@ -301,7 +301,7 @@ prepare_batch(_ShardId, S, Messages, _Options) -> emqx_ds_storage_layer:shard_id(), s(), cooked_batch() -) -> ok. +) -> ok | emqx_ds:error(_). commit_batch( _ShardId, _Data, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index df1253e1c..d9a0321b0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -225,13 +225,13 @@ [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> - {ok, term()} | {error, _}. + {ok, term()} | emqx_ds:error(_). -callback commit_batch( shard_id(), _Data, _CookedBatch -) -> ok. +) -> ok | emqx_ds:error(_). -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. @@ -288,7 +288,7 @@ store_batch(Shard, Messages, Options) -> commit_batch(Shard, CookedBatch); ignore -> ok; - Error = {error, _} -> + Error = {error, _, _} -> Error end. @@ -296,7 +296,7 @@ store_batch(Shard, Messages, Options) -> shard_id(), [{emqx_ds:time(), emqx_types:message()}], emqx_ds:message_store_opts() -) -> {ok, cooked_batch()} | ignore | {error, _}. +) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. @@ -309,7 +309,7 @@ prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> case Mod:prepare_batch(Shard, GenData, Messages, Options) of {ok, CookedBatch} -> {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; - Error = {error, _} -> + Error = {error, _, _} -> Error end, T1 = erlang:monotonic_time(microsecond), @@ -319,7 +319,7 @@ prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> prepare_batch(_Shard, [], _Options) -> ignore. --spec commit_batch(shard_id(), cooked_batch()) -> ok. +-spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result(). commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), T0 = erlang:monotonic_time(microsecond), diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 04c57aa80..4707e6766 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -42,7 +42,7 @@ opts(Overrides) -> n_sites => 1, replication_factor => 3, replication_options => #{ - wal_max_size_bytes => 64 * 1024, + wal_max_size_bytes => 64, wal_max_batch_size => 1024, snapshot_interval => 128 } diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 3a0145199..288dba0c9 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -87,11 +87,12 @@ topic_messages(TestCase, ClientId) -> topic_messages(TestCase, ClientId, N) -> fun() -> + NBin = integer_to_binary(N), Msg = #message{ from = ClientId, topic = client_topic(TestCase, ClientId), timestamp = N * 100, - payload = integer_to_binary(N) + payload = <> }, [Msg | topic_messages(TestCase, ClientId, N + 1)] end. diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index a38deceeb..8b6db8292 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -164,6 +164,10 @@ repeat(S) -> %% specifies size of the "batch" to be consumed from the stream at a %% time (stream is the second tuple element). If element of the list %% is a plain stream, then the batch size is assumed to be 1. +%% +%% If `ContinueAtEmpty' is `false', and one of the streams returns +%% `[]', then the function will return `[]' as well. Otherwise, it +%% will continue consuming data from the remaining streams. -spec interleave([stream(X) | {non_neg_integer(), stream(X)}], boolean()) -> stream(X). interleave(L0, ContinueAtEmpty) -> L = lists:map( diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 0ab3cdb70..fe340d3ee 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -165,6 +165,14 @@ interleave_test() -> emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], true)) ). +interleave_stop_test() -> + S1 = emqx_utils_stream:const(1), + S2 = emqx_utils_stream:list([a, b, c, d]), + ?assertEqual( + [1, 1, a, b, 1, 1, c, d, 1, 1], + emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false)) + ). + csv_test() -> Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, ?assertEqual(