feat(queue): send progress before fetching new messages
This commit is contained in:
parent
d32f282feb
commit
a20d262327
|
@ -987,14 +987,13 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
|
||||||
%% Normal replay:
|
%% Normal replay:
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
|
fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) ->
|
||||||
|
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
|
||||||
LFS = maps:get(last_fetched_stream, Session0, beginning),
|
LFS = maps:get(last_fetched_stream, Session0, beginning),
|
||||||
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0),
|
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1),
|
||||||
BatchSize = get_config(ClientInfo, [batch_size]),
|
BatchSize = get_config(ClientInfo, [batch_size]),
|
||||||
Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
|
Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
|
||||||
#{s := S1, shared_sub_s := SharedSubS0} = Session1,
|
Session1#{shared_sub_s => SharedSubS1}.
|
||||||
{S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
|
|
||||||
Session1#{s => S2, shared_sub_s => SharedSubS1}.
|
|
||||||
|
|
||||||
fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
|
fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
|
||||||
#{inflight := Inflight} = Session0,
|
#{inflight := Inflight} = Session0,
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
on_unsubscribe/4,
|
on_unsubscribe/4,
|
||||||
on_disconnect/2,
|
on_disconnect/2,
|
||||||
|
|
||||||
on_streams_replayed/2,
|
on_streams_replay/2,
|
||||||
on_info/3,
|
on_info/3,
|
||||||
|
|
||||||
renew_streams/2,
|
renew_streams/2,
|
||||||
|
@ -114,11 +114,11 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
|
||||||
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||||
{S1, SharedSubS1}.
|
{S1, SharedSubS1}.
|
||||||
|
|
||||||
-spec on_streams_replayed(
|
-spec on_streams_replay(
|
||||||
emqx_persistent_session_ds_state:t(),
|
emqx_persistent_session_ds_state:t(),
|
||||||
t()
|
t()
|
||||||
) -> {emqx_persistent_session_ds_state:t(), t()}.
|
) -> {emqx_persistent_session_ds_state:t(), t()}.
|
||||||
on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
|
on_streams_replay(S, #{agent := Agent0} = SharedSubS0) ->
|
||||||
Progresses = stream_progresses(S),
|
Progresses = stream_progresses(S),
|
||||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
|
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
|
||||||
Agent0, Progresses
|
Agent0, Progresses
|
||||||
|
|
|
@ -336,8 +336,8 @@ select_streams_for_revoke(
|
||||||
) ->
|
) ->
|
||||||
%% TODO
|
%% TODO
|
||||||
%% Some intellectual logic should be used regarding:
|
%% Some intellectual logic should be used regarding:
|
||||||
%% * shard ids (better spread shards across different streams);
|
%% * shard ids (better do not mix shards in the same agent);
|
||||||
%% * stream stats (how much data was replayed from stream,
|
%% * stream stats (how much data was replayed from stream),
|
||||||
%% heavy streams should be distributed across different agents);
|
%% heavy streams should be distributed across different agents);
|
||||||
%% * data locality (agents better preserve streams with data available on the agent's node)
|
%% * data locality (agents better preserve streams with data available on the agent's node)
|
||||||
lists:sublist(shuffle(Streams), RevokeCount).
|
lists:sublist(shuffle(Streams), RevokeCount).
|
||||||
|
|
Loading…
Reference in New Issue