feat(buffer_worker): monitor async workers and cancel their inflight requests upon death

Thales Macedo Garitezi 2023-01-16 15:16:41 -03:00
parent 731ac6567a
commit 006b4bda97
9 changed files with 243 additions and 54 deletions


@@ -380,7 +380,8 @@ on_query_async(
         NRequest,
         Timeout,
         {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
-    ).
+    ),
+    {ok, Worker}.

 on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
     case do_get_status(PoolName, Timeout) of


@@ -198,7 +198,10 @@ on_query_async(
     #{name := InstanceId}
 ) ->
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
-    emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
+    %% this is a cast, currently.
+    ok = emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}),
+    WorkerPid = get_worker_pid(InstanceId),
+    {ok, WorkerPid}.

 on_get_status(_InstId, #{name := InstanceId}) ->
     case emqx_connector_mqtt_worker:status(InstanceId) of
@@ -212,6 +215,12 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
         {error, Reason} -> {error, Reason}
     end.

+%% mqtt workers, when created and called via bridge callbacks, are
+%% registered.
+-spec get_worker_pid(atom()) -> pid().
+get_worker_pid(InstanceId) ->
+    whereis(InstanceId).
+
 make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
     undefined;
 make_sub_confs(undefined, _Conf, _) ->


@@ -66,10 +66,11 @@
     ?REPLY(FROM, REQUEST, SENT, RESULT)
  || ?QUERY(FROM, REQUEST, SENT) <- BATCH
 ]).
--define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerPid),
-    {Ref, BatchOrQuery, IsRetriable, WorkerPid}
+-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
+    {Ref, BatchOrQuery, IsRetriable, WorkerMRef}
 ).
 -define(RETRY_IDX, 3).
+-define(WORKER_MREF_IDX, 4).

 -type id() :: binary().
 -type index() :: pos_integer().
@@ -79,14 +80,15 @@
 -type request_from() :: undefined | gen_statem:from().
 -type state() :: blocked | running.
 -type data() :: #{
-    id => id(),
-    index => index(),
-    inflight_tid => ets:tid(),
-    batch_size => pos_integer(),
-    batch_time => timer:time(),
-    queue => replayq:q(),
-    resume_interval => timer:time(),
-    tref => undefined | timer:tref()
+    id := id(),
+    index := index(),
+    inflight_tid := ets:tid(),
+    async_workers := #{pid() => reference()},
+    batch_size := pos_integer(),
+    batch_time := timer:time(),
+    queue := replayq:q(),
+    resume_interval := timer:time(),
+    tref := undefined | timer:tref()
 }.

 callback_mode() -> [state_functions, state_enter].
@@ -172,21 +174,22 @@ init({Id, Index, Opts}) ->
     Queue = replayq:open(QueueOpts),
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
     emqx_resource_metrics:inflight_set(Id, Index, 0),
-    InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
-    InflightTID = inflight_new(InfltWinSZ, Id, Index),
-    HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
-    St = #{
+    InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
+    InflightTID = inflight_new(InflightWinSize, Id, Index),
+    HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
+    Data = #{
         id => Id,
         index => Index,
         inflight_tid => InflightTID,
+        async_workers => #{},
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
         queue => Queue,
-        resume_interval => maps:get(resume_interval, Opts, HCItvl),
+        resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
         tref => undefined
     },
     ?tp(resource_worker_init, #{id => Id, index => Index}),
-    {ok, blocked, St, {next_event, cast, resume}}.
+    {ok, blocked, Data, {next_event, cast, resume}}.

 running(enter, _, St) ->
     ?tp(resource_worker_enter_running, #{}),
@@ -203,6 +206,11 @@ running(internal, flush, St) ->
     flush(St);
 running(info, {flush, _Ref}, _St) ->
     keep_state_and_data;
+running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
+    is_map_key(Pid, AsyncWorkers0)
+->
+    ?SLOG(info, #{msg => async_worker_died, state => running, reason => Reason}),
+    handle_async_worker_down(Data0, Pid);
 running(info, Info, _St) ->
     ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}),
     keep_state_and_data.
@@ -224,6 +232,11 @@ blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) ->
     {keep_state, Data};
 blocked(info, {flush, _Ref}, _Data) ->
     keep_state_and_data;
+blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
+    is_map_key(Pid, AsyncWorkers0)
+->
+    ?SLOG(info, #{msg => async_worker_died, state => blocked, reason => Reason}),
+    handle_async_worker_down(Data0, Pid);
 blocked(info, Info, _Data) ->
     ?SLOG(error, #{msg => unexpected_msg, state => blocked, info => Info}),
     keep_state_and_data.
@@ -458,11 +471,15 @@ do_flush(
                 is_recoverable_error_result(Result) orelse
                     is_not_connected_result(Result),
             ShouldPreserveInInflight = is_not_connected_result(Result),
-            WorkerPid = undefined,
-            InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerPid),
+            %% we set it atomically just below; a limitation of having
+            %% to use tuples for atomic ets updates
+            WorkerMRef0 = undefined,
+            InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0),
             ShouldPreserveInInflight andalso
                 inflight_append(InflightTID, InflightItem, Id, Index),
             IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref),
+            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+            store_async_worker_reference(InflightTID, Ref, WorkerMRef),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             ?tp(
                 resource_worker_flush_nack,
@@ -473,18 +490,20 @@ do_flush(
                     result => Result
                 }
             ),
-            {next_state, blocked, Data0};
+            {next_state, blocked, Data1};
         %% Success; just ack.
         ack ->
             ok = replayq:ack(Q1, QAckRef),
             is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
+            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+            store_async_worker_reference(InflightTID, Ref, WorkerMRef),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             ?tp(resource_worker_flush_ack, #{batch_or_query => Request}),
             case queue_count(Q1) > 0 of
                 true ->
-                    {keep_state, Data0, [{next_event, internal, flush}]};
+                    {keep_state, Data1, [{next_event, internal, flush}]};
                 false ->
-                    {keep_state, Data0}
+                    {keep_state, Data1}
             end
     end;
 do_flush(Data0, #{
@@ -518,11 +537,15 @@ do_flush(Data0, #{
                 is_recoverable_error_result(Result) orelse
                     is_not_connected_result(Result),
             ShouldPreserveInInflight = is_not_connected_result(Result),
-            WorkerPid = undefined,
-            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid),
+            %% we set it atomically just below; a limitation of having
+            %% to use tuples for atomic ets updates
+            WorkerMRef0 = undefined,
+            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0),
             ShouldPreserveInInflight andalso
                 inflight_append(InflightTID, InflightItem, Id, Index),
             IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref),
+            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+            store_async_worker_reference(InflightTID, Ref, WorkerMRef),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             ?tp(
                 resource_worker_flush_nack,
@@ -533,22 +556,24 @@ do_flush(Data0, #{
                     result => Result
                 }
             ),
-            {next_state, blocked, Data0};
+            {next_state, blocked, Data1};
         %% Success; just ack.
         ack ->
             ok = replayq:ack(Q1, QAckRef),
             is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
+            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+            store_async_worker_reference(InflightTID, Ref, WorkerMRef),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             ?tp(resource_worker_flush_ack, #{batch_or_query => Batch}),
             CurrentCount = queue_count(Q1),
             case {CurrentCount > 0, CurrentCount >= BatchSize} of
                 {false, _} ->
-                    {keep_state, Data0};
+                    {keep_state, Data1};
                 {true, true} ->
-                    {keep_state, Data0, [{next_event, internal, flush}]};
+                    {keep_state, Data1, [{next_event, internal, flush}]};
                 {true, false} ->
-                    Data1 = ensure_flush_timer(Data0),
-                    {keep_state, Data1}
+                    Data2 = ensure_flush_timer(Data1),
+                    {keep_state, Data2}
             end
     end.
@@ -653,6 +678,8 @@ handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent) ->
     {ack, PostFn};
 handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) ->
     {ack, fun() -> ok end};
+handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) ->
+    {ack, fun() -> ok end};
 handle_query_result_pure(Id, Result, HasBeenSent) ->
     PostFn = fun() ->
         assert_ok_result(Result),
@@ -661,6 +688,13 @@ handle_query_result_pure(Id, Result, HasBeenSent) ->
     end,
     {ack, PostFn}.

+handle_async_worker_down(Data0, Pid) ->
+    #{async_workers := AsyncWorkers0} = Data0,
+    {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
+    Data = Data0#{async_workers := AsyncWorkers},
+    cancel_inflight_items(Data, WorkerMRef),
+    {keep_state, Data}.
+
 is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when
     Error =:= not_connected; Error =:= blocked
 ->
@@ -723,8 +757,8 @@ apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt,
             {async_return, inflight_full};
         false ->
             IsRetriable = false,
-            WorkerPid = undefined,
-            InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid),
+            WorkerMRef = undefined,
+            InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
             Mod:on_query(Id, Request, ResSt)
     end,
@@ -745,8 +779,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
             ReplyFun = fun ?MODULE:reply_after_query/7,
             Args = [self(), Id, Index, InflightTID, Ref, Query],
             IsRetriable = false,
