diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index cb4aa853c..eeceb0c43 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -205,7 +205,7 @@ get_topic(Topic, ConnectorState) -> Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Body = <<>>, PreparedRequest = {prepared_request, {Method, Path, Body}}, - query_sync(PreparedRequest, ConnectorState). + ?MODULE:query_sync(PreparedRequest, ConnectorState). %%------------------------------------------------------------------------------------------------- %% Helper fns diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index ddceb4a11..d984b42ed 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -217,7 +217,9 @@ handle_continue(?ensure_subscription, State0) -> {noreply, State0, {continue, ?ensure_subscription}}; not_found -> %% there's nothing much to do if the topic suddenly doesn't exist anymore. - {stop, {error, topic_not_found}, State0} + {stop, {error, topic_not_found}, State0}; + permission_denied -> + {stop, {error, permission_denied}, State0} end; handle_continue(?patch_subscription, State0) -> ?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}), @@ -291,14 +293,17 @@ handle_info(Msg, State0) -> }), {noreply, State0}. -terminate({error, topic_not_found} = _Reason, State) -> +terminate({error, Reason}, State) when + Reason =:= topic_not_found; + Reason =:= permission_denied +-> #{ instance_id := InstanceId, topic := _Topic } = State, optvar:unset(?OPTVAR_SUB_OK(self())), - emqx_bridge_gcp_pubsub_impl_consumer:mark_topic_as_nonexistent(InstanceId), - ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => _Reason, topic => _Topic}), + emqx_bridge_gcp_pubsub_impl_consumer:mark_as_unhealthy(InstanceId, Reason), + ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => {error, Reason}, topic => _Topic}), ok; terminate(_Reason, _State) -> optvar:unset(?OPTVAR_SUB_OK(self())), @@ -329,7 +334,8 @@ ensure_pull_timer(State = #{pull_timer := TRef}) when is_reference(TRef) -> ensure_pull_timer(State = #{pull_retry_interval := PullRetryInterval}) -> State#{pull_timer := emqx_utils:start_timer(PullRetryInterval, pull)}. --spec ensure_subscription_exists(state()) -> continue | retry | not_found | already_exists. +-spec ensure_subscription_exists(state()) -> + continue | retry | not_found | permission_denied | already_exists. ensure_subscription_exists(State) -> ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}), #{ @@ -367,6 +373,17 @@ ensure_subscription_exists(State) -> } ), not_found; + {error, #{status_code := 403}} -> + %% permission denied + ?tp( + warning, + "gcp_pubsub_consumer_worker_permission_denied", + #{ + instance_id => InstanceId, + topic => Topic + } + ), + permission_denied; {ok, #{status_code := 200}} -> ?tp( debug, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 74ee941ec..998a95a48 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -17,9 +17,9 @@ %% health check API -export([ - mark_topic_as_nonexistent/1, - unset_nonexistent_topic/1, - is_nonexistent_topic/1 + mark_as_unhealthy/2, + clear_unhealthy/1, + check_if_unhealthy/1 ]). -include_lib("emqx/include/logger.hrl"). @@ -47,11 +47,15 @@ -define(AUTO_RECONNECT_S, 2). -define(DEFAULT_FORGET_INTERVAL, timer:seconds(60)). --define(OPTVAR_TOPIC_NOT_FOUND(INSTANCE_ID), {?MODULE, topic_not_found, INSTANCE_ID}). +-define(OPTVAR_UNHEALTHY(INSTANCE_ID), {?MODULE, topic_not_found, INSTANCE_ID}). -define(TOPIC_MESSAGE, "GCP PubSub topics are invalid. Please check the logs, check if the " "topics exist in GCP and if the service account has permissions to use them." ). +-define(PERMISSION_MESSAGE, + "Permission denied while verifying topic existence. Please check that the " + "provided service account has the correct permissions configured." +). %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API @@ -77,7 +81,7 @@ on_start(InstanceId, Config0) -> -spec on_stop(resource_id(), state()) -> ok | {error, term()}. on_stop(InstanceId, _State) -> ?tp(gcp_pubsub_consumer_stop_enter, #{}), - unset_nonexistent_topic(InstanceId), + clear_unhealthy(InstanceId), ok = stop_consumers(InstanceId), emqx_bridge_gcp_pubsub_client:stop(InstanceId). @@ -85,10 +89,12 @@ on_stop(InstanceId, _State) -> on_get_status(InstanceId, State) -> %% We need to check this flag separately because the workers might be gone when we %% check them. - case is_nonexistent_topic(InstanceId) of - true -> + case check_if_unhealthy(InstanceId) of + {error, topic_not_found} -> {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}}; - false -> + {error, permission_denied} -> + {disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}}; + ok -> #{client := Client} = State, check_workers(InstanceId, Client) end. @@ -97,24 +103,24 @@ on_get_status(InstanceId, State) -> %% Health check API (signalled by consumer worker) %%------------------------------------------------------------------------------------------------- --spec mark_topic_as_nonexistent(resource_id()) -> ok. -mark_topic_as_nonexistent(InstanceId) -> - optvar:set(?OPTVAR_TOPIC_NOT_FOUND(InstanceId), true), +-spec mark_as_unhealthy(resource_id(), topic_not_found | permission_denied) -> ok. +mark_as_unhealthy(InstanceId, Reason) -> + optvar:set(?OPTVAR_UNHEALTHY(InstanceId), Reason), ok. --spec unset_nonexistent_topic(resource_id()) -> ok. -unset_nonexistent_topic(InstanceId) -> - optvar:unset(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)), - ?tp(gcp_pubsub_consumer_unset_nonexistent_topic, #{}), +-spec clear_unhealthy(resource_id()) -> ok. +clear_unhealthy(InstanceId) -> + optvar:unset(?OPTVAR_UNHEALTHY(InstanceId)), + ?tp(gcp_pubsub_consumer_clear_unhealthy, #{}), ok. --spec is_nonexistent_topic(resource_id()) -> boolean(). -is_nonexistent_topic(InstanceId) -> - case optvar:peek(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)) of - {ok, true} -> - true; - _ -> - false +-spec check_if_unhealthy(resource_id()) -> ok | {error, topic_not_found | permission_denied}. +check_if_unhealthy(InstanceId) -> + case optvar:peek(?OPTVAR_UNHEALTHY(InstanceId)) of + {ok, Reason} -> + {error, Reason}; + undefined -> + ok end. %%------------------------------------------------------------------------------------------------- @@ -153,6 +159,11 @@ start_consumers(InstanceId, Client, Config) -> throw( {unhealthy_target, ?TOPIC_MESSAGE} ); + {error, permission_denied} -> + _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), + throw( + {unhealthy_target, ?PERMISSION_MESSAGE} + ); {error, _} -> %% connection might be down; we'll have to check topic existence during health %% check, or the workers will kill themselves when they realized there's no @@ -229,6 +240,8 @@ check_for_topic_existence(Topic, Client) -> ok; {error, #{status_code := 404}} -> {error, not_found}; + {error, #{status_code := 403}} -> + {error, permission_denied}; {error, Reason} -> ?tp(warning, "gcp_pubsub_consumer_check_topic_error", #{reason => Reason}), {error, Reason} diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 8cb0ef2f9..681e5fed7 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -760,6 +760,64 @@ prop_acked_ids_eventually_forgotten(Trace) -> ), ok. +permission_denied_response() -> + Link = + <<"https://console.developers.google.com/project/9999/apiui/credential">>, + {error, #{ + status_code => 403, + headers => + [ + {<<"vary">>, <<"X-Origin">>}, + {<<"vary">>, <<"Referer">>}, + {<<"content-type">>, <<"application/json; charset=UTF-8">>}, + {<<"date">>, <<"Tue, 15 Aug 2023 13:59:09 GMT">>}, + {<<"server">>, <<"ESF">>}, + {<<"cache-control">>, <<"private">>}, + {<<"x-xss-protection">>, <<"0">>}, + {<<"x-frame-options">>, <<"SAMEORIGIN">>}, + {<<"x-content-type-options">>, <<"nosniff">>}, + {<<"alt-svc">>, <<"h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000">>}, + {<<"accept-ranges">>, <<"none">>}, + {<<"vary">>, <<"Origin,Accept-Encoding">>}, + {<<"transfer-encoding">>, <<"chunked">>} + ], + body => emqx_utils_json:encode( + #{ + <<"error">> => + #{ + <<"code">> => 403, + <<"details">> => + [ + #{ + <<"@type">> => <<"type.googleapis.com/google.rpc.Help">>, + <<"links">> => + [ + #{ + <<"description">> => + <<"Google developer console API key">>, + <<"url">> => + Link + } + ] + }, + #{ + <<"@type">> => <<"type.googleapis.com/google.rpc.ErrorInfo">>, + <<"domain">> => <<"googleapis.com">>, + <<"metadata">> => + #{ + <<"consumer">> => <<"projects/9999">>, + <<"service">> => <<"pubsub.googleapis.com">> + }, + <<"reason">> => <<"CONSUMER_INVALID">> + } + ], + <<"message">> => <<"Project #9999 has been deleted.">>, + <<"status">> => <<"PERMISSION_DENIED">> + } + } + ) + }}. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -785,7 +843,7 @@ t_start_stop(Config) -> prop_client_stopped(), prop_workers_stopped(PubSubTopic), fun(Trace) -> - ?assertMatch([_], ?of_kind(gcp_pubsub_consumer_unset_nonexistent_topic, Trace)), + ?assertMatch([_], ?of_kind(gcp_pubsub_consumer_clear_unhealthy, Trace)), ok end ] @@ -1992,6 +2050,81 @@ t_get_subscription(Config) -> ), ok. +t_permission_denied_topic_check(Config) -> + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + %% the emulator does not check any credentials + emqx_common_test_helpers:with_mock( + emqx_bridge_gcp_pubsub_client, + query_sync, + fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) -> + RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]), + case {Method =:= get, re:run(Path, RE)} of + {true, {match, _}} -> + permission_denied_response(); + _ -> + meck:passthrough([PreparedRequest, Client]) + end + end, + fun() -> + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := gcp_pubsub_stop}, + 5_000 + ), + ?assertMatch( + {ok, disconnected}, + emqx_resource_manager:health_check(ResourceId) + ), + ?assertMatch( + {ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + ok + end + ), + ok + end, + [] + ), + ok. + +t_permission_denied_worker(Config) -> + ?check_trace( + begin + emqx_common_test_helpers:with_mock( + emqx_bridge_gcp_pubsub_client, + query_sync, + fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) -> + case Method =:= put of + true -> + permission_denied_response(); + false -> + meck:passthrough([PreparedRequest, Client]) + end + end, + fun() -> + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge( + Config + ), + #{?snk_kind := gcp_pubsub_consumer_worker_terminate}, + 10_000 + ), + + ok + end + ), + ok + end, + [] + ), + ok. + t_cluster_subscription(Config) -> [ #{