chore: dont allocate resource for simple connectiors

This commit is contained in:
JianBo He 2023-06-05 10:53:55 +08:00
parent e717ddafd7
commit dbc0cdce67
13 changed files with 12 additions and 84 deletions

View File

@ -132,7 +132,6 @@ on_start(
[] []
end, end,
State = parse_prepare_cql(Config), State = parse_prepare_cql(Config),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
@ -149,12 +148,7 @@ on_stop(InstId, _State) ->
msg => "stopping_cassandra_connector", msg => "stopping_cassandra_connector",
connector => InstId connector => InstId
}), }),
case emqx_resource:get_allocated_resources(InstId) of emqx_resource_pool:stop(InstId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
-type request() :: -type request() ::
% emqx_bridge.erl % emqx_bridge.erl

View File

@ -158,7 +158,6 @@ on_start(
templates => Templates, templates => Templates,
connect_timeout => ConnectTimeout connect_timeout => ConnectTimeout
}, },
ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID),
case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -280,12 +279,7 @@ on_stop(InstanceID, _State) ->
msg => "stopping clickouse connector", msg => "stopping clickouse connector",
connector => InstanceID connector => InstanceID
}), }),
case emqx_resource:get_allocated_resources(InstanceID) of emqx_resource_pool:stop(InstanceID).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% on_get_status emqx_resouce callback and related functions %% on_get_status emqx_resouce callback and related functions

View File

@ -104,7 +104,6 @@ on_start(
table => Table, table => Table,
templates => Templates templates => Templates
}, },
ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId),
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -117,12 +116,7 @@ on_stop(InstanceId, _State) ->
msg => "stopping_dynamo_connector", msg => "stopping_dynamo_connector",
connector => InstanceId connector => InstanceId
}), }),
case emqx_resource:get_allocated_resources(InstanceId) of emqx_resource_pool:stop(InstanceId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstanceId, Query, State) -> on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, State). do_query(InstanceId, Query, State).

View File

@ -78,7 +78,6 @@ on_start(
State = #{pool_name => InstanceId, server => Server}, State = #{pool_name => InstanceId, server => Server},
case opentsdb_connectivity(Server) of case opentsdb_connectivity(Server) of
ok -> ok ->
ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId),
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -95,12 +94,7 @@ on_stop(InstanceId, _State) ->
msg => "stopping_opents_connector", msg => "stopping_opents_connector",
connector => InstanceId connector => InstanceId
}), }),
case emqx_resource:get_allocated_resources(InstanceId) of emqx_resource_pool:stop(InstanceId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstanceId, Request, State) -> on_query(InstanceId, Request, State) ->
on_batch_query(InstanceId, [Request], State). on_batch_query(InstanceId, [Request], State).

View File

@ -242,7 +242,6 @@ on_start(
%% Already initialized %% Already initialized
ok ok
end, end,
ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID),
case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -268,12 +267,7 @@ on_stop(
msg => "stopping RabbitMQ connector", msg => "stopping RabbitMQ connector",
connector => ResourceID connector => ResourceID
}), }),
case emqx_resource:get_allocated_resources(ResourceID) of stop_clients_and_pool(ResourceID).
#{pool_name := PoolName} ->
stop_clients_and_pool(PoolName);
_ ->
ok
end.
stop_clients_and_pool(PoolName) -> stop_clients_and_pool(PoolName) ->
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],

View File

@ -215,7 +215,6 @@ on_start(
sql_templates => parse_sql_template(Config), sql_templates => parse_sql_template(Config),
resource_opts => ResourceOpts resource_opts => ResourceOpts
}, },
ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName),
case emqx_resource_pool:start(PoolName, ?MODULE, Options) of case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -232,12 +231,7 @@ on_stop(ResourceId, _State) ->
msg => "stopping_sqlserver_connector", msg => "stopping_sqlserver_connector",
connector => ResourceId connector => ResourceId
}), }),
case emqx_resource:get_allocated_resources(ResourceId) of emqx_resource_pool:stop(ResourceId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
-spec on_query( -spec on_query(
resource_id(), resource_id(),

View File

@ -108,7 +108,6 @@ on_start(
Prepares = parse_prepare_sql(Config), Prepares = parse_prepare_sql(Config),
State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)}, State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)},
ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId),
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -121,12 +120,7 @@ on_stop(InstanceId, _State) ->
msg => "stopping_tdengine_connector", msg => "stopping_tdengine_connector",
connector => InstanceId connector => InstanceId
}), }),
case emqx_resource:get_allocated_resources(InstanceId) of emqx_resource_pool:stop(InstanceId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstanceId, {query, SQL}, State) -> on_query(InstanceId, {query, SQL}, State) ->
do_query(InstanceId, SQL, State); do_query(InstanceId, SQL, State);

