fix(sources): remote irrelevant `resource_opts` fields for sources

Since they don't use buffer workers, they shouldn't have buffer-related sub-fields.
This commit is contained in:
Thales Macedo Garitezi 2024-01-16 10:53:27 -03:00
parent a8f9e5676f
commit 2a41cad54f
12 changed files with 84 additions and 56 deletions

View File

@ -53,7 +53,8 @@
-export([action_types/0, action_types_sc/0]). -export([action_types/0, action_types_sc/0]).
-export([source_types/0, source_types_sc/0]). -export([source_types/0, source_types_sc/0]).
-export([resource_opts_fields/0, resource_opts_fields/1]). -export([action_resource_opts_fields/0, action_resource_opts_fields/1]).
-export([source_resource_opts_fields/0, source_resource_opts_fields/1]).
-export([ -export([
api_fields/3 api_fields/3
@ -63,7 +64,8 @@
make_producer_action_schema/1, make_producer_action_schema/2, make_producer_action_schema/1, make_producer_action_schema/2,
make_consumer_action_schema/1, make_consumer_action_schema/2, make_consumer_action_schema/1, make_consumer_action_schema/2,
top_level_common_action_keys/0, top_level_common_action_keys/0,
project_to_actions_resource_opts/1 project_to_actions_resource_opts/1,
project_to_sources_resource_opts/1
]). ]).
-export([actions_convert_from_connectors/1]). -export([actions_convert_from_connectors/1]).
@ -317,8 +319,10 @@ fields(actions) ->
registered_schema_fields_actions(); registered_schema_fields_actions();
fields(sources) -> fields(sources) ->
registered_schema_fields_sources(); registered_schema_fields_sources();
fields(resource_opts) -> fields(action_resource_opts) ->
resource_opts_fields(_Overrides = []). action_resource_opts_fields(_Overrides = []);
fields(source_resource_opts) ->
source_resource_opts_fields(_Overrides = []).
registered_schema_fields_actions() -> registered_schema_fields_actions() ->
[ [
@ -336,7 +340,9 @@ desc(actions) ->
?DESC("desc_bridges_v2"); ?DESC("desc_bridges_v2");
desc(sources) -> desc(sources) ->
?DESC("desc_sources"); ?DESC("desc_sources");
desc(resource_opts) -> desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(source_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts"); ?DESC(emqx_resource_schema, "resource_opts");
desc(_) -> desc(_) ->
undefined. undefined.
@ -357,10 +363,13 @@ source_types() ->
source_types_sc() -> source_types_sc() ->
hoconsc:enum(source_types()). hoconsc:enum(source_types()).
resource_opts_fields() -> action_resource_opts_fields() ->
resource_opts_fields(_Overrides = []). action_resource_opts_fields(_Overrides = []).
common_resource_opts_subfields() -> source_resource_opts_fields() ->
source_resource_opts_fields(_Overrides = []).
common_action_resource_opts_subfields() ->
[ [
batch_size, batch_size,
batch_time, batch_time,
@ -376,11 +385,27 @@ common_resource_opts_subfields() ->
worker_pool_size worker_pool_size
]. ].
common_resource_opts_subfields_bin() -> common_source_resource_opts_subfields() ->
lists:map(fun atom_to_binary/1, common_resource_opts_subfields()). [
health_check_interval,
resume_interval
].
resource_opts_fields(Overrides) -> common_action_resource_opts_subfields_bin() ->
ActionROFields = common_resource_opts_subfields(), lists:map(fun atom_to_binary/1, common_action_resource_opts_subfields()).
common_source_resource_opts_subfields_bin() ->
lists:map(fun atom_to_binary/1, common_source_resource_opts_subfields()).
action_resource_opts_fields(Overrides) ->
ActionROFields = common_action_resource_opts_subfields(),
lists:filter(
fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end,
emqx_resource_schema:create_opts(Overrides)
).
source_resource_opts_fields(Overrides) ->
ActionROFields = common_source_resource_opts_subfields(),
lists:filter( lists:filter(
fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end, fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end,
emqx_resource_schema:create_opts(Overrides) emqx_resource_schema:create_opts(Overrides)
@ -404,24 +429,12 @@ make_producer_action_schema(ActionParametersRef) ->
make_producer_action_schema(ActionParametersRef, _Opts = #{}). make_producer_action_schema(ActionParametersRef, _Opts = #{}).
make_producer_action_schema(ActionParametersRef, Opts) -> make_producer_action_schema(ActionParametersRef, Opts) ->
ResourceOptsRef = maps:get(resource_opts_ref, Opts, ref(?MODULE, action_resource_opts)),
[ [
{local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})} {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}
| make_consumer_action_schema(ActionParametersRef, Opts) | common_schema(ActionParametersRef, Opts)
]. ] ++
make_consumer_action_schema(ActionParametersRef) ->
make_consumer_action_schema(ActionParametersRef, _Opts = #{}).
make_consumer_action_schema(ActionParametersRef, Opts) ->
ResourceOptsRef = maps:get(resource_opts_ref, Opts, ref(?MODULE, resource_opts)),
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{description, emqx_schema:description_schema()},
{parameters, ActionParametersRef},
{resource_opts, {resource_opts,
mk(ResourceOptsRef, #{ mk(ResourceOptsRef, #{
default => #{}, default => #{},
@ -429,8 +442,37 @@ make_consumer_action_schema(ActionParametersRef, Opts) ->
})} })}
]. ].
make_consumer_action_schema(ParametersRef) ->
make_consumer_action_schema(ParametersRef, _Opts = #{}).
make_consumer_action_schema(ParametersRef, Opts) ->
ResourceOptsRef = maps:get(resource_opts_ref, Opts, ref(?MODULE, source_resource_opts)),
common_schema(ParametersRef, Opts) ++
[
{resource_opts,
mk(ResourceOptsRef, #{
default => #{},
desc => ?DESC(emqx_resource_schema, "resource_opts")
})}
].
common_schema(ParametersRef, _Opts) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{description, emqx_schema:description_schema()},
{parameters, ParametersRef}
].
project_to_actions_resource_opts(OldResourceOpts) -> project_to_actions_resource_opts(OldResourceOpts) ->
Subfields = common_resource_opts_subfields_bin(), Subfields = common_action_resource_opts_subfields_bin(),
maps:with(Subfields, OldResourceOpts).
project_to_sources_resource_opts(OldResourceOpts) ->
Subfields = common_source_resource_opts_subfields_bin(),
maps:with(Subfields, OldResourceOpts). maps:with(Subfields, OldResourceOpts).
actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) -> actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) ->

