Merge pull request #12144 from thalesmg/fix-conn-schema-query-mode-r54-20231211

fix({action,connector}_schema): fix `resource_opts` subfields for connectors and actions, and remove redundant subfields from actions, use query mode from actions
This commit is contained in:
Thales Macedo Garitezi 2023-12-13 09:19:46 -03:00 committed by GitHub
commit edec431a6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 104 additions and 66 deletions

View File

@ -48,7 +48,8 @@
-export([ -export([
make_producer_action_schema/1, make_producer_action_schema/1,
make_consumer_action_schema/1, make_consumer_action_schema/1,
top_level_common_action_keys/0 top_level_common_action_keys/0,
project_to_actions_resource_opts/1
]). ]).
-export_type([action_type/0]). -export_type([action_type/0]).
@ -203,8 +204,8 @@ types_sc() ->
resource_opts_fields() -> resource_opts_fields() ->
resource_opts_fields(_Overrides = []). resource_opts_fields(_Overrides = []).
resource_opts_fields(Overrides) -> common_resource_opts_subfields() ->
ActionROFields = [ [
batch_size, batch_size,
batch_time, batch_time,
buffer_mode, buffer_mode,
@ -216,10 +217,14 @@ resource_opts_fields(Overrides) ->
query_mode, query_mode,
request_ttl, request_ttl,
resume_interval, resume_interval,
start_after_created,
start_timeout,
worker_pool_size worker_pool_size
], ].
common_resource_opts_subfields_bin() ->
lists:map(fun atom_to_binary/1, common_resource_opts_subfields()).
resource_opts_fields(Overrides) ->
ActionROFields = common_resource_opts_subfields(),
lists:filter( lists:filter(
fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end, fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end,
emqx_resource_schema:create_opts(Overrides) emqx_resource_schema:create_opts(Overrides)
@ -274,6 +279,10 @@ make_consumer_action_schema(ActionParametersRef) ->
})} })}
]. ].
project_to_actions_resource_opts(OldResourceOpts) ->
Subfields = common_resource_opts_subfields_bin(),
maps:with(Subfields, OldResourceOpts).
-ifdef(TEST). -ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl"). -include_lib("hocon/include/hocon_types.hrl").
schema_homogeneous_test() -> schema_homogeneous_test() ->

View File

@ -69,7 +69,6 @@ connector_resource_opts_test() ->
%% These are used by `emqx_resource_manager' itself to manage the resource lifecycle. %% These are used by `emqx_resource_manager' itself to manage the resource lifecycle.
MinimumROFields = [ MinimumROFields = [
health_check_interval, health_check_interval,
query_mode,
start_after_created, start_after_created,
start_timeout start_timeout
], ],

View File

