feat(respool): switch to `emqx_resource_pool`

Which was previously known as `emqx_plugin_libs_pool`. This is part
of the effort to get rid of `emqx_plugin_libs` application.
This commit is contained in:
Andrew Mayorov 2023-04-12 16:37:51 +03:00
parent 4240720753
commit 21e19a33ce
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
22 changed files with 312 additions and 372 deletions

View File

@ -35,18 +35,17 @@
callback_mode() -> always_sync.
on_start(InstId, Opts) ->
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
PoolOpts = [
{pool_size, maps:get(pool_size, Opts, ?DEFAULT_POOL_SIZE)},
{connector_opts, Opts}
],
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, PoolOpts) of
ok -> {ok, #{pool_name => PoolName}};
case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of
ok -> {ok, #{pool_name => InstId}};
{error, Reason} -> {error, Reason}
end.
on_stop(_InstId, #{pool_name := PoolName}) ->
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
on_query(InstId, get_jwks, #{pool_name := PoolName}) ->
Result = ecpool:pick_and_do(PoolName, {emqx_authn_jwks_client, get_jwks, []}, no_handover),
@ -72,16 +71,15 @@ on_query(_InstId, {update, Opts}, #{pool_name := PoolName}) ->
ok.
on_get_status(_InstId, #{pool_name := PoolName}) ->
Func =
fun(Conn) ->
case emqx_resource_pool:health_check_workers(PoolName, fun health_check/1) of
true -> connected;
false -> disconnected
end.
health_check(Conn) ->
case emqx_authn_jwks_client:get_jwks(Conn) of
{ok, _} -> true;
_ -> false
end
end,
case emqx_plugin_libs_pool:health_check_ecpool_workers(PoolName, Func) of
true -> connected;
false -> disconnected
end.
connect(Opts) ->

View File

@ -231,9 +231,8 @@ on_start(
{transport_opts, NTransportOpts},
{enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
State = #{
pool_name => PoolName,
pool_name => InstId,
pool_type => PoolType,
host => Host,
port => Port,
@ -241,7 +240,7 @@ on_start(
base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined))
},
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
case ehttpc_sup:start_pool(InstId, PoolOpts) of
{ok, _} -> {ok, State};
{error, {already_started, _}} -> {ok, State};
{error, Reason} -> {error, Reason}

View File

@ -87,20 +87,19 @@ on_start(
{pool_size, PoolSize},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{poolname => PoolName}};
case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{pool_name => InstId}};
{error, Reason} -> {error, Reason}
end.
on_stop(InstId, #{poolname := PoolName}) ->
on_stop(InstId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping_ldap_connector",
connector => InstId
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
on_query(InstId, {search, Base, Filter, Attributes}, #{poolname := PoolName} = State) ->
on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) ->
Request = {Base, Filter, Attributes},
?TRACE(
"QUERY",

View File

@ -182,12 +182,11 @@ on_start(
{options, init_topology_options(maps:to_list(Topology), [])},
{worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Collection = maps:get(collection, Config, <<"mqtt">>),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of
case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
ok ->
{ok, #{
poolname => PoolName,
pool_name => InstId,
type => Type,
collection => Collection
}};
@ -195,17 +194,17 @@ on_start(
{error, Reason}
end.
on_stop(InstId, #{poolname := PoolName}) ->
on_stop(InstId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping_mongodb_connector",
connector => InstId
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
on_query(
InstId,
{send_message, Document},
#{poolname := PoolName, collection := Collection} = State
#{pool_name := PoolName, collection := Collection} = State
) ->
Request = {insert, Collection, Document},
?TRACE(
@ -234,7 +233,7 @@ on_query(
on_query(
InstId,
{Action, Collection, Filter, Projector},
#{poolname := PoolName} = State
#{pool_name := PoolName} = State
) ->
Request = {Action, Collection, Filter, Projector},
?TRACE(
@ -263,8 +262,7 @@ on_query(
{ok, Result}
end.
-dialyzer({nowarn_function, [on_get_status/2]}).
on_get_status(InstId, #{poolname := PoolName} = _State) ->
on_get_status(InstId, #{pool_name := PoolName}) ->
case health_check(PoolName) of
true ->
?tp(debug, emqx_connector_mongo_health_check, #{
@ -281,8 +279,10 @@ on_get_status(InstId, #{poolname := PoolName} = _State) ->
end.
health_check(PoolName) ->
emqx_plugin_libs_pool:health_check_ecpool_workers(
PoolName, fun ?MODULE:check_worker_health/1, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
emqx_resource_pool:health_check_workers(
PoolName,
fun ?MODULE:check_worker_health/1,
?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
).
%% ===================================================================

View File

@ -51,7 +51,7 @@
-type sqls() :: #{atom() => binary()}.
-type state() ::
#{
poolname := atom(),
pool_name := binary(),
prepare_statement := prepares(),
params_tokens := params_tokens(),
batch_inserts := sqls(),
@ -123,13 +123,10 @@ on_start(
{pool_size, PoolSize}
]
),
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = parse_prepare_sql(Config),
State = maps:merge(#{poolname => PoolName}, Prepares),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
State = parse_prepare_sql(Config),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State)};
{ok, init_prepare(State#{pool_name => InstId})};
{error, Reason} ->
?tp(
mysql_connector_start_failed,
@ -143,12 +140,12 @@ maybe_add_password_opt(undefined, Options) ->
maybe_add_password_opt(Password, Options) ->
[{password, Password} | Options].
on_stop(InstId, #{poolname := PoolName}) ->
on_stop(InstId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping_mysql_connector",
connector => InstId
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
@ -157,7 +154,7 @@ on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
on_query(
InstId,
{TypeOrKey, SQLOrKey, Params, Timeout},
#{poolname := PoolName, prepare_statement := Prepares} = State
#{pool_name := PoolName, prepare_statement := Prepares} = State
) ->
MySqlFunction = mysql_function(TypeOrKey),
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
@ -216,8 +213,8 @@ mysql_function(prepared_query) ->
mysql_function(_) ->
mysql_function(prepared_query).
on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
ok ->
@ -238,7 +235,7 @@ do_get_status(Conn) ->
do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) ->
ok;
do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) ->
do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) ->
%% retry to prepare
case prepare_sql(Prepares, PoolName) of
ok ->
@ -253,7 +250,7 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P
connect(Options) ->
mysql:start_link(Options).
init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
init_prepare(State = #{prepare_statement := Prepares, pool_name := PoolName}) ->
case maps:size(Prepares) of
0 ->
State;
@ -409,7 +406,7 @@ on_sql_query(
SQLOrKey,
Params,
Timeout,
#{poolname := PoolName} = State
#{pool_name := PoolName} = State
) ->
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
?TRACE("QUERY", "mysql_connector_received", LogMeta),

View File

@ -56,7 +56,7 @@
-type state() ::
#{
poolname := atom(),
pool_name := binary(),
prepare_sql := prepares(),
params_tokens := params_tokens(),
prepare_statement := epgsql:statement()
@ -120,13 +120,10 @@ on_start(
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}
],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = parse_prepare_sql(Config),
InitState = #{poolname => PoolName, prepare_statement => #{}},
State = maps:merge(InitState, Prepares),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
State = parse_prepare_sql(Config),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State)};
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
{error, Reason} ->
?tp(
pgsql_connector_start_failed,
@ -135,19 +132,19 @@ on_start(
{error, Reason}
end.
on_stop(InstId, #{poolname := PoolName}) ->
on_stop(InstId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping postgresql connector",
connector => InstId
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
on_query(InstId, {TypeOrKey, NameOrSQL}, #{poolname := _PoolName} = State) ->
on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
on_query(
InstId,
{TypeOrKey, NameOrSQL, Params},
#{poolname := PoolName} = State
#{pool_name := PoolName} = State
) ->
?SLOG(debug, #{
msg => "postgresql connector received sql query",
@ -174,7 +171,7 @@ pgsql_query_type(_) ->
on_batch_query(
InstId,
BatchReq,
#{poolname := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
#{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
) ->
case BatchReq of
[{Key, _} = Request | _] ->
@ -258,8 +255,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
{error, {unrecoverable_error, invalid_request}}
end.
on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
ok ->
@ -280,7 +277,7 @@ do_get_status(Conn) ->
do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
ok;
do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) ->
do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
%% retry to prepare
case prepare_sql(Prepares, PoolName) of
{ok, Sts} ->
@ -358,7 +355,7 @@ parse_prepare_sql([], Prepares, Tokens) ->
params_tokens => Tokens
}.
init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) ->
init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
case maps:size(Prepares) of
0 ->
State;
@ -389,17 +386,17 @@ prepare_sql(Prepares, PoolName) ->
end.
do_prepare_sql(Prepares, PoolName) ->
do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
do_prepare_sql(ecpool:workers(PoolName), Prepares, #{}).
do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
do_prepare_sql([{_Name, Worker} | T], Prepares, _LastSts) ->
{ok, Conn} = ecpool_worker:client(Worker),
case prepare_sql_to_conn(Conn, Prepares) of
{ok, Sts} ->
do_prepare_sql(T, Prepares, PoolName, Sts);
do_prepare_sql(T, Prepares, Sts);
Error ->
Error
end;
do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
do_prepare_sql([], _Prepares, LastSts) ->
{ok, LastSts}.
prepare_sql_to_conn(Conn, Prepares) ->

View File

@ -153,11 +153,10 @@ on_start(
false ->
[{ssl, false}]
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
PoolName = InstId,
State = #{poolname => PoolName, type => Type},
State = #{pool_name => InstId, type => Type},
case Type of
cluster ->
case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of
case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of
{ok, _} ->
{ok, State};
{ok, _, _} ->
@ -166,22 +165,20 @@ on_start(
{error, Reason}
end;
_ ->
case
emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}])
of
case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ [{options, Options}]) of
ok -> {ok, State};
{error, Reason} -> {error, Reason}
end
end.
on_stop(InstId, #{poolname := PoolName, type := Type}) ->
on_stop(InstId, #{pool_name := PoolName, type := Type}) ->
?SLOG(info, #{
msg => "stopping_redis_connector",
connector => InstId
}),
case Type of
cluster -> eredis_cluster:stop_pool(PoolName);
_ -> emqx_plugin_libs_pool:stop_pool(PoolName)
_ -> emqx_resource_pool:stop(PoolName)
end.
on_query(InstId, {cmd, _} = Query, State) ->
@ -189,7 +186,7 @@ on_query(InstId, {cmd, _} = Query, State) ->
on_query(InstId, {cmds, _} = Query, State) ->
do_query(InstId, Query, State).
do_query(InstId, Query, #{poolname := PoolName, type := Type} = State) ->
do_query(InstId, Query, #{pool_name := PoolName, type := Type} = State) ->
?TRACE(
"QUERY",
"redis_connector_received",
@ -227,7 +224,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) ->
is_unrecoverable_error(_) ->
false.
on_get_status(_InstId, #{type := cluster, poolname := PoolName}) ->
on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
case eredis_cluster:pool_exists(PoolName) of
true ->
Health = eredis_cluster:ping_all(PoolName),
@ -235,8 +232,8 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName}) ->
false ->
disconnected
end;
on_get_status(_InstId, #{poolname := Pool}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
on_get_status(_InstId, #{pool_name := PoolName}) ->
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
status_result(Health).
do_get_status(Conn) ->

View File

@ -64,15 +64,15 @@ t_lifecycle(_Config) ->
mongo_config()
).
perform_lifecycle_check(PoolName, InitialConfig) ->
perform_lifecycle_check(ResourceId, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?MONGO_RESOURCE_MOD, InitialConfig),
{ok, #{
state := #{poolname := ReturnedPoolName} = State,
state := #{pool_name := PoolName} = State,
status := InitialStatus
}} =
emqx_resource:create_local(
PoolName,
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?MONGO_RESOURCE_MOD,
CheckedConfig,
@ -84,39 +84,39 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
state := State,
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
% % Perform query as further check that the resource is working as expected
?assertMatch({ok, []}, emqx_resource:query(PoolName, test_query_find())),
?assertMatch({ok, undefined}, emqx_resource:query(PoolName, test_query_find_one())),
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertMatch({ok, []}, emqx_resource:query(ResourceId, test_query_find())),
?assertMatch({ok, undefined}, emqx_resource:query(ResourceId, test_query_find_one())),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := StoppedStatus
}} =
emqx_resource:get_instance(PoolName),
emqx_resource:get_instance(ResourceId),
?assertEqual(stopped, StoppedStatus),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
?assertEqual(ok, emqx_resource:restart(ResourceId)),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch({ok, []}, emqx_resource:query(PoolName, test_query_find())),
?assertMatch({ok, undefined}, emqx_resource:query(PoolName, test_query_find_one())),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
?assertMatch({ok, []}, emqx_resource:query(ResourceId, test_query_find())),
?assertMatch({ok, undefined}, emqx_resource:query(ResourceId, test_query_find_one())),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual(ok, emqx_resource:remove_local(ResourceId)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Should not even be able to get the resource data out of ets now unlike just stopping.
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)).
% %%------------------------------------------------------------------------------
% %% Helpers

View File

@ -64,14 +64,14 @@ t_lifecycle(_Config) ->
mysql_config()
).
perform_lifecycle_check(PoolName, InitialConfig) ->
perform_lifecycle_check(ResourceId, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig),
{ok, #{
state := #{poolname := ReturnedPoolName} = State,
state := #{pool_name := PoolName} = State,
status := InitialStatus
}} = emqx_resource:create_local(
PoolName,
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?MYSQL_RESOURCE_MOD,
CheckedConfig,
@ -83,53 +83,53 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
state := State,
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
% % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(ResourceId, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(ResourceId, test_query_with_params())),
?assertMatch(
{ok, _, [[1]]},
emqx_resource:query(
PoolName,
ResourceId,
test_query_with_params_and_timeout()
)
),
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := StoppedStatus
}} =
emqx_resource:get_instance(PoolName),
emqx_resource:get_instance(ResourceId),
?assertEqual(stopped, StoppedStatus),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
?assertEqual(ok, emqx_resource:restart(ResourceId)),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(ResourceId, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(ResourceId, test_query_with_params())),
?assertMatch(
{ok, _, [[1]]},
emqx_resource:query(
PoolName,
ResourceId,
test_query_with_params_and_timeout()
)
),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual(ok, emqx_resource:remove_local(ResourceId)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Should not even be able to get the resource data out of ets now unlike just stopping.
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)).
% %%------------------------------------------------------------------------------
% %% Helpers

View File

@ -64,15 +64,15 @@ t_lifecycle(_Config) ->
pgsql_config()
).
perform_lifecycle_check(PoolName, InitialConfig) ->
perform_lifecycle_check(ResourceId, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig),
{ok, #{
state := #{poolname := ReturnedPoolName} = State,
state := #{pool_name := PoolName} = State,
status := InitialStatus
}} =
emqx_resource:create_local(
PoolName,
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?PGSQL_RESOURCE_MOD,
CheckedConfig,
@ -84,39 +84,39 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
state := State,
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
% % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())),
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(ResourceId, test_query_no_params())),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(ResourceId, test_query_with_params())),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := StoppedStatus
}} =
emqx_resource:get_instance(PoolName),
emqx_resource:get_instance(ResourceId),
?assertEqual(stopped, StoppedStatus),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
?assertEqual(ok, emqx_resource:restart(ResourceId)),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(ResourceId, test_query_no_params())),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(ResourceId, test_query_with_params())),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual(ok, emqx_resource:remove_local(ResourceId)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Should not even be able to get the resource data out of ets now unlike just stopping.
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)).
% %%------------------------------------------------------------------------------
% %% Helpers

View File

@ -102,14 +102,14 @@ t_sentinel_lifecycle(_Config) ->
[<<"PING">>]
).
perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
perform_lifecycle_check(ResourceId, InitialConfig, RedisCommand) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig),
{ok, #{
state := #{poolname := ReturnedPoolName} = State,
state := #{pool_name := PoolName} = State,
status := InitialStatus
}} = emqx_resource:create_local(
PoolName,
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?REDIS_RESOURCE_MOD,
CheckedConfig,
@ -121,49 +121,49 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
state := State,
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
% Perform query as further check that the resource is working as expected
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(ResourceId, {cmd, RedisCommand})),
?assertEqual(
{ok, [{ok, <<"PONG">>}, {ok, <<"PONG">>}]},
emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]})
emqx_resource:query(ResourceId, {cmds, [RedisCommand, RedisCommand]})
),
?assertMatch(
{error, {unrecoverable_error, [{ok, <<"PONG">>}, {error, _}]}},
emqx_resource:query(
PoolName,
ResourceId,
{cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]},
#{timeout => 500}
)
),
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := StoppedStatus
}} =
emqx_resource:get_instance(PoolName),
emqx_resource:get_instance(ResourceId),
?assertEqual(stopped, StoppedStatus),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
?assertEqual(ok, emqx_resource:restart(ResourceId)),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(ResourceId, {cmd, RedisCommand})),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual(ok, emqx_resource:remove_local(ResourceId)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Should not even be able to get the resource data out of ets now unlike just stopping.
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)).
% %%------------------------------------------------------------------------------
% %% Helpers

