fix(buffer): fix `replayq` usages in buffer workers (5.0)

https://emqx.atlassian.net/browse/EMQX-8700

Fixes a few errors in the usage of `replayq` queues.

- Close `replayq` when `emqx_resource_worker` terminates.
- Do not keep old references to a `replayq` queue after a `pop`.
- Clear `replayq`'s data directories when removing a resource.
This commit is contained in:
Thales Macedo Garitezi 2023-01-06 14:05:16 -03:00
parent 4c25be8a2c
commit c383558467
4 changed files with 160 additions and 17 deletions

View File

@ -53,6 +53,8 @@
-export([reply_after_query/7, batch_reply_after_query/7]). -export([reply_after_query/7, batch_reply_after_query/7]).
-export([clear_disk_queue_dir/2]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]). -elvis([{elvis_style, dont_repeat_yourself, disable}]).
-define(Q_ITEM(REQUEST), {q_item, REQUEST}). -define(Q_ITEM(REQUEST), {q_item, REQUEST}).
@ -176,6 +178,7 @@ init({Id, Index, Opts}) ->
resume_interval => maps:get(resume_interval, Opts, HCItvl), resume_interval => maps:get(resume_interval, Opts, HCItvl),
tref => undefined tref => undefined
}, },
?tp(resource_worker_init, #{id => Id, index => Index}),
{ok, blocked, St, {next_event, cast, resume}}. {ok, blocked, St, {next_event, cast, resume}}.
running(enter, _, St) -> running(enter, _, St) ->
@ -232,7 +235,9 @@ blocked(info, Info, _Data) ->
terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
emqx_resource_metrics:inflight_set(Id, Index, 0), emqx_resource_metrics:inflight_set(Id, Index, 0),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
gproc_pool:disconnect_worker(Id, {Id, Index}). gproc_pool:disconnect_worker(Id, {Id, Index}),
replayq:close(Q),
ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -432,7 +437,11 @@ flush(Data0) ->
{Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}), {Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}),
Batch = [Item || ?Q_ITEM(Item) <- Batch0], Batch = [Item || ?Q_ITEM(Item) <- Batch0],
IsBatch = BatchSize =/= 1, IsBatch = BatchSize =/= 1,
do_flush(Data0, #{ %% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
%% Maybe we could re-open the queue?
Data1 = Data0#{queue := Q1},
do_flush(Data1, #{
new_queue => Q1, new_queue => Q1,
is_batch => IsBatch, is_batch => IsBatch,
batch => Batch, batch => Batch,
@ -463,24 +472,35 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que
%% failed and is not async; keep the request in the queue to %% failed and is not async; keep the request in the queue to
%% be retried %% be retried
{true, false} -> {true, false} ->
{next_state, blocked, Data1}; %% Note: currently, we cannot safely pop an item from
%% `replayq', keep the old reference to the queue and
%% later try to append new items to the old ref: by
%% popping an item, we may cause the side effect of
%% closing an open segment and opening a new one, and the
%% later `append' with the old file descriptor will fail
%% with `einval' because it has been closed. So we are
%% forced to re-append the item, changing the order of
%% requests...
ok = replayq:ack(Q1, QAckRef),
SentBatch = mark_as_sent(Batch),
Q2 = append_queue(Id, Index, Q1, SentBatch),
Data2 = Data1#{queue := Q2},
{next_state, blocked, Data2};
%% failed and is async; remove the request from the queue, as %% failed and is async; remove the request from the queue, as
%% it is already in inflight table %% it is already in inflight table
{true, true} -> {true, true} ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data = Data1#{queue := Q1}, {next_state, blocked, Data1};
{next_state, blocked, Data};
%% success; just ack %% success; just ack
{false, _} -> {false, _} ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data2 = Data1#{queue := Q1},
case replayq:count(Q1) > 0 of case replayq:count(Q1) > 0 of
true -> true ->
{keep_state, Data2, [{next_event, internal, flush}]}; {keep_state, Data1, [{next_event, internal, flush}]};
false -> false ->
{keep_state, Data2} {keep_state, Data1}
end end
end; end;
do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) -> do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) ->
@ -498,28 +518,39 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu
%% failed and is not async; keep the request in the queue to %% failed and is not async; keep the request in the queue to
%% be retried %% be retried
{true, false} -> {true, false} ->
{next_state, blocked, Data1}; %% Note: currently, we cannot safely pop an item from
%% `replayq', keep the old reference to the queue and
%% later try to append new items to the old ref: by
%% popping an item, we may cause the side effect of
%% closing an open segment and opening a new one, and the
%% later `append' with the old file descriptor will fail
%% with `einval' because it has been closed. So we are
%% forced to re-append the item, changing the order of
%% requests...
ok = replayq:ack(Q1, QAckRef),
SentBatch = mark_as_sent(Batch),
Q2 = append_queue(Id, Index, Q1, SentBatch),
Data2 = Data1#{queue := Q2},
{next_state, blocked, Data2};
%% failed and is async; remove the request from the queue, as %% failed and is async; remove the request from the queue, as
%% it is already in inflight table %% it is already in inflight table
{true, true} -> {true, true} ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data = Data1#{queue := Q1}, {next_state, blocked, Data1};
{next_state, blocked, Data};
%% success; just ack %% success; just ack
{false, _} -> {false, _} ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
CurrentCount = replayq:count(Q1), CurrentCount = replayq:count(Q1),
Data2 = Data1#{queue := Q1},
case {CurrentCount > 0, CurrentCount >= BatchSize} of case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} -> {false, _} ->
{keep_state, Data2}; {keep_state, Data1};
{true, true} -> {true, true} ->
{keep_state, Data2, [{next_event, internal, flush}]}; {keep_state, Data1, [{next_event, internal, flush}]};
{true, false} -> {true, false} ->
Data3 = ensure_flush_timer(Data2), Data2 = ensure_flush_timer(Data1),
{keep_state, Data3} {keep_state, Data2}
end end
end. end.
@ -874,6 +905,15 @@ disk_queue_dir(Id, Index) ->
QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index), QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
filename:join([emqx:data_dir(), "resource_worker", node(), QDir]). filename:join([emqx:data_dir(), "resource_worker", node(), QDir]).
%% @doc Recursively delete the on-disk queue directory computed by
%% `disk_queue_dir/2' for worker `Index' of resource `Id'.
%%
%% Returns `ok' when the directory was deleted or did not exist (`enoent');
%% any other result of `file:del_dir_r/1' is handed back unchanged.
clear_disk_queue_dir(Id, Index) ->
    case file:del_dir_r(disk_queue_dir(Id, Index)) of
        {error, enoent} -> ok;
        Other -> Other
    end.
ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) -> ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) ->
Ref = make_ref(), Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}), TRef = erlang:send_after(T, self(), {flush, Ref}),

View File

@ -67,7 +67,8 @@ stop_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
lists:foreach( lists:foreach(
fun(Idx) -> fun(Idx) ->
ensure_worker_removed(ResId, Idx) ensure_worker_removed(ResId, Idx),
ensure_disk_queue_dir_absent(ResId, Idx)
end, end,
lists:seq(1, WorkerPoolSize) lists:seq(1, WorkerPoolSize)
), ),
@ -127,6 +128,10 @@ ensure_worker_removed(ResId, Idx) ->
{error, Reason} {error, Reason}
end. end.
%% Clear the disk queue directory of worker `Index' of resource `ResourceId'.
%% Returns `ok'; crashes with `badmatch' if the directory cannot be cleared.
ensure_disk_queue_dir_absent(ResourceId, Index) ->
    ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index).
