diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index d984b42ed..291ace7e0 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -218,6 +218,8 @@ handle_continue(?ensure_subscription, State0) -> not_found -> %% there's nothing much to do if the topic suddenly doesn't exist anymore. {stop, {error, topic_not_found}, State0}; + bad_credentials -> + {stop, {error, bad_credentials}, State0}; permission_denied -> {stop, {error, permission_denied}, State0} end; @@ -295,6 +297,7 @@ handle_info(Msg, State0) -> terminate({error, Reason}, State) when Reason =:= topic_not_found; + Reason =:= bad_credentials; Reason =:= permission_denied -> #{ @@ -335,7 +338,7 @@ ensure_pull_timer(State = #{pull_retry_interval := PullRetryInterval}) -> State#{pull_timer := emqx_utils:start_timer(PullRetryInterval, pull)}. -spec ensure_subscription_exists(state()) -> - continue | retry | not_found | permission_denied | already_exists. + continue | retry | not_found | permission_denied | bad_credentials | already_exists. ensure_subscription_exists(State) -> ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}), #{ @@ -384,6 +387,17 @@ ensure_subscription_exists(State) -> } ), permission_denied; + {error, #{status_code := 401}} -> + %% bad credentials + ?tp( + warning, + "gcp_pubsub_consumer_worker_bad_credentials", + #{ + instance_id => InstanceId, + topic => Topic + } + ), + bad_credentials; {ok, #{status_code := 200}} -> ?tp( debug, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 998a95a48..5c726ef9b 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -94,6 +94,8 @@ on_get_status(InstanceId, State) -> {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}}; {error, permission_denied} -> {disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}}; + {error, bad_credentials} -> + {disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}}; ok -> #{client := Client} = State, check_workers(InstanceId, Client) @@ -103,7 +105,12 @@ on_get_status(InstanceId, State) -> %% Health check API (signalled by consumer worker) %%------------------------------------------------------------------------------------------------- --spec mark_as_unhealthy(resource_id(), topic_not_found | permission_denied) -> ok. +-spec mark_as_unhealthy( + resource_id(), + topic_not_found + | permission_denied + | bad_credentials +) -> ok. mark_as_unhealthy(InstanceId, Reason) -> optvar:set(?OPTVAR_UNHEALTHY(InstanceId), Reason), ok. @@ -114,7 +121,12 @@ clear_unhealthy(InstanceId) -> ?tp(gcp_pubsub_consumer_clear_unhealthy, #{}), ok. --spec check_if_unhealthy(resource_id()) -> ok | {error, topic_not_found | permission_denied}. +-spec check_if_unhealthy(resource_id()) -> + ok + | {error, + topic_not_found + | permission_denied + | bad_credentials}. check_if_unhealthy(InstanceId) -> case optvar:peek(?OPTVAR_UNHEALTHY(InstanceId)) of {ok, Reason} -> @@ -164,6 +176,11 @@ start_consumers(InstanceId, Client, Config) -> throw( {unhealthy_target, ?PERMISSION_MESSAGE} ); + {error, bad_credentials} -> + _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), + throw( + {unhealthy_target, ?PERMISSION_MESSAGE} + ); {error, _} -> %% connection might be down; we'll have to check topic existence during health %% check, or the workers will kill themselves when they realized there's no @@ -242,6 +259,8 @@ check_for_topic_existence(Topic, Client) -> {error, not_found}; {error, #{status_code := 403}} -> {error, permission_denied}; + {error, #{status_code := 401}} -> + {error, bad_credentials}; {error, Reason} -> ?tp(warning, "gcp_pubsub_consumer_check_topic_error", #{reason => Reason}), {error, Reason} diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 681e5fed7..8dc6cd7c4 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -818,6 +818,61 @@ permission_denied_response() -> ) }}. +unauthenticated_response() -> + Msg = << + "Request had invalid authentication credentials. Expected OAuth 2 access token," + " login cookie or other valid authentication credential. " + "See https://developers.google.com/identity/sign-in/web/devconsole-project." + >>, + {error, #{ + body => + #{ + <<"error">> => + #{ + <<"code">> => 401, + <<"details">> => + [ + #{ + <<"@type">> => + <<"type.googleapis.com/google.rpc.ErrorInfo">>, + <<"domain">> => <<"googleapis.com">>, + <<"metadata">> => + #{ + <<"email">> => + <<"test-516@emqx-cloud-pubsub.iam.gserviceaccount.com">>, + <<"method">> => + <<"google.pubsub.v1.Publisher.CreateTopic">>, + <<"service">> => + <<"pubsub.googleapis.com">> + }, + <<"reason">> => <<"ACCOUNT_STATE_INVALID">> + } + ], + <<"message">> => Msg, + + <<"status">> => <<"UNAUTHENTICATED">> + } + }, + headers => + [ + {<<"www-authenticate">>, <<"Bearer realm=\"https://accounts.google.com/\"">>}, + {<<"vary">>, <<"X-Origin">>}, + {<<"vary">>, <<"Referer">>}, + {<<"content-type">>, <<"application/json; charset=UTF-8">>}, + {<<"date">>, <<"Wed, 23 Aug 2023 12:41:40 GMT">>}, + {<<"server">>, <<"ESF">>}, + {<<"cache-control">>, <<"private">>}, + {<<"x-xss-protection">>, <<"0">>}, + {<<"x-frame-options">>, <<"SAMEORIGIN">>}, + {<<"x-content-type-options">>, <<"nosniff">>}, + {<<"alt-svc">>, <<"h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000">>}, + {<<"accept-ranges">>, <<"none">>}, + {<<"vary">>, <<"Origin,Accept-Encoding">>}, + {<<"transfer-encoding">>, <<"chunked">>} + ], + status_code => 401 + }}. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -2125,6 +2180,81 @@ t_permission_denied_worker(Config) -> ), ok. +t_unauthenticated_topic_check(Config) -> + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + %% the emulator does not check any credentials + emqx_common_test_helpers:with_mock( + emqx_bridge_gcp_pubsub_client, + query_sync, + fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) -> + RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]), + case {Method =:= get, re:run(Path, RE)} of + {true, {match, _}} -> + unauthenticated_response(); + _ -> + meck:passthrough([PreparedRequest, Client]) + end + end, + fun() -> + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := gcp_pubsub_stop}, + 5_000 + ), + ?assertMatch( + {ok, disconnected}, + emqx_resource_manager:health_check(ResourceId) + ), + ?assertMatch( + {ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + ok + end + ), + ok + end, + [] + ), + ok. + +t_unauthenticated_worker(Config) -> + ?check_trace( + begin + emqx_common_test_helpers:with_mock( + emqx_bridge_gcp_pubsub_client, + query_sync, + fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) -> + case Method =:= put of + true -> + unauthenticated_response(); + false -> + meck:passthrough([PreparedRequest, Client]) + end + end, + fun() -> + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge( + Config + ), + #{?snk_kind := gcp_pubsub_consumer_worker_terminate}, + 10_000 + ), + + ok + end + ), + ok + end, + [] + ), + ok. + t_cluster_subscription(Config) -> [ #{