View File

@ -455,18 +455,8 @@ source_config_base() ->
<<"qos">> => 2 <<"qos">> => 2
}, },
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"15s">>, <<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100, <<"resume_interval">> => <<"15s">>
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"resume_interval">> => <<"15s">>,
<<"worker_pool_size">> => <<"1">>
} }
}. }.

View File

@ -48,7 +48,9 @@ resource_opts_union_connector_actions_test() ->
%% consciouly between connector and actions, in particular when/if we introduce new %% consciouly between connector and actions, in particular when/if we introduce new
%% fields there. %% fields there.
AllROFields = non_deprecated_fields(emqx_resource_schema:create_opts([])), AllROFields = non_deprecated_fields(emqx_resource_schema:create_opts([])),
ActionROFields = non_deprecated_fields(emqx_bridge_v2_schema:resource_opts_fields()), ActionROFields = non_deprecated_fields(
emqx_bridge_v2_schema:action_resource_opts_fields()
),
ConnectorROFields = non_deprecated_fields(emqx_connector_schema:resource_opts_fields()), ConnectorROFields = non_deprecated_fields(emqx_connector_schema:resource_opts_fields()),
UnionROFields = lists:usort(ConnectorROFields ++ ActionROFields), UnionROFields = lists:usort(ConnectorROFields ++ ActionROFields),
?assertEqual( ?assertEqual(

View File

@ -52,7 +52,7 @@ fields(action_resource_opts) ->
fun({K, _V}) -> fun({K, _V}) ->
not lists:member(K, unsupported_opts()) not lists:member(K, unsupported_opts())
end, end,
emqx_bridge_v2_schema:resource_opts_fields() emqx_bridge_v2_schema:action_resource_opts_fields()
); );
fields(action_create) -> fields(action_create) ->
[ [

View File

@ -105,7 +105,7 @@ fields(action_resource_opts) ->
UnsupportedOpts = [batch_size, batch_time], UnsupportedOpts = [batch_size, batch_time],
lists:filter( lists:filter(
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_bridge_v2_schema:resource_opts_fields() emqx_bridge_v2_schema:action_resource_opts_fields()
); );
fields("parameters_opts") -> fields("parameters_opts") ->
[ [

View File

@ -70,7 +70,7 @@ fields(action_resource_opts) ->
fun({K, _V}) -> fun({K, _V}) ->
not lists:member(K, unsupported_opts()) not lists:member(K, unsupported_opts())
end, end,
emqx_bridge_v2_schema:resource_opts_fields() emqx_bridge_v2_schema:action_resource_opts_fields()
); );
fields(action_parameters) -> fields(action_parameters) ->
[ [

View File

@ -552,7 +552,7 @@ fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields(); emqx_connector_schema:resource_opts_fields();
fields(resource_opts) -> fields(resource_opts) ->
SupportedFields = [health_check_interval], SupportedFields = [health_check_interval],
CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(), CreationOpts = emqx_bridge_v2_schema:action_resource_opts_fields(),
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts);
fields(action_field) -> fields(action_field) ->
{kafka_producer, {kafka_producer,

View File

@ -97,7 +97,7 @@ fields(action_parameters) ->
fields(connector_resource_opts) -> fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields(); emqx_connector_schema:resource_opts_fields();
fields(action_resource_opts) -> fields(action_resource_opts) ->
emqx_bridge_v2_schema:resource_opts_fields([ emqx_bridge_v2_schema:action_resource_opts_fields([
{batch_size, #{ {batch_size, #{
importance => ?IMPORTANCE_HIDDEN, importance => ?IMPORTANCE_HIDDEN,
converter => fun(_, _) -> 1 end, converter => fun(_, _) -> 1 end,

View File

@ -116,7 +116,7 @@ bridge_v1_config_to_action_config_helper(
) -> ) ->
%% Transform the egress part to mqtt_publisher connector config %% Transform the egress part to mqtt_publisher connector config
SchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("mqtt_subscriber_source"), SchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("mqtt_subscriber_source"),
ResourceOptsSchemaFields = emqx_bridge_mqtt_pubsub_schema:fields(action_resource_opts), ResourceOptsSchemaFields = emqx_bridge_mqtt_pubsub_schema:fields(source_resource_opts),
ConfigMap1 = general_action_conf_map_from_bridge_v1_config( ConfigMap1 = general_action_conf_map_from_bridge_v1_config(
Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields
), ),

View File

@ -105,8 +105,10 @@ fields(action_resource_opts) ->
UnsupportedOpts = [enable_batch, batch_size, batch_time], UnsupportedOpts = [enable_batch, batch_size, batch_time],
lists:filter( lists:filter(
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_bridge_v2_schema:resource_opts_fields() emqx_bridge_v2_schema:action_resource_opts_fields()
); );
fields(source_resource_opts) ->
emqx_bridge_v2_schema:source_resource_opts_fields();
fields(Field) when fields(Field) when
Field == "get_bridge_v2"; Field == "get_bridge_v2";
Field == "post_bridge_v2"; Field == "post_bridge_v2";
@ -132,6 +134,8 @@ desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(action_resource_opts) -> desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "creation_opts"); ?DESC(emqx_resource_schema, "creation_opts");
desc(source_resource_opts) ->
?DESC(emqx_resource_schema, "creation_opts");
desc(action_parameters) -> desc(action_parameters) ->
?DESC(action_parameters); ?DESC(action_parameters);
desc(ingress_parameters) -> desc(ingress_parameters) ->

View File

@ -112,18 +112,8 @@ source_config(Overrides0) ->
<<"qos">> => 2 <<"qos">> => 2
}, },
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"15s">>, <<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100, <<"resume_interval">> => <<"15s">>
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"resume_interval">> => <<"15s">>,
<<"worker_pool_size">> => <<"1">>
} }
}, },
maps:merge(CommonConfig, Overrides). maps:merge(CommonConfig, Overrides).

View File

@ -76,7 +76,7 @@ fields(redis_action) ->
[ResOpts] = emqx_connector_schema:resource_opts_ref(?MODULE, action_resource_opts), [ResOpts] = emqx_connector_schema:resource_opts_ref(?MODULE, action_resource_opts),
lists:keyreplace(resource_opts, 1, Schema, ResOpts); lists:keyreplace(resource_opts, 1, Schema, ResOpts);
fields(action_resource_opts) -> fields(action_resource_opts) ->
emqx_bridge_v2_schema:resource_opts_fields([ emqx_bridge_v2_schema:action_resource_opts_fields([
{batch_size, #{desc => ?DESC(batch_size)}}, {batch_size, #{desc => ?DESC(batch_size)}},
{batch_time, #{desc => ?DESC(batch_time)}} {batch_time, #{desc => ?DESC(batch_time)}}
]); ]);