From ba96c725e59f3048692a2f9cddde25b31b6f0cca Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 20 Jun 2023 11:36:35 -0300 Subject: [PATCH] feat: add time-based pull for safety --- .../emqx_bridge_gcp_pubsub_consumer_worker.erl | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 79d0ad68a..9f42763ee 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -51,6 +51,7 @@ mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), pending_acks => [ack_id()], pull_max_messages := non_neg_integer(), + pull_timer := undefined | reference(), subscription_id => subscription_id(), topic => emqx_bridge_gcp_pubsub_connector:topic() }. @@ -58,6 +59,7 @@ -define(HEALTH_CHECK_TIMEOUT, 10_000). -define(OPTVAR_SUB_OK(PID), {?MODULE, PID}). +-define(PULL_INTERVAL, 5_000). %%------------------------------------------------------------------------------------------------- %% API used by `reply_delegator' @@ -159,7 +161,8 @@ init(Config) -> State = Config#{ ack_timer => undefined, async_workers => #{}, - pending_acks => [] + pending_acks => [], + pull_timer => undefined }, {ok, State, {continue, ensure_subscription}}. @@ -202,6 +205,11 @@ handle_info({timeout, TRef, ack}, State0 = #{ack_timer := TRef}) -> State1 = acknowledge(State0), State = ensure_ack_timer(State1), {noreply, State}; +handle_info({timeout, TRef, pull}, State0 = #{pull_timer := TRef}) -> + State1 = State0#{pull_timer := undefined}, + State2 = do_pull_async(State1), + State = ensure_pull_timer(State2), + {noreply, State}; handle_info( {'DOWN', _Ref, process, AsyncWorkerPid, _Reason}, State0 = #{async_workers := Workers0} ) when @@ -244,6 +252,12 @@ ensure_ack_timer(State = #{ack_timer := TRef}) when is_reference(TRef) -> ensure_ack_timer(State = #{ack_retry_interval := AckRetryInterval}) -> State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)}. +-spec ensure_pull_timer(state()) -> state(). +ensure_pull_timer(State = #{pull_timer := TRef}) when is_reference(TRef) -> + State; +ensure_pull_timer(State) -> + State#{pull_timer := emqx_utils:start_timer(?PULL_INTERVAL, pull)}. + -spec ensure_subscription_exists(state()) -> ok | error. ensure_subscription_exists(State) -> #{