From 82b8538041fe1ae0d472fe7645ba73b502f0fb59 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 28 Jul 2023 15:01:09 -0300 Subject: [PATCH] feat(gcp_producer): add support for defining message attributes and ordering key Fixes https://emqx.atlassian.net/browse/EMQX-10652 --- apps/emqx/rebar.config | 2 +- .../src/emqx_bridge_gcp_pubsub.app.src | 2 +- .../src/emqx_bridge_gcp_pubsub.erl | 30 +++ .../emqx_bridge_gcp_pubsub_impl_producer.erl | 85 ++++++++- .../emqx_bridge_gcp_pubsub_producer_SUITE.erl | 173 +++++++++++++++++- .../test/emqx_bridge_gcp_pubsub_tests.erl | 149 +++++++++++++++ changes/ee/feat-11403.en.md | 3 + mix.exs | 2 +- rebar.config | 2 +- rel/i18n/emqx_bridge_gcp_pubsub.hocon | 42 +++++ 10 files changed, 475 insertions(+), 15 deletions(-) create mode 100644 apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_tests.erl create mode 100644 changes/ee/feat-11403.en.md diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index c2dfccad6..355f005a8 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -30,7 +30,7 @@ {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 9faf65860..c7dcea5c0 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl index b3792da71..d1e827d84 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl @@ -113,6 +113,22 @@ fields(connector_config) -> ]; fields(producer) -> [ + {attributes_template, + sc( + hoconsc:array(ref(key_value_pair)), + #{ + default => [], + desc => ?DESC("attributes_template") + } + )}, + {ordering_key_template, + sc( + binary(), + #{ + default => <<>>, + desc => ?DESC("ordering_key_template") + } + )}, {payload_template, sc( binary(), @@ -203,6 +219,18 @@ fields("consumer_resource_opts") -> fun({Field, _Sc}) -> lists:member(Field, SupportedFields) end, ResourceFields ); +fields(key_value_pair) -> + [ + {key, + mk(binary(), #{ + required => true, + validator => [ + emqx_resource_validator:not_empty("Key templates must not be empty") + ], + desc => ?DESC(kv_pair_key) + })}, + {value, mk(binary(), #{required => true, desc => ?DESC(kv_pair_value)})} + ]; fields("get_producer") -> emqx_bridge_schema:status_fields() ++ fields("post_producer"); fields("post_producer") -> @@ -218,6 +246,8 @@ fields("put_consumer") -> desc("config_producer") -> ?DESC("desc_config"); +desc(key_value_pair) -> + ?DESC("kv_pair_desc"); desc("config_consumer") -> ?DESC("desc_config"); desc("consumer_resource_opts") -> diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index b1ded2121..f80fdc333 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -9,15 +9,20 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -type config() :: #{ + attributes_template := [#{key := binary(), value := binary()}], connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), + ordering_key_template := binary(), + payload_template := binary(), pubsub_topic := binary(), resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(), any() => term() }. -type state() :: #{ + attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, client := emqx_bridge_gcp_pubsub_client:state(), + ordering_key_template := emqx_placeholder:tmpl_token(), payload_template := emqx_placeholder:tmpl_token(), project_id := emqx_bridge_gcp_pubsub_client:project_id(), pubsub_topic := binary() @@ -57,6 +62,8 @@ on_start(InstanceId, Config0) -> }), Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0), #{ + attributes_template := AttributesTemplate, + ordering_key_template := OrderingKeyTemplate, payload_template := PayloadTemplate, pubsub_topic := PubSubTopic, service_account_json := #{<<"project_id">> := ProjectId} @@ -65,6 +72,8 @@ on_start(InstanceId, Config0) -> {ok, Client} -> State = #{ client => Client, + attributes_template => preproc_attributes(AttributesTemplate), + ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), project_id => ProjectId, pubsub_topic => PubSubTopic @@ -197,14 +206,76 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0) -> Request, ReplyFunAndArgs, Client ). --spec encode_payload(state(), Selected :: map()) -> #{data := binary()}. -encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) -> - Interpolated = - case PayloadTemplate of - [] -> emqx_utils_json:encode(Selected); - _ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected) +-spec encode_payload(state(), Selected :: map()) -> + #{ + data := binary(), + attributes => #{binary() => binary()}, + 'orderingKey' => binary() + }. +encode_payload(State, Selected) -> + #{ + attributes_template := AttributesTemplate, + ordering_key_template := OrderingKeyTemplate, + payload_template := PayloadTemplate + } = State, + Data = render_payload(PayloadTemplate, Selected), + OrderingKey = render(OrderingKeyTemplate, Selected), + Attributes = proc_attributes(AttributesTemplate, Selected), + Payload0 = #{data => base64:encode(Data)}, + Payload1 = put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0), + put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>). + +put_if(Acc, K, V, true) -> + Acc#{K => V}; +put_if(Acc, _K, _V, false) -> + Acc. + +-spec render_payload(emqx_placeholder:tmpl_token(), map()) -> binary(). +render_payload([] = _Template, Selected) -> + emqx_utils_json:encode(Selected); +render_payload(Template, Selected) -> + render(Template, Selected). + +render(Template, Selected) -> + Opts = #{ + return => full_binary, + var_trans => fun + (undefined) -> <<>>; + (X) -> emqx_utils_conv:bin(X) + end + }, + emqx_placeholder:proc_tmpl(Template, Selected, Opts). + +-spec preproc_attributes([#{key := binary(), value := binary()}]) -> + #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}. +preproc_attributes(AttributesTemplate) -> + lists:foldl( + fun(#{key := K, value := V}, Acc) -> + KT = emqx_placeholder:preproc_tmpl(K), + VT = emqx_placeholder:preproc_tmpl(V), + Acc#{KT => VT} end, - #{data => base64:encode(Interpolated)}. + #{}, + AttributesTemplate + ). + +-spec proc_attributes(#{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, map()) -> + #{binary() => binary()}. +proc_attributes(AttributesTemplate, Selected) -> + maps:fold( + fun(KT, VT, Acc) -> + K = render(KT, Selected), + case K =:= <<>> of + true -> + Acc; + false -> + V = render(VT, Selected), + Acc#{K => V} + end + end, + #{}, + AttributesTemplate + ). -spec to_pubsub_request([#{data := binary()}]) -> binary(). to_pubsub_request(Payloads) -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index a9bbf6178..74affb9c8 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -63,7 +63,8 @@ single_config_tests() -> t_get_status_down, t_get_status_no_worker, t_get_status_timeout_calling_workers, - t_on_start_ehttpc_pool_already_started + t_on_start_ehttpc_pool_already_started, + t_attributes ]. only_sync_tests() -> @@ -212,7 +213,9 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) -> Error end, ct:pal("bridge creation result: ~p", [Res]), - ?assertEqual(element(1, ProbeResult), element(1, Res)), + ?assertEqual(element(1, ProbeResult), element(1, Res), #{ + creation_result => Res, probe_result => ProbeResult + }), case ProbeResult of {error, {{_, 500, _}, _, _}} -> error({bad_probe_result, ProbeResult}); _ -> ok @@ -456,6 +459,7 @@ assert_valid_request_headers(Headers, ServiceAccountJSON) -> assert_valid_request_body(Body) -> BodyMap = emqx_utils_json:decode(Body, [return_maps]), ?assertMatch(#{<<"messages">> := [_ | _]}, BodyMap), + ct:pal("request: ~p", [BodyMap]), #{<<"messages">> := Messages} = BodyMap, lists:map( fun(Msg) -> @@ -480,6 +484,25 @@ assert_http_request(ServiceAccountJSON) -> error({timeout, #{mailbox => Mailbox}}) end. +receive_http_request(ServiceAccountJSON) -> + receive + {http, Headers, Body} -> + assert_valid_request_headers(Headers, ServiceAccountJSON), + #{<<"messages">> := Msgs} = emqx_utils_json:decode(Body, [return_maps]), + lists:map( + fun(Msg) -> + #{<<"data">> := Content64} = Msg, + Content = base64:decode(Content64), + Decoded = emqx_utils_json:decode(Content, [return_maps]), + Msg#{<<"data">> := Decoded} + end, + Msgs + ) + after 5_000 -> + {messages, Mailbox} = process_info(self(), messages), + error({timeout, #{mailbox => Mailbox}}) + end. + install_telemetry_handler(TestCase) -> Tid = ets:new(TestCase, [ordered_set, public]), HandlerId = TestCase, @@ -585,8 +608,8 @@ t_publish_success(Config) -> <<"topic">> := Topic, <<"payload">> := Payload, <<"metadata">> := #{<<"rule_id">> := RuleId} - } - ], + } = Msg + ] when not (is_map_key(<<"attributes">>, Msg) orelse is_map_key(<<"orderingKey">>, Msg)), DecodedMessages ), %% to avoid test flakiness @@ -1524,3 +1547,145 @@ t_query_sync(Config) -> [] ), ok. + +t_attributes(Config) -> + Name = ?config(gcp_pubsub_name, Config), + ServiceAccountJSON = ?config(service_account_json, Config), + LocalTopic = <<"t/topic">>, + ?check_trace( + begin + {ok, _} = create_bridge_http( + Config, + #{ + <<"local_topic">> => LocalTopic, + <<"attributes_template">> => + [ + #{ + <<"key">> => <<"${.payload.key}">>, + <<"value">> => <<"fixed_value">> + }, + #{ + <<"key">> => <<"${.payload.key}2">>, + <<"value">> => <<"${.payload.value}">> + }, + #{ + <<"key">> => <<"fixed_key">>, + <<"value">> => <<"fixed_value">> + }, + #{ + <<"key">> => <<"fixed_key2">>, + <<"value">> => <<"${.payload.value}">> + } + ], + <<"ordering_key_template">> => <<"${.payload.ok}">> + } + ), + %% without ordering key + Payload0 = + emqx_utils_json:encode( + #{ + <<"value">> => <<"payload_value">>, + <<"key">> => <<"payload_key">> + } + ), + Message0 = emqx_message:make(LocalTopic, Payload0), + emqx:publish(Message0), + DecodedMessages0 = receive_http_request(ServiceAccountJSON), + ?assertMatch( + [ + #{ + <<"attributes">> := + #{ + <<"fixed_key">> := <<"fixed_value">>, + <<"fixed_key2">> := <<"payload_value">>, + <<"payload_key">> := <<"fixed_value">>, + <<"payload_key2">> := <<"payload_value">> + }, + <<"data">> := #{ + <<"topic">> := _, + <<"payload">> := _ + } + } = Msg + ] when not is_map_key(<<"orderingKey">>, Msg), + DecodedMessages0 + ), + %% with ordering key + Payload1 = + emqx_utils_json:encode( + #{ + <<"value">> => <<"payload_value">>, + <<"key">> => <<"payload_key">>, + <<"ok">> => <<"ordering_key">> + } + ), + Message1 = emqx_message:make(LocalTopic, Payload1), + emqx:publish(Message1), + DecodedMessages1 = receive_http_request(ServiceAccountJSON), + ?assertMatch( + [ + #{ + <<"attributes">> := + #{ + <<"fixed_key">> := <<"fixed_value">>, + <<"fixed_key2">> := <<"payload_value">>, + <<"payload_key">> := <<"fixed_value">>, + <<"payload_key2">> := <<"payload_value">> + }, + <<"orderingKey">> := <<"ordering_key">>, + <<"data">> := #{ + <<"topic">> := _, + <<"payload">> := _ + } + } + ], + DecodedMessages1 + ), + %% will result in empty key + Payload2 = + emqx_utils_json:encode( + #{ + <<"value">> => <<"payload_value">>, + <<"ok">> => <<"ordering_key">> + } + ), + Message2 = emqx_message:make(LocalTopic, Payload2), + emqx:publish(Message2), + [DecodedMessage2] = receive_http_request(ServiceAccountJSON), + ?assertEqual( + #{ + <<"fixed_key">> => <<"fixed_value">>, + <<"fixed_key2">> => <<"payload_value">>, + <<"2">> => <<"payload_value">> + }, + maps:get(<<"attributes">>, DecodedMessage2) + ), + %% ensure loading cluster override file doesn't mangle the attribute + %% placeholders... + #{<<"bridges">> := #{?BRIDGE_TYPE_BIN := #{Name := RawConf}}} = + emqx_config:read_override_conf(#{override_to => cluster}), + ?assertEqual( + [ + #{ + <<"key">> => <<"${.payload.key}">>, + <<"value">> => <<"fixed_value">> + }, + #{ + <<"key">> => <<"${.payload.key}2">>, + <<"value">> => <<"${.payload.value}">> + }, + #{ + <<"key">> => <<"fixed_key">>, + <<"value">> => <<"fixed_value">> + }, + #{ + <<"key">> => <<"fixed_key2">>, + <<"value">> => <<"${.payload.value}">> + } + ], + maps:get(<<"attributes_template">>, RawConf) + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_tests.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_tests.erl new file mode 100644 index 000000000..885754470 --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_tests.erl @@ -0,0 +1,149 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_gcp_pubsub_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%%=========================================================================== +%% Data section +%%=========================================================================== + +%% erlfmt-ignore +gcp_pubsub_producer_hocon() -> +""" +bridges.gcp_pubsub.my_producer { + attributes_template = [ + {key = \"${payload.key}\", value = fixed_value} + {key = \"${payload.key}2\", value = \"${.payload.value}\"} + {key = fixed_key, value = fixed_value} + {key = fixed_key2, value = \"${.payload.value}\"} + ] + connect_timeout = 15s + enable = false + local_topic = \"t/gcp/produ\" + max_retries = 2 + ordering_key_template = \"${.payload.ok}\" + payload_template = \"${.}\" + pipelining = 100 + pool_size = 8 + pubsub_topic = my-topic + resource_opts { + batch_size = 1 + batch_time = 0ms + health_check_interval = 15s + inflight_window = 100 + max_buffer_bytes = 256MB + query_mode = async + request_ttl = 15s + start_after_created = true + start_timeout = 5s + worker_pool_size = 16 + } + service_account_json { + auth_provider_x509_cert_url = \"https://www.googleapis.com/oauth2/v1/certs\" + auth_uri = \"https://accounts.google.com/o/oauth2/auth\" + client_email = \"test@myproject.iam.gserviceaccount.com\" + client_id = \"123812831923812319190\" + client_x509_cert_url = \"https://www.googleapis.com/robot/v1/metadata/x509/...\" + private_key = \"-----BEGIN PRIVATE KEY-----...\" + private_key_id = \"kid\" + project_id = myproject + token_uri = \"https://oauth2.googleapis.com/token\" + type = service_account + } +} +""". + +%%=========================================================================== +%% Helper functions +%%=========================================================================== + +parse(Hocon) -> + {ok, Conf} = hocon:binary(Hocon), + Conf. + +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf). + +-define(validation_error(Reason, Value), + {emqx_bridge_schema, [ + #{ + kind := validation_error, + reason := Reason, + value := Value + } + ]} +). + +-define(ok_config(Cfg), #{ + <<"bridges">> := + #{ + <<"gcp_pubsub">> := + #{ + <<"my_producer">> := + Cfg + } + } +}). + +%%=========================================================================== +%% Test cases +%%=========================================================================== + +producer_attributes_validator_test_() -> + %% ensure this module is loaded when testing only this file + _ = emqx_bridge_enterprise:module_info(), + BaseConf = parse(gcp_pubsub_producer_hocon()), + Override = fun(Cfg) -> + emqx_utils_maps:deep_merge( + BaseConf, + #{ + <<"bridges">> => + #{ + <<"gcp_pubsub">> => + #{<<"my_producer">> => Cfg} + } + } + ) + end, + [ + {"base config", + ?_assertMatch( + ?ok_config(#{ + <<"attributes_template">> := [_, _, _, _] + }), + check(BaseConf) + )}, + {"empty key template", + ?_assertThrow( + ?validation_error("Key templates must not be empty", _), + check( + Override(#{ + <<"attributes_template">> => [ + #{ + <<"key">> => <<>>, + <<"value">> => <<"some_value">> + } + ] + }) + ) + )}, + {"empty value template", + ?_assertMatch( + ?ok_config(#{ + <<"attributes_template">> := [_] + }), + check( + Override(#{ + <<"attributes_template">> => [ + #{ + <<"key">> => <<"some_key">>, + <<"value">> => <<>> + } + ] + }) + ) + )} + ]. diff --git a/changes/ee/feat-11403.en.md b/changes/ee/feat-11403.en.md new file mode 100644 index 000000000..9942a2490 --- /dev/null +++ b/changes/ee/feat-11403.en.md @@ -0,0 +1,3 @@ +Added support for defining message attributes and ordering key templates for GCP PubSub Producer bridge. + +Also updated our HOCON library to fix an issue where objects in an array were being concatenated even if they lay on different lines. diff --git a/mix.exs b/mix.exs index 3d29642ba..2580cc3d8 100644 --- a/mix.exs +++ b/mix.exs @@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.39.14", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.39.16", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, diff --git a/rebar.config b/rebar.config index dd1e139f8..2d605c18f 100644 --- a/rebar.config +++ b/rebar.config @@ -75,7 +75,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} diff --git a/rel/i18n/emqx_bridge_gcp_pubsub.hocon b/rel/i18n/emqx_bridge_gcp_pubsub.hocon index 39c4b7417..b5dffec1f 100644 --- a/rel/i18n/emqx_bridge_gcp_pubsub.hocon +++ b/rel/i18n/emqx_bridge_gcp_pubsub.hocon @@ -46,6 +46,18 @@ payload_template.desc: payload_template.label: """Payload template""" +attributes_template.desc: +"""The template for formatting the outgoing message attributes. Undefined values will be rendered as empty string values. Empty keys are removed from the attribute map.""" + +attributes_template.label: +"""Attributes template""" + +ordering_key_template.desc: +"""The template for formatting the outgoing message ordering key. Undefined values will be rendered as empty string values. This value will not be added to the message if it's empty.""" + +ordering_key_template.label: +"""Ordering Key template""" + pipelining.desc: """A positive integer. Whether to send HTTP requests continuously, when set to 1, it means that after each HTTP request is sent, you need to wait for the server to return and then continue to send the next request.""" @@ -64,6 +76,36 @@ pubsub_topic.desc: pubsub_topic.label: """GCP PubSub Topic""" +producer_attributes.desc: +"""List of key-value pairs representing templates to construct the attributes for a given GCP PubSub message. Both keys and values support the placeholder `${var_name}` notation. Keys that are undefined or resolve to an empty string are omitted from the attribute map.""" + +producer_attributes.label: +"""Attributes Template""" + +producer_ordering_key.desc: +"""Template for the Ordering Key of a given GCP PubSub message. If the resolved value is undefined or an empty string, the ordering key property is omitted from the message.""" + +producer_ordering_key.label: +"""Ordering Key Template""" + +kv_pair_desc.desc: +"""Key-value pair.""" + +kv_pair_desc.label: +"""Key-value pair""" + +kv_pair_key.desc: +"""Key""" + +kv_pair_key.label: +"""Key""" + +kv_pair_value.desc: +"""Value""" + +kv_pair_value.label: +"""Value""" + service_account_json.desc: """JSON containing the GCP Service Account credentials to be used with PubSub. When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed."""