Compare commits

...

4 Commits

Author SHA1 Message Date
Thales Macedo Garitezi 80976708e8 feat(buffer_worker): also use the inflight table for sync requests
Related: https://emqx.atlassian.net/browse/EMQX-8692

This should also correctly account for `retried.*` metrics for sync
requests.

Also fixes cases where race conditions for retrying async requests
could potentially lead to inconsistent metrics.

Fixes more cases where a stale reference to `replayq` was being held
accidentally after a `pop`.
2023-01-10 19:31:30 -03:00
Thales Macedo Garitezi c89114b0e3 chore(replayq): update replayq -> 0.3.6 and use `clean_start` for buffer workers
So we can truly avoid resuming work after a node restart.
2023-01-10 18:16:38 -03:00
Thales Macedo Garitezi 4273017a99 fix(buffer): fix `replayq` usages in buffer workers (5.0)
https://emqx.atlassian.net/browse/EMQX-8700

Fixes a few errors in the usage of `replayq` queues.

- Close `replayq` when `emqx_resource_worker` terminates.
- Do not keep old references to `replayq` after any `pop`s.
- Clear `replayq`'s data directories when removing a resource.
2023-01-09 15:16:45 -03:00
Thales Macedo Garitezi 3048ad666a chore(influxdb): remove deprecated value from example 2023-01-09 15:16:45 -03:00
8 changed files with 463 additions and 161 deletions

View File

