From 14f528cc86f3030ddff469c63ad3c6a5d7cfc251 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 1 Feb 2023 14:22:49 +0200 Subject: [PATCH 1/2] fix(resources): fix resource lifecycle * do not resume all buffer workers on successful healthcheck * do not pass undefined state to resource healthcheck callback --- .../src/emqx_resource_buffer_worker.erl | 2 +- .../src/emqx_resource_buffer_worker_sup.erl | 10 +++- .../src/emqx_resource_manager.erl | 14 ++--- .../test/emqx_resource_SUITE.erl | 58 +++++++++++++++++++ changes/v5.0.16/fix-9884.en.md | 5 ++ changes/v5.0.16/fix-9884.zh.md | 5 ++ 6 files changed, 85 insertions(+), 9 deletions(-) create mode 100644 changes/v5.0.16/fix-9884.en.md create mode 100644 changes/v5.0.16/fix-9884.zh.md diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index f91a994c7..4ef384da6 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -195,7 +195,7 @@ init({Id, Index, Opts}) -> {ok, running, Data}. running(enter, _, Data) -> - ?tp(buffer_worker_enter_running, #{}), + ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}), %% According to `gen_statem' laws, we mustn't call `maybe_flush' %% directly because it may decide to return `{next_state, blocked, _}', %% and that's an invalid response for a state enter call. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index 4987946c9..a00dcdcd2 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -23,7 +23,7 @@ %% External API -export([start_link/0]). --export([start_workers/2, stop_workers/2]). +-export([start_workers/2, stop_workers/2, worker_pids/1]). %% Callbacks -export([init/1]). 
@@ -75,6 +75,14 @@ stop_workers(ResId, Opts) -> ensure_worker_pool_removed(ResId), ok. +worker_pids(ResId) -> + lists:map( + fun({_Name, Pid}) -> + Pid + end, + gproc_pool:active_workers(ResId) + ). + %%%============================================================================= %%% Internal %%%============================================================================= diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 232b17ce7..5de55fc4f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -555,12 +555,14 @@ handle_connected_health_check(Data) -> end ). +with_health_check(#data{state = undefined} = Data, Func) -> + Func(disconnected, Data); with_health_check(Data, Func) -> ResId = Data#data.id, HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data), _ = maybe_alarm(Status, ResId), - ok = maybe_resume_resource_workers(Status), + ok = maybe_resume_resource_workers(ResId, Status), UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, @@ -581,14 +583,12 @@ maybe_alarm(_Status, ResId) -> <<"resource down: ", ResId/binary>> ). -maybe_resume_resource_workers(connected) -> +maybe_resume_resource_workers(ResId, connected) -> lists:foreach( - fun({_, Pid, _, _}) -> - emqx_resource_buffer_worker:resume(Pid) - end, - supervisor:which_children(emqx_resource_buffer_worker_sup) + fun emqx_resource_buffer_worker:resume/1, + emqx_resource_buffer_worker_sup:worker_pids(ResId) ); -maybe_resume_resource_workers(_) -> +maybe_resume_resource_workers(_, _) -> ok. 
maybe_clear_alarm(<>) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 620516a88..0d3822ec2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -24,6 +24,7 @@ -define(TEST_RESOURCE, emqx_connector_demo). -define(ID, <<"id">>). +-define(ID1, <<"id1">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). @@ -1033,6 +1034,63 @@ t_auto_retry(_) -> ), ?assertEqual(ok, Res). +t_health_check_disconnected(_) -> + _ = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, create_error => true}, + #{auto_retry_interval => 100} + ), + ?assertEqual( + {ok, disconnected}, + emqx_resource:health_check(?ID) + ). + +t_unblock_only_required_buffer_workers(_) -> + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 5 + } + ), + lists:foreach( + fun emqx_resource_buffer_worker:block/1, + emqx_resource_buffer_worker_sup:worker_pids(?ID) + ), + emqx_resource:create( + ?ID1, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 5 + } + ), + %% creation of `?ID1` should not have unblocked `?ID`'s buffer workers + %% so we should see resumes now (`buffer_worker_enter_running`). + ?check_trace( + ?wait_async_action( + lists:foreach( + fun emqx_resource_buffer_worker:resume/1, + emqx_resource_buffer_worker_sup:worker_pids(?ID) + ), + #{?snk_kind := buffer_worker_enter_running}, + 5000 + ), + fun(Trace) -> + ?assertMatch( + [#{id := ?ID} | _], + ?of_kind(buffer_worker_enter_running, Trace) + ) + end + ). 
+ t_retry_batch(_Config) -> {ok, _} = emqx_resource:create( ?ID, diff --git a/changes/v5.0.16/fix-9884.en.md b/changes/v5.0.16/fix-9884.en.md new file mode 100644 index 000000000..7676b1213 --- /dev/null +++ b/changes/v5.0.16/fix-9884.en.md @@ -0,0 +1,5 @@ +Fix resource health check process + +* Do not resume all buffer workers on successful health check. Previously after a successful healthcheck all buffer workers (for all resources) were resumed + +* Do not pass undefined state to resource health check callback. If `on_start` callback never succeeded, the state of the resource is undefined. There is no sense to pass it to `on_get_status` callback. diff --git a/changes/v5.0.16/fix-9884.zh.md b/changes/v5.0.16/fix-9884.zh.md new file mode 100644 index 000000000..6d6894242 --- /dev/null +++ b/changes/v5.0.16/fix-9884.zh.md @@ -0,0 +1,5 @@ +修复资源健康检查流程 + +* 不要在健康检查成功时恢复所有缓冲区工作者。 之前,在成功进行健康检查后,所有缓冲区工作人员(针对所有资源)都已恢复 + +* 不要将未定义的状态传递给资源健康检查回调。 如果 `on_start` 回调从未成功,资源的状态是未定义的。 将它传递给 `on_get_status` 回调是没有意义的。 From 44b7624c10f05093060712ec97ab9b853c1b5a89 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 2 Feb 2023 09:13:18 +0100 Subject: [PATCH 2/2] docs: update changelog --- changes/v5.0.16/fix-9884.en.md | 7 ++----- changes/v5.0.16/fix-9884.zh.md | 7 ++----- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/changes/v5.0.16/fix-9884.en.md b/changes/v5.0.16/fix-9884.en.md index 7676b1213..28eacfc86 100644 --- a/changes/v5.0.16/fix-9884.en.md +++ b/changes/v5.0.16/fix-9884.en.md @@ -1,5 +1,2 @@ -Fix resource health check process - -* Do not resume all buffer workers on successful health check. Previously after a successful healthcheck all buffer workers (for all resources) were resumed - -* Do not pass undefined state to resource health check callback. If `on_start` callback never succeeded, the state of the resource is undefined. There is no sense to pass it to `on_get_status` callback. 
+Do not resume all buffer workers on successful health check of any individual resource. +Previously, after any successful health check, all buffer workers (for all resources) were resumed. diff --git a/changes/v5.0.16/fix-9884.zh.md b/changes/v5.0.16/fix-9884.zh.md index 6d6894242..08f6e7188 100644 --- a/changes/v5.0.16/fix-9884.zh.md +++ b/changes/v5.0.16/fix-9884.zh.md @@ -1,5 +1,2 @@ -修复资源健康检查流程 - -* 不要在健康检查成功时恢复所有缓冲区工作者。 之前,在成功进行健康检查后,所有缓冲区工作人员(针对所有资源)都已恢复 - -* 不要将未定义的状态传递给资源健康检查回调。 如果 `on_start` 回调从未成功,资源的状态是未定义的。 将它传递给 `on_get_status` 回调是没有意义的。 +不在任意一个资源健康检查成功时恢复所有资源发送缓存。 +在此修复之前,在任意一个资源成功进行健康检查后,所有资源的缓存都会尝试恢复。