diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 35616ae7e..5b9500156 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -53,7 +53,8 @@ -export([action_types/0, action_types_sc/0]). -export([source_types/0, source_types_sc/0]). --export([resource_opts_fields/0, resource_opts_fields/1]). +-export([action_resource_opts_fields/0, action_resource_opts_fields/1]). +-export([source_resource_opts_fields/0, source_resource_opts_fields/1]). -export([ api_fields/3 @@ -63,7 +64,8 @@ make_producer_action_schema/1, make_producer_action_schema/2, make_consumer_action_schema/1, make_consumer_action_schema/2, top_level_common_action_keys/0, - project_to_actions_resource_opts/1 + project_to_actions_resource_opts/1, + project_to_sources_resource_opts/1 ]). -export([actions_convert_from_connectors/1]). @@ -317,8 +319,10 @@ fields(actions) -> registered_schema_fields_actions(); fields(sources) -> registered_schema_fields_sources(); -fields(resource_opts) -> - resource_opts_fields(_Overrides = []). +fields(action_resource_opts) -> + action_resource_opts_fields(_Overrides = []); +fields(source_resource_opts) -> + source_resource_opts_fields(_Overrides = []). registered_schema_fields_actions() -> [ @@ -336,7 +340,9 @@ desc(actions) -> ?DESC("desc_bridges_v2"); desc(sources) -> ?DESC("desc_sources"); -desc(resource_opts) -> +desc(action_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); +desc(source_resource_opts) -> ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. @@ -357,10 +363,13 @@ source_types() -> source_types_sc() -> hoconsc:enum(source_types()). -resource_opts_fields() -> - resource_opts_fields(_Overrides = []). +action_resource_opts_fields() -> + action_resource_opts_fields(_Overrides = []). -common_resource_opts_subfields() -> +source_resource_opts_fields() -> + source_resource_opts_fields(_Overrides = []). + +common_action_resource_opts_subfields() -> [ batch_size, batch_time, @@ -376,11 +385,27 @@ common_resource_opts_subfields() -> worker_pool_size ]. -common_resource_opts_subfields_bin() -> - lists:map(fun atom_to_binary/1, common_resource_opts_subfields()). +common_source_resource_opts_subfields() -> + [ + health_check_interval, + resume_interval + ]. -resource_opts_fields(Overrides) -> - ActionROFields = common_resource_opts_subfields(), +common_action_resource_opts_subfields_bin() -> + lists:map(fun atom_to_binary/1, common_action_resource_opts_subfields()). + +common_source_resource_opts_subfields_bin() -> + lists:map(fun atom_to_binary/1, common_source_resource_opts_subfields()). + +action_resource_opts_fields(Overrides) -> + ActionROFields = common_action_resource_opts_subfields(), + lists:filter( + fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end, + emqx_resource_schema:create_opts(Overrides) + ). + +source_resource_opts_fields(Overrides) -> + ActionROFields = common_source_resource_opts_subfields(), lists:filter( fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end, emqx_resource_schema:create_opts(Overrides) @@ -404,16 +429,34 @@ make_producer_action_schema(ActionParametersRef) -> make_producer_action_schema(ActionParametersRef, _Opts = #{}). make_producer_action_schema(ActionParametersRef, Opts) -> + ResourceOptsRef = maps:get(resource_opts_ref, Opts, ref(?MODULE, action_resource_opts)), [ {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})} - | make_consumer_action_schema(ActionParametersRef, Opts) - ]. + | common_schema(ActionParametersRef, Opts) + ] ++ + [ + {resource_opts, + mk(ResourceOptsRef, #{ + default => #{}, + desc => ?DESC(emqx_resource_schema, "resource_opts") + })} + ]. -make_consumer_action_schema(ActionParametersRef) -> - make_consumer_action_schema(ActionParametersRef, _Opts = #{}). +make_consumer_action_schema(ParametersRef) -> + make_consumer_action_schema(ParametersRef, _Opts = #{}). -make_consumer_action_schema(ActionParametersRef, Opts) -> - ResourceOptsRef = maps:get(resource_opts_ref, Opts, ref(?MODULE, resource_opts)), +make_consumer_action_schema(ParametersRef, Opts) -> + ResourceOptsRef = maps:get(resource_opts_ref, Opts, ref(?MODULE, source_resource_opts)), + common_schema(ParametersRef, Opts) ++ + [ + {resource_opts, + mk(ResourceOptsRef, #{ + default => #{}, + desc => ?DESC(emqx_resource_schema, "resource_opts") + })} + ]. + +common_schema(ParametersRef, _Opts) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {connector, @@ -421,16 +464,15 @@ make_consumer_action_schema(ActionParametersRef, Opts) -> desc => ?DESC(emqx_connector_schema, "connector_field"), required => true })}, {description, emqx_schema:description_schema()}, - {parameters, ActionParametersRef}, - {resource_opts, - mk(ResourceOptsRef, #{ - default => #{}, - desc => ?DESC(emqx_resource_schema, "resource_opts") - })} + {parameters, ParametersRef} ]. project_to_actions_resource_opts(OldResourceOpts) -> - Subfields = common_resource_opts_subfields_bin(), + Subfields = common_action_resource_opts_subfields_bin(), + maps:with(Subfields, OldResourceOpts). + +project_to_sources_resource_opts(OldResourceOpts) -> + Subfields = common_source_resource_opts_subfields_bin(), maps:with(Subfields, OldResourceOpts). actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index fabaadb92..fc9c9573f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -455,18 +455,8 @@ source_config_base() -> <<"qos">> => 2 }, <<"resource_opts">> => #{ - <<"batch_size">> => 1, - <<"batch_time">> => <<"0ms">>, - <<"buffer_mode">> => <<"memory_only">>, - <<"buffer_seg_bytes">> => <<"10MB">>, <<"health_check_interval">> => <<"15s">>, - <<"inflight_window">> => 100, - <<"max_buffer_bytes">> => <<"256MB">>, - <<"metrics_flush_interval">> => <<"1s">>, - <<"query_mode">> => <<"sync">>, - <<"request_ttl">> => <<"45s">>, - <<"resume_interval">> => <<"15s">>, - <<"worker_pool_size">> => <<"1">> + <<"resume_interval">> => <<"15s">> } }. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl index c64b1f2cb..a6c66c609 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl @@ -48,7 +48,9 @@ resource_opts_union_connector_actions_test() -> %% consciouly between connector and actions, in particular when/if we introduce new %% fields there. AllROFields = non_deprecated_fields(emqx_resource_schema:create_opts([])), - ActionROFields = non_deprecated_fields(emqx_bridge_v2_schema:resource_opts_fields()), + ActionROFields = non_deprecated_fields( + emqx_bridge_v2_schema:action_resource_opts_fields() + ), ConnectorROFields = non_deprecated_fields(emqx_connector_schema:resource_opts_fields()), UnionROFields = lists:usort(ConnectorROFields ++ ActionROFields), ?assertEqual( diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index b575f32ed..20a768d53 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -52,7 +52,7 @@ fields(action_resource_opts) -> fun({K, _V}) -> not lists:member(K, unsupported_opts()) end, - emqx_bridge_v2_schema:resource_opts_fields() + emqx_bridge_v2_schema:action_resource_opts_fields() ); fields(action_create) -> [ diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index a4d956d78..b8968e82c 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -105,7 +105,7 @@ fields(action_resource_opts) -> UnsupportedOpts = [batch_size, batch_time], lists:filter( fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, - emqx_bridge_v2_schema:resource_opts_fields() + emqx_bridge_v2_schema:action_resource_opts_fields() ); fields("parameters_opts") -> [ diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 4be7feb19..c33ea757b 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -70,7 +70,7 @@ fields(action_resource_opts) -> fun({K, _V}) -> not lists:member(K, unsupported_opts()) end, - emqx_bridge_v2_schema:resource_opts_fields() + emqx_bridge_v2_schema:action_resource_opts_fields() ); fields(action_parameters) -> [ diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index d74ff40a1..235bc4783 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -552,7 +552,7 @@ fields(connector_resource_opts) -> emqx_connector_schema:resource_opts_fields(); fields(resource_opts) -> SupportedFields = [health_check_interval], - CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(), + CreationOpts = emqx_bridge_v2_schema:action_resource_opts_fields(), lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); fields(action_field) -> {kafka_producer, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index e8eb93624..ba7be1ec4 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -97,7 +97,7 @@ fields(action_parameters) -> fields(connector_resource_opts) -> emqx_connector_schema:resource_opts_fields(); fields(action_resource_opts) -> - emqx_bridge_v2_schema:resource_opts_fields([ + emqx_bridge_v2_schema:action_resource_opts_fields([ {batch_size, #{ importance => ?IMPORTANCE_HIDDEN, converter => fun(_, _) -> 1 end, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl index cf7a5bc04..407a25118 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl @@ -116,7 +116,7 @@ bridge_v1_config_to_action_config_helper( ) -> %% Transform the egress part to mqtt_publisher connector config SchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("mqtt_subscriber_source"), - ResourceOptsSchemaFields = emqx_bridge_mqtt_pubsub_schema:fields(action_resource_opts), + ResourceOptsSchemaFields = emqx_bridge_mqtt_pubsub_schema:fields(source_resource_opts), ConfigMap1 = general_action_conf_map_from_bridge_v1_config( Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields ), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl index c05566234..05b2d6d3a 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl @@ -105,8 +105,10 @@ fields(action_resource_opts) -> UnsupportedOpts = [enable_batch, batch_size, batch_time], lists:filter( fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, - emqx_bridge_v2_schema:resource_opts_fields() + emqx_bridge_v2_schema:action_resource_opts_fields() ); +fields(source_resource_opts) -> + emqx_bridge_v2_schema:source_resource_opts_fields(); fields(Field) when Field == "get_bridge_v2"; Field == "post_bridge_v2"; @@ -132,6 +134,8 @@ desc("config") -> ?DESC("desc_config"); desc(action_resource_opts) -> ?DESC(emqx_resource_schema, "creation_opts"); +desc(source_resource_opts) -> + ?DESC(emqx_resource_schema, "creation_opts"); desc(action_parameters) -> ?DESC(action_parameters); desc(ingress_parameters) -> diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index a0b3edfa7..3e5471d55 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -112,18 +112,8 @@ source_config(Overrides0) -> <<"qos">> => 2 }, <<"resource_opts">> => #{ - <<"batch_size">> => 1, - <<"batch_time">> => <<"0ms">>, - <<"buffer_mode">> => <<"memory_only">>, - <<"buffer_seg_bytes">> => <<"10MB">>, <<"health_check_interval">> => <<"15s">>, - <<"inflight_window">> => 100, - <<"max_buffer_bytes">> => <<"256MB">>, - <<"metrics_flush_interval">> => <<"1s">>, - <<"query_mode">> => <<"sync">>, - <<"request_ttl">> => <<"45s">>, - <<"resume_interval">> => <<"15s">>, - <<"worker_pool_size">> => <<"1">> + <<"resume_interval">> => <<"15s">> } }, maps:merge(CommonConfig, Overrides). diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl index 9373fe8bd..adda91f37 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -76,7 +76,7 @@ fields(redis_action) -> [ResOpts] = emqx_connector_schema:resource_opts_ref(?MODULE, action_resource_opts), lists:keyreplace(resource_opts, 1, Schema, ResOpts); fields(action_resource_opts) -> - emqx_bridge_v2_schema:resource_opts_fields([ + emqx_bridge_v2_schema:action_resource_opts_fields([ {batch_size, #{desc => ?DESC(batch_size)}}, {batch_time, #{desc => ?DESC(batch_time)}} ]);