diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl index 41717abe8..1ce62dcda 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl @@ -3,6 +3,7 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_kinesis). + -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -20,6 +21,9 @@ connector_examples/1 ]). +-define(CONNECTOR_TYPE, kinesis). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API %%------------------------------------------------------------------------------------------------- @@ -37,11 +41,11 @@ fields(Field) when -> emqx_connector_schema:api_fields( Field, - kinesis, + ?CONNECTOR_TYPE, connector_config_fields() ); fields(action) -> - {kinesis, + {?ACTION_TYPE, hoconsc:mk( hoconsc:map(name, hoconsc:ref(?MODULE, kinesis_action)), #{ @@ -174,18 +178,12 @@ fields("config_connector") -> emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields(connector_resource_opts) -> emqx_connector_schema:resource_opts_fields(); -fields("put_bridge_v2") -> - fields(kinesis_action); -fields("get_bridge_v2") -> - fields(kinesis_action); -fields("post_bridge_v2") -> - fields("post", kinesis, kinesis_action). - -fields("post", Type, StructName) -> - [type_field(Type), name_field() | fields(StructName)]. - -type_field(Type) -> - {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(kinesis_action)). desc("config_producer") -> ?DESC("desc_config"); @@ -202,12 +200,12 @@ desc(connector_resource_opts) -> desc(_) -> undefined. -conn_bridge_examples(Method) -> +conn_bridge_examples(_Method) -> [ #{ <<"kinesis_producer">> => #{ summary => <<"Amazon Kinesis Producer Bridge">>, - value => values(producer, Method) + value => conn_bridge_values() } } ]. @@ -215,102 +213,52 @@ conn_bridge_examples(Method) -> connector_examples(Method) -> [ #{ - <<"kinesis">> => #{ - summary => <<"Kinesis Connector">>, - value => values({Method, connector}) - } + <<"kinesis">> => + #{ + summary => <<"Kinesis Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } } ]. -bridge_v2_examples(Method) -> - [ - #{ - <<"kinesis">> => #{ - summary => <<"Kinesis Action">>, - value => values({Method, bridge_v2_producer}) - } - } - ]. - -values({get, connector}) -> - maps:merge( - #{ - status => <<"connected">>, - node_status => [ - #{ - node => <<"emqx@localhost">>, - status => <<"connected">> - } - ], - actions => [<<"my_action">>] - }, - values({post, connector}) - ); -values({get, Type}) -> - maps:merge( - #{ - status => <<"connected">>, - node_status => [ - #{ - node => <<"emqx@localhost">>, - status => <<"connected">> - } - ] - }, - values({post, Type}) - ); -values({post, connector}) -> - maps:merge( - #{ - name => <<"my_kinesis_connector">>, - type => <<"kinesis">> - }, - values(common_config) - ); -values({post, Type}) -> - maps:merge( - #{ - name => <<"my_kinesis_action">>, - type => <<"kinesis">> - }, - values({put, Type}) - ); -values({put, bridge_v2_producer}) -> - values(bridge_v2_producer); -values({put, connector}) -> - values(common_config); -values({put, Type}) -> - maps:merge(values(common_config), values(Type)); -values(bridge_v2_producer) -> +connector_values() -> #{ - enable => true, - connector => <<"my_kinesis_connector">>, - parameters => values(producer_values), - resource_opts => #{ - <<"batch_size">> => 100, - <<"inflight_window">> => 100, - <<"max_buffer_bytes">> => <<"256MB">>, - <<"request_ttl">> => <<"45s">> - } - }; -values(common_config) -> - #{ - <<"enable">> => true, <<"aws_access_key_id">> => <<"your_access_key">>, <<"aws_secret_access_key">> => <<"aws_secret_key">>, <<"endpoint">> => <<"http://localhost:4566">>, <<"max_retries">> => 2, <<"pool_size">> => 8 - }; -values(producer_values) -> - #{ - <<"partition_key">> => <<"any_key">>, - <<"payload_template">> => <<"${.}">>, - <<"stream_name">> => <<"my_stream">> }. -values(producer, _Method) -> +bridge_v2_examples(Method) -> + [ + #{ + <<"kinesis">> => + #{ + summary => <<"Kinesis Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> #{ + parameters => #{ + <<"partition_key">> => <<"any_key">>, + <<"payload_template">> => <<"${.}">>, + <<"stream_name">> => <<"my_stream">> + } + }. + +conn_bridge_values() -> + #{ + enable => true, + type => kinesis_producer, + name => <<"foo">>, aws_access_key_id => <<"aws_access_key_id">>, aws_secret_access_key => <<"******">>, endpoint => <<"https://kinesis.us-east-1.amazonaws.com">>, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl index c7fb5e1e5..7987315e4 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_kinesis_action_info).