Merge pull request #11094 from thalesmg/fix-check-health-on-start-master
fix(resource): check status when (re)starting a resource
This commit is contained in:
commit
9c5d4f188d
|
@ -223,8 +223,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
|
||||||
start(ResId, Opts) ->
|
start(ResId, Opts) ->
|
||||||
case safe_call(ResId, start, ?T_OPERATION) of
|
case safe_call(ResId, start, ?T_OPERATION) of
|
||||||
ok ->
|
ok ->
|
||||||
_ = wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)),
|
wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000));
|
||||||
ok;
|
|
||||||
{error, _Reason} = Error ->
|
{error, _Reason} = Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -246,6 +246,9 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}
|
||||||
on_get_status(_InstId, #{health_check_error := true}) ->
|
on_get_status(_InstId, #{health_check_error := true}) ->
|
||||||
?tp(connector_demo_health_check_error, #{}),
|
?tp(connector_demo_health_check_error, #{}),
|
||||||
disconnected;
|
disconnected;
|
||||||
|
on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) ->
|
||||||
|
?tp(connector_demo_health_check_error, #{}),
|
||||||
|
{disconnected, State, Message};
|
||||||
on_get_status(_InstId, #{pid := Pid}) ->
|
on_get_status(_InstId, #{pid := Pid}) ->
|
||||||
timer:sleep(300),
|
timer:sleep(300),
|
||||||
case is_process_alive(Pid) of
|
case is_process_alive(Pid) of
|
||||||
|
|
|
@ -40,6 +40,7 @@ groups() ->
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
ct:timetrap({seconds, 30}),
|
ct:timetrap({seconds, 30}),
|
||||||
emqx_connector_demo:set_callback_mode(always_sync),
|
emqx_connector_demo:set_callback_mode(always_sync),
|
||||||
|
snabbkaffe:start_trace(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(_, _Config) ->
|
end_per_testcase(_, _Config) ->
|
||||||
|
@ -1145,10 +1146,33 @@ t_auto_retry(_) ->
|
||||||
?DEFAULT_RESOURCE_GROUP,
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => test_resource, create_error => true},
|
#{name => test_resource, create_error => true},
|
||||||
#{auto_retry_interval => 100}
|
#{health_check_interval => 100}
|
||||||
),
|
),
|
||||||
?assertEqual(ok, Res).
|
?assertEqual(ok, Res).
|
||||||
|
|
||||||
|
%% tests resources that have an asynchronous start: they are created
|
||||||
|
%% without problems, but later some issue is found when calling the
|
||||||
|
%% health check.
|
||||||
|
t_start_throw_error(_Config) ->
|
||||||
|
Message = "something went wrong",
|
||||||
|
?assertMatch(
|
||||||
|
{{ok, _}, {ok, _}},
|
||||||
|
?wait_async_action(
|
||||||
|
emqx_resource:create_local(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource, health_check_error => {msg, Message}},
|
||||||
|
#{health_check_interval => 100}
|
||||||
|
),
|
||||||
|
#{?snk_kind := connector_demo_health_check_error},
|
||||||
|
1_000
|
||||||
|
)
|
||||||
|
),
|
||||||
|
%% Now, if we try to "reconnect" (restart) it, we should get the error
|
||||||
|
?assertMatch({error, Message}, emqx_resource:start(?ID, _Opts = #{})),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_health_check_disconnected(_) ->
|
t_health_check_disconnected(_) ->
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
|
@ -1157,7 +1181,7 @@ t_health_check_disconnected(_) ->
|
||||||
?DEFAULT_RESOURCE_GROUP,
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => test_resource, create_error => true},
|
#{name => test_resource, create_error => true},
|
||||||
#{auto_retry_interval => 100}
|
#{health_check_interval => 100}
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{ok, disconnected},
|
{ok, disconnected},
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where connection errors in Kafka Producer would not be reported when reconnecting the bridge.
|
Loading…
Reference in New Issue