From 2872f0b6685061fa4242d829b6f33cdf88e5f76c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 11 Aug 2022 19:11:44 +0800 Subject: [PATCH] fix(bridges): support create resources with options --- apps/emqx_bridge/src/emqx_bridge.erl | 9 +++-- apps/emqx_bridge/src/emqx_bridge_resource.erl | 11 +++-- .../i18n/emqx_resource_schema_i18n.conf | 28 ------------- apps/emqx_resource/include/emqx_resource.hrl | 12 +++++- apps/emqx_resource/src/emqx_resource.erl | 40 ++++++++++++++----- .../src/emqx_resource_manager.erl | 8 ++-- .../src/proto/emqx_resource_proto_v1.erl | 4 +- .../src/schema/emqx_resource_schema.erl | 12 +----- .../src/emqx_ee_bridge_influxdb.erl | 2 +- 9 files changed, 61 insertions(+), 65 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index ba6c64dbc..c794a25a5 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -171,9 +171,9 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> diff_confs(NewConf, OldConf), %% The config update will be failed if any task in `perform_bridge_changes` failed. Result = perform_bridge_changes([ - {fun emqx_bridge_resource:remove/3, Removed}, - {fun emqx_bridge_resource:create/3, Added}, - {fun emqx_bridge_resource:update/3, Updated} + {fun emqx_bridge_resource:remove/4, Removed}, + {fun emqx_bridge_resource:create/4, Added}, + {fun emqx_bridge_resource:update/4, Updated} ]), ok = unload_hook(), ok = load_hook(NewConf), @@ -261,7 +261,8 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> ({_Type, _Name}, _Conf, {error, Reason}) -> {error, Reason}; ({Type, Name}, Conf, _) -> - case Action(Type, Name, Conf) of + ResOpts = emqx_resource:fetch_creation_opts(Conf), + case Action(Type, Name, Conf, ResOpts) of {error, Reason} -> {error, Reason}; Return -> Return end diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 35ace560c..ac1ec6ba3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -158,11 +158,14 @@ recreate(Type, Name) -> recreate(Type, Name, emqx:get_config([bridges, Type, Name])). recreate(Type, Name, Conf) -> + recreate(Type, Name, Conf, #{}). + +recreate(Type, Name, Conf, Opts) -> emqx_resource:recreate_local( resource_id(Type, Name), bridge_to_resource_type(Type), parse_confs(Type, Name, Conf), - #{auto_retry_interval => 60000} + Opts#{auto_retry_interval => 60000} ). create_dry_run(Type, Conf) -> @@ -186,13 +189,13 @@ create_dry_run(Type, Conf) -> remove(BridgeId) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), - remove(BridgeType, BridgeName, #{}). + remove(BridgeType, BridgeName, #{}, #{}). remove(Type, Name) -> - remove(Type, Name, undefined). + remove(Type, Name, #{}, #{}). %% just for perform_bridge_changes/1 -remove(Type, Name, _Conf) -> +remove(Type, Name, _Conf, _Opts) -> ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of ok -> ok; diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index a3fb6c402..abdb220bb 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -1,32 +1,4 @@ emqx_resource_schema { - batch { - desc { - en: """ -Configuration of batch query.
-Batch requests are made immediately when the number of requests reaches the `batch_size`, or also immediately when the number of requests is less than the batch request size but the maximum batch_time has been reached. -""" - zh: """ -批量请求配置。
-请求数达到批量请求大小时立刻进行批量请求,或当请求数不足批量请求数大小,但已经达到最大批量等待时间时也立即进行批量请求。 -""" - } - label { - en: """batch""" - zh: """批量请求""" - } - } - - queue { - desc { - en: """Configuration of queue.""" - zh: """请求队列配置""" - } - label { - en: """queue""" - zh: """请求队列""" - } - } - query_mode { desc { en: """Query mode. Optional 'sync/async', default 'sync'.""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 5327e3aae..5b6856dc0 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -40,7 +40,7 @@ metrics := emqx_metrics_worker:metrics() }. -type resource_group() :: binary(). --type create_opts() :: #{ +-type creation_opts() :: #{ health_check_interval => integer(), health_check_timeout => integer(), %% We can choose to block the return of emqx_resource:start until @@ -52,7 +52,15 @@ start_after_created => boolean(), %% If the resource disconnected, we can set to retry starting the resource %% periodically. - auto_retry_interval => integer() + auto_retry_interval => integer(), + enable_batch => boolean(), + batch_size => integer(), + batch_time => integer(), + enable_queue => boolean(), + queue_max_bytes => integer(), + query_mode => async | sync | dynamic + resume_interval => integer(), + async_inflight_window => integer() }. -type query_result() :: ok diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 6601b9eea..b134d2af1 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -102,6 +102,7 @@ list_instances_verbose/0, %% return the data of the instance get_instance/1, + fetch_creation_opts/1, %% return all the instances of the same resource type list_instances_by_type/1, generate_id/1, @@ -159,7 +160,7 @@ is_resource_mod(Module) -> create(ResId, Group, ResourceType, Config) -> create(ResId, Group, ResourceType, Config, #{}). --spec create(resource_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> +-spec create(resource_id(), resource_group(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(ResId, Group, ResourceType, Config, Opts) -> emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts). @@ -175,7 +176,7 @@ create_local(ResId, Group, ResourceType, Config) -> resource_group(), resource_type(), resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()}. create_local(ResId, Group, ResourceType, Config, Opts) -> @@ -196,7 +197,7 @@ create_dry_run_local(ResourceType, Config) -> recreate(ResId, ResourceType, Config) -> recreate(ResId, ResourceType, Config, #{}). --spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) -> +-spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate(ResId, ResourceType, Config, Opts) -> emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts). @@ -206,7 +207,7 @@ recreate(ResId, ResourceType, Config, Opts) -> recreate_local(ResId, ResourceType, Config) -> recreate_local(ResId, ResourceType, Config, #{}). --spec recreate_local(resource_id(), resource_type(), resource_config(), create_opts()) -> +-spec recreate_local(resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate_local(ResId, ResourceType, Config, Opts) -> emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). @@ -249,7 +250,7 @@ simple_async_query(ResId, Request, ReplyFun) -> start(ResId) -> start(ResId, #{}). --spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. start(ResId, Opts) -> emqx_resource_manager:start(ResId, Opts). @@ -257,7 +258,7 @@ start(ResId, Opts) -> restart(ResId) -> restart(ResId, #{}). --spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. restart(ResId, Opts) -> emqx_resource_manager:restart(ResId, Opts). @@ -277,6 +278,25 @@ set_resource_status_connecting(ResId) -> get_instance(ResId) -> emqx_resource_manager:lookup(ResId). +-spec fetch_creation_opts(map()) -> creation_opts(). +fetch_creation_opts(Opts) -> + SupportedOpts = [ + health_check_interval, + health_check_timeout, + wait_for_resource_ready, + start_after_created, + auto_retry_interval, + enable_batch, + batch_size, + batch_time, + enable_queue, + queue_max_bytes, + query_mode, + resume_interval, + async_inflight_window + ], + maps:with(creation_opts(), SupportedOpts). + -spec list_instances() -> [resource_id()]. list_instances() -> [Id || #{id := Id} <- list_instances_verbose()]. @@ -341,7 +361,7 @@ check_and_create(ResId, Group, ResourceType, RawConfig) -> resource_group(), resource_type(), raw_resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data() | 'already_created'} | {error, term()}. check_and_create(ResId, Group, ResourceType, RawConfig, Opts) -> @@ -366,7 +386,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig) -> resource_group(), resource_type(), raw_resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()} | {error, term()}. check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> check_and_do( @@ -379,7 +399,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> resource_id(), resource_type(), raw_resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()} | {error, term()}. check_and_recreate(ResId, ResourceType, RawConfig, Opts) -> @@ -393,7 +413,7 @@ check_and_recreate(ResId, ResourceType, RawConfig, Opts) -> resource_id(), resource_type(), raw_resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()} | {error, term()}. check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 3310555d1..608548898 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -85,7 +85,7 @@ manager_id_to_resource_id(MgrId) -> resource_group(), resource_type(), resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()}. ensure_resource(ResId, Group, ResourceType, Config, Opts) -> case lookup(ResId) of @@ -97,7 +97,7 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) -> end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist --spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) -> +-spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. recreate(ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of @@ -166,7 +166,7 @@ remove(ResId, ClearMetrics) when is_binary(ResId) -> safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). %% @doc Stops and then starts an instance that was already running --spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. restart(ResId, Opts) when is_binary(ResId) -> case safe_call(ResId, restart, ?T_OPERATION) of ok -> @@ -177,7 +177,7 @@ restart(ResId, Opts) when is_binary(ResId) -> end. %% @doc Start the resource --spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. start(ResId, Opts) -> case safe_call(ResId, start, ?T_OPERATION) of ok -> diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index cdd2592d9..11af1a62c 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -38,7 +38,7 @@ introduced_in() -> resource_group(), resource_type(), resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(ResId, Group, ResourceType, Config, Opts) -> @@ -58,7 +58,7 @@ create_dry_run(ResourceType, Config) -> resource_id(), resource_type(), resource_config(), - create_opts() + creation_opts() ) -> {ok, resource_data()} | {error, Reason :: term()}. recreate(ResId, ResourceType, Config, Opts) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 933cd0189..464c055b7 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -30,22 +30,14 @@ namespace() -> "resource_schema". roots() -> []. -fields('batch&async&queue') -> +fields('creation_opts') -> [ {query_mode, fun query_mode/1}, {resume_interval, fun resume_interval/1}, {async_inflight_window, fun async_inflight_window/1}, - {batch, mk(ref(?MODULE, batch), #{desc => ?DESC("batch")})}, - {queue, mk(ref(?MODULE, queue), #{desc => ?DESC("queue")})} - ]; -fields(batch) -> - [ {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, - {batch_time, fun batch_time/1} - ]; -fields(queue) -> - [ + {batch_time, fun batch_time/1}, {enable_queue, fun enable_queue/1}, {max_queue_bytes, fun queue_max_bytes/1} ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 4edeb786a..23c6788e8 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -111,7 +111,7 @@ fields(Name) when Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 -> fields(basic) ++ - emqx_resource_schema:fields('batch&async&queue') ++ + emqx_resource_schema:fields('creation_opts') ++ connector_field(Name). method_fileds(post, ConnectorType) ->