Merge pull request #12656 from thalesmg/fix-gprodu-status-r56-20240306

fix(gcp_pubsub_producer): check for topic existence when creating action
This commit is contained in:
Thales Macedo Garitezi 2024-03-06 15:28:51 -03:00 committed by GitHub
commit b421363661
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 349 additions and 71 deletions

View File

@ -198,13 +198,13 @@ get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
%%-------------------------------------------------------------------------------------------------
-spec get_topic(topic(), state(), request_opts()) -> {ok, map()} | {error, term()}.
get_topic(Topic, ConnectorState, ReqOpts) ->
#{project_id := ProjectId} = ConnectorState,
get_topic(Topic, ClientState, ReqOpts) ->
#{project_id := ProjectId} = ClientState,
Method = get,
Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
Body = <<>>,
PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
?MODULE:query_sync(PreparedRequest, ConnectorState).
?MODULE:query_sync(PreparedRequest, ClientState).
%%-------------------------------------------------------------------------------------------------
%% Helper fns

View File

@ -186,10 +186,14 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) ->
{ok, connector_state()}.
on_add_channel(_ConnectorResId, ConnectorState0, ActionId, ActionConfig) ->
#{installed_actions := InstalledActions0} = ConnectorState0,
ChannelState = install_channel(ActionConfig),
case install_channel(ActionConfig, ConnectorState0) of
{ok, ChannelState} ->
InstalledActions = InstalledActions0#{ActionId => ChannelState},
ConnectorState = ConnectorState0#{installed_actions := InstalledActions},
{ok, ConnectorState}.
{ok, ConnectorState};
Error = {error, _} ->
Error
end.
-spec on_remove_channel(
connector_resource_id(),
@ -218,8 +222,7 @@ on_get_channel_status(_ConnectorResId, _ChannelId, _ConnectorState) ->
%% Helper fns
%%-------------------------------------------------------------------------------------------------
%% TODO: check if topic exists ("unhealthy target")
install_channel(ActionConfig) ->
install_channel(ActionConfig, ConnectorState) ->
#{
parameters := #{
attributes_template := AttributesTemplate,
@ -231,13 +234,27 @@ install_channel(ActionConfig) ->
request_ttl := RequestTTL
}
} = ActionConfig,
#{
#{client := Client} = ConnectorState,
case
emqx_bridge_gcp_pubsub_client:get_topic(PubSubTopic, Client, #{request_ttl => RequestTTL})
of
{error, #{status_code := 404}} ->
{error, {unhealthy_target, <<"Topic does not exist">>}};
{error, #{status_code := 403}} ->
{error, {unhealthy_target, <<"Permission denied for topic">>}};
{error, #{status_code := 401}} ->
{error, {unhealthy_target, <<"Bad credentials">>}};
{error, Reason} ->
{error, Reason};
{ok, _} ->
{ok, #{
attributes_template => preproc_attributes(AttributesTemplate),
ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
pubsub_topic => PubSubTopic,
request_ttl => RequestTTL
}.
}}
end.
-spec do_send_requests_sync(
connector_state(),

View File

@ -76,6 +76,7 @@ only_sync_tests() ->
[t_query_sync].
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
Apps = emqx_cth_suite:start(
[
emqx,
@ -257,6 +258,16 @@ create_rule_and_action_http(Config) ->
success_http_handler() ->
TestPid = self(),
fun(Req0, State) ->
case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of
{<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
Rep = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
<<"{}">>,
Req0
),
{ok, Rep, State};
_ ->
{ok, Body, Req} = cowboy_req:read_body(Req0),
TestPid ! {http, cowboy_req:headers(Req), Body},
Rep = cowboy_req:reply(
@ -266,11 +277,12 @@ success_http_handler() ->
Req
),
{ok, Rep, State}
end
end.
start_echo_http_server() ->
HTTPHost = "localhost",
HTTPPath = <<"/v1/projects/myproject/topics/mytopic:publish">>,
HTTPPath = '_',
ServerSSLOpts =
[
{verify, verify_none},
@ -656,6 +668,20 @@ wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
error({timeout_waiting_for_telemetry, EventName})
end.
kill_gun_process(EhttpcPid) ->
State = ehttpc:get_state(EhttpcPid, minimal),
GunPid = maps:get(client, State),
true = is_pid(GunPid),
_ = exit(GunPid, kill),
ok.
kill_gun_processes(ConnectorResourceId) ->
Pool = ehttpc:workers(ConnectorResourceId),
Workers = lists:map(fun({_, Pid}) -> Pid end, Pool),
%% assert there is at least one pool member
?assertMatch([_ | _], Workers),
lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -1343,6 +1369,16 @@ t_failure_with_body(Config) ->
TestPid = self(),
FailureWithBodyHandler =
fun(Req0, State) ->
case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of
{<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
Rep = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
<<"{}">>,
Req0
),
{ok, Rep, State};
_ ->
{ok, Body, Req} = cowboy_req:read_body(Req0),
TestPid ! {http, cowboy_req:headers(Req), Body},
Rep = cowboy_req:reply(
@ -1352,6 +1388,7 @@ t_failure_with_body(Config) ->
Req
),
{ok, Rep, State}
end
end,
ok = emqx_bridge_http_connector_test_server:set_handler(FailureWithBodyHandler),
Topic = <<"t/topic">>,
@ -1381,6 +1418,16 @@ t_failure_no_body(Config) ->
TestPid = self(),
FailureNoBodyHandler =
fun(Req0, State) ->
case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of
{<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
Rep = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
<<"{}">>,
Req0
),
{ok, Rep, State};
_ ->
{ok, Body, Req} = cowboy_req:read_body(Req0),
TestPid ! {http, cowboy_req:headers(Req), Body},
Rep = cowboy_req:reply(
@ -1390,6 +1437,7 @@ t_failure_no_body(Config) ->
Req
),
{ok, Rep, State}
end
end,
ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
Topic = <<"t/topic">>,
@ -1415,20 +1463,6 @@ t_failure_no_body(Config) ->
),
ok.
kill_gun_process(EhttpcPid) ->
State = ehttpc:get_state(EhttpcPid, minimal),
GunPid = maps:get(client, State),
true = is_pid(GunPid),
_ = exit(GunPid, kill),
ok.
kill_gun_processes(ConnectorResourceId) ->
Pool = ehttpc:workers(ConnectorResourceId),
Workers = lists:map(fun({_, Pid}) -> Pid end, Pool),
%% assert there is at least one pool member
?assertMatch([_ | _], Workers),
lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers).
t_unrecoverable_error(Config) ->
ActionResourceId = ?config(action_resource_id, Config),
ConnectorResourceId = ?config(connector_resource_id, Config),
@ -1436,6 +1470,16 @@ t_unrecoverable_error(Config) ->
TestPid = self(),
FailureNoBodyHandler =
fun(Req0, State) ->
case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of
{<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
Rep = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
<<"{}">>,
Req0
),
{ok, Rep, State};
_ ->
{ok, Body, Req} = cowboy_req:read_body(Req0),
TestPid ! {http, cowboy_req:headers(Req), Body},
%% kill the gun process while it's waiting for the
@ -1449,6 +1493,7 @@ t_unrecoverable_error(Config) ->
Req
),
{ok, Rep, State}
end
end,
ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
Topic = <<"t/topic">>,

View File

@ -0,0 +1,215 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_gcp_pubsub_producer_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>).
-define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
emqx_bridge_gcp_pubsub_consumer_SUITE:init_per_suite(Config).
end_per_suite(Config) ->
emqx_bridge_gcp_pubsub_consumer_SUITE:end_per_suite(Config).
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
common_init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(60)),
ServiceAccountJSON =
#{<<"project_id">> := ProjectId} =
emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
ConnectorConfig = connector_config(Name, ServiceAccountJSON),
PubsubTopic = Name,
ActionConfig = action_config(#{
connector => Name,
parameters => #{pubsub_topic => PubsubTopic}
}),
Config = [
{bridge_kind, action},
{action_type, ?ACTION_TYPE_BIN},
{action_name, Name},
{action_config, ActionConfig},
{connector_name, Name},
{connector_type, ?CONNECTOR_TYPE_BIN},
{connector_config, ConnectorConfig},
{service_account_json, ServiceAccountJSON},
{project_id, ProjectId},
{pubsub_topic, PubsubTopic}
| Config0
],
ok = emqx_bridge_gcp_pubsub_consumer_SUITE:ensure_topic(Config, PubsubTopic),
Config.
end_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
connector_config(Name, ServiceAccountJSON) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"tags">> => [<<"bridge">>],
<<"description">> => <<"my cool bridge">>,
<<"connect_timeout">> => <<"5s">>,
<<"pool_size">> => 8,
<<"pipelining">> => <<"100">>,
<<"max_retries">> => <<"2">>,
<<"service_account_json">> => ServiceAccountJSON,
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"1s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
emqx_bridge_v2_testlib:parse_and_check_connector(?ACTION_TYPE_BIN, Name, InnerConfigMap0).
action_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
CommonConfig =
#{
<<"enable">> => true,
<<"connector">> => <<"please override">>,
<<"parameters">> =>
#{
<<"pubsub_topic">> => <<"please override">>
},
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"resume_interval">> => <<"15s">>,
<<"worker_pool_size">> => <<"1">>
}
},
maps:merge(CommonConfig, Overrides).
assert_persisted_service_account_json_is_binary(ConnectorName) ->
%% ensure cluster.hocon has a binary encoded json string as the value
{ok, Hocon} = hocon:files([application:get_env(emqx, cluster_hocon_file, undefined)]),
?assertMatch(
Bin when is_binary(Bin),
emqx_utils_maps:deep_get(
[
<<"connectors">>,
<<"gcp_pubsub_producer">>,
ConnectorName,
<<"service_account_json">>
],
Hocon
)
),
ok.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(Config) ->
ok = emqx_bridge_v2_testlib:t_start_stop(Config, gcp_pubsub_stop),
ok.
t_create_via_http(Config) ->
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_create_via_http_json_object_service_account(Config0) ->
%% After the config goes through the roundtrip with `hocon_tconf:check_plain', service
%% account json comes back as a binary even if the input is a json object.
ConnectorName = ?config(connector_name, Config0),
ConnConfig0 = ?config(connector_config, Config0),
Config1 = proplists:delete(connector_config, Config0),
ConnConfig1 = maps:update_with(
<<"service_account_json">>,
fun(X) ->
?assert(is_binary(X), #{json => X}),
JSON = emqx_utils_json:decode(X, [return_maps]),
?assert(is_map(JSON)),
JSON
end,
ConnConfig0
),
Config = [{connector_config, ConnConfig1} | Config1],
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
assert_persisted_service_account_json_is_binary(ConnectorName),
ok.
%% Check that creating an action (V2) with a non-existent topic leads returns an error.
t_bad_topic(Config) ->
?check_trace(
begin
%% Should it really be 201 here?
?assertMatch(
{ok, {{_, 201, _}, _, #{}}},
emqx_bridge_v2_testlib:create_bridge_api(
Config,
#{<<"parameters">> => #{<<"pubsub_topic">> => <<"i-dont-exist">>}}
)
),
#{
kind := Kind,
type := Type,
name := Name
} = emqx_bridge_v2_testlib:get_common_values(Config),
ActionConfig0 = emqx_bridge_v2_testlib:get_value(action_config, Config),
ProbeRes = emqx_bridge_v2_testlib:probe_bridge_api(
Kind,
Type,
Name,
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"pubsub_topic">> => <<"i-dont-exist">>}}
)
),
?assertMatch(
{error, {{_, 400, _}, _, _}},
ProbeRes
),
{error, {{_, 400, _}, _, #{<<"message">> := Msg}}} = ProbeRes,
?assertMatch(match, re:run(Msg, <<"unhealthy_target">>, [{capture, none}]), #{
msg => Msg
}),
?assertMatch(match, re:run(Msg, <<"Topic does not exist">>, [{capture, none}]), #{
msg => Msg
}),
ok
end,
[]
),
ok.

View File

@ -0,0 +1 @@
Added a topic check when creating a GCP PubSub Producer action, so it now fails when the topic does not exist or the provided credentials do not have enough permissions to use it.