feat(gcp_producer): add support for defining message attributes and ordering key

Fixes https://emqx.atlassian.net/browse/EMQX-10652
This commit is contained in:
Thales Macedo Garitezi 2023-07-28 15:01:09 -03:00
parent 340deb062d
commit 82b8538041
10 changed files with 475 additions and 15 deletions

View File

@ -30,7 +30,7 @@
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{applications, [
kernel,

View File

@ -113,6 +113,22 @@ fields(connector_config) ->
];
fields(producer) ->
[
{attributes_template,
sc(
hoconsc:array(ref(key_value_pair)),
#{
default => [],
desc => ?DESC("attributes_template")
}
)},
{ordering_key_template,
sc(
binary(),
#{
default => <<>>,
desc => ?DESC("ordering_key_template")
}
)},
{payload_template,
sc(
binary(),
@ -203,6 +219,18 @@ fields("consumer_resource_opts") ->
fun({Field, _Sc}) -> lists:member(Field, SupportedFields) end,
ResourceFields
);
%% Schema of a single key/value entry (used as the element type of the
%% producer's `attributes_template' array).
%% Both fields are required binaries.  Only the key carries the `not_empty'
%% validator, so an empty value template is accepted by the schema.
fields(key_value_pair) ->
[
{key,
mk(binary(), #{
required => true,
validator => [
emqx_resource_validator:not_empty("Key templates must not be empty")
],
desc => ?DESC(kv_pair_key)
})},
{value, mk(binary(), #{required => true, desc => ?DESC(kv_pair_value)})}
];
fields("get_producer") ->
emqx_bridge_schema:status_fields() ++ fields("post_producer");
fields("post_producer") ->
@ -218,6 +246,8 @@ fields("put_consumer") ->
desc("config_producer") ->
?DESC("desc_config");
desc(key_value_pair) ->
?DESC("kv_pair_desc");
desc("config_consumer") ->
?DESC("desc_config");
desc("consumer_resource_opts") ->

View File

@ -9,15 +9,20 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Configuration map handed to `on_start/2'.
%% The three `*_template' fields hold raw (not yet preprocessed) template
%% binaries; `attributes_template' is a list of key/value template pairs.
-type config() :: #{
attributes_template := [#{key := binary(), value := binary()}],
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
ordering_key_template := binary(),
payload_template := binary(),
pubsub_topic := binary(),
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
any() => term()
}.
-type state() :: #{
attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
client := emqx_bridge_gcp_pubsub_client:state(),
ordering_key_template := emqx_placeholder:tmpl_token(),
payload_template := emqx_placeholder:tmpl_token(),
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
pubsub_topic := binary()
@ -57,6 +62,8 @@ on_start(InstanceId, Config0) ->
}),
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
#{
attributes_template := AttributesTemplate,
ordering_key_template := OrderingKeyTemplate,
payload_template := PayloadTemplate,
pubsub_topic := PubSubTopic,
service_account_json := #{<<"project_id">> := ProjectId}
@ -65,6 +72,8 @@ on_start(InstanceId, Config0) ->
{ok, Client} ->
State = #{
client => Client,
attributes_template => preproc_attributes(AttributesTemplate),
ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
project_id => ProjectId,
pubsub_topic => PubSubTopic
@ -197,14 +206,76 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
Request, ReplyFunAndArgs, Client
).
-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
Interpolated =
case PayloadTemplate of
[] -> emqx_utils_json:encode(Selected);
_ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
%% Builds the map describing one outgoing PubSub message from the data in
%% `Selected'.
%%
%% `data' is always present and holds the base64-encoded rendered payload.
%% `attributes' is only added when at least one attribute was rendered, and
%% `orderingKey' only when the rendered ordering key is not the empty binary.
-spec encode_payload(state(), Selected :: map()) ->
    #{
        data := binary(),
        attributes => #{binary() => binary()},
        'orderingKey' => binary()
    }.
encode_payload(State, Selected) ->
    #{
        payload_template := BodyTmpl,
        ordering_key_template := OrdKeyTmpl,
        attributes_template := AttrsTmpl
    } = State,
    Body = render_payload(BodyTmpl, Selected),
    OrdKey = render(OrdKeyTmpl, Selected),
    Attrs = proc_attributes(AttrsTmpl, Selected),
    %% Each optional field is described as {Field, Value, ShouldInclude}.
    OptionalFields = [
        {attributes, Attrs, map_size(Attrs) > 0},
        {'orderingKey', OrdKey, OrdKey =/= <<>>}
    ],
    lists:foldl(
        fun({Field, Value, Include}, Msg) -> put_if(Msg, Field, Value, Include) end,
        #{data => base64:encode(Body)},
        OptionalFields
    ).
