fix(pulsar_producer): do not return `disconnected` when checking status (r5.1)
Fixes https://emqx.atlassian.net/browse/EMQX-10278

Since the Pulsar client has its own replayq that lives outside the management of the buffer workers, we must not return a `disconnected` status for this bridge: otherwise, the resource manager will eventually kill the producers and data may be lost.
parent beee35bdea
commit b609792a90
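
For context on the description above, a minimal illustrative sketch of the health-check contract this fix relies on follows. It is not part of this commit: the module name `pulsar_status_sketch' and the function `react_to_status/1' are assumptions made up for illustration; only the status atoms and the producer-killing behaviour are taken from the commit message and diff.

%% Minimal sketch, not part of this commit: module/function names are illustrative.
-module(pulsar_status_sketch).
-export([react_to_status/1]).

%% `disconnected' tells the resource manager the resource is unusable, so it will
%% eventually stop and recreate it -- killing the Pulsar producers along with the
%% client's own replayq, i.e. buffered messages can be lost.
react_to_status(disconnected) ->
    stop_and_restart_resource;
%% `connecting' keeps the producers (and their replayq) alive; the health check is
%% simply retried on the next interval until the connection recovers.
react_to_status(connecting) ->
    retry_on_next_health_check;
react_to_status(connected) ->
    ok.

That is why the diff below changes every `disconnected' return in `on_get_status/2' to `connecting', and why the test suite now asserts `{ok, connecting}' while the Toxiproxy failure is active.
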
@@ -84,6 +84,8 @@
 %% Toxiproxy API
 -export([
     with_failure/5,
+    enable_failure/4,
+    heal_failure/4,
     reset_proxy/2
 ]).

@@ -145,7 +145,10 @@ on_stop(InstanceId, _State) ->
             ok
     end.
 
--spec on_get_status(resource_id(), state()) -> connected | disconnected.
+%% Note: since Pulsar client has its own replayq that is not managed by
+%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
+%% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost.
+-spec on_get_status(resource_id(), state()) -> connected | connecting.
 on_get_status(_InstanceId, State = #{}) ->
     #{
         pulsar_client_id := ClientId,

@@ -157,15 +160,15 @@ on_get_status(_InstanceId, State = #{}) ->
                 true ->
                     get_producer_status(Producers);
                 false ->
-                    disconnected
+                    connecting
             catch
                 error:timeout ->
-                    disconnected;
+                    connecting;
                 exit:{noproc, _} ->
-                    disconnected
+                    connecting
             end;
         {error, _} ->
-            disconnected
+            connecting
     end;
 on_get_status(_InstanceId, _State) ->
     %% If a health check happens just after a concurrent request to

@@ -45,6 +45,7 @@ only_once_tests() ->
         t_send_when_timeout,
         t_failure_to_start_producer,
         t_producer_process_crash,
+        t_resilience,
         t_resource_manager_crash_after_producers_started,
         t_resource_manager_crash_before_producers_started
     ].

@@ -733,13 +734,6 @@ t_start_stop(Config) ->
     ),
 
     %% Check that the bridge probe API doesn't leak atoms.
-    redbug:start(
-        [
-            "emqx_resource_manager:health_check_interval -> return",
-            "emqx_resource_manager:with_health_check -> return"
-        ],
-        [{msgs, 100}, {time, 30_000}]
-    ),
     ProbeRes0 = probe_bridge_api(
         Config,
         #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}

@@ -795,7 +789,11 @@ t_on_get_status(Config) ->
     ),
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ct:sleep(500),
-        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+        ?retry(
+            _Sleep = 1_000,
+            _Attempts = 20,
+            ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId))
+        )
     end),
     %% Check that it recovers itself.
     ?retry(

@@ -1154,3 +1152,86 @@ do_t_cluster(Config) ->
         []
     ),
     ok.
+
+t_resilience(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            {ok, _} = create_bridge(Config),
+            {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            ?retry(
+                _Sleep0 = 1_000,
+                _Attempts0 = 20,
+                ?assertEqual(
+                    {ok, connected},
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+
+            {ok, C} = emqtt:start_link(),
+            {ok, _} = emqtt:connect(C),
+            ProduceInterval = 100,
+            TestPid = self(),
+            StartSequentialProducer =
+                fun Go(SeqNo0) ->
+                    receive
+                        stop -> TestPid ! {done, SeqNo0}
+                    after 0 ->
+                        SeqNo = SeqNo0 + 1,
+                        emqtt:publish(C, ?RULE_TOPIC_BIN, integer_to_binary(SeqNo)),
+                        SeqNo rem 10 =:= 0 andalso (TestPid ! {sent, SeqNo}),
+                        timer:sleep(ProduceInterval),
+                        Go(SeqNo)
+                    end
+                end,
+            SequentialProducer = spawn_link(fun() -> StartSequentialProducer(0) end),
+            ct:sleep(2 * ProduceInterval),
+            {ok, _} = emqx_common_test_helpers:enable_failure(
+                down, ProxyName, ProxyHost, ProxyPort
+            ),
+            ?retry(
+                _Sleep1 = 1_000,
+                _Attempts1 = 20,
+                ?assertNotEqual(
+                    {ok, connected},
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+            %% Note: we don't check for timeouts here because:
+            %% a) If we do trigger auto reconnect, that means that the producers were
+            %% killed and the `receive_consumed' below will fail.
+            %% b) If there's a timeout, that's the correct path; we just need to give the
+            %% resource manager a chance to do so.
+            ?block_until(#{?snk_kind := resource_auto_reconnect}, 5_000),
+            {ok, _} = emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort),
+            ?retry(
+                _Sleep2 = 1_000,
+                _Attempts2 = 20,
+                ?assertEqual(
+                    {ok, connected},
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+            SequentialProducer ! stop,
+            NumProduced =
+                receive
+                    {done, SeqNo} -> SeqNo
+                after 1_000 -> ct:fail("producer didn't stop!")
+                end,
+            Consumed = lists:flatmap(
+                fun(_) -> receive_consumed(5_000) end, lists:seq(1, NumProduced)
+            ),
+            ?assertEqual(NumProduced, length(Consumed)),
+            ExpectedPayloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumProduced)),
+            ?assertEqual(
+                ExpectedPayloads, lists:map(fun(#{<<"payload">> := P}) -> P end, Consumed)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

@@ -0,0 +1 @@
+Fixed a health check issue for Pulsar Producer that could lead to loss of messages when the connection to Pulsar's brokers was down.