feat(dsrepl): use more straightforward way to drop ra shards
This commit is contained in:
parent
74881e8706
commit
611b3f0e07
|
@ -295,6 +295,7 @@ drop_db(DB) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
Module ->
|
Module ->
|
||||||
|
_ = persistent_term:erase(?persistent_term(DB)),
|
||||||
Module:drop_db(DB)
|
Module:drop_db(DB)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -196,11 +196,11 @@ drop_generation(DB, {Shard, GenId}) ->
|
||||||
|
|
||||||
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
||||||
drop_db(DB) ->
|
drop_db(DB) ->
|
||||||
Nodes = list_nodes(),
|
foreach_shard(DB, fun(Shard) ->
|
||||||
_ = emqx_ds_proto_v4:drop_db(Nodes, DB),
|
{ok, _} = ra_drop_shard(DB, Shard)
|
||||||
_ = emqx_ds_replication_layer_meta:drop_db(DB),
|
end),
|
||||||
emqx_ds_builtin_sup:stop_db(DB),
|
_ = emqx_ds_proto_v4:drop_db(list_nodes(), DB),
|
||||||
ok.
|
emqx_ds_replication_layer_meta:drop_db(DB).
|
||||||
|
|
||||||
-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
|
-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
|
||||||
emqx_ds:store_batch_result().
|
emqx_ds:store_batch_result().
|
||||||
|
@ -357,8 +357,7 @@ do_drop_db_v1(DB) ->
|
||||||
emqx_ds_builtin_sup:stop_db(DB),
|
emqx_ds_builtin_sup:stop_db(DB),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Shard) ->
|
fun(Shard) ->
|
||||||
emqx_ds_storage_layer:drop_shard({DB, Shard}),
|
emqx_ds_storage_layer:drop_shard({DB, Shard})
|
||||||
ra_drop_shard(DB, Shard)
|
|
||||||
end,
|
end,
|
||||||
MyShards
|
MyShards
|
||||||
).
|
).
|
||||||
|
@ -492,8 +491,6 @@ do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->
|
||||||
list_nodes() ->
|
list_nodes() ->
|
||||||
mria:running_nodes().
|
mria:running_nodes().
|
||||||
|
|
||||||
%%
|
|
||||||
|
|
||||||
%% TODO
|
%% TODO
|
||||||
%% Too large for normal operation, need better backpressure mechanism.
|
%% Too large for normal operation, need better backpressure mechanism.
|
||||||
-define(RA_TIMEOUT, 60 * 1000).
|
-define(RA_TIMEOUT, 60 * 1000).
|
||||||
|
@ -592,8 +589,7 @@ ra_list_generations_with_lifetimes(DB, Shard) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
ra_drop_shard(DB, Shard) ->
|
ra_drop_shard(DB, Shard) ->
|
||||||
LocalServer = emqx_ds_replication_layer_shard:server(DB, Shard, local),
|
ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).
|
||||||
ra:force_delete_server(_System = default, LocalServer).
|
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
|
|
@ -17,8 +17,14 @@
|
||||||
-module(emqx_ds_replication_layer_shard).
|
-module(emqx_ds_replication_layer_shard).
|
||||||
|
|
||||||
-export([start_link/3]).
|
-export([start_link/3]).
|
||||||
-export([shard_servers/2]).
|
|
||||||
|
|
||||||
|
%% Static server configuration
|
||||||
|
-export([
|
||||||
|
shard_servers/2,
|
||||||
|
local_server/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Dynamic server location API
|
||||||
-export([
|
-export([
|
||||||
servers/3,
|
servers/3,
|
||||||
server/3
|
server/3
|
||||||
|
|
Loading…
Reference in New Issue