refactor(sessds): #ifs -> #srs
This commit is contained in:
parent
974760d331
commit
2d23212792
|
@ -138,7 +138,7 @@
|
||||||
ref :: reference()
|
ref :: reference()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type stream_state() :: #ifs{}.
|
-type stream_state() :: #srs{}.
|
||||||
|
|
||||||
-type timestamp() :: emqx_utils_calendar:epoch_millisecond().
|
-type timestamp() :: emqx_utils_calendar:epoch_millisecond().
|
||||||
-type millisecond() :: non_neg_integer().
|
-type millisecond() :: non_neg_integer().
|
||||||
|
@ -495,15 +495,15 @@ replay(ClientInfo, [], Session0 = #{s := S0}) ->
|
||||||
{ok, [], pull_now(Session)}.
|
{ok, [], pull_now(Session)}.
|
||||||
|
|
||||||
-spec replay_batch(stream_state(), session(), clientinfo()) -> session().
|
-spec replay_batch(stream_state(), session(), clientinfo()) -> session().
|
||||||
replay_batch(Ifs0, Session, ClientInfo) ->
|
replay_batch(Srs0, Session, ClientInfo) ->
|
||||||
#ifs{batch_size = BatchSize} = Ifs0,
|
#srs{batch_size = BatchSize} = Srs0,
|
||||||
%% TODO: retry on errors:
|
%% TODO: retry on errors:
|
||||||
{Ifs, Inflight} = enqueue_batch(true, BatchSize, Ifs0, Session, ClientInfo),
|
{Srs, Inflight} = enqueue_batch(true, BatchSize, Srs0, Session, ClientInfo),
|
||||||
%% Assert:
|
%% Assert:
|
||||||
Ifs =:= Ifs0 orelse
|
Srs =:= Srs0 orelse
|
||||||
?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{
|
?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{
|
||||||
expected => Ifs0,
|
expected => Srs0,
|
||||||
got => Ifs
|
got => Srs
|
||||||
}),
|
}),
|
||||||
Session#{inflight => Inflight}.
|
Session#{inflight => Inflight}.
|
||||||
|
|
||||||
|
@ -678,29 +678,29 @@ fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo
|
||||||
fetch_new_messages(Streams, Session, ClientInfo)
|
fetch_new_messages(Streams, Session, ClientInfo)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
new_batch({StreamKey, Ifs0}, BatchSize, Session = #{s := S0}, ClientInfo) ->
|
new_batch({StreamKey, Srs0}, BatchSize, Session = #{s := S0}, ClientInfo) ->
|
||||||
SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
|
SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
|
||||||
SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0),
|
SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0),
|
||||||
Ifs1 = Ifs0#ifs{
|
Srs1 = Srs0#srs{
|
||||||
first_seqno_qos1 = SN1,
|
first_seqno_qos1 = SN1,
|
||||||
first_seqno_qos2 = SN2,
|
first_seqno_qos2 = SN2,
|
||||||
batch_size = 0,
|
batch_size = 0,
|
||||||
last_seqno_qos1 = SN1,
|
last_seqno_qos1 = SN1,
|
||||||
last_seqno_qos2 = SN2
|
last_seqno_qos2 = SN2
|
||||||
},
|
},
|
||||||
{Ifs, Inflight} = enqueue_batch(false, BatchSize, Ifs1, Session, ClientInfo),
|
{Srs, Inflight} = enqueue_batch(false, BatchSize, Srs1, Session, ClientInfo),
|
||||||
S1 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), Ifs#ifs.last_seqno_qos1, S0),
|
S1 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), Srs#srs.last_seqno_qos1, S0),
|
||||||
S2 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), Ifs#ifs.last_seqno_qos2, S1),
|
S2 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), Srs#srs.last_seqno_qos2, S1),
|
||||||
S = emqx_persistent_session_ds_state:put_stream(StreamKey, Ifs, S2),
|
S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2),
|
||||||
Session#{s => S, inflight => Inflight}.
|
Session#{s => S, inflight => Inflight}.
|
||||||
|
|
||||||
enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, ClientInfo) ->
|
enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) ->
|
||||||
#ifs{
|
#srs{
|
||||||
it_begin = ItBegin0,
|
it_begin = ItBegin0,
|
||||||
it_end = ItEnd0,
|
it_end = ItEnd0,
|
||||||
first_seqno_qos1 = FirstSeqnoQos1,
|
first_seqno_qos1 = FirstSeqnoQos1,
|
||||||
first_seqno_qos2 = FirstSeqnoQos2
|
first_seqno_qos2 = FirstSeqnoQos2
|
||||||
} = Ifs0,
|
} = Srs0,
|
||||||
ItBegin =
|
ItBegin =
|
||||||
case IsReplay of
|
case IsReplay of
|
||||||
true -> ItBegin0;
|
true -> ItBegin0;
|
||||||
|
@ -711,7 +711,7 @@ enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, Cli
|
||||||
{Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
|
{Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
|
||||||
IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0
|
IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0
|
||||||
),
|
),
|
||||||
Ifs = Ifs0#ifs{
|
Srs = Srs0#srs{
|
||||||
it_begin = ItBegin,
|
it_begin = ItBegin,
|
||||||
it_end = ItEnd,
|
it_end = ItEnd,
|
||||||
%% TODO: it should be possible to avoid calling
|
%% TODO: it should be possible to avoid calling
|
||||||
|
@ -721,13 +721,13 @@ enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, Cli
|
||||||
last_seqno_qos1 = LastSeqnoQos1,
|
last_seqno_qos1 = LastSeqnoQos1,
|
||||||
last_seqno_qos2 = LastSeqnoQos2
|
last_seqno_qos2 = LastSeqnoQos2
|
||||||
},
|
},
|
||||||
{Ifs, Inflight};
|
{Srs, Inflight};
|
||||||
{ok, end_of_stream} ->
|
{ok, end_of_stream} ->
|
||||||
%% No new messages; just update the end iterator:
|
%% No new messages; just update the end iterator:
|
||||||
{Ifs0#ifs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, Inflight0};
|
{Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, Inflight0};
|
||||||
{error, _} when not IsReplay ->
|
{error, _} when not IsReplay ->
|
||||||
?SLOG(info, #{msg => "failed_to_fetch_batch", iterator => ItBegin}),
|
?SLOG(info, #{msg => "failed_to_fetch_batch", iterator => ItBegin}),
|
||||||
{Ifs0, Inflight0}
|
{Srs0, Inflight0}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->
|
%% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->
|
||||||
|
|
|
@ -48,8 +48,8 @@
|
||||||
%% Last seqno assigned to a message (it may not be sent yet).
|
%% Last seqno assigned to a message (it may not be sent yet).
|
||||||
-define(next(QOS), (30 + QOS)).
|
-define(next(QOS), (30 + QOS)).
|
||||||
|
|
||||||
%%%%% State of the stream:
|
%%%%% Stream Replay State:
|
||||||
-record(ifs, {
|
-record(srs, {
|
||||||
rank_x :: emqx_ds:rank_x(),
|
rank_x :: emqx_ds:rank_x(),
|
||||||
rank_y :: emqx_ds:rank_y(),
|
rank_y :: emqx_ds:rank_y(),
|
||||||
%% Iterators at the beginning and the end of the last batch:
|
%% Iterators at the beginning and the end of the last batch:
|
||||||
|
|
|
@ -147,14 +147,14 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
|
||||||
{ok, Iterator} = emqx_ds:make_iterator(
|
{ok, Iterator} = emqx_ds:make_iterator(
|
||||||
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
|
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
|
||||||
),
|
),
|
||||||
NewStreamState = #ifs{
|
NewStreamState = #srs{
|
||||||
rank_x = RankX,
|
rank_x = RankX,
|
||||||
rank_y = RankY,
|
rank_y = RankY,
|
||||||
it_begin = Iterator,
|
it_begin = Iterator,
|
||||||
it_end = Iterator
|
it_end = Iterator
|
||||||
},
|
},
|
||||||
emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
|
emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
|
||||||
#ifs{} ->
|
#srs{} ->
|
||||||
S
|
S
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -199,7 +199,7 @@ remove_fully_replayed_streams(S0) ->
|
||||||
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0),
|
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0),
|
||||||
%% 1. For each subscription, find the X ranks that were fully replayed:
|
%% 1. For each subscription, find the X ranks that were fully replayed:
|
||||||
Groups = emqx_persistent_session_ds_state:fold_streams(
|
Groups = emqx_persistent_session_ds_state:fold_streams(
|
||||||
fun({SubId, _Stream}, StreamState = #ifs{rank_x = RankX, rank_y = RankY}, Acc) ->
|
fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) ->
|
||||||
Key = {SubId, RankX},
|
Key = {SubId, RankX},
|
||||||
case
|
case
|
||||||
{maps:get(Key, Acc, undefined), is_fully_replayed(CommQos1, CommQos2, StreamState)}
|
{maps:get(Key, Acc, undefined), is_fully_replayed(CommQos1, CommQos2, StreamState)}
|
||||||
|
@ -228,7 +228,7 @@ remove_fully_replayed_streams(S0) ->
|
||||||
),
|
),
|
||||||
%% 3. Remove the fully replayed streams:
|
%% 3. Remove the fully replayed streams:
|
||||||
emqx_persistent_session_ds_state:fold_streams(
|
emqx_persistent_session_ds_state:fold_streams(
|
||||||
fun(Key = {SubId, _Stream}, #ifs{rank_x = RankX, rank_y = RankY}, Acc) ->
|
fun(Key = {SubId, _Stream}, #srs{rank_x = RankX, rank_y = RankY}, Acc) ->
|
||||||
case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of
|
case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of
|
||||||
undefined ->
|
undefined ->
|
||||||
Acc;
|
Acc;
|
||||||
|
@ -249,8 +249,8 @@ remove_fully_replayed_streams(S0) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
compare_streams(
|
compare_streams(
|
||||||
{_KeyA, #ifs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}},
|
{_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}},
|
||||||
{_KeyB, #ifs{first_seqno_qos1 = B1, first_seqno_qos2 = B2}}
|
{_KeyB, #srs{first_seqno_qos1 = B1, first_seqno_qos2 = B2}}
|
||||||
) ->
|
) ->
|
||||||
case A1 =:= B1 of
|
case A1 =:= B1 of
|
||||||
true ->
|
true ->
|
||||||
|
@ -259,10 +259,10 @@ compare_streams(
|
||||||
A1 < B1
|
A1 < B1
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_fully_replayed(Comm1, Comm2, S = #ifs{it_end = It}) ->
|
is_fully_replayed(Comm1, Comm2, S = #srs{it_end = It}) ->
|
||||||
It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S).
|
It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S).
|
||||||
|
|
||||||
is_fully_acked(Comm1, Comm2, #ifs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
|
is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
|
||||||
(Comm1 >= S1) andalso (Comm2 >= S2).
|
(Comm1 >= S1) andalso (Comm2 >= S2).
|
||||||
|
|
||||||
-spec shuffle([A]) -> [A].
|
-spec shuffle([A]) -> [A].
|
||||||
|
|
Loading…
Reference in New Issue