diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index f27aab422..67218fcf0 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -198,13 +198,13 @@ get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) -> %%------------------------------------------------------------------------------------------------- -spec get_topic(topic(), state(), request_opts()) -> {ok, map()} | {error, term()}. -get_topic(Topic, ConnectorState, ReqOpts) -> - #{project_id := ProjectId} = ConnectorState, +get_topic(Topic, ClientState, ReqOpts) -> + #{project_id := ProjectId} = ClientState, Method = get, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Body = <<>>, PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, - ?MODULE:query_sync(PreparedRequest, ConnectorState). + ?MODULE:query_sync(PreparedRequest, ClientState). %%------------------------------------------------------------------------------------------------- %% Helper fns diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 299b90226..13040dccf 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -186,10 +186,14 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) -> {ok, connector_state()}. on_add_channel(_ConnectorResId, ConnectorState0, ActionId, ActionConfig) -> #{installed_actions := InstalledActions0} = ConnectorState0, - ChannelState = install_channel(ActionConfig), - InstalledActions = InstalledActions0#{ActionId => ChannelState}, - ConnectorState = ConnectorState0#{installed_actions := InstalledActions}, - {ok, ConnectorState}. + case install_channel(ActionConfig, ConnectorState0) of + {ok, ChannelState} -> + InstalledActions = InstalledActions0#{ActionId => ChannelState}, + ConnectorState = ConnectorState0#{installed_actions := InstalledActions}, + {ok, ConnectorState}; + Error = {error, _} -> + Error + end. -spec on_remove_channel( connector_resource_id(), @@ -218,8 +222,7 @@ on_get_channel_status(_ConnectorResId, _ChannelId, _ConnectorState) -> %% Helper fns %%------------------------------------------------------------------------------------------------- -%% TODO: check if topic exists ("unhealthy target") -install_channel(ActionConfig) -> +install_channel(ActionConfig, ConnectorState) -> #{ parameters := #{ attributes_template := AttributesTemplate, @@ -231,13 +234,27 @@ install_channel(ActionConfig) -> request_ttl := RequestTTL } } = ActionConfig, - #{ - attributes_template => preproc_attributes(AttributesTemplate), - ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), - payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), - pubsub_topic => PubSubTopic, - request_ttl => RequestTTL - }. + #{client := Client} = ConnectorState, + case + emqx_bridge_gcp_pubsub_client:get_topic(PubSubTopic, Client, #{request_ttl => RequestTTL}) + of + {error, #{status_code := 404}} -> + {error, {unhealthy_target, <<"Topic does not exist">>}}; + {error, #{status_code := 403}} -> + {error, {unhealthy_target, <<"Permission denied for topic">>}}; + {error, #{status_code := 401}} -> + {error, {unhealthy_target, <<"Bad credentials">>}}; + {error, Reason} -> + {error, Reason}; + {ok, _} -> + {ok, #{ + attributes_template => preproc_attributes(AttributesTemplate), + ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), + payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), + pubsub_topic => PubSubTopic, + request_ttl => RequestTTL + }} + end. -spec do_send_requests_sync( connector_state(), diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 4acc5ff3c..6666a3fd0 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -76,6 +76,7 @@ only_sync_tests() -> [t_query_sync]. init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), Apps = emqx_cth_suite:start( [ emqx, @@ -257,20 +258,31 @@ create_rule_and_action_http(Config) -> success_http_handler() -> TestPid = self(), fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - TestPid ! {http, cowboy_req:headers(Req), Body}, - Rep = cowboy_req:reply( - 200, - #{<<"content-type">> => <<"application/json">>}, - emqx_utils_json:encode(#{messageIds => [<<"6058891368195201">>]}), - Req - ), - {ok, Rep, State} + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + _ -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + emqx_utils_json:encode(#{messageIds => [<<"6058891368195201">>]}), + Req + ), + {ok, Rep, State} + end end. start_echo_http_server() -> HTTPHost = "localhost", - HTTPPath = <<"/v1/projects/myproject/topics/mytopic:publish">>, + HTTPPath = '_', ServerSSLOpts = [ {verify, verify_none}, @@ -656,6 +668,20 @@ wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) -> error({timeout_waiting_for_telemetry, EventName}) end. +kill_gun_process(EhttpcPid) -> + State = ehttpc:get_state(EhttpcPid, minimal), + GunPid = maps:get(client, State), + true = is_pid(GunPid), + _ = exit(GunPid, kill), + ok. + +kill_gun_processes(ConnectorResourceId) -> + Pool = ehttpc:workers(ConnectorResourceId), + Workers = lists:map(fun({_, Pid}) -> Pid end, Pool), + %% assert there is at least one pool member + ?assertMatch([_ | _], Workers), + lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -1343,15 +1369,26 @@ t_failure_with_body(Config) -> TestPid = self(), FailureWithBodyHandler = fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - TestPid ! {http, cowboy_req:headers(Req), Body}, - Rep = cowboy_req:reply( - 400, - #{<<"content-type">> => <<"application/json">>}, - emqx_utils_json:encode(#{}), - Req - ), - {ok, Rep, State} + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + _ -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 400, + #{<<"content-type">> => <<"application/json">>}, + emqx_utils_json:encode(#{}), + Req + ), + {ok, Rep, State} + end end, ok = emqx_bridge_http_connector_test_server:set_handler(FailureWithBodyHandler), Topic = <<"t/topic">>, @@ -1381,15 +1418,26 @@ t_failure_no_body(Config) -> TestPid = self(), FailureNoBodyHandler = fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - TestPid ! {http, cowboy_req:headers(Req), Body}, - Rep = cowboy_req:reply( - 400, - #{<<"content-type">> => <<"application/json">>}, - <<>>, - Req - ), - {ok, Rep, State} + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + _ -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 400, + #{<<"content-type">> => <<"application/json">>}, + <<>>, + Req + ), + {ok, Rep, State} + end end, ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler), Topic = <<"t/topic">>, @@ -1415,20 +1463,6 @@ t_failure_no_body(Config) -> ), ok. -kill_gun_process(EhttpcPid) -> - State = ehttpc:get_state(EhttpcPid, minimal), - GunPid = maps:get(client, State), - true = is_pid(GunPid), - _ = exit(GunPid, kill), - ok. - -kill_gun_processes(ConnectorResourceId) -> - Pool = ehttpc:workers(ConnectorResourceId), - Workers = lists:map(fun({_, Pid}) -> Pid end, Pool), - %% assert there is at least one pool member - ?assertMatch([_ | _], Workers), - lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers). - t_unrecoverable_error(Config) -> ActionResourceId = ?config(action_resource_id, Config), ConnectorResourceId = ?config(connector_resource_id, Config), @@ -1436,19 +1470,30 @@ t_unrecoverable_error(Config) -> TestPid = self(), FailureNoBodyHandler = fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - TestPid ! {http, cowboy_req:headers(Req), Body}, - %% kill the gun process while it's waiting for the - %% response so we provoke an `{error, _}' response from - %% ehttpc. - ok = kill_gun_processes(ConnectorResourceId), - Rep = cowboy_req:reply( - 200, - #{<<"content-type">> => <<"application/json">>}, - <<>>, - Req - ), - {ok, Rep, State} + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + _ -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + %% kill the gun process while it's waiting for the + %% response so we provoke an `{error, _}' response from + %% ehttpc. + ok = kill_gun_processes(ConnectorResourceId), + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<>>, + Req + ), + {ok, Rep, State} + end end, ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler), Topic = <<"t/topic">>, diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl new file mode 100644 index 000000000..f2255c343 --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl @@ -0,0 +1,215 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_gcp_pubsub_producer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>). +-define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), + emqx_bridge_gcp_pubsub_consumer_SUITE:init_per_suite(Config). + +end_per_suite(Config) -> + emqx_bridge_gcp_pubsub_consumer_SUITE:end_per_suite(Config). + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +common_init_per_testcase(TestCase, Config0) -> + ct:timetrap(timer:seconds(60)), + ServiceAccountJSON = + #{<<"project_id">> := ProjectId} = + emqx_bridge_gcp_pubsub_utils:generate_service_account_json(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>, + ConnectorConfig = connector_config(Name, ServiceAccountJSON), + PubsubTopic = Name, + ActionConfig = action_config(#{ + connector => Name, + parameters => #{pubsub_topic => PubsubTopic} + }), + Config = [ + {bridge_kind, action}, + {action_type, ?ACTION_TYPE_BIN}, + {action_name, Name}, + {action_config, ActionConfig}, + {connector_name, Name}, + {connector_type, ?CONNECTOR_TYPE_BIN}, + {connector_config, ConnectorConfig}, + {service_account_json, ServiceAccountJSON}, + {project_id, ProjectId}, + {pubsub_topic, PubsubTopic} + | Config0 + ], + ok = emqx_bridge_gcp_pubsub_consumer_SUITE:ensure_topic(Config, PubsubTopic), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +connector_config(Name, ServiceAccountJSON) -> + InnerConfigMap0 = + #{ + <<"enable">> => true, + <<"tags">> => [<<"bridge">>], + <<"description">> => <<"my cool bridge">>, + <<"connect_timeout">> => <<"5s">>, + <<"pool_size">> => 8, + <<"pipelining">> => <<"100">>, + <<"max_retries">> => <<"2">>, + <<"service_account_json">> => ServiceAccountJSON, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"1s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } + }, + emqx_bridge_v2_testlib:parse_and_check_connector(?ACTION_TYPE_BIN, Name, InnerConfigMap0). + +action_config(Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + CommonConfig = + #{ + <<"enable">> => true, + <<"connector">> => <<"please override">>, + <<"parameters">> => + #{ + <<"pubsub_topic">> => <<"please override">> + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"buffer_mode">> => <<"memory_only">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"45s">>, + <<"resume_interval">> => <<"15s">>, + <<"worker_pool_size">> => <<"1">> + } + }, + maps:merge(CommonConfig, Overrides). + +assert_persisted_service_account_json_is_binary(ConnectorName) -> + %% ensure cluster.hocon has a binary encoded json string as the value + {ok, Hocon} = hocon:files([application:get_env(emqx, cluster_hocon_file, undefined)]), + ?assertMatch( + Bin when is_binary(Bin), + emqx_utils_maps:deep_get( + [ + <<"connectors">>, + <<"gcp_pubsub_producer">>, + ConnectorName, + <<"service_account_json">> + ], + Hocon + ) + ), + ok. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_stop(Config) -> + ok = emqx_bridge_v2_testlib:t_start_stop(Config, gcp_pubsub_stop), + ok. + +t_create_via_http(Config) -> + ok = emqx_bridge_v2_testlib:t_create_via_http(Config), + ok. + +t_create_via_http_json_object_service_account(Config0) -> + %% After the config goes through the roundtrip with `hocon_tconf:check_plain', service + %% account json comes back as a binary even if the input is a json object. + ConnectorName = ?config(connector_name, Config0), + ConnConfig0 = ?config(connector_config, Config0), + Config1 = proplists:delete(connector_config, Config0), + ConnConfig1 = maps:update_with( + <<"service_account_json">>, + fun(X) -> + ?assert(is_binary(X), #{json => X}), + JSON = emqx_utils_json:decode(X, [return_maps]), + ?assert(is_map(JSON)), + JSON + end, + ConnConfig0 + ), + Config = [{connector_config, ConnConfig1} | Config1], + ok = emqx_bridge_v2_testlib:t_create_via_http(Config), + assert_persisted_service_account_json_is_binary(ConnectorName), + ok. + +%% Check that creating an action (V2) with a non-existent topic leads returns an error. +t_bad_topic(Config) -> + ?check_trace( + begin + %% Should it really be 201 here? + ?assertMatch( + {ok, {{_, 201, _}, _, #{}}}, + emqx_bridge_v2_testlib:create_bridge_api( + Config, + #{<<"parameters">> => #{<<"pubsub_topic">> => <<"i-dont-exist">>}} + ) + ), + #{ + kind := Kind, + type := Type, + name := Name + } = emqx_bridge_v2_testlib:get_common_values(Config), + ActionConfig0 = emqx_bridge_v2_testlib:get_value(action_config, Config), + ProbeRes = emqx_bridge_v2_testlib:probe_bridge_api( + Kind, + Type, + Name, + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"pubsub_topic">> => <<"i-dont-exist">>}} + ) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, + ProbeRes + ), + {error, {{_, 400, _}, _, #{<<"message">> := Msg}}} = ProbeRes, + ?assertMatch(match, re:run(Msg, <<"unhealthy_target">>, [{capture, none}]), #{ + msg => Msg + }), + ?assertMatch(match, re:run(Msg, <<"Topic does not exist">>, [{capture, none}]), #{ + msg => Msg + }), + ok + end, + [] + ), + ok. diff --git a/changes/ee/fix-12656.en.md b/changes/ee/fix-12656.en.md new file mode 100644 index 000000000..85e2a7e3b --- /dev/null +++ b/changes/ee/fix-12656.en.md @@ -0,0 +1 @@ +Added a topic check when creating a GCP PubSub Producer action, so it now fails when the topic does not exist or the provided credentials do not have enough permissions to use it.