Merge pull request #8770 from JimMoen/fix-resource-opts

* `password` format.
* Hide `pool_size` opt for users, use `worker_pool_size` instead.
* Hide `start_after_created` and `start_timeout` opts. They are internal opts now.
* Fix resource `auto_retry` in `disconnected` state.
This commit is contained in:
JimMoen 2022-08-22 11:28:54 +08:00 committed by GitHub
commit 0102a35770
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 70 additions and 63 deletions

View File

@ -1902,6 +1902,7 @@ common_ssl_opts_schema(Defaults) ->
sensitive => true,
required => false,
example => <<"">>,
format => <<"password">>,
desc => ?DESC(common_ssl_opts_schema_password)
}
)},

View File

@ -68,6 +68,8 @@ ssl_fields() ->
relational_db_fields() ->
[
{database, fun database/1},
%% TODO: The `pool_size` for drivers will be deprecated. Ues `worker_pool_size` for emqx_resource
%% See emqx_resource.hrl
{pool_size, fun pool_size/1},
{username, fun username/1},
{password, fun password/1},
@ -102,6 +104,7 @@ username(_) -> undefined.
password(type) -> binary();
password(desc) -> ?DESC("password");
password(required) -> false;
password(format) -> <<"password">>;
password(_) -> undefined.
auto_reconnect(type) -> boolean();

View File

@ -96,6 +96,7 @@ fields("connector") ->
binary(),
#{
default => "emqx",
format => <<"password">>,
desc => ?DESC("password")
}
)},

View File

@ -22,6 +22,17 @@ emqx_resource_schema {
}
}
worker_pool_size {
desc {
en: """Resource worker pool size."""
zh: """资源连接池大小。"""
}
label {
en: """Worker Pool Size"""
zh: """资源连接池大小"""
}
}
health_check_interval {
desc {
en: """Health check interval, in milliseconds."""
@ -134,8 +145,8 @@ emqx_resource_schema {
queue_max_bytes {
desc {
en: """Maximum queue storage size in bytes."""
zh: """消息队列的最大长度,以字节计。"""
en: """Maximum queue storage."""
zh: """消息队列的最大长度。"""
}
label {
en: """Queue max bytes"""

View File

@ -49,25 +49,27 @@
%% use auto_restart_interval instead
auto_retry_interval => integer(),
%%======================================= Deprecated Opts End
worker_pool_size => pos_integer(),
%% use `integer()` compatibility to release 5.0.0 bpapi
health_check_interval => integer(),
%% We can choose to block the return of emqx_resource:start until
%% the resource connected, wait max to `start_timeout` ms.
start_timeout => integer(),
start_timeout => pos_integer(),
%% If `start_after_created` is set to true, the resource is started right
%% after it is created. But note that a `started` resource is not guaranteed
%% to be `connected`.
start_after_created => boolean(),
%% If the resource disconnected, we can set to retry starting the resource
%% periodically.
auto_restart_interval => integer(),
auto_restart_interval => pos_integer(),
enable_batch => boolean(),
batch_size => integer(),
batch_time => integer(),
batch_size => pos_integer(),
batch_time => pos_integer(),
enable_queue => boolean(),
queue_max_bytes => integer(),
queue_max_bytes => pos_integer(),
query_mode => async | sync | dynamic,
resume_interval => integer(),
async_inflight_window => integer()
resume_interval => pos_integer(),
async_inflight_window => pos_integer()
}.
-type query_result() ::
ok
@ -75,6 +77,8 @@
| {error, term()}
| {resource_down, term()}.
-define(WORKER_POOL_SIZE, 16).
-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
-define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>).
@ -92,12 +96,6 @@
-define(HEALTHCHECK_INTERVAL, 15000).
-define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
-define(START_AFTER_CREATED, true).
%% milliseconds
-define(START_TIMEOUT, 5000).
-define(START_TIMEOUT_RAW, <<"5s">>).
%% milliseconds
-define(AUTO_RESTART_INTERVAL, 60000).
-define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>).

View File

@ -114,6 +114,11 @@ create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) ->
{ok, _Group, Data} = lookup(ResId),
{ok, Data}.
%% internal configs
-define(START_AFTER_CREATED, true).
%% in milliseconds
-define(START_TIMEOUT, 5000).
%% @doc Create a resource_manager and wait until it is running
create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
% The state machine will make the actual call to the callback/resource module after init
@ -379,9 +384,17 @@ handle_event(EventType, EventData, State, Data) ->
%%------------------------------------------------------------------------------
insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) ->
case get_owner(ResId) of
not_found -> ets:insert(?ETS_TABLE, {ResId, Group, Data});
MgrId -> ets:insert(?ETS_TABLE, {ResId, Group, Data});
_ -> self() ! quit
not_found ->
ets:insert(?ETS_TABLE, {ResId, Group, Data});
MgrId ->
ets:insert(?ETS_TABLE, {ResId, Group, Data});
_ ->
?SLOG(error, #{
msg => get_resource_owner_failed,
resource_id => ResId,
action => quit_resource
}),
self() ! quit
end.
read_cache(ResId) ->
@ -420,12 +433,14 @@ get_owner(ResId) ->
end.
handle_disconnected_state_enter(Data) ->
{next_state, disconnected, Data, retry_actions(Data)}.
retry_actions(Data) ->
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
undefined ->
{next_state, disconnected, Data};
[];
RetryInterval ->
Actions = [{state_timeout, RetryInterval, auto_retry}],
{next_state, disconnected, Data, Actions}
[{state_timeout, RetryInterval, auto_retry}]
end.
handle_remove_event(From, ClearMetrics, Data) ->
@ -456,7 +471,7 @@ start_resource(Data, From) ->
%% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made.
UpdatedData = Data#data{error = Reason},
Actions = maybe_reply([], From, Err),
Actions = maybe_reply(retry_actions(UpdatedData), From, Err),
{next_state, disconnected, UpdatedData, Actions}
end.

