fix(resources): fix resource lifecycle
* do not resume all buffer workers on successful healthcheck * do not pass undefined state to resource healthcheck callback
This commit is contained in:
parent
13511d2782
commit
14f528cc86
|
@ -195,7 +195,7 @@ init({Id, Index, Opts}) ->
|
|||
{ok, running, Data}.
|
||||
|
||||
running(enter, _, Data) ->
|
||||
?tp(buffer_worker_enter_running, #{}),
|
||||
?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}),
|
||||
%% According to `gen_statem' laws, we mustn't call `maybe_flush'
|
||||
%% directly because it may decide to return `{next_state, blocked, _}',
|
||||
%% and that's an invalid response for a state enter call.
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
%% External API
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([start_workers/2, stop_workers/2]).
|
||||
-export([start_workers/2, stop_workers/2, worker_pids/1]).
|
||||
|
||||
%% Callbacks
|
||||
-export([init/1]).
|
||||
|
@ -75,6 +75,14 @@ stop_workers(ResId, Opts) ->
|
|||
ensure_worker_pool_removed(ResId),
|
||||
ok.
|
||||
|
||||
worker_pids(ResId) ->
|
||||
lists:map(
|
||||
fun({_Name, Pid}) ->
|
||||
Pid
|
||||
end,
|
||||
gproc_pool:active_workers(ResId)
|
||||
).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Internal
|
||||
%%%=============================================================================
|
||||
|
|
|
@ -555,12 +555,14 @@ handle_connected_health_check(Data) ->
|
|||
end
|
||||
).
|
||||
|
||||
with_health_check(#data{state = undefined} = Data, Func) ->
|
||||
Func(disconnected, Data);
|
||||
with_health_check(Data, Func) ->
|
||||
ResId = Data#data.id,
|
||||
HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
|
||||
{Status, NewState, Err} = parse_health_check_result(HCRes, Data),
|
||||
_ = maybe_alarm(Status, ResId),
|
||||
ok = maybe_resume_resource_workers(Status),
|
||||
ok = maybe_resume_resource_workers(ResId, Status),
|
||||
UpdatedData = Data#data{
|
||||
state = NewState, status = Status, error = Err
|
||||
},
|
||||
|
@ -581,14 +583,12 @@ maybe_alarm(_Status, ResId) ->
|
|||
<<"resource down: ", ResId/binary>>
|
||||
).
|
||||
|
||||
maybe_resume_resource_workers(connected) ->
|
||||
maybe_resume_resource_workers(ResId, connected) ->
|
||||
lists:foreach(
|
||||
fun({_, Pid, _, _}) ->
|
||||
emqx_resource_buffer_worker:resume(Pid)
|
||||
end,
|
||||
supervisor:which_children(emqx_resource_buffer_worker_sup)
|
||||
fun emqx_resource_buffer_worker:resume/1,
|
||||
emqx_resource_buffer_worker_sup:worker_pids(ResId)
|
||||
);
|
||||
maybe_resume_resource_workers(_) ->
|
||||
maybe_resume_resource_workers(_, _) ->
|
||||
ok.
|
||||
|
||||
maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
|
||||
-define(TEST_RESOURCE, emqx_connector_demo).
|
||||
-define(ID, <<"id">>).
|
||||
-define(ID1, <<"id1">>).
|
||||
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
|
||||
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
|
||||
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
|
||||
|
@ -1033,6 +1034,63 @@ t_auto_retry(_) ->
|
|||
),
|
||||
?assertEqual(ok, Res).
|
||||
|
||||
t_health_check_disconnected(_) ->
|
||||
_ = emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource, create_error => true},
|
||||
#{auto_retry_interval => 100}
|
||||
),
|
||||
?assertEqual(
|
||||
{ok, disconnected},
|
||||
emqx_resource:health_check(?ID)
|
||||
).
|
||||
|
||||
t_unblock_only_required_buffer_workers(_) ->
|
||||
{ok, _} = emqx_resource:create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
query_mode => async,
|
||||
batch_size => 5
|
||||
}
|
||||
),
|
||||
lists:foreach(
|
||||
fun emqx_resource_buffer_worker:block/1,
|
||||
emqx_resource_buffer_worker_sup:worker_pids(?ID)
|
||||
),
|
||||
emqx_resource:create(
|
||||
?ID1,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
query_mode => async,
|
||||
batch_size => 5
|
||||
}
|
||||
),
|
||||
%% creation of `?ID1` should not have unblocked `?ID`'s buffer workers
|
||||
%% so we should see resumes now (`buffer_worker_enter_running`).
|
||||
?check_trace(
|
||||
?wait_async_action(
|
||||
lists:foreach(
|
||||
fun emqx_resource_buffer_worker:resume/1,
|
||||
emqx_resource_buffer_worker_sup:worker_pids(?ID)
|
||||
),
|
||||
#{?snk_kind := buffer_worker_enter_running},
|
||||
5000
|
||||
),
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{id := ?ID} | _],
|
||||
?of_kind(buffer_worker_enter_running, Trace)
|
||||
)
|
||||
end
|
||||
).
|
||||
|
||||
t_retry_batch(_Config) ->
|
||||
{ok, _} = emqx_resource:create(
|
||||
?ID,
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
Fix resource health check process
|
||||
|
||||
* Do not resume all buffer workers on successful health check. Previously after a successful healthcheck all buffer workers (for all resources) were resumed
|
||||
|
||||
* Do not pass undefined state to resource health check callback. If `on_start` callback never succeeded, the state of the resource is undefined. There is no sense to pass it to `on_get_status` callback.
|
|
@ -0,0 +1,5 @@
|
|||
修复资源健康检查流程
|
||||
|
||||
* 不要在健康检查成功时恢复所有缓冲区工作者。 之前,在成功进行健康检查后,所有缓冲区工作人员(针对所有资源)都已恢复
|
||||
|
||||
* 不要将未定义的状态传递给资源健康检查回调。 如果 `on_start` 回调从未成功,资源的状态是未定义的。 将它传递给 `on_get_status` 回调是没有意义的。
|
Loading…
Reference in New Issue