From f40f6bc5ddfb14b7d3a08c7a0cd3662da56fe4e4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 20 Nov 2023 12:37:36 -0300 Subject: [PATCH] refactor: split `resource_opts` fields between connector and actions --- .../src/schema/emqx_bridge_v2_schema.erl | 26 ++++++++++++ .../emqx_bridge/test/emqx_bridge_v2_tests.erl | 41 +++++++++++++++++++ .../src/emqx_bridge_kafka.erl | 2 +- .../src/schema/emqx_connector_schema.erl | 20 +++++++++ 4 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_bridge/test/emqx_bridge_v2_tests.erl diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index ede783e97..b0ac870e7 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -39,6 +39,7 @@ ]). -export([types/0, types_sc/0]). +-export([resource_opts_fields/0, resource_opts_fields/1]). -export_type([action_type/0]). @@ -137,6 +138,31 @@ types() -> types_sc() -> hoconsc:enum(types()). +resource_opts_fields() -> + resource_opts_fields(_Overrides = []). + +resource_opts_fields(Overrides) -> + ActionROFields = [ + batch_size, + batch_time, + buffer_mode, + buffer_seg_bytes, + health_check_interval, + inflight_window, + max_buffer_bytes, + metrics_flush_interval, + query_mode, + request_ttl, + resume_interval, + start_after_created, + start_timeout, + worker_pool_size + ], + lists:filter( + fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end, + emqx_resource_schema:create_opts(Overrides) + ). + examples(Method) -> MergeFun = fun(Example, Examples) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl new file mode 100644 index 000000000..4e28f3d88 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_v2_tests). + +-include_lib("eunit/include/eunit.hrl"). + +resource_opts_union_connector_actions_test() -> + %% The purpose of this test is to ensure we have split `resource_opts' fields + %% consciouly between connector and actions, in particular when/if we introduce new + %% fields there. + AllROFields = non_deprecated_fields(emqx_resource_schema:create_opts([])), + ActionROFields = non_deprecated_fields(emqx_bridge_v2_schema:resource_opts_fields()), + ConnectorROFields = non_deprecated_fields(emqx_connector_schema:resource_opts_fields()), + UnionROFields = lists:usort(ConnectorROFields ++ ActionROFields), + ?assertEqual( + lists:usort(AllROFields), + UnionROFields, + #{ + missing_fields => AllROFields -- UnionROFields, + unexpected_fields => UnionROFields -- AllROFields, + action_fields => ActionROFields, + connector_fields => ConnectorROFields + } + ), + ok. + +non_deprecated_fields(Fields) -> + [K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)]. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 0eb015cd3..b3934c7bb 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -525,7 +525,7 @@ fields(consumer_kafka_opts) -> ]; fields(resource_opts) -> SupportedFields = [health_check_interval], - CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), + CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(), lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); fields(action_field) -> {kafka_producer, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 22eb523be..070c1a165 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -30,6 +30,8 @@ -export([connector_type_to_bridge_types/1]). +-export([resource_opts_fields/0, resource_opts_fields/1]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -296,6 +298,24 @@ desc(connectors) -> desc(_) -> undefined. +resource_opts_fields() -> + resource_opts_fields(_Overrides = []). + +resource_opts_fields(Overrides) -> + %% Note: these don't include buffer-related configurations because buffer workers are + %% tied to the action. + ConnectorROFields = [ + health_check_interval, + query_mode, + request_ttl, + start_after_created, + start_timeout + ], + lists:filter( + fun({Key, _Sc}) -> lists:member(Key, ConnectorROFields) end, + emqx_resource_schema:create_opts(Overrides) + ). + %%====================================================================================== %% Helper Functions %%======================================================================================