fix(ds): Apply review remarks

This commit is contained in:
ieQu1 2024-05-13 11:14:35 +02:00
parent 07aa708894
commit 9f7ef9f34f
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
7 changed files with 26 additions and 13 deletions

View File

@ -158,7 +158,7 @@
%% Command. Each command is an entry in the replication log. %% Command. Each command is an entry in the replication log.
-type ra_command() :: #{ -type ra_command() :: #{
?tag := ?BATCH | add_generation | update_config | drop_generation, ?tag := ?BATCH | add_generation | update_config | drop_generation | storage_event,
_ => _ _ => _
}. }.
@ -752,7 +752,7 @@ apply(
-spec tick(integer(), ra_state()) -> ra_machine:effects(). -spec tick(integer(), ra_state()) -> ra_machine:effects().
tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
%% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),
{Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), {Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest),
?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}),
handle_custom_event(DBShard, Timestamp, tick). handle_custom_event(DBShard, Timestamp, tick).
@ -794,7 +794,7 @@ handle_custom_event(DBShard, Latest, Event) ->
[{append, #{?tag => storage_event, ?payload => I, ?now => Latest}} || I <- Events] [{append, #{?tag => storage_event, ?payload => I, ?now => Latest}} || I <- Events]
catch catch
EC:Err:Stacktrace -> EC:Err:Stacktrace ->
?tp(error, ds_storage_custom_even_fail, #{ ?tp(error, ds_storage_custom_event_fail, #{
EC => Err, stacktrace => Stacktrace, event => Event EC => Err, stacktrace => Stacktrace, event => Event
}), }),
[] []

View File

@ -275,7 +275,7 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
-spec prepare_batch( -spec prepare_batch(
emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:shard_id(),
s(), s(),
[{emqx_ds:time(), emqx_types:message()}], [{emqx_ds:time(), emqx_types:message()}, ...],
emqx_ds:message_store_opts() emqx_ds:message_store_opts()
) -> ) ->
{ok, cooked_batch()}. {ok, cooked_batch()}.
@ -301,7 +301,7 @@ prepare_batch(_ShardId, S, Messages, _Options) ->
emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:shard_id(),
s(), s(),
cooked_batch() cooked_batch()
) -> ok. ) -> ok | emqx_ds:error(_).
commit_batch( commit_batch(
_ShardId, _ShardId,
_Data, _Data,

View File

@ -225,13 +225,13 @@
[{emqx_ds:time(), emqx_types:message()}, ...], [{emqx_ds:time(), emqx_types:message()}, ...],
emqx_ds:message_store_opts() emqx_ds:message_store_opts()
) -> ) ->
{ok, term()} | {error, _}. {ok, term()} | emqx_ds:error(_).
-callback commit_batch( -callback commit_batch(
shard_id(), shard_id(),
_Data, _Data,
_CookedBatch _CookedBatch
) -> ok. ) -> ok | emqx_ds:error(_).
-callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
[_Stream]. [_Stream].
@ -288,7 +288,7 @@ store_batch(Shard, Messages, Options) ->
commit_batch(Shard, CookedBatch); commit_batch(Shard, CookedBatch);
ignore -> ignore ->
ok; ok;
Error = {error, _} -> Error = {error, _, _} ->
Error Error
end. end.
@ -296,7 +296,7 @@ store_batch(Shard, Messages, Options) ->
shard_id(), shard_id(),
[{emqx_ds:time(), emqx_types:message()}], [{emqx_ds:time(), emqx_types:message()}],
emqx_ds:message_store_opts() emqx_ds:message_store_opts()
) -> {ok, cooked_batch()} | ignore | {error, _}. ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
%% NOTE %% NOTE
%% We assume that batches do not span generations. Callers should enforce this. %% We assume that batches do not span generations. Callers should enforce this.
@ -309,7 +309,7 @@ prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
case Mod:prepare_batch(Shard, GenData, Messages, Options) of case Mod:prepare_batch(Shard, GenData, Messages, Options) of
{ok, CookedBatch} -> {ok, CookedBatch} ->
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
Error = {error, _} -> Error = {error, _, _} ->
Error Error
end, end,
T1 = erlang:monotonic_time(microsecond), T1 = erlang:monotonic_time(microsecond),
@ -319,7 +319,7 @@ prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
prepare_batch(_Shard, [], _Options) -> prepare_batch(_Shard, [], _Options) ->
ignore. ignore.
-spec commit_batch(shard_id(), cooked_batch()) -> ok. -spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result().
commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) ->
#{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard),
T0 = erlang:monotonic_time(microsecond), T0 = erlang:monotonic_time(microsecond),

View File

@ -42,7 +42,7 @@ opts(Overrides) ->
n_sites => 1, n_sites => 1,
replication_factor => 3, replication_factor => 3,
replication_options => #{ replication_options => #{
wal_max_size_bytes => 64 * 1024, wal_max_size_bytes => 64,
wal_max_batch_size => 1024, wal_max_batch_size => 1024,
snapshot_interval => 128 snapshot_interval => 128
} }

View File

@ -87,11 +87,12 @@ topic_messages(TestCase, ClientId) ->
topic_messages(TestCase, ClientId, N) -> topic_messages(TestCase, ClientId, N) ->
fun() -> fun() ->
NBin = integer_to_binary(N),
Msg = #message{ Msg = #message{
from = ClientId, from = ClientId,
topic = client_topic(TestCase, ClientId), topic = client_topic(TestCase, ClientId),
timestamp = N * 100, timestamp = N * 100,
payload = integer_to_binary(N) payload = <<NBin/binary, " ">>
}, },
[Msg | topic_messages(TestCase, ClientId, N + 1)] [Msg | topic_messages(TestCase, ClientId, N + 1)]
end. end.

View File

@ -164,6 +164,10 @@ repeat(S) ->
%% specifies size of the "batch" to be consumed from the stream at a %% specifies size of the "batch" to be consumed from the stream at a
%% time (stream is the second tuple element). If element of the list %% time (stream is the second tuple element). If element of the list
%% is a plain stream, then the batch size is assumed to be 1. %% is a plain stream, then the batch size is assumed to be 1.
%%
%% If `ContinueAtEmpty' is `false', and one of the streams returns
%% `[]', then the function will return `[]' as well. Otherwise, it
%% will continue consuming data from the remaining streams.
-spec interleave([stream(X) | {non_neg_integer(), stream(X)}], boolean()) -> stream(X). -spec interleave([stream(X) | {non_neg_integer(), stream(X)}], boolean()) -> stream(X).
interleave(L0, ContinueAtEmpty) -> interleave(L0, ContinueAtEmpty) ->
L = lists:map( L = lists:map(

View File

@ -165,6 +165,14 @@ interleave_test() ->
emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], true)) emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], true))
). ).
interleave_stop_test() ->
S1 = emqx_utils_stream:const(1),
S2 = emqx_utils_stream:list([a, b, c, d]),
?assertEqual(
[1, 1, a, b, 1, 1, c, d, 1, 1],
emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false))
).
csv_test() -> csv_test() ->
Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
?assertEqual( ?assertEqual(