ensure_worker_pool_removed(ResId) -> ensure_worker_pool_removed(ResId) ->
try try
gproc_pool:delete(ResId) gproc_pool:delete(ResId)

View File

@ -88,6 +88,19 @@ on_query(_InstId, block, #{pid := Pid}) ->
on_query(_InstId, resume, #{pid := Pid}) -> on_query(_InstId, resume, #{pid := Pid}) ->
Pid ! resume, Pid ! resume,
ok; ok;
%% `big_payload' request: send the payload to the connector process together
%% with a fresh reference, then wait up to 1000 ms for the tagged answer.
%% - `ok' answer: emit the `connector_demo_big_payload' trace point, return `ok'
%% - `incorrect_status' answer: return a recoverable error
%% - no answer in time: `{error, timeout}'
on_query(_InstId, {big_payload, Payload}, #{pid := Pid}) ->
    Ref = make_ref(),
    Pid ! {{self(), Ref}, {big_payload, Payload}},
    receive
        {Ref, incorrect_status} ->
            {error, {recoverable_error, incorrect_status}};
        {Ref, ok} ->
            ?tp(connector_demo_big_payload, #{payload => Payload}),
            ok
    after 1000 ->
        {error, timeout}
    end;
on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
ReqRef = make_ref(), ReqRef = make_ref(),
From = {self(), ReqRef}, From = {self(), ReqRef},
@ -216,6 +229,9 @@ counter_loop(
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked -> {{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status}, FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1}; State#{incorrect_status_count := IncorrectCount + 1};
{{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
{get, ReplyFun} -> {get, ReplyFun} ->
apply_reply(ReplyFun, Num), apply_reply(ReplyFun, Num),
State; State;

View File

@ -1116,6 +1116,88 @@ t_retry_batch(_Config) ->
), ),
ok. ok.
%% Test case: a resource that is removed and then created again under the same
%% ID must start with an empty queue.
%%
%% Outline:
%%   1. create the resource (sync mode, 2 workers, 100-byte queue segments);
%%   2. block it and send 10 `big_payload' queries, each expected to fail;
%%   3. check that the queuing metric reports all 10 requests;
%%   4. remove the resource and check that it can no longer be looked up;
%%   5. create it again with the same ID (async mode) and check that the
%%      queuing metric is 0.
t_delete_and_re_create_with_same_name(_Config) ->
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 2,
queue_seg_bytes => 100,
resume_interval => 1_000
}
),
%% pre-condition: we should have just created a new queue
Queuing0 = emqx_resource_metrics:queuing_get(?ID),
?assertEqual(0, Queuing0),
?check_trace(
begin
%% put the resource into its blocked state before sending any request
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
NumRequests = 10,
%% subscribe before issuing the queries so that no append event is missed
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := resource_worker_appended_to_queue}),
%% +1 because the first request will fail,
%% block the resource, and will be
%% re-appended to the queue.
NumRequests + 1,
_Timeout = 5_000
),
%% ensure replayq offloads to disk
%% (every payload is at least 119 bytes, more than `queue_seg_bytes' = 100)
Payload = binary:copy(<<"a">>, 119),
lists:foreach(
fun(N) ->
%% every query is expected to return an error here
{error, _} =
emqx_resource:query(
?ID,
{big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
)
end,
lists:seq(1, NumRequests)
),
%% wait until all expected append events have been observed
{ok, _} = snabbkaffe:receive_events(SRef),
%% ensure that stuff got enqueued into disk
Queuing1 = emqx_resource_metrics:queuing_get(?ID),
?assertEqual(NumRequests, Queuing1),
%% now, we delete the resource
ok = emqx_resource:remove_local(?ID),
?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),
%% re-create the resource with the *same name*
%% (note: this time with `query_mode => async'), and wait for the
%% `resource_worker_enter_blocked' event before inspecting the metric
{{ok, _}, {ok, _Events}} =
?wait_async_action(
emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 2,
queue_seg_bytes => 100,
resume_interval => 1_000
}
),
#{?snk_kind := resource_worker_enter_blocked},
5_000
),
%% it shouldn't have anything enqueued, as it's a fresh resource
Queuing2 = emqx_resource_metrics:queuing_get(?ID),
?assertEqual(0, Queuing2),
ok
end,
%% no additional trace checks are run
[]
),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helpers %% Helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------