fix(buffer): fix `replayq` usages in buffer workers (5.0)

https://emqx.atlassian.net/browse/EMQX-8700

Fixes a few errors in the usage of `replayq` queues.

- Close `replayq` when `emqx_resource_worker` terminates.
- Do not keep old references to `replayq` after any `pop`s.
- Clear `replayq`'s data directories when removing a resource.
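For reference, the `replayq` lifecycle these fixes enforce looks roughly like the following minimal sketch (the module name, directory, and options here are illustrative only and are not part of this commit):

-module(replayq_usage_sketch).
-export([run/0]).

run() ->
    Dir = "/tmp/replayq_usage_sketch",
    %% open a disk-backed queue (items are binaries with the default sizer)
    Q0 = replayq:open(#{dir => Dir, seg_bytes => 100}),
    Q1 = replayq:append(Q0, [<<"req1">>, <<"req2">>]),
    %% keep only the queue handle returned by pop/2; popping may roll
    %% segments, so appending to a stale handle can fail with `einval'
    {Q2, AckRef, _Items} = replayq:pop(Q1, #{count_limit => 1}),
    ok = replayq:ack(Q2, AckRef),
    %% close the queue when the owner process terminates
    ok = replayq:close(Q2),
    %% and delete its data directory when the resource itself is removed
    _ = file:del_dir_r(Dir),
    ok.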
Thales Macedo Garitezi 2023-01-06 14:05:16 -03:00
parent 3048ad666a
commit 4273017a99
4 changed files with 160 additions and 17 deletions

View File

@@ -53,6 +53,8 @@
-export([reply_after_query/7, batch_reply_after_query/7]).
-export([clear_disk_queue_dir/2]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
-define(Q_ITEM(REQUEST), {q_item, REQUEST}).
@@ -177,6 +179,7 @@ init({Id, Index, Opts}) ->
resume_interval => maps:get(resume_interval, Opts, HCItvl),
tref => undefined
},
?tp(resource_worker_init, #{id => Id, index => Index}),
{ok, blocked, St, {next_event, cast, resume}}.
running(enter, _, St) ->
@@ -233,7 +236,9 @@ blocked(info, Info, _Data) ->
terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
emqx_resource_metrics:inflight_set(Id, Index, 0),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
gproc_pool:disconnect_worker(Id, {Id, Index}).
gproc_pool:disconnect_worker(Id, {Id, Index}),
replayq:close(Q),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -433,7 +438,11 @@ flush(Data0) ->
{Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}),
Batch = [Item || ?Q_ITEM(Item) <- Batch0],
IsBatch = BatchSize =/= 1,
do_flush(Data0, #{
%% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
%% Maybe we could re-open the queue?
Data1 = Data0#{queue := Q1},
do_flush(Data1, #{
new_queue => Q1,
is_batch => IsBatch,
batch => Batch,
@@ -464,24 +473,35 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que
%% failed and is not async; keep the request in the queue to
%% be retried
{true, false} ->
{next_state, blocked, Data1};
%% Note: currently, we cannot safely pop an item from
%% `replayq', keep the old reference to the queue and
%% later try to append new items to the old ref: by
%% popping an item, we may cause the side effect of
%% closing an open segment and opening a new one, and the
%% later `append' with the old file descriptor will fail
%% with `einval' because it has been closed. So we are
%% forced to re-append the item, changing the order of
%% requests...
ok = replayq:ack(Q1, QAckRef),
SentBatch = mark_as_sent(Batch),
Q2 = append_queue(Id, Index, Q1, SentBatch),
Data2 = Data1#{queue := Q2},
{next_state, blocked, Data2};
%% failed and is async; remove the request from the queue, as
%% it is already in inflight table
{true, true} ->
ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data = Data1#{queue := Q1},
{next_state, blocked, Data};
{next_state, blocked, Data1};
%% success; just ack
{false, _} ->
ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data2 = Data1#{queue := Q1},
case replayq:count(Q1) > 0 of
true ->
{keep_state, Data2, [{next_event, internal, flush}]};
{keep_state, Data1, [{next_event, internal, flush}]};
false ->
{keep_state, Data2}
{keep_state, Data1}
end
end;
do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) ->
@@ -499,28 +519,39 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu
%% failed and is not async; keep the request in the queue to
%% be retried
{true, false} ->
{next_state, blocked, Data1};
%% Note: currently, we cannot safely pop an item from
%% `replayq', keep the old reference to the queue and
%% later try to append new items to the old ref: by
%% popping an item, we may cause the side effect of
%% closing an open segment and opening a new one, and the
%% later `append' with the old file descriptor will fail
%% with `einval' because it has been closed. So we are
%% forced to re-append the item, changing the order of
%% requests...
ok = replayq:ack(Q1, QAckRef),
SentBatch = mark_as_sent(Batch),
Q2 = append_queue(Id, Index, Q1, SentBatch),
Data2 = Data1#{queue := Q2},
{next_state, blocked, Data2};
%% failed and is async; remove the request from the queue, as
%% it is already in inflight table
{true, true} ->
ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data = Data1#{queue := Q1},
{next_state, blocked, Data};
{next_state, blocked, Data1};
%% success; just ack
{false, _} ->
ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
CurrentCount = replayq:count(Q1),
Data2 = Data1#{queue := Q1},
case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} ->
{keep_state, Data2};
{keep_state, Data1};
{true, true} ->
{keep_state, Data2, [{next_event, internal, flush}]};
{keep_state, Data1, [{next_event, internal, flush}]};
{true, false} ->
Data3 = ensure_flush_timer(Data2),
{keep_state, Data3}
Data2 = ensure_flush_timer(Data1),
{keep_state, Data2}
end
end.
@@ -879,6 +910,15 @@ disk_queue_dir(Id, Index) ->
QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
filename:join([emqx:data_dir(), "resource_worker", node(), QDir]).
clear_disk_queue_dir(Id, Index) ->
ReplayQDir = disk_queue_dir(Id, Index),
case file:del_dir_r(ReplayQDir) of
{error, enoent} ->
ok;
Res ->
Res
end.
ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) ->
Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}),

View File

@@ -67,7 +67,8 @@ stop_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts),
lists:foreach(
fun(Idx) ->
ensure_worker_removed(ResId, Idx)
ensure_worker_removed(ResId, Idx),
ensure_disk_queue_dir_absent(ResId, Idx)
end,
lists:seq(1, WorkerPoolSize)
),
@@ -127,6 +128,10 @@ ensure_worker_removed(ResId, Idx) ->
{error, Reason}
end.
ensure_disk_queue_dir_absent(ResourceId, Index) ->
ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index),
ok.
ensure_worker_pool_removed(ResId) ->
try
gproc_pool:delete(ResId)

View File

@@ -88,6 +88,19 @@ on_query(_InstId, block, #{pid := Pid}) ->
on_query(_InstId, resume, #{pid := Pid}) ->
Pid ! resume,
ok;
on_query(_InstId, {big_payload, Payload}, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, {big_payload, Payload}},
receive
{ReqRef, ok} ->
?tp(connector_demo_big_payload, #{payload => Payload}),
ok;
{ReqRef, incorrect_status} ->
{error, {recoverable_error, incorrect_status}}
after 1000 ->
{error, timeout}
end;
on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
@@ -216,6 +229,9 @@ counter_loop(
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
{{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
{get, ReplyFun} ->
apply_reply(ReplyFun, Num),
State;

View File

@@ -1108,6 +1108,88 @@ t_retry_batch(_Config) ->
),
ok.
t_delete_and_re_create_with_same_name(_Config) ->
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 2,
queue_seg_bytes => 100,
resume_interval => 1_000
}
),
%% pre-condition: we should have just created a new queue
Queuing0 = emqx_resource_metrics:queuing_get(?ID),
?assertEqual(0, Queuing0),
?check_trace(
begin
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
NumRequests = 10,
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := resource_worker_appended_to_queue}),
%% +1 because the first request will fail,
%% block the resource, and will be
%% re-appended to the queue.
NumRequests + 1,
_Timeout = 5_000
),
%% ensure replayq offloads to disk
Payload = binary:copy(<<"a">>, 119),
lists:foreach(
fun(N) ->
{error, _} =
emqx_resource:query(
?ID,
{big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
)
end,
lists:seq(1, NumRequests)
),
{ok, _} = snabbkaffe:receive_events(SRef),
%% ensure that stuff got enqueued into disk
Queuing1 = emqx_resource_metrics:queuing_get(?ID),
?assertEqual(NumRequests, Queuing1),
%% now, we delete the resource
ok = emqx_resource:remove_local(?ID),
?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),
%% re-create the resource with the *same name*
{{ok, _}, {ok, _Events}} =
?wait_async_action(
emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 2,
queue_seg_bytes => 100,
resume_interval => 1_000
}
),
#{?snk_kind := resource_worker_enter_blocked},
5_000
),
%% it shouldn't have anything enqueued, as it's a fresh resource
Queuing2 = emqx_resource_metrics:queuing_get(?ID),
?assertEqual(0, Queuing2),
ok
end,
[]
),
ok.
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------