@ -53,6 +53,8 @@
-export([reply_after_query/7, batch_reply_after_query/7]). -export([reply_after_query/7, batch_reply_after_query/7]).
-export([clear_disk_queue_dir/2]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]). -elvis([{elvis_style, dont_repeat_yourself, disable}]).
-define(Q_ITEM(REQUEST), {q_item, REQUEST}). -define(Q_ITEM(REQUEST), {q_item, REQUEST}).
@ -112,10 +114,13 @@ simple_sync_query(Id, Request) ->
%% would mess up the metrics anyway. `undefined' is ignored by %% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'. %% `emqx_resource_metrics:*_shift/3'.
Index = undefined, Index = undefined,
QueryOpts = #{}, QueryOpts = #{perform_inflight_capacity_check => false},
emqx_resource_metrics:matched_inc(Id), emqx_resource_metrics:matched_inc(Id),
Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), QueryOpts), Ref = make_message_ref(),
_ = handle_query_result(Id, Result, false, false), Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts),
HasBeenSent = false,
BlockWorker = false,
_ = handle_query_result(Id, Result, HasBeenSent, BlockWorker),
Result. Result.
-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
@ -127,10 +132,13 @@ simple_async_query(Id, Request, ReplyFun) ->
%% would mess up the metrics anyway. `undefined' is ignored by %% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'. %% `emqx_resource_metrics:*_shift/3'.
Index = undefined, Index = undefined,
QueryOpts = #{}, QueryOpts = #{perform_inflight_capacity_check => false},
emqx_resource_metrics:matched_inc(Id), emqx_resource_metrics:matched_inc(Id),
Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), QueryOpts), Ref = make_message_ref(),
_ = handle_query_result(Id, Result, false, false), Result = call_query(async, Id, Index, Ref, ?QUERY(ReplyFun, Request, false), QueryOpts),
HasBeenSent = false,
BlockWorker = false,
_ = handle_query_result(Id, Result, HasBeenSent, BlockWorker),
Result. Result.
-spec block(pid() | atom()) -> ok. -spec block(pid() | atom()) -> ok.
@ -157,7 +165,7 @@ init({Id, Index, Opts}) ->
max_total_bytes => TotalBytes, max_total_bytes => TotalBytes,
%% we don't want to retain the queue after %% we don't want to retain the queue after
%% resource restarts. %% resource restarts.
offload => true, offload => {true, volatile},
seg_bytes => SegBytes, seg_bytes => SegBytes,
sizer => fun ?MODULE:estimate_size/1 sizer => fun ?MODULE:estimate_size/1
}, },
@ -177,6 +185,7 @@ init({Id, Index, Opts}) ->
resume_interval => maps:get(resume_interval, Opts, HCItvl), resume_interval => maps:get(resume_interval, Opts, HCItvl),
tref => undefined tref => undefined
}, },
?tp(resource_worker_init, #{id => Id, index => Index}),
{ok, blocked, St, {next_event, cast, resume}}. {ok, blocked, St, {next_event, cast, resume}}.
running(enter, _, St) -> running(enter, _, St) ->
@ -233,7 +242,9 @@ blocked(info, Info, _Data) ->
terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
emqx_resource_metrics:inflight_set(Id, Index, 0), emqx_resource_metrics:inflight_set(Id, Index, 0),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
gproc_pool:disconnect_worker(Id, {Id, Index}). gproc_pool:disconnect_worker(Id, {Id, Index}),
replayq:close(Q),
ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -308,16 +319,27 @@ retry_queue(
empty -> empty ->
{next_state, running, Data0}; {next_state, running, Data0};
{Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} -> {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} ->
Data = Data0#{queue := Q1},
QueryOpts = #{inflight_name => Name}, QueryOpts = #{inflight_name => Name},
Result = call_query(configured, Id, Index, Query, QueryOpts), Ref = make_message_ref(),
Result = call_query(configured, Id, Index, Ref, Query, QueryOpts),
Reply = ?REPLY(undefined, Request, HasBeenSent, Result), Reply = ?REPLY(undefined, Request, HasBeenSent, Result),
case reply_caller(Id, Reply) of case reply_caller(Id, Reply) of
true -> true ->
{keep_state, Data0, {state_timeout, ResumeT, resume}}; %% Still failed, but now it's in the inflight
%% table and marked as sent, except if the result
%% says inflight is full. In this case, we must
%% ensure it's indeed in the inflight table or
%% risk losing it.
ok = replayq:ack(Q1, QAckRef),
is_inflight_full_result(Result) andalso
inflight_append(Name, Ref, Query, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
{keep_state, Data, {state_timeout, ResumeT, resume}};
false -> false ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse inflight_drop(Name, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data = Data0#{queue := Q1},
retry_queue(Data) retry_queue(Data)
end end
end; end;
@ -336,8 +358,10 @@ retry_queue(
empty -> empty ->
{next_state, running, Data0}; {next_state, running, Data0};
{Q1, QAckRef, Batch0} -> {Q1, QAckRef, Batch0} ->
Data = Data0#{queue := Q1},
QueryOpts = #{inflight_name => Name}, QueryOpts = #{inflight_name => Name},
Result = call_query(configured, Id, Index, Batch0, QueryOpts), Ref = make_message_ref(),
Result = call_query(configured, Id, Index, Ref, Batch0, QueryOpts),
%% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue, %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
%% we now change the 'from' field to 'undefined' so it will not reply the caller again. %% we now change the 'from' field to 'undefined' so it will not reply the caller again.
Batch = [ Batch = [
@ -347,12 +371,21 @@ retry_queue(
case batch_reply_caller(Id, Result, Batch) of case batch_reply_caller(Id, Result, Batch) of
true -> true ->
?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}), ?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}),
{keep_state, Data0, {state_timeout, ResumeT, resume}}; %% Still failed, but now it's in the inflight
%% table and marked as sent, except if the result
%% says inflight is full. In this case, we must
%% ensure it's indeed in the inflight table or
%% risk losing it.
ok = replayq:ack(Q1, QAckRef),
is_inflight_full_result(Result) andalso
inflight_append(Name, Ref, Batch, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
{keep_state, Data, {state_timeout, ResumeT, resume}};
false -> false ->
?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}), ?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}),
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse inflight_drop(Name, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data = Data0#{queue := Q1},
retry_queue(Data) retry_queue(Data)
end end
end. end.
@ -367,15 +400,25 @@ retry_inflight_sync(
QueryOpts = #{}, QueryOpts = #{},
%% if we are retrying an inflight query, it has been sent %% if we are retrying an inflight query, it has been sent
HasBeenSent = true, HasBeenSent = true,
Result = call_query(sync, Id, Index, QueryOrBatch, QueryOpts), Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
BlockWorker = false, BlockWorker = false,
case handle_query_result(Id, Result, HasBeenSent, BlockWorker) of case handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) of
%% Send failed because resource is down %% Send failed because resource is down
true -> {true, PostFn} ->
PostFn(),
?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}),
{keep_state, Data0, {state_timeout, ResumeT, resume}}; {keep_state, Data0, {state_timeout, ResumeT, resume}};
%% Send ok or failed but the resource is working %% Send ok or failed but the resource is working
false -> {false, PostFn} ->
inflight_drop(Name, Ref, Id, Index), IsDropped = inflight_drop(Name, Ref, Id, Index),
%% we need to defer bumping the counters after
%% `inflight_drop' to avoid the race condition when an
%% inflight request might get completed concurrently with
%% the retry, bumping them twice. Since both inflight
%% requests (repeated and original) have the same `Ref',
%% we bump the counter when removing it from the table.
IsDropped andalso PostFn(),
?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}),
do_resume(Data0) do_resume(Data0)
end. end.
@ -433,10 +476,16 @@ flush(Data0) ->
{Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}), {Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}),
Batch = [Item || ?Q_ITEM(Item) <- Batch0], Batch = [Item || ?Q_ITEM(Item) <- Batch0],
IsBatch = BatchSize =/= 1, IsBatch = BatchSize =/= 1,
do_flush(Data0, #{ %% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
%% Maybe we could re-open the queue?
Data1 = Data0#{queue := Q1},
Ref = make_message_ref(),
do_flush(Data1, #{
new_queue => Q1, new_queue => Q1,
is_batch => IsBatch, is_batch => IsBatch,
batch => Batch, batch => Batch,
ref => Ref,
ack_ref => QAckRef ack_ref => QAckRef
}) })
end. end.
@ -447,7 +496,16 @@ flush(Data0) ->
ack_ref := replayq:ack_ref() ack_ref := replayq:ack_ref()
}) -> }) ->
gen_statem:event_handler_result(state(), data()). gen_statem:event_handler_result(state(), data()).
do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) -> do_flush(
Data0,
#{
is_batch := false,
batch := Batch,
ref := Ref,
ack_ref := QAckRef,
new_queue := Q1
}
) ->
#{ #{
id := Id, id := Id,
index := Index, index := Index,
@ -456,35 +514,38 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que
%% unwrap when not batching (i.e., batch size == 1) %% unwrap when not batching (i.e., batch size == 1)
[?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
QueryOpts = #{inflight_name => Name}, QueryOpts = #{inflight_name => Name},
Result = call_query(configured, Id, Index, Request, QueryOpts), Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
IsAsync = is_async(Id),
Data1 = cancel_flush_timer(Data0), Data1 = cancel_flush_timer(Data0),
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
case {reply_caller(Id, Reply), IsAsync} of case reply_caller(Id, Reply) of
%% failed and is not async; keep the request in the queue to %% Failed; remove the request from the queue, as we cannot pop
%% be retried %% from it again. But we must ensure it's in the inflight
{true, false} -> %% table, even if it's full, so we don't lose the request.
%% And only in that case.
true ->
ok = replayq:ack(Q1, QAckRef),
is_inflight_full_result(Result) andalso inflight_append(Name, Ref, Request, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
{next_state, blocked, Data1}; {next_state, blocked, Data1};
%% failed and is async; remove the request from the queue, as %% Success; just ack.
%% it is already in inflight table false ->
{true, true} ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse inflight_drop(Name, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data = Data1#{queue := Q1},
{next_state, blocked, Data};
%% success; just ack
{false, _} ->
ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data2 = Data1#{queue := Q1},
case replayq:count(Q1) > 0 of case replayq:count(Q1) > 0 of
true -> true ->
{keep_state, Data2, [{next_event, internal, flush}]}; {keep_state, Data1, [{next_event, internal, flush}]};
false -> false ->
{keep_state, Data2} {keep_state, Data1}
end end
end; end;
do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) -> do_flush(Data0, #{
is_batch := true,
batch := Batch,
ref := Ref,
ack_ref := QAckRef,
new_queue := Q1
}) ->
#{ #{
id := Id, id := Id,
index := Index, index := Index,
@ -492,56 +553,64 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu
name := Name name := Name
} = Data0, } = Data0,
QueryOpts = #{inflight_name => Name}, QueryOpts = #{inflight_name => Name},
Result = call_query(configured, Id, Index, Batch, QueryOpts), Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
IsAsync = is_async(Id),
Data1 = cancel_flush_timer(Data0), Data1 = cancel_flush_timer(Data0),
case {batch_reply_caller(Id, Result, Batch), IsAsync} of case batch_reply_caller(Id, Result, Batch) of
%% failed and is not async; keep the request in the queue to %% Failed; remove the request from the queue, as we cannot pop
%% be retried %% from it again. But we must ensure it's in the inflight
{true, false} -> %% table, even if it's full, so we don't lose the request.
{next_state, blocked, Data1}; %% And only in that case.
%% failed and is async; remove the request from the queue, as true ->
%% it is already in inflight table
{true, true} ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_inflight_full_result(Result) andalso inflight_append(Name, Ref, Batch, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
Data = Data1#{queue := Q1}, {next_state, blocked, Data1};
{next_state, blocked, Data}; %% Success; just ack.
%% success; just ack false ->
{false, _} ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse inflight_drop(Name, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
CurrentCount = replayq:count(Q1), CurrentCount = replayq:count(Q1),
Data2 = Data1#{queue := Q1},
case {CurrentCount > 0, CurrentCount >= BatchSize} of case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} -> {false, _} ->
{keep_state, Data2}; {keep_state, Data1};
{true, true} -> {true, true} ->
{keep_state, Data2, [{next_event, internal, flush}]}; {keep_state, Data1, [{next_event, internal, flush}]};
{true, false} -> {true, false} ->
Data3 = ensure_flush_timer(Data2), Data2 = ensure_flush_timer(Data1),
{keep_state, Data3} {keep_state, Data2}
end end
end. end.
batch_reply_caller(Id, BatchResult, Batch) -> batch_reply_caller(Id, BatchResult, Batch) ->
{ShouldBlock, PostFns} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch),
lists:foreach(fun(F) -> F() end, PostFns),
ShouldBlock.
batch_reply_caller_defer_metrics(Id, BatchResult, Batch) ->
lists:foldl( lists:foldl(
fun(Reply, BlockWorker) -> fun(Reply, {BlockWorker, PostFns}) ->
reply_caller(Id, Reply, BlockWorker) {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply, BlockWorker),
{ShouldBlock, [PostFn | PostFns]}
end, end,
false, {false, []},
%% the `Mod:on_batch_query/3` returns a single result for a batch, %% the `Mod:on_batch_query/3` returns a single result for a batch,
%% so we need to expand %% so we need to expand
?EXPAND(BatchResult, Batch) ?EXPAND(BatchResult, Batch)
). ).
reply_caller(Id, Reply) -> reply_caller(Id, Reply) ->
BlockWorker = false, {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply),
reply_caller(Id, Reply, BlockWorker). PostFn(),
ShouldBlock.
reply_caller(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) -> reply_caller_defer_metrics(Id, Reply) ->
handle_query_result(Id, Result, HasBeenSent, BlockWorker); BlockWorker = false,
reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when reply_caller_defer_metrics(Id, Reply, BlockWorker).
reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) ->
handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker);
reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when
is_function(ReplyFun) is_function(ReplyFun)
-> ->
_ = _ =
@ -549,55 +618,89 @@ reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker)
{async_return, _} -> no_reply_for_now; {async_return, _} -> no_reply_for_now;
_ -> apply(ReplyFun, Args ++ [Result]) _ -> apply(ReplyFun, Args ++ [Result])
end, end,
handle_query_result(Id, Result, HasBeenSent, BlockWorker); handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker);
reply_caller(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) -> reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) ->
gen_statem:reply(From, Result), gen_statem:reply(From, Result),
handle_query_result(Id, Result, HasBeenSent, BlockWorker). handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker).
handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) -> handle_query_result(Id, Result, HasBeenSent, BlockWorker) ->
{ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker),
PostFn(),
ShouldBlock.
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) ->
PostFn = fun() ->
?SLOG(error, #{msg => resource_exception, info => Msg}), ?SLOG(error, #{msg => resource_exception, info => Msg}),
inc_sent_failed(Id, HasBeenSent), inc_sent_failed(Id, HasBeenSent),
BlockWorker; ok
handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when end,
{BlockWorker, PostFn};
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when
NotWorking == not_connected; NotWorking == blocked NotWorking == not_connected; NotWorking == blocked
-> ->
true; {true, fun() -> ok end};
handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
emqx_resource_metrics:dropped_resource_not_found_inc(Id), emqx_resource_metrics:dropped_resource_not_found_inc(Id),
BlockWorker; ok
handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) -> end,
{BlockWorker, PostFn};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
emqx_resource_metrics:dropped_resource_stopped_inc(Id), emqx_resource_metrics:dropped_resource_stopped_inc(Id),
BlockWorker; ok
handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) -> end,
{BlockWorker, PostFn};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
emqx_resource_metrics:dropped_other_inc(Id), emqx_resource_metrics:dropped_other_inc(Id),
BlockWorker; ok
handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) -> end,
{BlockWorker, PostFn};
handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) ->
%% the message will be queued in replayq or inflight window, %% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
%% sent this message. %% sent this message.
PostFn = fun() ->
?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
true; ok
handle_query_result(Id, {error, Reason}, HasBeenSent, BlockWorker) -> end,
{true, PostFn};
handle_query_result_pure(Id, {error, Reason}, HasBeenSent, BlockWorker) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent), inc_sent_failed(Id, HasBeenSent),
BlockWorker; ok
handle_query_result(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) -> end,
true; {BlockWorker, PostFn};
handle_query_result(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) -> handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) ->
{true, fun() -> ok end};
handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
inc_sent_failed(Id, HasBeenSent), inc_sent_failed(Id, HasBeenSent),
BlockWorker; ok
handle_query_result(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) -> end,
BlockWorker; {BlockWorker, PostFn};
handle_query_result(Id, Result, HasBeenSent, BlockWorker) -> handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) ->
{BlockWorker, fun() -> ok end};
handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) ->
PostFn = fun() ->
assert_ok_result(Result), assert_ok_result(Result),
inc_sent_success(Id, HasBeenSent), inc_sent_success(Id, HasBeenSent),
BlockWorker. ok
end,
{BlockWorker, PostFn}.
call_query(QM0, Id, Index, Query, QueryOpts) -> is_inflight_full_result({async_return, inflight_full}) ->
true;
is_inflight_full_result(_) ->
false.
call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query}), ?tp(call_query_enter, #{id => Id, query => Query}),
case emqx_resource_manager:ets_lookup(Id) of case emqx_resource_manager:ets_lookup(Id) of
{ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
@ -607,7 +710,7 @@ call_query(QM0, Id, Index, Query, QueryOpts) ->
_ -> QM0 _ -> QM0
end, end,
CM = maps:get(callback_mode, Data), CM = maps:get(callback_mode, Data),
apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts); apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
{ok, _Group, #{status := stopped}} -> {ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled"); ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := S}} when S == connecting; S == disconnected -> {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
@ -634,20 +737,34 @@ call_query(QM0, Id, Index, Query, QueryOpts) ->
end end
). ).
apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); Name = maps:get(inflight_name, QueryOpts, undefined),
apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
?APPLY_RESOURCE(
call_query,
case PerformInflightCapacityCheck andalso is_inflight_full(Name) of
true ->
%% should be kept in the inflight table and retried
%% when resuming.
{async_return, inflight_full};
false ->
ok = inflight_append(Name, Ref, Query, Id, Index),
Mod:on_query(Id, Request, ResSt)
end,
Request
);
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_query_async, call_query_async,
case is_inflight_full(Name) of case PerformInflightCapacityCheck andalso is_inflight_full(Name) of
true -> true ->
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
ReplyFun = fun ?MODULE:reply_after_query/7, ReplyFun = fun ?MODULE:reply_after_query/7,
Ref = make_message_ref(),
Args = [self(), Id, Index, Name, Ref, Query], Args = [self(), Id, Index, Name, Ref, Query],
ok = inflight_append(Name, Ref, Query, Id, Index), ok = inflight_append(Name, Ref, Query, Id, Index),
Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
@ -655,21 +772,35 @@ apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, Que
end, end,
Request Request
); );
apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch], Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); ?APPLY_RESOURCE(
apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> call_batch_query,
case PerformInflightCapacityCheck andalso is_inflight_full(Name) of
true ->
%% should be kept in the inflight table and retried
%% when resuming.
{async_return, inflight_full};
false ->
ok = inflight_append(Name, Ref, Batch, Id, Index),
Mod:on_batch_query(Id, Requests, ResSt)
end,
Batch
);
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_batch_query_async, call_batch_query_async,
case is_inflight_full(Name) of case PerformInflightCapacityCheck andalso is_inflight_full(Name) of
true -> true ->
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
ReplyFun = fun ?MODULE:batch_reply_after_query/7, ReplyFun = fun ?MODULE:batch_reply_after_query/7,
Ref = make_message_ref(),
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]}, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]},
Requests = [Request || ?QUERY(_From, Request, _) <- Batch], Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
ok = inflight_append(Name, Ref, Batch, Id, Index), ok = inflight_append(Name, Ref, Batch, Id, Index),
@ -683,29 +814,36 @@ reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent),
%% NOTE: 'inflight' is the count of messages that were sent async %% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
case reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Result)) of case reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)) of
true -> {true, PostFn} ->
PostFn(),
?MODULE:block(Pid); ?MODULE:block(Pid);
false -> {false, PostFn} ->
drop_inflight_and_resume(Pid, Name, Ref, Id, Index) IsDropped = drop_inflight_and_resume(Pid, Name, Ref, Id, Index),
IsDropped andalso PostFn(),
ok
end. end.
batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) -> batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
%% NOTE: 'inflight' is the count of messages that were sent async %% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
case batch_reply_caller(Id, Result, Batch) of case batch_reply_caller_defer_metrics(Id, Result, Batch) of
true -> {true, PostFns} ->
lists:foreach(fun(F) -> F() end, PostFns),
?MODULE:block(Pid); ?MODULE:block(Pid);
false -> {false, PostFns} ->
drop_inflight_and_resume(Pid, Name, Ref, Id, Index) IsDropped = drop_inflight_and_resume(Pid, Name, Ref, Id, Index),
IsDropped andalso lists:foreach(fun(F) -> F() end, PostFns),
ok
end. end.
drop_inflight_and_resume(Pid, Name, Ref, Id, Index) -> drop_inflight_and_resume(Pid, Name, Ref, Id, Index) ->
case is_inflight_full(Name) of case is_inflight_full(Name) of
true -> true ->
inflight_drop(Name, Ref, Id, Index), IsDropped = inflight_drop(Name, Ref, Id, Index),
?MODULE:resume(Pid); ?MODULE:resume(Pid),
IsDropped;
false -> false ->
inflight_drop(Name, Ref, Id, Index) inflight_drop(Name, Ref, Id, Index)
end. end.
@ -805,16 +943,18 @@ inflight_append(undefined, _Ref, _Query, _Id, _Index) ->
ok; ok;
inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) -> inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) ->
Batch = mark_as_sent(Batch0), Batch = mark_as_sent(Batch0),
ets:insert(Name, {Ref, Batch}), IsNew = ets:insert_new(Name, {Ref, Batch}),
BatchSize = length(Batch), BatchSize = length(Batch),
ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}), IsNew andalso ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
?tp(resource_worker_appended_to_inflight, #{batch => Batch, is_new => IsNew}),
ok; ok;
inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) -> inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) ->
Query = mark_as_sent(Query0), Query = mark_as_sent(Query0),
ets:insert(Name, {Ref, Query}), IsNew = ets:insert_new(Name, {Ref, Query}),
ets:update_counter(Name, ?SIZE_REF, {2, 1}), IsNew andalso ets:update_counter(Name, ?SIZE_REF, {2, 1}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
?tp(resource_worker_appended_to_inflight, #{query => Query, is_new => IsNew}),
ok; ok;
inflight_append(Name, Ref, Data, _Id, _Index) -> inflight_append(Name, Ref, Data, _Id, _Index) ->
ets:insert(Name, {Ref, Data}), ets:insert(Name, {Ref, Data}),
@ -822,8 +962,8 @@ inflight_append(Name, Ref, Data, _Id, _Index) ->
%% the inflight metric. %% the inflight metric.
ok. ok.
inflight_drop(undefined, _, _Id, _Index) -> inflight_drop(undefined, _Ref, _Id, _Index) ->
ok; false;
inflight_drop(Name, Ref, Id, Index) -> inflight_drop(Name, Ref, Id, Index) ->
Count = Count =
case ets:take(Name, Ref) of case ets:take(Name, Ref) of
@ -831,9 +971,10 @@ inflight_drop(Name, Ref, Id, Index) ->
[{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch);
_ -> 0 _ -> 0
end, end,
Count > 0 andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}), IsDropped = Count > 0,
IsDropped andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
ok. IsDropped.
%%============================================================================== %%==============================================================================
@ -879,6 +1020,15 @@ disk_queue_dir(Id, Index) ->
QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index), QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
filename:join([emqx:data_dir(), "resource_worker", node(), QDir]). filename:join([emqx:data_dir(), "resource_worker", node(), QDir]).
%% Recursively deletes the on-disk queue directory that `disk_queue_dir/2'
%% computes for the given resource id and worker index.
%%
%% A missing directory (`{error, enoent}') is reported as `ok'; any other
%% result of `file:del_dir_r/1' is returned to the caller unchanged.
clear_disk_queue_dir(Id, Index) ->
    case file:del_dir_r(disk_queue_dir(Id, Index)) of
        {error, enoent} -> ok;
        Other -> Other
    end.
ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) -> ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) ->
Ref = make_ref(), Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}), TRef = erlang:send_after(T, self(), {flush, Ref}),

View File

@ -67,7 +67,8 @@ stop_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
lists:foreach( lists:foreach(
fun(Idx) -> fun(Idx) ->
ensure_worker_removed(ResId, Idx) ensure_worker_removed(ResId, Idx),
ensure_disk_queue_dir_absent(ResId, Idx)
end, end,
lists:seq(1, WorkerPoolSize) lists:seq(1, WorkerPoolSize)
), ),
@ -127,6 +128,10 @@ ensure_worker_removed(ResId, Idx) ->
{error, Reason} {error, Reason}
end. end.
%% Clears the disk queue directory belonging to worker `Index' of resource
%% `ResourceId'.  Crashes with `badmatch' if
%% `emqx_resource_worker:clear_disk_queue_dir/2' returns anything but `ok'.
ensure_disk_queue_dir_absent(ResourceId, Index) ->
    ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index).
