diff --git a/.ci/docker-compose-file/docker-compose-gcp-emulator.yaml b/.ci/docker-compose-file/docker-compose-gcp-emulator.yaml
new file mode 100644
index 000000000..1f68e05d4
--- /dev/null
+++ b/.ci/docker-compose-file/docker-compose-gcp-emulator.yaml
@@ -0,0 +1,23 @@
+version: '3.9'
+
+services:
+ gcp_emulator:
+ container_name: gcp_emulator
+ image: gcr.io/google.com/cloudsdktool/google-cloud-cli:435.0.1-emulators
+ restart: always
+ expose:
+ - "8085"
+ # ports:
+ # - "8085:8085"
+ networks:
+ - emqx_bridge
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:8085"]
+ interval: 30s
+ timeout: 5s
+ retries: 4
+ command:
+ - bash
+ - "-c"
+ - |
+ gcloud beta emulators pubsub start --project=emqx-pubsub --host-port=0.0.0.0:8085 --impersonate-service-account test@emqx.iam.gserviceaccount.com
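+ # The CT suites point PUBSUB_EMULATOR_HOST at this service (through the
+ # toxiproxy "gcp_emulator" listener), so the connector speaks plain HTTP
+ # to the emulator instead of TLS to pubsub.googleapis.com.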
diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json
index 8695acde9..87878ac92 100644
--- a/.ci/docker-compose-file/toxiproxy.json
+++ b/.ci/docker-compose-file/toxiproxy.json
@@ -149,5 +149,11 @@
"listen": "0.0.0.0:19100",
"upstream": "minio-tls:9100",
"enabled": true
+ },
+ {
+ "name": "gcp_emulator",
+ "listen": "0.0.0.0:8085",
+ "upstream": "gcp_emulator:8085",
+ "enabled": true
}
]
diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl
index b19b7139f..db8669f49 100644
--- a/apps/emqx_bridge/src/emqx_bridge_resource.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl
@@ -54,7 +54,9 @@
(TYPE) =:= <<"mqtt">>
).
-define(IS_INGRESS_BRIDGE(TYPE),
- (TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
+ (TYPE) =:= <<"kafka_consumer">> orelse
+ (TYPE) =:= <<"gcp_pubsub_consumer">> orelse
+ ?IS_BI_DIR_BRIDGE(TYPE)
).
-if(?EMQX_RELEASE_EDITION == ee).
diff --git a/apps/emqx_bridge_gcp_pubsub/docker-ct b/apps/emqx_bridge_gcp_pubsub/docker-ct
new file mode 100644
index 000000000..e08426b9c
--- /dev/null
+++ b/apps/emqx_bridge_gcp_pubsub/docker-ct
@@ -0,0 +1,2 @@
+toxiproxy
+gcp_emulator
diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl
index 1bd21ce5c..d9d18ec48 100644
--- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl
+++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl
@@ -7,7 +7,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1]).
%% hocon_schema API
-export([
@@ -39,11 +39,22 @@ namespace() ->
roots() ->
[].
-fields("config") ->
+fields("config_producer") ->
emqx_bridge_schema:common_bridge_fields() ++
emqx_resource_schema:fields("resource_opts") ++
- fields(bridge_config);
-fields(bridge_config) ->
+ fields(connector_config) ++ fields(producer);
+fields("config_consumer") ->
+ emqx_bridge_schema:common_bridge_fields() ++
+ [
+ {resource_opts,
+ mk(
+ ref("consumer_resource_opts"),
+ #{required => true, desc => ?DESC(emqx_resource_schema, "creation_opts")}
+ )}
+ ] ++
+ fields(connector_config) ++
+ [{consumer, mk(ref(consumer), #{required => true, desc => ?DESC(consumer_opts)})}];
+fields(connector_config) ->
[
{connect_timeout,
sc(
@@ -88,6 +99,20 @@ fields(bridge_config) ->
desc => ?DESC("request_timeout")
}
)},
+ {service_account_json,
+ sc(
+ service_account_json(),
+ #{
+ required => true,
+ validator => fun ?MODULE:service_account_json_validator/1,
+ converter => fun ?MODULE:service_account_json_converter/1,
+ sensitive => true,
+ desc => ?DESC("service_account_json")
+ }
+ )}
+ ];
+fields(producer) ->
+ [
{payload_template,
sc(
binary(),
@@ -110,28 +135,88 @@ fields(bridge_config) ->
required => true,
desc => ?DESC("pubsub_topic")
}
+ )}
+ ];
+fields(consumer) ->
+ [
+ {ack_retry_interval,
+ mk(
+ emqx_schema:timeout_duration_ms(),
+ #{
+ default => <<"5s">>,
+ importance => ?IMPORTANCE_HIDDEN
+ }
)},
- {service_account_json,
- sc(
- service_account_json(),
+ {pull_max_messages,
+ mk(
+ pos_integer(),
+ #{default => 100, desc => ?DESC("consumer_pull_max_messages")}
+ )},
+ {pull_worker_multiplier,
+ mk(
+ pos_integer(),
+ #{
+ default => 1,
+ importance => ?IMPORTANCE_HIDDEN
+ }
+ )},
+ {topic_mapping,
+ mk(
+ hoconsc:array(ref(consumer_topic_mapping)),
#{
required => true,
- validator => fun ?MODULE:service_account_json_validator/1,
- converter => fun ?MODULE:service_account_json_converter/1,
- sensitive => true,
- desc => ?DESC("service_account_json")
+ validator => fun consumer_topic_mapping_validator/1,
+ desc => ?DESC("consumer_topic_mapping")
}
)}
];
-fields("get") ->
- emqx_bridge_schema:status_fields() ++ fields("post");
-fields("post") ->
- [type_field(), name_field() | fields("config")];
-fields("put") ->
- fields("config").
+fields(consumer_topic_mapping) ->
+ [
+ {pubsub_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_pubsub_topic)})},
+ {mqtt_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_mqtt_topic)})},
+ {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
+ {payload_template,
+ mk(
+ string(),
+ #{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)}
+ )}
+ ];
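+%% Only a subset of the generic resource options applies to a pull-based
+%% consumer; everything else (buffering, batching, etc.) is filtered out below.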
+fields("consumer_resource_opts") ->
+ ResourceFields = emqx_resource_schema:fields("creation_opts"),
+ SupportedFields = [
+ auto_restart_interval,
+ health_check_interval,
+ request_ttl,
+ resume_interval,
+ worker_pool_size
+ ],
+ lists:filter(
+ fun({Field, _Sc}) -> lists:member(Field, SupportedFields) end,
+ ResourceFields
+ );
+fields("get_producer") ->
+ emqx_bridge_schema:status_fields() ++ fields("post_producer");
+fields("post_producer") ->
+ [type_field_producer(), name_field() | fields("config_producer")];
+fields("put_producer") ->
+ fields("config_producer");
+fields("get_consumer") ->
+ emqx_bridge_schema:status_fields() ++ fields("post_consumer");
+fields("post_consumer") ->
+ [type_field_consumer(), name_field() | fields("config_consumer")];
+fields("put_consumer") ->
+ fields("config_consumer").
-desc("config") ->
+desc("config_producer") ->
?DESC("desc_config");
+desc("config_consumer") ->
+ ?DESC("desc_config");
+desc("consumer_resource_opts") ->
+ ?DESC(emqx_resource_schema, "creation_opts");
+desc(consumer_topic_mapping) ->
+ ?DESC("consumer_topic_mapping");
+desc(consumer) ->
+ ?DESC("consumer");
desc(_) ->
undefined.
@@ -139,13 +224,19 @@ conn_bridge_examples(Method) ->
[
#{
<<"gcp_pubsub">> => #{
- summary => <<"GCP PubSub Bridge">>,
- value => values(Method)
+ summary => <<"GCP PubSub Producer Bridge">>,
+ value => values(producer, Method)
+ }
+ },
+ #{
+ <<"gcp_pubsub_consumer">> => #{
+ summary => <<"GCP PubSub Consumer Bridge">>,
+ value => values(consumer, Method)
}
}
].
-values(_Method) ->
+values(producer, _Method) ->
#{
pubsub_topic => <<"mytopic">>,
service_account_json =>
@@ -173,17 +264,71 @@ values(_Method) ->
<<"https://oauth2.googleapis.com/token">>,
type => <<"service_account">>
}
+ };
+values(consumer, _Method) ->
+ #{
+ connect_timeout => <<"15s">>,
+ consumer =>
+ #{
+ pull_max_messages => 100,
+ topic_mapping => [
+ #{
+ pubsub_topic => <<"pubsub-topic-1">>,
+ mqtt_topic => <<"mqtt/topic/1">>,
+ qos => 1,
+ payload_template => <<"${.}">>
+ },
+ #{
+ pubsub_topic => <<"pubsub-topic-2">>,
+ mqtt_topic => <<"mqtt/topic/2">>,
+ qos => 2,
+ payload_template =>
+ <<"v = ${.value}, a = ${.attributes}, o = ${.ordering_key}">>
+ }
+ ]
+ },
+ resource_opts => #{request_ttl => <<"20s">>},
+ service_account_json =>
+ #{
+ auth_provider_x509_cert_url =>
+ <<"https://www.googleapis.com/oauth2/v1/certs">>,
+ auth_uri =>
+ <<"https://accounts.google.com/o/oauth2/auth">>,
+ client_email =>
+ <<"test@myproject.iam.gserviceaccount.com">>,
+ client_id => <<"123812831923812319190">>,
+ client_x509_cert_url =>
+ <<
+ "https://www.googleapis.com/robot/v1/"
+ "metadata/x509/test%40myproject.iam.gserviceaccount.com"
+ >>,
+ private_key =>
+ <<
+ "-----BEGIN PRIVATE KEY-----\n"
+ "MIIEvQI..."
+ >>,
+ private_key_id => <<"kid">>,
+ project_id => <<"myproject">>,
+ token_uri =>
+ <<"https://oauth2.googleapis.com/token">>,
+ type => <<"service_account">>
+ }
}.
%%-------------------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------------------
+ref(Name) -> hoconsc:ref(?MODULE, Name).
+
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
-type_field() ->
+type_field_producer() ->
{type, mk(enum([gcp_pubsub]), #{required => true, desc => ?DESC("desc_type")})}.
+type_field_consumer() ->
+ {type, mk(enum([gcp_pubsub_consumer]), #{required => true, desc => ?DESC("desc_type")})}.
+
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
@@ -225,3 +370,16 @@ service_account_json_converter(Map) when is_map(Map) ->
maps:with(ExpectedKeys, Map);
service_account_json_converter(Val) ->
Val.
+
+consumer_topic_mapping_validator(_TopicMapping = []) ->
+ {error, "There must be at least one GCP PubSub-MQTT topic mapping"};
+consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+ NumEntries = length(TopicMapping),
+ PubSubTopics = [KT || #{<<"pubsub_topic">> := KT} <- TopicMapping],
+ DistinctPubSubTopics = length(lists:usort(PubSubTopics)),
+ case DistinctPubSubTopics =:= NumEntries of
+ true ->
+ ok;
+ false ->
+ {error, "GCP PubSub topics must not be repeated in a bridge"}
+ end.
diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl
index f651bc95d..80bd50113 100644
--- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl
+++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl
@@ -24,9 +24,12 @@
]).
-export([reply_delegator/3]).
+-export([get_topic/3]).
+
-export([get_jwt_authorization_header/1]).
-type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json().
+-type project_id() :: binary().
-type config() :: #{
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
@@ -39,17 +42,26 @@
jwt_config := emqx_connector_jwt:jwt_config(),
max_retries := non_neg_integer(),
pool_name := binary(),
- project_id := binary(),
+ project_id := project_id(),
request_ttl := infinity | timer:time()
}.
-type headers() :: [{binary(), iodata()}].
-type body() :: iodata().
-type status_code() :: 100..599.
--type method() :: post.
+-type method() :: get | post | put | patch.
-type path() :: binary().
-type prepared_request() :: {method(), path(), body()}.
+-type topic() :: binary().
--export_type([service_account_json/0, state/0, headers/0, body/0, status_code/0]).
+-export_type([
+ service_account_json/0,
+ state/0,
+ headers/0,
+ body/0,
+ status_code/0,
+ project_id/0,
+ topic/0
+]).
-define(DEFAULT_PIPELINE_SIZE, 100).
@@ -76,11 +88,22 @@ on_start(
}),
%% emulating the emulator behavior
%% https://cloud.google.com/pubsub/docs/emulator
- HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"),
+ {Transport, HostPort} =
+ case os:getenv("PUBSUB_EMULATOR_HOST") of
+ false ->
+ {tls, "pubsub.googleapis.com:443"};
+ HostPort0 ->
+ %% The emulator is plain HTTP...
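+ %% default to plain TCP; a persistent_term entry may override the
+ %% transport used to reach the emulator.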
+ Transport0 = persistent_term:get({?MODULE, transport}, tcp),
+ {Transport0, HostPort0}
+ end,
#{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
PoolType = random,
- Transport = tls,
- TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}),
+ TransportOpts =
+ case Transport of
+ tls -> emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none});
+ tcp -> []
+ end,
NTransportOpts = emqx_utils:ipv6_probe(TransportOpts),
PoolOpts = [
{host, Host},
@@ -91,7 +114,7 @@ on_start(
{pool_size, PoolSize},
{transport, Transport},
{transport_opts, NTransportOpts},
- {enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
+ {enable_pipelining, maps:get(pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
],
#{
jwt_config := JWTConfig,
@@ -150,10 +173,7 @@ on_stop(ResourceId, _State) ->
{prepared_request, prepared_request()},
state()
) ->
- {ok, status_code(), headers()}
- | {ok, status_code(), headers(), body()}
- | {error, {recoverable_error, term()}}
- | {error, term()}.
+ {ok, map()} | {error, {recoverable_error, term()} | term()}.
on_query(ResourceId, {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) ->
?TRACE(
"QUERY_SYNC",
@@ -194,6 +214,19 @@ on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
disconnected
end.
+%%-------------------------------------------------------------------------------------------------
+%% API
+%%-------------------------------------------------------------------------------------------------
+
+-spec get_topic(resource_id(), topic(), state()) -> {ok, map()} | {error, term()}.
+get_topic(ResourceId, Topic, ConnectorState) ->
+ #{project_id := ProjectId} = ConnectorState,
+ Method = get,
+ Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
+ Body = <<>>,
+ PreparedRequest = {prepared_request, {Method, Path, Body}},
+ on_query(ResourceId, PreparedRequest, ConnectorState).
+
%%-------------------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------------------
@@ -266,10 +299,7 @@ get_jwt_authorization_header(JWTConfig) ->
{prepared_request, prepared_request()},
resource_id()
) ->
- {ok, status_code(), headers()}
- | {ok, status_code(), headers(), body()}
- | {error, {recoverable_error, term()}}
- | {error, term()}.
+ {ok, map()} | {error, {recoverable_error, term()} | term()}.
do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) ->
#{
jwt_config := JWTConfig,
@@ -280,12 +310,17 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI
?tp(
gcp_pubsub_bridge_do_send_requests,
#{
+ request => {prepared_request, {Method, Path, Body}},
query_mode => sync,
resource_id => ResourceId
}
),
Headers = get_jwt_authorization_header(JWTConfig),
- Request = {Path, Headers, Body},
+ Request =
+ case {Method, Body} of
+ {get, <<>>} -> {Path, Headers};
+ _ -> {Path, Headers, Body}
+ end,
Response = ehttpc:request(
PoolName,
Method,
@@ -312,12 +347,17 @@ do_send_requests_async(
?tp(
gcp_pubsub_bridge_do_send_requests,
#{
+ request => {prepared_request, {Method, Path, Body}},
query_mode => async,
resource_id => ResourceId
}
),
Headers = get_jwt_authorization_header(JWTConfig),
- Request = {Path, Headers, Body},
+ Request =
+ case {Method, Body} of
+ {get, <<>>} -> {Path, Headers};
+ _ -> {Path, Headers, Body}
+ end,
Worker = ehttpc_pool:pick_worker(PoolName),
ok = ehttpc:request_async(
Worker,
@@ -328,6 +368,7 @@ do_send_requests_async(
),
{ok, Worker}.
+-spec handle_response(term(), resource_id(), sync | async) -> {ok, map()} | {error, term()}.
handle_response(Result, ResourceId, QueryMode) ->
case Result of
{error, Reason} ->
diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl
new file mode 100644
index 000000000..79d0ad68a
--- /dev/null
+++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl
@@ -0,0 +1,552 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_consumer_worker).
+
+-behaviour(ecpool_worker).
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% `ecpool_worker' API
+-export([connect/1, health_check/1]).
+
+%% `gen_server' API
+-export([
+ init/1,
+ handle_info/2,
+ handle_cast/2,
+ handle_call/3,
+ handle_continue/2,
+ terminate/2
+]).
+
+-export([get_subscription/1]).
+-export([reply_delegator/3, pull_async/1, process_pull_response/2, ensure_subscription/1]).
+
+-type subscription_id() :: binary().
+-type bridge_name() :: atom() | binary().
+-type ack_id() :: binary().
+-type config() :: #{
+ ack_retry_interval := emqx_schema:timeout_duration_ms(),
+ connector_state := emqx_bridge_gcp_pubsub_connector:state(),
+ ecpool_worker_id => non_neg_integer(),
+ hookpoint := binary(),
+ instance_id := binary(),
+ mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
+ pull_max_messages := non_neg_integer(),
+ subscription_id => subscription_id(),
+ topic => emqx_bridge_gcp_pubsub_connector:topic()
+}.
+-type state() :: #{
+ ack_retry_interval := emqx_schema:timeout_duration_ms(),
+ ack_timer := undefined | reference(),
+ async_workers := #{pid() => reference()},
+ connector_state := emqx_bridge_gcp_pubsub_connector:state(),
+ ecpool_worker_id := non_neg_integer(),
+ hookpoint := binary(),
+ instance_id := binary(),
+ mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
+ pending_acks => [ack_id()],
+ pull_max_messages := non_neg_integer(),
+ subscription_id => subscription_id(),
+ topic => emqx_bridge_gcp_pubsub_connector:topic()
+}.
+-type decoded_message() :: map().
+
+-define(HEALTH_CHECK_TIMEOUT, 10_000).
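+%% This optvar is set once a worker has confirmed that its subscription
+%% exists; `health_check/1' blocks on it, so the bridge only reports itself
+%% healthy after every pool worker is subscribed.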
+-define(OPTVAR_SUB_OK(PID), {?MODULE, PID}).
+
+%%-------------------------------------------------------------------------------------------------
+%% API used by `reply_delegator'
+%%-------------------------------------------------------------------------------------------------
+
+-spec pull_async(pid()) -> ok.
+pull_async(WorkerPid) ->
+ gen_server:cast(WorkerPid, pull_async).
+
+-spec process_pull_response(pid(), binary()) -> ok.
+process_pull_response(WorkerPid, RespBody) ->
+ gen_server:cast(WorkerPid, {process_pull_response, RespBody}).
+
+-spec ensure_subscription(pid()) -> ok.
+ensure_subscription(WorkerPid) ->
+ gen_server:cast(WorkerPid, ensure_subscription).
+
+-spec reply_delegator(pid(), binary(), {ok, map()} | {error, timeout | term()}) -> ok.
+reply_delegator(WorkerPid, InstanceId, Result) ->
+ case Result of
+ {error, timeout} ->
+ ?MODULE:pull_async(WorkerPid);
+ {error, Reason} ->
+ ?SLOG(warning, #{
+ msg => "gcp_pubsub_consumer_worker_pull_error",
+ instance_id => InstanceId,
+ reason => Reason
+ }),
+ case Reason of
+ #{status_code := 409} ->
+ %% the subscription was not found; deleted?!
+ ?MODULE:ensure_subscription(WorkerPid);
+ _ ->
+ ?MODULE:pull_async(WorkerPid)
+ end;
+ {ok, #{status_code := 200, body := RespBody}} ->
+ ?MODULE:process_pull_response(WorkerPid, RespBody)
+ end.
+
+%%-------------------------------------------------------------------------------------------------
+%% Debugging API
+%%-------------------------------------------------------------------------------------------------
+
+-spec get_subscription(pid()) -> {ok, map()} | {error, term()}.
+get_subscription(WorkerPid) ->
+ gen_server:call(WorkerPid, get_subscription, 5_000).
+
+%%-------------------------------------------------------------------------------------------------
+%% `ecpool' health check
+%%-------------------------------------------------------------------------------------------------
+
+-spec health_check(pid()) -> boolean().
+health_check(WorkerPid) ->
+ case optvar:read(?OPTVAR_SUB_OK(WorkerPid), ?HEALTH_CHECK_TIMEOUT) of
+ {ok, _} ->
+ true;
+ timeout ->
+ false
+ end.
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+connect(Opts0) ->
+ Opts = maps:from_list(Opts0),
+ #{
+ ack_retry_interval := AckRetryInterval,
+ bridge_name := BridgeName,
+ connector_state := ConnectorState,
+ ecpool_worker_id := WorkerId,
+ hookpoint := Hookpoint,
+ instance_id := InstanceId,
+ pull_max_messages := PullMaxMessages,
+ topic_mapping := TopicMapping
+ } = Opts,
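+ %% Spread the pool workers deterministically over the configured PubSub
+ %% topics: the ecpool worker id picks one entry of the sorted topic mapping.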
+ TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)),
+ Index = 1 + (WorkerId rem map_size(TopicMapping)),
+ {Topic, MQTTConfig} = lists:nth(Index, TopicMappingList),
+ Config = #{
+ ack_retry_interval => AckRetryInterval,
+ connector_state => ConnectorState,
+ hookpoint => Hookpoint,
+ instance_id => InstanceId,
+ mqtt_config => MQTTConfig,
+ pull_max_messages => PullMaxMessages,
+ topic => Topic,
+ subscription_id => subscription_id(BridgeName, Topic)
+ },
+ start_link(Config).
+
+%%-------------------------------------------------------------------------------------------------
+%% `gen_server' API
+%%-------------------------------------------------------------------------------------------------
+
+-spec init(config()) -> {ok, state(), {continue, ensure_subscription}}.
+init(Config) ->
+ process_flag(trap_exit, true),
+ State = Config#{
+ ack_timer => undefined,
+ async_workers => #{},
+ pending_acks => []
+ },
+ {ok, State, {continue, ensure_subscription}}.
+
+handle_continue(ensure_subscription, State0) ->
+ case ensure_subscription_exists(State0) of
+ ok ->
+ #{instance_id := InstanceId} = State0,
+ ?tp(
+ debug,
+ "gcp_pubsub_consumer_worker_subscription_ready",
+ #{instance_id => InstanceId}
+ ),
+ ?MODULE:pull_async(self()),
+ optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok),
+ {noreply, State0};
+ error ->
+ %% FIXME: add delay if topic does not exist?!
+ %% retry
+ {noreply, State0, {continue, ensure_subscription}}
+ end.
+
+handle_call(get_subscription, _From, State0) ->
+ Res = do_get_subscription(State0),
+ {reply, Res, State0};
+handle_call(_Request, _From, State0) ->
+ {reply, {error, unknown_call}, State0}.
+
+handle_cast(pull_async, State0) ->
+ State = do_pull_async(State0),
+ {noreply, State};
+handle_cast({process_pull_response, RespBody}, State0) ->
+ State = do_process_pull_response(State0, RespBody),
+ {noreply, State};
+handle_cast(ensure_subscription, State0) ->
+ {noreply, State0, {continue, ensure_subscription}};
+handle_cast(_Request, State0) ->
+ {noreply, State0}.
+
+handle_info({timeout, TRef, ack}, State0 = #{ack_timer := TRef}) ->
+ State1 = acknowledge(State0),
+ State = ensure_ack_timer(State1),
+ {noreply, State};
+handle_info(
+ {'DOWN', _Ref, process, AsyncWorkerPid, _Reason}, State0 = #{async_workers := Workers0}
+) when
+ is_map_key(AsyncWorkerPid, Workers0)
+->
+ Workers = maps:remove(AsyncWorkerPid, Workers0),
+ State1 = State0#{async_workers := Workers},
+ State = do_pull_async(State1),
+ {noreply, State};
+handle_info(Msg, State0) ->
+ #{
+ instance_id := InstanceId,
+ topic := Topic
+ } = State0,
+ ?SLOG(debug, #{
+ msg => "gcp_pubsub_consumer_worker_unexpected_message",
+ unexpected_msg => Msg,
+ instance_id => InstanceId,
+ topic => Topic
+ }),
+ {noreply, State0}.
+
+terminate(_Reason, _State) ->
+ optvar:unset(?OPTVAR_SUB_OK(self())),
+ ok.
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+-spec start_link(config()) -> gen_server:start_ret().
+start_link(Config) ->
+ gen_server:start_link(?MODULE, Config, []).
+
+-spec ensure_ack_timer(state()) -> state().
+ensure_ack_timer(State = #{pending_acks := []}) ->
+ State;
+ensure_ack_timer(State = #{ack_timer := TRef}) when is_reference(TRef) ->
+ State;
+ensure_ack_timer(State = #{ack_retry_interval := AckRetryInterval}) ->
+ State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)}.
+
+-spec ensure_subscription_exists(state()) -> ok | error.
+ensure_subscription_exists(State) ->
+ #{
+ connector_state := ConnectorState,
+ instance_id := InstanceId,
+ subscription_id := SubscriptionId,
+ topic := Topic
+ } = State,
+ Method = put,
+ Path = path(State, create),
+ Body = body(State, create),
+ PreparedRequest = {prepared_request, {Method, Path, Body}},
+ Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
+ case Res of
+ {error, #{status_code := 409}} ->
+ %% already exists
+ ?SLOG(debug, #{
+ msg => "gcp_pubsub_consumer_worker_subscription_already_exists",
+ instance_id => InstanceId,
+ topic => Topic,
+ subscription_id => SubscriptionId
+ }),
+ Method1 = patch,
+ Path1 = path(State, create),
+ Body1 = body(State, patch_subscription),
+ PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}},
+ Res1 = emqx_bridge_gcp_pubsub_connector:on_query(
+ InstanceId, PreparedRequest1, ConnectorState
+ ),
+ ?SLOG(debug, #{
+ msg => "gcp_pubsub_consumer_worker_subscription_patch",
+ instance_id => InstanceId,
+ topic => Topic,
+ subscription_id => SubscriptionId,
+ result => Res1
+ }),
+ ok;
+ {ok, #{status_code := 200}} ->
+ ?SLOG(debug, #{
+ msg => "gcp_pubsub_consumer_worker_subscription_created",
+ instance_id => InstanceId,
+ topic => Topic,
+ subscription_id => SubscriptionId
+ }),
+ ok;
+ {error, Reason} ->
+ ?SLOG(error, #{
+ msg => "gcp_pubsub_consumer_worker_subscription_error",
+ instance_id => InstanceId,
+ topic => Topic,
+ reason => Reason
+ }),
+ error
+ end.
+
+%% We use async requests so that this process will be more responsive to system messages.
+do_pull_async(State) ->
+ #{
+ connector_state := ConnectorState,
+ instance_id := InstanceId
+ } = State,
+ Method = post,
+ Path = path(State, pull),
+ Body = body(State, pull),
+ PreparedRequest = {prepared_request, {Method, Path, Body}},
+ ReplyFunAndArgs = {fun ?MODULE:reply_delegator/3, [self(), InstanceId]},
+ {ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_connector:on_query_async(
+ InstanceId,
+ PreparedRequest,
+ ReplyFunAndArgs,
+ ConnectorState
+ ),
+ ensure_async_worker_monitored(State, AsyncWorkerPid).
+
+-spec ensure_async_worker_monitored(state(), pid()) -> state().
+ensure_async_worker_monitored(State = #{async_workers := Workers0}, AsyncWorkerPid) ->
+ case is_map_key(AsyncWorkerPid, Workers0) of
+ true ->
+ State;
+ false ->
+ Ref = monitor(process, AsyncWorkerPid),
+ Workers = Workers0#{AsyncWorkerPid => Ref},
+ State#{async_workers := Workers}
+ end.
+
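+%% Forward every pulled message, accumulate its ack id, try to acknowledge
+%% right away, and keep the ack retry timer armed for anything still pending.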
+-spec do_process_pull_response(state(), binary()) -> state().
+do_process_pull_response(State0, RespBody) ->
+ Messages = decode_response(RespBody),
+ AckIds = lists:map(fun(Msg) -> handle_message(State0, Msg) end, Messages),
+ State1 = maps:update_with(pending_acks, fun(AckIds0) -> AckIds0 ++ AckIds end, State0),
+ State2 = acknowledge(State1),
+ pull_async(self()),
+ ensure_ack_timer(State2).
+
+-spec acknowledge(state()) -> state().
+acknowledge(State0 = #{pending_acks := []}) ->
+ State0;
+acknowledge(State0) ->
+ State1 = State0#{ack_timer := undefined},
+ #{
+ connector_state := ConnectorState,
+ instance_id := InstanceId,
+ pending_acks := AckIds
+ } = State1,
+ Method = post,
+ Path = path(State1, ack),
+ Body = body(State1, ack, #{ack_ids => AckIds}),
+ PreparedRequest = {prepared_request, {Method, Path, Body}},
+ Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
+ case Res of
+ {error, Reason} ->
+ ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", reason => Reason}),
+ State1;
+ {ok, #{status_code := 200}} ->
+ ?tp(gcp_pubsub_consumer_worker_acknowledged, #{ack_ids => AckIds}),
+ State1#{pending_acks := []};
+ {ok, Details} ->
+ ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", details => Details}),
+ State1
+ end.
+
+do_get_subscription(State) ->
+ #{
+ connector_state := ConnectorState,
+ instance_id := InstanceId
+ } = State,
+ Method = get,
+ Path = path(State, get_subscription),
+ Body = body(State, get_subscription),
+ PreparedRequest = {prepared_request, {Method, Path, Body}},
+ Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
+ case Res of
+ {error, Reason} ->
+ ?SLOG(warning, #{
+ msg => "gcp_pubsub_consumer_worker_get_subscription_error",
+ reason => Reason
+ }),
+ {error, Reason};
+ {ok, #{status_code := 200, body := RespBody}} ->
+ DecodedBody = emqx_utils_json:decode(RespBody, [return_maps]),
+ {ok, DecodedBody};
+ {ok, Details} ->
+ ?SLOG(warning, #{
+ msg => "gcp_pubsub_consumer_worker_get_subscription_unexpected_response",
+ details => Details
+ }),
+ {error, Details}
+ end.
+
+-spec subscription_id(bridge_name(), emqx_bridge_gcp_pubsub_connector:topic()) -> subscription_id().
+subscription_id(BridgeName0, Topic) ->
+ %% The real GCP PubSub accepts colons in subscription names, but its emulator
+ %% doesn't... We currently validate bridge names to not include that character. The
+ %% exception is the prefix from the probe API.
+ BridgeName1 = to_bin(BridgeName0),
+ BridgeName = binary:replace(BridgeName1, <<":">>, <<"-">>),
+ to_bin(uri_string:quote(<<"emqx-sub-", BridgeName/binary, "-", Topic/binary>>)).
+
+-spec path(state(), pull | create | ack | get_subscription) -> binary().
+path(State, Type) ->
+ #{
+ connector_state := #{project_id := ProjectId},
+ subscription_id := SubscriptionId
+ } = State,
+ SubscriptionResource = subscription_resource(ProjectId, SubscriptionId),
+ case Type of
+ pull ->
+ <<"/v1/", SubscriptionResource/binary, ":pull">>;
+ create ->
+ <<"/v1/", SubscriptionResource/binary>>;
+ ack ->
+ <<"/v1/", SubscriptionResource/binary, ":acknowledge">>;
+ get_subscription ->
+ <<"/v1/", SubscriptionResource/binary>>
+ end.
+
+-spec body(state(), pull | create | patch_subscription | get_subscription) -> binary().
+body(State, pull) ->
+ #{pull_max_messages := PullMaxMessages} = State,
+ emqx_utils_json:encode(#{<<"maxMessages">> => PullMaxMessages});
+body(State, create) ->
+ #{
+ ack_retry_interval := AckRetryInterval,
+ connector_state := #{project_id := ProjectId},
+ topic := PubSubTopic
+ } = State,
+ TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
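+ %% Ask for an ack deadline slightly longer than our own retry interval so
+ %% that messages are not redelivered while an acknowledge request is being
+ %% retried.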
+ AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second),
+ JSON = #{
+ <<"topic">> => TopicResource,
+ <<"ackDeadlineSeconds">> => AckDeadlineSeconds
+ },
+ emqx_utils_json:encode(JSON);
+body(State, patch_subscription) ->
+ #{
+ ack_retry_interval := AckRetryInterval,
+ connector_state := #{project_id := ProjectId},
+ topic := PubSubTopic,
+ subscription_id := SubscriptionId
+ } = State,
+ TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
+ SubscriptionResource = subscription_resource(ProjectId, SubscriptionId),
+ AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second),
+ JSON = #{
+ <<"subscription">> =>
+ #{
+ <<"ackDeadlineSeconds">> => AckDeadlineSeconds,
+ <<"name">> => SubscriptionResource,
+ <<"topic">> => TopicResource
+ },
+ %% topic is immutable; don't add it here.
+ <<"updateMask">> => <<"ackDeadlineSeconds">>
+ },
+ emqx_utils_json:encode(JSON);
+body(_State, get_subscription) ->
+ <<>>.
+
+-spec body(state(), ack, map()) -> binary().
+body(_State, ack, Opts) ->
+ #{ack_ids := AckIds} = Opts,
+ JSON = #{<<"ackIds">> => AckIds},
+ emqx_utils_json:encode(JSON).
+
+-spec subscription_resource(emqx_bridge_gcp_pubsub_connector:project_id(), subscription_id()) ->
+ binary().
+subscription_resource(ProjectId, SubscriptionId) ->
+ <<"projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>.
+
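+%% A pull response body looks roughly like this (the PubSub REST API
+%% base64-encodes `data', which is decoded below):
+%%   #{<<"receivedMessages">> =>
+%%       [#{<<"ackId">> => AckId,
+%%          <<"message">> => #{<<"data">> => Base64Data,
+%%                             <<"messageId">> => Id,
+%%                             <<"publishTime">> => Timestamp,
+%%                             <<"attributes">> => Attrs}}]}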
+-spec decode_response(binary()) -> [decoded_message()].
+decode_response(RespBody) ->
+ case emqx_utils_json:decode(RespBody, [return_maps]) of
+ #{<<"receivedMessages">> := Msgs0} ->
+ lists:map(
+ fun(Msg0 = #{<<"message">> := InnerMsg0}) ->
+ InnerMsg = emqx_utils_maps:update_if_present(
+ <<"data">>, fun base64:decode/1, InnerMsg0
+ ),
+ Msg0#{<<"message">> := InnerMsg}
+ end,
+ Msgs0
+ );
+ #{} ->
+ []
+ end.
+
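+%% Publish the message to the mapped MQTT topic (rendered through the payload
+%% template), run the bridge hookpoint so rules can consume it, bump the
+%% `received' metric, and return the ack id for later acknowledgement.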
+-spec handle_message(state(), decoded_message()) -> ack_id().
+handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Message) ->
+ ?tp(
+ debug,
+ "gcp_pubsub_consumer_worker_handle_message",
+ #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId}
+ ),
+ #{
+ instance_id := InstanceId,
+ hookpoint := Hookpoint,
+ mqtt_config := #{
+ payload_template := PayloadTemplate,
+ qos := MQTTQoS,
+ mqtt_topic := MQTTTopic
+ },
+ topic := Topic
+ } = State,
+ #{
+ <<"messageId">> := MessageId,
+ <<"publishTime">> := PublishTime
+ } = InnerMsg,
+ FullMessage0 = #{
+ message_id => MessageId,
+ publish_time => PublishTime,
+ topic => Topic
+ },
+ FullMessage =
+ lists:foldl(
+ fun({FromKey, ToKey}, Acc) ->
+ add_if_present(FromKey, InnerMsg, ToKey, Acc)
+ end,
+ FullMessage0,
+ [
+ {<<"data">>, value},
+ {<<"attributes">>, attributes},
+ {<<"orderingKey">>, ordering_key}
+ ]
+ ),
+ Payload = render(FullMessage, PayloadTemplate),
+ MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload),
+ _ = emqx:publish(MQTTMessage),
+ emqx:run_hook(Hookpoint, [FullMessage]),
+ emqx_resource_metrics:received_inc(InstanceId),
+ AckId.
+
+-spec add_if_present(any(), map(), any(), map()) -> map().
+add_if_present(FromKey, Message, ToKey, Map) ->
+ case maps:get(FromKey, Message, undefined) of
+ undefined ->
+ Map;
+ Value ->
+ Map#{ToKey => Value}
+ end.
+
+render(FullMessage, PayloadTemplate) ->
+ Opts = #{return => full_binary},
+ emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts).
+
+to_bin(A) when is_atom(A) -> atom_to_binary(A);
+to_bin(L) when is_list(L) -> iolist_to_binary(L);
+to_bin(B) when is_binary(B) -> B.
diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl
new file mode 100644
index 000000000..73220a807
--- /dev/null
+++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl
@@ -0,0 +1,200 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_impl_consumer).
+
+-behaviour(emqx_resource).
+
+%% `emqx_resource' API
+-export([
+ callback_mode/0,
+ query_mode/1,
+ on_start/2,
+ on_stop/2,
+ on_get_status/2
+]).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-type mqtt_config() :: #{
+ mqtt_topic := emqx_types:topic(),
+ qos := emqx_types:qos(),
+ payload_template := string()
+}.
+-type config() :: #{
+ connect_timeout := emqx_schema:duration_ms(),
+ max_retries := non_neg_integer(),
+ pool_size := non_neg_integer(),
+ resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
+ service_account_json := emqx_bridge_gcp_pubsub_connector:service_account_json(),
+ any() => term()
+}.
+-type state() :: #{
+ connector_state := emqx_bridge_gcp_pubsub_connector:state(),
+ pool_name := resource_id()
+}.
+
+-export_type([mqtt_config/0]).
+
+-define(AUTO_RECONNECT_S, 2).
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+-spec callback_mode() -> callback_mode().
+callback_mode() -> async_if_possible.
+
+-spec query_mode(any()) -> query_mode().
+query_mode(_Config) -> no_queries.
+
+-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+on_start(InstanceId, Config) ->
+ case emqx_bridge_gcp_pubsub_connector:on_start(InstanceId, Config) of
+ {ok, ConnectorState} ->
+ start_consumers(InstanceId, ConnectorState, Config);
+ Error ->
+ Error
+ end.
+
+-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
+on_stop(InstanceId, #{connector_state := ConnectorState}) ->
+ ok = stop_consumers(InstanceId),
+ emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState);
+on_stop(InstanceId, undefined = _State) ->
+ ok = stop_consumers(InstanceId),
+ emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, undefined).
+
+-spec on_get_status(resource_id(), state()) -> connected | connecting.
+on_get_status(InstanceId, _State) ->
+ case
+ emqx_resource_pool:health_check_workers(
+ InstanceId,
+ fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1
+ )
+ of
+ true -> connected;
+ false -> connecting
+ end.
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+start_consumers(InstanceId, ConnectorState, Config) ->
+ #{
+ bridge_name := BridgeName,
+ consumer := ConsumerConfig0,
+ hookpoint := Hookpoint
+ } = Config,
+ ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
+ TopicMapping = maps:get(topic_mapping, ConsumerConfig1),
+ PullWorkerMultiplier = maps:get(pull_worker_multiplier, ConsumerConfig1),
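+ %% one pull worker per PubSub topic, times the configured multiplier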
+ PoolSize = map_size(TopicMapping) * PullWorkerMultiplier,
+ ConsumerConfig = ConsumerConfig1#{
+ auto_reconnect => ?AUTO_RECONNECT_S,
+ bridge_name => BridgeName,
+ connector_state => ConnectorState,
+ hookpoint => Hookpoint,
+ instance_id => InstanceId,
+ pool_size => PoolSize
+ },
+ ConsumerOpts = maps:to_list(ConsumerConfig),
+ %% FIXME: mark as unhealthy if topics do not exist!
+ case validate_pubsub_topics(InstanceId, TopicMapping, ConnectorState) of
+ ok ->
+ ok;
+ error ->
+ _ = emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState),
+ throw(
+ "GCP PubSub topics are invalid. Please check the logs, make sure the "
+ "topics exist in GCP, and that the service account has permission to use them."
+ )
+ end,
+ case
+ emqx_resource_pool:start(InstanceId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts)
+ of
+ ok ->
+ State = #{
+ connector_state => ConnectorState,
+ pool_name => InstanceId
+ },
+ {ok, State};
+ {error, Reason} ->
+ _ = emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState),
+ {error, Reason}
+ end.
+
+stop_consumers(InstanceId) ->
+ _ = log_when_error(
+ fun() ->
+ ok = emqx_resource_pool:stop(InstanceId)
+ end,
+ #{
+ msg => "failed_to_stop_pull_worker_pool",
+ instance_id => InstanceId
+ }
+ ),
+ ok.
+
+convert_topic_mapping(TopicMappingList) ->
+ lists:foldl(
+ fun(Fields, Acc) ->
+ #{
+ pubsub_topic := PubSubTopic,
+ mqtt_topic := MQTTTopic,
+ qos := QoS,
+ payload_template := PayloadTemplate0
+ } = Fields,
+ PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0),
+ Acc#{
+ PubSubTopic => #{
+ payload_template => PayloadTemplate,
+ mqtt_topic => MQTTTopic,
+ qos => QoS
+ }
+ }
+ end,
+ #{},
+ TopicMappingList
+ ).
+
+validate_pubsub_topics(InstanceId, TopicMapping, ConnectorState) ->
+ PubSubTopics = maps:keys(TopicMapping),
+ do_validate_pubsub_topics(InstanceId, ConnectorState, PubSubTopics).
+
+do_validate_pubsub_topics(InstanceId, ConnectorState, [Topic | Rest]) ->
+ case check_for_topic_existence(InstanceId, Topic, ConnectorState) of
+ ok ->
+ do_validate_pubsub_topics(InstanceId, ConnectorState, Rest);
+ {error, _} ->
+ error
+ end;
+do_validate_pubsub_topics(_InstanceId, _ConnectorState, []) ->
+ %% we already validate that the mapping is not empty in the config schema.
+ ok.
+
+check_for_topic_existence(InstanceId, Topic, ConnectorState) ->
+ Res = emqx_bridge_gcp_pubsub_connector:get_topic(InstanceId, Topic, ConnectorState),
+ case Res of
+ {ok, _} ->
+ ok;
+ {error, #{status_code := 404}} ->
+ {error, not_found};
+ {error, Details} ->
+ ?tp(warning, "gcp_pubsub_consumer_check_topic_error", Details),
+ {error, Details}
+ end.
+
+log_when_error(Fun, Log) ->
+ try
+ Fun()
+ catch
+ C:E ->
+ ?SLOG(error, Log#{
+ exception => C,
+ reason => E
+ })
+ end.
diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
new file mode 100644
index 000000000..41b2a11b2
--- /dev/null
+++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
@@ -0,0 +1,690 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_consumer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("jose/include/jose_jwt.hrl").
+-include_lib("jose/include/jose_jws.hrl").
+
+-define(BRIDGE_TYPE, gcp_pubsub_consumer).
+-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>).
+-define(REPUBLISH_TOPIC, <<"republish/t">>).
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+ GCPEmulatorHost = os:getenv("GCP_EMULATOR_HOST", "toxiproxy"),
+ GCPEmulatorPortStr = os:getenv("GCP_EMULATOR_PORT", "8085"),
+ GCPEmulatorPort = list_to_integer(GCPEmulatorPortStr),
+ ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+ ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+ ProxyName = "gcp_emulator",
+ case emqx_common_test_helpers:is_tcp_server_available(GCPEmulatorHost, GCPEmulatorPort) of
+ true ->
+ emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+ ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+ ok = emqx_connector_test_helpers:start_apps([
+ emqx_resource, emqx_bridge, emqx_rule_engine
+ ]),
+ {ok, _} = application:ensure_all_started(emqx_connector),
+ emqx_mgmt_api_test_util:init_suite(),
+ HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr,
+ true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
+ ConnectorState = start_control_connector(),
+ [
+ {proxy_name, ProxyName},
+ {proxy_host, ProxyHost},
+ {proxy_port, ProxyPort},
+ {gcp_emulator_host, GCPEmulatorHost},
+ {gcp_emulator_port, GCPEmulatorPort},
+ {connector_state, ConnectorState}
+ | Config
+ ];
+ false ->
+ case os:getenv("IS_CI") of
+ "yes" ->
+ throw(no_gcp_emulator);
+ _ ->
+ {skip, no_gcp_emulator}
+ end
+ end.
+
+end_per_suite(Config) ->
+ ConnectorState = ?config(connector_state, Config),
+ stop_control_connector(ConnectorState),
+ emqx_mgmt_api_test_util:end_suite(),
+ ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+ ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
+ _ = application:stop(emqx_connector),
+ os:unsetenv("PUBSUB_EMULATOR_HOST"),
+ ok.
+
+init_per_testcase(TestCase, Config) ->
+ common_init_per_testcase(TestCase, Config).
+
+common_init_per_testcase(TestCase, Config0) ->
+ ct:timetrap(timer:seconds(60)),
+ emqx_bridge_testlib:delete_all_bridges(),
+ emqx_config:delete_override_conf_files(),
+ ConsumerTopic =
+ <<
+ (atom_to_binary(TestCase))/binary,
+ (integer_to_binary(erlang:unique_integer()))/binary
+ >>,
+ UniqueNum = integer_to_binary(erlang:unique_integer()),
+ MQTTTopic = proplists:get_value(mqtt_topic, Config0, <<"mqtt/topic/", UniqueNum/binary>>),
+ MQTTQoS = proplists:get_value(mqtt_qos, Config0, 0),
+ DefaultTopicMapping = [
+ #{
+ pubsub_topic => ConsumerTopic,
+ mqtt_topic => MQTTTopic,
+ qos => MQTTQoS,
+ payload_template => <<"${.}">>
+ }
+ ],
+ TopicMapping = proplists:get_value(topic_mapping, Config0, DefaultTopicMapping),
+ ServiceAccountJSON =
+ #{<<"project_id">> := ProjectId} =
+ emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
+ Config = [
+ {consumer_topic, ConsumerTopic},
+ {topic_mapping, TopicMapping},
+ {service_account_json, ServiceAccountJSON},
+ {project_id, ProjectId}
+ | Config0
+ ],
+ {Name, ConfigString, ConsumerConfig} = consumer_config(TestCase, Config),
+ ensure_topics(Config),
+ ok = snabbkaffe:start_trace(),
+ [
+ {consumer_name, Name},
+ {consumer_config_string, ConfigString},
+ {consumer_config, ConsumerConfig}
+ | Config
+ ].
+
+end_per_testcase(_Testcase, Config) ->
+ case proplists:get_bool(skip_does_not_apply, Config) of
+ true ->
+ ok;
+ false ->
+ ProxyHost = ?config(proxy_host, Config),
+ ProxyPort = ?config(proxy_port, Config),
+ emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+ emqx_bridge_testlib:delete_all_bridges(),
+ emqx_common_test_helpers:call_janitor(60_000),
+ ok = snabbkaffe:stop(),
+ ok
+ end.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+consumer_config(TestCase, Config) ->
+ UniqueNum = integer_to_binary(erlang:unique_integer()),
+ ConsumerTopic = ?config(consumer_topic, Config),
+ ServiceAccountJSON = ?config(service_account_json, Config),
+ Name = <<
+ (atom_to_binary(TestCase))/binary, UniqueNum/binary
+ >>,
+ ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON),
+ MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
+ MQTTQoS = proplists:get_value(mqtt_qos, Config, 0),
+ PullWorkerMultiplier = proplists:get_value(pull_worker_multiplier, Config, 1),
+ DefaultTopicMapping = [
+ #{
+ pubsub_topic => ConsumerTopic,
+ mqtt_topic => MQTTTopic,
+ qos => MQTTQoS,
+ payload_template => <<"${.}">>
+ }
+ ],
+ TopicMapping0 = proplists:get_value(topic_mapping, Config, DefaultTopicMapping),
+ TopicMappingStr = topic_mapping(TopicMapping0),
+ ConfigString =
+ io_lib:format(
+ "bridges.gcp_pubsub_consumer.~s {\n"
+ " enable = true\n"
+ %% gcp pubsub emulator doesn't do pipelining very well...
+ " pipelining = 1\n"
+ " connect_timeout = \"15s\"\n"
+ " service_account_json = ~s\n"
+ " consumer {\n"
+ " ack_retry_interval = \"5s\"\n"
+ " pull_max_messages = 10\n"
+ " pull_worker_multiplier = ~b\n"
+ %% topic mapping
+ "~s"
+ " }\n"
+ " max_retries = 2\n"
+ " pool_size = 8\n"
+ " resource_opts {\n"
+ " health_check_interval = \"1s\"\n"
+ " request_ttl = \"15s\"\n"
+ " }\n"
+ "}\n",
+ [
+ Name,
+ ServiceAccountJSONStr,
+ PullWorkerMultiplier,
+ TopicMappingStr
+ ]
+ ),
+ {Name, ConfigString, parse_and_check(ConfigString, Name)}.
+
+parse_and_check(ConfigString, Name) ->
+ {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+ TypeBin = ?BRIDGE_TYPE_BIN,
+ hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+ #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
+ Config.
+
+topic_mapping(TopicMapping0) ->
+ Template0 = <<
+ "{pubsub_topic = \"{{ pubsub_topic }}\","
+ " mqtt_topic = \"{{ mqtt_topic }}\","
+ " qos = {{ qos }},"
+ " payload_template = \"{{{ payload_template }}}\" }"
+ >>,
+ Template = bbmustache:parse_binary(Template0),
+ Entries =
+ lists:map(
+ fun(Params) ->
+ bbmustache:compile(Template, Params, [{key_type, atom}])
+ end,
+ TopicMapping0
+ ),
+ iolist_to_binary(
+ [
+ " topic_mapping = [",
+ lists:join(<<",\n">>, Entries),
+ "]\n"
+ ]
+ ).
+
+ensure_topics(Config) ->
+ TopicMapping = ?config(topic_mapping, Config),
+ lists:foreach(
+ fun(#{pubsub_topic := T}) ->
+ ensure_topic(Config, T)
+ end,
+ TopicMapping
+ ).
+
+ensure_topic(Config, Topic) ->
+ ProjectId = ?config(project_id, Config),
+ ConnectorState = #{pool_name := PoolName} = ?config(connector_state, Config),
+ Method = put,
+ Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
+ Body = <<"{}">>,
+ Res = emqx_bridge_gcp_pubsub_connector:on_query(
+ PoolName,
+ {prepared_request, {Method, Path, Body}},
+ ConnectorState
+ ),
+ case Res of
+ {ok, _} ->
+ ok;
+ {error, #{status_code := 409}} ->
+ %% already exists
+ ok
+ end,
+ ok.
+
+start_control_connector() ->
+ RawServiceAccount = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
+ ServiceAccount = emqx_utils_maps:unsafe_atom_key_map(RawServiceAccount),
+ ConnectorConfig =
+ #{
+ connect_timeout => 5_000,
+ max_retries => 0,
+ pool_size => 1,
+ resource_opts => #{request_ttl => 5_000},
+ service_account_json => ServiceAccount
+ },
+ PoolName = <<"control_connector">>,
+ {ok, ConnectorState} = emqx_bridge_gcp_pubsub_connector:on_start(PoolName, ConnectorConfig),
+ ConnectorState.
+
+stop_control_connector(ConnectorState) ->
+ #{pool_name := PoolName} = ConnectorState,
+ ok = emqx_bridge_gcp_pubsub_connector:on_stop(PoolName, ConnectorState),
+ ok.
+
+pubsub_publish(Config, Topic, Messages0) ->
+ ConnectorState = #{pool_name := PoolName} = ?config(connector_state, Config),
+ ProjectId = ?config(project_id, Config),
+ Method = post,
+ Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary, ":publish">>,
+ Messages =
+ lists:map(
+ fun(Msg) ->
+ emqx_utils_maps:update_if_present(
+ <<"data">>,
+ fun
+ (D) when is_binary(D) -> base64:encode(D);
+ (M) when is_map(M) -> base64:encode(emqx_utils_json:encode(M))
+ end,
+ Msg
+ )
+ end,
+ Messages0
+ ),
+ Body = emqx_utils_json:encode(#{<<"messages">> => Messages}),
+ {ok, _} = emqx_bridge_gcp_pubsub_connector:on_query(
+ PoolName,
+ {prepared_request, {Method, Path, Body}},
+ ConnectorState
+ ),
+ ok.
+
+create_bridge(Config) ->
+ create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
+ Type = ?BRIDGE_TYPE_BIN,
+ Name = ?config(consumer_name, Config),
+ BridgeConfig0 = ?config(consumer_config, Config),
+ BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
+ emqx_bridge:create(Type, Name, BridgeConfig).
+
+create_bridge_api(Config) ->
+ create_bridge_api(Config, _Overrides = #{}).
+
+create_bridge_api(Config, Overrides) ->
+ TypeBin = ?BRIDGE_TYPE_BIN,
+ Name = ?config(consumer_name, Config),
+ BridgeConfig0 = ?config(consumer_config, Config),
+ BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
+ Params = BridgeConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+ Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+ Opts = #{return_all => true},
+ ct:pal("creating bridge (via http): ~p", [Params]),
+ Res =
+ case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+ {ok, {Status, Headers, Body0}} ->
+ {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
+ Error ->
+ Error
+ end,
+ ct:pal("bridge create result: ~p", [Res]),
+ Res.
+
+probe_bridge_api(Config) ->
+ TypeBin = ?BRIDGE_TYPE_BIN,
+ Name = ?config(consumer_name, Config),
+ ConsumerConfig = ?config(consumer_config, Config),
+ Params = ConsumerConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+ Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+ Opts = #{return_all => true},
+ ct:pal("probing bridge (via http): ~p", [Params]),
+ Res =
+ case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+ {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
+ Error -> Error
+ end,
+ ct:pal("bridge probe result: ~p", [Res]),
+ Res.
+
+start_and_subscribe_mqtt(Config) ->
+ TopicMapping = ?config(topic_mapping, Config),
+ {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+ on_exit(fun() -> emqtt:stop(C) end),
+ {ok, _} = emqtt:connect(C),
+ lists:foreach(
+ fun(#{mqtt_topic := MQTTTopic}) ->
+ {ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, _QoS = 2)
+ end,
+ TopicMapping
+ ),
+ ok.
+
+resource_id(Config) ->
+ Type = ?BRIDGE_TYPE_BIN,
+ Name = ?config(consumer_name, Config),
+ emqx_bridge_resource:resource_id(Type, Name).
+
+receive_published() ->
+ receive_published(#{}).
+
+receive_published(Opts0) ->
+ Default = #{n => 1, timeout => 20_000},
+ Opts = maps:merge(Default, Opts0),
+ receive_published(Opts, []).
+
+receive_published(#{n := N, timeout := _Timeout}, Acc) when N =< 0 ->
+ {ok, lists:reverse(Acc)};
+receive_published(#{n := N, timeout := Timeout} = Opts, Acc) ->
+ receive
+ {publish, Msg0 = #{payload := Payload}} ->
+ Msg =
+ case emqx_utils_json:safe_decode(Payload, [return_maps]) of
+ {ok, Decoded} -> Msg0#{payload := Decoded};
+ {error, _} -> Msg0
+ end,
+ receive_published(Opts#{n := N - 1}, [Msg | Acc])
+ after Timeout ->
+ {timeout, #{
+ msgs_so_far => Acc,
+ mailbox => process_info(self(), messages),
+ expected_remaining => N
+ }}
+ end.
+
+create_rule_and_action_http(Config) ->
+ ConsumerName = ?config(consumer_name, Config),
+ BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, ConsumerName),
+ ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>,
+ Params = #{
+ enable => true,
+ sql => <<"SELECT * FROM \"$bridges/", BridgeId/binary, "\"">>,
+ actions =>
+ [
+ #{
+ <<"function">> => <<"republish">>,
+ <<"args">> =>
+ #{
+ <<"topic">> => ?REPUBLISH_TOPIC,
+ <<"payload">> => <<>>,
+ <<"qos">> => 0,
+ <<"retain">> => false,
+ <<"user_properties">> => <<"${headers}">>
+ }
+ },
+ #{<<"function">> => ActionFn}
+ ]
+ },
+ Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+ ct:pal("rule action params: ~p", [Params]),
+ case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+ {ok, Res0} ->
+ Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
+ on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+ {ok, Res};
+ Error ->
+ Error
+ end.
+
+action_response(Selected, Envs, Args) ->
+ ?tp(action_response, #{
+ selected => Selected,
+ envs => Envs,
+ args => Args
+ }),
+ ok.
+
+assert_non_received_metrics(BridgeName) ->
+ Metrics = emqx_bridge:get_metrics(?BRIDGE_TYPE, BridgeName),
+ #{counters := Counters0, gauges := Gauges} = Metrics,
+ Counters = maps:remove(received, Counters0),
+ ?assert(lists:all(fun(V) -> V == 0 end, maps:values(Counters)), #{metrics => Metrics}),
+ ?assert(lists:all(fun(V) -> V == 0 end, maps:values(Gauges)), #{metrics => Metrics}),
+ ok.
+
+%%------------------------------------------------------------------------------
+%% Trace properties
+%%------------------------------------------------------------------------------
+
+prop_pulled_only_once(Trace) ->
+ PulledIds = ?projection(
+ message_id, ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace)
+ ),
+ NumPulled = length(PulledIds),
+ UniqueNumPulled = sets:size(sets:from_list(PulledIds, [{version, 2}])),
+ ?assertEqual(UniqueNumPulled, NumPulled),
+ ok.
+
+prop_all_pulled_are_acked(Trace) ->
+ PulledAckIds = ?projection(
+ ack_id, ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace)
+ ),
+ AckedIds0 = ?projection(ack_ids, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)),
+ AckedIds = lists:flatten(AckedIds0),
+ ?assertEqual(
+ sets:from_list(PulledAckIds, [{version, 2}]),
+ sets:from_list(AckedIds, [{version, 2}])
+ ),
+ ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_consume_ok(Config) ->
+ BridgeName = ?config(consumer_name, Config),
+ TopicMapping = ?config(topic_mapping, Config),
+ ResourceId = resource_id(Config),
+ ?check_trace(
+ begin
+ start_and_subscribe_mqtt(Config),
+ ?assertMatch(
+ {{ok, _}, {ok, _}},
+ ?wait_async_action(
+ create_bridge(Config),
+ #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
+ 40_000
+ )
+ ),
+ [
+ #{
+ pubsub_topic := Topic,
+ mqtt_topic := MQTTTopic,
+ qos := QoS
+ }
+ ] = TopicMapping,
+ Payload0 = emqx_guid:to_hexstr(emqx_guid:gen()),
+ Messages0 = [
+ #{
+ <<"data">> => Data0 = #{<<"value">> => Payload0},
+ <<"attributes">> => Attributes0 = #{<<"key">> => <<"value">>},
+ <<"orderingKey">> => <<"some_ordering_key">>
+ }
+ ],
+ pubsub_publish(Config, Topic, Messages0),
+ {ok, Published0} = receive_published(),
+ EncodedData0 = emqx_utils_json:encode(Data0),
+ ?assertMatch(
+ [
+ #{
+ qos := QoS,
+ topic := MQTTTopic,
+ payload :=
+ #{
+ <<"attributes">> := Attributes0,
+ <<"message_id">> := MsgId,
+ <<"ordering_key">> := <<"some_ordering_key">>,
+ <<"publish_time">> := PubTime,
+ <<"topic">> := Topic,
+ <<"value">> := EncodedData0
+ }
+ }
+ ] when is_binary(MsgId) andalso is_binary(PubTime),
+ Published0
+ ),
+ %% no need to check return value; we check the property in
+ %% the check phase. this is just to give it a chance to do
+ %% so and avoid flakiness. should be fast.
+ ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 1_000),
+ ?retry(
+ _Interval = 200,
+ _NAttempts = 20,
+ ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId))
+ ),
+
+ %% Batch with only data and only attributes
+ Payload1 = emqx_guid:to_hexstr(emqx_guid:gen()),
+ Messages1 = [
+ #{<<"data">> => Data1 = #{<<"val">> => Payload1}},
+ #{<<"attributes">> => Attributes1 = #{<<"other_key">> => <<"other_value">>}}
+ ],
+ pubsub_publish(Config, Topic, Messages1),
+ {ok, Published1} = receive_published(#{n => 2}),
+ EncodedData1 = emqx_utils_json:encode(Data1),
+ ?assertMatch(
+ [
+ #{
+ qos := QoS,
+ topic := MQTTTopic,
+ payload :=
+ #{
+ <<"message_id">> := _,
+ <<"publish_time">> := _,
+ <<"topic">> := Topic,
+ <<"value">> := EncodedData1
+ }
+ },
+ #{
+ qos := QoS,
+ topic := MQTTTopic,
+ payload :=
+ #{
+ <<"attributes">> := Attributes1,
+ <<"message_id">> := _,
+ <<"publish_time">> := _,
+ <<"topic">> := Topic
+ }
+ }
+ ],
+ Published1
+ ),
+ ?assertNotMatch(
+ [
+ #{payload := #{<<"attributes">> := _, <<"ordering_key">> := _}},
+ #{payload := #{<<"value">> := _, <<"ordering_key">> := _}}
+ ],
+ Published1
+ ),
+ %% No need to check the return value here; the property is verified in the
+ %% trace check phase. This wait just gives the worker a chance to ack before
+ %% we proceed, which avoids flakiness. It should be fast.
+ ?block_until(
+ #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged, ack_ids := [_, _]}, 1_000
+ ),
+ ?retry(
+ _Interval = 200,
+ _NAttempts = 20,
+ ?assertEqual(3, emqx_resource_metrics:received_get(ResourceId))
+ ),
+
+ %% Check that the bridge probe API doesn't leak atoms.
+ ProbeRes0 = probe_bridge_api(Config),
+ ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
+ AtomsBefore = erlang:system_info(atom_count),
+ %% Probe again; shouldn't have created more atoms.
+ ProbeRes1 = probe_bridge_api(Config),
+ ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
+ AtomsAfter = erlang:system_info(atom_count),
+ ?assertEqual(AtomsBefore, AtomsAfter),
+
+ assert_non_received_metrics(BridgeName),
+
+ ok
+ end,
+ [
+ {"all pulled ack ids are acked", fun ?MODULE:prop_all_pulled_are_acked/1},
+ {"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}
+ ]
+ ),
+ ok.
+
+t_bridge_rule_action_source(Config) ->
+ BridgeName = ?config(consumer_name, Config),
+ TopicMapping = ?config(topic_mapping, Config),
+ ResourceId = resource_id(Config),
+ ?check_trace(
+ begin
+ ?assertMatch(
+ {{ok, _}, {ok, _}},
+ ?wait_async_action(
+ create_bridge(Config),
+ #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
+ 40_000
+ )
+ ),
+ {ok, _} = create_rule_and_action_http(Config),
+
+ [#{pubsub_topic := PubSubTopic}] = TopicMapping,
+ {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+ on_exit(fun() -> emqtt:stop(C) end),
+ {ok, _} = emqtt:connect(C),
+ {ok, _, [0]} = emqtt:subscribe(C, ?REPUBLISH_TOPIC),
+
+ Payload0 = emqx_guid:to_hexstr(emqx_guid:gen()),
+ Messages0 = [
+ #{
+ <<"data">> => Data0 = #{<<"payload">> => Payload0},
+ <<"attributes">> => Attributes0 = #{<<"key">> => <<"value">>}
+ }
+ ],
+ {_, {ok, _}} =
+ ?wait_async_action(
+ pubsub_publish(Config, PubSubTopic, Messages0),
+ #{?snk_kind := action_response},
+ 5_000
+ ),
+ Published0 = receive_published(),
+ EncodedData0 = emqx_utils_json:encode(Data0),
+ ?assertMatch(
+ {ok, [
+ #{
+ topic := ?REPUBLISH_TOPIC,
+ qos := 0,
+ payload := #{
+ <<"event">> := <<"$bridges/", _/binary>>,
+ <<"message_id">> := _,
+ <<"metadata">> := #{<<"rule_id">> := _},
+ <<"publish_time">> := _,
+ <<"topic">> := PubSubTopic,
+ <<"attributes">> := Attributes0,
+ <<"value">> := EncodedData0
+ }
+ }
+ ]},
+ Published0
+ ),
+ ?retry(
+ _Interval = 200,
+ _NAttempts = 20,
+ ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId))
+ ),
+
+ assert_non_received_metrics(BridgeName),
+
+ #{payload => Payload0}
+ end,
+ [{"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}]
+ ),
+ ok.
+
+%% TODO TEST:
+%% * multi-topic mapping
+%% * get status
+%% * 2+ pull workers do not duplicate delivered messages
+%% * nonexistent topic
+%% * connection cut then restored
+%% * pull worker death
+%% * async worker death mid-pull
+%% * ensure subscription creation error
+%% * cluster subscription
+%% * connection down during pull
+%% * connection down during ack
+%% * topic deleted while consumer is running
+%% * subscription deleted while consumer is running
diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl
index 2b9584432..5b1e7c6c6 100644
--- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl
+++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl
@@ -74,6 +74,7 @@ init_per_suite(Config) ->
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
{ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
+ persistent_term:put({emqx_bridge_gcp_pubsub_connector, transport}, tls),
Config.
end_per_suite(_Config) ->
@@ -81,6 +82,7 @@ end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
_ = application:stop(emqx_connector),
+ persistent_term:erase({emqx_bridge_gcp_pubsub_connector, transport}),
ok.
init_per_group(sync_query, Config) ->
@@ -276,7 +278,7 @@ gcp_pubsub_config(Config) ->
PayloadTemplate = proplists:get_value(payload_template, Config, ""),
PubSubTopic = proplists:get_value(pubsub_topic, Config, <<"mytopic">>),
PipelineSize = proplists:get_value(pipeline_size, Config, 100),
- ServiceAccountJSON = proplists:get_value(pubsub_topic, Config, generate_service_account_json()),
+ ServiceAccountJSON = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON),
GUID = emqx_guid:to_hexstr(emqx_guid:gen()),
Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>,
@@ -324,32 +326,6 @@ parse_and_check(ConfigString, Name) ->
#{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
Config.
-generate_service_account_json() ->
- PrivateKeyPEM = generate_private_key_pem(),
- service_account_json(PrivateKeyPEM).
-
-generate_private_key_pem() ->
- PublicExponent = 65537,
- Size = 2048,
- Key = public_key:generate_key({rsa, Size, PublicExponent}),
- DERKey = public_key:der_encode('PrivateKeyInfo', Key),
- public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]).
-
-service_account_json(PrivateKeyPEM) ->
- #{
- <<"type">> => <<"service_account">>,
- <<"project_id">> => <<"myproject">>,
- <<"private_key_id">> => <<"kid">>,
- <<"private_key">> => PrivateKeyPEM,
- <<"client_email">> => <<"test@myproject.iam.gserviceaccount.com">>,
- <<"client_id">> => <<"123812831923812319190">>,
- <<"auth_uri">> => <<"https://accounts.google.com/o/oauth2/auth">>,
- <<"token_uri">> => <<"https://oauth2.googleapis.com/token">>,
- <<"auth_provider_x509_cert_url">> => <<"https://www.googleapis.com/oauth2/v1/certs">>,
- <<"client_x509_cert_url">> =>
- <<"https://www.googleapis.com/robot/v1/metadata/x509/test%40myproject.iam.gserviceaccount.com">>
- }.
-
metrics_mapping() ->
#{
dropped => fun emqx_resource_metrics:dropped_get/1,
@@ -1019,7 +995,7 @@ t_publish_timeout(Config) ->
<<"pipelining">> => 1,
<<"resource_opts">> => #{
<<"batch_size">> => 1,
- <<"resume_interval">> => <<"15s">>
+ <<"resume_interval">> => <<"1s">>
}
}),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@@ -1085,8 +1061,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
emqx:publish(Message),
{ok, _} = snabbkaffe:block_until(
?match_n_events(2, #{
- ?snk_kind := gcp_pubsub_response,
- query_mode := async
+ ?snk_kind := handle_async_reply_expired,
+ expired := [_]
}),
_Timeout1 = 15_000
)
@@ -1107,7 +1083,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
timeout ->
?assertMatch(
[_, _ | _],
- ?of_kind(gcp_pubsub_response, Trace)
+ ?of_kind(handle_async_reply_expired, Trace)
)
end,
ok
diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_utils.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_utils.erl
new file mode 100644
index 000000000..c3a516fb3
--- /dev/null
+++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_utils.erl
@@ -0,0 +1,34 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_utils).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+generate_service_account_json() ->
+ PrivateKeyPEM = generate_private_key_pem(),
+ service_account_json(PrivateKeyPEM).
+
+generate_private_key_pem() ->
+ PublicExponent = 65537,
+ Size = 2048,
+ Key = public_key:generate_key({rsa, Size, PublicExponent}),
+ DERKey = public_key:der_encode('PrivateKeyInfo', Key),
+ public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]).
+
+service_account_json(PrivateKeyPEM) ->
+ #{
+ <<"type">> => <<"service_account">>,
+ <<"project_id">> => <<"myproject">>,
+ <<"private_key_id">> => <<"kid">>,
+ <<"private_key">> => PrivateKeyPEM,
+ <<"client_email">> => <<"test@myproject.iam.gserviceaccount.com">>,
+ <<"client_id">> => <<"123812831923812319190">>,
+ <<"auth_uri">> => <<"https://accounts.google.com/o/oauth2/auth">>,
+ <<"token_uri">> => <<"https://oauth2.googleapis.com/token">>,
+ <<"auth_provider_x509_cert_url">> => <<"https://www.googleapis.com/oauth2/v1/certs">>,
+ <<"client_x509_cert_url">> =>
+ <<"https://www.googleapis.com/robot/v1/metadata/x509/test%40myproject.iam.gserviceaccount.com">>
+ }.
diff --git a/apps/emqx_utils/src/emqx_utils_maps.erl b/apps/emqx_utils/src/emqx_utils_maps.erl
index d1caf1f1e..d49c90e53 100644
--- a/apps/emqx_utils/src/emqx_utils_maps.erl
+++ b/apps/emqx_utils/src/emqx_utils_maps.erl
@@ -32,7 +32,8 @@
deep_convert/3,
diff_maps/2,
best_effort_recursive_sum/3,
- if_only_to_toggle_enable/2
+ if_only_to_toggle_enable/2,
+ update_if_present/3
]).
-export_type([config_key/0, config_key_path/0]).
@@ -293,3 +294,12 @@ if_only_to_toggle_enable(OldConf, Conf) ->
{_, _, _} ->
false
end.
+
+%% Like `maps:update_with', but does nothing if the key does not exist.
+update_if_present(Key, Fun, Map) ->
+ case Map of
+ #{Key := Val} ->
+ Map#{Key := Fun(Val)};
+ _ ->
+ Map
+ end.
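+
+%% Illustrative usage of the function above (examples added for documentation only):
+%%   emqx_utils_maps:update_if_present(a, fun(V) -> V + 1 end, #{a => 1, b => 2}).
+%%   %%=> #{a => 2, b => 2}
+%%   emqx_utils_maps:update_if_present(c, fun(V) -> V + 1 end, #{a => 1, b => 2}).
+%%   %%=> #{a => 1, b => 2} (key absent, so the map is returned unchanged)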
diff --git a/changes/ee/feat-11090.en.md b/changes/ee/feat-11090.en.md
new file mode 100644
index 000000000..153dbac5f
--- /dev/null
+++ b/changes/ee/feat-11090.en.md
@@ -0,0 +1 @@
+Implemented GCP PubSub Consumer data integration bridge.
diff --git a/changes/ee/fix-11090.en.md b/changes/ee/fix-11090.en.md
new file mode 100644
index 000000000..faef72ae3
--- /dev/null
+++ b/changes/ee/fix-11090.en.md
@@ -0,0 +1 @@
+Fixed an issue that prevented the pipelining configuration option from being correctly set for the GCP PubSub Producer bridge.
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
index e56a96e4f..ab23cbc02 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
@@ -16,7 +16,8 @@ api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% bridge schema module.
- api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub">>, Method),
+ api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub">>, Method ++ "_producer"),
+ api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub_consumer">>, Method ++ "_consumer"),
api_ref(emqx_bridge_kafka, <<"kafka_consumer">>, Method ++ "_consumer"),
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
@@ -92,6 +93,7 @@ resource_type(kafka) -> emqx_bridge_kafka_impl_producer;
resource_type(cassandra) -> emqx_bridge_cassandra_connector;
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_impl_producer;
+resource_type(gcp_pubsub_consumer) -> emqx_bridge_gcp_pubsub_impl_consumer;
resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
@@ -125,14 +127,6 @@ fields(bridges) ->
required => false
}
)},
- {gcp_pubsub,
- mk(
- hoconsc:map(name, ref(emqx_bridge_gcp_pubsub, "config")),
- #{
- desc => <<"EMQX Enterprise Config">>,
- required => false
- }
- )},
{mysql,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
@@ -198,7 +192,8 @@ fields(bridges) ->
required => false
}
)}
- ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
+ ] ++ kafka_structs() ++ pulsar_structs() ++ gcp_pubsub_structs() ++ mongodb_structs() ++
+ influxdb_structs() ++
redis_structs() ++
pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs().
@@ -249,6 +244,26 @@ pulsar_structs() ->
)}
].
+gcp_pubsub_structs() ->
+ [
+ {gcp_pubsub,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_gcp_pubsub, "config_producer")),
+ #{
+ desc => <<"EMQX Enterprise Config">>,
+ required => false
+ }
+ )},
+ {gcp_pubsub_consumer,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_gcp_pubsub, "config_consumer")),
+ #{
+ desc => <<"EMQX Enterprise Config">>,
+ required => false
+ }
+ )}
+ ].
+
influxdb_structs() ->
[
{Protocol,
diff --git a/mix.exs b/mix.exs
index e53f0c224..cb7c428e7 100644
--- a/mix.exs
+++ b/mix.exs
@@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do
{:redbug, "2.0.8"},
{:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
- {:ehttpc, github: "emqx/ehttpc", tag: "0.4.10", override: true},
+ {:ehttpc, github: "emqx/ehttpc", tag: "0.4.11", override: true},
{:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
diff --git a/rebar.config b/rebar.config
index f6830f83b..525161cf5 100644
--- a/rebar.config
+++ b/rebar.config
@@ -56,7 +56,7 @@
, {gpb, "4.19.7"}
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
- , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.10"}}}
+ , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.11"}}}
, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
diff --git a/rel/i18n/emqx_bridge_gcp_pubsub.hocon b/rel/i18n/emqx_bridge_gcp_pubsub.hocon
index ca6b855dc..39c4b7417 100644
--- a/rel/i18n/emqx_bridge_gcp_pubsub.hocon
+++ b/rel/i18n/emqx_bridge_gcp_pubsub.hocon
@@ -71,4 +71,52 @@ When a GCP Service Account is created (as described in https://developers.google
service_account_json.label:
"""GCP Service Account Credentials"""
+ consumer_opts {
+ desc: "Local MQTT publish and GCP PubSub consumer configs."
+ label: "GCP PubSub to MQTT"
+ }
+
+ consumer_pull_max_messages {
+ desc: "The maximum number of messages to retrieve from GCP PubSub in a single pull request."
+ " The actual number may be less than the specified value."
+ label: "Maximum Messages to Pull"
+ }
+
+ consumer_topic_mapping {
+ desc: "Defines the mapping between GCP PubSub topics and MQTT topics. Must contain at least one item."
+ label: "Topic Mapping"
+ }
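+
+ # Illustrative sketch only (not part of the schema): assuming the bridge config field is
+ # `topic_mapping` under `consumer`, an entry looks roughly like
+ #   {pubsub_topic = "my-pubsub-topic", mqtt_topic = "local/topic", qos = 1}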
+
+ consumer_pubsub_topic {
+ desc: "GCP PubSub topic to consume from."
+ label: "GCP PubSub"
+ }
+
+ consumer_mqtt_topic {
+ desc: "Local topic to which consumed GCP PubSub messages should be published to."
+ label: "MQTT Topic"
+ }
+
+ consumer_mqtt_qos {
+ desc: "MQTT QoS used to publish messages consumed from GCP PubSub."
+ label: "QoS"
+ }
+
+consumer_mqtt_payload.desc:
+"""The template for transforming the incoming GCP PubSub message. By default, it will use JSON format to serialize inputs from the GCP PubSub message. Available fields are:
+message_id
: the message ID assigned by GCP PubSub.
+publish_time
: message timestamp assigned by GCP PubSub.
+topic
: GCP PubSub topic.
+value
: the payload of the GCP PubSub message. Omitted if there's no payload.
+attributes
: an object containing string key-value pairs. Omitted if there are no attributes.
+ordering_key
: GCP PubSub message ordering key. Omitted if there's none."""
+
+consumer_mqtt_payload.label:
+"Payload Template"
+
+ consumer {
+ desc: "GCP PubSub Consumer configuration."
+ label: "GCP PubSub Consumer"
+ }
+
}
diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh
index 135eec919..01e6cf329 100755
--- a/scripts/ct/run.sh
+++ b/scripts/ct/run.sh
@@ -213,6 +213,9 @@ for dep in ${CT_DEPS}; do
FILES+=( '.ci/docker-compose-file/docker-compose-minio-tcp.yaml'
'.ci/docker-compose-file/docker-compose-minio-tls.yaml' )
;;
+ gcp_emulator)
+ FILES+=( '.ci/docker-compose-file/docker-compose-gcp-emulator.yaml' )
+ ;;
*)
echo "unknown_ct_dependency $dep"
exit 1