From fa842736d20f0427172347cdc49d2fbdda43c74c Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 19 Jan 2024 17:13:04 +0100 Subject: [PATCH] feat: refactor kinesis bridge to connector and action Fixes: https://emqx.atlassian.net/browse/EMQX-11461 --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + .../src/emqx_bridge_kinesis.app.src | 4 +- .../src/emqx_bridge_kinesis.erl | 163 +++++++++++++++++- .../src/emqx_bridge_kinesis_action_info.erl | 22 +++ .../emqx_bridge_kinesis_connector_client.erl | 51 ++++-- .../src/emqx_bridge_kinesis_impl_producer.erl | 137 ++++++++++++--- ...mqx_bridge_kinesis_impl_producer_SUITE.erl | 108 ++++++++---- .../src/schema/emqx_connector_ee_schema.erl | 12 ++ .../src/schema/emqx_connector_schema.erl | 2 + .../src/emqx_resource_manager.erl | 5 + rel/i18n/emqx_bridge_kinesis.hocon | 19 ++ 11 files changed, 443 insertions(+), 81 deletions(-) create mode 100644 apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 8e5a823e3..7dce9d7cb 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -89,6 +89,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_confluent_producer_action_info, emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, + emqx_bridge_kinesis_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, emqx_bridge_influxdb_action_info, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src index 74d7dc94f..2e59fa8b2 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src @@ -1,13 +1,13 @@ {application, emqx_bridge_kinesis, [ {description, "EMQX Enterprise Amazon Kinesis Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, stdlib, erlcloud ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_kinesis_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl index 14e197113..41717abe8 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl @@ -15,7 +15,9 @@ ]). -export([ - conn_bridge_examples/1 + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 ]). %%------------------------------------------------------------------------------------------------- @@ -28,6 +30,37 @@ namespace() -> roots() -> []. +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields( + Field, + kinesis, + connector_config_fields() + ); +fields(action) -> + {kinesis, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, kinesis_action)), + #{ + desc => <<"Kinesis Action Config">>, + required => false + } + )}; +fields(action_parameters) -> + fields(producer); +fields(kinesis_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); fields("config_producer") -> emqx_bridge_schema:common_bridge_fields() ++ fields("resource_opts") ++ @@ -134,12 +167,38 @@ fields("get_producer") -> fields("post_producer") -> [type_field_producer(), name_field() | fields("config_producer")]; fields("put_producer") -> - fields("config_producer"). + fields("config_producer"); +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + connector_config_fields() ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields("put_bridge_v2") -> + fields(kinesis_action); +fields("get_bridge_v2") -> + fields(kinesis_action); +fields("post_bridge_v2") -> + fields("post", kinesis, kinesis_action). + +fields("post", Type, StructName) -> + [type_field(Type), name_field() | fields(StructName)]. + +type_field(Type) -> + {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. desc("config_producer") -> ?DESC("desc_config"); desc("creation_opts") -> ?DESC(emqx_resource_schema, "creation_opts"); +desc("config_connector") -> + ?DESC("config_connector"); +desc(kinesis_action) -> + ?DESC("kinesis_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. @@ -153,6 +212,103 @@ conn_bridge_examples(Method) -> } ]. +connector_examples(Method) -> + [ + #{ + <<"kinesis">> => #{ + summary => <<"Kinesis Connector">>, + value => values({Method, connector}) + } + } + ]. + +bridge_v2_examples(Method) -> + [ + #{ + <<"kinesis">> => #{ + summary => <<"Kinesis Action">>, + value => values({Method, bridge_v2_producer}) + } + } + ]. + +values({get, connector}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ], + actions => [<<"my_action">>] + }, + values({post, connector}) + ); +values({get, Type}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, Type}) + ); +values({post, connector}) -> + maps:merge( + #{ + name => <<"my_kinesis_connector">>, + type => <<"kinesis">> + }, + values(common_config) + ); +values({post, Type}) -> + maps:merge( + #{ + name => <<"my_kinesis_action">>, + type => <<"kinesis">> + }, + values({put, Type}) + ); +values({put, bridge_v2_producer}) -> + values(bridge_v2_producer); +values({put, connector}) -> + values(common_config); +values({put, Type}) -> + maps:merge(values(common_config), values(Type)); +values(bridge_v2_producer) -> + #{ + enable => true, + connector => <<"my_kinesis_connector">>, + parameters => values(producer_values), + resource_opts => #{ + <<"batch_size">> => 100, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"request_ttl">> => <<"45s">> + } + }; +values(common_config) -> + #{ + <<"enable">> => true, + <<"aws_access_key_id">> => <<"your_access_key">>, + <<"aws_secret_access_key">> => <<"aws_secret_key">>, + <<"endpoint">> => <<"http://localhost:4566">>, + <<"max_retries">> => 2, + <<"pool_size">> => 8 + }; +values(producer_values) -> + #{ + <<"partition_key">> => <<"any_key">>, + <<"payload_template">> => <<"${.}">>, + <<"stream_name">> => <<"my_stream">> + }. + values(producer, _Method) -> #{ aws_access_key_id => <<"aws_access_key_id">>, @@ -174,6 +330,9 @@ values(producer, _Method) -> %% Helper fns %%------------------------------------------------------------------------------------------------- +connector_config_fields() -> + fields(connector_config). + sc(Type, Meta) -> hoconsc:mk(Type, Meta). mk(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl new file mode 100644 index 000000000..c7fb5e1e5 --- /dev/null +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_kinesis_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> kinesis_producer. + +action_type_name() -> kinesis. + +connector_type_name() -> kinesis. + +schema_module() -> emqx_bridge_kinesis. diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl index 959b539a0..518a9b668 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl @@ -11,9 +11,7 @@ -behaviour(gen_server). -type state() :: #{ - instance_id := resource_id(), - partition_key := binary(), - stream_name := binary() + instance_id := resource_id() }. -type record() :: {Data :: binary(), PartitionKey :: binary()}. @@ -23,7 +21,8 @@ -export([ start_link/1, connection_status/1, - query/2 + connection_status/2, + query/3 ]). %% gen_server callbacks @@ -56,8 +55,16 @@ connection_status(Pid) -> {error, timeout} end. -query(Pid, Records) -> - gen_server:call(Pid, {query, Records}, infinity). +connection_status(Pid, StreamName) -> + try + gen_server:call(Pid, {connection_status, StreamName}, ?HEALTH_CHECK_TIMEOUT) + catch + _:_ -> + {error, timeout} + end. + +query(Pid, Records, StreamName) -> + gen_server:call(Pid, {query, Records, StreamName}, infinity). %%-------------------------------------------------------------------- %% @doc @@ -72,13 +79,12 @@ start_link(Options) -> %%%=================================================================== %% Initialize kinesis connector --spec init(emqx_bridge_kinesis_impl_producer:config()) -> {ok, state()}. +-spec init(emqx_bridge_kinesis_impl_producer:config_connector()) -> + {ok, state()} | {stop, Reason :: term()}. init(#{ aws_access_key_id := AwsAccessKey, aws_secret_access_key := AwsSecretAccessKey, endpoint := Endpoint, - partition_key := PartitionKey, - stream_name := StreamName, max_retries := MaxRetries, instance_id := InstanceId }) -> @@ -93,9 +99,7 @@ init(#{ } ), State = #{ - instance_id => InstanceId, - partition_key => PartitionKey, - stream_name => StreamName + instance_id => InstanceId }, %% TODO: teach `erlcloud` to to accept 0-arity closures as passwords. ok = erlcloud_config:configure( @@ -124,18 +128,19 @@ init(#{ {stop, Reason} end. -handle_call(connection_status, _From, #{stream_name := StreamName} = State) -> +handle_call({connection_status, StreamName}, _From, State) -> + Status = get_status(StreamName), + {reply, Status, State}; +handle_call(connection_status, _From, State) -> Status = - case erlcloud_kinesis:describe_stream(StreamName) of - {ok, _} -> + case erlcloud_kinesis:list_streams() of + {ok, _ListStreamsResult} -> {ok, connected}; - {error, {<<"ResourceNotFoundException">>, _}} -> - {error, unhealthy_target}; Error -> {error, Error} end, {reply, Status, State}; -handle_call({query, Records}, _From, #{stream_name := StreamName} = State) -> +handle_call({query, Records, StreamName}, _From, State) -> Result = do_query(StreamName, Records), {reply, Result, State}; handle_call(_Request, _From, State) -> @@ -158,6 +163,16 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== +get_status(StreamName) -> + case erlcloud_kinesis:describe_stream(StreamName) of + {ok, _} -> + {ok, connected}; + {error, {<<"ResourceNotFoundException">>, _}} -> + {error, unhealthy_target}; + Error -> + {error, Error} + end. + -spec do_query(binary(), [record()]) -> {ok, jsx:json_term() | binary()} | {error, {unrecoverable_error, term()}} diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index decf3e83b..b71373897 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -13,27 +13,20 @@ "Kinesis stream is invalid. Please check if the stream exist in Kinesis account." ). --type config() :: #{ +-type config_connector() :: #{ aws_access_key_id := binary(), aws_secret_access_key := emqx_secret:t(binary()), endpoint := binary(), - stream_name := binary(), - partition_key := binary(), - payload_template := binary(), max_retries := non_neg_integer(), pool_size := non_neg_integer(), instance_id => resource_id(), any() => term() }. --type templates() :: #{ - partition_key := list(), - send_message := list() -}. -type state() :: #{ pool_name := resource_id(), - templates := templates() + installed_channels := map() }. --export_type([config/0]). +-export_type([config_connector/0]). %% `emqx_resource' API -export([ @@ -42,7 +35,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -55,7 +52,7 @@ callback_mode() -> always_sync. --spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. +-spec on_start(resource_id(), config_connector()) -> {ok, state()} | {error, term()}. on_start( InstanceId, #{ @@ -72,10 +69,9 @@ on_start( {config, Config}, {pool_size, PoolSize} ], - Templates = parse_template(Config), State = #{ pool_name => InstanceId, - templates => Templates + installed_channels => #{} }, case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of @@ -123,31 +119,111 @@ on_get_status(_InstanceId, #{pool_name := Pool} = State) -> disconnected end. +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId, + ChannelConfig +) -> + {ok, ChannelState} = create_channel_state(ChannelConfig), + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +create_channel_state( + #{parameters := Parameters} = _ChannelConfig +) -> + #{ + stream_name := StreamName, + partition_key := PartitionKey + } = Parameters, + {ok, #{ + templates => parse_template(Parameters), + stream_name => StreamName, + partition_key => PartitionKey + }}. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + _ResId, + ChannelId, + #{ + pool_name := PoolName, + installed_channels := Channels + } = State +) -> + #{stream_name := StreamName} = maps:get(ChannelId, Channels), + case + emqx_resource_pool:health_check_workers( + PoolName, + {emqx_bridge_kinesis_connector_client, connection_status, [StreamName]}, + ?HEALTH_CHECK_TIMEOUT, + #{return_values => true} + ) + of + {ok, Values} -> + AllOk = lists:all(fun(S) -> S =:= {ok, connected} end, Values), + case AllOk of + true -> + connected; + false -> + Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values), + case Unhealthy of + true -> {disconnected, {unhealthy_target, ?TOPIC_MESSAGE}}; + false -> disconnected + end + end; + {error, Reason} -> + ?SLOG(error, #{ + msg => "kinesis_producer_get_status_failed", + state => State, + reason => Reason + }), + disconnected + end. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + -spec on_query( resource_id(), - {send_message, map()}, + {channel_id(), map()}, state() ) -> {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(ResourceId, {send_message, Message}, State) -> - Requests = [{send_message, Message}], +on_query(ResourceId, {ChannelId, Message}, State) -> + Requests = [{ChannelId, Message}], ?tp(emqx_bridge_kinesis_impl_producer_sync_query, #{message => Message}), - do_send_requests_sync(ResourceId, Requests, State). + do_send_requests_sync(ResourceId, Requests, State, ChannelId). -spec on_batch_query( resource_id(), - [{send_message, map()}], + [{channel_id(), map()}], state() ) -> {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. %% we only support batch insert -on_batch_query(ResourceId, [{send_message, _} | _] = Requests, State) -> +on_batch_query(ResourceId, [{ChannelId, _} | _] = Requests, State) -> ?tp(emqx_bridge_kinesis_impl_producer_sync_batch_query, #{requests => Requests}), - do_send_requests_sync(ResourceId, Requests, State). + do_send_requests_sync(ResourceId, Requests, State, ChannelId). connect(Opts) -> Options = proplists:get_value(config, Opts), @@ -159,8 +235,9 @@ connect(Opts) -> -spec do_send_requests_sync( resource_id(), - [{send_message, map()}], - state() + [{channel_id(), map()}], + state(), + channel_id() ) -> {ok, jsx:json_term() | binary()} | {error, {recoverable_error, term()}} @@ -171,12 +248,20 @@ connect(Opts) -> do_send_requests_sync( InstanceId, Requests, - #{pool_name := PoolName, templates := Templates} + #{ + pool_name := PoolName, + installed_channels := InstalledChannels + } = _State, + ChannelId ) -> + #{ + templates := Templates, + stream_name := StreamName + } = maps:get(ChannelId, InstalledChannels), Records = render_records(Requests, Templates), Result = ecpool:pick_and_do( PoolName, - {emqx_bridge_kinesis_connector_client, query, [Records]}, + {emqx_bridge_kinesis_connector_client, query, [Records, StreamName]}, no_handover ), handle_result(Result, Requests, InstanceId). @@ -239,7 +324,7 @@ render_records(Items, Templates) -> render_messages([], _Templates, RenderedMsgs) -> RenderedMsgs; render_messages( - [{send_message, Msg} | Others], + [{_, Msg} | Others], {MsgTemplate, PartitionKeyTemplate} = Templates, RenderedMsgs ) -> diff --git a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl index 61b354ea3..04a084462 100644 --- a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl @@ -13,6 +13,7 @@ -define(BRIDGE_TYPE, kinesis_producer). -define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>). +-define(BRIDGE_V2_TYPE_BIN, <<"kinesis">>). -define(KINESIS_PORT, 4566). -define(KINESIS_ACCESS_KEY, "aws_access_key_id"). -define(KINESIS_SECRET_KEY, "aws_secret_access_key"). @@ -48,7 +49,7 @@ init_per_suite(Config) -> [ {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, - {kinesis_port, ?KINESIS_PORT}, + {kinesis_port, list_to_integer(os:getenv("KINESIS_PORT", integer_to_list(?KINESIS_PORT)))}, {kinesis_secretfile, SecretFile}, {proxy_name, ProxyName} | Config @@ -116,7 +117,7 @@ generate_config(Config0) -> } ), ErlcloudConfig = erlcloud_kinesis:new("access_key", "secret", Host, Port, Scheme ++ "://"), - ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name), + ResourceId = connector_resource_id(?BRIDGE_V2_TYPE_BIN, Name), BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name), [ {kinesis_name, Name}, @@ -129,6 +130,9 @@ generate_config(Config0) -> | Config0 ]. +connector_resource_id(BridgeType, Name) -> + <<"connector:", BridgeType/binary, ":", Name/binary>>. + kinesis_config(Config) -> QueryMode = proplists:get_value(query_mode, Config, async), Scheme = proplists:get_value(connection_scheme, Config, "http"), @@ -505,7 +509,7 @@ t_start_failed_then_fix(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = ?config(resource_id, Config), + Name = ?config(kinesis_name, Config), emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ct:sleep(1000), ?wait_async_action( @@ -517,7 +521,7 @@ t_start_failed_then_fix(Config) -> ?retry( _Sleep1 = 1_000, _Attempts1 = 30, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)) ), ok. @@ -538,40 +542,58 @@ t_stop(Config) -> ok. t_get_status_ok(Config) -> - ResourceId = ?config(resource_id, Config), + Name = ?config(kinesis_name, Config), {ok, _} = create_bridge(Config), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)), ok. t_create_unhealthy(Config) -> delete_stream(Config), - ResourceId = ?config(resource_id, Config), + Name = ?config(kinesis_name, Config), {ok, _} = create_bridge(Config), - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch( - {ok, _, #{error := {unhealthy_target, _}}}, - emqx_resource_manager:lookup_cached(ResourceId) + #{ + status := disconnected, + error := {unhealthy_target, _} + }, + emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name) ), ok. t_get_status_unhealthy(Config) -> - delete_stream(Config), - ResourceId = ?config(resource_id, Config), + Name = ?config(kinesis_name, Config), {ok, _} = create_bridge(Config), - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch( - {ok, _, #{error := {unhealthy_target, _}}}, - emqx_resource_manager:lookup_cached(ResourceId) + #{ + status := connected + }, + emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name) + ), + delete_stream(Config), + ?retry( + 100, + 100, + fun() -> + ?assertMatch( + #{ + status := disconnected, + error := {unhealthy_target, _} + }, + emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name) + ) + end ), ok. t_publish_success(Config) -> ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), + Name = ?config(kinesis_name, Config), ?assertMatch({ok, _}, create_bridge(Config)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name), + assert_empty_metrics(ActionId), ShardIt = get_shard_iterator(Config), Payload = <<"payload">>, Message = emqx_message:make(?TOPIC, Payload), @@ -590,7 +612,7 @@ t_publish_success(Config) -> retried => 0, success => 1 }, - ResourceId + ActionId ), Record = wait_record(Config, ShardIt, 100, 10), ?assertEqual(Payload, proplists:get_value(<<"Data">>, Record)), @@ -599,6 +621,7 @@ t_publish_success(Config) -> t_publish_success_with_template(Config) -> ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), + Name = ?config(kinesis_name, Config), Overrides = #{ <<"payload_template">> => <<"${payload.data}">>, @@ -607,7 +630,8 @@ t_publish_success_with_template(Config) -> ?assertMatch({ok, _}, create_bridge(Config, Overrides)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name), + assert_empty_metrics(ActionId), ShardIt = get_shard_iterator(Config), Payload = <<"{\"key\":\"my_key\", \"data\":\"my_data\"}">>, Message = emqx_message:make(?TOPIC, Payload), @@ -626,7 +650,7 @@ t_publish_success_with_template(Config) -> retried => 0, success => 1 }, - ResourceId + ActionId ), Record = wait_record(Config, ShardIt, 100, 10), ?assertEqual(<<"my_data">>, proplists:get_value(<<"Data">>, Record)), @@ -635,10 +659,12 @@ t_publish_success_with_template(Config) -> t_publish_multiple_msgs_success(Config) -> ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), + Name = ?config(kinesis_name, Config), ?assertMatch({ok, _}, create_bridge(Config)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name), + assert_empty_metrics(ActionId), ShardIt = get_shard_iterator(Config), lists:foreach( fun(I) -> @@ -675,17 +701,19 @@ t_publish_multiple_msgs_success(Config) -> retried => 0, success => 10 }, - ResourceId + ActionId ), ok. t_publish_unhealthy(Config) -> ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), + Name = ?config(kinesis_name, Config), ?assertMatch({ok, _}, create_bridge(Config)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name), + assert_empty_metrics(ActionId), ShardIt = get_shard_iterator(Config), Payload = <<"payload">>, Message = emqx_message:make(?TOPIC, Payload), @@ -709,22 +737,26 @@ t_publish_unhealthy(Config) -> retried => 0, success => 0 }, - ResourceId + ActionId ), - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), ?assertMatch( - {ok, _, #{error := {unhealthy_target, _}}}, - emqx_resource_manager:lookup_cached(ResourceId) + #{ + status := disconnected, + error := {unhealthy_target, _} + }, + emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name) ), ok. t_publish_big_msg(Config) -> ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), + Name = ?config(kinesis_name, Config), ?assertMatch({ok, _}, create_bridge(Config)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name), + assert_empty_metrics(ActionId), % Maximum size is 1MB. Using 1MB + 1 here. Payload = binary:copy(<<"a">>, 1 * 1024 * 1024 + 1), Message = emqx_message:make(?TOPIC, Payload), @@ -743,7 +775,7 @@ t_publish_big_msg(Config) -> retried => 0, success => 0 }, - ResourceId + ActionId ), ok. @@ -754,15 +786,20 @@ t_publish_connection_down(Config0) -> ProxyName = ?config(proxy_name, Config), ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), + Name = ?config(kinesis_name, Config), ?assertMatch({ok, _}, create_bridge(Config)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), ?retry( _Sleep1 = 1_000, _Attempts1 = 30, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name) + ) ), emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name), + assert_empty_metrics(ActionId), ShardIt = get_shard_iterator(Config), Payload = <<"payload">>, Message = emqx_message:make(?TOPIC, Payload), @@ -784,7 +821,10 @@ t_publish_connection_down(Config0) -> ?retry( _Sleep3 = 1_000, _Attempts3 = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name) + ) ), Record = wait_record(Config, ShardIt, 2000, 10), %% to avoid test flakiness @@ -802,7 +842,7 @@ t_publish_connection_down(Config0) -> success => 1, retried_success => 1 }, - ResourceId + ActionId ), Data = proplists:get_value(<<"Data">>, Record), ?assertEqual(Payload, Data), @@ -880,9 +920,11 @@ t_empty_payload_template(Config) -> ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), Removes = [<<"payload_template">>], + Name = ?config(kinesis_name, Config), ?assertMatch({ok, _}, create_bridge(Config, #{}, Removes)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name), assert_empty_metrics(ResourceId), ShardIt = get_shard_iterator(Config), Payload = <<"payload">>, @@ -902,7 +944,7 @@ t_empty_payload_template(Config) -> retried => 0, success => 1 }, - ResourceId + ActionId ), Record = wait_record(Config, ShardIt, 100, 10), Data = proplists:get_value(<<"Data">>, Record), diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 077723538..8e81b12ea 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -30,6 +30,8 @@ resource_type(gcp_pubsub_producer) -> emqx_bridge_gcp_pubsub_impl_producer; resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(kinesis) -> + emqx_bridge_kinesis_impl_producer; resource_type(matrix) -> emqx_postgresql; resource_type(mongodb) -> @@ -112,6 +114,14 @@ connector_structs() -> required => false } )}, + {kinesis, + mk( + hoconsc:map(name, ref(emqx_bridge_kinesis, "config_connector")), + #{ + desc => <<"Kinesis Connector Config">>, + required => false + } + )}, {matrix, mk( hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")), @@ -224,6 +234,7 @@ schema_modules() -> emqx_bridge_confluent_producer, emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_kafka, + emqx_bridge_kinesis, emqx_bridge_matrix, emqx_bridge_mongodb, emqx_bridge_influxdb, @@ -255,6 +266,7 @@ api_schemas(Method) -> Method ++ "_connector" ), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), + api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 3c5fdfc03..d5c450529 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -131,6 +131,8 @@ connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; +connector_type_to_bridge_types(kinesis) -> + [kinesis, kinesis_producer]; connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(mongodb) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 4fd566f26..a9b4ebeb1 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -1247,6 +1247,11 @@ channel_status({?status_connecting, Error}) -> status => ?status_connecting, error => Error }; +channel_status({?status_disconnected, Error}) -> + #{ + status => ?status_disconnected, + error => Error + }; channel_status(?status_disconnected) -> #{ status => ?status_disconnected, diff --git a/rel/i18n/emqx_bridge_kinesis.hocon b/rel/i18n/emqx_bridge_kinesis.hocon index 188ab82f3..bd4a3080b 100644 --- a/rel/i18n/emqx_bridge_kinesis.hocon +++ b/rel/i18n/emqx_bridge_kinesis.hocon @@ -82,4 +82,23 @@ max_retries.desc: max_retries.label: """Max Retries""" +action_parameters.desc: +"""Action specific configuration.""" + +action_parameters.label: +"""Action""" + +kinesis_action.desc: +"""Configuration for Kinesis Action""" + +kinesis_action.label: +"""Kinesis Action Configuration""" + + +config_connector.desc: +"""Configuration for a Kinesis Client.""" + +config_connector.label: +"""Kinesis Client Configuration""" + }