ensure_worker_pool_removed(ResId) -> ensure_worker_pool_removed(ResId) ->
try try
gproc_pool:delete(ResId) gproc_pool:delete(ResId)

View File

@ -88,6 +88,19 @@ on_query(_InstId, block, #{pid := Pid}) ->
on_query(_InstId, resume, #{pid := Pid}) -> on_query(_InstId, resume, #{pid := Pid}) ->
Pid ! resume, Pid ! resume,
ok; ok;
on_query(_InstId, {big_payload, Payload}, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, {big_payload, Payload}},
receive
{ReqRef, ok} ->
?tp(connector_demo_big_payload, #{payload => Payload}),
ok;
{ReqRef, incorrect_status} ->
{error, {recoverable_error, incorrect_status}}
after 1000 ->
{error, timeout}
end;
on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
ReqRef = make_ref(), ReqRef = make_ref(),
From = {self(), ReqRef}, From = {self(), ReqRef},
@ -216,6 +229,9 @@ counter_loop(
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked -> {{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status}, FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1}; State#{incorrect_status_count := IncorrectCount + 1};
{{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
{get, ReplyFun} -> {get, ReplyFun} ->
apply_reply(ReplyFun, Num), apply_reply(ReplyFun, Num),
State; State;

View File

@ -411,8 +411,21 @@ t_query_counter_async_inflight(_) ->
%% send async query to make the inflight window full %% send async query to make the inflight window full
?check_trace( ?check_trace(
?TRACE_OPTS, begin
{ok, SRef} = snabbkaffe:subscribe(
?match_event(
#{
?snk_kind := resource_worker_appended_to_inflight,
is_new := true
}
),
WindowSize,
_Timeout = 5_000
),
inc_counter_in_parallel(WindowSize, ReqOpts), inc_counter_in_parallel(WindowSize, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) -> fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace), QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
@ -455,29 +468,29 @@ t_query_counter_async_inflight(_) ->
%% +1 because the tmp_query above will be retried and succeed %% +1 because the tmp_query above will be retried and succeed
%% this time. %% this time.
WindowSize + 1, WindowSize + 1,
_Timeout = 60_000 _Timeout0 = 10_000
), ),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
tap_metrics(?LINE), tap_metrics(?LINE),
{ok, _} = snabbkaffe:receive_events(SRef0), {ok, _} = snabbkaffe:receive_events(SRef0),
tap_metrics(?LINE),
%% since the previous tmp_query was enqueued to be retried, we %% since the previous tmp_query was enqueued to be retried, we
%% take it again from the table; this time, it should have %% take it again from the table; this time, it should have
%% succeeded. %% succeeded.
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
?assertEqual(WindowSize, ets:info(Tab0, size)), ?assertEqual(WindowSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE), tap_metrics(?LINE),
%% send async query, this time everything should be ok. %% send async query, this time everything should be ok.
Num = 10, Num = 10,
?check_trace( ?check_trace(
?TRACE_OPTS,
begin begin
{ok, SRef} = snabbkaffe:subscribe( {ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}), ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
Num, Num,
_Timeout = 60_000 _Timeout0 = 10_000
), ),
inc_counter_in_parallel(Num, ReqOpts), inc_counter_in_parallel_increasing(Num, 1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef), {ok, _} = snabbkaffe:receive_events(SRef),
ok ok
end, end,
@ -502,17 +515,18 @@ t_query_counter_async_inflight(_) ->
), ),
%% this will block the resource_worker %% this will block the resource_worker
ok = emqx_resource:query(?ID, {inc_counter, 1}), ok = emqx_resource:query(?ID, {inc_counter, 4}),
Sent = WindowSize + Num + WindowSize, Sent = WindowSize + Num + WindowSize,
{ok, SRef1} = snabbkaffe:subscribe( {ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}), ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
WindowSize, WindowSize,
_Timeout = 60_000 _Timeout0 = 10_000
), ),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1), {ok, _} = snabbkaffe:receive_events(SRef1),
?assertEqual(Sent, ets:info(Tab0, size)), ?assertEqual(Sent, ets:info(Tab0, size)),
tap_metrics(?LINE),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
@ -842,6 +856,8 @@ t_stop_start(_) ->
?assert(is_process_alive(Pid0)), ?assert(is_process_alive(Pid0)),
%% metrics are reset when recreating %% metrics are reset when recreating
%% depending on timing, might show the request we just did.
ct:sleep(500),
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
ok = emqx_resource:stop(?ID), ok = emqx_resource:stop(?ID),
@ -861,6 +877,7 @@ t_stop_start(_) ->
?assert(is_process_alive(Pid1)), ?assert(is_process_alive(Pid1)),
%% now stop while resetting the metrics %% now stop while resetting the metrics
ct:sleep(500),
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1), emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4), emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
@ -1059,7 +1076,7 @@ t_retry_batch(_Config) ->
%% batch shall remain enqueued. %% batch shall remain enqueued.
{ok, _} = {ok, _} =
snabbkaffe:block_until( snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := resource_worker_retry_queue_batch_failed}), ?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}),
5_000 5_000
), ),
%% should not have increased the matched count with the retries %% should not have increased the matched count with the retries
@ -1071,7 +1088,7 @@ t_retry_batch(_Config) ->
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume), ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_retry_queue_batch_succeeded}, #{?snk_kind := resource_worker_retry_inflight_succeeded},
5_000 5_000
), ),
%% 1 more because of the `resume' call %% 1 more because of the `resume' call
@ -1108,6 +1125,92 @@ t_retry_batch(_Config) ->
), ),
ok. ok.
%% Checks that removing a resource and re-creating it under the same id
%% starts from empty `queuing' and `inflight' gauges.
%%
%% Steps: create the resource, block it, issue `NumRequests' `big_payload'
%% queries (each expected to return `{error, _}' while blocked), wait for the
%% matching `resource_worker_appended_to_queue' events, assert the gauges,
%% remove the resource, re-create it with the same id and assert that both
%% gauges read 0 again.
t_delete_and_re_create_with_same_name(_Config) ->
    {ok, _} = emqx_resource:create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 1,
            worker_pool_size => 2,
            queue_seg_bytes => 100,
            resume_interval => 1_000
        }
    ),
    %% pre-condition: we should have just created a new queue
    Queuing0 = emqx_resource_metrics:queuing_get(?ID),
    Inflight0 = emqx_resource_metrics:inflight_get(?ID),
    ?assertEqual(0, Queuing0),
    ?assertEqual(0, Inflight0),
    ?check_trace(
        begin
            ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
            NumRequests = 10,
            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := resource_worker_appended_to_queue}),
                NumRequests,
                _Timeout = 5_000
            ),
            %% ensure replayq offloads to disk
            Payload = binary:copy(<<"a">>, 119),
            lists:foreach(
                fun(N) ->
                    {error, _} =
                        emqx_resource:query(
                            ?ID,
                            {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
                        )
                end,
                lists:seq(1, NumRequests)
            ),

            {ok, _} = snabbkaffe:receive_events(SRef),

            %% ensure that stuff got enqueued into disk
            tap_metrics(?LINE),
            Queuing1 = emqx_resource_metrics:queuing_get(?ID),
            Inflight1 = emqx_resource_metrics:inflight_get(?ID),
            ?assertEqual(NumRequests - 1, Queuing1),
            ?assertEqual(1, Inflight1),

            %% now, we delete the resource
            ok = emqx_resource:remove_local(?ID),
            ?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),

            %% re-create the resource with the *same name*
            {{ok, _}, {ok, _Events}} =
                ?wait_async_action(
                    emqx_resource:create(
                        ?ID,
                        ?DEFAULT_RESOURCE_GROUP,
                        ?TEST_RESOURCE,
                        #{name => test_resource},
                        #{
                            query_mode => async,
                            batch_size => 1,
                            worker_pool_size => 2,
                            queue_seg_bytes => 100,
                            resume_interval => 1_000
                        }
                    ),
                    #{?snk_kind := resource_worker_enter_blocked},
                    5_000
                ),

            %% it shouldn't have anything enqueued, as it's a fresh resource
            Queuing2 = emqx_resource_metrics:queuing_get(?ID),
            %% read the inflight gauge here (not the queuing gauge a second
            %% time), so the assertion below really covers inflight.
            Inflight2 = emqx_resource_metrics:inflight_get(?ID),
            ?assertEqual(0, Queuing2),
            ?assertEqual(0, Inflight2),

            ok
        end,
        []
    ),
    ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helpers %% Helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -1137,6 +1240,29 @@ inc_counter_in_parallel(N, Opts0) ->