@ -69,13 +69,23 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
bridge_v1_config_to_connector_config(BridgeV1Conf) -> bridge_v1_config_to_connector_config(BridgeV1Conf) ->
%% To satisfy the emqx_bridge_api_SUITE:t_http_crud_apis/1 %% To satisfy the emqx_bridge_api_SUITE:t_http_crud_apis/1
ok = validate_webhook_url(maps:get(<<"url">>, BridgeV1Conf, undefined)), ok = validate_webhook_url(maps:get(<<"url">>, BridgeV1Conf, undefined)),
maps:without(?REMOVED_KEYS ++ ?ACTION_KEYS ++ ?PARAMETER_KEYS, BridgeV1Conf). ConnectorConfig0 = maps:without(?REMOVED_KEYS ++ ?ACTION_KEYS ++ ?PARAMETER_KEYS, BridgeV1Conf),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnectorConfig0
).
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Parameters = maps:with(?PARAMETER_KEYS, BridgeV1Conf), Parameters = maps:with(?PARAMETER_KEYS, BridgeV1Conf),
Parameters1 = Parameters#{<<"path">> => <<>>, <<"headers">> => #{}}, Parameters1 = Parameters#{<<"path">> => <<>>, <<"headers">> => #{}},
CommonKeys = [<<"enable">>, <<"description">>], CommonKeys = [<<"enable">>, <<"description">>],
ActionConfig = maps:with(?ACTION_KEYS ++ CommonKeys, BridgeV1Conf), ActionConfig0 = maps:with(?ACTION_KEYS ++ CommonKeys, BridgeV1Conf),
ActionConfig = emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig0
),
ActionConfig#{<<"parameters">> => Parameters1, <<"connector">> => ConnectorName}. ActionConfig#{<<"parameters">> => Parameters1, <<"connector">> => ConnectorName}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -48,7 +48,15 @@ fields("get") ->
%%--- v1 bridges config file %%--- v1 bridges config file
%% see: emqx_bridge_schema:fields(bridges) %% see: emqx_bridge_schema:fields(bridges)
fields("config") -> fields("config") ->
basic_config() ++ request_config(); basic_config() ++
request_config() ++
emqx_connector_schema:resource_opts_ref(?MODULE, "v1_resource_opts");
fields("v1_resource_opts") ->
UnsupportedOpts = [enable_batch, batch_size, batch_time],
lists:filter(
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_resource_schema:fields("creation_opts")
);
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% v2: configuration %% v2: configuration
fields(action) -> fields(action) ->
@ -89,7 +97,13 @@ fields("http_action") ->
required => true, required => true,
desc => ?DESC("config_parameters_opts") desc => ?DESC("config_parameters_opts")
})} })}
] ++ http_resource_opts(); ] ++ emqx_connector_schema:resource_opts_ref(?MODULE, action_resource_opts);
fields(action_resource_opts) ->
UnsupportedOpts = [batch_size, batch_time],
lists:filter(
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_bridge_v2_schema:resource_opts_fields()
);
fields("parameters_opts") -> fields("parameters_opts") ->
[ [
{path, {path,
@ -129,20 +143,20 @@ fields("config_connector") ->
} }
)}, )},
{description, emqx_schema:description_schema()} {description, emqx_schema:description_schema()}
] ++ connector_url_headers() ++ connector_opts(); ] ++ connector_url_headers() ++
%%-------------------------------------------------------------------- connector_opts() ++
%% v1/v2 emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields("resource_opts") -> fields(connector_resource_opts) ->
UnsupportedOpts = [enable_batch, batch_size, batch_time], emqx_connector_schema:resource_opts_fields().
lists:filter(
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_resource_schema:fields("creation_opts")
).
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc("resource_opts") -> desc("v1_resource_opts") ->
?DESC(emqx_resource_schema, "creation_opts"); ?DESC(emqx_resource_schema, "creation_opts");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."]; ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
desc("config_connector") -> desc("config_connector") ->
@ -304,23 +318,10 @@ request_timeout_field() ->
} }
)}. )}.
http_resource_opts() ->
[
{resource_opts,
mk(
ref(?MODULE, "resource_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
].
connector_opts() -> connector_opts() ->
mark_request_field_deperecated( mark_request_field_deperecated(
proplists:delete(max_retries, emqx_bridge_http_connector:fields(config)) proplists:delete(max_retries, emqx_bridge_http_connector:fields(config))
) ++ http_resource_opts(). ).
mark_request_field_deperecated(Fields) -> mark_request_field_deperecated(Fields) ->
lists:map( lists:map(

View File

@ -120,6 +120,7 @@ fields("get_sentinel") ->
method_fields(get, redis_sentinel); method_fields(get, redis_sentinel);
fields("get_cluster") -> fields("get_cluster") ->
method_fields(get, redis_cluster); method_fields(get, redis_cluster);
%% old bridge v1 schema
fields(Type) when fields(Type) when
Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster
-> ->
@ -147,7 +148,7 @@ redis_bridge_common_fields(Type) ->
{local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})} {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})}
| fields(action_parameters) | fields(action_parameters)
] ++ ] ++
resource_fields(Type). v1_resource_fields(Type).
connector_fields(Type) -> connector_fields(Type) ->
emqx_redis:fields(Type). emqx_redis:fields(Type).
@ -158,7 +159,7 @@ type_name_fields(Type) ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
]. ].
resource_fields(Type) -> v1_resource_fields(Type) ->
[ [
{resource_opts, {resource_opts,
mk( mk(

View File

@ -43,7 +43,12 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)), ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)), ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), ActionConfig0 = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
ActionConfig = emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig0
),
ActionConfig#{<<"connector">> => ConnectorName}. ActionConfig#{<<"connector">> => ConnectorName}.
bridge_v1_config_to_connector_config(BridgeV1Config) -> bridge_v1_config_to_connector_config(BridgeV1Config) ->
@ -57,7 +62,12 @@ bridge_v1_config_to_connector_config(BridgeV1Config) ->
(maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)) ++ (maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)) ++
[<<"redis_type">>], [<<"redis_type">>],
ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys, ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config). ConnectorConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnectorConfig0
).
%%------------------------------------------------------------------------------------------ %%------------------------------------------------------------------------------------------
%% Internal helper fns %% Internal helper fns

View File