View File

@ -53,23 +53,23 @@ init([]) ->
{ok, {SupFlags, ChildSpecs}}.
start_workers(ResId, Opts) ->
PoolSize = pool_size(Opts),
_ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]),
WorkerPoolSize = worker_pool_size(Opts),
_ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]),
lists:foreach(
fun(Idx) ->
_ = ensure_worker_added(ResId, Idx),
ok = ensure_worker_started(ResId, Idx, Opts)
end,
lists:seq(1, PoolSize)
lists:seq(1, WorkerPoolSize)
).
stop_workers(ResId, Opts) ->
PoolSize = pool_size(Opts),
WorkerPoolSize = worker_pool_size(Opts),
lists:foreach(
fun(Idx) ->
ensure_worker_removed(ResId, Idx)
end,
lists:seq(1, PoolSize)
lists:seq(1, WorkerPoolSize)
),
ensure_worker_pool_removed(ResId),
ok.
@ -77,7 +77,7 @@ stop_workers(ResId, Opts) ->
%%%=============================================================================
%%% Internal
%%%=============================================================================
pool_size(Opts) ->
worker_pool_size(Opts) ->
maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)).
ensure_worker_pool(ResId, Type, Opts) ->

View File

@ -44,9 +44,8 @@ fields("resource_opts") ->
];
fields("creation_opts") ->
[
{worker_pool_size, fun worker_pool_size/1},
{health_check_interval, fun health_check_interval/1},
{start_after_created, fun start_after_created/1},
{start_timeout, fun start_timeout/1},
{auto_restart_interval, fun auto_restart_interval/1},
{query_mode, fun query_mode/1},
{async_inflight_window, fun async_inflight_window/1},
@ -57,24 +56,18 @@ fields("creation_opts") ->
{max_queue_bytes, fun queue_max_bytes/1}
].
worker_pool_size(type) -> pos_integer();
worker_pool_size(desc) -> ?DESC("worker_pool_size");
worker_pool_size(default) -> ?WORKER_POOL_SIZE;
worker_pool_size(required) -> false;
worker_pool_size(_) -> undefined.
health_check_interval(type) -> emqx_schema:duration_ms();
health_check_interval(desc) -> ?DESC("health_check_interval");
health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;
health_check_interval(required) -> false;
health_check_interval(_) -> undefined.
start_after_created(type) -> boolean();
start_after_created(required) -> false;
start_after_created(default) -> ?START_AFTER_CREATED;
start_after_created(desc) -> ?DESC("start_after_created");
start_after_created(_) -> undefined.
start_timeout(type) -> emqx_schema:duration_ms();
start_timeout(desc) -> ?DESC("start_timeout");
start_timeout(default) -> ?START_TIMEOUT_RAW;
start_timeout(required) -> false;
start_timeout(_) -> undefined.
auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
auto_restart_interval(desc) -> ?DESC("auto_restart_interval");
auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW;

View File

@ -150,15 +150,5 @@ emqx_ee_connector_influxdb {
zh: """时间精度"""
}
}
pool_size {
desc {
en: """InfluxDB Pool Size. Default value is CPU threads."""
zh: """InfluxDB 连接池大小,默认为 CPU 线程数。"""
}
label {
en: """InfluxDB Pool Size"""
zh: """InfluxDB 连接池大小"""
}
}
}

View File

@ -135,16 +135,15 @@ fields(basic) ->
{precision,
mk(enum([ns, us, ms, s, m, h]), #{
required => false, default => ms, desc => ?DESC("precision")
})},
{pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})}
})}
];
fields(influxdb_udp) ->
fields(basic);
fields(influxdb_api_v1) ->
[
{database, mk(binary(), #{required => true, desc => ?DESC("database")})},
{username, mk(binary(), #{required => true, desc => ?DESC("username")})},
{password, mk(binary(), #{required => true, desc => ?DESC("password")})}
{username, mk(binary(), #{desc => ?DESC("username")})},
{password, mk(binary(), #{desc => ?DESC("password"), format => <<"password">>})}
] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic);
fields(influxdb_api_v2) ->
[
@ -190,15 +189,13 @@ values(udp, put) ->
#{
host => <<"127.0.0.1">>,
port => 8089,
precision => ms,
pool_size => 8
precision => ms
};
values(api_v1, put) ->
#{
host => <<"127.0.0.1">>,
port => 8086,
precision => ms,
pool_size => 8,
database => <<"my_db">>,
username => <<"my_user">>,
password => <<"my_password">>,
@ -209,7 +206,6 @@ values(api_v2, put) ->
host => <<"127.0.0.1">>,
port => 8086,
precision => ms,
pool_size => 8,
bucket => <<"my_bucket">>,
org => <<"my_org">>,
token => <<"my_token">>,
@ -302,14 +298,13 @@ client_config(
InstId,
Config = #{
host := Host,
port := Port,
pool_size := PoolSize
port := Port
}
) ->
[
{host, binary_to_list(Host)},
{port, Port},
{pool_size, PoolSize},
{pool_size, erlang:system_info(schedulers)},
{pool, binary_to_atom(InstId, utf8)},
{precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
] ++ protocol_config(Config).