diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index f91a994c7..4ef384da6 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -195,7 +195,7 @@ init({Id, Index, Opts}) -> {ok, running, Data}. running(enter, _, Data) -> - ?tp(buffer_worker_enter_running, #{}), + ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}), %% According to `gen_statem' laws, we mustn't call `maybe_flush' %% directly because it may decide to return `{next_state, blocked, _}', %% and that's an invalid response for a state enter call. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index 4987946c9..a00dcdcd2 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -23,7 +23,7 @@ %% External API -export([start_link/0]). --export([start_workers/2, stop_workers/2]). +-export([start_workers/2, stop_workers/2, worker_pids/1]). %% Callbacks -export([init/1]). @@ -75,6 +75,14 @@ stop_workers(ResId, Opts) -> ensure_worker_pool_removed(ResId), ok. +worker_pids(ResId) -> + lists:map( + fun({_Name, Pid}) -> + Pid + end, + gproc_pool:active_workers(ResId) + ). + %%%============================================================================= %%% Internal %%%============================================================================= diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 232b17ce7..5de55fc4f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -555,12 +555,14 @@ handle_connected_health_check(Data) -> end ). +with_health_check(#data{state = undefined} = Data, Func) -> + Func(disconnected, Data); with_health_check(Data, Func) -> ResId = Data#data.id, HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data), _ = maybe_alarm(Status, ResId), - ok = maybe_resume_resource_workers(Status), + ok = maybe_resume_resource_workers(ResId, Status), UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, @@ -581,14 +583,12 @@ maybe_alarm(_Status, ResId) -> <<"resource down: ", ResId/binary>> ). -maybe_resume_resource_workers(connected) -> +maybe_resume_resource_workers(ResId, connected) -> lists:foreach( - fun({_, Pid, _, _}) -> - emqx_resource_buffer_worker:resume(Pid) - end, - supervisor:which_children(emqx_resource_buffer_worker_sup) + fun emqx_resource_buffer_worker:resume/1, + emqx_resource_buffer_worker_sup:worker_pids(ResId) ); -maybe_resume_resource_workers(_) -> +maybe_resume_resource_workers(_, _) -> ok. maybe_clear_alarm(<>) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 3ae69a47d..bc0331d02 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -24,6 +24,7 @@ -define(TEST_RESOURCE, emqx_connector_demo). -define(ID, <<"id">>). +-define(ID1, <<"id1">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). @@ -1029,6 +1030,63 @@ t_auto_retry(_) -> ), ?assertEqual(ok, Res). +t_health_check_disconnected(_) -> + _ = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, create_error => true}, + #{auto_retry_interval => 100} + ), + ?assertEqual( + {ok, disconnected}, + emqx_resource:health_check(?ID) + ). + +t_unblock_only_required_buffer_workers(_) -> + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 5 + } + ), + lists:foreach( + fun emqx_resource_buffer_worker:block/1, + emqx_resource_buffer_worker_sup:worker_pids(?ID) + ), + emqx_resource:create( + ?ID1, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 5 + } + ), + %% creation of `?ID1` should not have unblocked `?ID`'s buffer workers + %% so we should see resumes now (`buffer_worker_enter_running`). + ?check_trace( + ?wait_async_action( + lists:foreach( + fun emqx_resource_buffer_worker:resume/1, + emqx_resource_buffer_worker_sup:worker_pids(?ID) + ), + #{?snk_kind := buffer_worker_enter_running}, + 5000 + ), + fun(Trace) -> + ?assertMatch( + [#{id := ?ID} | _], + ?of_kind(buffer_worker_enter_running, Trace) + ) + end + ). + t_retry_batch(_Config) -> {ok, _} = emqx_resource:create( ?ID, diff --git a/changes/v5.0.16/fix-9884.en.md b/changes/v5.0.16/fix-9884.en.md new file mode 100644 index 000000000..28eacfc86 --- /dev/null +++ b/changes/v5.0.16/fix-9884.en.md @@ -0,0 +1,2 @@ +Do not resume all buffer workers on successful health check of any individual resource. +Previously after any successful healthcheck, all buffer workers (for all resources) were resumed diff --git a/changes/v5.0.16/fix-9884.zh.md b/changes/v5.0.16/fix-9884.zh.md new file mode 100644 index 000000000..08f6e7188 --- /dev/null +++ b/changes/v5.0.16/fix-9884.zh.md @@ -0,0 +1,2 @@ +不在任意一个资源健康检查成功时恢复所有资源发送缓存。 +在此修复之前,在任意一个资源成功进行健康检查后,所有资源的缓存都会尝试恢复。