-            WorkerPid = undefined,
-            InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid),
+            WorkerMRef = undefined,
+            InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
             Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
             {async_return, Result}
@@ -769,8 +803,8 @@ apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt,
             {async_return, inflight_full};
         false ->
             IsRetriable = false,
-            WorkerPid = undefined,
-            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid),
+            WorkerMRef = undefined,
+            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
             Mod:on_batch_query(Id, Requests, ResSt)
     end,
@@ -792,8 +826,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
             ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]},
             Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
             IsRetriable = false,
-            WorkerPid = undefined,
-            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid),
+            WorkerMRef = undefined,
+            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
             Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
             {async_return, Result}
@@ -905,7 +939,7 @@ inflight_new(InfltWinSZ, Id, Index) ->
 inflight_get_first_retriable(InflightTID) ->
     MatchSpec =
         ets:fun2ms(
-            fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerPid)) when
+            fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when
                 IsRetriable =:= true
             ->
                 {Ref, BatchOrQuery}
@@ -944,12 +978,12 @@ inflight_append(undefined, _InflightItem, _Id, _Index) ->
     ok;
 inflight_append(
     InflightTID,
-    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerPid),
+    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
     Id,
     Index
 ) ->
     Batch = mark_as_sent(Batch0),
-    InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid),
+    InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
     IsNew = ets:insert_new(InflightTID, InflightItem),
     BatchSize = length(Batch),
     IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
@@ -958,12 +992,12 @@ inflight_append(
     ok;
 inflight_append(
     InflightTID,
-    ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerPid),
+    ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerMRef),
     Id,
     Index
 ) ->
     Query = mark_as_sent(Query0),
-    InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid),
+    InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
     IsNew = ets:insert_new(InflightTID, InflightItem),
     IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
@@ -983,14 +1017,46 @@ mark_inflight_as_retriable(InflightTID, Ref) ->
     _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
     ok.

+%% Track each worker pid only once.
+ensure_async_worker_monitored(
+    Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, WorkerPid}} = _Result
+) when
+    is_pid(WorkerPid), is_map_key(WorkerPid, AsyncWorkers)
+->
+    WorkerMRef = maps:get(WorkerPid, AsyncWorkers),
+    {Data0, WorkerMRef};
+ensure_async_worker_monitored(
+    Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, WorkerPid}}
+) when
+    is_pid(WorkerPid)
+->
+    WorkerMRef = monitor(process, WorkerPid),
+    AsyncWorkers = AsyncWorkers0#{WorkerPid => WorkerMRef},
+    Data = Data0#{async_workers := AsyncWorkers},
+    {Data, WorkerMRef};
+ensure_async_worker_monitored(Data0, _Result) ->
+    {Data0, undefined}.
+
+store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) ->
+    ok;
+store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
+    ok;
+store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
+    is_reference(WorkerMRef)
+->
+    _ = ets:update_element(
+        InflightTID, Ref, {?WORKER_MREF_IDX, WorkerMRef}
+    ),
+    ok.
+
 ack_inflight(undefined, _Ref, _Id, _Index) ->
     false;
 ack_inflight(InflightTID, Ref, Id, Index) ->
     Count =
         case ets:take(InflightTID, Ref) of
-            [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerPid)] ->
+            [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerMRef)] ->
                 1;
-            [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerPid)] ->
+            [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
                 length(Batch);
             _ ->
                 0
@@ -1000,6 +1066,38 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     IsAcked.

+cancel_inflight_items(Data, WorkerMRef) ->
+    #{inflight_tid := InflightTID} = Data,
+    MatchSpec =
+        ets:fun2ms(
+            fun(?INFLIGHT_ITEM(Ref, _BatchOrQuery, _IsRetriable, WorkerMRef0)) when
+                WorkerMRef =:= WorkerMRef0
+            ->
+                Ref
+            end
+        ),
+    Refs = ets:select(InflightTID, MatchSpec),
+    lists:foreach(fun(Ref) -> do_cancel_inflight_item(Data, Ref) end, Refs).
+
+do_cancel_inflight_item(Data, Ref) ->
+    #{id := Id, index := Index, inflight_tid := InflightTID} = Data,
+    {Count, Batch} =
+        case ets:take(InflightTID, Ref) of
+            [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _) = Query, _IsRetriable, _WorkerMRef)] ->
+                {1, [Query]};
+            [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, _IsRetriable, _WorkerMRef)] ->
+                {length(Batch0), Batch0};
+            _ ->
+                {0, []}
+        end,
+    IsAcked = Count > 0,
+    IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
+    Result = {error, interrupted},
+    _ = batch_reply_caller(Id, Result, Batch),
+    ?tp(resource_worker_cancelled_inflight, #{ref => Ref}),
+    ok.
+
 %%==============================================================================
 inc_sent_failed(Id, _HasBeenSent = true) ->


@@ -138,10 +138,10 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
 on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
     Pid ! {inc, N, ReplyFun},
-    ok;
+    {ok, Pid};
 on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) ->
     Pid ! {get, ReplyFun},
