Merge pull request #9807 from thalesmg/buffer-worker-rename-v50

buffer work refactoring follow up (part 1)
This commit is contained in:
Thales Macedo Garitezi 2023-01-19 14:17:10 -03:00 committed by GitHub
commit a6ad97e6a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 76 additions and 74 deletions

View File

@ -65,7 +65,7 @@ load() ->
fun({Type, NamedConf}) -> fun({Type, NamedConf}) ->
lists:foreach( lists:foreach(
fun({Name, Conf}) -> fun({Name, Conf}) ->
%% fetch opts for `emqx_resource_worker` %% fetch opts for `emqx_resource_buffer_worker`
ResOpts = emqx_resource:fetch_creation_opts(Conf), ResOpts = emqx_resource:fetch_creation_opts(Conf),
safe_load_bridge(Type, Name, Conf, ResOpts) safe_load_bridge(Type, Name, Conf, ResOpts)
end, end,

View File

@ -886,9 +886,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
{ok, SRef} = {ok, SRef} =
snabbkaffe:subscribe( snabbkaffe:subscribe(
fun fun
(#{?snk_kind := resource_worker_retry_inflight_failed}) -> (#{?snk_kind := buffer_worker_retry_inflight_failed}) ->
true; true;
(#{?snk_kind := resource_worker_flush_nack}) -> (#{?snk_kind := buffer_worker_flush_nack}) ->
true; true;
(_) -> (_) ->
false false

View File

@ -255,7 +255,7 @@ reset_metrics(ResId) ->
query(ResId, Request) -> query(ResId, Request) ->
query(ResId, Request, #{}). query(ResId, Request, #{}).
-spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) -> -spec query(resource_id(), Request :: term(), emqx_resource_buffer_worker:query_opts()) ->
Result :: term(). Result :: term().
query(ResId, Request, Opts) -> query(ResId, Request, Opts) ->
case emqx_resource_manager:ets_lookup(ResId) of case emqx_resource_manager:ets_lookup(ResId) of
@ -263,11 +263,11 @@ query(ResId, Request, Opts) ->
IsBufferSupported = is_buffer_supported(Module), IsBufferSupported = is_buffer_supported(Module),
case {IsBufferSupported, QM} of case {IsBufferSupported, QM} of
{true, _} -> {true, _} ->
emqx_resource_worker:simple_sync_query(ResId, Request); emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
{false, sync} -> {false, sync} ->
emqx_resource_worker:sync_query(ResId, Request, Opts); emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{false, async} -> {false, async} ->
emqx_resource_worker:async_query(ResId, Request, Opts) emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
end; end;
{error, not_found} -> {error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found") ?RESOURCE_ERROR(not_found, "resource not found")
@ -275,7 +275,7 @@ query(ResId, Request, Opts) ->
-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
simple_sync_query(ResId, Request) -> simple_sync_query(ResId, Request) ->
emqx_resource_worker:simple_sync_query(ResId, Request). emqx_resource_buffer_worker:simple_sync_query(ResId, Request).
-spec start(resource_id()) -> ok | {error, Reason :: term()}. -spec start(resource_id()) -> ok | {error, Reason :: term()}.
start(ResId) -> start(ResId) ->

View File

@ -17,7 +17,7 @@
%% This module implements async message sending, disk message queuing, %% This module implements async message sending, disk message queuing,
%% and message batching using ReplayQ. %% and message batching using ReplayQ.
-module(emqx_resource_worker). -module(emqx_resource_buffer_worker).
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include("emqx_resource_utils.hrl"). -include("emqx_resource_utils.hrl").
@ -176,11 +176,11 @@ init({Id, Index, Opts}) ->
resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
tref => undefined tref => undefined
}, },
?tp(resource_worker_init, #{id => Id, index => Index}), ?tp(buffer_worker_init, #{id => Id, index => Index}),
{ok, running, Data}. {ok, running, Data}.
running(enter, _, St) -> running(enter, _, St) ->
?tp(resource_worker_enter_running, #{}), ?tp(buffer_worker_enter_running, #{}),
maybe_flush(St); maybe_flush(St);
running(cast, resume, _St) -> running(cast, resume, _St) ->
keep_state_and_data; keep_state_and_data;
@ -206,7 +206,7 @@ running(info, Info, _St) ->
keep_state_and_data. keep_state_and_data.
blocked(enter, _, #{resume_interval := ResumeT} = _St) -> blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
?tp(resource_worker_enter_blocked, #{}), ?tp(buffer_worker_enter_blocked, #{}),
{keep_state_and_data, {state_timeout, ResumeT, unblock}}; {keep_state_and_data, {state_timeout, ResumeT, unblock}};
blocked(cast, block, _St) -> blocked(cast, block, _St) ->
keep_state_and_data; keep_state_and_data;
@ -315,7 +315,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
index := Index, index := Index,
resume_interval := ResumeT resume_interval := ResumeT
} = Data0, } = Data0,
?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
QueryOpts = #{simple_query => false}, QueryOpts = #{simple_query => false},
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
ReplyResult = ReplyResult =
@ -331,7 +331,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
{nack, PostFn} -> {nack, PostFn} ->
PostFn(), PostFn(),
?tp( ?tp(
resource_worker_retry_inflight_failed, buffer_worker_retry_inflight_failed,
#{ #{
ref => Ref, ref => Ref,
query_or_batch => QueryOrBatch query_or_batch => QueryOrBatch
@ -349,7 +349,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
%% we bump the counter when removing it from the table. %% we bump the counter when removing it from the table.
IsAcked andalso PostFn(), IsAcked andalso PostFn(),
?tp( ?tp(
resource_worker_retry_inflight_succeeded, buffer_worker_retry_inflight_succeeded,
#{ #{
ref => Ref, ref => Ref,
query_or_batch => QueryOrBatch query_or_batch => QueryOrBatch
@ -415,7 +415,7 @@ flush(Data0) ->
{0, _} -> {0, _} ->
{keep_state, Data1}; {keep_state, Data1};
{_, true} -> {_, true} ->
?tp(resource_worker_flush_but_inflight_full, #{}), ?tp(buffer_worker_flush_but_inflight_full, #{}),
Data2 = ensure_flush_timer(Data1), Data2 = ensure_flush_timer(Data1),
{keep_state, Data2}; {keep_state, Data2};
{_, false} -> {_, false} ->
@ -483,7 +483,7 @@ do_flush(
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp( ?tp(
resource_worker_flush_nack, buffer_worker_flush_nack,
#{ #{
ref => Ref, ref => Ref,
is_retriable => IsRetriable, is_retriable => IsRetriable,
@ -512,7 +512,7 @@ do_flush(
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp( ?tp(
resource_worker_flush_ack, buffer_worker_flush_ack,
#{ #{
batch_or_query => Request, batch_or_query => Request,
result => Result result => Result
@ -560,7 +560,7 @@ do_flush(Data0, #{
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp( ?tp(
resource_worker_flush_nack, buffer_worker_flush_nack,
#{ #{
ref => Ref, ref => Ref,
is_retriable => IsRetriable, is_retriable => IsRetriable,
@ -589,7 +589,7 @@ do_flush(Data0, #{
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp( ?tp(
resource_worker_flush_ack, buffer_worker_flush_ack,
#{ #{
batch_or_query => Batch, batch_or_query => Batch,
result => Result result => Result
@ -873,7 +873,7 @@ reply_after_query(
case Action of case Action of
nack -> nack ->
%% Keep retrying. %% Keep retrying.
?tp(resource_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => Action, action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent), batch_or_query => ?QUERY(From, Request, HasBeenSent),
ref => Ref, ref => Ref,
@ -882,7 +882,7 @@ reply_after_query(
mark_inflight_as_retriable(InflightTID, Ref), mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid); ?MODULE:block(Pid);
ack -> ack ->
?tp(resource_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => Action, action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent), batch_or_query => ?QUERY(From, Request, HasBeenSent),
ref => Ref, ref => Ref,
@ -903,7 +903,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu
case Action of case Action of
nack -> nack ->
%% Keep retrying. %% Keep retrying.
?tp(resource_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => nack, action => nack,
batch_or_query => Batch, batch_or_query => Batch,
ref => Ref, ref => Ref,
@ -912,7 +912,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu
mark_inflight_as_retriable(InflightTID, Ref), mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid); ?MODULE:block(Pid);
ack -> ack ->
?tp(resource_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => ack, action => ack,
batch_or_query => Batch, batch_or_query => Batch,
ref => Ref, ref => Ref,
@ -955,7 +955,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
end, end,
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
?tp( ?tp(
resource_worker_appended_to_queue, buffer_worker_appended_to_queue,
#{ #{
id => Id, id => Id,
items => Queries, items => Queries,
@ -973,7 +973,7 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
inflight_new(InfltWinSZ, Id, Index) -> inflight_new(InfltWinSZ, Id, Index) ->
TableId = ets:new( TableId = ets:new(
emqx_resource_worker_inflight_tab, emqx_resource_buffer_worker_inflight_tab,
[ordered_set, public, {write_concurrency, true}] [ordered_set, public, {write_concurrency, true}]
), ),
inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index), inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index),
@ -1040,7 +1040,7 @@ inflight_append(
BatchSize = length(Batch), BatchSize = length(Batch),
IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok; ok;
inflight_append( inflight_append(
InflightTID, InflightTID,
@ -1053,7 +1053,7 @@ inflight_append(
IsNew = ets:insert_new(InflightTID, InflightItem), IsNew = ets:insert_new(InflightTID, InflightItem),
IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok; ok;
inflight_append(InflightTID, {Ref, Data}, _Id, _Index) -> inflight_append(InflightTID, {Ref, Data}, _Id, _Index) ->
ets:insert(InflightTID, {Ref, Data}), ets:insert(InflightTID, {Ref, Data}),
@ -1130,7 +1130,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
end end
), ),
_NumAffected = ets:select_replace(InflightTID, MatchSpec), _NumAffected = ets:select_replace(InflightTID, MatchSpec),
?tp(resource_worker_worker_down_update, #{num_affected => _NumAffected}), ?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}),
ok. ok.
%%============================================================================== %%==============================================================================

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_resource_worker_sup). -module(emqx_resource_buffer_worker_sup).
-behaviour(supervisor). -behaviour(supervisor).
%%%============================================================================= %%%=============================================================================
@ -99,7 +99,7 @@ ensure_worker_added(ResId, Idx) ->
-define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}). -define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}).
ensure_worker_started(ResId, Idx, Opts) -> ensure_worker_started(ResId, Idx, Opts) ->
Mod = emqx_resource_worker, Mod = emqx_resource_buffer_worker,
Spec = #{ Spec = #{
id => ?CHILD_ID(Mod, ResId, Idx), id => ?CHILD_ID(Mod, ResId, Idx),
start => {Mod, start_link, [ResId, Idx, Opts]}, start => {Mod, start_link, [ResId, Idx, Opts]},
@ -116,7 +116,7 @@ ensure_worker_started(ResId, Idx, Opts) ->
end. end.
ensure_worker_removed(ResId, Idx) -> ensure_worker_removed(ResId, Idx) ->
ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx), ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx),
case supervisor:terminate_child(?SERVER, ChildId) of case supervisor:terminate_child(?SERVER, ChildId) of
ok -> ok ->
Res = supervisor:delete_child(?SERVER, ChildId), Res = supervisor:delete_child(?SERVER, ChildId),
@ -129,7 +129,7 @@ ensure_worker_removed(ResId, Idx) ->
end. end.
ensure_disk_queue_dir_absent(ResourceId, Index) -> ensure_disk_queue_dir_absent(ResourceId, Index) ->
ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index), ok = emqx_resource_buffer_worker:clear_disk_queue_dir(ResourceId, Index),
ok. ok.
ensure_worker_pool_removed(ResId) -> ensure_worker_pool_removed(ResId) ->

View File

@ -145,7 +145,7 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
%% buffer, so there is no need for resource workers %% buffer, so there is no need for resource workers
ok; ok;
false -> false ->
ok = emqx_resource_worker_sup:start_workers(ResId, Opts), ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true -> true ->
wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
@ -468,7 +468,7 @@ retry_actions(Data) ->
handle_remove_event(From, ClearMetrics, Data) -> handle_remove_event(From, ClearMetrics, Data) ->
stop_resource(Data), stop_resource(Data),
ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
case ClearMetrics of case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok false -> ok
@ -582,9 +582,9 @@ maybe_alarm(_Status, ResId) ->
maybe_resume_resource_workers(connected) -> maybe_resume_resource_workers(connected) ->
lists:foreach( lists:foreach(
fun({_, Pid, _, _}) -> fun({_, Pid, _, _}) ->
emqx_resource_worker:resume(Pid) emqx_resource_buffer_worker:resume(Pid)
end, end,
supervisor:which_children(emqx_resource_worker_sup) supervisor:which_children(emqx_resource_buffer_worker_sup)
); );
maybe_resume_resource_workers(_) -> maybe_resume_resource_workers(_) ->
ok. ok.

View File

@ -39,8 +39,8 @@ init([]) ->
modules => [emqx_resource_manager_sup] modules => [emqx_resource_manager_sup]
}, },
WorkerSup = #{ WorkerSup = #{
id => emqx_resource_worker_sup, id => emqx_resource_buffer_worker_sup,
start => {emqx_resource_worker_sup, start_link, []}, start => {emqx_resource_buffer_worker_sup, start_link, []},
restart => permanent, restart => permanent,
shutdown => infinity, shutdown => infinity,
type => supervisor type => supervisor

View File

@ -414,7 +414,7 @@ t_query_counter_async_inflight(_) ->
{_, {ok, _}} = {_, {ok, _}} =
?wait_async_action( ?wait_async_action(
inc_counter_in_parallel(WindowSize, ReqOpts), inc_counter_in_parallel(WindowSize, ReqOpts),
#{?snk_kind := resource_worker_flush_but_inflight_full}, #{?snk_kind := buffer_worker_flush_but_inflight_full},
1_000 1_000
), ),
fun(Trace) -> fun(Trace) ->
@ -439,7 +439,7 @@ t_query_counter_async_inflight(_) ->
emqx_resource:query(?ID, {inc_counter, 99}, #{ emqx_resource:query(?ID, {inc_counter, 99}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]} async_reply_fun => {Insert, [Tab0, tmp_query]}
}), }),
#{?snk_kind := resource_worker_appended_to_queue}, #{?snk_kind := buffer_worker_appended_to_queue},
1_000 1_000
), ),
tap_metrics(?LINE), tap_metrics(?LINE),
@ -490,7 +490,7 @@ t_query_counter_async_inflight(_) ->
{_, {ok, _}} = {_, {ok, _}} =
?wait_async_action( ?wait_async_action(
inc_counter_in_parallel(WindowSize, ReqOpts), inc_counter_in_parallel(WindowSize, ReqOpts),
#{?snk_kind := resource_worker_flush_but_inflight_full}, #{?snk_kind := buffer_worker_flush_but_inflight_full},
1_000 1_000
), ),
fun(Trace) -> fun(Trace) ->
@ -596,7 +596,7 @@ t_query_counter_async_inflight_batch(_) ->
{_, {ok, _}} = {_, {ok, _}} =
?wait_async_action( ?wait_async_action(
inc_counter_in_parallel(NumMsgs, ReqOpts), inc_counter_in_parallel(NumMsgs, ReqOpts),
#{?snk_kind := resource_worker_flush_but_inflight_full}, #{?snk_kind := buffer_worker_flush_but_inflight_full},
5_000 5_000
), ),
fun(Trace) -> fun(Trace) ->
@ -623,7 +623,7 @@ t_query_counter_async_inflight_batch(_) ->
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 2}), emqx_resource:query(?ID, {inc_counter, 2}),
#{?snk_kind := resource_worker_flush_but_inflight_full}, #{?snk_kind := buffer_worker_flush_but_inflight_full},
5_000 5_000
), ),
?assertMatch(0, ets:info(Tab0, size)), ?assertMatch(0, ets:info(Tab0, size)),
@ -646,7 +646,7 @@ t_query_counter_async_inflight_batch(_) ->
emqx_resource:query(?ID, {inc_counter, 3}, #{ emqx_resource:query(?ID, {inc_counter, 3}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]} async_reply_fun => {Insert, [Tab0, tmp_query]}
}), }),
#{?snk_kind := resource_worker_appended_to_queue}, #{?snk_kind := buffer_worker_appended_to_queue},
1_000 1_000
), ),
tap_metrics(?LINE), tap_metrics(?LINE),
@ -706,7 +706,7 @@ t_query_counter_async_inflight_batch(_) ->
{_, {ok, _}} = {_, {ok, _}} =
?wait_async_action( ?wait_async_action(
inc_counter_in_parallel(NumMsgs, ReqOpts), inc_counter_in_parallel(NumMsgs, ReqOpts),
#{?snk_kind := resource_worker_flush_but_inflight_full}, #{?snk_kind := buffer_worker_flush_but_inflight_full},
5_000 5_000
), ),
fun(Trace) -> fun(Trace) ->
@ -1055,7 +1055,7 @@ t_retry_batch(_Config) ->
end, end,
Payloads Payloads
), ),
#{?snk_kind := resource_worker_enter_blocked}, #{?snk_kind := buffer_worker_enter_blocked},
5_000 5_000
), ),
%% now the individual messages should have been counted %% now the individual messages should have been counted
@ -1066,7 +1066,7 @@ t_retry_batch(_Config) ->
%% batch shall remain enqueued. %% batch shall remain enqueued.
{ok, _} = {ok, _} =
snabbkaffe:block_until( snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}), ?match_n_events(2, #{?snk_kind := buffer_worker_retry_inflight_failed}),
5_000 5_000
), ),
%% should not have increased the matched count with the retries %% should not have increased the matched count with the retries
@ -1078,7 +1078,7 @@ t_retry_batch(_Config) ->
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume), ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_retry_inflight_succeeded}, #{?snk_kind := buffer_worker_retry_inflight_succeeded},
5_000 5_000
), ),
%% 1 more because of the `resume' call %% 1 more because of the `resume' call
@ -1140,7 +1140,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
NumRequests = 10, NumRequests = 10,
{ok, SRef} = snabbkaffe:subscribe( {ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := resource_worker_enter_blocked}), ?match_event(#{?snk_kind := buffer_worker_enter_blocked}),
NumBufferWorkers, NumBufferWorkers,
_Timeout = 5_000 _Timeout = 5_000
), ),
@ -1189,7 +1189,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
resume_interval => 1_000 resume_interval => 1_000
} }
), ),
#{?snk_kind := resource_worker_enter_running}, #{?snk_kind := buffer_worker_enter_running},
5_000 5_000
), ),
@ -1271,13 +1271,13 @@ t_retry_sync_inflight(_Config) ->
Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
TestPid ! {res, Res} TestPid ! {res, Res}
end), end),
#{?snk_kind := resource_worker_retry_inflight_failed}, #{?snk_kind := buffer_worker_retry_inflight_failed},
ResumeInterval * 2 ResumeInterval * 2
), ),
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume), ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_retry_inflight_succeeded}, #{?snk_kind := buffer_worker_retry_inflight_succeeded},
ResumeInterval * 3 ResumeInterval * 3
), ),
receive receive
@ -1322,13 +1322,13 @@ t_retry_sync_inflight_batch(_Config) ->
Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
TestPid ! {res, Res} TestPid ! {res, Res}
end), end),
#{?snk_kind := resource_worker_retry_inflight_failed}, #{?snk_kind := buffer_worker_retry_inflight_failed},
ResumeInterval * 2 ResumeInterval * 2
), ),
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume), ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_retry_inflight_succeeded}, #{?snk_kind := buffer_worker_retry_inflight_succeeded},
ResumeInterval * 3 ResumeInterval * 3
), ),
receive receive
@ -1368,7 +1368,7 @@ t_retry_async_inflight(_Config) ->
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
#{?snk_kind := resource_worker_retry_inflight_failed}, #{?snk_kind := buffer_worker_retry_inflight_failed},
ResumeInterval * 2 ResumeInterval * 2
), ),
@ -1376,7 +1376,7 @@ t_retry_async_inflight(_Config) ->
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume), emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_enter_running}, #{?snk_kind := buffer_worker_enter_running},
ResumeInterval * 2 ResumeInterval * 2
), ),
ok ok
@ -1411,7 +1411,7 @@ t_retry_async_inflight_batch(_Config) ->
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
#{?snk_kind := resource_worker_retry_inflight_failed}, #{?snk_kind := buffer_worker_retry_inflight_failed},
ResumeInterval * 2 ResumeInterval * 2
), ),
@ -1419,7 +1419,7 @@ t_retry_async_inflight_batch(_Config) ->
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume), emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_enter_running}, #{?snk_kind := buffer_worker_enter_running},
ResumeInterval * 2 ResumeInterval * 2
), ),
ok ok
@ -1459,7 +1459,7 @@ t_async_pool_worker_death(_Config) ->
NumReqs = 10, NumReqs = 10,
{ok, SRef0} = {ok, SRef0} =
snabbkaffe:subscribe( snabbkaffe:subscribe(
?match_event(#{?snk_kind := resource_worker_appended_to_inflight}), ?match_event(#{?snk_kind := buffer_worker_appended_to_inflight}),
NumReqs, NumReqs,
1_000 1_000
), ),
@ -1472,7 +1472,7 @@ t_async_pool_worker_death(_Config) ->
%% grab one of the worker pids and kill it %% grab one of the worker pids and kill it
{ok, SRef1} = {ok, SRef1} =
snabbkaffe:subscribe( snabbkaffe:subscribe(
?match_event(#{?snk_kind := resource_worker_worker_down_update}), ?match_event(#{?snk_kind := buffer_worker_worker_down_update}),
NumBufferWorkers, NumBufferWorkers,
10_000 10_000
), ),
@ -1568,8 +1568,8 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]), ct:pal(" ~p", [Trace]),
?assert( ?assert(
?strict_causality( ?strict_causality(
#{?snk_kind := resource_worker_flush_nack, ref := _Ref}, #{?snk_kind := buffer_worker_flush_nack, ref := _Ref},
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
Trace Trace
) )
), ),
@ -1577,8 +1577,8 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) ->
%% before restoring the resource health. %% before restoring the resource health.
?assert( ?assert(
?causality( ?causality(
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
#{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, #{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref},
Trace Trace
) )
), ),
@ -1588,8 +1588,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]), ct:pal(" ~p", [Trace]),
?assert( ?assert(
?strict_causality( ?strict_causality(
#{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref}, #{?snk_kind := buffer_worker_reply_after_query, action := nack, ref := _Ref},
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
Trace Trace
) )
), ),
@ -1597,8 +1597,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
%% before restoring the resource health. %% before restoring the resource health.
?assert( ?assert(
?causality( ?causality(
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
#{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, #{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref},
Trace Trace
) )
), ),

