diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index 12bc988bf..e99a6d13f 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -9,6 +9,7 @@ DYNAMO_TAG=1.21.0 CASSANDRA_TAG=3.11.6 MINIO_TAG=RELEASE.2023-03-20T20-16-18Z OPENTS_TAG=9aa7f88 +KINESIS_TAG=2.1 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server SQLSERVER_TAG=2019-CU19-ubuntu-20.04 diff --git a/.ci/docker-compose-file/docker-compose-kinesis.yaml b/.ci/docker-compose-file/docker-compose-kinesis.yaml new file mode 100644 index 000000000..d05b7c6c7 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-kinesis.yaml @@ -0,0 +1,12 @@ +version: '3.9' + +services: + kinesis: + container_name: kinesis + image: localstack/localstack:2.1 + environment: + - KINESIS_ERROR_PROBABILITY=0.0 + - KINESIS_LATENCY=0 + restart: always + networks: + - emqx_bridge diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index c0c88aef0..74d2583c9 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -49,6 +49,8 @@ services: - 38080:38080 # HStreamDB - 15670:5670 + # Kinesis + - 4566:4566 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index d5576108f..c9590354b 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -161,5 +161,11 @@ "listen": "0.0.0.0:6570", "upstream": "hstreamdb:6570", "enabled": true + }, + { + "name": "kinesis", + "listen": "0.0.0.0:4566", + "upstream": "kinesis:4566", + "enabled": true } ] diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index c1692b9af..d5fc42ade 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -88,7 +88,8 @@ T == sqlserver; T == pulsar_producer; T == oracle; - T == iotdb + T == iotdb; + T == kinesis_producer ). -define(ROOT_KEY, bridges). diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 539753b3b..62f0d7d89 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -374,6 +374,8 @@ parse_confs(<<"kafka">> = _Type, Name, Conf) -> Conf#{bridge_name => Name}; parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) -> Conf#{bridge_name => Name}; +parse_confs(<<"kinesis_producer">> = _Type, Name, Conf) -> + Conf#{bridge_name => Name}; parse_confs(_Type, _Name, Conf) -> Conf. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index e76d1af37..02a03a6d6 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -49,7 +49,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_pulsar, <<"pulsar_producer">>, Method ++ "_producer"), api_ref(emqx_bridge_oracle, <<"oracle">>, Method), api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method), - api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method) + api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method), + api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer") ]. schema_modules() -> @@ -74,7 +75,8 @@ schema_modules() -> emqx_bridge_pulsar, emqx_bridge_oracle, emqx_bridge_iotdb, - emqx_bridge_rabbitmq + emqx_bridge_rabbitmq, + emqx_bridge_kinesis ]. examples(Method) -> @@ -119,7 +121,8 @@ resource_type(opents) -> emqx_bridge_opents_connector; resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; resource_type(oracle) -> emqx_oracle; resource_type(iotdb) -> emqx_bridge_iotdb_impl; -resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector. +resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; +resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer. fields(bridges) -> [ @@ -199,7 +202,8 @@ fields(bridges) -> ] ++ kafka_structs() ++ pulsar_structs() ++ gcp_pubsub_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ - pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs(). + pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++ + kinesis_structs(). mongodb_structs() -> [ @@ -365,6 +369,18 @@ rabbitmq_structs() -> )} ]. +kinesis_structs() -> + [ + {kinesis_producer, + mk( + hoconsc:map(name, ref(emqx_bridge_kinesis, "config_producer")), + #{ + desc => <<"Amazon Kinesis Producer Bridge Config">>, + required => false + } + )} + ]. + api_ref(Module, Type, Method) -> {Type, ref(Module, Method)}. diff --git a/apps/emqx_bridge_dynamo/rebar.config b/apps/emqx_bridge_dynamo/rebar.config index 672e8efc2..e80fb0f80 100644 --- a/apps/emqx_bridge_dynamo/rebar.config +++ b/apps/emqx_bridge_dynamo/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}} +{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_kinesis/BSL.txt b/apps/emqx_bridge_kinesis/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_kinesis/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_kinesis/README.md b/apps/emqx_bridge_kinesis/README.md new file mode 100644 index 000000000..097c27a92 --- /dev/null +++ b/apps/emqx_bridge_kinesis/README.md @@ -0,0 +1,22 @@ +# Amazon Kinesis Data Integration Bridge + +This application houses the Amazon Kinesis Producer data +integration bridge for EMQX Enterprise Edition. It provides the means to +connect to Amazon Kinesis Data Streams and publish messages to it. + +# Documentation links + +For more information about Amazon Kinesis Data Streams, please see its +[official site](https://aws.amazon.com/kinesis/data-streams/). + +# Configurations + +Please see [Ingest Data into Kinesis](https://docs.emqx.com/en/enterprise/v5.1/data-integration/data-bridge-kinesis.html) for more detailed info. + +# Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt). diff --git a/apps/emqx_bridge_kinesis/docker-ct b/apps/emqx_bridge_kinesis/docker-ct new file mode 100644 index 000000000..4422ee81e --- /dev/null +++ b/apps/emqx_bridge_kinesis/docker-ct @@ -0,0 +1,2 @@ +toxiproxy +kinesis diff --git a/apps/emqx_bridge_kinesis/rebar.config b/apps/emqx_bridge_kinesis/rebar.config new file mode 100644 index 000000000..e4b57846e --- /dev/null +++ b/apps/emqx_bridge_kinesis/rebar.config @@ -0,0 +1,11 @@ +%% -*- mode: erlang; -*- +{erl_opts, [debug_info]}. +{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}} + , {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + , {emqx_bridge, {path, "../../apps/emqx_bridge"}} + ]}. + +{shell, [ + {apps, [emqx_bridge_kinesis]} +]}. diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src new file mode 100644 index 000000000..36f6c8b0b --- /dev/null +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src @@ -0,0 +1,13 @@ +{application, emqx_bridge_kinesis, [ + {description, "EMQX Enterprise Amazon Kinesis Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + erlcloud + ]}, + {env, []}, + {modules, []}, + {links, []} +]}. diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl new file mode 100644 index 000000000..cb3cd3788 --- /dev/null +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl @@ -0,0 +1,167 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_kinesis). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +%% hocon_schema API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-export([ + conn_bridge_examples/1 +]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "bridge_kinesis". + +roots() -> + []. + +fields("config_producer") -> + emqx_bridge_schema:common_bridge_fields() ++ + emqx_resource_schema:fields("resource_opts") ++ + fields(connector_config) ++ fields(producer); +fields(connector_config) -> + [ + {aws_access_key_id, + mk( + binary(), + #{ + required => true, + desc => ?DESC("aws_access_key_id") + } + )}, + {aws_secret_access_key, + mk( + binary(), + #{ + required => true, + desc => ?DESC("aws_secret_access_key"), + sensitive => true + } + )}, + {endpoint, + mk( + binary(), + #{ + default => <<"https://kinesis.us-east-1.amazonaws.com">>, + desc => ?DESC("endpoint") + } + )}, + {max_retries, + mk( + non_neg_integer(), + #{ + required => false, + default => 2, + desc => ?DESC("max_retries") + } + )}, + {pool_size, + sc( + pos_integer(), + #{ + default => 8, + desc => ?DESC("pool_size") + } + )} + ]; +fields(producer) -> + [ + {payload_template, + sc( + binary(), + #{ + default => <<>>, + desc => ?DESC("payload_template") + } + )}, + {local_topic, + sc( + binary(), + #{ + desc => ?DESC("local_topic") + } + )}, + {stream_name, + sc( + binary(), + #{ + required => true, + desc => ?DESC("stream_name") + } + )}, + {partition_key, + sc( + binary(), + #{ + required => true, + desc => ?DESC("partition_key") + } + )} + ]; +fields("get_producer") -> + emqx_bridge_schema:status_fields() ++ fields("post_producer"); +fields("post_producer") -> + [type_field_producer(), name_field() | fields("config_producer")]; +fields("put_producer") -> + fields("config_producer"). + +desc("config_producer") -> + ?DESC("desc_config"); +desc(_) -> + undefined. + +conn_bridge_examples(Method) -> + [ + #{ + <<"kinesis_producer">> => #{ + summary => <<"Amazon Kinesis Producer Bridge">>, + value => values(producer, Method) + } + } + ]. + +values(producer, _Method) -> + #{ + aws_access_key_id => <<"aws_access_key_id">>, + aws_secret_access_key => <<"******">>, + endpoint => <<"https://kinesis.us-east-1.amazonaws.com">>, + max_retries => 3, + stream_name => <<"stream_name">>, + partition_key => <<"key">>, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => 15000, + query_mode => async, + inflight_window => 100, + max_buffer_bytes => 100 * 1024 * 1024 + } + }. + +%%------------------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------------------- + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). + +mk(Type, Meta) -> hoconsc:mk(Type, Meta). + +enum(OfSymbols) -> hoconsc:enum(OfSymbols). + +type_field_producer() -> + {type, mk(enum([kinesis_producer]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl new file mode 100644 index 000000000..bb1000e5f --- /dev/null +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl @@ -0,0 +1,178 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_kinesis_connector_client). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("erlcloud/include/erlcloud_aws.hrl"). + +-behaviour(gen_server). + +-type state() :: #{ + instance_id := resource_id(), + partition_key := binary(), + stream_name := binary() +}. +-type record() :: {Data :: binary(), PartitionKey :: binary()}. + +-define(DEFAULT_PORT, 443). + +%% API +-export([ + start_link/1, + connection_status/1, + query/2 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-ifdef(TEST). +-export([execute/2]). +-endif. + +%% The default timeout for Kinesis API calls is 10 seconds, +%% but this value for `gen_server:call` is 5s, +%% so we should adjust timeout for `gen_server:call` +-define(HEALTH_CHECK_TIMEOUT, 15000). + +%%%=================================================================== +%%% API +%%%=================================================================== +connection_status(Pid) -> + try + gen_server:call(Pid, connection_status, ?HEALTH_CHECK_TIMEOUT) + catch + _:_ -> + {error, timeout} + end. + +query(Pid, Records) -> + gen_server:call(Pid, {query, Records}, infinity). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts Bridge which communicates to Amazon Kinesis Data Streams +%% @end +%%-------------------------------------------------------------------- +start_link(Options) -> + gen_server:start_link(?MODULE, Options, []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% Initialize kinesis connector +-spec init(emqx_bridge_kinesis_impl_producer:config()) -> {ok, state()}. +init(#{ + aws_access_key_id := AwsAccessKey, + aws_secret_access_key := AwsSecretAccessKey, + endpoint := Endpoint, + partition_key := PartitionKey, + stream_name := StreamName, + max_retries := MaxRetries, + instance_id := InstanceId +}) -> + process_flag(trap_exit, true), + + #{scheme := Scheme, hostname := Host, port := Port} = + emqx_schema:parse_server( + Endpoint, + #{ + default_port => ?DEFAULT_PORT, + supported_schemes => ["http", "https"] + } + ), + State = #{ + instance_id => InstanceId, + partition_key => PartitionKey, + stream_name => StreamName + }, + New = + fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) -> + Config0 = erlcloud_kinesis:new( + AccessKeyID, + SecretAccessKey, + HostAddr, + HostPort, + ConnectionScheme ++ "://" + ), + Config0#aws_config{retry_num = MaxRetries} + end, + erlcloud_config:configure( + to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New + ), + {ok, State}. + +handle_call(connection_status, _From, #{stream_name := StreamName} = State) -> + Status = + case erlcloud_kinesis:describe_stream(StreamName) of + {ok, _} -> + {ok, connected}; + {error, {<<"ResourceNotFoundException">>, _}} -> + {error, unhealthy_target}; + Error -> + {error, Error} + end, + {reply, Status, State}; +handle_call({query, Records}, _From, #{stream_name := StreamName} = State) -> + Result = do_query(StreamName, Records), + {reply, Result, State}; +handle_call(_Request, _From, State) -> + {reply, {error, unknown_call}, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(Reason, #{instance_id := InstanceId} = _State) -> + ?tp(kinesis_stop, #{instance_id => InstanceId, reason => Reason}), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec do_query(binary(), [record()]) -> + {ok, jsx:json_term() | binary()} + | {error, {unrecoverable_error, term()}} + | {error, term()}. +do_query(StreamName, Records) -> + try + execute(put_record, {StreamName, Records}) + catch + _Type:Reason -> + {error, {unrecoverable_error, {invalid_request, Reason}}} + end. + +-spec execute(put_record, {binary(), [record()]}) -> + {ok, jsx:json_term() | binary()} + | {error, term()}. +execute(put_record, {StreamName, [{Data, PartitionKey}] = Record}) -> + Result = erlcloud_kinesis:put_record(StreamName, PartitionKey, Data), + ?tp(kinesis_put_record, #{records => Record, result => Result}), + Result; +execute(put_record, {StreamName, Items}) when is_list(Items) -> + Result = erlcloud_kinesis:put_records(StreamName, Items), + ?tp(kinesis_put_record, #{records => Items, result => Result}), + Result. + +-spec to_str(list() | binary()) -> list(). +to_str(List) when is_list(List) -> + List; +to_str(Bin) when is_binary(Bin) -> + erlang:binary_to_list(Bin). diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl new file mode 100644 index 000000000..7948581b5 --- /dev/null +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -0,0 +1,247 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_kinesis_impl_producer). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(HEALTH_CHECK_TIMEOUT, 15000). +-define(TOPIC_MESSAGE, + "Kinesis stream is invalid. Please check if the stream exist in Kinesis account." +). + +-type config() :: #{ + aws_access_key_id := binary(), + aws_secret_access_key := binary(), + endpoint := binary(), + stream_name := binary(), + partition_key := binary(), + payload_template := binary(), + max_retries := non_neg_integer(), + pool_size := non_neg_integer(), + instance_id => resource_id(), + any() => term() +}. +-type templates() :: #{ + partition_key := list(), + send_message := list() +}. +-type state() :: #{ + pool_name := resource_id(), + templates := templates() +}. +-export_type([config/0]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-export([ + connect/1 +]). + +%%------------------------------------------------------------------------------------------------- +%% `emqx_resource' API +%%------------------------------------------------------------------------------------------------- + +callback_mode() -> always_sync. + +-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. +on_start( + InstanceId, + #{ + pool_size := PoolSize + } = Config0 +) -> + ?SLOG(info, #{ + msg => "starting_kinesis_bridge", + connector => InstanceId, + config => redact(Config0) + }), + Config = Config0#{instance_id => InstanceId}, + Options = [ + {config, Config}, + {pool_size, PoolSize} + ], + Templates = parse_template(Config), + State = #{ + pool_name => InstanceId, + templates => Templates + }, + + case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of + ok -> + ?tp(emqx_bridge_kinesis_impl_producer_start_ok, #{config => Config}), + {ok, State}; + Error -> + ?tp(emqx_bridge_kinesis_impl_producer_start_failed, #{config => Config}), + Error + end. + +-spec on_stop(resource_id(), state()) -> ok | {error, term()}. +on_stop(InstanceId, _State) -> + emqx_resource_pool:stop(InstanceId). + +-spec on_get_status(resource_id(), state()) -> + connected | disconnected | {disconnected, state(), {unhealthy_target, string()}}. +on_get_status(_InstanceId, #{pool_name := Pool} = State) -> + case + emqx_resource_pool:health_check_workers( + Pool, + {emqx_bridge_kinesis_connector_client, connection_status, []}, + ?HEALTH_CHECK_TIMEOUT, + #{return_values => true} + ) + of + {ok, Values} -> + AllOk = lists:all(fun(S) -> S =:= {ok, connected} end, Values), + case AllOk of + true -> + connected; + false -> + Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values), + case Unhealthy of + true -> {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}}; + false -> disconnected + end + end; + {error, _} -> + disconnected + end. + +-spec on_query( + resource_id(), + {send_message, map()}, + state() +) -> + {ok, map()} + | {error, {recoverable_error, term()}} + | {error, term()}. +on_query(ResourceId, {send_message, Message}, State) -> + Requests = [{send_message, Message}], + ?tp(emqx_bridge_kinesis_impl_producer_sync_query, #{message => Message}), + do_send_requests_sync(ResourceId, Requests, State). + +-spec on_batch_query( + resource_id(), + [{send_message, map()}], + state() +) -> + {ok, map()} + | {error, {recoverable_error, term()}} + | {error, term()}. +%% we only support batch insert +on_batch_query(ResourceId, [{send_message, _} | _] = Requests, State) -> + ?tp(emqx_bridge_kinesis_impl_producer_sync_batch_query, #{requests => Requests}), + do_send_requests_sync(ResourceId, Requests, State). + +connect(Opts) -> + Options = proplists:get_value(config, Opts), + emqx_bridge_kinesis_connector_client:start_link(Options). + +%%------------------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------------------- + +-spec do_send_requests_sync( + resource_id(), + [{send_message, map()}], + state() +) -> + {ok, jsx:json_term() | binary()} + | {error, {recoverable_error, term()}} + | {error, {unrecoverable_error, {invalid_request, term()}}} + | {error, {unrecoverable_error, {unhealthy_target, string()}}} + | {error, {unrecoverable_error, term()}} + | {error, term()}. +do_send_requests_sync( + InstanceId, + Requests, + #{pool_name := PoolName, templates := Templates} +) -> + Records = render_records(Requests, Templates), + Result = ecpool:pick_and_do( + PoolName, + {emqx_bridge_kinesis_connector_client, query, [Records]}, + no_handover + ), + handle_result(Result, Requests, InstanceId). + +handle_result({ok, _} = Result, _Requests, _InstanceId) -> + Result; +handle_result({error, {<<"ResourceNotFoundException">>, _} = Reason}, Requests, InstanceId) -> + ?SLOG(error, #{ + msg => "kinesis_error_response", + request => Requests, + connector => InstanceId, + reason => Reason + }), + {error, {unrecoverable_error, {unhealthy_target, ?TOPIC_MESSAGE}}}; +handle_result( + {error, {<<"ProvisionedThroughputExceededException">>, _} = Reason}, Requests, InstanceId +) -> + ?SLOG(error, #{ + msg => "kinesis_error_response", + request => Requests, + connector => InstanceId, + reason => Reason + }), + {error, {recoverable_error, Reason}}; +handle_result({error, {<<"InvalidArgumentException">>, _} = Reason}, Requests, InstanceId) -> + ?SLOG(error, #{ + msg => "kinesis_error_response", + request => Requests, + connector => InstanceId, + reason => Reason + }), + {error, {unrecoverable_error, Reason}}; +handle_result({error, {econnrefused = Reason, _}}, Requests, InstanceId) -> + ?SLOG(error, #{ + msg => "kinesis_error_response", + request => Requests, + connector => InstanceId, + reason => Reason + }), + {error, {recoverable_error, Reason}}; +handle_result({error, Reason} = Error, Requests, InstanceId) -> + ?SLOG(error, #{ + msg => "kinesis_error_response", + request => Requests, + connector => InstanceId, + reason => Reason + }), + Error. + +parse_template(Config) -> + #{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config, + Templates = #{send_message => PayloadTemplate, partition_key => PartitionKeyTemplate}, + maps:map(fun(_K, V) -> emqx_placeholder:preproc_tmpl(V) end, Templates). + +render_records(Items, Templates) -> + PartitionKeyTemplate = maps:get(partition_key, Templates), + MsgTemplate = maps:get(send_message, Templates), + render_messages(Items, {MsgTemplate, PartitionKeyTemplate}, []). + +render_messages([], _Templates, RenderedMsgs) -> + RenderedMsgs; +render_messages( + [{send_message, Msg} | Others], + {MsgTemplate, PartitionKeyTemplate} = Templates, + RenderedMsgs +) -> + Data = emqx_placeholder:proc_tmpl(MsgTemplate, Msg), + PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTemplate, Msg), + RenderedMsg = {Data, PartitionKey}, + render_messages(Others, Templates, [RenderedMsg | RenderedMsgs]). + +redact(Config) -> + emqx_utils:redact(Config, fun(Any) -> Any =:= aws_secret_access_key end). diff --git a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl new file mode 100644 index 000000000..114f324a9 --- /dev/null +++ b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl @@ -0,0 +1,817 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_kinesis_impl_producer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(PRODUCER, emqx_bridge_kinesis_impl_producer). +-define(BRIDGE_TYPE, kinesis_producer). +-define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>). +-define(KINESIS_PORT, 4566). +-define(TOPIC, <<"t/topic">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, with_batch}, + {group, without_batch} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {with_batch, TCs}, + {without_batch, TCs} + ]. + +init_per_suite(Config) -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + ProxyName = "kinesis", + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), + {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {kinesis_port, ?KINESIS_PORT}, + {proxy_name, ProxyName} + | Config + ]. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), + _ = application:stop(emqx_connector), + ok. + +init_per_group(with_batch, Config) -> + [{batch_size, 100} | Config]; +init_per_group(without_batch, Config) -> + [{batch_size, 1} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(TestCase, Config0) -> + ok = snabbkaffe:start_trace(), + ProxyHost = ?config(proxy_host, Config0), + ProxyPort = ?config(proxy_port, Config0), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + TimeTrap = + case TestCase of + t_wrong_server -> 60; + _ -> 30 + end, + ct:timetrap({seconds, TimeTrap}), + delete_all_bridges(), + Tid = install_telemetry_handler(TestCase), + put(telemetry_table, Tid), + Config = generate_config(Config0), + create_stream(Config), + [{telemetry_table, Tid} | Config]. + +end_per_testcase(_TestCase, Config) -> + ok = snabbkaffe:stop(), + delete_all_bridges(), + delete_stream(Config), + emqx_common_test_helpers:call_janitor(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +generate_config(Config0) -> + #{ + name := Name, + config_string := ConfigString, + kinesis_config := KinesisConfig + } = kinesis_config(Config0), + Endpoint = map_get(<<"endpoint">>, KinesisConfig), + #{scheme := Scheme, hostname := Host, port := Port} = + emqx_schema:parse_server( + Endpoint, + #{ + default_port => 443, + supported_schemes => ["http", "https"] + } + ), + ErlcloudConfig = erlcloud_kinesis:new("access_key", "secret", Host, Port, Scheme ++ "://"), + ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name), + BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name), + [ + {kinesis_name, Name}, + {connection_scheme, Scheme}, + {kinesis_config, KinesisConfig}, + {kinesis_config_string, ConfigString}, + {resource_id, ResourceId}, + {bridge_id, BridgeId}, + {erlcloud_config, ErlcloudConfig} + | Config0 + ]. + +kinesis_config(Config) -> + QueryMode = proplists:get_value(query_mode, Config, async), + Scheme = proplists:get_value(connection_scheme, Config, "http"), + ProxyHost = proplists:get_value(proxy_host, Config), + KinesisPort = proplists:get_value(kinesis_port, Config), + BatchSize = proplists:get_value(batch_size, Config, 100), + BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>), + PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"), + StreamName = proplists:get_value(stream_name, Config, <<"mystream">>), + PartitionKey = proplists:get_value(partition_key, Config, <<"key">>), + MaxRetries = proplists:get_value(max_retries, Config, 3), + GUID = emqx_guid:to_hexstr(emqx_guid:gen()), + Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>, + ConfigString = + io_lib:format( + "bridges.kinesis_producer.~s {\n" + " enable = true\n" + " aws_access_key_id = \"aws_access_key_id\"\n" + " aws_secret_access_key = \"aws_secret_access_key\"\n" + " endpoint = \"~s://~s:~b\"\n" + " stream_name = \"~s\"\n" + " partition_key = \"~s\"\n" + " payload_template = \"~s\"\n" + " max_retries = ~b\n" + " pool_size = 1\n" + " resource_opts = {\n" + " health_check_interval = \"3s\"\n" + " request_ttl = 30s\n" + " resume_interval = 1s\n" + " metrics_flush_interval = \"700ms\"\n" + " worker_pool_size = 1\n" + " query_mode = ~s\n" + " batch_size = ~b\n" + " batch_time = \"~s\"\n" + " }\n" + "}\n", + [ + Name, + Scheme, + ProxyHost, + KinesisPort, + StreamName, + PartitionKey, + PayloadTemplate, + MaxRetries, + QueryMode, + BatchSize, + BatchTime + ] + ), + #{ + name => Name, + config_string => ConfigString, + kinesis_config => parse_and_check(ConfigString, Name) + }. + +parse_and_check(ConfigString, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + TypeBin = <<"kinesis_producer">>, + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + Config. + +delete_all_bridges() -> + ct:pal("deleting all bridges"), + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +delete_bridge(Config) -> + Type = ?BRIDGE_TYPE, + Name = ?config(kinesis_name, Config), + ct:pal("deleting bridge ~p", [{Type, Name}]), + emqx_bridge:remove(Type, Name). + +create_bridge_http(Config) -> + create_bridge_http(Config, _KinesisConfigOverrides = #{}). + +create_bridge_http(Config, KinesisConfigOverrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(kinesis_name, Config), + KinesisConfig0 = ?config(kinesis_config, Config), + KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, KinesisConfigOverrides), + Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + ProbeResult = emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params), + ct:pal("creating bridge (via http): ~p", [Params]), + ct:pal("probe result: ~p", [ProbeResult]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res0} -> {ok, emqx_utils_json:decode(Res0, [return_maps])}; + Error -> Error + end, + ct:pal("bridge creation result: ~p", [Res]), + ?assertEqual(element(1, ProbeResult), element(1, Res)), + Res. + +create_bridge(Config) -> + create_bridge(Config, _KinesisConfigOverrides = #{}). + +create_bridge(Config, KinesisConfigOverrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(kinesis_name, Config), + KinesisConfig0 = ?config(kinesis_config, Config), + KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, KinesisConfigOverrides), + ct:pal("creating bridge: ~p", [KinesisConfig]), + Res = emqx_bridge:create(TypeBin, Name, KinesisConfig), + ct:pal("bridge creation result: ~p", [Res]), + Res. + +create_rule_and_action_http(Config) -> + BridgeId = ?config(bridge_id, Config), + Params = #{ + enable => true, + sql => <<"SELECT * FROM \"", ?TOPIC/binary, "\"">>, + actions => [BridgeId] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +create_stream(Config) -> + KinesisConfig = ?config(kinesis_config, Config), + ErlcloudConfig = ?config(erlcloud_config, Config), + StreamName = map_get(<<"stream_name">>, KinesisConfig), + {ok, _} = application:ensure_all_started(erlcloud), + delete_stream(StreamName, ErlcloudConfig), + {ok, _} = erlcloud_kinesis:create_stream(StreamName, 1, ErlcloudConfig), + ?retry( + _Sleep = 100, + _Attempts = 10, + begin + {ok, [{<<"StreamDescription">>, StreamInfo}]} = + erlcloud_kinesis:describe_stream(StreamName, ErlcloudConfig), + ?assertEqual( + <<"ACTIVE">>, + proplists:get_value(<<"StreamStatus">>, StreamInfo) + ) + end + ), + ok. + +delete_stream(Config) -> + KinesisConfig = ?config(kinesis_config, Config), + ErlcloudConfig = ?config(erlcloud_config, Config), + StreamName = map_get(<<"stream_name">>, KinesisConfig), + {ok, _} = application:ensure_all_started(erlcloud), + delete_stream(StreamName, ErlcloudConfig), + ok. + +delete_stream(StreamName, ErlcloudConfig) -> + case erlcloud_kinesis:delete_stream(StreamName, ErlcloudConfig) of + {ok, _} -> + ?retry( + _Sleep = 100, + _Attempts = 10, + ?assertMatch( + {error, {<<"ResourceNotFoundException">>, _}}, + erlcloud_kinesis:describe_stream(StreamName, ErlcloudConfig) + ) + ); + _ -> + ok + end, + ok. + +wait_record(Config, ShardIt, Timeout, Attempts) -> + [Record] = wait_records(Config, ShardIt, 1, Timeout, Attempts), + Record. + +wait_records(Config, ShardIt, Count, Timeout, Attempts) -> + ErlcloudConfig = ?config(erlcloud_config, Config), + ?retry( + Timeout, + Attempts, + begin + {ok, Ret} = erlcloud_kinesis:get_records(ShardIt, ErlcloudConfig), + Records = proplists:get_value(<<"Records">>, Ret), + Count = length(Records), + Records + end + ). + +get_shard_iterator(Config) -> + get_shard_iterator(Config, 1). + +get_shard_iterator(Config, Index) -> + KinesisConfig = ?config(kinesis_config, Config), + ErlcloudConfig = ?config(erlcloud_config, Config), + StreamName = map_get(<<"stream_name">>, KinesisConfig), + {ok, [{<<"Shards">>, Shards}]} = erlcloud_kinesis:list_shards(StreamName, ErlcloudConfig), + Shard = lists:nth(Index, lists:sort(Shards)), + ShardId = proplists:get_value(<<"ShardId">>, Shard), + {ok, [{<<"ShardIterator">>, ShardIt}]} = + erlcloud_kinesis:get_shard_iterator(StreamName, ShardId, <<"LATEST">>, ErlcloudConfig), + ShardIt. + +install_telemetry_handler(TestCase) -> + Tid = ets:new(TestCase, [ordered_set, public]), + HandlerId = TestCase, + TestPid = self(), + _ = telemetry:attach_many( + HandlerId, + emqx_resource_metrics:events(), + fun(EventName, Measurements, Metadata, _Config) -> + Data = #{ + name => EventName, + measurements => Measurements, + metadata => Metadata + }, + ets:insert(Tid, {erlang:monotonic_time(), Data}), + TestPid ! {telemetry, Data}, + ok + end, + unused_config + ), + emqx_common_test_helpers:on_exit(fun() -> + telemetry:detach(HandlerId), + ets:delete(Tid) + end), + Tid. + +current_metrics(ResourceId) -> + Mapping = metrics_mapping(), + maps:from_list([ + {Metric, F(ResourceId)} + || {Metric, F} <- maps:to_list(Mapping) + ]). + +metrics_mapping() -> + #{ + dropped => fun emqx_resource_metrics:dropped_get/1, + dropped_expired => fun emqx_resource_metrics:dropped_expired_get/1, + dropped_other => fun emqx_resource_metrics:dropped_other_get/1, + dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1, + dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1, + dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1, + late_reply => fun emqx_resource_metrics:late_reply_get/1, + failed => fun emqx_resource_metrics:failed_get/1, + inflight => fun emqx_resource_metrics:inflight_get/1, + matched => fun emqx_resource_metrics:matched_get/1, + queuing => fun emqx_resource_metrics:queuing_get/1, + retried => fun emqx_resource_metrics:retried_get/1, + retried_failed => fun emqx_resource_metrics:retried_failed_get/1, + retried_success => fun emqx_resource_metrics:retried_success_get/1, + success => fun emqx_resource_metrics:success_get/1 + }. + +assert_metrics(ExpectedMetrics, ResourceId) -> + Mapping = metrics_mapping(), + Metrics = + lists:foldl( + fun(Metric, Acc) -> + #{Metric := Fun} = Mapping, + Value = Fun(ResourceId), + Acc#{Metric => Value} + end, + #{}, + maps:keys(ExpectedMetrics) + ), + CurrentMetrics = current_metrics(ResourceId), + TelemetryTable = get(telemetry_table), + RecordedEvents = ets:tab2list(TelemetryTable), + ?assertEqual(ExpectedMetrics, Metrics, #{ + current_metrics => CurrentMetrics, recorded_events => RecordedEvents + }), + ok. + +assert_empty_metrics(ResourceId) -> + Mapping = metrics_mapping(), + ExpectedMetrics = + lists:foldl( + fun(Metric, Acc) -> + Acc#{Metric => 0} + end, + #{}, + maps:keys(Mapping) + ), + assert_metrics(ExpectedMetrics, ResourceId). + +wait_telemetry_event(TelemetryTable, EventName, ResourceId) -> + wait_telemetry_event(TelemetryTable, EventName, ResourceId, #{timeout => 5_000, n_events => 1}). + +wait_telemetry_event( + TelemetryTable, + EventName, + ResourceId, + _Opts = #{ + timeout := Timeout, + n_events := NEvents + } +) -> + wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName). + +wait_n_events(_TelemetryTable, _ResourceId, NEvents, _Timeout, _EventName) when NEvents =< 0 -> + ok; +wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) -> + receive + {telemetry, #{name := [_, _, EventName], measurements := #{counter_inc := Inc}} = Event} -> + ct:pal("telemetry event: ~p", [Event]), + wait_n_events(TelemetryTable, ResourceId, NEvents - Inc, Timeout, EventName) + after Timeout -> + RecordedEvents = ets:tab2list(TelemetryTable), + CurrentMetrics = current_metrics(ResourceId), + ct:pal("recorded events: ~p", [RecordedEvents]), + ct:pal("current metrics: ~p", [CurrentMetrics]), + error({timeout_waiting_for_telemetry, EventName}) + end. + +wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> + Events = receive_all_events(GaugeName, Timeout), + case length(Events) > 0 andalso lists:last(Events) of + #{measurements := #{gauge_set := ExpectedValue}} -> + ok; + #{measurements := #{gauge_set := Value}} -> + ct:pal("events: ~p", [Events]), + ct:fail( + "gauge ~p didn't reach expected value ~p; last value: ~p", + [GaugeName, ExpectedValue, Value] + ); + false -> + ct:pal("no ~p gauge events received!", [GaugeName]) + end. + +receive_all_events(EventName, Timeout) -> + receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []). + +receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents -> + lists:reverse(Acc); +receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) -> + receive + {telemetry, #{name := [_, _, EventName]} = Event} -> + receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc]) + after Timeout -> + lists:reverse(Acc) + end. + +to_str(List) when is_list(List) -> + List; +to_str(Bin) when is_binary(Bin) -> + erlang:binary_to_list(Bin); +to_str(Int) when is_integer(Int) -> + erlang:integer_to_list(Int). + +to_bin(Str) when is_list(Str) -> + erlang:list_to_binary(Str). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_create_via_http(Config) -> + ?assertMatch({ok, _}, create_bridge_http(Config)), + ok. + +t_start_failed_then_fix(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ResourceId = ?config(resource_id, Config), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ct:sleep(1000), + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := emqx_bridge_kinesis_impl_producer_start_failed}, + 20_000 + ) + end), + ?retry( + _Sleep1 = 1_000, + _Attempts1 = 30, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ok. + +t_stop(Config) -> + Name = ?config(kinesis_name, Config), + {ok, _} = create_bridge(Config), + ?check_trace( + ?wait_async_action( + emqx_bridge_resource:stop(?BRIDGE_TYPE, Name), + #{?snk_kind := kinesis_stop}, + 5_000 + ), + fun(Trace) -> + ?assertMatch([_], ?of_kind(kinesis_stop, Trace)), + ok + end + ), + ok. + +t_get_status_ok(Config) -> + ResourceId = ?config(resource_id, Config), + {ok, _} = create_bridge(Config), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ok. + +t_create_unhealthy(Config) -> + delete_stream(Config), + ResourceId = ?config(resource_id, Config), + {ok, _} = create_bridge(Config), + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch( + {ok, _, #{error := {unhealthy_target, _}}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + ok. + +t_get_status_unhealthy(Config) -> + delete_stream(Config), + ResourceId = ?config(resource_id, Config), + {ok, _} = create_bridge(Config), + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch( + {ok, _, #{error := {unhealthy_target, _}}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + ok. + +t_publish_success(Config) -> + ResourceId = ?config(resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), + ?assertMatch({ok, _}, create_bridge(Config)), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + ShardIt = get_shard_iterator(Config), + Payload = <<"payload">>, + Message = emqx_message:make(?TOPIC, Payload), + emqx:publish(Message), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, success, ResourceId), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), + assert_metrics( + #{ + dropped => 0, + failed => 0, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 1 + }, + ResourceId + ), + Record = wait_record(Config, ShardIt, 100, 10), + ?assertEqual(Payload, proplists:get_value(<<"Data">>, Record)), + ok. + +t_publish_success_with_template(Config) -> + ResourceId = ?config(resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), + Overrides = + #{ + <<"payload_template">> => <<"${payload.data}">>, + <<"partition_key">> => <<"${payload.key}">> + }, + ?assertMatch({ok, _}, create_bridge(Config, Overrides)), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + ShardIt = get_shard_iterator(Config), + Payload = <<"{\"key\":\"my_key\", \"data\":\"my_data\"}">>, + Message = emqx_message:make(?TOPIC, Payload), + emqx:publish(Message), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, success, ResourceId), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), + assert_metrics( + #{ + dropped => 0, + failed => 0, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 1 + }, + ResourceId + ), + Record = wait_record(Config, ShardIt, 100, 10), + ?assertEqual(<<"my_data">>, proplists:get_value(<<"Data">>, Record)), + ok. + +t_publish_multiple_msgs_success(Config) -> + ResourceId = ?config(resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), + ?assertMatch({ok, _}, create_bridge(Config)), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + ShardIt = get_shard_iterator(Config), + lists:foreach( + fun(I) -> + Payload = "payload_" ++ to_str(I), + Message = emqx_message:make(?TOPIC, Payload), + emqx:publish(Message) + end, + lists:seq(1, 10) + ), + Records = wait_records(Config, ShardIt, 10, 100, 10), + ReceivedPayloads = + lists:map(fun(Record) -> proplists:get_value(<<"Data">>, Record) end, Records), + lists:foreach( + fun(I) -> + ExpectedPayload = to_bin("payload_" ++ to_str(I)), + ?assertEqual( + {ExpectedPayload, true}, + {ExpectedPayload, lists:member(ExpectedPayload, ReceivedPayloads)} + ) + end, + lists:seq(1, 10) + ), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, success, ResourceId), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), + assert_metrics( + #{ + dropped => 0, + failed => 0, + inflight => 0, + matched => 10, + queuing => 0, + retried => 0, + success => 10 + }, + ResourceId + ), + ok. + +t_publish_unhealthy(Config) -> + ResourceId = ?config(resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), + ?assertMatch({ok, _}, create_bridge(Config)), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + ShardIt = get_shard_iterator(Config), + Payload = <<"payload">>, + Message = emqx_message:make(?TOPIC, Payload), + delete_stream(Config), + emqx:publish(Message), + ?assertError( + {badmatch, {error, {<<"ResourceNotFoundException">>, _}}}, + wait_record(Config, ShardIt, 100, 10) + ), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, failed, ResourceId), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), + assert_metrics( + #{ + dropped => 0, + failed => 1, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 0 + }, + ResourceId + ), + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch( + {ok, _, #{error := {unhealthy_target, _}}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + ok. + +t_publish_big_msg(Config) -> + ResourceId = ?config(resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), + ?assertMatch({ok, _}, create_bridge(Config)), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + % Maximum size is 1MB. Using 1MB + 1 here. + Payload = binary:copy(<<"a">>, 1 * 1024 * 1024 + 1), + Message = emqx_message:make(?TOPIC, Payload), + emqx:publish(Message), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, failed, ResourceId), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), + assert_metrics( + #{ + dropped => 0, + failed => 1, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 0 + }, + ResourceId + ), + ok. + +t_publish_connection_down(Config0) -> + Config = generate_config([{max_retries, 2} | Config0]), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ResourceId = ?config(resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), + ?assertMatch({ok, _}, create_bridge(Config)), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + ?retry( + _Sleep1 = 1_000, + _Attempts1 = 30, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + ShardIt = get_shard_iterator(Config), + Payload = <<"payload">>, + Message = emqx_message:make(?TOPIC, Payload), + Kind = + case proplists:get_value(batch_size, Config) of + 1 -> emqx_bridge_kinesis_impl_producer_sync_query; + _ -> emqx_bridge_kinesis_impl_producer_sync_batch_query + end, + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ct:sleep(1000), + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := Kind}, + 5_000 + ), + ct:sleep(1000) + end), + % Wait for reconnection. + ?retry( + _Sleep3 = 1_000, + _Attempts3 = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + Record = wait_record(Config, ShardIt, 2000, 10), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, retried_success, ResourceId), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), + assert_metrics( + #{ + dropped => 0, + failed => 0, + inflight => 0, + matched => 1, + queuing => 0, + retried => 1, + success => 1, + retried_success => 1 + }, + ResourceId + ), + Data = proplists:get_value(<<"Data">>, Record), + ?assertEqual(Payload, Data), + ok. + +t_wrong_server(Config) -> + Name = ?config(kinesis_name, Config), + ResourceId = ?config(resource_id, Config), + Overrides = + #{ + <<"max_retries">> => 0, + <<"endpoint">> => <<"https://wrong_server:12345">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"60s">> + } + }, + ?wait_async_action( + create_bridge(Config, Overrides), + #{?snk_kind := emqx_bridge_kinesis_impl_producer_start_ok}, + 30_000 + ), + ?assertEqual({error, timeout}, emqx_resource_manager:health_check(ResourceId)), + emqx_bridge_resource:stop(?BRIDGE_TYPE, Name), + emqx_bridge_resource:remove(?BRIDGE_TYPE, Name), + ok. diff --git a/apps/emqx_s3/rebar.config b/apps/emqx_s3/rebar.config index 8b0df5c34..1d64e6677 100644 --- a/apps/emqx_s3/rebar.config +++ b/apps/emqx_s3/rebar.config @@ -1,6 +1,6 @@ {deps, [ {emqx, {path, "../../apps/emqx"}}, - {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}}, + {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}}, {emqx_bridge_http, {path, "../emqx_bridge_http"}} ]}. diff --git a/changes/ee/feat-11261.en.md b/changes/ee/feat-11261.en.md new file mode 100644 index 000000000..a23f319c8 --- /dev/null +++ b/changes/ee/feat-11261.en.md @@ -0,0 +1 @@ +Implemented Amazon Kinesis Data Streams producer data integration bridge . diff --git a/mix.exs b/mix.exs index df8cf4c3d..f14fd1a91 100644 --- a/mix.exs +++ b/mix.exs @@ -191,7 +191,8 @@ defmodule EMQXUmbrella.MixProject do :emqx_ft, :emqx_s3, :emqx_schema_registry, - :emqx_enterprise + :emqx_enterprise, + :emqx_bridge_kinesis ]) end @@ -423,7 +424,8 @@ defmodule EMQXUmbrella.MixProject do emqx_schema_registry: :permanent, emqx_eviction_agent: :permanent, emqx_node_rebalance: :permanent, - emqx_ft: :permanent + emqx_ft: :permanent, + emqx_bridge_kinesis: :permanent ], else: [ emqx_telemetry: :permanent diff --git a/rebar.config.erl b/rebar.config.erl index 9be3c68e2..7bd0f0548 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -104,6 +104,7 @@ is_community_umbrella_app("apps/emqx_ft") -> false; is_community_umbrella_app("apps/emqx_s3") -> false; is_community_umbrella_app("apps/emqx_schema_registry") -> false; is_community_umbrella_app("apps/emqx_enterprise") -> false; +is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false; is_community_umbrella_app(_) -> true. is_jq_supported() -> @@ -491,7 +492,8 @@ relx_apps_per_edition(ee) -> emqx_schema_registry, emqx_eviction_agent, emqx_node_rebalance, - emqx_ft + emqx_ft, + emqx_bridge_kinesis ]; relx_apps_per_edition(ce) -> [emqx_telemetry]. diff --git a/rel/i18n/emqx_bridge_kinesis.hocon b/rel/i18n/emqx_bridge_kinesis.hocon new file mode 100644 index 000000000..42329bcd6 --- /dev/null +++ b/rel/i18n/emqx_bridge_kinesis.hocon @@ -0,0 +1,85 @@ +emqx_bridge_kinesis { + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +desc_config.desc: +"""Configuration for an Amazon Kinesis bridge.""" + +desc_config.label: +"""Amazon Kinesis Bridge Configuration""" + +desc_name.desc: +"""Bridge name.""" + +desc_name.label: +"""Bridge Name""" + +desc_type.desc: +"""The Bridge Type""" + +desc_type.label: +"""Bridge Type""" + +pool_size.desc: +"""The pool size.""" + +pool_size.label: +"""Pool Size""" + +local_topic.desc: +"""The MQTT topic filter to be forwarded to Amazon Kinesis. All MQTT `PUBLISH` messages with the topic +matching the `local_topic` will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also `local_topic` is +configured, then both the data got from the rule and the MQTT messages that match `local_topic` +will be forwarded.""" + +local_topic.label: +"""Local Topic""" + +payload_template.desc: +"""The template for formatting the outgoing messages. If undefined, will send all the available context in JSON format.""" + +payload_template.label: +"""Payload template""" + +aws_access_key_id.desc: +"""Access Key ID for connecting to Amazon Kinesis.""" + +aws_access_key_id.label: +"""AWS Access Key ID""" + +aws_secret_access_key.desc: +"""AWS Secret Access Key for connecting to Amazon Kinesis.""" + +aws_secret_access_key.label: +"""AWS Secret Access Key""" + +endpoint.desc: +"""The url of Amazon Kinesis endpoint.""" + +endpoint.label: +"""Amazon Kinesis Endpoint""" + +stream_name.desc: +"""The Amazon Kinesis Stream to publish messages to.""" + +stream_name.label: +"""Amazon Kinesis Stream""" + +partition_key.desc: +"""The Amazon Kinesis Partition Key associated to published message. Placeholders in format of ${var} are supported.""" + +partition_key.label: +"""Partition key""" + +max_retries.desc: +"""Max retry times if an error occurs when sending a request.""" + +max_retries.label: +"""Max Retries""" + +} diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index e4061f7cb..785d4065d 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -219,6 +219,9 @@ for dep in ${CT_DEPS}; do hstreamdb) FILES+=( '.ci/docker-compose-file/docker-compose-hstreamdb.yaml' ) ;; + kinesis) + FILES+=( '.ci/docker-compose-file/docker-compose-kinesis.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1