refactor: rename `emqx_resource_worker` -> `emqx_resource_buffer_worker`
To make it more clear that it's purpose is serve as a buffering layer.
This commit is contained in:
parent
caf50d1b35
commit
47f796dd12
|
@ -65,7 +65,7 @@ load() ->
|
|||
fun({Type, NamedConf}) ->
|
||||
lists:foreach(
|
||||
fun({Name, Conf}) ->
|
||||
%% fetch opts for `emqx_resource_worker`
|
||||
%% fetch opts for `emqx_resource_buffer_worker`
|
||||
ResOpts = emqx_resource:fetch_creation_opts(Conf),
|
||||
safe_load_bridge(Type, Name, Conf, ResOpts)
|
||||
end,
|
||||
|
|
|
@ -886,9 +886,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
|||
{ok, SRef} =
|
||||
snabbkaffe:subscribe(
|
||||
fun
|
||||
(#{?snk_kind := resource_worker_retry_inflight_failed}) ->
|
||||
(#{?snk_kind := buffer_worker_retry_inflight_failed}) ->
|
||||
true;
|
||||
(#{?snk_kind := resource_worker_flush_nack}) ->
|
||||
(#{?snk_kind := buffer_worker_flush_nack}) ->
|
||||
true;
|
||||
(_) ->
|
||||
false
|
||||
|
|
|
@ -255,7 +255,7 @@ reset_metrics(ResId) ->
|
|||
query(ResId, Request) ->
|
||||
query(ResId, Request, #{}).
|
||||
|
||||
-spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) ->
|
||||
-spec query(resource_id(), Request :: term(), emqx_resource_buffer_worker:query_opts()) ->
|
||||
Result :: term().
|
||||
query(ResId, Request, Opts) ->
|
||||
case emqx_resource_manager:ets_lookup(ResId) of
|
||||
|
@ -263,11 +263,11 @@ query(ResId, Request, Opts) ->
|
|||
IsBufferSupported = is_buffer_supported(Module),
|
||||
case {IsBufferSupported, QM} of
|
||||
{true, _} ->
|
||||
emqx_resource_worker:simple_sync_query(ResId, Request);
|
||||
emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
|
||||
{false, sync} ->
|
||||
emqx_resource_worker:sync_query(ResId, Request, Opts);
|
||||
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
|
||||
{false, async} ->
|
||||
emqx_resource_worker:async_query(ResId, Request, Opts)
|
||||
emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
|
||||
end;
|
||||
{error, not_found} ->
|
||||
?RESOURCE_ERROR(not_found, "resource not found")
|
||||
|
@ -275,7 +275,7 @@ query(ResId, Request, Opts) ->
|
|||
|
||||
-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
|
||||
simple_sync_query(ResId, Request) ->
|
||||
emqx_resource_worker:simple_sync_query(ResId, Request).
|
||||
emqx_resource_buffer_worker:simple_sync_query(ResId, Request).
|
||||
|
||||
-spec start(resource_id()) -> ok | {error, Reason :: term()}.
|
||||
start(ResId) ->
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
%% This module implements async message sending, disk message queuing,
|
||||
%% and message batching using ReplayQ.
|
||||
|
||||
-module(emqx_resource_worker).
|
||||
-module(emqx_resource_buffer_worker).
|
||||
|
||||
-include("emqx_resource.hrl").
|
||||
-include("emqx_resource_utils.hrl").
|
||||
|
@ -176,11 +176,11 @@ init({Id, Index, Opts}) ->
|
|||
resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
|
||||
tref => undefined
|
||||
},
|
||||
?tp(resource_worker_init, #{id => Id, index => Index}),
|
||||
?tp(buffer_worker_init, #{id => Id, index => Index}),
|
||||
{ok, running, Data}.
|
||||
|
||||
running(enter, _, St) ->
|
||||
?tp(resource_worker_enter_running, #{}),
|
||||
?tp(buffer_worker_enter_running, #{}),
|
||||
maybe_flush(St);
|
||||
running(cast, resume, _St) ->
|
||||
keep_state_and_data;
|
||||
|
@ -206,7 +206,7 @@ running(info, Info, _St) ->
|
|||
keep_state_and_data.
|
||||
|
||||
blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
|
||||
?tp(resource_worker_enter_blocked, #{}),
|
||||
?tp(buffer_worker_enter_blocked, #{}),
|
||||
{keep_state_and_data, {state_timeout, ResumeT, unblock}};
|
||||
blocked(cast, block, _St) ->
|
||||
keep_state_and_data;
|
||||
|
@ -315,7 +315,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
|||
index := Index,
|
||||
resume_interval := ResumeT
|
||||
} = Data0,
|
||||
?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
|
||||
?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
|
||||
QueryOpts = #{simple_query => false},
|
||||
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
|
||||
ReplyResult =
|
||||
|
@ -331,7 +331,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
|||
{nack, PostFn} ->
|
||||
PostFn(),
|
||||
?tp(
|
||||
resource_worker_retry_inflight_failed,
|
||||
buffer_worker_retry_inflight_failed,
|
||||
#{
|
||||
ref => Ref,
|
||||
query_or_batch => QueryOrBatch
|
||||
|
@ -349,7 +349,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
|||
%% we bump the counter when removing it from the table.
|
||||
IsAcked andalso PostFn(),
|
||||
?tp(
|
||||
resource_worker_retry_inflight_succeeded,
|
||||
buffer_worker_retry_inflight_succeeded,
|
||||
#{
|
||||
ref => Ref,
|
||||
query_or_batch => QueryOrBatch
|
||||
|
@ -415,7 +415,7 @@ flush(Data0) ->
|
|||
{0, _} ->
|
||||
{keep_state, Data1};
|
||||
{_, true} ->
|
||||
?tp(resource_worker_flush_but_inflight_full, #{}),
|
||||
?tp(buffer_worker_flush_but_inflight_full, #{}),
|
||||
Data2 = ensure_flush_timer(Data1),
|
||||
{keep_state, Data2};
|
||||
{_, false} ->
|
||||
|
@ -483,7 +483,7 @@ do_flush(
|
|||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
||||
?tp(
|
||||
resource_worker_flush_nack,
|
||||
buffer_worker_flush_nack,
|
||||
#{
|
||||
ref => Ref,
|
||||
is_retriable => IsRetriable,
|
||||
|
@ -512,7 +512,7 @@ do_flush(
|
|||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
||||
?tp(
|
||||
resource_worker_flush_ack,
|
||||
buffer_worker_flush_ack,
|
||||
#{
|
||||
batch_or_query => Request,
|
||||
result => Result
|
||||
|
@ -560,7 +560,7 @@ do_flush(Data0, #{
|
|||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
||||
?tp(
|
||||
resource_worker_flush_nack,
|
||||
buffer_worker_flush_nack,
|
||||
#{
|
||||
ref => Ref,
|
||||
is_retriable => IsRetriable,
|
||||
|
@ -589,7 +589,7 @@ do_flush(Data0, #{
|
|||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
||||
?tp(
|
||||
resource_worker_flush_ack,
|
||||
buffer_worker_flush_ack,
|
||||
#{
|
||||
batch_or_query => Batch,
|
||||
result => Result
|
||||
|
@ -873,7 +873,7 @@ reply_after_query(
|
|||
case Action of
|
||||
nack ->
|
||||
%% Keep retrying.
|
||||
?tp(resource_worker_reply_after_query, #{
|
||||
?tp(buffer_worker_reply_after_query, #{
|
||||
action => Action,
|
||||
batch_or_query => ?QUERY(From, Request, HasBeenSent),
|
||||
ref => Ref,
|
||||
|
@ -882,7 +882,7 @@ reply_after_query(
|
|||
mark_inflight_as_retriable(InflightTID, Ref),
|
||||
?MODULE:block(Pid);
|
||||
ack ->
|
||||
?tp(resource_worker_reply_after_query, #{
|
||||
?tp(buffer_worker_reply_after_query, #{
|
||||
action => Action,
|
||||
batch_or_query => ?QUERY(From, Request, HasBeenSent),
|
||||
ref => Ref,
|
||||
|
@ -903,7 +903,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu
|
|||
case Action of
|
||||
nack ->
|
||||
%% Keep retrying.
|
||||
?tp(resource_worker_reply_after_query, #{
|
||||
?tp(buffer_worker_reply_after_query, #{
|
||||
action => nack,
|
||||
batch_or_query => Batch,
|
||||
ref => Ref,
|
||||
|
@ -912,7 +912,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu
|
|||
mark_inflight_as_retriable(InflightTID, Ref),
|
||||
?MODULE:block(Pid);
|
||||
ack ->
|
||||
?tp(resource_worker_reply_after_query, #{
|
||||
?tp(buffer_worker_reply_after_query, #{
|
||||
action => ack,
|
||||
batch_or_query => Batch,
|
||||
ref => Ref,
|
||||
|
@ -955,7 +955,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
|
|||
end,
|
||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
|
||||
?tp(
|
||||
resource_worker_appended_to_queue,
|
||||
buffer_worker_appended_to_queue,
|
||||
#{
|
||||
id => Id,
|
||||
items => Queries,
|
||||
|
@ -973,7 +973,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
|
|||
|
||||
inflight_new(InfltWinSZ, Id, Index) ->
|
||||
TableId = ets:new(
|
||||
emqx_resource_worker_inflight_tab,
|
||||
emqx_resource_buffer_worker_inflight_tab,
|
||||
[ordered_set, public, {write_concurrency, true}]
|
||||
),
|
||||
inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index),
|
||||
|
@ -1040,7 +1040,7 @@ inflight_append(
|
|||
BatchSize = length(Batch),
|
||||
IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
|
||||
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
|
||||
?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
|
||||
?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
|
||||
ok;
|
||||
inflight_append(
|
||||
InflightTID,
|
||||
|
@ -1053,7 +1053,7 @@ inflight_append(
|
|||
IsNew = ets:insert_new(InflightTID, InflightItem),
|
||||
IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
|
||||
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
|
||||
?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
|
||||
?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
|
||||
ok;
|
||||
inflight_append(InflightTID, {Ref, Data}, _Id, _Index) ->
|
||||
ets:insert(InflightTID, {Ref, Data}),
|
||||
|
@ -1130,7 +1130,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
|
|||
end
|
||||
),
|
||||
_NumAffected = ets:select_replace(InflightTID, MatchSpec),
|
||||
?tp(resource_worker_worker_down_update, #{num_affected => _NumAffected}),
|
||||
?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}),
|
||||
ok.
|
||||
|
||||
%%==============================================================================
|
|
@ -13,7 +13,7 @@
|
|||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_resource_worker_sup).
|
||||
-module(emqx_resource_buffer_worker_sup).
|
||||
-behaviour(supervisor).
|
||||
|
||||
%%%=============================================================================
|
||||
|
@ -99,7 +99,7 @@ ensure_worker_added(ResId, Idx) ->
|
|||
|
||||
-define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}).
|
||||
ensure_worker_started(ResId, Idx, Opts) ->
|
||||
Mod = emqx_resource_worker,
|
||||
Mod = emqx_resource_buffer_worker,
|
||||
Spec = #{
|
||||
id => ?CHILD_ID(Mod, ResId, Idx),
|
||||
start => {Mod, start_link, [ResId, Idx, Opts]},
|
||||
|
@ -116,7 +116,7 @@ ensure_worker_started(ResId, Idx, Opts) ->
|
|||
end.
|
||||
|
||||
ensure_worker_removed(ResId, Idx) ->
|
||||
ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx),
|
||||
ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx),
|
||||
case supervisor:terminate_child(?SERVER, ChildId) of
|
||||
ok ->
|
||||
Res = supervisor:delete_child(?SERVER, ChildId),
|
||||
|
@ -129,7 +129,7 @@ ensure_worker_removed(ResId, Idx) ->
|
|||
end.
|
||||
|
||||
ensure_disk_queue_dir_absent(ResourceId, Index) ->
|
||||
ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index),
|
||||
ok = emqx_resource_buffer_worker:clear_disk_queue_dir(ResourceId, Index),
|
||||
ok.
|
||||
|
||||
ensure_worker_pool_removed(ResId) ->
|
|
@ -150,7 +150,7 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
|
|||
%% buffer, so there is no need for resource workers
|
||||
ok;
|
||||
false ->
|
||||
ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
|
||||
ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
|
||||
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
|
||||
true ->
|
||||
wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
|
||||
|
@ -473,7 +473,7 @@ retry_actions(Data) ->
|
|||
|
||||
handle_remove_event(From, ClearMetrics, Data) ->
|
||||
stop_resource(Data),
|
||||
ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts),
|
||||
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
|
||||
case ClearMetrics of
|
||||
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
|
||||
false -> ok
|
||||
|
@ -587,9 +587,9 @@ maybe_alarm(_Status, ResId) ->
|
|||
maybe_resume_resource_workers(connected) ->
|
||||
lists:foreach(
|
||||
fun({_, Pid, _, _}) ->
|
||||
emqx_resource_worker:resume(Pid)
|
||||
emqx_resource_buffer_worker:resume(Pid)
|
||||
end,
|
||||
supervisor:which_children(emqx_resource_worker_sup)
|
||||
supervisor:which_children(emqx_resource_buffer_worker_sup)
|
||||
);
|
||||
maybe_resume_resource_workers(_) ->
|
||||
ok.
|
||||
|
|
|
@ -39,8 +39,8 @@ init([]) ->
|
|||
modules => [emqx_resource_manager_sup]
|
||||
},
|
||||
WorkerSup = #{
|
||||
id => emqx_resource_worker_sup,
|
||||
start => {emqx_resource_worker_sup, start_link, []},
|
||||
id => emqx_resource_buffer_worker_sup,
|
||||
start => {emqx_resource_buffer_worker_sup, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => infinity,
|
||||
type => supervisor
|
||||
|
|
|
@ -414,7 +414,7 @@ t_query_counter_async_inflight(_) ->
|
|||
{_, {ok, _}} =
|
||||
?wait_async_action(
|
||||
inc_counter_in_parallel(WindowSize, ReqOpts),
|
||||
#{?snk_kind := resource_worker_flush_but_inflight_full},
|
||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||
1_000
|
||||
),
|
||||
fun(Trace) ->
|
||||
|
@ -439,7 +439,7 @@ t_query_counter_async_inflight(_) ->
|
|||
emqx_resource:query(?ID, {inc_counter, 99}, #{
|
||||
async_reply_fun => {Insert, [Tab0, tmp_query]}
|
||||
}),
|
||||
#{?snk_kind := resource_worker_appended_to_queue},
|
||||
#{?snk_kind := buffer_worker_appended_to_queue},
|
||||
1_000
|
||||
),
|
||||
tap_metrics(?LINE),
|
||||
|
@ -490,7 +490,7 @@ t_query_counter_async_inflight(_) ->
|
|||
{_, {ok, _}} =
|
||||
?wait_async_action(
|
||||
inc_counter_in_parallel(WindowSize, ReqOpts),
|
||||
#{?snk_kind := resource_worker_flush_but_inflight_full},
|
||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||
1_000
|
||||
),
|
||||
fun(Trace) ->
|
||||
|
@ -596,7 +596,7 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
{_, {ok, _}} =
|
||||
?wait_async_action(
|
||||
inc_counter_in_parallel(NumMsgs, ReqOpts),
|
||||
#{?snk_kind := resource_worker_flush_but_inflight_full},
|
||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||
5_000
|
||||
),
|
||||
fun(Trace) ->
|
||||
|
@ -623,7 +623,7 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_resource:query(?ID, {inc_counter, 2}),
|
||||
#{?snk_kind := resource_worker_flush_but_inflight_full},
|
||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||
5_000
|
||||
),
|
||||
?assertMatch(0, ets:info(Tab0, size)),
|
||||
|
@ -646,7 +646,7 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
emqx_resource:query(?ID, {inc_counter, 3}, #{
|
||||
async_reply_fun => {Insert, [Tab0, tmp_query]}
|
||||
}),
|
||||
#{?snk_kind := resource_worker_appended_to_queue},
|
||||
#{?snk_kind := buffer_worker_appended_to_queue},
|
||||
1_000
|
||||
),
|
||||
tap_metrics(?LINE),
|
||||
|
@ -706,7 +706,7 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
{_, {ok, _}} =
|
||||
?wait_async_action(
|
||||
inc_counter_in_parallel(NumMsgs, ReqOpts),
|
||||
#{?snk_kind := resource_worker_flush_but_inflight_full},
|
||||
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||
5_000
|
||||
),
|
||||
fun(Trace) ->
|
||||
|
@ -1055,7 +1055,7 @@ t_retry_batch(_Config) ->
|
|||
end,
|
||||
Payloads
|
||||
),
|
||||
#{?snk_kind := resource_worker_enter_blocked},
|
||||
#{?snk_kind := buffer_worker_enter_blocked},
|
||||
5_000
|
||||
),
|
||||
%% now the individual messages should have been counted
|
||||
|
@ -1066,7 +1066,7 @@ t_retry_batch(_Config) ->
|
|||
%% batch shall remain enqueued.
|
||||
{ok, _} =
|
||||
snabbkaffe:block_until(
|
||||
?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}),
|
||||
?match_n_events(2, #{?snk_kind := buffer_worker_retry_inflight_failed}),
|
||||
5_000
|
||||
),
|
||||
%% should not have increased the matched count with the retries
|
||||
|
@ -1078,7 +1078,7 @@ t_retry_batch(_Config) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
ok = emqx_resource:simple_sync_query(?ID, resume),
|
||||
#{?snk_kind := resource_worker_retry_inflight_succeeded},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
|
||||
5_000
|
||||
),
|
||||
%% 1 more because of the `resume' call
|
||||
|
@ -1140,7 +1140,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
|
|||
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
|
||||
NumRequests = 10,
|
||||
{ok, SRef} = snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := resource_worker_enter_blocked}),
|
||||
?match_event(#{?snk_kind := buffer_worker_enter_blocked}),
|
||||
NumBufferWorkers,
|
||||
_Timeout = 5_000
|
||||
),
|
||||
|
@ -1189,7 +1189,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
|
|||
resume_interval => 1_000
|
||||
}
|
||||
),
|
||||
#{?snk_kind := resource_worker_enter_running},
|
||||
#{?snk_kind := buffer_worker_enter_running},
|
||||
5_000
|
||||
),
|
||||
|
||||
|
@ -1271,13 +1271,13 @@ t_retry_sync_inflight(_Config) ->
|
|||
Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
|
||||
TestPid ! {res, Res}
|
||||
end),
|
||||
#{?snk_kind := resource_worker_retry_inflight_failed},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_failed},
|
||||
ResumeInterval * 2
|
||||
),
|
||||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
ok = emqx_resource:simple_sync_query(?ID, resume),
|
||||
#{?snk_kind := resource_worker_retry_inflight_succeeded},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
|
||||
ResumeInterval * 3
|
||||
),
|
||||
receive
|
||||
|
@ -1322,13 +1322,13 @@ t_retry_sync_inflight_batch(_Config) ->
|
|||
Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
|
||||
TestPid ! {res, Res}
|
||||
end),
|
||||
#{?snk_kind := resource_worker_retry_inflight_failed},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_failed},
|
||||
ResumeInterval * 2
|
||||
),
|
||||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
ok = emqx_resource:simple_sync_query(?ID, resume),
|
||||
#{?snk_kind := resource_worker_retry_inflight_succeeded},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
|
||||
ResumeInterval * 3
|
||||
),
|
||||
receive
|
||||
|
@ -1368,7 +1368,7 @@ t_retry_async_inflight(_Config) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
|
||||
#{?snk_kind := resource_worker_retry_inflight_failed},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_failed},
|
||||
ResumeInterval * 2
|
||||
),
|
||||
|
||||
|
@ -1376,7 +1376,7 @@ t_retry_async_inflight(_Config) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_resource:simple_sync_query(?ID, resume),
|
||||
#{?snk_kind := resource_worker_enter_running},
|
||||
#{?snk_kind := buffer_worker_enter_running},
|
||||
ResumeInterval * 2
|
||||
),
|
||||
ok
|
||||
|
@ -1411,7 +1411,7 @@ t_retry_async_inflight_batch(_Config) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
|
||||
#{?snk_kind := resource_worker_retry_inflight_failed},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_failed},
|
||||
ResumeInterval * 2
|
||||
),
|
||||
|
||||
|
@ -1419,7 +1419,7 @@ t_retry_async_inflight_batch(_Config) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_resource:simple_sync_query(?ID, resume),
|
||||
#{?snk_kind := resource_worker_enter_running},
|
||||
#{?snk_kind := buffer_worker_enter_running},
|
||||
ResumeInterval * 2
|
||||
),
|
||||
ok
|
||||
|
@ -1459,7 +1459,7 @@ t_async_pool_worker_death(_Config) ->
|
|||
NumReqs = 10,
|
||||
{ok, SRef0} =
|
||||
snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := resource_worker_appended_to_inflight}),
|
||||
?match_event(#{?snk_kind := buffer_worker_appended_to_inflight}),
|
||||
NumReqs,
|
||||
1_000
|
||||
),
|
||||
|
@ -1472,7 +1472,7 @@ t_async_pool_worker_death(_Config) ->
|
|||
%% grab one of the worker pids and kill it
|
||||
{ok, SRef1} =
|
||||
snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := resource_worker_worker_down_update}),
|
||||
?match_event(#{?snk_kind := buffer_worker_worker_down_update}),
|
||||
NumBufferWorkers,
|
||||
10_000
|
||||
),
|
||||
|
@ -1568,8 +1568,8 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) ->
|
|||
ct:pal(" ~p", [Trace]),
|
||||
?assert(
|
||||
?strict_causality(
|
||||
#{?snk_kind := resource_worker_flush_nack, ref := _Ref},
|
||||
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
|
||||
#{?snk_kind := buffer_worker_flush_nack, ref := _Ref},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
|
||||
Trace
|
||||
)
|
||||
),
|
||||
|
@ -1577,8 +1577,8 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) ->
|
|||
%% before restoring the resource health.
|
||||
?assert(
|
||||
?causality(
|
||||
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
|
||||
#{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref},
|
||||
Trace
|
||||
)
|
||||
),
|
||||
|
@ -1588,8 +1588,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
|
|||
ct:pal(" ~p", [Trace]),
|
||||
?assert(
|
||||
?strict_causality(
|
||||
#{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref},
|
||||
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
|
||||
#{?snk_kind := buffer_worker_reply_after_query, action := nack, ref := _Ref},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
|
||||
Trace
|
||||
)
|
||||
),
|
||||
|
@ -1597,8 +1597,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
|
|||
%% before restoring the resource health.
|
||||
?assert(
|
||||
?causality(
|
||||
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
|
||||
#{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
|
||||
#{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref},
|
||||
Trace
|
||||
)
|
||||
),
|
||||
|
|
|
@ -914,7 +914,7 @@ t_write_failure(Config) ->
|
|||
fun(Trace0) ->
|
||||
case QueryMode of
|
||||
sync ->
|
||||
Trace = ?of_kind(resource_worker_flush_nack, Trace0),
|
||||
Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
|
||||
?assertMatch([_ | _], Trace),
|
||||
[#{result := Result} | _] = Trace,
|
||||
?assert(
|
||||
|
|
|
@ -413,7 +413,7 @@ t_write_failure(Config) ->
|
|||
end),
|
||||
fun(Trace0) ->
|
||||
ct:pal("trace: ~p", [Trace0]),
|
||||
Trace = ?of_kind(resource_worker_flush_nack, Trace0),
|
||||
Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
|
||||
?assertMatch([#{result := {error, _}} | _], Trace),
|
||||
[#{result := {error, Error}} | _] = Trace,
|
||||
case Error of
|
||||
|
|
|
@ -431,7 +431,7 @@ t_write_failure(Config) ->
|
|||
end),
|
||||
fun(Trace0) ->
|
||||
ct:pal("trace: ~p", [Trace0]),
|
||||
Trace = ?of_kind(resource_worker_flush_nack, Trace0),
|
||||
Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
|
||||
?assertMatch([#{result := {error, _}} | _], Trace),
|
||||
[#{result := {error, Error}} | _] = Trace,
|
||||
case Error of
|
||||
|
|
Loading…
Reference in New Issue