View File

@ -14,31 +14,27 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugin_libs_pool).
-module(emqx_resource_pool).
-export([
start_pool/3,
stop_pool/1,
pool_name/1,
health_check_ecpool_workers/2,
health_check_ecpool_workers/3
start/3,
stop/1,
health_check_workers/2,
health_check_workers/3
]).
-include_lib("emqx/include/logger.hrl").
-define(HEALTH_CHECK_TIMEOUT, 15000).
pool_name(ID) when is_binary(ID) ->
list_to_atom(binary_to_list(ID)).
start_pool(Name, Mod, Options) ->
start(Name, Mod, Options) ->
case ecpool:start_sup_pool(Name, Mod, Options) of
{ok, _} ->
?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}),
ok;
{error, {already_started, _Pid}} ->
stop_pool(Name),
start_pool(Name, Mod, Options);
stop(Name),
start(Name, Mod, Options);
{error, Reason} ->
NReason = parse_reason(Reason),
?SLOG(error, #{
@ -49,7 +45,7 @@ start_pool(Name, Mod, Options) ->
{error, {start_pool_failed, Name, NReason}}
end.
stop_pool(Name) ->
stop(Name) ->
case ecpool:stop_sup_pool(Name) of
ok ->
?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name});
@ -64,10 +60,10 @@ stop_pool(Name) ->
error({stop_pool_failed, Name, Reason})
end.
health_check_ecpool_workers(PoolName, CheckFunc) ->
health_check_ecpool_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT).
health_check_workers(PoolName, CheckFunc) ->
health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT).
health_check_ecpool_workers(PoolName, CheckFunc, Timeout) ->
health_check_workers(PoolName, CheckFunc, Timeout) ->
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
DoPerWorker =
fun(Worker) ->

