diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 9f42763ee..7684d1469 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -141,6 +141,9 @@ connect(Opts0) -> {Topic, MQTTConfig} = lists:nth(Index, TopicMappingList), Config = #{ ack_retry_interval => AckRetryInterval, + %% Note: the `connector_state' value here must be immutable and not changed by the + %% bridge during `on_get_status', since we have handed it over to the pull + %% workers. connector_state => ConnectorState, hookpoint => Hookpoint, instance_id => InstanceId, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 901e38dc7..e004875f9 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -69,6 +69,8 @@ on_stop(InstanceId, undefined = _State) -> -spec on_get_status(resource_id(), state()) -> connected | disconnected. on_get_status(InstanceId, _State) -> + %% Note: do *not* alter the `connector_state' value here. It must be immutable, since + %% we have handed it over to the pull workers. case emqx_resource_pool:health_check_workers( InstanceId,