diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 7840fd474..9b264cfcb 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -53,6 +53,8 @@ -export([reply_after_query/7, batch_reply_after_query/7]). +-export([clear_disk_queue_dir/2]). + -elvis([{elvis_style, dont_repeat_yourself, disable}]). -define(Q_ITEM(REQUEST), {q_item, REQUEST}). @@ -176,6 +178,7 @@ init({Id, Index, Opts}) -> resume_interval => maps:get(resume_interval, Opts, HCItvl), tref => undefined }, + ?tp(resource_worker_init, #{id => Id, index => Index}), {ok, blocked, St, {next_event, cast, resume}}. running(enter, _, St) -> @@ -232,7 +235,9 @@ blocked(info, Info, _Data) -> terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> emqx_resource_metrics:inflight_set(Id, Index, 0), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), - gproc_pool:disconnect_worker(Id, {Id, Index}). + gproc_pool:disconnect_worker(Id, {Id, Index}), + replayq:close(Q), + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -432,7 +437,11 @@ flush(Data0) -> {Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}), Batch = [Item || ?Q_ITEM(Item) <- Batch0], IsBatch = BatchSize =/= 1, - do_flush(Data0, #{ + %% We *must* use the new queue, because we currently can't + %% `nack' a `pop'. + %% Maybe we could re-open the queue? + Data1 = Data0#{queue := Q1}, + do_flush(Data1, #{ new_queue => Q1, is_batch => IsBatch, batch => Batch, @@ -463,24 +472,35 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que %% failed and is not async; keep the request in the queue to %% be retried {true, false} -> - {next_state, blocked, Data1}; + %% Note: currently, we cannot safely pop an item from + %% `replayq', keep the old reference to the queue and + %% later try to append new items to the old ref: by + %% popping an item, we may cause the side effect of + %% closing an open segment and opening a new one, and the + %% later `append' with the old file descriptor will fail + %% with `einval' because it has been closed. So we are + %% forced to re-append the item, changing the order of + %% requests... + ok = replayq:ack(Q1, QAckRef), + SentBatch = mark_as_sent(Batch), + Q2 = append_queue(Id, Index, Q1, SentBatch), + Data2 = Data1#{queue := Q2}, + {next_state, blocked, Data2}; %% failed and is async; remove the request from the queue, as %% it is already in inflight table {true, true} -> ok = replayq:ack(Q1, QAckRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - Data = Data1#{queue := Q1}, - {next_state, blocked, Data}; + {next_state, blocked, Data1}; %% success; just ack {false, _} -> ok = replayq:ack(Q1, QAckRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - Data2 = Data1#{queue := Q1}, case replayq:count(Q1) > 0 of true -> - {keep_state, Data2, [{next_event, internal, flush}]}; + {keep_state, Data1, [{next_event, internal, flush}]}; false -> - {keep_state, Data2} + {keep_state, Data1} end end; do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) -> @@ -498,28 +518,39 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu %% failed and is not async; keep the request in the queue to %% be retried {true, false} -> - {next_state, blocked, Data1}; + %% Note: currently, we cannot safely pop an item from + %% `replayq', keep the old reference to the queue and + %% later try to append new items to the old ref: by + %% popping an item, we may cause the side effect of + %% closing an open segment and opening a new one, and the + %% later `append' with the old file descriptor will fail + %% with `einval' because it has been closed. So we are + %% forced to re-append the item, changing the order of + %% requests... + ok = replayq:ack(Q1, QAckRef), + SentBatch = mark_as_sent(Batch), + Q2 = append_queue(Id, Index, Q1, SentBatch), + Data2 = Data1#{queue := Q2}, + {next_state, blocked, Data2}; %% failed and is async; remove the request from the queue, as %% it is already in inflight table {true, true} -> ok = replayq:ack(Q1, QAckRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - Data = Data1#{queue := Q1}, - {next_state, blocked, Data}; + {next_state, blocked, Data1}; %% success; just ack {false, _} -> ok = replayq:ack(Q1, QAckRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), CurrentCount = replayq:count(Q1), - Data2 = Data1#{queue := Q1}, case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> - {keep_state, Data2}; + {keep_state, Data1}; {true, true} -> - {keep_state, Data2, [{next_event, internal, flush}]}; + {keep_state, Data1, [{next_event, internal, flush}]}; {true, false} -> - Data3 = ensure_flush_timer(Data2), - {keep_state, Data3} + Data2 = ensure_flush_timer(Data1), + {keep_state, Data2} end end. @@ -874,6 +905,15 @@ disk_queue_dir(Id, Index) -> QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index), filename:join([emqx:data_dir(), "resource_worker", node(), QDir]). +clear_disk_queue_dir(Id, Index) -> + ReplayQDir = disk_queue_dir(Id, Index), + case file:del_dir_r(ReplayQDir) of + {error, enoent} -> + ok; + Res -> + Res + end. + ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) -> Ref = make_ref(), TRef = erlang:send_after(T, self(), {flush, Ref}), diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_worker_sup.erl index b6557620c..a5de76eca 100644 --- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_worker_sup.erl @@ -67,7 +67,8 @@ stop_workers(ResId, Opts) -> WorkerPoolSize = worker_pool_size(Opts), lists:foreach( fun(Idx) -> - ensure_worker_removed(ResId, Idx) + ensure_worker_removed(ResId, Idx), + ensure_disk_queue_dir_absent(ResId, Idx) end, lists:seq(1, WorkerPoolSize) ), @@ -127,6 +128,10 @@ ensure_worker_removed(ResId, Idx) -> {error, Reason} end. +ensure_disk_queue_dir_absent(ResourceId, Index) -> + ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index), + ok. + ensure_worker_pool_removed(ResId) -> try gproc_pool:delete(ResId) diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 692895548..6c03e93cc 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -88,6 +88,19 @@ on_query(_InstId, block, #{pid := Pid}) -> on_query(_InstId, resume, #{pid := Pid}) -> Pid ! resume, ok; +on_query(_InstId, {big_payload, Payload}, #{pid := Pid}) -> + ReqRef = make_ref(), + From = {self(), ReqRef}, + Pid ! {From, {big_payload, Payload}}, + receive + {ReqRef, ok} -> + ?tp(connector_demo_big_payload, #{payload => Payload}), + ok; + {ReqRef, incorrect_status} -> + {error, {recoverable_error, incorrect_status}} + after 1000 -> + {error, timeout} + end; on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> ReqRef = make_ref(), From = {self(), ReqRef}, @@ -216,6 +229,9 @@ counter_loop( {{FromPid, ReqRef}, {inc, _N}} when Status == blocked -> FromPid ! {ReqRef, incorrect_status}, State#{incorrect_status_count := IncorrectCount + 1}; + {{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked -> + FromPid ! {ReqRef, incorrect_status}, + State#{incorrect_status_count := IncorrectCount + 1}; {get, ReplyFun} -> apply_reply(ReplyFun, Num), State; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index cdec414c9..ea0f4411c 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1116,6 +1116,88 @@ t_retry_batch(_Config) -> ), ok. +t_delete_and_re_create_with_same_name(_Config) -> + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 1, + worker_pool_size => 2, + queue_seg_bytes => 100, + resume_interval => 1_000 + } + ), + %% pre-condition: we should have just created a new queue + Queuing0 = emqx_resource_metrics:queuing_get(?ID), + ?assertEqual(0, Queuing0), + ?check_trace( + begin + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), + NumRequests = 10, + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := resource_worker_appended_to_queue}), + %% +1 because the first request will fail, + %% block the resource, and will be + %% re-appended to the queue. + NumRequests + 1, + _Timeout = 5_000 + ), + %% ensure replayq offloads to disk + Payload = binary:copy(<<"a">>, 119), + lists:foreach( + fun(N) -> + {error, _} = + emqx_resource:query( + ?ID, + {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>} + ) + end, + lists:seq(1, NumRequests) + ), + + {ok, _} = snabbkaffe:receive_events(SRef), + + %% ensure that stuff got enqueued into disk + Queuing1 = emqx_resource_metrics:queuing_get(?ID), + ?assertEqual(NumRequests, Queuing1), + + %% now, we delete the resource + ok = emqx_resource:remove_local(?ID), + ?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)), + + %% re-create the resource with the *same name* + {{ok, _}, {ok, _Events}} = + ?wait_async_action( + emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + worker_pool_size => 2, + queue_seg_bytes => 100, + resume_interval => 1_000 + } + ), + #{?snk_kind := resource_worker_enter_blocked}, + 5_000 + ), + + %% it shouldn't have anything enqueued, as it's a fresh resource + Queuing2 = emqx_resource_metrics:queuing_get(?ID), + ?assertEqual(0, Queuing2), + + ok + end, + [] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------