feat(ds): introduce error classes in critical API functions

For now, only recoverable / unrecoverable errors are introduced.
This commit is contained in:
Andrew Mayorov 2024-03-01 19:10:25 +01:00
parent 1f38813cb9
commit 2146d9e1fe
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 75 additions and 57 deletions

View File

@ -68,6 +68,8 @@
make_iterator_result/1, make_iterator_result/0, make_iterator_result/1, make_iterator_result/0,
make_delete_iterator_result/1, make_delete_iterator_result/0, make_delete_iterator_result/1, make_delete_iterator_result/0,
error/1,
ds_specific_stream/0, ds_specific_stream/0,
ds_specific_iterator/0, ds_specific_iterator/0,
ds_specific_generation_rank/0, ds_specific_generation_rank/0,
@ -118,14 +120,14 @@
-type message_key() :: binary(). -type message_key() :: binary().
-type store_batch_result() :: ok | {error, _}. -type store_batch_result() :: ok | error(_).
-type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}. -type make_iterator_result(Iterator) :: {ok, Iterator} | error(_).
-type make_iterator_result() :: make_iterator_result(iterator()). -type make_iterator_result() :: make_iterator_result(iterator()).
-type next_result(Iterator) :: -type next_result(Iterator) ::
{ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | {error, _}. {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | error(_).
-type next_result() :: next_result(iterator()). -type next_result() :: next_result(iterator()).
@ -142,6 +144,8 @@
-type delete_next_result() :: delete_next_result(delete_iterator()). -type delete_next_result() :: delete_next_result(delete_iterator()).
-type error(Reason) :: {error, recoverable | unrecoverable, Reason}.
%% Timestamp %% Timestamp
%% Earliest possible timestamp is 0. %% Earliest possible timestamp is 0.
%% TODO granularity? Currently, we should always use milliseconds, as that's the unit we %% TODO granularity? Currently, we should always use milliseconds, as that's the unit we

View File

@ -171,7 +171,14 @@ drop_db(DB) ->
-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) -> store_batch(DB, Messages, Opts) ->
emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts). case emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of
ok ->
ok;
Error = {error, _, _} ->
Error;
RPCError = {badrpc, _} ->
{error, recoverable, RPCError}
end.
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{emqx_ds:stream_rank(), stream()}]. [{emqx_ds:stream_rank(), stream()}].
@ -180,7 +187,14 @@ get_streams(DB, TopicFilter, StartTime) ->
lists:flatmap( lists:flatmap(
fun(Shard) -> fun(Shard) ->
Node = node_of_shard(DB, Shard), Node = node_of_shard(DB, Shard),
Streams = emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime), Streams =
try
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime)
catch
error:{erpc, _} ->
%% TODO: log?
[]
end,
lists:map( lists:map(
fun({RankY, StorageLayerStream}) -> fun({RankY, StorageLayerStream}) ->
RankX = Shard, RankX = Shard,
@ -198,35 +212,29 @@ get_streams(DB, TopicFilter, StartTime) ->
make_iterator(DB, Stream, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) ->
?stream_v2(Shard, StorageStream) = Stream, ?stream_v2(Shard, StorageStream) = Stream,
Node = node_of_shard(DB, Shard), Node = node_of_shard(DB, Shard),
case emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of try emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Err = {error, _} -> Error = {error, _, _} ->
Err Error
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
end. end.
-spec update_iterator( -spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
emqx_ds:db(),
iterator(),
emqx_ds:message_key()
) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
update_iterator(DB, OldIter, DSKey) -> update_iterator(DB, OldIter, DSKey) ->
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
Node = node_of_shard(DB, Shard), Node = node_of_shard(DB, Shard),
case try emqx_ds_proto_v4:update_iterator(Node, DB, Shard, StorageIter, DSKey) of
emqx_ds_proto_v4:update_iterator(
Node,
DB,
Shard,
StorageIter,
DSKey
)
of
{ok, Iter} -> {ok, Iter} ->
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Err = {error, _} -> Error = {error, _, _} ->
Err Error
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
end. end.
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
@ -245,8 +253,12 @@ next(DB, Iter0, BatchSize) ->
{ok, StorageIter, Batch} -> {ok, StorageIter, Batch} ->
Iter = Iter0#{?enc := StorageIter}, Iter = Iter0#{?enc := StorageIter},
{ok, Iter, Batch}; {ok, Iter, Batch};
Other -> Ok = {ok, _} ->
Other Ok;
Error = {error, _, _} ->
Error;
RPCError = {badrpc, _} ->
{error, recoverable, RPCError}
end. end.
-spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
@ -337,7 +349,7 @@ do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
emqx_ds:topic_filter(), emqx_ds:topic_filter(),
emqx_ds:time() emqx_ds:time()
) -> ) ->
{ok, emqx_ds_storage_layer:iterator()} | {error, _}. emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
error(obsolete_api). error(obsolete_api).
@ -348,7 +360,7 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
emqx_ds:topic_filter(), emqx_ds:topic_filter(),
emqx_ds:time() emqx_ds:time()
) -> ) ->
{ok, emqx_ds_storage_layer:iterator()} | {error, _}. emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).

