chore(ds): Update BPAPI version
This commit is contained in:
parent
eee221f1d0
commit
305a54f646
|
@ -121,7 +121,7 @@ open_db(DB, CreateOpts) ->
|
||||||
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
|
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
|
||||||
add_generation(DB) ->
|
add_generation(DB) ->
|
||||||
Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
|
Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
|
||||||
_ = emqx_ds_proto_v2:add_generation(Nodes, DB),
|
_ = emqx_ds_proto_v3:add_generation(Nodes, DB),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
||||||
|
@ -157,7 +157,7 @@ drop_generation(DB, {Shard, GenId}) ->
|
||||||
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
||||||
drop_db(DB) ->
|
drop_db(DB) ->
|
||||||
Nodes = list_nodes(),
|
Nodes = list_nodes(),
|
||||||
_ = emqx_ds_proto_v2:drop_db(Nodes, DB),
|
_ = emqx_ds_proto_v3:drop_db(Nodes, DB),
|
||||||
_ = emqx_ds_replication_layer_meta:drop_db(DB),
|
_ = emqx_ds_replication_layer_meta:drop_db(DB),
|
||||||
emqx_ds_builtin_sup:stop_db(DB),
|
emqx_ds_builtin_sup:stop_db(DB),
|
||||||
ok.
|
ok.
|
||||||
|
@ -174,7 +174,7 @@ get_streams(DB, TopicFilter, StartTime) ->
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(Shard) ->
|
fun(Shard) ->
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
Streams = emqx_ds_proto_v1:get_streams(Node, DB, Shard, TopicFilter, StartTime),
|
Streams = emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, StartTime),
|
||||||
lists:map(
|
lists:map(
|
||||||
fun({RankY, Stream}) ->
|
fun({RankY, Stream}) ->
|
||||||
RankX = Shard,
|
RankX = Shard,
|
||||||
|
@ -196,7 +196,7 @@ get_streams(DB, TopicFilter, StartTime) ->
|
||||||
make_iterator(DB, Stream, TopicFilter, StartTime) ->
|
make_iterator(DB, Stream, TopicFilter, StartTime) ->
|
||||||
#{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream,
|
#{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream,
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
|
case emqx_ds_proto_v3:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
||||||
Err = {error, _} ->
|
Err = {error, _} ->
|
||||||
|
@ -213,7 +213,7 @@ update_iterator(DB, OldIter, DSKey) ->
|
||||||
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
|
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
case
|
case
|
||||||
emqx_ds_proto_v2:update_iterator(
|
emqx_ds_proto_v3:update_iterator(
|
||||||
Node,
|
Node,
|
||||||
DB,
|
DB,
|
||||||
Shard,
|
Shard,
|
||||||
|
@ -239,7 +239,7 @@ next(DB, Iter0, BatchSize) ->
|
||||||
%%
|
%%
|
||||||
%% This kind of trickery should be probably done here in the
|
%% This kind of trickery should be probably done here in the
|
||||||
%% replication layer. Or, perhaps, in the logic layer.
|
%% replication layer. Or, perhaps, in the logic layer.
|
||||||
case emqx_ds_proto_v1:next(Node, DB, Shard, StorageIter0, BatchSize) of
|
case emqx_ds_proto_v3:next(Node, DB, Shard, StorageIter0, BatchSize) of
|
||||||
{ok, StorageIter, Batch} ->
|
{ok, StorageIter, Batch} ->
|
||||||
Iter = Iter0#{?enc := StorageIter},
|
Iter = Iter0#{?enc := StorageIter},
|
||||||
{ok, Iter, Batch};
|
{ok, Iter, Batch};
|
||||||
|
|
|
@ -172,4 +172,4 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl
|
||||||
|
|
||||||
start_timer() ->
|
start_timer() ->
|
||||||
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
|
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
|
||||||
erlang:send_after(Interval, self(), flush).
|
erlang:send_after(Interval, self(), ?flush).
|
||||||
|
|
Loading…
Reference in New Issue