refactor: simplify emqx_bridge_kinesis (thanks to @thalesmg's tips)

This commit is contained in:
Kjell Winblad 2024-01-25 12:48:31 +01:00
parent cfad0923cf
commit 2887a05ba3
2 changed files with 49 additions and 101 deletions

View File

@ -3,6 +3,7 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_kinesis).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -20,6 +21,9 @@
connector_examples/1
]).
-define(CONNECTOR_TYPE, kinesis).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
@ -37,11 +41,11 @@ fields(Field) when
->
emqx_connector_schema:api_fields(
Field,
kinesis,
?CONNECTOR_TYPE,
connector_config_fields()
);
fields(action) ->
{kinesis,
{?ACTION_TYPE,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, kinesis_action)),
#{
@ -174,18 +178,12 @@ fields("config_connector") ->
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("put_bridge_v2") ->
fields(kinesis_action);
fields("get_bridge_v2") ->
fields(kinesis_action);
fields("post_bridge_v2") ->
fields("post", kinesis, kinesis_action).
fields("post", Type, StructName) ->
[type_field(Type), name_field() | fields(StructName)].
type_field(Type) ->
{type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(kinesis_action)).
desc("config_producer") ->
?DESC("desc_config");
@ -202,12 +200,12 @@ desc(connector_resource_opts) ->
desc(_) ->
undefined.
conn_bridge_examples(Method) ->
conn_bridge_examples(_Method) ->
[
#{
<<"kinesis_producer">> => #{
summary => <<"Amazon Kinesis Producer Bridge">>,
value => values(producer, Method)
value => conn_bridge_values()
}
}
].
@ -215,102 +213,52 @@ conn_bridge_examples(Method) ->
connector_examples(Method) ->
[
#{
<<"kinesis">> => #{
summary => <<"Kinesis Connector">>,
value => values({Method, connector})
}
<<"kinesis">> =>
#{
summary => <<"Kinesis Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_values()
)
}
}
].
bridge_v2_examples(Method) ->
[
#{
<<"kinesis">> => #{
summary => <<"Kinesis Action">>,
value => values({Method, bridge_v2_producer})
}
}
].
values({get, connector}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
],
actions => [<<"my_action">>]
},
values({post, connector})
);
values({get, Type}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, Type})
);
values({post, connector}) ->
maps:merge(
#{
name => <<"my_kinesis_connector">>,
type => <<"kinesis">>
},
values(common_config)
);
values({post, Type}) ->
maps:merge(
#{
name => <<"my_kinesis_action">>,
type => <<"kinesis">>
},
values({put, Type})
);
values({put, bridge_v2_producer}) ->
values(bridge_v2_producer);
values({put, connector}) ->
values(common_config);
values({put, Type}) ->
maps:merge(values(common_config), values(Type));
values(bridge_v2_producer) ->
connector_values() ->
#{
enable => true,
connector => <<"my_kinesis_connector">>,
parameters => values(producer_values),
resource_opts => #{
<<"batch_size">> => 100,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"request_ttl">> => <<"45s">>
}
};
values(common_config) ->
#{
<<"enable">> => true,
<<"aws_access_key_id">> => <<"your_access_key">>,
<<"aws_secret_access_key">> => <<"aws_secret_key">>,
<<"endpoint">> => <<"http://localhost:4566">>,
<<"max_retries">> => 2,
<<"pool_size">> => 8
};
values(producer_values) ->
#{
<<"partition_key">> => <<"any_key">>,
<<"payload_template">> => <<"${.}">>,
<<"stream_name">> => <<"my_stream">>
}.
values(producer, _Method) ->
bridge_v2_examples(Method) ->
[
#{
<<"kinesis">> =>
#{
summary => <<"Kinesis Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
parameters => #{
<<"partition_key">> => <<"any_key">>,
<<"payload_template">> => <<"${.}">>,
<<"stream_name">> => <<"my_stream">>
}
}.
conn_bridge_values() ->
#{
enable => true,
type => kinesis_producer,
name => <<"foo">>,
aws_access_key_id => <<"aws_access_key_id">>,
aws_secret_access_key => <<"******">>,
endpoint => <<"https://kinesis.us-east-1.amazonaws.com">>,

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kinesis_action_info).