View File

@ -230,7 +230,7 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
) -> ) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomic := true}) -> store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
{ok, Batch} = rocksdb:batch(), {ok, Batch} = rocksdb:batch(),
lists:foreach( lists:foreach(
fun(Msg) -> fun(Msg) ->
@ -240,18 +240,17 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomi
end, end,
Messages Messages
), ),
Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), Result = rocksdb:write_batch(DB, Batch, []),
rocksdb:release_batch(Batch), rocksdb:release_batch(Batch),
Res; %% NOTE
store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to
lists:foreach( %% observe until there's `{no_slowdown, true}` in write options.
fun(Msg) -> case Result of
{Key, _} = make_key(S, Msg), ok ->
Val = serialize(Msg), ok;
rocksdb:put(DB, Data, Key, Val, []) {error, {error, Reason}} ->
end, {error, unrecoverable, {rocksdb, Reason}}
Messages end.
).
-spec get_streams( -spec get_streams(
emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:shard_id(),

View File

@ -256,12 +256,10 @@ make_iterator(
Err Err
end; end;
{error, not_found} -> {error, not_found} ->
{error, end_of_stream} {error, unrecoverable, generation_not_found}
end. end.
-spec update_iterator( -spec update_iterator(shard_id(), iterator(), emqx_ds:message_key()) ->
shard_id(), iterator(), emqx_ds:message_key()
) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
update_iterator( update_iterator(
Shard, Shard,
@ -281,7 +279,7 @@ update_iterator(
Err Err
end; end;
{error, not_found} -> {error, not_found} ->
{error, end_of_stream} {error, unrecoverable, generation_not_found}
end. end.
-spec next(shard_id(), iterator(), pos_integer()) -> -spec next(shard_id(), iterator(), pos_integer()) ->
@ -298,12 +296,12 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
{ok, end_of_stream}; {ok, end_of_stream};
{ok, GenIter, Batch} -> {ok, GenIter, Batch} ->
{ok, Iter#{?enc := GenIter}, Batch}; {ok, Iter#{?enc := GenIter}, Batch};
Error = {error, _} -> Error = {error, _, _} ->
Error Error
end; end;
{error, not_found} -> {error, not_found} ->
%% generation was possibly dropped by GC %% generation was possibly dropped by GC
{ok, end_of_stream} {error, unrecoverable, generation_not_found}
end. end.
-spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok. -spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.

View File

@ -404,7 +404,10 @@ t_drop_generation_with_never_used_iterator(_Config) ->
], ],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter0, 1)), ?assertMatch(
{error, unrecoverable, generation_not_found, []},
iterate(DB, Iter0, 1)
),
%% New iterator for the new stream will only see the later messages. %% New iterator for the new stream will only see the later messages.
[{_, Stream1}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), [{_, Stream1}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
@ -453,9 +456,10 @@ t_drop_generation_with_used_once_iterator(_Config) ->
], ],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter1, 1)), ?assertMatch(
{error, unrecoverable, generation_not_found, []},
ok. iterate(DB, Iter1, 1)
).
t_drop_generation_update_iterator(_Config) -> t_drop_generation_update_iterator(_Config) ->
%% This checks the behavior of `emqx_ds:update_iterator' after the generation %% This checks the behavior of `emqx_ds:update_iterator' after the generation
@ -481,9 +485,10 @@ t_drop_generation_update_iterator(_Config) ->
ok = emqx_ds:add_generation(DB), ok = emqx_ds:add_generation(DB),
ok = emqx_ds:drop_generation(DB, GenId0), ok = emqx_ds:drop_generation(DB, GenId0),
?assertEqual({error, end_of_stream}, emqx_ds:update_iterator(DB, Iter1, Key2)), ?assertEqual(
{error, unrecoverable, generation_not_found},
ok. emqx_ds:update_iterator(DB, Iter1, Key2)
).
t_make_iterator_stale_stream(_Config) -> t_make_iterator_stale_stream(_Config) ->
%% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying
@ -507,7 +512,7 @@ t_make_iterator_stale_stream(_Config) ->
ok = emqx_ds:drop_generation(DB, GenId0), ok = emqx_ds:drop_generation(DB, GenId0),
?assertEqual( ?assertEqual(
{error, end_of_stream}, {error, unrecoverable, generation_not_found},
emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime) emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime)
), ),
@ -605,8 +610,8 @@ iterate(DB, It0, BatchSize, Acc) ->
iterate(DB, It, BatchSize, Acc ++ Msgs); iterate(DB, It, BatchSize, Acc ++ Msgs);
{ok, end_of_stream} -> {ok, end_of_stream} ->
{ok, end_of_stream, Acc}; {ok, end_of_stream, Acc};
Ret -> {error, Class, Reason} ->
Ret {error, Class, Reason, Acc}
end. end.
%% CT callbacks %% CT callbacks