feat(gcp_pubsub): add `local_topic` config

Given the implicit convention that an egress bridge containing the
`local_topic` config will forward messages without the need for a rule
action, this was added to avoid needing a rule action.
This commit is contained in:
Thales Macedo Garitezi 2022-12-06 10:43:55 -03:00
parent ac048dbafa
commit b9bc82f87a
4 changed files with 78 additions and 18 deletions

View File

@ -51,6 +51,7 @@
-define(EGRESS_DIR_BRIDGES(T),
T == webhook;
T == mysql;
T == gcp_pubsub;
T == influxdb_api_v1;
T == influxdb_api_v2
).

View File

@ -97,6 +97,24 @@ emqx_ee_bridge_gcp_pubsub {
}
}
local_topic {
desc {
en: """The MQTT topic filter to be forwarded to GCP PubSub. All MQTT 'PUBLISH' messages with the topic
matching `local_topic` will be forwarded.</br>
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded.
"""
zh: """发送到 'local_topic' 的消息都会转发到 GCP PubSub。 </br>
注意:如果这个 Bridge 被用作规则EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 GCP PubSub。
"""
}
label {
en: "Local Topic"
zh: "本地 Topic"
}
}
pubsub_topic {
desc {
en: "The GCP PubSub topic to publish messages to."

View File

@ -96,6 +96,13 @@ fields(bridge_config) ->
desc => ?DESC("payload_template")
}
)},
{local_topic,
sc(
binary(),
#{
desc => ?DESC("local_topic")
}
)},
{pubsub_topic,
sc(
binary(),

View File

@ -37,25 +37,22 @@ groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
SimpleTCs = single_config_tests(),
MatrixTCs = TCs -- SimpleTCs,
SynchronyGroups = [
{group, sync_query},
{group, async_query}
],
QueueGroups = [
{group, queue_enabled},
{group, queue_disabled}
],
ResourceGroups = [{group, gcp_pubsub}],
[
{with_batch, [
{group, sync_query},
{group, async_query}
]},
{without_batch, [
{group, sync_query},
{group, async_query}
]},
{sync_query, [
{group, queue_enabled},
{group, queue_disabled}
]},
{async_query, [
{group, queue_enabled},
{group, queue_disabled}
]},
{queue_enabled, [{group, gcp_pubsub}]},
{queue_disabled, [{group, gcp_pubsub}]},
{with_batch, SynchronyGroups},
{without_batch, SynchronyGroups},
{sync_query, QueueGroups},
{async_query, QueueGroups},
{queue_enabled, ResourceGroups},
{queue_disabled, ResourceGroups},
{gcp_pubsub, MatrixTCs}
].
@ -603,6 +600,43 @@ t_publish_success(Config) ->
),
ok.
t_publish_success_local_topic(Config) ->
ResourceId = ?config(resource_id, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
TelemetryTable = ?config(telemetry_table, Config),
LocalTopic = <<"local/topic">>,
{ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
assert_empty_metrics(ResourceId),
Payload = <<"payload">>,
Message = emqx_message:make(LocalTopic, Payload),
emqx:publish(Message),
DecodedMessages = assert_http_request(ServiceAccountJSON),
?assertMatch(
[
#{
<<"topic">> := LocalTopic,
<<"payload">> := Payload
}
],
DecodedMessages
),
%% to avoid test flakiness
wait_telemetry_event(TelemetryTable, success, ResourceId),
assert_metrics(
#{
batching => 0,
dropped => 0,
failed => 0,
inflight => 0,
matched => 1,
queuing => 0,
retried => 0,
success => 1
},
ResourceId
),
ok.
t_create_via_http(Config) ->
?check_trace(
create_bridge_http(Config),