Compare commits
10 Commits
master
...
ft/EMQX-11
Author | SHA1 | Date |
---|---|---|
![]() |
393a1b1391 | |
![]() |
c6085a6ab0 | |
![]() |
0b69c8b7f3 | |
![]() |
534e177e7c | |
![]() |
663ea69574 | |
![]() |
0e36f7afa4 | |
![]() |
4dbf68716a | |
![]() |
5d6efa622c | |
![]() |
cc9926f159 | |
![]() |
9e34fcd1d7 |
|
@ -136,6 +136,8 @@
|
||||||
%% TODO: Needs configuration?
|
%% TODO: Needs configuration?
|
||||||
-define(TIMEOUT_RETRY_REPLAY, 1000).
|
-define(TIMEOUT_RETRY_REPLAY, 1000).
|
||||||
|
|
||||||
|
-define(TIMEOUT_STREAM_TAINT, 1000).
|
||||||
|
|
||||||
-type session() :: #{
|
-type session() :: #{
|
||||||
%% Client ID
|
%% Client ID
|
||||||
id := id(),
|
id := id(),
|
||||||
|
@ -502,7 +504,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
|
||||||
{Publishes, Session1} =
|
{Publishes, Session1} =
|
||||||
case ?IS_REPLAY_ONGOING(Session0) of
|
case ?IS_REPLAY_ONGOING(Session0) of
|
||||||
false ->
|
false ->
|
||||||
drain_buffer(fetch_new_messages(Session0, ClientInfo));
|
drain_buffer(clear_stream_taints(fetch_new_messages(Session0, ClientInfo)));
|
||||||
true ->
|
true ->
|
||||||
{[], Session0}
|
{[], Session0}
|
||||||
end,
|
end,
|
||||||
|
@ -511,7 +513,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
|
||||||
[] ->
|
[] ->
|
||||||
get_config(ClientInfo, [idle_poll_interval]);
|
get_config(ClientInfo, [idle_poll_interval]);
|
||||||
[_ | _] ->
|
[_ | _] ->
|
||||||
0
|
get_config(ClientInfo, [idle_poll_interval]) div length(Publishes)
|
||||||
end,
|
end,
|
||||||
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
|
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
|
||||||
{ok, Publishes, Session};
|
{ok, Publishes, Session};
|
||||||
|
@ -567,7 +569,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo)
|
||||||
RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
|
RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "failed_to_fetch_replay_batch",
|
msg => "failed_to_fetch_replay_batch",
|
||||||
stream => Srs0,
|
iterator => Srs0#srs.it_end,
|
||||||
reason => Reason,
|
reason => Reason,
|
||||||
class => recoverable,
|
class => recoverable,
|
||||||
retry_in_ms => RetryTimeout
|
retry_in_ms => RetryTimeout
|
||||||
|
@ -837,7 +839,8 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
|
||||||
last_seqno_qos1 = SN1,
|
last_seqno_qos1 = SN1,
|
||||||
last_seqno_qos2 = SN2
|
last_seqno_qos2 = SN2
|
||||||
},
|
},
|
||||||
case enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of
|
Tainted = has_stream_taint(StreamKey, Session0),
|
||||||
|
case Tainted orelse enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of
|
||||||
{ok, Srs, Session} ->
|
{ok, Srs, Session} ->
|
||||||
S1 = emqx_persistent_session_ds_state:put_seqno(
|
S1 = emqx_persistent_session_ds_state:put_seqno(
|
||||||
?next(?QOS_1),
|
?next(?QOS_1),
|
||||||
|
@ -859,6 +862,9 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
|
||||||
reason => Reason,
|
reason => Reason,
|
||||||
class => Class
|
class => Class
|
||||||
}),
|
}),
|
||||||
|
taint_stream(StreamKey, Session0);
|
||||||
|
Tainted = true ->
|
||||||
|
%% Stream is tainted, skip for now.
|
||||||
Session0
|
Session0
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -980,6 +986,22 @@ process_batch(
|
||||||
IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
|
IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
|
||||||
).
|
).
|
||||||
|
|
||||||
|
taint_stream(StreamKey, Session) ->
|
||||||
|
Taints = maps:get(taints, Session, #{}),
|
||||||
|
Until = now_ms() + ?TIMEOUT_STREAM_TAINT,
|
||||||
|
Session#{taints => Taints#{StreamKey => Until}}.
|
||||||
|
|
||||||
|
has_stream_taint(StreamKey, Session) ->
|
||||||
|
Taints = maps:get(taints, Session, #{}),
|
||||||
|
maps:is_key(StreamKey, Taints).
|
||||||
|
|
||||||
|
clear_stream_taints(Session = #{taints := Taints0}) ->
|
||||||
|
Now = now_ms(),
|
||||||
|
Taints = maps:filter(fun(_StreamKey, Until) -> Until > Now end, Taints0),
|
||||||
|
Session#{taints := Taints};
|
||||||
|
clear_stream_taints(Session = #{}) ->
|
||||||
|
Session.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Transient messages
|
%% Transient messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -208,16 +208,24 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{
|
||||||
msg => new_stream, key => Key, stream => Stream
|
msg => new_stream, key => Key, stream => Stream
|
||||||
}),
|
}),
|
||||||
{ok, Iterator} = emqx_ds:make_iterator(
|
case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of
|
||||||
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
|
{ok, Iterator} ->
|
||||||
),
|
NewStreamState = #srs{
|
||||||
NewStreamState = #srs{
|
rank_x = RankX,
|
||||||
rank_x = RankX,
|
rank_y = RankY,
|
||||||
rank_y = RankY,
|
it_begin = Iterator,
|
||||||
it_begin = Iterator,
|
it_end = Iterator
|
||||||
it_end = Iterator
|
},
|
||||||
},
|
emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
|
||||||
emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
|
{error, recoverable, Reason} ->
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "failed_to_initialize_stream_iterator",
|
||||||
|
stream => {Key, Stream},
|
||||||
|
class => recoverable,
|
||||||
|
reason => Reason
|
||||||
|
}),
|
||||||
|
S
|
||||||
|
end;
|
||||||
#srs{} ->
|
#srs{} ->
|
||||||
S
|
S
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -21,7 +21,16 @@
|
||||||
-behaviour(supervisor).
|
-behaviour(supervisor).
|
||||||
|
|
||||||
%% API:
|
%% API:
|
||||||
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]).
|
-export([
|
||||||
|
start_db/2,
|
||||||
|
start_shard/1,
|
||||||
|
start_egress/1,
|
||||||
|
stop_shard/1,
|
||||||
|
terminate_storage/1,
|
||||||
|
restart_storage/1,
|
||||||
|
ensure_shard/1,
|
||||||
|
ensure_egress/1
|
||||||
|
]).
|
||||||
-export([which_shards/1]).
|
-export([which_shards/1]).
|
||||||
|
|
||||||
%% behaviour callbacks:
|
%% behaviour callbacks:
|
||||||
|
@ -64,12 +73,22 @@ start_shard({DB, Shard}) ->
|
||||||
start_egress({DB, Shard}) ->
|
start_egress({DB, Shard}) ->
|
||||||
supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
|
supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
|
||||||
|
|
||||||
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
|
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok.
|
||||||
stop_shard(Shard = {DB, _}) ->
|
stop_shard(Shard = {DB, _}) ->
|
||||||
Sup = ?via(#?shards_sup{db = DB}),
|
Sup = ?via(#?shards_sup{db = DB}),
|
||||||
ok = supervisor:terminate_child(Sup, Shard),
|
ok = supervisor:terminate_child(Sup, Shard),
|
||||||
ok = supervisor:delete_child(Sup, Shard).
|
ok = supervisor:delete_child(Sup, Shard).
|
||||||
|
|
||||||
|
-spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
|
||||||
|
terminate_storage({DB, Shard}) ->
|
||||||
|
Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
|
||||||
|
supervisor:terminate_child(Sup, {Shard, storage}).
|
||||||
|
|
||||||
|
-spec restart_storage(emqx_ds_storage_layer:shard_id()) -> {ok, _Child} | {error, _Reason}.
|
||||||
|
restart_storage({DB, Shard}) ->
|
||||||
|
Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
|
||||||
|
supervisor:restart_child(Sup, {Shard, storage}).
|
||||||
|
|
||||||
-spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
|
-spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
|
||||||
ok | {error, _Reason}.
|
ok | {error, _Reason}.
|
||||||
ensure_shard(Shard) ->
|
ensure_shard(Shard) ->
|
||||||
|
|
|
@ -263,12 +263,14 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec get_id_for_key(trie(), state(), edge()) -> static_key().
|
-spec get_id_for_key(trie(), state(), edge()) -> static_key().
|
||||||
get_id_for_key(#trie{static_key_size = Size}, _State, _Token) ->
|
get_id_for_key(#trie{static_key_size = Size}, State, Token) when Size =< 32 ->
|
||||||
%% Requirements for the return value:
|
%% Requirements for the return value:
|
||||||
%%
|
%%
|
||||||
%% It should be globally unique for the `{State, Token}` pair. Other
|
%% It should be globally unique for the `{State, Token}` pair. Other
|
||||||
%% than that, there are no requirements. The return value doesn't even
|
%% than that, there are no requirements. The return value doesn't even
|
||||||
%% have to be deterministic, since the states are saved in the trie.
|
%% have to be deterministic, since the states are saved in the trie.
|
||||||
|
%% Yet, it helps a lot if it is, so that applying the same sequence
|
||||||
|
%% of topics to different tries will result in the same trie state.
|
||||||
%%
|
%%
|
||||||
%% The generated value becomes the ID of the topic in the durable
|
%% The generated value becomes the ID of the topic in the durable
|
||||||
%% storage. Its size should be relatively small to reduce the
|
%% storage. Its size should be relatively small to reduce the
|
||||||
|
@ -277,7 +279,7 @@ get_id_for_key(#trie{static_key_size = Size}, _State, _Token) ->
|
||||||
%% If we want to impress the computer science crowd, sorry, I mean to
|
%% If we want to impress the computer science crowd, sorry, I mean to
|
||||||
%% minimize storage requirements, we can even employ Huffman coding
|
%% minimize storage requirements, we can even employ Huffman coding
|
||||||
%% based on the frequency of messages.
|
%% based on the frequency of messages.
|
||||||
<<Int:(Size * 8)>> = crypto:strong_rand_bytes(Size),
|
<<Int:(Size * 8), _/bytes>> = crypto:hash(sha256, term_to_binary([State | Token])),
|
||||||
Int.
|
Int.
|
||||||
|
|
||||||
%% erlfmt-ignore
|
%% erlfmt-ignore
|
||||||
|
|
|
@ -43,7 +43,6 @@
|
||||||
-export([
|
-export([
|
||||||
%% RPC Targets:
|
%% RPC Targets:
|
||||||
do_drop_db_v1/1,
|
do_drop_db_v1/1,
|
||||||
do_store_batch_v1/4,
|
|
||||||
do_get_streams_v1/4,
|
do_get_streams_v1/4,
|
||||||
do_get_streams_v2/4,
|
do_get_streams_v2/4,
|
||||||
do_make_iterator_v2/5,
|
do_make_iterator_v2/5,
|
||||||
|
@ -53,11 +52,11 @@
|
||||||
do_get_delete_streams_v4/4,
|
do_get_delete_streams_v4/4,
|
||||||
do_make_delete_iterator_v4/5,
|
do_make_delete_iterator_v4/5,
|
||||||
do_delete_next_v4/5,
|
do_delete_next_v4/5,
|
||||||
%% Unused:
|
|
||||||
do_drop_generation_v3/3,
|
|
||||||
%% Obsolete:
|
%% Obsolete:
|
||||||
|
do_store_batch_v1/4,
|
||||||
do_make_iterator_v1/5,
|
do_make_iterator_v1/5,
|
||||||
do_add_generation_v2/1,
|
do_add_generation_v2/1,
|
||||||
|
do_drop_generation_v3/3,
|
||||||
|
|
||||||
%% Egress API:
|
%% Egress API:
|
||||||
ra_store_batch/3
|
ra_store_batch/3
|
||||||
|
@ -65,7 +64,9 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
init/1,
|
init/1,
|
||||||
apply/3
|
apply/3,
|
||||||
|
|
||||||
|
snapshot_module/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([
|
-export_type([
|
||||||
|
@ -80,6 +81,10 @@
|
||||||
batch/0
|
batch/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export_type([
|
||||||
|
ra_state/0
|
||||||
|
]).
|
||||||
|
|
||||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||||
-include("emqx_ds_replication_layer.hrl").
|
-include("emqx_ds_replication_layer.hrl").
|
||||||
|
|
||||||
|
@ -133,6 +138,8 @@
|
||||||
|
|
||||||
-type message_id() :: emqx_ds:message_id().
|
-type message_id() :: emqx_ds:message_id().
|
||||||
|
|
||||||
|
%% TODO: this type is obsolete and is kept only for compatibility with
|
||||||
|
%% BPAPIs. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6)
|
||||||
-type batch() :: #{
|
-type batch() :: #{
|
||||||
?tag := ?BATCH,
|
?tag := ?BATCH,
|
||||||
?batch_messages := [emqx_types:message()]
|
?batch_messages := [emqx_types:message()]
|
||||||
|
@ -140,6 +147,20 @@
|
||||||
|
|
||||||
-type generation_rank() :: {shard_id(), term()}.
|
-type generation_rank() :: {shard_id(), term()}.
|
||||||
|
|
||||||
|
%% Core state of the replication, i.e. the state of ra machine.
|
||||||
|
-type ra_state() :: #{
|
||||||
|
db_shard := {emqx_ds:db(), shard_id()},
|
||||||
|
latest := timestamp_us()
|
||||||
|
}.
|
||||||
|
|
||||||
|
%% Command. Each command is an entry in the replication log.
|
||||||
|
-type ra_command() :: #{
|
||||||
|
?tag := ?BATCH | add_generation | update_config | drop_generation,
|
||||||
|
_ => _
|
||||||
|
}.
|
||||||
|
|
||||||
|
-type timestamp_us() :: non_neg_integer().
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API functions
|
%% API functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -181,12 +202,19 @@ list_generations_with_lifetimes(DB) ->
|
||||||
Shards = list_shards(DB),
|
Shards = list_shards(DB),
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(Shard, GensAcc) ->
|
fun(Shard, GensAcc) ->
|
||||||
|
case ra_list_generations_with_lifetimes(DB, Shard) of
|
||||||
|
Gens = #{} ->
|
||||||
|
ok;
|
||||||
|
{error, _Class, _Reason} ->
|
||||||
|
%% TODO: log error
|
||||||
|
Gens = #{}
|
||||||
|
end,
|
||||||
maps:fold(
|
maps:fold(
|
||||||
fun(GenId, Data, AccInner) ->
|
fun(GenId, Data, AccInner) ->
|
||||||
AccInner#{{Shard, GenId} => Data}
|
AccInner#{{Shard, GenId} => Data}
|
||||||
end,
|
end,
|
||||||
GensAcc,
|
GensAcc,
|
||||||
ra_list_generations_with_lifetimes(DB, Shard)
|
Gens
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
|
@ -221,14 +249,13 @@ get_streams(DB, TopicFilter, StartTime) ->
|
||||||
Shards = list_shards(DB),
|
Shards = list_shards(DB),
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(Shard) ->
|
fun(Shard) ->
|
||||||
Streams =
|
case ra_get_streams(DB, Shard, TopicFilter, StartTime) of
|
||||||
try
|
Streams when is_list(Streams) ->
|
||||||
ra_get_streams(DB, Shard, TopicFilter, StartTime)
|
ok;
|
||||||
catch
|
{error, _Class, _Reason} ->
|
||||||
error:{erpc, _} ->
|
%% TODO: log error
|
||||||
%% TODO: log?
|
Streams = []
|
||||||
[]
|
end,
|
||||||
end,
|
|
||||||
lists:map(
|
lists:map(
|
||||||
fun({RankY, StorageLayerStream}) ->
|
fun({RankY, StorageLayerStream}) ->
|
||||||
RankX = Shard,
|
RankX = Shard,
|
||||||
|
@ -262,14 +289,11 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
|
||||||
emqx_ds:make_iterator_result(iterator()).
|
emqx_ds:make_iterator_result(iterator()).
|
||||||
make_iterator(DB, Stream, TopicFilter, StartTime) ->
|
make_iterator(DB, Stream, TopicFilter, StartTime) ->
|
||||||
?stream_v2(Shard, StorageStream) = Stream,
|
?stream_v2(Shard, StorageStream) = Stream,
|
||||||
try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
|
case ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
||||||
Error = {error, _, _} ->
|
Error = {error, _, _} ->
|
||||||
Error
|
Error
|
||||||
catch
|
|
||||||
error:RPCError = {erpc, _} ->
|
|
||||||
{error, recoverable, RPCError}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
-spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||||
|
@ -279,22 +303,19 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
|
||||||
case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
|
case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}};
|
{ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}};
|
||||||
Err = {error, _} ->
|
Error = {error, _, _} ->
|
||||||
Err
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
|
-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
|
||||||
emqx_ds:make_iterator_result(iterator()).
|
emqx_ds:make_iterator_result(iterator()).
|
||||||
update_iterator(DB, OldIter, DSKey) ->
|
update_iterator(DB, OldIter, DSKey) ->
|
||||||
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
|
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
|
||||||
try ra_update_iterator(DB, Shard, StorageIter, DSKey) of
|
case ra_update_iterator(DB, Shard, StorageIter, DSKey) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
||||||
Error = {error, _, _} ->
|
Error = {error, _, _} ->
|
||||||
Error
|
Error
|
||||||
catch
|
|
||||||
error:RPCError = {erpc, _} ->
|
|
||||||
{error, recoverable, RPCError}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
|
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
|
||||||
|
@ -354,6 +375,19 @@ foreach_shard(DB, Fun) ->
|
||||||
%% Internal exports (RPC targets)
|
%% Internal exports (RPC targets)
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
|
%% NOTE
|
||||||
|
%% Target node may still be in the process of starting up when RPCs arrive, it's
|
||||||
|
%% good to have them handled gracefully.
|
||||||
|
%% TODO
|
||||||
|
%% There's a possibility of a race condition: storage may shut down right after we
|
||||||
|
%% ask for its status.
|
||||||
|
-define(IF_STORAGE_RUNNING(SHARDID, EXPR),
|
||||||
|
case emqx_ds_storage_layer:shard_info(SHARDID, status) of
|
||||||
|
running -> EXPR;
|
||||||
|
down -> {error, recoverable, storage_down}
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
|
-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
|
||||||
do_drop_db_v1(DB) ->
|
do_drop_db_v1(DB) ->
|
||||||
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
||||||
|
@ -371,10 +405,9 @@ do_drop_db_v1(DB) ->
|
||||||
batch(),
|
batch(),
|
||||||
emqx_ds:message_store_opts()
|
emqx_ds:message_store_opts()
|
||||||
) ->
|
) ->
|
||||||
emqx_ds:store_batch_result().
|
no_return().
|
||||||
do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
|
do_store_batch_v1(_DB, _Shard, _Batch, _Options) ->
|
||||||
Batch = [{emqx_message:timestamp(Message), Message} || Message <- Messages],
|
error(obsolete_api).
|
||||||
emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options).
|
|
||||||
|
|
||||||
%% Remove me in EMQX 5.6
|
%% Remove me in EMQX 5.6
|
||||||
-dialyzer({nowarn_function, do_get_streams_v1/4}).
|
-dialyzer({nowarn_function, do_get_streams_v1/4}).
|
||||||
|
@ -386,11 +419,18 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
|
||||||
error(obsolete_api).
|
error(obsolete_api).
|
||||||
|
|
||||||
-spec do_get_streams_v2(
|
-spec do_get_streams_v2(
|
||||||
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
emqx_ds:db(),
|
||||||
|
emqx_ds_replication_layer:shard_id(),
|
||||||
|
emqx_ds:topic_filter(),
|
||||||
|
emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
[{integer(), emqx_ds_storage_layer:stream()}].
|
[{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
|
||||||
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
|
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
|
||||||
emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime).
|
ShardId = {DB, Shard},
|
||||||
|
?IF_STORAGE_RUNNING(
|
||||||
|
ShardId,
|
||||||
|
emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
|
||||||
|
).
|
||||||
|
|
||||||
-dialyzer({nowarn_function, do_make_iterator_v1/5}).
|
-dialyzer({nowarn_function, do_make_iterator_v1/5}).
|
||||||
-spec do_make_iterator_v1(
|
-spec do_make_iterator_v1(
|
||||||
|
@ -413,7 +453,11 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
|
||||||
) ->
|
) ->
|
||||||
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
|
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
|
||||||
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
|
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
|
ShardId = {DB, Shard},
|
||||||
|
?IF_STORAGE_RUNNING(
|
||||||
|
ShardId,
|
||||||
|
emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
|
||||||
|
).
|
||||||
|
|
||||||
-spec do_make_delete_iterator_v4(
|
-spec do_make_delete_iterator_v4(
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
|
@ -434,9 +478,7 @@ do_make_delete_iterator_v4(DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
) ->
|
) ->
|
||||||
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
|
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
|
||||||
do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
|
do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
|
||||||
emqx_ds_storage_layer:update_iterator(
|
emqx_ds_storage_layer:update_iterator({DB, Shard}, OldIter, DSKey).
|
||||||
{DB, Shard}, OldIter, DSKey
|
|
||||||
).
|
|
||||||
|
|
||||||
-spec do_next_v1(
|
-spec do_next_v1(
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
|
@ -446,7 +488,11 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
|
||||||
) ->
|
) ->
|
||||||
emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
|
emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
|
||||||
do_next_v1(DB, Shard, Iter, BatchSize) ->
|
do_next_v1(DB, Shard, Iter, BatchSize) ->
|
||||||
emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
|
ShardId = {DB, Shard},
|
||||||
|
?IF_STORAGE_RUNNING(
|
||||||
|
ShardId,
|
||||||
|
emqx_ds_storage_layer:next(ShardId, Iter, BatchSize)
|
||||||
|
).
|
||||||
|
|
||||||
-spec do_delete_next_v4(
|
-spec do_delete_next_v4(
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
|
@ -464,14 +510,19 @@ do_add_generation_v2(_DB) ->
|
||||||
error(obsolete_api).
|
error(obsolete_api).
|
||||||
|
|
||||||
-spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
|
-spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
|
||||||
#{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}.
|
#{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}
|
||||||
do_list_generations_with_lifetimes_v3(DB, ShardId) ->
|
| emqx_ds:error(storage_down).
|
||||||
emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}).
|
do_list_generations_with_lifetimes_v3(DB, Shard) ->
|
||||||
|
ShardId = {DB, Shard},
|
||||||
|
?IF_STORAGE_RUNNING(
|
||||||
|
ShardId,
|
||||||
|
emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
|
||||||
|
).
|
||||||
|
|
||||||
-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
|
-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
|
||||||
ok | {error, _}.
|
no_return().
|
||||||
do_drop_generation_v3(DB, ShardId, GenId) ->
|
do_drop_generation_v3(_DB, _ShardId, _GenId) ->
|
||||||
emqx_ds_storage_layer:drop_generation({DB, ShardId}, GenId).
|
error(obsolete_api).
|
||||||
|
|
||||||
-spec do_get_delete_streams_v4(
|
-spec do_get_delete_streams_v4(
|
||||||
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||||
|
@ -491,6 +542,15 @@ list_nodes() ->
|
||||||
%% Too large for normal operation, need better backpressure mechanism.
|
%% Too large for normal operation, need better backpressure mechanism.
|
||||||
-define(RA_TIMEOUT, 60 * 1000).
|
-define(RA_TIMEOUT, 60 * 1000).
|
||||||
|
|
||||||
|
-define(SAFERPC(EXPR),
|
||||||
|
try
|
||||||
|
EXPR
|
||||||
|
catch
|
||||||
|
error:RPCError = {erpc, _} ->
|
||||||
|
{error, recoverable, RPCError}
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
ra_store_batch(DB, Shard, Messages) ->
|
ra_store_batch(DB, Shard, Messages) ->
|
||||||
Command = #{
|
Command = #{
|
||||||
?tag => ?BATCH,
|
?tag => ?BATCH,
|
||||||
|
@ -544,24 +604,25 @@ ra_drop_generation(DB, Shard, GenId) ->
|
||||||
ra_get_streams(DB, Shard, TopicFilter, Time) ->
|
ra_get_streams(DB, Shard, TopicFilter, Time) ->
|
||||||
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
||||||
TimestampUs = timestamp_to_timeus(Time),
|
TimestampUs = timestamp_to_timeus(Time),
|
||||||
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs).
|
?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)).
|
||||||
|
|
||||||
ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
|
ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
|
||||||
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
||||||
emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
|
?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)).
|
||||||
|
|
||||||
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
|
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
||||||
TimestampUs = timestamp_to_timeus(StartTime),
|
TimeUs = timestamp_to_timeus(StartTime),
|
||||||
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs).
|
?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
|
||||||
|
|
||||||
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
|
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
||||||
emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
|
TimeUs = timestamp_to_timeus(StartTime),
|
||||||
|
?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
|
||||||
|
|
||||||
ra_update_iterator(DB, Shard, Iter, DSKey) ->
|
ra_update_iterator(DB, Shard, Iter, DSKey) ->
|
||||||
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
||||||
emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
|
?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
|
||||||
|
|
||||||
ra_next(DB, Shard, Iter, BatchSize) ->
|
ra_next(DB, Shard, Iter, BatchSize) ->
|
||||||
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
||||||
|
@ -573,25 +634,32 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
|
||||||
|
|
||||||
ra_list_generations_with_lifetimes(DB, Shard) ->
|
ra_list_generations_with_lifetimes(DB, Shard) ->
|
||||||
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
|
||||||
Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard),
|
case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of
|
||||||
maps:map(
|
Gens = #{} ->
|
||||||
fun(_GenId, Data = #{since := Since, until := Until}) ->
|
maps:map(
|
||||||
Data#{
|
fun(_GenId, Data = #{since := Since, until := Until}) ->
|
||||||
since := timeus_to_timestamp(Since),
|
Data#{
|
||||||
until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until)
|
since := timeus_to_timestamp(Since),
|
||||||
}
|
until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until)
|
||||||
end,
|
}
|
||||||
Gens
|
end,
|
||||||
).
|
Gens
|
||||||
|
);
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
ra_drop_shard(DB, Shard) ->
|
ra_drop_shard(DB, Shard) ->
|
||||||
ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).
|
ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
-spec init(_Args :: map()) -> ra_state().
|
||||||
init(#{db := DB, shard := Shard}) ->
|
init(#{db := DB, shard := Shard}) ->
|
||||||
#{db_shard => {DB, Shard}, latest => 0}.
|
#{db_shard => {DB, Shard}, latest => 0}.
|
||||||
|
|
||||||
|
-spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) ->
|
||||||
|
{ra_state(), _Reply, _Effects}.
|
||||||
apply(
|
apply(
|
||||||
#{index := RaftIdx},
|
#{index := RaftIdx},
|
||||||
#{
|
#{
|
||||||
|
@ -671,3 +739,6 @@ timestamp_to_timeus(TimestampMs) ->
|
||||||
|
|
||||||
timeus_to_timestamp(TimestampUs) ->
|
timeus_to_timestamp(TimestampUs) ->
|
||||||
TimestampUs div 1000.
|
TimestampUs div 1000.
|
||||||
|
|
||||||
|
snapshot_module() ->
|
||||||
|
emqx_ds_replication_snapshot.
|
||||||
|
|
|
@ -130,17 +130,40 @@ print_status() ->
|
||||||
eval_qlc(mnesia:table(?NODE_TAB))
|
eval_qlc(mnesia:table(?NODE_TAB))
|
||||||
),
|
),
|
||||||
io:format(
|
io:format(
|
||||||
"~nSHARDS:~nId Replicas~n", []
|
"~nSHARDS:~n~s~s~s~n", [
|
||||||
|
string:pad("Id", 30),
|
||||||
|
string:pad("Leader", 24),
|
||||||
|
string:pad("Replicas", 60)
|
||||||
|
]
|
||||||
),
|
),
|
||||||
|
Leaderboard = ra_leaderboard:overview(),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
|
fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
|
||||||
ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
|
ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
|
||||||
ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40),
|
LeaderStr = string:pad(print_leader({DB, Shard}, RS, Leaderboard), 24),
|
||||||
io:format("~s ~s~n", [ShardStr, ReplicasStr])
|
ReplicasStr = string:pad([[R, " "] || R <- RS], 60),
|
||||||
|
io:format("~s~s~s~n", [ShardStr, LeaderStr, ReplicasStr])
|
||||||
end,
|
end,
|
||||||
eval_qlc(mnesia:table(?SHARD_TAB))
|
eval_qlc(mnesia:table(?SHARD_TAB))
|
||||||
).
|
).
|
||||||
|
|
||||||
|
print_leader({DB, Shard}, Sites, Leaderboard) ->
|
||||||
|
ClusterName = emqx_ds_replication_layer_shard:cluster_name(DB, Shard),
|
||||||
|
Servers = [
|
||||||
|
{{emqx_ds_replication_layer_shard:server_name(DB, Shard, S), ?MODULE:node(S)}, S}
|
||||||
|
|| S <- Sites
|
||||||
|
],
|
||||||
|
case lists:keyfind(ClusterName, 1, Leaderboard) of
|
||||||
|
{ClusterName, Leader, _Members} ->
|
||||||
|
case lists:keyfind(Leader, 1, Servers) of
|
||||||
|
{{_Name, Node}, Site} when Node == node() -> [Site, " *"];
|
||||||
|
{{_Name, _}, Site} -> Site;
|
||||||
|
false -> "X"
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
"?"
|
||||||
|
end.
|
||||||
|
|
||||||
-spec this_site() -> site().
|
-spec this_site() -> site().
|
||||||
this_site() ->
|
this_site() ->
|
||||||
persistent_term:get(?emqx_ds_builtin_site).
|
persistent_term:get(?emqx_ds_builtin_site).
|
||||||
|
|
|
@ -21,7 +21,9 @@
|
||||||
%% Static server configuration
|
%% Static server configuration
|
||||||
-export([
|
-export([
|
||||||
shard_servers/2,
|
shard_servers/2,
|
||||||
local_server/2
|
local_server/2,
|
||||||
|
cluster_name/2,
|
||||||
|
server_name/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Dynamic server location API
|
%% Dynamic server location API
|
||||||
|
@ -80,7 +82,7 @@ get_servers_leader_preferred(DB, Shard) ->
|
||||||
[Leader | lists:delete(Leader, Servers)];
|
[Leader | lists:delete(Leader, Servers)];
|
||||||
undefined ->
|
undefined ->
|
||||||
%% TODO: Dynamic membership.
|
%% TODO: Dynamic membership.
|
||||||
get_shard_servers(DB, Shard)
|
get_online_servers(DB, Shard)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_server_local_preferred(DB, Shard) ->
|
get_server_local_preferred(DB, Shard) ->
|
||||||
|
@ -95,9 +97,27 @@ get_server_local_preferred(DB, Shard) ->
|
||||||
%% Leader is unknown if there are no servers of this group on the
|
%% Leader is unknown if there are no servers of this group on the
|
||||||
%% local node. We want to pick a replica in that case as well.
|
%% local node. We want to pick a replica in that case as well.
|
||||||
%% TODO: Dynamic membership.
|
%% TODO: Dynamic membership.
|
||||||
pick_random(get_shard_servers(DB, Shard))
|
pick_random(get_online_servers(DB, Shard))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Servers of the given shard, narrowed down by filter_online/1.
get_online_servers(DB, Shard) ->
    AllServers = get_shard_servers(DB, Shard),
    filter_online(AllServers).
|
||||||
|
|
||||||
|
%% Keep only the servers whose node is currently reachable.
%% When none of them is, the input list is returned unchanged.
filter_online(Servers) ->
    case [Server || Server <- Servers, is_server_online(Server)] of
        [_ | _] = Online ->
            Online;
        [] ->
            %% FIXME
            Servers
    end.
|
||||||
|
|
||||||
|
%% A server id is a `{Name, Node}' pair; it is online iff its node is.
is_server_online({_ServerName, ServerNode}) ->
    is_node_online(ServerNode).
|
||||||
|
|
||||||
|
%% True if `Node' is the local node or one of the nodes listed by nodes/0.
is_node_online(Node) ->
    lists:member(Node, [node() | nodes()]).
|
||||||
|
|
||||||
pick_local(Servers) ->
|
pick_local(Servers) ->
|
||||||
case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
|
case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
|
||||||
[Local | _] ->
|
[Local | _] ->
|
||||||
|
@ -147,19 +167,21 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
||||||
Bootstrap = false;
|
Bootstrap = false;
|
||||||
{error, name_not_registered} ->
|
{error, name_not_registered} ->
|
||||||
Bootstrap = true,
|
Bootstrap = true,
|
||||||
|
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
|
||||||
|
LogOpts = maps:with(
|
||||||
|
[
|
||||||
|
snapshot_interval,
|
||||||
|
resend_window
|
||||||
|
],
|
||||||
|
ReplicationOpts
|
||||||
|
),
|
||||||
ok = ra:start_server(DB, #{
|
ok = ra:start_server(DB, #{
|
||||||
id => LocalServer,
|
id => LocalServer,
|
||||||
uid => <<ClusterName/binary, "_", Site/binary>>,
|
uid => <<ClusterName/binary, "_", Site/binary>>,
|
||||||
cluster_name => ClusterName,
|
cluster_name => ClusterName,
|
||||||
initial_members => Servers,
|
initial_members => Servers,
|
||||||
machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
|
machine => Machine,
|
||||||
log_init_args => maps:with(
|
log_init_args => LogOpts
|
||||||
[
|
|
||||||
snapshot_interval,
|
|
||||||
resend_window
|
|
||||||
],
|
|
||||||
ReplicationOpts
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
end,
|
end,
|
||||||
case Servers of
|
case Servers of
|
||||||
|
|
|
@ -0,0 +1,229 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ds_replication_snapshot).
|
||||||
|
|
||||||
|
-include_lib("snabbkaffe/include/trace.hrl").
|
||||||
|
|
||||||
|
-behaviour(ra_snapshot).
|
||||||
|
-export([
|
||||||
|
prepare/2,
|
||||||
|
write/3,
|
||||||
|
|
||||||
|
begin_read/2,
|
||||||
|
read_chunk/3,
|
||||||
|
|
||||||
|
begin_accept/2,
|
||||||
|
accept_chunk/2,
|
||||||
|
complete_accept/2,
|
||||||
|
|
||||||
|
recover/1,
|
||||||
|
validate/1,
|
||||||
|
read_meta/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Read state.
|
||||||
|
-record(rs, {
|
||||||
|
phase :: machine_state | storage_snapshot,
|
||||||
|
started_at :: _Time :: integer(),
|
||||||
|
state :: emqx_ds_replication_layer:ra_state() | undefined,
|
||||||
|
reader :: emqx_ds_storage_snapshot:reader() | undefined
|
||||||
|
}).
|
||||||
|
|
||||||
|
%% Write state.
|
||||||
|
-record(ws, {
|
||||||
|
phase :: machine_state | storage_snapshot,
|
||||||
|
started_at :: _Time :: integer(),
|
||||||
|
dir :: file:filename(),
|
||||||
|
meta :: ra_snapshot:meta(),
|
||||||
|
state :: emqx_ds_replication_layer:ra_state() | undefined,
|
||||||
|
writer :: emqx_ds_storage_snapshot:writer() | undefined
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type rs() :: #rs{}.
|
||||||
|
-type ws() :: #ws{}.
|
||||||
|
|
||||||
|
-type ra_state() :: emqx_ds_replication_layer:ra_state().
|
||||||
|
|
||||||
|
%% Writing a snapshot.
|
||||||
|
%% This process is exactly the same as writing a ra log snapshot: store the
|
||||||
|
%% log meta and the machine state in a single snapshot file.
|
||||||
|
|
||||||
|
%% Delegates to the stock ra log snapshot implementation.
-spec prepare(_RaftIndex, ra_state()) -> _State :: ra_state().
prepare(RaftIndex, MachineState) ->
    ra_log_snapshot:prepare(RaftIndex, MachineState).
|
||||||
|
|
||||||
|
%% Store the log meta and the machine state in `SnapshotDir', exactly as a
%% plain ra log snapshot would.
-spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) ->
    ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
write(SnapshotDir, SnapshotMeta, State) ->
    ra_log_snapshot:write(SnapshotDir, SnapshotMeta, State).
|
||||||
|
|
||||||
|
%% Reading a snapshot.
|
||||||
|
%% This is triggered by the leader when it finds out that a follower is
|
||||||
|
%% behind so much that there are no log segments covering the gap anymore.
|
||||||
|
%% This process, on the other hand, MUST involve reading the storage snapshot,
|
||||||
|
%% (in addition to the log snapshot) to reconstruct the storage state on the
|
||||||
|
%% target node.
|
||||||
|
|
||||||
|
%% Start reading a snapshot: recover the machine state from the log snapshot
%% in `Dir' and, on success, open a storage snapshot reader for the shard.
%% Anything other than a successful recovery is returned to the caller as is.
-spec begin_read(_SnapshotDir :: file:filename(), _Context :: #{}) ->
    {ok, ra_snapshot:meta(), rs()} | {error, _Reason :: term()}.
begin_read(Dir, _Context) ->
    %% Taken before recovery, so the reported duration covers it as well.
    StartedAt = erlang:monotonic_time(millisecond),
    case ra_log_snapshot:recover(Dir) of
        {ok, Meta, MachineState} ->
            ReadState = #rs{
                phase = machine_state,
                started_at = StartedAt,
                state = MachineState
            },
            start_snapshot_reader(Meta, ReadState);
        Error ->
            Error
    end.
|
||||||
|
|
||||||
|
%% Log the start of the transfer and attach a storage snapshot reader to the
%% read state. Crashes (badmatch) if the storage layer cannot take a snapshot.
start_snapshot_reader(Meta, RS0) ->
    ShardId = shard_id(RS0),
    logger:info(#{msg => "dsrepl_snapshot_read_started", shard => ShardId}),
    {ok, Reader} = emqx_ds_storage_layer:take_snapshot(ShardId),
    RS = RS0#rs{reader = Reader},
    {ok, Meta, RS}.
|
||||||
|
|
||||||
|
%% Produce the next chunk of the snapshot stream.
%% The first chunk is always the serialized machine state; every following
%% chunk comes from the storage snapshot reader. A reader error releases the
%% reader and is raised as an exception rather than returned.
-spec read_chunk(rs(), _Size :: non_neg_integer(), _SnapshotDir :: file:filename()) ->
    {ok, binary(), {next, rs()} | last} | {error, _Reason :: term()}.
read_chunk(RS = #rs{phase = machine_state, state = MachineState}, _Size, _Dir) ->
    NextRS = RS#rs{phase = storage_snapshot},
    {ok, term_to_binary(MachineState), {next, NextRS}};
read_chunk(RS = #rs{phase = storage_snapshot, reader = Reader0}, Size, _Dir) ->
    case emqx_ds_storage_snapshot:read_chunk(Reader0, Size) of
        {error, Reason} ->
            ?tp(dsrepl_snapshot_read_error, #{reason => Reason, reader => Reader0}),
            _ = emqx_ds_storage_snapshot:release_reader(Reader0),
            error(Reason);
        {last, Chunk, Reader} ->
            %% TODO: idempotence?
            ?tp(dsrepl_snapshot_read_complete, #{reader => Reader}),
            _ = complete_read(RS#rs{reader = Reader}),
            {ok, Chunk, last};
        {next, Chunk, Reader} ->
            {ok, Chunk, {next, RS#rs{reader = Reader}}}
    end.
|
||||||
|
|
||||||
|
%% Release the storage snapshot reader and log how long the read took and
%% how many bytes were read.
complete_read(RS = #rs{reader = Reader, started_at = StartedAt}) ->
    _ = emqx_ds_storage_snapshot:release_reader(Reader),
    ShardId = shard_id(RS),
    DurationMs = erlang:monotonic_time(millisecond) - StartedAt,
    BytesRead = emqx_ds_storage_snapshot:reader_info(bytes_read, Reader),
    logger:info(#{
        msg => "dsrepl_snapshot_read_complete",
        shard => ShardId,
        duration_ms => DurationMs,
        read_bytes => BytesRead
    }).
|
||||||
|
|
||||||
|
%% Accepting a snapshot.
|
||||||
|
%% This process is triggered by the target server, when the leader finds out
|
||||||
|
%% that the target server is severely lagging behind. This is the receiving side of
|
||||||
|
%% `begin_read/2` and `read_chunk/3`.
|
||||||
|
|
||||||
|
%% Initialize the write state for an incoming snapshot. The first chunk is
%% expected to carry the machine state, hence the initial phase.
-spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) ->
    {ok, ws()}.
begin_accept(Dir, Meta) ->
    Now = erlang:monotonic_time(millisecond),
    {ok, #ws{phase = machine_state, started_at = Now, dir = Dir, meta = Meta}}.
|
||||||
|
|
||||||
|
%% Consume one (non-final) chunk of an incoming snapshot.
%% In the `machine_state' phase the chunk is the serialized machine state;
%% afterwards chunks are fed to the storage snapshot writer. A writer error
%% aborts the writer and is raised as an exception rather than returned.
-spec accept_chunk(binary(), ws()) ->
    {ok, ws()} | {error, _Reason :: term()}.
accept_chunk(Chunk, WS = #ws{phase = machine_state}) ->
    %% NOTE(review): decoded without the `safe' option; presumably chunks only
    %% ever come from cluster peers — confirm.
    State = binary_to_term(Chunk),
    start_snapshot_writer(WS#ws{state = State});
accept_chunk(Chunk, WS = #ws{phase = storage_snapshot, writer = Writer0}) ->
    %% TODO: idempotence?
    case emqx_ds_storage_snapshot:write_chunk(Writer0, Chunk) of
        {error, Reason} ->
            ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => Writer0}),
            _ = emqx_ds_storage_snapshot:abort_writer(Writer0),
            error(Reason);
        {next, Writer} ->
            {ok, WS#ws{writer = Writer}}
    end.
|
||||||
|
|
||||||
|
%% Switch to the `storage_snapshot' phase: log the start, terminate the
%% shard's storage, and open a snapshot writer for it. Crashes (badmatch) if
%% the storage layer refuses to accept a snapshot.
start_snapshot_writer(WS0) ->
    ShardId = shard_id(WS0),
    logger:info(#{msg => "dsrepl_snapshot_write_started", shard => ShardId}),
    _ = emqx_ds_builtin_db_sup:terminate_storage(ShardId),
    {ok, Writer} = emqx_ds_storage_layer:accept_snapshot(ShardId),
    WS = WS0#ws{phase = storage_snapshot, writer = Writer},
    {ok, WS}.
|
||||||
|
|
||||||
|
%% Consume the final chunk of an incoming snapshot and finish the transfer.
%% On success the writer is released, the storage is restarted and the machine
%% snapshot is written (see complete_accept/1). A writer error aborts the
%% writer and is raised as an exception rather than returned.
-spec complete_accept(binary(), ws()) -> ok | {error, ra_snapshot:file_err()}.
complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ->
    %% TODO: idempotence?
    case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
        {last, SnapWriter} ->
            ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}),
            _ = emqx_ds_storage_snapshot:release_writer(SnapWriter),
            Result = complete_accept(WS#ws{writer = SnapWriter}),
            ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}),
            Result;
        {error, Reason} ->
            ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
            _ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
            error(Reason)
    end.

%% Log transfer statistics, restart the shard's storage and persist the
%% machine snapshot. Crashes (badmatch) if the storage fails to restart.
-spec complete_accept(ws()) -> ok | {error, ra_snapshot:file_err()}.
complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
    ShardId = shard_id(WS),
    logger:info(#{
        %% This is the accepting (writing) side; the message used to be a
        %% copy of the read-side one.
        msg => "dsrepl_snapshot_write_complete",
        shard => ShardId,
        duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
        bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter)
    }),
    {ok, _} = emqx_ds_builtin_db_sup:restart_storage(ShardId),
    write_machine_snapshot(WS).
|
||||||
|
|
||||||
|
%% Persist the machine state carried by the write state via write/3.
write_machine_snapshot(#ws{dir = SnapshotDir, meta = SnapshotMeta, state = State}) ->
    write(SnapshotDir, SnapshotMeta, State).
|
||||||
|
|
||||||
|
%% Restoring machine state from a snapshot.
|
||||||
|
%% This is equivalent to restoring from a log snapshot.
|
||||||
|
|
||||||
|
%% Restore the machine state from `SnapshotDir'; delegates to the ra log
%% snapshot implementation.
-spec recover(_SnapshotDir :: file:filename()) ->
    {ok, ra_snapshot:meta(), ra_state()} | {error, _Reason}.
recover(SnapshotDir) ->
    %% TODO: Verify that storage layer is online?
    ra_log_snapshot:recover(SnapshotDir).
|
||||||
|
|
||||||
|
%% Validate the snapshot in `SnapshotDir'; delegates to the ra log snapshot
%% implementation.
-spec validate(_SnapshotDir :: file:filename()) ->
    ok | {error, _Reason}.
validate(SnapshotDir) ->
    ra_log_snapshot:validate(SnapshotDir).
|
||||||
|
|
||||||
|
%% Read the snapshot meta from `SnapshotDir'; delegates to the ra log
%% snapshot implementation.
-spec read_meta(_SnapshotDir :: file:filename()) ->
    {ok, ra_snapshot:meta()} | {error, _Reason}.
read_meta(SnapshotDir) ->
    ra_log_snapshot:read_meta(SnapshotDir).
|
||||||
|
|
||||||
|
%% Extract the shard id (the `db_shard' key of the machine state) from a read
%% state, a write state, or the machine state itself.
shard_id(#rs{state = State}) ->
    maps:get(db_shard, State);
shard_id(#ws{state = State}) ->
    maps:get(db_shard, State);
shard_id(State) ->
    maps:get(db_shard, State).
|
|
@ -19,8 +19,12 @@
|
||||||
|
|
||||||
%% Replication layer API:
|
%% Replication layer API:
|
||||||
-export([
|
-export([
|
||||||
open_shard/2,
|
%% Lifecycle
|
||||||
|
start_link/2,
|
||||||
drop_shard/1,
|
drop_shard/1,
|
||||||
|
shard_info/2,
|
||||||
|
|
||||||
|
%% Data
|
||||||
store_batch/3,
|
store_batch/3,
|
||||||
get_streams/3,
|
get_streams/3,
|
||||||
get_delete_streams/3,
|
get_delete_streams/3,
|
||||||
|
@ -29,14 +33,20 @@
|
||||||
update_iterator/3,
|
update_iterator/3,
|
||||||
next/3,
|
next/3,
|
||||||
delete_next/4,
|
delete_next/4,
|
||||||
|
|
||||||
|
%% Generations
|
||||||
update_config/3,
|
update_config/3,
|
||||||
add_generation/2,
|
add_generation/2,
|
||||||
list_generations_with_lifetimes/1,
|
list_generations_with_lifetimes/1,
|
||||||
drop_generation/2
|
drop_generation/2,
|
||||||
|
|
||||||
|
%% Snapshotting
|
||||||
|
take_snapshot/1,
|
||||||
|
accept_snapshot/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server
|
%% gen_server
|
||||||
-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||||
|
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
-export([db_dir/1]).
|
-export([db_dir/1]).
|
||||||
|
@ -229,10 +239,7 @@
|
||||||
-record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
|
-record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
|
||||||
-record(call_list_generations_with_lifetimes, {}).
|
-record(call_list_generations_with_lifetimes, {}).
|
||||||
-record(call_drop_generation, {gen_id :: gen_id()}).
|
-record(call_drop_generation, {gen_id :: gen_id()}).
|
||||||
|
-record(call_take_snapshot, {}).
|
||||||
-spec open_shard(shard_id(), options()) -> ok.
|
|
||||||
open_shard(Shard, Options) ->
|
|
||||||
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
|
|
||||||
|
|
||||||
-spec drop_shard(shard_id()) -> ok.
|
-spec drop_shard(shard_id()) -> ok.
|
||||||
drop_shard(Shard) ->
|
drop_shard(Shard) ->
|
||||||
|
@ -244,11 +251,13 @@ drop_shard(Shard) ->
|
||||||
emqx_ds:message_store_opts()
|
emqx_ds:message_store_opts()
|
||||||
) ->
|
) ->
|
||||||
emqx_ds:store_batch_result().
|
emqx_ds:store_batch_result().
|
||||||
store_batch(Shard, Messages, Options) ->
|
store_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
|
||||||
%% We always store messages in the current generation:
|
%% NOTE
|
||||||
GenId = generation_current(Shard),
|
%% We assume that batches do not span generations. Callers should enforce this.
|
||||||
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
|
#{module := Mod, data := GenData} = generation_at(Shard, Time),
|
||||||
Mod:store_batch(Shard, GenData, Messages, Options).
|
Mod:store_batch(Shard, GenData, Messages, Options);
|
||||||
|
store_batch(_Shard, [], _Options) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
-spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
-spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||||
[{integer(), stream()}].
|
[{integer(), stream()}].
|
||||||
|
@ -258,14 +267,14 @@ get_streams(Shard, TopicFilter, StartTime) ->
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(GenId) ->
|
fun(GenId) ->
|
||||||
?tp(get_streams_get_gen, #{gen_id => GenId}),
|
?tp(get_streams_get_gen, #{gen_id => GenId}),
|
||||||
case generation_get_safe(Shard, GenId) of
|
case generation_get(Shard, GenId) of
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
#{module := Mod, data := GenData} ->
|
||||||
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
|
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
|
||||||
[
|
[
|
||||||
{GenId, ?stream_v2(GenId, InnerStream)}
|
{GenId, ?stream_v2(GenId, InnerStream)}
|
||||||
|| InnerStream <- Streams
|
|| InnerStream <- Streams
|
||||||
];
|
];
|
||||||
{error, not_found} ->
|
not_found ->
|
||||||
%% race condition: generation was dropped before getting its streams?
|
%% race condition: generation was dropped before getting its streams?
|
||||||
[]
|
[]
|
||||||
end
|
end
|
||||||
|
@ -281,14 +290,14 @@ get_delete_streams(Shard, TopicFilter, StartTime) ->
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(GenId) ->
|
fun(GenId) ->
|
||||||
?tp(get_streams_get_gen, #{gen_id => GenId}),
|
?tp(get_streams_get_gen, #{gen_id => GenId}),
|
||||||
case generation_get_safe(Shard, GenId) of
|
case generation_get(Shard, GenId) of
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
#{module := Mod, data := GenData} ->
|
||||||
Streams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime),
|
Streams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime),
|
||||||
[
|
[
|
||||||
?delete_stream(GenId, InnerStream)
|
?delete_stream(GenId, InnerStream)
|
||||||
|| InnerStream <- Streams
|
|| InnerStream <- Streams
|
||||||
];
|
];
|
||||||
{error, not_found} ->
|
not_found ->
|
||||||
%% race condition: generation was dropped before getting its streams?
|
%% race condition: generation was dropped before getting its streams?
|
||||||
[]
|
[]
|
||||||
end
|
end
|
||||||
|
@ -301,8 +310,8 @@ get_delete_streams(Shard, TopicFilter, StartTime) ->
|
||||||
make_iterator(
|
make_iterator(
|
||||||
Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime
|
Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime
|
||||||
) ->
|
) ->
|
||||||
case generation_get_safe(Shard, GenId) of
|
case generation_get(Shard, GenId) of
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
#{module := Mod, data := GenData} ->
|
||||||
case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
|
case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{
|
{ok, #{
|
||||||
|
@ -313,7 +322,7 @@ make_iterator(
|
||||||
{error, _} = Err ->
|
{error, _} = Err ->
|
||||||
Err
|
Err
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
not_found ->
|
||||||
{error, unrecoverable, generation_not_found}
|
{error, unrecoverable, generation_not_found}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -322,8 +331,8 @@ make_iterator(
|
||||||
make_delete_iterator(
|
make_delete_iterator(
|
||||||
Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime
|
Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime
|
||||||
) ->
|
) ->
|
||||||
case generation_get_safe(Shard, GenId) of
|
case generation_get(Shard, GenId) of
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
#{module := Mod, data := GenData} ->
|
||||||
case Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
|
case Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{
|
{ok, #{
|
||||||
|
@ -334,7 +343,7 @@ make_delete_iterator(
|
||||||
{error, _} = Err ->
|
{error, _} = Err ->
|
||||||
Err
|
Err
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
not_found ->
|
||||||
{error, end_of_stream}
|
{error, end_of_stream}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -345,8 +354,8 @@ update_iterator(
|
||||||
#{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
|
#{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
|
||||||
DSKey
|
DSKey
|
||||||
) ->
|
) ->
|
||||||
case generation_get_safe(Shard, GenId) of
|
case generation_get(Shard, GenId) of
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
#{module := Mod, data := GenData} ->
|
||||||
case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
|
case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{
|
{ok, #{
|
||||||
|
@ -357,15 +366,15 @@ update_iterator(
|
||||||
{error, _} = Err ->
|
{error, _} = Err ->
|
||||||
Err
|
Err
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
not_found ->
|
||||||
{error, unrecoverable, generation_not_found}
|
{error, unrecoverable, generation_not_found}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec next(shard_id(), iterator(), pos_integer()) ->
|
-spec next(shard_id(), iterator(), pos_integer()) ->
|
||||||
emqx_ds:next_result(iterator()).
|
emqx_ds:next_result(iterator()).
|
||||||
next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
|
next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
|
||||||
case generation_get_safe(Shard, GenId) of
|
case generation_get(Shard, GenId) of
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
#{module := Mod, data := GenData} ->
|
||||||
Current = generation_current(Shard),
|
Current = generation_current(Shard),
|
||||||
case Mod:next(Shard, GenData, GenIter0, BatchSize) of
|
case Mod:next(Shard, GenData, GenIter0, BatchSize) of
|
||||||
{ok, _GenIter, []} when GenId < Current ->
|
{ok, _GenIter, []} when GenId < Current ->
|
||||||
|
@ -378,7 +387,7 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
|
||||||
Error = {error, _, _} ->
|
Error = {error, _, _} ->
|
||||||
Error
|
Error
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
not_found ->
|
||||||
%% generation was possibly dropped by GC
|
%% generation was possibly dropped by GC
|
||||||
{error, unrecoverable, generation_not_found}
|
{error, unrecoverable, generation_not_found}
|
||||||
end.
|
end.
|
||||||
|
@ -391,8 +400,8 @@ delete_next(
|
||||||
Selector,
|
Selector,
|
||||||
BatchSize
|
BatchSize
|
||||||
) ->
|
) ->
|
||||||
case generation_get_safe(Shard, GenId) of
|
case generation_get(Shard, GenId) of
|
||||||
{ok, #{module := Mod, data := GenData}} ->
|
#{module := Mod, data := GenData} ->
|
||||||
Current = generation_current(Shard),
|
Current = generation_current(Shard),
|
||||||
case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of
|
case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of
|
||||||
{ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current ->
|
{ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current ->
|
||||||
|
@ -405,7 +414,7 @@ delete_next(
|
||||||
Error = {error, _} ->
|
Error = {error, _} ->
|
||||||
Error
|
Error
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
not_found ->
|
||||||
%% generation was possibly dropped by GC
|
%% generation was possibly dropped by GC
|
||||||
{ok, end_of_stream}
|
{ok, end_of_stream}
|
||||||
end.
|
end.
|
||||||
|
@ -436,6 +445,28 @@ list_generations_with_lifetimes(ShardId) ->
|
||||||
drop_generation(ShardId, GenId) ->
|
drop_generation(ShardId, GenId) ->
|
||||||
gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
|
gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
|
||||||
|
|
||||||
|
%% Ask the shard server to take a snapshot and open a reader over the
%% directory it reports. Any other reply is returned unchanged.
-spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}.
take_snapshot(ShardId) ->
    Reply = gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity),
    case Reply of
        {ok, SnapshotDir} ->
            emqx_ds_storage_snapshot:new_reader(SnapshotDir);
        Error ->
            Error
    end.
|
||||||
|
|
||||||
|
%% Drop the shard's current data and open a snapshot writer for it.
%% Crashes (badmatch) unless drop_shard/1 returns `ok'.
-spec accept_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:writer()} | {error, _Reason}.
accept_snapshot(Shard) ->
    ok = drop_shard(Shard),
    handle_accept_snapshot(Shard).
|
||||||
|
|
||||||
|
%% Report whether the shard is up: `running' if its runtime schema can be
%% fetched, `down' if fetching it raises `badarg'.
%% NOTE(review): presumably `badarg' means the persistent term is absent —
%% confirm against get_schema_runtime/1.
-spec shard_info(shard_id(), status) -> running | down.
shard_info(Shard, status) ->
    try get_schema_runtime(Shard) of
        Runtime when is_map(Runtime) -> running
    catch
        error:badarg -> down
    end.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% gen_server for the shard
|
%% gen_server for the shard
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -505,6 +536,9 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
|
||||||
{Reply, S} = handle_drop_generation(S0, GenId),
|
{Reply, S} = handle_drop_generation(S0, GenId),
|
||||||
commit_metadata(S),
|
commit_metadata(S),
|
||||||
{reply, Reply, S};
|
{reply, Reply, S};
|
||||||
|
handle_call(#call_take_snapshot{}, _From, S) ->
|
||||||
|
Snapshot = handle_take_snapshot(S),
|
||||||
|
{reply, Snapshot, S};
|
||||||
handle_call(_Call, _From, S) ->
|
handle_call(_Call, _From, S) ->
|
||||||
{reply, {error, unknown_call}, S}.
|
{reply, {error, unknown_call}, S}.
|
||||||
|
|
||||||
|
@ -671,7 +705,7 @@ create_new_shard_schema(ShardId, DB, CFRefs, Prototype) ->
|
||||||
{gen_id(), shard_schema(), cf_refs()}.
|
{gen_id(), shard_schema(), cf_refs()}.
|
||||||
new_generation(ShardId, DB, Schema0, Since) ->
|
new_generation(ShardId, DB, Schema0, Since) ->
|
||||||
#{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
|
#{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
|
||||||
GenId = PrevGenId + 1,
|
GenId = next_generation_id(PrevGenId),
|
||||||
{GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
|
{GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
|
||||||
GenSchema = #{
|
GenSchema = #{
|
||||||
module => Mod,
|
module => Mod,
|
||||||
|
@ -687,6 +721,14 @@ new_generation(ShardId, DB, Schema0, Since) ->
|
||||||
},
|
},
|
||||||
{GenId, Schema, NewCFRefs}.
|
{GenId, Schema, NewCFRefs}.
|
||||||
|
|
||||||
|
%% Id of the generation that follows `PrevGenId'.
-spec next_generation_id(gen_id()) -> gen_id().
next_generation_id(PrevGenId) ->
    PrevGenId + 1.
|
||||||
|
|
||||||
|
%% Id of the generation that precedes `NextGenId'. Only defined for positive
%% ids; anything else fails with `function_clause'.
-spec prev_generation_id(gen_id()) -> gen_id().
prev_generation_id(NextGenId) when 0 < NextGenId ->
    NextGenId - 1.
|
||||||
|
|
||||||
%% @doc Commit current state of the server to both rocksdb and the persistent term
|
%% @doc Commit current state of the server to both rocksdb and the persistent term
|
||||||
-spec commit_metadata(server_state()) -> ok.
|
-spec commit_metadata(server_state()) -> ok.
|
||||||
commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) ->
|
commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) ->
|
||||||
|
@ -726,7 +768,11 @@ rocksdb_open(Shard, Options) ->
|
||||||
|
|
||||||
-spec db_dir(shard_id()) -> file:filename().
|
-spec db_dir(shard_id()) -> file:filename().
|
||||||
db_dir({DB, ShardId}) ->
|
db_dir({DB, ShardId}) ->
|
||||||
filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
|
filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]).
|
||||||
|
|
||||||
|
%% Path of the checkpoint called `Name' for the given shard:
%% <base_dir>/<DB>/checkpoints/<ShardId>/<Name>.
-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename().
checkpoint_dir({DB, ShardId}, Name) ->
    Components = [emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId), Name],
    filename:join(Components).
|
||||||
|
|
||||||
-spec update_last_until(Schema, emqx_ds:time()) ->
|
-spec update_last_until(Schema, emqx_ds:time()) ->
|
||||||
Schema | {error, exists | overlaps_existing_generations}
|
Schema | {error, exists | overlaps_existing_generations}
|
||||||
|
@ -759,6 +805,21 @@ run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
|
||||||
%% Different implementation modules
|
%% Different implementation modules
|
||||||
NewGenData.
|
NewGenData.
|
||||||
|
|
||||||
|
%% Take a RocksDB checkpoint of the shard into a fresh directory named after
%% the current system time in milliseconds.
%% Returns `{ok, Dir}' or the error reported by rocksdb:checkpoint/2.
handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
    Timestamp = erlang:system_time(millisecond),
    Dir = checkpoint_dir(ShardId, integer_to_list(Timestamp)),
    %% Creates the parent directories only; the result is deliberately ignored.
    _ = filelib:ensure_dir(Dir),
    case rocksdb:checkpoint(DB, Dir) of
        {error, _} = Error ->
            Error;
        ok ->
            {ok, Dir}
    end.
|
||||||
|
|
||||||
|
%% Open a snapshot writer targeting the shard's database directory.
handle_accept_snapshot(ShardId) ->
    emqx_ds_storage_snapshot:new_writer(db_dir(ShardId)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------------
|
||||||
%% Schema access
|
%% Schema access
|
||||||
%%--------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------------
|
||||||
|
@ -768,18 +829,13 @@ generation_current(Shard) ->
|
||||||
#{current_generation := Current} = get_schema_runtime(Shard),
|
#{current_generation := Current} = get_schema_runtime(Shard),
|
||||||
Current.
|
Current.
|
||||||
|
|
||||||
-spec generation_get(shard_id(), gen_id()) -> generation().
|
-spec generation_get(shard_id(), gen_id()) -> generation() | not_found.
|
||||||
generation_get(Shard, GenId) ->
|
generation_get(Shard, GenId) ->
|
||||||
{ok, GenData} = generation_get_safe(Shard, GenId),
|
|
||||||
GenData.
|
|
||||||
|
|
||||||
-spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}.
|
|
||||||
generation_get_safe(Shard, GenId) ->
|
|
||||||
case get_schema_runtime(Shard) of
|
case get_schema_runtime(Shard) of
|
||||||
#{?GEN_KEY(GenId) := GenData} ->
|
#{?GEN_KEY(GenId) := GenData} ->
|
||||||
{ok, GenData};
|
GenData;
|
||||||
#{} ->
|
#{} ->
|
||||||
{error, not_found}
|
not_found
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
|
-spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
|
||||||
|
@ -796,6 +852,20 @@ generations_since(Shard, Since) ->
|
||||||
Schema
|
Schema
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% Find the generation that `Time' falls into, searching backwards from the
%% shard's current generation.
-spec generation_at(shard_id(), emqx_ds:time()) -> generation().
generation_at(Shard, Time) ->
    Runtime = get_schema_runtime(Shard),
    #{current_generation := CurrentGenId} = Runtime,
    generation_at(Time, CurrentGenId, Runtime).
|
||||||
|
|
||||||
|
%% Walk generations from `GenId' downwards until one is found whose `since'
%% is not later than `Time'. The walk never goes below generation 0, which is
%% returned regardless of its `since'. Crashes (badmatch) if a visited
%% generation is missing from `Schema'.
generation_at(Time, GenId, Schema) ->
    #{?GEN_KEY(GenId) := Generation} = Schema,
    case Generation of
        #{since := Since} when GenId > 0, Time < Since ->
            %% `Time' predates this generation: look at the previous one.
            generation_at(Time, prev_generation_id(GenId), Schema);
        _ ->
            Generation
    end.
|
||||||
|
|
||||||
-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
|
-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
|
||||||
|
|
||||||
-spec get_schema_runtime(shard_id()) -> shard().
|
-spec get_schema_runtime(shard_id()) -> shard().
|
||||||
|
|
|
@ -1,88 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-module(emqx_ds_storage_layer_sup).
|
|
||||||
|
|
||||||
-behaviour(supervisor).
|
|
||||||
|
|
||||||
%% API:
|
|
||||||
-export([start_link/0, start_shard/2, stop_shard/1, ensure_shard/2]).
|
|
||||||
|
|
||||||
%% behaviour callbacks:
|
|
||||||
-export([init/1]).
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% Type declarations
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
-define(SUP, ?MODULE).
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% API functions
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
-spec start_link() -> {ok, pid()}.
|
|
||||||
start_link() ->
|
|
||||||
supervisor:start_link(?MODULE, []).
|
|
||||||
|
|
||||||
-spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
|
|
||||||
supervisor:startchild_ret().
|
|
||||||
start_shard(Shard, Options) ->
|
|
||||||
supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
|
|
||||||
|
|
||||||
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
|
|
||||||
stop_shard(Shard) ->
|
|
||||||
ok = supervisor:terminate_child(?SUP, Shard),
|
|
||||||
ok = supervisor:delete_child(?SUP, Shard).
|
|
||||||
|
|
||||||
-spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) ->
|
|
||||||
ok | {error, _Reason}.
|
|
||||||
ensure_shard(Shard, Options) ->
|
|
||||||
case start_shard(Shard, Options) of
|
|
||||||
{ok, _Pid} ->
|
|
||||||
ok;
|
|
||||||
{error, {already_started, _Pid}} ->
|
|
||||||
ok;
|
|
||||||
{error, Reason} ->
|
|
||||||
{error, Reason}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% behaviour callbacks
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
Children = [],
|
|
||||||
SupFlags = #{
|
|
||||||
strategy => one_for_one,
|
|
||||||
intensity => 10,
|
|
||||||
period => 10
|
|
||||||
},
|
|
||||||
{ok, {SupFlags, Children}}.
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% Internal functions
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
-spec shard_child_spec(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
|
|
||||||
supervisor:child_spec().
|
|
||||||
shard_child_spec(Shard, Options) ->
|
|
||||||
#{
|
|
||||||
id => Shard,
|
|
||||||
start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
|
|
||||||
shutdown => 5_000,
|
|
||||||
restart => permanent,
|
|
||||||
type => worker
|
|
||||||
}.
|
|
|
@ -0,0 +1,325 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_ds_storage_snapshot).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
|
-export([
|
||||||
|
new_reader/1,
|
||||||
|
read_chunk/2,
|
||||||
|
abort_reader/1,
|
||||||
|
release_reader/1,
|
||||||
|
reader_info/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
new_writer/1,
|
||||||
|
write_chunk/2,
|
||||||
|
abort_writer/1,
|
||||||
|
release_writer/1,
|
||||||
|
writer_info/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export_type([
|
||||||
|
reader/0,
|
||||||
|
writer/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-define(FILECHUNK(RELPATH, POS, MORE), #{
|
||||||
|
'$' => chunk,
|
||||||
|
rp => RELPATH,
|
||||||
|
pos => POS,
|
||||||
|
more => MORE
|
||||||
|
}).
|
||||||
|
-define(PAT_FILECHUNK(RELPATH, POS, MORE), #{
|
||||||
|
'$' := chunk,
|
||||||
|
rp := RELPATH,
|
||||||
|
pos := POS,
|
||||||
|
more := MORE
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(EOS(), #{
|
||||||
|
'$' => eos
|
||||||
|
}).
|
||||||
|
-define(PAT_EOS(), #{
|
||||||
|
'$' := eos
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PAT_HEADER(), #{'$' := _}).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-record(reader, {
|
||||||
|
dirpath :: file:filename(),
|
||||||
|
files :: #{_RelPath => reader_file()},
|
||||||
|
queue :: [_RelPath :: file:filename()]
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(rfile, {
|
||||||
|
abspath :: file:filename(),
|
||||||
|
fd :: file:io_device() | eof,
|
||||||
|
pos :: non_neg_integer()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-opaque reader() :: #reader{}.
|
||||||
|
-type reader_file() :: #rfile{}.
|
||||||
|
|
||||||
|
-type reason() :: {atom(), _AbsPath :: file:filename(), _Details :: term()}.
|
||||||
|
|
||||||
|
%% @doc Create a reader over the snapshot directory `DirPath'.
%% A snapshot directory holds any number of regular files, nested in any
%% subdirectory structure. The order in which files are read is not defined.
%% Anything that is not a regular file (e.g. a symlink) is treated as an error.
-spec new_reader(_Dir :: file:filename()) -> {ok, reader()}.
new_reader(DirPath) ->
    %% NOTE
    %% Every file is opened right away, which leaves less error handling to do
    %% while the transfer is in progress.
    %% TODO
    %% Mind the error handling: a single file that fails to open makes the whole
    %% process exit. That is acceptable for replication (ra spawns a separate
    %% process per transfer), but may not fit other use cases.
    OpenFile = fun(Path, Info, Opened) ->
        new_reader_file(Path, Info, DirPath, Opened)
    end,
    OpenFiles = emqx_utils_fs:traverse_dir(OpenFile, #{}, DirPath),
    Reader = #reader{
        dirpath = DirPath,
        files = OpenFiles,
        queue = maps:keys(OpenFiles)
    },
    {ok, Reader}.
|
||||||
|
|
||||||
|
%% Directory traversal callback: opens one regular file for reading and stores
%% it in the accumulator under its path relative to `DirPath'.
%% Raises an `error' for inaccessible entries, non-regular files and files that
%% cannot be opened.
new_reader_file(Path, {error, Reason}, _DirPath, _Acc) ->
    error({inaccessible, Path, Reason});
new_reader_file(Path, #file_info{type = regular}, DirPath, Acc) ->
    case file:open(Path, [read, binary, raw]) of
        {error, Reason} ->
            error({open_failed, Path, Reason});
        {ok, Fd} ->
            Key = emqx_utils_fs:find_relpath(Path, DirPath),
            Acc#{Key => #rfile{abspath = Path, fd = Fd, pos = 0}}
    end;
new_reader_file(Path, #file_info{type = Type}, _DirPath, _Acc) ->
    error({bad_file_type, Path, Type}).
|
||||||
|
|
||||||
|
%% @doc Read the next chunk of the snapshot.
%% `{last, Chunk, Reader}` is returned once every file has been read in full.
%% The caller is then expected to call `release_reader/1` to finish up (or
%% `abort_reader/1` when the snapshot should be kept).
-spec read_chunk(reader(), _Size :: non_neg_integer()) ->
    {last | next, _Chunk :: iodata(), reader()} | {error, reason()}.
read_chunk(R = #reader{queue = []}, _Size) ->
    %% Nothing left to read: emit the end-of-stream marker.
    {last, make_packet(?EOS()), R};
read_chunk(R = #reader{files = Files, queue = Queue = [RelPath | Pending]}, Size) ->
    case read_chunk_file(RelPath, maps:get(RelPath, Files), Size) of
        {FileStatus, Packet, File} when FileStatus =:= last; FileStatus =:= next ->
            %% A finished file is dropped from the queue, an unfinished one
            %% stays at its head. Either way the snapshot itself goes on.
            NextQueue =
                case FileStatus of
                    last -> Pending;
                    next -> Queue
                end,
            {next, Packet, R#reader{files = Files#{RelPath := File}, queue = NextQueue}};
        Error ->
            Error
    end.
|
||||||
|
|
||||||
|
%% Read up to `Size' bytes from a single snapshot file and wrap them into a
%% packet carrying the relative path, the offset the data starts at and a
%% "more data follows" flag. A short read or `eof' closes the file and reports
%% `last'; a full-size read reports `next'.
read_chunk_file(RelPath, RFile = #rfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Size) ->
    case file:read(IoDev, Size) of
        eof ->
            {last, make_packet(?FILECHUNK(RelPath, Pos, 0)), release_reader_file(RFile)};
        {error, Reason} ->
            {error, {read_failed, AbsPath, Reason}};
        {ok, Data} ->
            NRead = byte_size(Data),
            %% 1 when the read filled the whole requested size, 0 otherwise.
            More = NRead div Size,
            Advanced = RFile#rfile{pos = Pos + NRead},
            {Status, NRFile} =
                if
                    NRead < Size -> {last, release_reader_file(Advanced)};
                    true -> {next, Advanced}
                end,
            {Status, make_packet(?FILECHUNK(RelPath, Pos, More), Data), NRFile}
    end.
|
||||||
|
|
||||||
|
%% @doc Stop reading the snapshot: closes every file the reader has open,
%% leaving the snapshot files themselves on disk.
-spec abort_reader(reader()) -> ok.
abort_reader(#reader{files = Files}) ->
    _ = [release_reader_file(RFile) || RFile <- maps:values(Files)],
    ok.
|
||||||
|
|
||||||
|
%% @doc Closes all files held by the reader and recursively deletes the
%% snapshot directory.
%% Returns whatever `file:del_dir_r/1` returns, i.e. `ok` on success or
%% `{error, Reason}` when the directory could not be removed.
-spec release_reader(reader()) -> ok | {error, file:posix() | badarg}.
release_reader(R = #reader{dirpath = DirPath}) ->
    ok = abort_reader(R),
    file:del_dir_r(DirPath).
|
||||||
|
|
||||||
|
%% Close the file handle of a reader file unless it is closed already.
%% A closed file is marked with `fd = eof'.
release_reader_file(RFile = #rfile{fd = Fd}) ->
    case Fd of
        eof ->
            RFile;
        _ ->
            _ = file:close(Fd),
            RFile#rfile{fd = eof}
    end.
|
||||||
|
|
||||||
|
%% Total number of bytes read so far, summed over the read positions of all
%% files of the snapshot.
-spec reader_info(bytes_read, reader()) -> _Bytes :: non_neg_integer().
reader_info(bytes_read, #reader{files = Files}) ->
    lists:sum([RFile#rfile.pos || RFile <- maps:values(Files)]).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-record(writer, {
|
||||||
|
dirpath :: file:filename(),
|
||||||
|
files :: #{_RelPath :: file:filename() => writer_file()}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(wfile, {
|
||||||
|
abspath :: file:filename(),
|
||||||
|
fd :: file:io_device() | eof,
|
||||||
|
pos :: non_neg_integer()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-opaque writer() :: #writer{}.
|
||||||
|
-type writer_file() :: #wfile{}.
|
||||||
|
|
||||||
|
%% @doc Create a writer that stores a snapshot under `DirPath'.
%% The directory does not have to exist; it is created when missing.
%% A non-empty directory is not an error: existing files get overwritten.
-spec new_writer(_Dir :: file:filename()) -> {ok, writer()} | {error, reason()}.
new_writer(DirPath) ->
    case filelib:ensure_path(DirPath) of
        {error, Reason} ->
            {error, {mkdir_failed, DirPath, Reason}};
        ok ->
            {ok, #writer{dirpath = DirPath, files = maps:new()}}
    end.
|
||||||
|
|
||||||
|
%% @doc Write a chunk of data to the snapshot.
%% Returns `{last, Writer}` when the last chunk is written. After that, one
%% should call `release_writer/1` to finalize the process.
%% Packets that cannot be decoded, or whose header is neither a complete file
%% chunk header nor an end-of-stream marker, are rejected with `{error, _}`.
-spec write_chunk(writer(), _Chunk :: binary()) ->
    {last | next, writer()} | {error, _Reason}.
write_chunk(W, Packet) ->
    case parse_packet(Packet) of
        {?PAT_FILECHUNK(RelPath, Pos, More), Chunk} ->
            write_chunk(W, RelPath, Pos, More, Chunk);
        {?PAT_EOS(), _Rest} ->
            %% TODO: Verify all files are `eof` at this point?
            {last, W};
        {error, _} = Error ->
            Error;
        {Header, _Rest} ->
            %% Decodable header of an unknown kind or with fields missing:
            %% report it instead of leaking the parsed tuple to the caller.
            {error, {bad_header, Header}}
    end.
|
||||||
|
|
||||||
|
%% Route a file chunk to the writer file it belongs to. A file not seen before
%% is created, but only by a chunk starting at offset 0; any other offset for
%% an unknown file is a `bad_chunk'.
write_chunk(W = #writer{files = Files}, RelPath, Pos, More, Chunk) ->
    case maps:find(RelPath, Files) of
        {ok, Known} ->
            write_chunk(W, Known, RelPath, Pos, More, Chunk);
        error when Pos == 0 ->
            case new_writer_file(W, RelPath) of
                Fresh = #wfile{} ->
                    write_chunk(W, Fresh, RelPath, Pos, More, Chunk);
                Error ->
                    Error
            end;
        error ->
            {error, {bad_chunk, RelPath, Pos}}
    end.
|
||||||
|
|
||||||
|
%% Write the chunk into the given writer file and, on success, store the
%% updated file state in the writer under `RelPath'.
write_chunk(W = #writer{files = Files}, WFile0, RelPath, Pos, More, Chunk) ->
    case write_chunk_file(WFile0, Pos, More, Chunk) of
        #wfile{} = Written ->
            {next, W#writer{files = maps:put(RelPath, Written, Files)}};
        Error ->
            Error
    end.
|
||||||
|
|
||||||
|
%% Create (or truncate) the file `RelPath' inside the snapshot directory and
%% open it for writing, creating intermediate directories as needed.
%% `RelPath' is taken from an incoming packet, so it is validated first:
%% absolute paths and paths escaping the snapshot directory (e.g. via `..')
%% are rejected with `{error, {unsafe_path, RelPath, DirPath}}'.
new_writer_file(#writer{dirpath = DirPath}, RelPath) ->
    case filelib:safe_relative_path(RelPath, DirPath) of
        unsafe ->
            {error, {unsafe_path, RelPath, DirPath}};
        SafeRelPath ->
            AbsPath = filename:join(DirPath, SafeRelPath),
            %% Best effort: if the directory cannot be created, opening the
            %% file below fails and reports the error.
            _ = filelib:ensure_dir(AbsPath),
            case file:open(AbsPath, [write, binary, raw]) of
                {ok, IoDev} ->
                    #wfile{
                        abspath = AbsPath,
                        fd = IoDev,
                        pos = 0
                    };
                {error, Reason} ->
                    {error, {open_failed, AbsPath, Reason}}
            end
    end.
|
||||||
|
|
||||||
|
%% Apply a chunk that starts at offset `Pos' to a writer file.
%%  * File already finalized (`fd = eof') and an empty chunk at the current
%%    position: a re-delivered terminal chunk, accepted as a no-op.
%%  * File already finalized and a non-empty chunk at the current position:
%%    data past the end of a finished file, rejected as `bad_chunk'.
%%  * Chunk at exactly the current position: appended; `More = 0' marks the
%%    final chunk and closes the file.
%%  * Chunk at an earlier position: treated as a re-delivery and ignored.
%%  * Anything else (a gap): rejected as `bad_chunk'.
write_chunk_file(WFile = #wfile{fd = eof, pos = Pos}, Pos, _More, <<>>) ->
    WFile;
write_chunk_file(#wfile{fd = eof, pos = Pos, abspath = AbsPath}, Pos, _More, _Chunk) ->
    {error, {bad_chunk, AbsPath, Pos}};
write_chunk_file(WFile0 = #wfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Pos, More, Chunk) ->
    ChunkSize = byte_size(Chunk),
    case file:write(IoDev, Chunk) of
        ok ->
            WFile1 = WFile0#wfile{pos = Pos + ChunkSize},
            case More of
                0 -> release_writer_file(WFile1);
                _ -> WFile1
            end;
        {error, Reason} ->
            {error, {write_failed, AbsPath, Reason}}
    end;
write_chunk_file(WFile = #wfile{pos = WPos}, Pos, _More, _Chunk) when Pos < WPos ->
    WFile;
write_chunk_file(#wfile{abspath = AbsPath}, Pos, _More, _Chunk) ->
    {error, {bad_chunk, AbsPath, Pos}}.
|
||||||
|
|
||||||
|
%% @doc Give up on the snapshot being written: closes every open file and then
%% recursively deletes the snapshot directory.
-spec abort_writer(writer()) -> ok | {error, file:posix()}.
abort_writer(Writer = #writer{}) ->
    ok = release_writer(Writer),
    file:del_dir_r(Writer#writer.dirpath).
|
||||||
|
|
||||||
|
%% @doc Finish writing: closes every snapshot file the writer still has open.
-spec release_writer(writer()) -> ok.
release_writer(#writer{files = Files}) ->
    maps:fold(
        fun(_RelPath, WFile, ok) ->
            _ = release_writer_file(WFile),
            ok
        end,
        ok,
        Files
    ).
|
||||||
|
|
||||||
|
%% Close the file handle of a writer file unless it is closed already.
%% A closed file is marked with `fd = eof'.
release_writer_file(WFile = #wfile{fd = Fd}) ->
    case Fd of
        eof ->
            WFile;
        _ ->
            _ = file:close(Fd),
            WFile#wfile{fd = eof}
    end.
|
||||||
|
|
||||||
|
%% Total number of bytes written so far, summed over the write positions of all
%% files of the snapshot.
-spec writer_info(bytes_written, writer()) -> _Bytes :: non_neg_integer().
writer_info(bytes_written, #writer{files = Files}) ->
    lists:sum([WFile#wfile.pos || WFile <- maps:values(Files)]).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
%% Encode a packet that consists of a header only (no payload).
make_packet(Header) ->
    make_packet(Header, <<>>).

%% Encode a packet: the header in external term format, immediately followed
%% by the raw payload bytes.
make_packet(Header, Rest) ->
    <<(term_to_binary(Header))/binary, Rest/binary>>.
|
||||||
|
|
||||||
|
%% Split a packet into its decoded header and the payload bytes following it.
%% The header is decoded in `safe' mode; `used' yields the number of bytes the
%% encoded header occupies, so the payload is everything after that offset.
%% Returns `{error, {bad_header, Term}}' when the decoded term does not look
%% like a header, and `{error, bad_packet}' when decoding fails.
parse_packet(Packet) ->
    try binary_to_term(Packet, [safe, used]) of
        {Header, HeaderSize} ->
            case Header of
                ?PAT_HEADER() ->
                    <<_:HeaderSize/binary, Rest/binary>> = Packet,
                    {Header, Rest};
                _ ->
                    {error, {bad_header, Header}}
            end
    catch
        error:badarg ->
            {error, bad_packet}
    end.
|
|
@ -0,0 +1,202 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_ds_replication_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/test_macros.hrl").
|
||||||
|
|
||||||
|
-define(DB, testdb).
|
||||||
|
|
||||||
|
opts() ->
|
||||||
|
#{
|
||||||
|
backend => builtin,
|
||||||
|
storage => {emqx_ds_storage_bitfield_lts, #{}},
|
||||||
|
n_shards => 1,
|
||||||
|
n_sites => 3,
|
||||||
|
replication_factor => 3,
|
||||||
|
replication_options => #{
|
||||||
|
wal_max_size_bytes => 128 * 1024,
|
||||||
|
wal_max_batch_size => 1024,
|
||||||
|
snapshot_interval => 128
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
t_replication_transfers_snapshots(Config) ->
|
||||||
|
NMsgs = 4000,
|
||||||
|
Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
|
||||||
|
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
|
||||||
|
|
||||||
|
%% Initialize DB on all nodes and wait for it to be online.
|
||||||
|
?assertEqual(
|
||||||
|
[{ok, ok} || _ <- Nodes],
|
||||||
|
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()])
|
||||||
|
),
|
||||||
|
?retry(
|
||||||
|
500,
|
||||||
|
10,
|
||||||
|
?assertMatch([_], shards_online(Node, ?DB))
|
||||||
|
),
|
||||||
|
|
||||||
|
%% Stop the DB on the "offline" node.
|
||||||
|
ok = emqx_cth_cluster:stop_node(NodeOffline),
|
||||||
|
|
||||||
|
%% Fill the storage with messages and a few additional generations.
|
||||||
|
Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}),
|
||||||
|
|
||||||
|
%% Restart the node.
|
||||||
|
[NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
|
||||||
|
{ok, SRef} = snabbkaffe:subscribe(
|
||||||
|
?match_event(#{
|
||||||
|
?snk_kind := dsrepl_snapshot_accepted,
|
||||||
|
?snk_meta := #{node := NodeOffline}
|
||||||
|
})
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
ok,
|
||||||
|
erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
|
||||||
|
),
|
||||||
|
|
||||||
|
%% Trigger storage operation and wait for the replica to be restored.
|
||||||
|
_ = add_generation(Node, ?DB),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
snabbkaffe:receive_events(SRef)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% Wait until any pending replication activities are finished (e.g. Raft log entries).
|
||||||
|
ok = timer:sleep(3_000),
|
||||||
|
|
||||||
|
%% Check that the DB has been restored.
|
||||||
|
Shard = hd(shards(NodeOffline, ?DB)),
|
||||||
|
MessagesOffline = lists:keysort(
|
||||||
|
#message.timestamp,
|
||||||
|
consume(NodeOffline, ?DB, Shard, ['#'], 0)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
sample(40, Messages),
|
||||||
|
sample(40, MessagesOffline)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
Messages,
|
||||||
|
MessagesOffline
|
||||||
|
).
|
||||||
|
|
||||||
|
shards(Node, DB) ->
|
||||||
|
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
|
||||||
|
|
||||||
|
shards_online(Node, DB) ->
|
||||||
|
erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).
|
||||||
|
|
||||||
|
fill_storage(Node, DB, NMsgs, Opts) ->
|
||||||
|
fill_storage(Node, DB, NMsgs, 0, Opts).
|
||||||
|
|
||||||
|
fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs ->
|
||||||
|
R1 = push_message(Node, DB, I),
|
||||||
|
R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
|
||||||
|
R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
|
||||||
|
fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
|
||||||
|
[].
|
||||||
|
|
||||||
|
push_message(Node, DB, I) ->
|
||||||
|
Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
|
||||||
|
{Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
|
||||||
|
Message = message(Topic, Bytes, I * 100),
|
||||||
|
ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
|
||||||
|
[Message].
|
||||||
|
|
||||||
|
add_generation(Node, DB) ->
|
||||||
|
ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
|
||||||
|
[].
|
||||||
|
|
||||||
|
message(Topic, Payload, PublishedAt) ->
|
||||||
|
#message{
|
||||||
|
from = <<?MODULE_STRING>>,
|
||||||
|
topic = Topic,
|
||||||
|
payload = Payload,
|
||||||
|
timestamp = PublishedAt,
|
||||||
|
id = emqx_guid:gen()
|
||||||
|
}.
|
||||||
|
|
||||||
|
consume(Node, DB, Shard, TopicFilter, StartTime) ->
|
||||||
|
Streams = erpc:call(Node, emqx_ds_storage_layer, get_streams, [
|
||||||
|
{DB, Shard}, TopicFilter, StartTime
|
||||||
|
]),
|
||||||
|
lists:flatmap(
|
||||||
|
fun({_Rank, Stream}) ->
|
||||||
|
{ok, It} = erpc:call(Node, emqx_ds_storage_layer, make_iterator, [
|
||||||
|
{DB, Shard}, Stream, TopicFilter, StartTime
|
||||||
|
]),
|
||||||
|
consume_stream(Node, DB, Shard, It)
|
||||||
|
end,
|
||||||
|
Streams
|
||||||
|
).
|
||||||
|
|
||||||
|
consume_stream(Node, DB, Shard, It) ->
|
||||||
|
case erpc:call(Node, emqx_ds_storage_layer, next, [{DB, Shard}, It, 100]) of
|
||||||
|
{ok, _NIt, _Msgs = []} ->
|
||||||
|
[];
|
||||||
|
{ok, NIt, Batch} ->
|
||||||
|
[Msg || {_Key, Msg} <- Batch] ++ consume_stream(Node, DB, Shard, NIt);
|
||||||
|
{ok, end_of_stream} ->
|
||||||
|
[]
|
||||||
|
end.
|
||||||
|
|
||||||
|
probably(P, Fun) ->
|
||||||
|
case rand:uniform() of
|
||||||
|
X when X < P -> Fun();
|
||||||
|
_ -> []
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Abbreviate `List' for readable assertion diffs: keep the first and the last
%% `N div 2' elements and replace everything in between with a single
%% "<count> more" string telling how many elements were left out.
%% Lists of `N' elements or fewer (including the empty list) are returned
%% unchanged.
sample(N, List) ->
    L = length(List),
    case L > N of
        false ->
            List;
        true ->
            H = N div 2,
            %% Exactly 2 * H elements are kept, so that many are not omitted.
            Filler = integer_to_list(L - 2 * H) ++ " more",
            lists:sublist(List, H) ++ [Filler] ++ lists:nthtail(L - H, List)
    end.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
suite() -> [{timetrap, {seconds, 60}}].
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_testcase(TCName, Config) ->
|
||||||
|
Apps = [
|
||||||
|
{emqx_durable_storage, #{
|
||||||
|
before_start => fun snabbkaffe:fix_ct_logging/0,
|
||||||
|
override_env => [{egress_flush_interval, 1}]
|
||||||
|
}}
|
||||||
|
],
|
||||||
|
WorkDir = emqx_cth_suite:work_dir(TCName, Config),
|
||||||
|
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
|
||||||
|
[
|
||||||
|
{emqx_ds_replication_SUITE1, #{apps => Apps}},
|
||||||
|
{emqx_ds_replication_SUITE2, #{apps => Apps}},
|
||||||
|
{emqx_ds_replication_SUITE3, #{apps => Apps}}
|
||||||
|
],
|
||||||
|
#{work_dir => WorkDir}
|
||||||
|
),
|
||||||
|
Nodes = emqx_cth_cluster:start(NodeSpecs),
|
||||||
|
ok = snabbkaffe:start_trace(),
|
||||||
|
[{nodes, Nodes}, {specs, NodeSpecs} | Config].
|
||||||
|
|
||||||
|
end_per_testcase(_TCName, Config) ->
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
|
|
@ -0,0 +1,171 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_ds_storage_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
|
opts() ->
|
||||||
|
#{storage => {emqx_ds_storage_bitfield_lts, #{}}}.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
t_idempotent_store_batch(_Config) ->
|
||||||
|
Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
|
||||||
|
{ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
|
||||||
|
%% Push some messages to the shard.
|
||||||
|
Msgs1 = [gen_message(N) || N <- lists:seq(10, 20)],
|
||||||
|
GenTs = 30,
|
||||||
|
Msgs2 = [gen_message(N) || N <- lists:seq(40, 50)],
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
|
||||||
|
%% Add new generation and push the same batch + some more.
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, GenTs)),
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})),
|
||||||
|
%% First batch should have been handled idempotently.
|
||||||
|
?assertEqual(
|
||||||
|
Msgs1 ++ Msgs2,
|
||||||
|
lists:keysort(#message.timestamp, consume(Shard, ['#']))
|
||||||
|
),
|
||||||
|
ok = stop_shard(Pid).
|
||||||
|
|
||||||
|
t_snapshot_take_restore(_Config) ->
|
||||||
|
Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
|
||||||
|
{ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
|
||||||
|
|
||||||
|
%% Push some messages to the shard.
|
||||||
|
Msgs1 = [gen_message(N) || N <- lists:seq(1000, 2000)],
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
|
||||||
|
|
||||||
|
%% Add new generation and push some more.
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 3000)),
|
||||||
|
Msgs2 = [gen_message(N) || N <- lists:seq(4000, 5000)],
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})),
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 6000)),
|
||||||
|
|
||||||
|
%% Take a snapshot of the shard.
|
||||||
|
{ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(Shard),
|
||||||
|
|
||||||
|
%% Push even more messages to the shard AFTER taking the snapshot.
|
||||||
|
Msgs3 = [gen_message(N) || N <- lists:seq(7000, 8000)],
|
||||||
|
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs3), #{})),
|
||||||
|
|
||||||
|
%% Destroy the shard.
|
||||||
|
ok = stop_shard(Pid),
|
||||||
|
ok = emqx_ds_storage_layer:drop_shard(Shard),
|
||||||
|
|
||||||
|
%% Restore the shard from the snapshot.
|
||||||
|
{ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(Shard),
|
||||||
|
?assertEqual(ok, transfer_snapshot(SnapReader, SnapWriter)),
|
||||||
|
|
||||||
|
%% Verify that the restored shard contains the messages up until the snapshot.
|
||||||
|
{ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
|
||||||
|
?assertEqual(
|
||||||
|
Msgs1 ++ Msgs2,
|
||||||
|
lists:keysort(#message.timestamp, consume(Shard, ['#']))
|
||||||
|
).
|
||||||
|
|
||||||
|
transfer_snapshot(Reader, Writer) ->
|
||||||
|
ChunkSize = rand:uniform(1024),
|
||||||
|
ReadResult = emqx_ds_storage_snapshot:read_chunk(Reader, ChunkSize),
|
||||||
|
?assertMatch({RStatus, _, _} when RStatus == next; RStatus == last, ReadResult),
|
||||||
|
{RStatus, Chunk, NReader} = ReadResult,
|
||||||
|
Data = iolist_to_binary(Chunk),
|
||||||
|
{WStatus, NWriter} = emqx_ds_storage_snapshot:write_chunk(Writer, Data),
|
||||||
|
%% Verify idempotency.
|
||||||
|
?assertMatch(
|
||||||
|
{WStatus, NWriter},
|
||||||
|
emqx_ds_storage_snapshot:write_chunk(NWriter, Data)
|
||||||
|
),
|
||||||
|
%% Verify convergence.
|
||||||
|
?assertEqual(
|
||||||
|
RStatus,
|
||||||
|
WStatus,
|
||||||
|
#{reader => NReader, writer => NWriter}
|
||||||
|
),
|
||||||
|
case WStatus of
|
||||||
|
last ->
|
||||||
|
?assertEqual(ok, emqx_ds_storage_snapshot:release_reader(NReader)),
|
||||||
|
?assertEqual(ok, emqx_ds_storage_snapshot:release_writer(NWriter)),
|
||||||
|
ok;
|
||||||
|
next ->
|
||||||
|
transfer_snapshot(NReader, NWriter)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
batch(Msgs) ->
|
||||||
|
[{emqx_message:timestamp(Msg), Msg} || Msg <- Msgs].
|
||||||
|
|
||||||
|
gen_message(N) ->
|
||||||
|
Topic = emqx_topic:join([<<"foo">>, <<"bar">>, integer_to_binary(N)]),
|
||||||
|
message(Topic, crypto:strong_rand_bytes(16), N).
|
||||||
|
|
||||||
|
message(Topic, Payload, PublishedAt) ->
|
||||||
|
#message{
|
||||||
|
from = <<?MODULE_STRING>>,
|
||||||
|
topic = Topic,
|
||||||
|
payload = Payload,
|
||||||
|
timestamp = PublishedAt,
|
||||||
|
id = emqx_guid:gen()
|
||||||
|
}.
|
||||||
|
|
||||||
|
consume(Shard, TopicFilter) ->
|
||||||
|
consume(Shard, TopicFilter, 0).
|
||||||
|
|
||||||
|
consume(Shard, TopicFilter, StartTime) ->
|
||||||
|
Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime),
|
||||||
|
lists:flatmap(
|
||||||
|
fun({_Rank, Stream}) ->
|
||||||
|
{ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime),
|
||||||
|
consume_stream(Shard, It)
|
||||||
|
end,
|
||||||
|
Streams
|
||||||
|
).
|
||||||
|
|
||||||
|
consume_stream(Shard, It) ->
|
||||||
|
case emqx_ds_storage_layer:next(Shard, It, 100) of
|
||||||
|
{ok, _NIt, _Msgs = []} ->
|
||||||
|
[];
|
||||||
|
{ok, NIt, Batch} ->
|
||||||
|
[Msg || {_DSKey, Msg} <- Batch] ++ consume_stream(Shard, NIt);
|
||||||
|
{ok, end_of_stream} ->
|
||||||
|
[]
|
||||||
|
end.
|
||||||
|
|
||||||
|
stop_shard(Pid) ->
|
||||||
|
_ = unlink(Pid),
|
||||||
|
proc_lib:stop(Pid, shutdown, infinity).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_testcase(TCName, Config) ->
|
||||||
|
WorkDir = emqx_cth_suite:work_dir(TCName, Config),
|
||||||
|
Apps = emqx_cth_suite:start(
|
||||||
|
[{emqx_durable_storage, #{override_env => [{db_data_dir, WorkDir}]}}],
|
||||||
|
#{work_dir => WorkDir}
|
||||||
|
),
|
||||||
|
[{apps, Apps} | Config].
|
||||||
|
|
||||||
|
end_per_testcase(_TCName, Config) ->
|
||||||
|
ok = emqx_cth_suite:stop(?config(apps, Config)),
|
||||||
|
ok.
|
Loading…
Reference in New Issue