From b609792a908b69f524d1db823e2637b9d36b3c34 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 13 Jun 2023 11:07:20 -0300 Subject: [PATCH] fix(pulsar_producer): do not return `disconnected` when checking status (r5.1) Fixes https://emqx.atlassian.net/browse/EMQX-10278 Since Pulsar client has its own replayq that lives outside the management of the buffer workers, we must not return disconnected status for such bridge: otherwise, the resource manager will eventually kill the producers and data may be lost. --- apps/emqx/test/emqx_common_test_helpers.erl | 2 + .../src/emqx_bridge_pulsar_impl_producer.erl | 13 ++- ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 97 +++++++++++++++++-- changes/ee/fix-11038.en.md | 1 + 4 files changed, 100 insertions(+), 13 deletions(-) create mode 100644 changes/ee/fix-11038.en.md diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index bf4b2c0ad..dc3a0fac7 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -84,6 +84,8 @@ %% Toxiproxy API -export([ with_failure/5, + enable_failure/4, + heal_failure/4, reset_proxy/2 ]). diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 16b039ba5..2fc44e5ca 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -145,7 +145,10 @@ on_stop(InstanceId, _State) -> ok end. --spec on_get_status(resource_id(), state()) -> connected | disconnected. +%% Note: since Pulsar client has its own replayq that is not managed by +%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, +%% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost. +-spec on_get_status(resource_id(), state()) -> connected | connecting. 
on_get_status(_InstanceId, State = #{}) -> #{ pulsar_client_id := ClientId, @@ -157,15 +160,15 @@ on_get_status(_InstanceId, State = #{}) -> true -> get_producer_status(Producers); false -> - disconnected + connecting catch error:timeout -> - disconnected; + connecting; exit:{noproc, _} -> - disconnected + connecting end; {error, _} -> - disconnected + connecting end; on_get_status(_InstanceId, _State) -> %% If a health check happens just after a concurrent request to diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 9f9381c95..4530748de 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -45,6 +45,7 @@ only_once_tests() -> t_send_when_timeout, t_failure_to_start_producer, t_producer_process_crash, + t_resilience, t_resource_manager_crash_after_producers_started, t_resource_manager_crash_before_producers_started ]. @@ -733,13 +734,6 @@ t_start_stop(Config) -> ), %% Check that the bridge probe API doesn't leak atoms. - redbug:start( - [ - "emqx_resource_manager:health_check_interval -> return", - "emqx_resource_manager:with_health_check -> return" - ], - [{msgs, 100}, {time, 30_000}] - ), ProbeRes0 = probe_bridge_api( Config, #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} @@ -795,7 +789,11 @@ t_on_get_status(Config) -> ), emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ct:sleep(500), - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)) + ) end), %% Check that it recovers itself. ?retry( @@ -1154,3 +1152,86 @@ do_t_cluster(Config) -> [] ), ok. 
+ +t_resilience(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + {ok, _} = create_bridge(Config), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + ?retry( + _Sleep0 = 1_000, + _Attempts0 = 20, + ?assertEqual( + {ok, connected}, + emqx_resource_manager:health_check(ResourceId) + ) + ), + + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + ProduceInterval = 100, + TestPid = self(), + StartSequentialProducer = + fun Go(SeqNo0) -> + receive + stop -> TestPid ! {done, SeqNo0} + after 0 -> + SeqNo = SeqNo0 + 1, + emqtt:publish(C, ?RULE_TOPIC_BIN, integer_to_binary(SeqNo)), + SeqNo rem 10 =:= 0 andalso (TestPid ! {sent, SeqNo}), + timer:sleep(ProduceInterval), + Go(SeqNo) + end + end, + SequentialProducer = spawn_link(fun() -> StartSequentialProducer(0) end), + ct:sleep(2 * ProduceInterval), + {ok, _} = emqx_common_test_helpers:enable_failure( + down, ProxyName, ProxyHost, ProxyPort + ), + ?retry( + _Sleep1 = 1_000, + _Attempts1 = 20, + ?assertNotEqual( + {ok, connected}, + emqx_resource_manager:health_check(ResourceId) + ) + ), + %% Note: we don't check for timeouts here because: + %% a) If we do trigger auto reconnect, that means that the producers were + %% killed and the `receive_consumed' below will fail. + %% b) If there's a timeout, that's the correct path; we just need to give the + %% resource manager a chance to do so. + ?block_until(#{?snk_kind := resource_auto_reconnect}, 5_000), + {ok, _} = emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort), + ?retry( + _Sleep2 = 1_000, + _Attempts2 = 20, + ?assertEqual( + {ok, connected}, + emqx_resource_manager:health_check(ResourceId) + ) + ), + SequentialProducer ! 
stop, + NumProduced = + receive + {done, SeqNo} -> SeqNo + after 1_000 -> ct:fail("producer didn't stop!") + end, + Consumed = lists:flatmap( + fun(_) -> receive_consumed(5_000) end, lists:seq(1, NumProduced) + ), + ?assertEqual(NumProduced, length(Consumed)), + ExpectedPayloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumProduced)), + ?assertEqual( + ExpectedPayloads, lists:map(fun(#{<<"payload">> := P}) -> P end, Consumed) + ), + ok + end, + [] + ), + ok. diff --git a/changes/ee/fix-11038.en.md b/changes/ee/fix-11038.en.md new file mode 100644 index 000000000..2392019fe --- /dev/null +++ b/changes/ee/fix-11038.en.md @@ -0,0 +1 @@ +Fixed a health check issue for Pulsar Producer that could lead to loss of messages when the connection to Pulsar's brokers was down.