View File

@ -219,7 +219,6 @@ on_start(
base_path => BasePath, base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined)) request => preprocess_request(maps:get(request, Config, undefined))
}, },
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case ehttpc_sup:start_pool(InstId, PoolOpts) of case ehttpc_sup:start_pool(InstId, PoolOpts) of
{ok, _} -> {ok, State}; {ok, _} -> {ok, State};
{error, {already_started, _}} -> {ok, State}; {error, {already_started, _}} -> {ok, State};
@ -231,12 +230,7 @@ on_stop(InstId, _State) ->
msg => "stopping_http_connector", msg => "stopping_http_connector",
connector => InstId connector => InstId
}), }),
case emqx_resource:get_allocated_resources(InstId) of ehttpc_sup:stop_pool(InstId).
#{pool_name := PoolName} ->
ehttpc_sup:stop_pool(PoolName);
_ ->
ok
end.
on_query(InstId, {send_message, Msg}, State) -> on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of case maps:get(request, State, undefined) of

View File

@ -97,7 +97,6 @@ on_start(
{pool_size, PoolSize}, {pool_size, PoolSize},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL} {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
], ],
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{pool_name => InstId}}; ok -> {ok, #{pool_name => InstId}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
@ -108,12 +107,7 @@ on_stop(InstId, _State) ->
msg => "stopping_ldap_connector", msg => "stopping_ldap_connector",
connector => InstId connector => InstId
}), }),
case emqx_resource:get_allocated_resources(InstId) of emqx_resource_pool:stop(InstId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) -> on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) ->
Request = {Base, Filter, Attributes}, Request = {Base, Filter, Attributes},

View File

@ -183,7 +183,6 @@ on_start(
{worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)} {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
], ],
Collection = maps:get(collection, Config, <<"mqtt">>), Collection = maps:get(collection, Config, <<"mqtt">>),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Opts) of case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
ok -> ok ->
{ok, #{ {ok, #{
@ -200,12 +199,7 @@ on_stop(InstId, _State) ->
msg => "stopping_mongodb_connector", msg => "stopping_mongodb_connector",
connector => InstId connector => InstId
}), }),
case emqx_resource:get_allocated_resources(InstId) of emqx_resource_pool:stop(InstId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query( on_query(
InstId, InstId,

View File

@ -124,7 +124,6 @@ on_start(
] ]
), ),
State = parse_prepare_sql(Config), State = parse_prepare_sql(Config),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId})}; {ok, init_prepare(State#{pool_name => InstId})};
@ -146,12 +145,7 @@ on_stop(InstId, _State) ->
msg => "stopping_mysql_connector", msg => "stopping_mysql_connector",
connector => InstId connector => InstId
}), }),
case emqx_resource:get_allocated_resources(InstId) of emqx_resource_pool:stop(InstId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstId, {TypeOrKey, SQLOrKey}, State) -> on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State); on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);

View File

@ -121,7 +121,6 @@ on_start(
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
State = parse_prepare_sql(Config), State = parse_prepare_sql(Config),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
@ -138,12 +137,7 @@ on_stop(InstId, _State) ->
msg => "stopping postgresql connector", msg => "stopping postgresql connector",
connector => InstId connector => InstId
}), }),
case emqx_resource:get_allocated_resources(InstId) of emqx_resource_pool:stop(InstId).
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstId, {TypeOrKey, NameOrSQL}, State) -> on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);