|| Pid <- Pids || Pid <- Pids
]. ].
%% Spawns one process per value M in `StartN .. StartN + N - 1'; each one
%% issues `{inc_counter, M}' against `?ID' and then notifies the caller.
%% `Opts0' is either the query options or a zero-arity fun producing them
%% (the fun is evaluated inside each spawned process).
%%
%% Waits up to 1000 ms for each process' `{complete, Pid}' message, in spawn
%% order, and fails the test case on timeout.  Returns a list of `ok's.
inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
    Parent = self(),
    RunQuery = fun(M) ->
        Opts =
            if
                is_function(Opts0) -> Opts0();
                true -> Opts0
            end,
        emqx_resource:query(?ID, {inc_counter, M}, Opts),
        Parent ! {complete, self()}
    end,
    Increments = lists:seq(StartN, StartN + N - 1),
    Pids = [erlang:spawn(fun() -> RunQuery(M) end) || M <- Increments],
    lists:map(
        fun(Pid) ->
            receive
                {complete, Pid} -> ok
            after 1000 ->
                ct:fail({wait_for_query_timeout, Pid})
            end
        end,
        Pids
    ).
bin_config() -> bin_config() ->
<<"\"name\": \"test_resource\"">>. <<"\"name\": \"test_resource\"">>.

View File

@ -92,7 +92,6 @@ values(common, Protocol, SupportUint, TypeOpts) ->
"bool=${payload.bool}">>, "bool=${payload.bool}">>,
precision => ms, precision => ms,
resource_opts => #{ resource_opts => #{
enable_batch => false,
batch_size => 100, batch_size => 100,
batch_time => <<"20ms">> batch_time => <<"20ms">>
}, },

View File

@ -508,14 +508,16 @@ install_telemetry_handler(TestCase) ->
wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
Events = receive_all_events(GaugeName, Timeout), Events = receive_all_events(GaugeName, Timeout),
case lists:last(Events) of case length(Events) > 0 andalso lists:last(Events) of
#{measurements := #{gauge_set := ExpectedValue}} -> #{measurements := #{gauge_set := ExpectedValue}} ->
ok; ok;
#{measurements := #{gauge_set := Value}} -> #{measurements := #{gauge_set := Value}} ->
ct:fail( ct:fail(
"gauge ~p didn't reach expected value ~p; last value: ~p", "gauge ~p didn't reach expected value ~p; last value: ~p",
[GaugeName, ExpectedValue, Value] [GaugeName, ExpectedValue, Value]
) );
false ->
ct:pal("no ~p gauge events received!", [GaugeName])
end. end.
receive_all_events(EventName, Timeout) -> receive_all_events(EventName, Timeout) ->
@ -605,6 +607,8 @@ t_publish_success(Config) ->
ResourceId, ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000} #{n_events => ExpectedInflightEvents, timeout => 5_000}
), ),
wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 0, 500),
assert_metrics( assert_metrics(
#{ #{
dropped => 0, dropped => 0,
@ -653,6 +657,8 @@ t_publish_success_local_topic(Config) ->
ResourceId, ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000} #{n_events => ExpectedInflightEvents, timeout => 5_000}
), ),
wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 0, 500),
assert_metrics( assert_metrics(
#{ #{
dropped => 0, dropped => 0,
@ -739,6 +745,8 @@ t_publish_templated(Config) ->
ResourceId, ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000} #{n_events => ExpectedInflightEvents, timeout => 5_000}
), ),
wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 0, 500),
assert_metrics( assert_metrics(
#{ #{
dropped => 0, dropped => 0,
@ -1111,19 +1119,17 @@ do_econnrefused_or_timeout_test(Config, Error) ->
ResourceId ResourceId
); );
{_, sync} -> {_, sync} ->
wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
timeout => 10_000, n_events => 2
}),
%% even waiting, hard to avoid flakiness... simpler to just sleep %% even waiting, hard to avoid flakiness... simpler to just sleep
%% a bit until stabilization. %% a bit until stabilization.
ct:sleep(200), wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 1, 500),
assert_metrics( assert_metrics(
#{ #{
dropped => 0, dropped => 0,
failed => 0, failed => 0,
inflight => 0, inflight => 1,
matched => 1, matched => 1,
queuing => 1, queuing => 0,
retried => 0, retried => 0,
success => 0 success => 0
}, },

View File

@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
{:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true},
{:replayq, github: "emqx/replayq", tag: "0.3.5", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.6", override: true},
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
{:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.2", override: true}, {:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.2", override: true},
{:rulesql, github: "emqx/rulesql", tag: "0.1.4"}, {:rulesql, github: "emqx/rulesql", tag: "0.1.4"},

View File

@ -60,7 +60,7 @@
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.5"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.6"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.2"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.2"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}