-    ok;
+    {ok, Pid};
 on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
     Pid ! {block_now, ReplyFun},
     {ok, Pid};


@@ -1422,6 +1422,86 @@ t_dont_retry_async_inflight_batch(_Config) ->
     ),
     ok.

+%% check that we monitor async worker pids and abort their inflight
+%% requests if they die.
+t_async_pool_worker_death(_Config) ->
+    ResumeInterval = 1_000,
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 1,
+            worker_pool_size => 2,
+            resume_interval => ResumeInterval
+        }
+    ),
+    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
+    Insert0 = fun(Tab, Ref, Result) ->
+        ct:pal("inserting ~p", [{Ref, Result}]),
+        ets:insert(Tab, {Ref, Result})
+    end,
+    ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
+    ?check_trace(
+        begin
+            ok = emqx_resource:simple_sync_query(?ID, block),
+            NumReqs = 10,
+            {ok, SRef0} =
+                snabbkaffe:subscribe(
+                    ?match_event(#{?snk_kind := resource_worker_appended_to_inflight}),
+                    NumReqs,
+                    1_000
+                ),
+            inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts),
+            {ok, _} = snabbkaffe:receive_events(SRef0),
+            Inflight0 = emqx_resource_metrics:inflight_get(?ID),
+            ?assertEqual(NumReqs, Inflight0),
+            %% grab one of the worker pids and kill it
+            {ok, SRef1} =
+                snabbkaffe:subscribe(
+                    ?match_event(#{?snk_kind := resource_worker_cancelled_inflight}),
+                    NumReqs,
+                    1_000
+                ),
+            {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
+            MRef = monitor(process, Pid0),
+            ct:pal("will kill ~p", [Pid0]),
+            exit(Pid0, kill),
+            receive
+                {'DOWN', MRef, process, Pid0, killed} ->
+                    ct:pal("~p killed", [Pid0]),
+                    ok
+            after 200 ->
+                ct:fail("worker should have died")
+            end,
+            %% inflight requests should have been cancelled
+            {ok, _} = snabbkaffe:receive_events(SRef1),
+            Inflight1 = emqx_resource_metrics:inflight_get(?ID),
+            ?assertEqual(0, Inflight1),
+            ?assert(
+                lists:all(
+                    fun
+                        ({_, {error, interrupted}}) -> true;
+                        (_) -> false
+                    end,
+                    ets:tab2list(Tab0)
+                ),
+                #{tab => ets:tab2list(Tab0)}
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------


@@ -1,7 +1,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
     {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
-    {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.6"}}},
+    {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.7"}}},
     {emqx, {path, "../../apps/emqx"}}
 ]}.


@@ -178,7 +178,7 @@ on_query(BridgeId, {send_message, Selected}, State) ->
     {send_message, map()},
     {ReplyFun :: function(), Args :: list()},
     state()
-) -> ok.
+) -> {ok, pid()}.
 on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) ->
     Requests = [{send_message, Selected}],
     ?TRACE(
@@ -210,7 +210,7 @@ on_batch_query(BridgeId, Requests, State) ->
     [{send_message, map()}],
     {ReplyFun :: function(), Args :: list()},
     state()
-) -> ok.
+) -> {ok, pid()}.
 on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) ->
     ?TRACE(
         "QUERY_ASYNC",
@@ -496,7 +496,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
     [{send_message, map()}],
     {ReplyFun :: function(), Args :: list()},
     resource_id()
-) -> ok.
+) -> {ok, pid()}.
 do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
     #{
         pool_name := PoolName,
@@ -531,7 +531,8 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
         Request,
         RequestTimeout,
         {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
-    ).
+    ),
+    {ok, Worker}.

 -spec reply_delegator(
     resource_id(),


@@ -356,7 +356,7 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
         connector => InstId,
         points => Points
     }),
-    ok = influxdb:write_async(Client, Points, ReplyFunAndArgs).
+    {ok, _WorkerPid} = influxdb:write_async(Client, Points, ReplyFunAndArgs).

 %% -------------------------------------------------------------------------------------------------
 %% Tags & Fields Config Trans


@@ -131,7 +131,7 @@ defmodule EMQXUmbrella.MixProject do
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
-      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.6", override: true},
+      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.7", override: true},
       {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},