diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 97d0afb43..a9aeb0f8e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -260,11 +260,10 @@ create(BridgeType, BridgeName, RawConf) -> #{override_to => cluster} ). -%% NOTE: This function can cause broken references from rules but it is only -%% called directly from test cases. - -spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}. remove(BridgeType, BridgeName) -> + %% NOTE: This function can cause broken references from rules but it is only + %% called directly from test cases. ?SLOG(debug, #{ brige_action => remove, bridge_version => 2, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl index 4e28f3d88..d85830828 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl @@ -16,6 +16,32 @@ -module(emqx_bridge_v2_tests). -include_lib("eunit/include/eunit.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +non_deprecated_fields(Fields) -> + [K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)]. + +find_resource_opts_fields(SchemaMod, FieldName) -> + Fields = hocon_schema:fields(SchemaMod, FieldName), + case lists:keyfind(resource_opts, 1, Fields) of + false -> + undefined; + {resource_opts, ROSc} -> + get_resource_opts_subfields(ROSc) + end. + +get_resource_opts_subfields(Sc) -> + ?R_REF(SchemaModRO, FieldNameRO) = hocon_schema:field_schema(Sc, type), + ROFields = non_deprecated_fields(hocon_schema:fields(SchemaModRO, FieldNameRO)), + proplists:get_keys(ROFields). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ resource_opts_union_connector_actions_test() -> %% The purpose of this test is to ensure we have split `resource_opts' fields @@ -37,5 +63,47 @@ resource_opts_union_connector_actions_test() -> ), ok. -non_deprecated_fields(Fields) -> - [K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)]. +connector_resource_opts_test() -> + %% The purpose of this test is to ensure that all connectors have the `resource_opts' + %% field with at least some sub-fields that should always be present. + %% These are used by `emqx_resource_manager' itself to manage the resource lifecycle. + MinimumROFields = [ + health_check_interval, + query_mode, + start_after_created, + start_timeout + ], + ConnectorSchemasRefs = + lists:map( + fun({Type, #{type := ?MAP(_, ?R_REF(SchemaMod, FieldName))}}) -> + {Type, find_resource_opts_fields(SchemaMod, FieldName)} + end, + emqx_connector_schema:fields(connectors) + ), + ConnectorsMissingRO = [Type || {Type, undefined} <- ConnectorSchemasRefs], + ConnectorsMissingROSubfields = + lists:filtermap( + fun + ({_Type, undefined}) -> + false; + ({Type, Fs}) -> + case MinimumROFields -- Fs of + [] -> + false; + MissingFields -> + {true, {Type, MissingFields}} + end + end, + ConnectorSchemasRefs + ), + ?assertEqual( + #{ + missing_resource_opts_field => #{}, + missing_subfields => #{} + }, + #{ + missing_resource_opts_field => maps:from_keys(ConnectorsMissingRO, true), + missing_subfields => maps:from_list(ConnectorsMissingROSubfields) + } + ), + ok. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index 9cd71323e..f9f68634e 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_http, [ {description, "EMQX HTTP Bridge and Connector Application"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]}, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index 6a9219c11..225405a4a 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -24,7 +24,6 @@ -export([ bridge_v2_examples/1, - %%conn_bridge_examples/1, connector_examples/1 ]). @@ -169,7 +168,7 @@ basic_config() -> } )}, {description, emqx_schema:description_schema()} - ] ++ http_resource_opts() ++ connector_opts(). + ] ++ connector_opts(). request_config() -> [ @@ -321,7 +320,7 @@ http_resource_opts() -> connector_opts() -> mark_request_field_deperecated( proplists:delete(max_retries, emqx_bridge_http_connector:fields(config)) - ). + ) ++ http_resource_opts(). mark_request_field_deperecated(Fields) -> lists:map( diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 951fb5ef5..d74ff40a1 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -548,6 +548,8 @@ fields(consumer_kafka_opts) -> #{default => <<"5s">>, desc => ?DESC(consumer_offset_commit_interval_seconds)} )} ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields(resource_opts) -> SupportedFields = [health_check_interval], CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(), @@ -568,6 +570,8 @@ desc("config_connector") -> ?DESC("desc_config"); desc(resource_opts) -> ?DESC(emqx_resource_schema, "resource_opts"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc("get_" ++ Type) when Type =:= "consumer"; Type =:= "producer"; Type =:= "connector"; Type =:= "bridge_v2" -> @@ -626,7 +630,7 @@ kafka_connector_config_fields() -> })}, {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}, {ssl, mk(ref(ssl_client_opts), #{})} - ] ++ [resource_opts()]. + ] ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts). producer_opts(ActionOrBridgeV1) -> [ diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index 796a4a4d1..bc5e2eb74 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -53,7 +53,8 @@ fields("config") -> ]; fields("config_connector") -> emqx_connector_schema:common_fields() ++ - fields("connection_fields"); + fields("connection_fields") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields("connection_fields") -> [ {parameters, @@ -93,6 +94,8 @@ fields(action_parameters) -> {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}, {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})} ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields(resource_opts) -> fields("creation_opts"); fields(mongodb_rs) -> @@ -202,6 +205,8 @@ desc("creation_opts") -> ?DESC(emqx_resource_schema, "creation_opts"); desc(resource_opts) -> ?DESC(emqx_resource_schema, "resource_opts"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(mongodb_rs) -> ?DESC(mongodb_rs_conf); desc(mongodb_sharded) -> diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl index 8bbe5ff3a..823418f28 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl @@ -54,7 +54,15 @@ bridge_v1_config_to_connector_config(BridgeV1Config) -> ConnectorTopLevelKeys = schema_keys("config_connector"), ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys, - make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config). + ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun(ResourceOpts) -> + CommonROSubfields = emqx_connector_schema:common_resource_opts_subfields_bin(), + maps:with(CommonROSubfields, ResourceOpts) + end, + ConnConfig0 + ). make_config_map(PickKeys, IndentKeys, Config) -> Conf0 = maps:with(PickKeys, Config), diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl index f02bf3322..2169aa1f3 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -51,6 +51,7 @@ fields("config_connector") -> )} ] ++ emqx_redis:redis_fields() ++ + emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts) ++ emqx_connector_schema_lib:ssl_fields(); fields(action) -> {?TYPE, diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 6887582b3..a70cd8923 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -93,7 +93,9 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - emqx_connector_schema:common_fields() ++ fields("connection_fields"); + emqx_connector_schema:common_fields() ++ + fields("connection_fields") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields("connection_fields") -> [ {server, server()}, @@ -114,6 +116,8 @@ fields("connection_fields") -> emqx_connector_schema_lib:pool_size(Other) end} ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields(Field) when Field == "get"; Field == "post"; @@ -125,6 +129,8 @@ fields(Field) when desc(config) -> ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."]; desc(_) -> diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl index f930b0042..6c20a460c 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl @@ -77,7 +77,9 @@ namespace() -> "connector_syskeeper_proxy". roots() -> []. fields(config) -> - emqx_connector_schema:common_fields() ++ fields("connection_fields"); + emqx_connector_schema:common_fields() ++ + fields("connection_fields") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields("connection_fields") -> [ {listen, listen()}, @@ -92,6 +94,8 @@ fields("connection_fields") -> #{desc => ?DESC(handshake_timeout), default => <<"10s">>} )} ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields(Field) when Field == "get"; Field == "post"; @@ -103,6 +107,8 @@ fields(Field) when desc(config) -> ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."]; desc(_) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index d4f82d474..31e3a335f 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -40,7 +40,13 @@ type_and_name_fields/1 ]). --export([resource_opts_fields/0, resource_opts_fields/1]). +-export([ + common_resource_opts_subfields/0, + common_resource_opts_subfields_bin/0, + resource_opts_fields/0, + resource_opts_fields/1, + resource_opts_ref/2 +]). -export([examples/1]). @@ -178,14 +184,19 @@ split_bridge_to_connector_and_action( %% Get connector fields from bridge config lists:foldl( fun({ConnectorFieldName, _Spec}, ToTransformSoFar) -> - case maps:is_key(to_bin(ConnectorFieldName), BridgeV1Conf) of + ConnectorFieldNameBin = to_bin(ConnectorFieldName), + case maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) of true -> - NewToTransform = maps:put( - to_bin(ConnectorFieldName), - maps:get(to_bin(ConnectorFieldName), BridgeV1Conf), + PrevFieldConfig = + project_to_connector_resource_opts( + ConnectorFieldNameBin, + maps:get(ConnectorFieldNameBin, BridgeV1Conf) + ), + maps:put( + ConnectorFieldNameBin, + PrevFieldConfig, ToTransformSoFar - ), - NewToTransform; + ); false -> ToTransformSoFar end @@ -213,6 +224,12 @@ split_bridge_to_connector_and_action( end, {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}. +project_to_connector_resource_opts(<<"resource_opts">>, OldResourceOpts) -> + Subfields = common_resource_opts_subfields_bin(), + maps:with(Subfields, OldResourceOpts); +project_to_connector_resource_opts(_, OldConfig) -> + OldConfig. + transform_bridge_v1_config_to_action_config( BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName ) -> @@ -497,19 +514,33 @@ status_and_actions_fields() -> )} ]. +resource_opts_ref(Module, RefName) -> + [ + {resource_opts, + mk( + ref(Module, RefName), + emqx_resource_schema:resource_opts_meta() + )} + ]. + +common_resource_opts_subfields() -> + [ + health_check_interval, + query_mode, + start_after_created, + start_timeout + ]. + +common_resource_opts_subfields_bin() -> + lists:map(fun atom_to_binary/1, common_resource_opts_subfields()). + resource_opts_fields() -> resource_opts_fields(_Overrides = []). resource_opts_fields(Overrides) -> %% Note: these don't include buffer-related configurations because buffer workers are %% tied to the action. - ConnectorROFields = [ - health_check_interval, - query_mode, - request_ttl, - start_after_created, - start_timeout - ], + ConnectorROFields = common_resource_opts_subfields(), lists:filter( fun({Key, _Sc}) -> lists:member(Key, ConnectorROFields) end, emqx_resource_schema:create_opts(Overrides) diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index 2b3f7febc..304a7356c 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -49,7 +49,11 @@ fields("connection_fields") -> adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ emqx_connector_schema_lib:ssl_fields(); fields("config_connector") -> - fields("connection_fields") ++ emqx_connector_schema:common_fields(); + fields("connection_fields") ++ + emqx_connector_schema:common_fields() ++ + emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts); +fields(resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields(config) -> fields("config_connector") ++ fields(action); @@ -159,5 +163,7 @@ values(common) -> desc("config_connector") -> ?DESC("config_connector"); +desc(resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index dcf3414e6..4d2e55681 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -23,7 +23,7 @@ -export([namespace/0, roots/0, fields/1, desc/1]). --export([create_opts/1]). +-export([create_opts/1, resource_opts_meta/0]). %% range interval in ms -define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1).