@ -51,8 +51,10 @@ fields("config_connector") ->
)} )}
] ++ ] ++
emqx_redis:redis_fields() ++ emqx_redis:redis_fields() ++
emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts) ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts) ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields(action) -> fields(action) ->
{?TYPE, {?TYPE,
?HOCON( ?HOCON(
@ -74,15 +76,7 @@ fields(redis_action) ->
} }
) )
), ),
ResOpts = [ResOpts] = emqx_connector_schema:resource_opts_ref(?MODULE, action_resource_opts),
{resource_opts,
?HOCON(
?R_REF(resource_opts),
#{
required => true,
desc => ?DESC(emqx_resource_schema, resource_opts)
}
)},
RedisType = RedisType =
{redis_type, {redis_type,
?HOCON( ?HOCON(
@ -90,8 +84,8 @@ fields(redis_action) ->
#{required => true, desc => ?DESC(redis_type)} #{required => true, desc => ?DESC(redis_type)}
)}, )},
[RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)]; [RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)];
fields(resource_opts) -> fields(action_resource_opts) ->
emqx_resource_schema:create_opts([ emqx_bridge_v2_schema:resource_opts_fields([
{batch_size, #{desc => ?DESC(batch_size)}}, {batch_size, #{desc => ?DESC(batch_size)}},
{batch_time, #{desc => ?DESC(batch_time)}} {batch_time, #{desc => ?DESC(batch_time)}}
]); ]);
@ -124,6 +118,10 @@ desc(redis_action) ->
?DESC(redis_action); ?DESC(redis_action);
desc(resource_opts) -> desc(resource_opts) ->
?DESC(emqx_resource_schema, resource_opts); ?DESC(emqx_resource_schema, resource_opts);
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_Name) -> desc(_Name) ->
undefined. undefined.

View File

@ -123,10 +123,19 @@ wait_for_ci_redis(Checks, Config) ->
ProxyHost = os:getenv("PROXY_HOST", ?PROXY_HOST), ProxyHost = os:getenv("PROXY_HOST", ?PROXY_HOST),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", ?PROXY_PORT)), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", ?PROXY_PORT)),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = emqx_common_test_helpers:start_apps([ Apps = emqx_cth_suite:start(
emqx_conf, emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine
]),
[ [
emqx,
emqx_conf,
emqx_resource,
emqx_connector,
emqx_bridge,
emqx_rule_engine
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[
{apps, Apps},
{proxy_host, ProxyHost}, {proxy_host, ProxyHost},
{proxy_port, ProxyPort} {proxy_port, ProxyPort}
| Config | Config
@ -143,11 +152,9 @@ redis_checks() ->
1 1
end. end.
end_per_suite(_Config) -> end_per_suite(Config) ->
ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(), Apps = ?config(apps, Config),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]), emqx_cth_suite:stop(Apps),
ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]),
_ = application:stop(emqx_connector),
ok. ok.
init_per_testcase(Testcase, Config0) -> init_per_testcase(Testcase, Config0) ->

View File

@ -28,7 +28,8 @@
-export([ -export([
transform_bridges_v1_to_connectors_and_bridges_v2/1, transform_bridges_v1_to_connectors_and_bridges_v2/1,
transform_bridge_v1_config_to_action_config/4, transform_bridge_v1_config_to_action_config/4,
top_level_common_connector_keys/0 top_level_common_connector_keys/0,
project_to_connector_resource_opts/1
]). ]).
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]). -export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
@ -195,7 +196,7 @@ split_bridge_to_connector_and_action(
case maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) of case maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) of
true -> true ->
PrevFieldConfig = PrevFieldConfig =
project_to_connector_resource_opts( maybe_project_to_connector_resource_opts(
ConnectorFieldNameBin, ConnectorFieldNameBin,
maps:get(ConnectorFieldNameBin, BridgeV1Conf) maps:get(ConnectorFieldNameBin, BridgeV1Conf)
), ),
@ -231,12 +232,15 @@ split_bridge_to_connector_and_action(
end, end,
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}. {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
project_to_connector_resource_opts(<<"resource_opts">>, OldResourceOpts) -> maybe_project_to_connector_resource_opts(<<"resource_opts">>, OldResourceOpts) ->
Subfields = common_resource_opts_subfields_bin(), project_to_connector_resource_opts(OldResourceOpts);
maps:with(Subfields, OldResourceOpts); maybe_project_to_connector_resource_opts(_, OldConfig) ->
project_to_connector_resource_opts(_, OldConfig) ->
OldConfig. OldConfig.
project_to_connector_resource_opts(OldResourceOpts) ->
Subfields = common_resource_opts_subfields_bin(),
maps:with(Subfields, OldResourceOpts).
transform_bridge_v1_config_to_action_config( transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
) -> ) ->
@ -533,7 +537,6 @@ resource_opts_ref(Module, RefName) ->
common_resource_opts_subfields() -> common_resource_opts_subfields() ->
[ [
health_check_interval, health_check_interval,
query_mode,
start_after_created, start_after_created,
start_timeout start_timeout
]. ].