%% Conditionally stores `Value' under `Key': the map is returned untouched
%% when the flag is `false', and with the key set when it is `true'.
put_if(Map, _Key, _Value, false) ->
    Map;
put_if(Map, Key, Value, true) ->
    Map#{Key => Value}.
%% Renders the message payload.  An empty (`[]') preprocessed template means
%% the whole `Selected' map is JSON-encoded; any other template is rendered
%% with `render/2'.
-spec render_payload(emqx_placeholder:tmpl_token(), map()) -> binary().
render_payload(Template, Selected) ->
    case Template of
        [] -> emqx_utils_json:encode(Selected);
        _ -> render(Template, Selected)
    end.
%% Renders a preprocessed template against `Selected', asking
%% `emqx_placeholder:proc_tmpl/3' for a `full_binary' result.
%% Placeholder values equal to `undefined' become the empty binary; every
%% other value goes through `emqx_utils_conv:bin/1'.
render(Template, Selected) ->
    ToBinary = fun
        (undefined) -> <<>>;
        (Value) -> emqx_utils_conv:bin(Value)
    end,
    emqx_placeholder:proc_tmpl(
        Template,
        Selected,
        #{return => full_binary, var_trans => ToBinary}
    ).
%% Preprocesses every key and value template of the configured attribute list
%% and collects them into a map from key template to value template.
%% Because the result is a map keyed by the preprocessed key template, two
%% entries whose keys preprocess to the same term collapse into one entry and
%% the later list element wins.
-spec preproc_attributes([#{key := binary(), value := binary()}]) ->
#{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}.
preproc_attributes(AttributesTemplate) ->
lists:foldl(
fun(#{key := K, value := V}, Acc) ->
KT = emqx_placeholder:preproc_tmpl(K),
VT = emqx_placeholder:preproc_tmpl(V),
Acc#{KT => VT}
end,
%% NOTE(review): the next line looks like a leftover of the removed
%% `encode_payload' body shown by the diff rendering, not part of this
%% function - confirm against the actual file.
#{data => base64:encode(Interpolated)}.
#{},
AttributesTemplate
).
%% Renders every key/value template pair against `Selected' and returns the
%% resulting attribute map.  Pairs whose key renders to the empty binary are
%% dropped, and their value template is not rendered at all.
-spec proc_attributes(#{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, map()) ->
    #{binary() => binary()}.
proc_attributes(AttributesTemplate, Selected) ->
    Collect = fun(KeyTmpl, ValueTmpl, Rendered) ->
        case render(KeyTmpl, Selected) of
            <<>> -> Rendered;
            Key -> Rendered#{Key => render(ValueTmpl, Selected)}
        end
    end,
    maps:fold(Collect, #{}, AttributesTemplate).
-spec to_pubsub_request([#{data := binary()}]) -> binary().
to_pubsub_request(Payloads) ->

View File

@ -63,7 +63,8 @@ single_config_tests() ->
t_get_status_down,
t_get_status_no_worker,
t_get_status_timeout_calling_workers,
t_on_start_ehttpc_pool_already_started
t_on_start_ehttpc_pool_already_started,
t_attributes
].
only_sync_tests() ->
@ -212,7 +213,9 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) ->
Error
end,
ct:pal("bridge creation result: ~p", [Res]),
?assertEqual(element(1, ProbeResult), element(1, Res)),
?assertEqual(element(1, ProbeResult), element(1, Res), #{
creation_result => Res, probe_result => ProbeResult
}),
case ProbeResult of
{error, {{_, 500, _}, _, _}} -> error({bad_probe_result, ProbeResult});
_ -> ok
@ -456,6 +459,7 @@ assert_valid_request_headers(Headers, ServiceAccountJSON) ->
assert_valid_request_body(Body) ->
BodyMap = emqx_utils_json:decode(Body, [return_maps]),
?assertMatch(#{<<"messages">> := [_ | _]}, BodyMap),
ct:pal("request: ~p", [BodyMap]),
#{<<"messages">> := Messages} = BodyMap,
lists:map(
fun(Msg) ->
@ -480,6 +484,25 @@ assert_http_request(ServiceAccountJSON) ->
error({timeout, #{mailbox => Mailbox}})
end.
%% Waits up to 5 seconds for one `{http, Headers, Body}' message, validates
%% its headers and returns the request's messages with each `data' field
%% base64-decoded and JSON-decoded into a map.
%% Fails with `{timeout, #{mailbox => ...}}' when nothing arrives in time.
receive_http_request(ServiceAccountJSON) ->
    DecodeData = fun(PubSubMsg) ->
        #{<<"data">> := Encoded} = PubSubMsg,
        Json = base64:decode(Encoded),
        PubSubMsg#{<<"data">> := emqx_utils_json:decode(Json, [return_maps])}
    end,
    receive
        {http, Headers, Body} ->
            assert_valid_request_headers(Headers, ServiceAccountJSON),
            #{<<"messages">> := Messages} = emqx_utils_json:decode(Body, [return_maps]),
            lists:map(DecodeData, Messages)
    after 5_000 ->
        {messages, Pending} = process_info(self(), messages),
        error({timeout, #{mailbox => Pending}})
    end.
install_telemetry_handler(TestCase) ->
Tid = ets:new(TestCase, [ordered_set, public]),
HandlerId = TestCase,
@ -585,8 +608,8 @@ t_publish_success(Config) ->
<<"topic">> := Topic,
<<"payload">> := Payload,
<<"metadata">> := #{<<"rule_id">> := RuleId}
}
],
} = Msg
] when not (is_map_key(<<"attributes">>, Msg) orelse is_map_key(<<"orderingKey">>, Msg)),
DecodedMessages
),
%% to avoid test flakiness
@ -1524,3 +1547,145 @@ t_query_sync(Config) ->
[]
),
ok.
%% Checks that attribute and ordering key templates are rendered into the
%% published PubSub messages (with and without an ordering key, and with a
%% key placeholder that resolves to nothing), and that the raw attribute
%% templates read back from the cluster override config are unchanged.
t_attributes(Config) ->
    Name = ?config(gcp_pubsub_name, Config),
    ServiceAccountJSON = ?config(service_account_json, Config),
    LocalTopic = <<"t/topic">>,
    %% Used both to create the bridge and to verify the stored raw config.
    AttributesTemplate = [
        #{<<"key">> => <<"${.payload.key}">>, <<"value">> => <<"fixed_value">>},
        #{<<"key">> => <<"${.payload.key}2">>, <<"value">> => <<"${.payload.value}">>},
        #{<<"key">> => <<"fixed_key">>, <<"value">> => <<"fixed_value">>},
        #{<<"key">> => <<"fixed_key2">>, <<"value">> => <<"${.payload.value}">>}
    ],
    %% JSON-encodes the given map and publishes it on the bridged local topic.
    PublishJSON = fun(PayloadMap) ->
        Encoded = emqx_utils_json:encode(PayloadMap),
        emqx:publish(emqx_message:make(LocalTopic, Encoded))
    end,
    ?check_trace(
        begin
            {ok, _} = create_bridge_http(
                Config,
                #{
                    <<"local_topic">> => LocalTopic,
                    <<"attributes_template">> => AttributesTemplate,
                    <<"ordering_key_template">> => <<"${.payload.ok}">>
                }
            ),
            %% without ordering key
            PublishJSON(#{
                <<"value">> => <<"payload_value">>,
                <<"key">> => <<"payload_key">>
            }),
            DecodedMessages0 = receive_http_request(ServiceAccountJSON),
            ?assertMatch(
                [
                    #{
                        <<"attributes">> :=
                            #{
                                <<"fixed_key">> := <<"fixed_value">>,
                                <<"fixed_key2">> := <<"payload_value">>,
                                <<"payload_key">> := <<"fixed_value">>,
                                <<"payload_key2">> := <<"payload_value">>
                            },
                        <<"data">> := #{
                            <<"topic">> := _,
                            <<"payload">> := _
                        }
                    } = Msg
                ] when not is_map_key(<<"orderingKey">>, Msg),
                DecodedMessages0
            ),
            %% with ordering key
            PublishJSON(#{
                <<"value">> => <<"payload_value">>,
                <<"key">> => <<"payload_key">>,
                <<"ok">> => <<"ordering_key">>
            }),
            DecodedMessages1 = receive_http_request(ServiceAccountJSON),
            ?assertMatch(
                [
                    #{
                        <<"attributes">> :=
                            #{
                                <<"fixed_key">> := <<"fixed_value">>,
                                <<"fixed_key2">> := <<"payload_value">>,
                                <<"payload_key">> := <<"fixed_value">>,
                                <<"payload_key2">> := <<"payload_value">>
                            },
                        <<"orderingKey">> := <<"ordering_key">>,
                        <<"data">> := #{
                            <<"topic">> := _,
                            <<"payload">> := _
                        }
                    }
                ],
                DecodedMessages1
            ),
            %% will result in empty key
            PublishJSON(#{
                <<"value">> => <<"payload_value">>,
                <<"ok">> => <<"ordering_key">>
            }),
            [DecodedMessage2] = receive_http_request(ServiceAccountJSON),
            ?assertEqual(
                #{
                    <<"fixed_key">> => <<"fixed_value">>,
                    <<"fixed_key2">> => <<"payload_value">>,
                    <<"2">> => <<"payload_value">>
                },
                maps:get(<<"attributes">>, DecodedMessage2)
            ),
            %% ensure loading cluster override file doesn't mangle the attribute
            %% placeholders...
            #{<<"bridges">> := #{?BRIDGE_TYPE_BIN := #{Name := RawConf}}} =
                emqx_config:read_override_conf(#{override_to => cluster}),
            ?assertEqual(
                AttributesTemplate,
                maps:get(<<"attributes_template">>, RawConf)
            ),
            ok
        end,
        []
    ),
    ok.

View File

@ -0,0 +1,149 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_gcp_pubsub_tests).
-include_lib("eunit/include/eunit.hrl").
%%===========================================================================
%% Data section
%%===========================================================================
%% Returns the HOCON text of a complete `bridges.gcp_pubsub.my_producer'
%% configuration.  The test generator below parses it and uses the result as
%% the base config that individual cases override.
%% NOTE(review): the first attribute key is written `${payload.key}' without
%% the leading dot used by every other placeholder in this text - confirm
%% that this is intentional.
%% erlfmt-ignore
gcp_pubsub_producer_hocon() ->
"""
bridges.gcp_pubsub.my_producer {
attributes_template = [
{key = \"${payload.key}\", value = fixed_value}
{key = \"${payload.key}2\", value = \"${.payload.value}\"}
{key = fixed_key, value = fixed_value}
{key = fixed_key2, value = \"${.payload.value}\"}
]
connect_timeout = 15s
enable = false
local_topic = \"t/gcp/produ\"
max_retries = 2
ordering_key_template = \"${.payload.ok}\"
payload_template = \"${.}\"
pipelining = 100
pool_size = 8
pubsub_topic = my-topic
resource_opts {
batch_size = 1
batch_time = 0ms
health_check_interval = 15s
inflight_window = 100
max_buffer_bytes = 256MB
query_mode = async
request_ttl = 15s
start_after_created = true
start_timeout = 5s
worker_pool_size = 16
}
service_account_json {
auth_provider_x509_cert_url = \"https://www.googleapis.com/oauth2/v1/certs\"
auth_uri = \"https://accounts.google.com/o/oauth2/auth\"
client_email = \"test@myproject.iam.gserviceaccount.com\"
client_id = \"123812831923812319190\"
client_x509_cert_url = \"https://www.googleapis.com/robot/v1/metadata/x509/...\"
private_key = \"-----BEGIN PRIVATE KEY-----...\"
private_key_id = \"kid\"
project_id = myproject
token_uri = \"https://oauth2.googleapis.com/token\"
type = service_account
}
}
""".
%%===========================================================================
%% Helper functions
%%===========================================================================
%% Parses HOCON text with `hocon:binary/1' and returns the parsed config;
%% any result other than `{ok, _}' fails the match.
parse(HoconText) ->
    {ok, Parsed} = hocon:binary(HoconText),
    Parsed.
%% Runs `hocon_tconf:check_plain/2' on a parsed config map against the
%% `emqx_bridge_schema' schema module.
check(RawConf) when is_map(RawConf) ->
    SchemaMod = emqx_bridge_schema,
    hocon_tconf:check_plain(SchemaMod, RawConf).
%% Pattern for the term expected to be thrown by `check/1' when validation
%% fails: the schema module paired with a single-element list holding a
%% `validation_error' map with the given reason and offending value.
-define(validation_error(Reason, Value),
{emqx_bridge_schema, [
#{
kind := validation_error,
reason := Reason,
value := Value
}
]}
).
%% Pattern for a checked config: matches `Cfg' against the `my_producer'
%% entry found under `bridges' -> `gcp_pubsub'.
-define(ok_config(Cfg), #{
<<"bridges">> :=
#{
<<"gcp_pubsub">> :=
#{
<<"my_producer">> :=
Cfg
}
}
}).
%%===========================================================================
%% Test cases
%%===========================================================================
%% Schema checks for the producer's `attributes_template' field: the base
%% config passes, an empty key template is rejected with a validation error,
%% and an empty value template is accepted.
producer_attributes_validator_test_() ->
    %% ensure this module is loaded when testing only this file
    _ = emqx_bridge_enterprise:module_info(),
    BaseConf = parse(gcp_pubsub_producer_hocon()),
    %% Checks the base config with its attribute templates replaced.
    CheckWithAttributes = fun(Attributes) ->
        ProducerOverrides = #{<<"attributes_template">> => Attributes},
        Merged = emqx_utils_maps:deep_merge(
            BaseConf,
            #{<<"bridges">> => #{<<"gcp_pubsub">> => #{<<"my_producer">> => ProducerOverrides}}}
        ),
        check(Merged)
    end,
    BaseConfigCase =
        {"base config",
            ?_assertMatch(
                ?ok_config(#{<<"attributes_template">> := [_, _, _, _]}),
                check(BaseConf)
            )},
    EmptyKeyCase =
        {"empty key template",
            ?_assertThrow(
                ?validation_error("Key templates must not be empty", _),
                CheckWithAttributes([
                    #{<<"key">> => <<>>, <<"value">> => <<"some_value">>}
                ])
            )},
    EmptyValueCase =
        {"empty value template",
            ?_assertMatch(
                ?ok_config(#{<<"attributes_template">> := [_]}),
                CheckWithAttributes([
                    #{<<"key">> => <<"some_key">>, <<"value">> => <<>>}
                ])
            )},
    [BaseConfigCase, EmptyKeyCase, EmptyValueCase].

View File

@ -0,0 +1,3 @@
Added support for defining message attributes and ordering key templates for GCP PubSub Producer bridge.
Also updated our HOCON library to fix an issue where objects in an array were being concatenated even if they lay on different lines.

View File

@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
{:hocon, github: "emqx/hocon", tag: "0.39.14", override: true},
{:hocon, github: "emqx/hocon", tag: "0.39.16", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},

View File

@ -75,7 +75,7 @@
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}

View File

@ -46,6 +46,18 @@ payload_template.desc:
payload_template.label:
"""Payload template"""
attributes_template.desc:
"""The template for formatting the outgoing message attributes. Undefined values will be rendered as empty string values. Empty keys are removed from the attribute map."""
attributes_template.label:
"""Attributes template"""
ordering_key_template.desc:
"""The template for formatting the outgoing message ordering key. Undefined values will be rendered as empty string values. This value will not be added to the message if it's empty."""
ordering_key_template.label:
"""Ordering Key template"""
pipelining.desc:
"""A positive integer. Whether to send HTTP requests continuously. When set to 1, each HTTP request must wait for the server's response before the next request is sent."""
@ -64,6 +76,36 @@ pubsub_topic.desc:
pubsub_topic.label:
"""GCP PubSub Topic"""
producer_attributes.desc:
"""List of key-value pairs representing templates to construct the attributes for a given GCP PubSub message. Both keys and values support the placeholder `${var_name}` notation. Keys that are undefined or resolve to an empty string are omitted from the attribute map."""
producer_attributes.label:
"""Attributes Template"""
producer_ordering_key.desc:
"""Template for the Ordering Key of a given GCP PubSub message. If the resolved value is undefined or an empty string, the ordering key property is omitted from the message."""
producer_ordering_key.label:
"""Ordering Key Template"""
kv_pair_desc.desc:
"""Key-value pair."""
kv_pair_desc.label:
"""Key-value pair"""
kv_pair_key.desc:
"""Key"""
kv_pair_key.label:
"""Key"""
kv_pair_value.desc:
"""Value"""
kv_pair_value.label:
"""Value"""
service_account_json.desc:
"""JSON containing the GCP Service Account credentials to be used with PubSub.
When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed."""