View File

@ -95,7 +95,8 @@ fields("config") ->
} }
)} )}
] ++ ] ++
emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); (emqx_connector_mysql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") -> fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"), Opts = emqx_resource_schema:fields("creation_opts"),
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];

View File

@ -97,7 +97,8 @@ fields("config") ->
} }
)} )}
] ++ ] ++
emqx_connector_pgsql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields(); (emqx_connector_pgsql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") -> fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"), Opts = emqx_resource_schema:fields("creation_opts"),
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];

View File

@ -914,7 +914,7 @@ t_write_failure(Config) ->
fun(Trace0) -> fun(Trace0) ->
case QueryMode of case QueryMode of
sync -> sync ->
Trace = ?of_kind(resource_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([_ | _], Trace), ?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace, [#{result := Result} | _] = Trace,
?assert( ?assert(

View File

@ -413,7 +413,7 @@ t_write_failure(Config) ->
end), end),
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]), ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(resource_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace), ?assertMatch([#{result := {error, _}} | _], Trace),
[#{result := {error, Error}} | _] = Trace, [#{result := {error, Error}} | _] = Trace,
case Error of case Error of

View File

@ -431,7 +431,7 @@ t_write_failure(Config) ->
end), end),
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]), ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(resource_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace), ?assertMatch([#{result := {error, _}} | _], Trace),
[#{result := {error, Error}} | _] = Trace, [#{result := {error, Error}} | _] = Trace,
case Error of case Error of