View File

@ -917,7 +917,7 @@ t_invalid_private_key(Config) ->
#{<<"private_key">> => InvalidPrivateKeyPEM}
}
),
#{?snk_kind := gcp_pubsub_bridge_jwt_worker_failed_to_start},
#{?snk_kind := "gcp_pubsub_bridge_jwt_worker_failed_to_start"},
20_000
),
Res
@ -928,7 +928,7 @@ t_invalid_private_key(Config) ->
[#{reason := Reason}] when
Reason =:= noproc orelse
Reason =:= {shutdown, {error, empty_key}},
?of_kind(gcp_pubsub_bridge_jwt_worker_failed_to_start, Trace)
?of_kind("gcp_pubsub_bridge_jwt_worker_failed_to_start", Trace)
),
?assertMatch(
[#{error := empty_key}],
@ -956,14 +956,14 @@ t_jwt_worker_start_timeout(Config) ->
#{<<"private_key">> => InvalidPrivateKeyPEM}
}
),
#{?snk_kind := gcp_pubsub_bridge_jwt_timeout},
#{?snk_kind := "gcp_pubsub_bridge_jwt_timeout"},
20_000
),
Res
end,
fun(Res, Trace) ->
?assertMatch({ok, _}, Res),
?assertMatch([_], ?of_kind(gcp_pubsub_bridge_jwt_timeout, Trace)),
?assertMatch([_], ?of_kind("gcp_pubsub_bridge_jwt_timeout", Trace)),
ok
end
),
@ -1329,7 +1329,7 @@ t_failed_to_start_jwt_worker(Config) ->
fun(Trace) ->
?assertMatch(
[#{reason := {error, restarting}}],
?of_kind(gcp_pubsub_bridge_jwt_worker_failed_to_start, Trace)
?of_kind("gcp_pubsub_bridge_jwt_worker_failed_to_start", Trace)
),
ok
end

View File

@ -265,7 +265,7 @@ unprepare(Config, Key) ->
Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
{ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
{ok, _, #{state := #{pool_name := PoolName}}} = emqx_resource:get_instance(ResourceID),
[
begin
{ok, Conn} = ecpool_worker:client(Worker),

View File

@ -44,7 +44,7 @@
-type state() ::
#{
poolname := atom(),
pool_name := binary(),
prepare_cql := prepares(),
params_tokens := params_tokens(),
%% returned by ecql:prepare/2
@ -124,14 +124,10 @@ on_start(
false ->
[]
end,
%% use InstaId of binary type as Pool name, which is supported in ecpool.
PoolName = InstId,
Prepares = parse_prepare_cql(Config),
InitState = #{poolname => PoolName, prepare_statement => #{}},
State = maps:merge(InitState, Prepares),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
State = parse_prepare_cql(Config),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State)};
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
{error, Reason} ->
?tp(
cassandra_connector_start_failed,
@ -140,12 +136,12 @@ on_start(
{error, Reason}
end.
on_stop(InstId, #{poolname := PoolName}) ->
on_stop(InstId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping_cassandra_connector",
connector => InstId
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
-type request() ::
% emqx_bridge.erl
@ -184,7 +180,7 @@ do_single_query(
InstId,
Request,
Async,
#{poolname := PoolName} = State
#{pool_name := PoolName} = State
) ->
{Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
?tp(
@ -232,7 +228,7 @@ do_batch_query(
InstId,
Requests,
Async,
#{poolname := PoolName} = State
#{pool_name := PoolName} = State
) ->
CQLs =
lists:map(
@ -305,8 +301,8 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
Result
end.
on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
ok ->
@ -327,7 +323,7 @@ do_get_status(Conn) ->
do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) ->
ok;
do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepares}}) ->
do_check_prepares(State = #{pool_name := PoolName, prepare_cql := {error, Prepares}}) ->
%% retry to prepare
case prepare_cql(Prepares, PoolName) of
{ok, Sts} ->
@ -397,7 +393,7 @@ parse_prepare_cql([], Prepares, Tokens) ->
params_tokens => Tokens
}.
init_prepare(State = #{prepare_cql := Prepares, poolname := PoolName}) ->
init_prepare(State = #{prepare_cql := Prepares, pool_name := PoolName}) ->
case maps:size(Prepares) of
0 ->
State;
@ -429,17 +425,17 @@ prepare_cql(Prepares, PoolName) ->
end.
do_prepare_cql(Prepares, PoolName) ->
do_prepare_cql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
do_prepare_cql(ecpool:workers(PoolName), Prepares, #{}).
do_prepare_cql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
do_prepare_cql([{_Name, Worker} | T], Prepares, _LastSts) ->
{ok, Conn} = ecpool_worker:client(Worker),
case prepare_cql_to_conn(Conn, Prepares) of
{ok, Sts} ->
do_prepare_cql(T, Prepares, PoolName, Sts);
do_prepare_cql(T, Prepares, Sts);
Error ->
Error
end;
do_prepare_cql([], _Prepares, _PoolName, LastSts) ->
do_prepare_cql([], _Prepares, LastSts) ->
{ok, LastSts}.
prepare_cql_to_conn(Conn, Prepares) ->

View File

@ -62,7 +62,8 @@
-type state() ::
#{
templates := templates(),
poolname := atom()
pool_name := binary(),
connect_timeout := pos_integer()
}.
-type clickhouse_config() :: map().
@ -141,7 +142,6 @@ on_start(
connector => InstanceID,
config => emqx_utils:redact(Config)
}),
PoolName = emqx_plugin_libs_pool:pool_name(InstanceID),
Options = [
{url, URL},
{user, maps:get(username, Config, "default")},
@ -149,46 +149,43 @@ on_start(
{database, DB},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize},
{pool, PoolName}
{pool, InstanceID}
],
InitState = #{
poolname => PoolName,
connect_timeout => ConnectTimeout
},
try
Templates = prepare_sql_templates(Config),
State = maps:merge(InitState, #{templates => Templates}),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of
State = #{
pool_name => InstanceID,
templates => Templates,
connect_timeout => ConnectTimeout
},
case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
ok ->
{ok, State};
{error, Reason} ->
log_start_error(Config, Reason, none),
?tp(
info,
"clickhouse_connector_start_failed",
#{
error => Reason,
config => emqx_utils:redact(Config)
}
),
{error, Reason}
end
catch
_:CatchReason:Stacktrace ->
log_start_error(Config, CatchReason, Stacktrace),
?tp(
info,
"clickhouse_connector_start_failed",
#{
error => CatchReason,
stacktrace => Stacktrace,
config => emqx_utils:redact(Config)
}
),
{error, CatchReason}
end.
log_start_error(Config, Reason, Stacktrace) ->
StacktraceMap =
case Stacktrace of
none -> #{};
_ -> #{stacktrace => Stacktrace}
end,
LogMessage =
#{
msg => "clickhouse_connector_start_failed",
error_reason => Reason,
config => emqx_utils:redact(Config)
},
?SLOG(info, maps:merge(LogMessage, StacktraceMap)),
?tp(
clickhouse_connector_start_failed,
#{error => Reason}
).
%% Helper functions to prepare SQL tempaltes
prepare_sql_templates(#{
@ -240,7 +237,7 @@ split_clickhouse_insert_sql(SQL) ->
end.
% This is a callback for ecpool which is triggered by the call to
% emqx_plugin_libs_pool:start_pool in on_start/2
% emqx_resource_pool:start in on_start/2
connect(Options) ->
URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))),
@ -277,23 +274,20 @@ connect(Options) ->
-spec on_stop(resource_id(), resource_state()) -> term().
on_stop(ResourceID, #{poolname := PoolName}) ->
on_stop(InstanceID, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping clickouse connector",
connector => ResourceID
connector => InstanceID
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
%% -------------------------------------------------------------------
%% on_get_status emqx_resouce callback and related functions
%% -------------------------------------------------------------------
on_get_status(
_InstId,
#{
poolname := PoolName,
connect_timeout := Timeout
} = State
_InstanceID,
#{pool_name := PoolName, connect_timeout := Timeout} = State
) ->
case do_get_status(PoolName, Timeout) of
ok ->
@ -352,7 +346,7 @@ do_get_status(PoolName, Timeout) ->
on_query(
ResourceID,
{RequestType, DataOrSQL},
#{poolname := PoolName} = State
#{pool_name := PoolName} = State
) ->
?SLOG(debug, #{
msg => "clickhouse connector received sql query",
@ -391,16 +385,11 @@ query_type(send_message) ->
on_batch_query(
ResourceID,
BatchReq,
State
#{pool_name := PoolName, templates := Templates} = _State
) ->
%% Currently we only support batch requests with the send_message key
{Keys, ObjectsToInsert} = lists:unzip(BatchReq),
ensure_keys_are_of_type_send_message(Keys),
%% Pick out the SQL template
#{
templates := Templates,
poolname := PoolName
} = State,
%% Create batch insert SQL statement
SQL = objects_to_sql(ObjectsToInsert, Templates),
%% Do the actual query in the database

View File

@ -114,23 +114,23 @@ on_start(
Templates = parse_template(Config),
State = #{
poolname => InstanceId,
pool_name => InstanceId,
database => Database,
templates => Templates
},
case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok ->
{ok, State};
Error ->
Error
end.
on_stop(InstanceId, #{poolname := PoolName} = _State) ->
on_stop(InstanceId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping_dynamo_connector",
connector => InstanceId
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, handover, State).
@ -160,8 +160,8 @@ on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State)
on_batch_query_async(_InstanceId, Query, _Reply, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}.
on_get_status(_InstanceId, #{poolname := Pool}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
on_get_status(_InstanceId, #{pool_name := PoolName}) ->
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
status_result(Health).
do_get_status(_Conn) ->
@ -183,7 +183,7 @@ do_query(
InstanceId,
Query,
ApplyMode,
#{poolname := PoolName, templates := Templates, database := Database} = State
#{pool_name := PoolName, templates := Templates, database := Database} = State
) ->
?TRACE(
"QUERY",

View File

@ -26,7 +26,6 @@
]).
-export([reply_delegator/3]).
-type bridge_id() :: binary().
-type jwt_worker() :: binary().
-type service_account_json() :: emqx_ee_bridge_gcp_pubsub:service_account_json().
-type config() :: #{
@ -43,7 +42,7 @@
jwt_worker_id := jwt_worker(),
max_retries := non_neg_integer(),
payload_template := emqx_plugin_libs_rule:tmpl_token(),
pool_name := atom(),
pool_name := binary(),
project_id := binary(),
pubsub_topic := binary(),
request_timeout := timer:time()
@ -102,14 +101,13 @@ on_start(
jwt_worker_id := JWTWorkerId,
project_id := ProjectId
} = ensure_jwt_worker(InstanceId, Config),
PoolName = emqx_plugin_libs_pool:pool_name(InstanceId),
State = #{
connect_timeout => ConnectTimeout,
instance_id => InstanceId,
jwt_worker_id => JWTWorkerId,
max_retries => MaxRetries,
payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
pool_name => PoolName,
pool_name => InstanceId,
project_id => ProjectId,
pubsub_topic => PubSubTopic,
request_timeout => RequestTimeout
@ -118,20 +116,20 @@ on_start(
gcp_pubsub_on_start_before_starting_pool,
#{
instance_id => InstanceId,
pool_name => PoolName,
pool_name => InstanceId,
pool_opts => PoolOpts
}
),
?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => PoolName}),
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => InstanceId}),
case ehttpc_sup:start_pool(InstanceId, PoolOpts) of
{ok, _} ->
{ok, State};
{error, {already_started, _}} ->
?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => PoolName}),
?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => InstanceId}),
{ok, State};
{error, Reason} ->
?tp(gcp_pubsub_ehttpc_pool_start_failure, #{
pool_name => PoolName,
pool_name => InstanceId,
reason => Reason
}),
{error, Reason}
@ -140,10 +138,7 @@ on_start(
-spec on_stop(manager_id(), state()) -> ok | {error, term()}.
on_stop(
InstanceId,
_State = #{
jwt_worker_id := JWTWorkerId,
pool_name := PoolName
}
_State = #{jwt_worker_id := JWTWorkerId, pool_name := PoolName}
) ->
?tp(gcp_pubsub_stop, #{instance_id => InstanceId, jwt_worker_id => JWTWorkerId}),
?SLOG(info, #{
@ -155,7 +150,7 @@ on_stop(
ehttpc_sup:stop_pool(PoolName).
-spec on_query(
bridge_id(),
resource_id(),
{send_message, map()},
state()
) ->
@ -163,32 +158,32 @@ on_stop(
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_query(BridgeId, {send_message, Selected}, State) ->
on_query(ResourceId, {send_message, Selected}, State) ->
Requests = [{send_message, Selected}],
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => BridgeId, state => State}
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_sync(State, Requests, BridgeId).
do_send_requests_sync(State, Requests, ResourceId).
-spec on_query_async(
bridge_id(),
resource_id(),
{send_message, map()},
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, pid()}.
on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) ->
on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) ->
Requests = [{send_message, Selected}],
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => BridgeId, state => State}
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, BridgeId).
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
-spec on_batch_query(
bridge_id(),
resource_id(),
[{send_message, map()}],
state()
) ->
@ -196,34 +191,30 @@ on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) ->
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_batch_query(BridgeId, Requests, State) ->
on_batch_query(ResourceId, Requests, State) ->
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => BridgeId, state => State}
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_sync(State, Requests, BridgeId).
do_send_requests_sync(State, Requests, ResourceId).
-spec on_batch_query_async(
bridge_id(),
resource_id(),
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, pid()}.
on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) ->
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => BridgeId, state => State}
#{requests => Requests, connector => ResourceId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, BridgeId).
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
-spec on_get_status(manager_id(), state()) -> connected | disconnected.
on_get_status(InstanceId, State) ->
#{
connect_timeout := Timeout,
pool_name := PoolName
} = State,
on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} = State) ->
case do_get_status(InstanceId, PoolName, Timeout) of
true ->
connected;
@ -245,8 +236,7 @@ on_get_status(InstanceId, State) ->
project_id := binary()
}.
ensure_jwt_worker(InstanceId, #{
service_account_json := ServiceAccountJSON,
pubsub_topic := PubSubTopic
service_account_json := ServiceAccountJSON
}) ->
#{
project_id := ProjectId,
@ -276,14 +266,8 @@ ensure_jwt_worker(InstanceId, #{
{ok, Worker0} ->
Worker0;
Error ->
?tp(
gcp_pubsub_bridge_jwt_worker_failed_to_start,
#{instance_id => InstanceId, reason => Error}
),
?SLOG(error, #{
msg => "failed_to_start_gcp_pubsub_jwt_worker",
instance_id => InstanceId,
pubsub_topic => PubSubTopic,
?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{
connector => InstanceId,
reason => Error
}),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
@ -301,26 +285,14 @@ ensure_jwt_worker(InstanceId, #{
demonitor(MRef, [flush]),
ok;
{'DOWN', MRef, process, Worker, Reason} ->
?tp(
gcp_pubsub_bridge_jwt_worker_failed_to_start,
#{
resource_id => InstanceId,
reason => Reason
}
),
?SLOG(error, #{
msg => "gcp_pubsub_bridge_jwt_worker_failed_to_start",
?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{
connector => InstanceId,
reason => Reason
}),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
throw(failed_to_start_jwt_worker)
after 10_000 ->
?tp(gcp_pubsub_bridge_jwt_timeout, #{resource_id => InstanceId}),
?SLOG(warning, #{
msg => "gcp_pubsub_bridge_jwt_timeout",
connector => InstanceId
}),
?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => InstanceId}),
demonitor(MRef, [flush]),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
throw(timeout_creating_jwt)
@ -569,7 +541,7 @@ reply_delegator(_ResourceId, ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end.
-spec do_get_status(manager_id(), atom(), timer:time()) -> boolean().
-spec do_get_status(manager_id(), binary(), timer:time()) -> boolean().
do_get_status(InstanceId, PoolName, Timeout) ->
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
DoPerWorker =

View File

@ -126,7 +126,7 @@
%% -type size() :: integer().
-type state() :: #{
poolname := binary(),
pool_name := binary(),
resource_opts := map(),
sql_templates := map()
}.
@ -208,17 +208,16 @@ on_start(
{password, Password},
{driver, Driver},
{database, Database},
{pool_size, PoolSize},
{poolname, PoolName}
{pool_size, PoolSize}
],
State = #{
%% also InstanceId
poolname => PoolName,
pool_name => PoolName,
sql_templates => parse_sql_template(Config),
resource_opts => ResourceOpts
},
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of
case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
ok ->
{ok, State};
{error, Reason} ->
@ -229,12 +228,12 @@ on_start(
{error, Reason}
end.
on_stop(InstanceId, #{poolname := PoolName} = _State) ->
on_stop(InstanceId, #{pool_name := PoolName} = _State) ->
?SLOG(info, #{
msg => "stopping_sqlserver_connector",
connector => InstanceId
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
-spec on_query(
manager_id(),
@ -265,7 +264,6 @@ on_query_async(
InstanceId,
{?ACTION_SEND_MESSAGE, _Msg} = Query,
ReplyFunAndArgs,
%% #{poolname := PoolName, sql_templates := Templates} = State
State
) ->
?TRACE(
@ -306,10 +304,12 @@ on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) ->
),
do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
on_get_status(_InstanceId, #{poolname := Pool, resource_opts := ResourceOpts} = _State) ->
on_get_status(_InstanceId, #{pool_name := PoolName, resource_opts := ResourceOpts} = _State) ->
RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts),
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(
Pool, {?MODULE, do_get_status, [RequestTimeout]}, RequestTimeout
Health = emqx_resource_pool:health_check_workers(
PoolName,
{?MODULE, do_get_status, [RequestTimeout]},
RequestTimeout
),
status_result(Health).
@ -382,7 +382,7 @@ do_query(
InstanceId,
Query,
ApplyMode,
#{poolname := PoolName, sql_templates := Templates} = State
#{pool_name := PoolName, sql_templates := Templates} = State
) ->
?TRACE(
"SINGLE_QUERY_SYNC",
@ -425,7 +425,7 @@ do_query(
end.
worker_do_insert(
Conn, SQL, #{resource_opts := ResourceOpts, poolname := InstanceId} = State
Conn, SQL, #{resource_opts := ResourceOpts, pool_name := InstanceId} = State
) ->
LogMeta = #{connector => InstanceId, state => State},
try

View File

@ -107,20 +107,20 @@ on_start(
],
Prepares = parse_prepare_sql(Config),
State = maps:merge(Prepares, #{poolname => InstanceId, query_opts => query_opts(Config)}),
case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of
State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)},
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok ->
{ok, State};
Error ->
Error
end.
on_stop(InstanceId, #{poolname := PoolName} = _State) ->
on_stop(InstanceId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping_tdengine_connector",
connector => InstanceId
}),
emqx_plugin_libs_pool:stop_pool(PoolName).
emqx_resource_pool:stop(PoolName).
on_query(InstanceId, {query, SQL}, State) ->
do_query(InstanceId, SQL, State);
@ -150,8 +150,8 @@ on_batch_query(
{error, {unrecoverable_error, invalid_request}}
end.
on_get_status(_InstanceId, #{poolname := Pool}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
on_get_status(_InstanceId, #{pool_name := PoolName}) ->
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
status_result(Health).
do_get_status(Conn) ->
@ -171,7 +171,7 @@ do_batch_insert(InstanceId, BatchReqs, InsertPart, Tokens, State) ->
SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens),
do_query(InstanceId, SQL, State).
do_query(InstanceId, Query, #{poolname := PoolName, query_opts := Opts} = State) ->
do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State) ->
?TRACE(
"QUERY",
"tdengine_connector_received",

View File

@ -101,15 +101,15 @@ show(Label, What) ->
erlang:display({Label, What}),
What.
perform_lifecycle_check(PoolName, InitialConfig) ->
perform_lifecycle_check(ResourceId, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, InitialConfig),
{ok, #{
state := #{poolname := ReturnedPoolName} = State,
state := #{pool_name := PoolName} = State,
status := InitialStatus
}} =
emqx_resource:create_local(
PoolName,
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?CASSANDRA_RESOURCE_MOD,
CheckedConfig,
@ -121,45 +121,45 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
state := State,
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
% % Perform query as further check that the resource is working as expected
(fun() ->
erlang:display({pool_name, PoolName}),
QueryNoParamsResWrapper = emqx_resource:query(PoolName, test_query_no_params()),
erlang:display({pool_name, ResourceId}),
QueryNoParamsResWrapper = emqx_resource:query(ResourceId, test_query_no_params()),
?assertMatch({ok, _}, QueryNoParamsResWrapper)
end)(),
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := StoppedStatus
}} =
emqx_resource:get_instance(PoolName),
emqx_resource:get_instance(ResourceId),
?assertEqual(stopped, StoppedStatus),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceId)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
?assertEqual(ok, emqx_resource:restart(ResourceId)),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
(fun() ->
QueryNoParamsResWrapper =
emqx_resource:query(PoolName, test_query_no_params()),
emqx_resource:query(ResourceId, test_query_no_params()),
?assertMatch({ok, _}, QueryNoParamsResWrapper)
end)(),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual(ok, emqx_resource:remove_local(ResourceId)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Should not even be able to get the resource data out of ets now unlike just stopping.
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)).
%%--------------------------------------------------------------------
%% utils

View File

@ -95,15 +95,15 @@ show(Label, What) ->
erlang:display({Label, What}),
What.
perform_lifecycle_check(PoolName, InitialConfig) ->
perform_lifecycle_check(ResourceID, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?CLICKHOUSE_RESOURCE_MOD, InitialConfig),
{ok, #{
state := #{poolname := ReturnedPoolName} = State,
state := #{pool_name := PoolName} = State,
status := InitialStatus
}} =
emqx_resource:create_local(
PoolName,
ResourceID,
?CONNECTOR_RESOURCE_GROUP,
?CLICKHOUSE_RESOURCE_MOD,
CheckedConfig,
@ -115,49 +115,49 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
state := State,
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
emqx_resource:get_instance(ResourceID),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
% % Perform query as further check that the resource is working as expected
(fun() ->
erlang:display({pool_name, PoolName}),
QueryNoParamsResWrapper = emqx_resource:query(PoolName, test_query_no_params()),
erlang:display({pool_name, ResourceID}),
QueryNoParamsResWrapper = emqx_resource:query(ResourceID, test_query_no_params()),
?assertMatch({ok, _}, QueryNoParamsResWrapper),
{_, QueryNoParamsRes} = QueryNoParamsResWrapper,
?assertMatch(<<"1">>, string:trim(QueryNoParamsRes))
end)(),
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceID)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := StoppedStatus
}} =
emqx_resource:get_instance(PoolName),
emqx_resource:get_instance(ResourceID),
?assertEqual(stopped, StoppedStatus),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceID)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)),
?assertEqual(ok, emqx_resource:stop(ResourceID)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
?assertEqual(ok, emqx_resource:restart(ResourceID)),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
emqx_resource:get_instance(ResourceID),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
(fun() ->
QueryNoParamsResWrapper =
emqx_resource:query(PoolName, test_query_no_params()),
emqx_resource:query(ResourceID, test_query_no_params()),
?assertMatch({ok, _}, QueryNoParamsResWrapper),
{_, QueryNoParamsRes} = QueryNoParamsResWrapper,
?assertMatch(<<"1">>, string:trim(QueryNoParamsRes))
end)(),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
?assertEqual(ok, emqx_resource:remove_local(ResourceID)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)),
% Should not even be able to get the resource data out of ets now unlike just stopping.
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceID)).
% %%------------------------------------------------------------------------------
% %% Helpers