diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index ca03cb2e5..321f8a2ae 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -51,6 +51,7 @@ -define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql; + T == gcp_pubsub; T == influxdb_api_v1; T == influxdb_api_v2 ). diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf index 9e85e2a18..65aa354f7 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf @@ -97,6 +97,24 @@ emqx_ee_bridge_gcp_pubsub { } } + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to GCP PubSub. All MQTT 'PUBLISH' messages with the topic +matching `local_topic` will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """发送到 'local_topic' 的消息都会转发到 GCP PubSub。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 GCP PubSub。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + pubsub_topic { desc { en: "The GCP PubSub topic to publish messages to." diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index 9d0354916..63566bed9 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -96,6 +96,13 @@ fields(bridge_config) -> desc => ?DESC("payload_template") } )}, + {local_topic, + sc( + binary(), + #{ + desc => ?DESC("local_topic") + } + )}, {pubsub_topic, sc( binary(), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index cce98b6e7..7c59bff57 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -37,25 +37,22 @@ groups() -> TCs = emqx_common_test_helpers:all(?MODULE), SimpleTCs = single_config_tests(), MatrixTCs = TCs -- SimpleTCs, + SynchronyGroups = [ + {group, sync_query}, + {group, async_query} + ], + QueueGroups = [ + {group, queue_enabled}, + {group, queue_disabled} + ], + ResourceGroups = [{group, gcp_pubsub}], [ - {with_batch, [ - {group, sync_query}, - {group, async_query} - ]}, - {without_batch, [ - {group, sync_query}, - {group, async_query} - ]}, - {sync_query, [ - {group, queue_enabled}, - {group, queue_disabled} - ]}, - {async_query, [ - {group, queue_enabled}, - {group, queue_disabled} - ]}, - {queue_enabled, [{group, gcp_pubsub}]}, - {queue_disabled, [{group, gcp_pubsub}]}, + {with_batch, SynchronyGroups}, + {without_batch, SynchronyGroups}, + {sync_query, QueueGroups}, + {async_query, QueueGroups}, + {queue_enabled, ResourceGroups}, + {queue_disabled, ResourceGroups}, {gcp_pubsub, MatrixTCs} ]. @@ -603,6 +600,43 @@ t_publish_success(Config) -> ), ok. +t_publish_success_local_topic(Config) -> + ResourceId = ?config(resource_id, Config), + ServiceAccountJSON = ?config(service_account_json, Config), + TelemetryTable = ?config(telemetry_table, Config), + LocalTopic = <<"local/topic">>, + {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}), + assert_empty_metrics(ResourceId), + Payload = <<"payload">>, + Message = emqx_message:make(LocalTopic, Payload), + emqx:publish(Message), + DecodedMessages = assert_http_request(ServiceAccountJSON), + ?assertMatch( + [ + #{ + <<"topic">> := LocalTopic, + <<"payload">> := Payload + } + ], + DecodedMessages + ), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, success, ResourceId), + assert_metrics( + #{ + batching => 0, + dropped => 0, + failed => 0, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 1 + }, + ResourceId + ), + ok. + t_create_via_http(Config) -> ?check_trace( create_bridge_http(Config),