Merge pull request #11448 from thalesmg/gcp-consumer-403-20230815
fix(gcp_consumer): handle 403 responses
This commit is contained in:
commit
029b461a13
|
@ -205,7 +205,7 @@ get_topic(Topic, ConnectorState) ->
|
||||||
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
|
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
|
||||||
Body = <<>>,
|
Body = <<>>,
|
||||||
PreparedRequest = {prepared_request, {Method, Path, Body}},
|
PreparedRequest = {prepared_request, {Method, Path, Body}},
|
||||||
query_sync(PreparedRequest, ConnectorState).
|
?MODULE:query_sync(PreparedRequest, ConnectorState).
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
%% Helper fns
|
%% Helper fns
|
||||||
|
|
|
@ -217,7 +217,9 @@ handle_continue(?ensure_subscription, State0) ->
|
||||||
{noreply, State0, {continue, ?ensure_subscription}};
|
{noreply, State0, {continue, ?ensure_subscription}};
|
||||||
not_found ->
|
not_found ->
|
||||||
%% there's nothing much to do if the topic suddenly doesn't exist anymore.
|
%% there's nothing much to do if the topic suddenly doesn't exist anymore.
|
||||||
{stop, {error, topic_not_found}, State0}
|
{stop, {error, topic_not_found}, State0};
|
||||||
|
permission_denied ->
|
||||||
|
{stop, {error, permission_denied}, State0}
|
||||||
end;
|
end;
|
||||||
handle_continue(?patch_subscription, State0) ->
|
handle_continue(?patch_subscription, State0) ->
|
||||||
?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}),
|
?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}),
|
||||||
|
@ -291,14 +293,17 @@ handle_info(Msg, State0) ->
|
||||||
}),
|
}),
|
||||||
{noreply, State0}.
|
{noreply, State0}.
|
||||||
|
|
||||||
terminate({error, topic_not_found} = _Reason, State) ->
|
terminate({error, Reason}, State) when
|
||||||
|
Reason =:= topic_not_found;
|
||||||
|
Reason =:= permission_denied
|
||||||
|
->
|
||||||
#{
|
#{
|
||||||
instance_id := InstanceId,
|
instance_id := InstanceId,
|
||||||
topic := _Topic
|
topic := _Topic
|
||||||
} = State,
|
} = State,
|
||||||
optvar:unset(?OPTVAR_SUB_OK(self())),
|
optvar:unset(?OPTVAR_SUB_OK(self())),
|
||||||
emqx_bridge_gcp_pubsub_impl_consumer:mark_topic_as_nonexistent(InstanceId),
|
emqx_bridge_gcp_pubsub_impl_consumer:mark_as_unhealthy(InstanceId, Reason),
|
||||||
?tp(gcp_pubsub_consumer_worker_terminate, #{reason => _Reason, topic => _Topic}),
|
?tp(gcp_pubsub_consumer_worker_terminate, #{reason => {error, Reason}, topic => _Topic}),
|
||||||
ok;
|
ok;
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
optvar:unset(?OPTVAR_SUB_OK(self())),
|
optvar:unset(?OPTVAR_SUB_OK(self())),
|
||||||
|
@ -329,7 +334,8 @@ ensure_pull_timer(State = #{pull_timer := TRef}) when is_reference(TRef) ->
|
||||||
ensure_pull_timer(State = #{pull_retry_interval := PullRetryInterval}) ->
|
ensure_pull_timer(State = #{pull_retry_interval := PullRetryInterval}) ->
|
||||||
State#{pull_timer := emqx_utils:start_timer(PullRetryInterval, pull)}.
|
State#{pull_timer := emqx_utils:start_timer(PullRetryInterval, pull)}.
|
||||||
|
|
||||||
-spec ensure_subscription_exists(state()) -> continue | retry | not_found | already_exists.
|
-spec ensure_subscription_exists(state()) ->
|
||||||
|
continue | retry | not_found | permission_denied | already_exists.
|
||||||
ensure_subscription_exists(State) ->
|
ensure_subscription_exists(State) ->
|
||||||
?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}),
|
?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}),
|
||||||
#{
|
#{
|
||||||
|
@ -367,6 +373,17 @@ ensure_subscription_exists(State) ->
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
not_found;
|
not_found;
|
||||||
|
{error, #{status_code := 403}} ->
|
||||||
|
%% permission denied
|
||||||
|
?tp(
|
||||||
|
warning,
|
||||||
|
"gcp_pubsub_consumer_worker_permission_denied",
|
||||||
|
#{
|
||||||
|
instance_id => InstanceId,
|
||||||
|
topic => Topic
|
||||||
|
}
|
||||||
|
),
|
||||||
|
permission_denied;
|
||||||
{ok, #{status_code := 200}} ->
|
{ok, #{status_code := 200}} ->
|
||||||
?tp(
|
?tp(
|
||||||
debug,
|
debug,
|
||||||
|
|
|
@ -17,9 +17,9 @@
|
||||||
|
|
||||||
%% health check API
|
%% health check API
|
||||||
-export([
|
-export([
|
||||||
mark_topic_as_nonexistent/1,
|
mark_as_unhealthy/2,
|
||||||
unset_nonexistent_topic/1,
|
clear_unhealthy/1,
|
||||||
is_nonexistent_topic/1
|
check_if_unhealthy/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
@ -47,11 +47,15 @@
|
||||||
|
|
||||||
-define(AUTO_RECONNECT_S, 2).
|
-define(AUTO_RECONNECT_S, 2).
|
||||||
-define(DEFAULT_FORGET_INTERVAL, timer:seconds(60)).
|
-define(DEFAULT_FORGET_INTERVAL, timer:seconds(60)).
|
||||||
-define(OPTVAR_TOPIC_NOT_FOUND(INSTANCE_ID), {?MODULE, topic_not_found, INSTANCE_ID}).
|
-define(OPTVAR_UNHEALTHY(INSTANCE_ID), {?MODULE, topic_not_found, INSTANCE_ID}).
|
||||||
-define(TOPIC_MESSAGE,
|
-define(TOPIC_MESSAGE,
|
||||||
"GCP PubSub topics are invalid. Please check the logs, check if the "
|
"GCP PubSub topics are invalid. Please check the logs, check if the "
|
||||||
"topics exist in GCP and if the service account has permissions to use them."
|
"topics exist in GCP and if the service account has permissions to use them."
|
||||||
).
|
).
|
||||||
|
-define(PERMISSION_MESSAGE,
|
||||||
|
"Permission denied while verifying topic existence. Please check that the "
|
||||||
|
"provided service account has the correct permissions configured."
|
||||||
|
).
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
%% `emqx_resource' API
|
%% `emqx_resource' API
|
||||||
|
@ -77,7 +81,7 @@ on_start(InstanceId, Config0) ->
|
||||||
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
|
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
|
||||||
on_stop(InstanceId, _State) ->
|
on_stop(InstanceId, _State) ->
|
||||||
?tp(gcp_pubsub_consumer_stop_enter, #{}),
|
?tp(gcp_pubsub_consumer_stop_enter, #{}),
|
||||||
unset_nonexistent_topic(InstanceId),
|
clear_unhealthy(InstanceId),
|
||||||
ok = stop_consumers(InstanceId),
|
ok = stop_consumers(InstanceId),
|
||||||
emqx_bridge_gcp_pubsub_client:stop(InstanceId).
|
emqx_bridge_gcp_pubsub_client:stop(InstanceId).
|
||||||
|
|
||||||
|
@ -85,10 +89,12 @@ on_stop(InstanceId, _State) ->
|
||||||
on_get_status(InstanceId, State) ->
|
on_get_status(InstanceId, State) ->
|
||||||
%% We need to check this flag separately because the workers might be gone when we
|
%% We need to check this flag separately because the workers might be gone when we
|
||||||
%% check them.
|
%% check them.
|
||||||
case is_nonexistent_topic(InstanceId) of
|
case check_if_unhealthy(InstanceId) of
|
||||||
true ->
|
{error, topic_not_found} ->
|
||||||
{disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}};
|
{disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}};
|
||||||
false ->
|
{error, permission_denied} ->
|
||||||
|
{disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}};
|
||||||
|
ok ->
|
||||||
#{client := Client} = State,
|
#{client := Client} = State,
|
||||||
check_workers(InstanceId, Client)
|
check_workers(InstanceId, Client)
|
||||||
end.
|
end.
|
||||||
|
@ -97,24 +103,24 @@ on_get_status(InstanceId, State) ->
|
||||||
%% Health check API (signalled by consumer worker)
|
%% Health check API (signalled by consumer worker)
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
-spec mark_topic_as_nonexistent(resource_id()) -> ok.
|
-spec mark_as_unhealthy(resource_id(), topic_not_found | permission_denied) -> ok.
|
||||||
mark_topic_as_nonexistent(InstanceId) ->
|
mark_as_unhealthy(InstanceId, Reason) ->
|
||||||
optvar:set(?OPTVAR_TOPIC_NOT_FOUND(InstanceId), true),
|
optvar:set(?OPTVAR_UNHEALTHY(InstanceId), Reason),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec unset_nonexistent_topic(resource_id()) -> ok.
|
-spec clear_unhealthy(resource_id()) -> ok.
|
||||||
unset_nonexistent_topic(InstanceId) ->
|
clear_unhealthy(InstanceId) ->
|
||||||
optvar:unset(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)),
|
optvar:unset(?OPTVAR_UNHEALTHY(InstanceId)),
|
||||||
?tp(gcp_pubsub_consumer_unset_nonexistent_topic, #{}),
|
?tp(gcp_pubsub_consumer_clear_unhealthy, #{}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec is_nonexistent_topic(resource_id()) -> boolean().
|
-spec check_if_unhealthy(resource_id()) -> ok | {error, topic_not_found | permission_denied}.
|
||||||
is_nonexistent_topic(InstanceId) ->
|
check_if_unhealthy(InstanceId) ->
|
||||||
case optvar:peek(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)) of
|
case optvar:peek(?OPTVAR_UNHEALTHY(InstanceId)) of
|
||||||
{ok, true} ->
|
{ok, Reason} ->
|
||||||
true;
|
{error, Reason};
|
||||||
_ ->
|
undefined ->
|
||||||
false
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
@ -153,6 +159,11 @@ start_consumers(InstanceId, Client, Config) ->
|
||||||
throw(
|
throw(
|
||||||
{unhealthy_target, ?TOPIC_MESSAGE}
|
{unhealthy_target, ?TOPIC_MESSAGE}
|
||||||
);
|
);
|
||||||
|
{error, permission_denied} ->
|
||||||
|
_ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
|
||||||
|
throw(
|
||||||
|
{unhealthy_target, ?PERMISSION_MESSAGE}
|
||||||
|
);
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
%% connection might be down; we'll have to check topic existence during health
|
%% connection might be down; we'll have to check topic existence during health
|
||||||
%% check, or the workers will kill themselves when they realized there's no
|
%% check, or the workers will kill themselves when they realized there's no
|
||||||
|
@ -229,6 +240,8 @@ check_for_topic_existence(Topic, Client) ->
|
||||||
ok;
|
ok;
|
||||||
{error, #{status_code := 404}} ->
|
{error, #{status_code := 404}} ->
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
|
{error, #{status_code := 403}} ->
|
||||||
|
{error, permission_denied};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?tp(warning, "gcp_pubsub_consumer_check_topic_error", #{reason => Reason}),
|
?tp(warning, "gcp_pubsub_consumer_check_topic_error", #{reason => Reason}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
|
|
|
@ -760,6 +760,64 @@ prop_acked_ids_eventually_forgotten(Trace) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
permission_denied_response() ->
|
||||||
|
Link =
|
||||||
|
<<"https://console.developers.google.com/project/9999/apiui/credential">>,
|
||||||
|
{error, #{
|
||||||
|
status_code => 403,
|
||||||
|
headers =>
|
||||||
|
[
|
||||||
|
{<<"vary">>, <<"X-Origin">>},
|
||||||
|
{<<"vary">>, <<"Referer">>},
|
||||||
|
{<<"content-type">>, <<"application/json; charset=UTF-8">>},
|
||||||
|
{<<"date">>, <<"Tue, 15 Aug 2023 13:59:09 GMT">>},
|
||||||
|
{<<"server">>, <<"ESF">>},
|
||||||
|
{<<"cache-control">>, <<"private">>},
|
||||||
|
{<<"x-xss-protection">>, <<"0">>},
|
||||||
|
{<<"x-frame-options">>, <<"SAMEORIGIN">>},
|
||||||
|
{<<"x-content-type-options">>, <<"nosniff">>},
|
||||||
|
{<<"alt-svc">>, <<"h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000">>},
|
||||||
|
{<<"accept-ranges">>, <<"none">>},
|
||||||
|
{<<"vary">>, <<"Origin,Accept-Encoding">>},
|
||||||
|
{<<"transfer-encoding">>, <<"chunked">>}
|
||||||
|
],
|
||||||
|
body => emqx_utils_json:encode(
|
||||||
|
#{
|
||||||
|
<<"error">> =>
|
||||||
|
#{
|
||||||
|
<<"code">> => 403,
|
||||||
|
<<"details">> =>
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"@type">> => <<"type.googleapis.com/google.rpc.Help">>,
|
||||||
|
<<"links">> =>
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"description">> =>
|
||||||
|
<<"Google developer console API key">>,
|
||||||
|
<<"url">> =>
|
||||||
|
Link
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
<<"@type">> => <<"type.googleapis.com/google.rpc.ErrorInfo">>,
|
||||||
|
<<"domain">> => <<"googleapis.com">>,
|
||||||
|
<<"metadata">> =>
|
||||||
|
#{
|
||||||
|
<<"consumer">> => <<"projects/9999">>,
|
||||||
|
<<"service">> => <<"pubsub.googleapis.com">>
|
||||||
|
},
|
||||||
|
<<"reason">> => <<"CONSUMER_INVALID">>
|
||||||
|
}
|
||||||
|
],
|
||||||
|
<<"message">> => <<"Project #9999 has been deleted.">>,
|
||||||
|
<<"status">> => <<"PERMISSION_DENIED">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -785,7 +843,7 @@ t_start_stop(Config) ->
|
||||||
prop_client_stopped(),
|
prop_client_stopped(),
|
||||||
prop_workers_stopped(PubSubTopic),
|
prop_workers_stopped(PubSubTopic),
|
||||||
fun(Trace) ->
|
fun(Trace) ->
|
||||||
?assertMatch([_], ?of_kind(gcp_pubsub_consumer_unset_nonexistent_topic, Trace)),
|
?assertMatch([_], ?of_kind(gcp_pubsub_consumer_clear_unhealthy, Trace)),
|
||||||
ok
|
ok
|
||||||
end
|
end
|
||||||
]
|
]
|
||||||
|
@ -1992,6 +2050,81 @@ t_get_subscription(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_permission_denied_topic_check(Config) ->
|
||||||
|
[#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
|
||||||
|
ResourceId = resource_id(Config),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
%% the emulator does not check any credentials
|
||||||
|
emqx_common_test_helpers:with_mock(
|
||||||
|
emqx_bridge_gcp_pubsub_client,
|
||||||
|
query_sync,
|
||||||
|
fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) ->
|
||||||
|
RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]),
|
||||||
|
case {Method =:= get, re:run(Path, RE)} of
|
||||||
|
{true, {match, _}} ->
|
||||||
|
permission_denied_response();
|
||||||
|
_ ->
|
||||||
|
meck:passthrough([PreparedRequest, Client])
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
fun() ->
|
||||||
|
{{ok, _}, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
create_bridge(Config),
|
||||||
|
#{?snk_kind := gcp_pubsub_stop},
|
||||||
|
5_000
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, disconnected},
|
||||||
|
emqx_resource_manager:health_check(ResourceId)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}},
|
||||||
|
emqx_resource_manager:lookup_cached(ResourceId)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_permission_denied_worker(Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
emqx_common_test_helpers:with_mock(
|
||||||
|
emqx_bridge_gcp_pubsub_client,
|
||||||
|
query_sync,
|
||||||
|
fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) ->
|
||||||
|
case Method =:= put of
|
||||||
|
true ->
|
||||||
|
permission_denied_response();
|
||||||
|
false ->
|
||||||
|
meck:passthrough([PreparedRequest, Client])
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
fun() ->
|
||||||
|
{{ok, _}, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
create_bridge(
|
||||||
|
Config
|
||||||
|
),
|
||||||
|
#{?snk_kind := gcp_pubsub_consumer_worker_terminate},
|
||||||
|
10_000
|
||||||
|
),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_cluster_subscription(Config) ->
|
t_cluster_subscription(Config) ->
|
||||||
[
|
[
|
||||||
#{
|
#{
|
||||||
|
|
Loading…
Reference in New Issue