fix(webhook): pick worker according to pool type

This commit is contained in:
Zaiming (Stone) Shi 2023-02-01 09:53:25 +01:00
parent f4381d90ca
commit 9f6b6cedc6
1 changed files with 16 additions and 11 deletions

View File

@ -234,6 +234,7 @@ on_start(
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
State = #{ State = #{
pool_name => PoolName, pool_name => PoolName,
pool_type => PoolType,
host => Host, host => Host,
port => Port, port => Port,
connect_timeout => ConnectTimeout, connect_timeout => ConnectTimeout,
@ -283,7 +284,7 @@ on_query(InstId, {Method, Request, Timeout}, State) ->
on_query( on_query(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout, Retry}, {KeyOrNum, Method, Request, Timeout, Retry},
#{pool_name := PoolName, base_path := BasePath} = State #{base_path := BasePath} = State
) -> ) ->
?TRACE( ?TRACE(
"QUERY", "QUERY",
@ -291,12 +292,10 @@ on_query(
#{request => Request, connector => InstId, state => State} #{request => Request, connector => InstId, state => State}
), ),
NRequest = formalize_request(Method, BasePath, Request), NRequest = formalize_request(Method, BasePath, Request),
Worker = resolve_pool_worker(State, KeyOrNum),
case case
ehttpc:request( ehttpc:request(
case KeyOrNum of Worker,
undefined -> PoolName;
_ -> {PoolName, KeyOrNum}
end,
Method, Method,
NRequest, NRequest,
Timeout, Timeout,
@ -364,19 +363,15 @@ on_query_async(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout}, {KeyOrNum, Method, Request, Timeout},
ReplyFunAndArgs, ReplyFunAndArgs,
#{pool_name := PoolName, base_path := BasePath} = State #{base_path := BasePath} = State
) -> ) ->
Worker = resolve_pool_worker(State, KeyOrNum),
?TRACE( ?TRACE(
"QUERY_ASYNC", "QUERY_ASYNC",
"http_connector_received", "http_connector_received",
#{request => Request, connector => InstId, state => State} #{request => Request, connector => InstId, state => State}
), ),
NRequest = formalize_request(Method, BasePath, Request), NRequest = formalize_request(Method, BasePath, Request),
Worker =
case KeyOrNum of
undefined -> ehttpc_pool:pick_worker(PoolName);
_ -> ehttpc_pool:pick_worker(PoolName, KeyOrNum)
end,
ok = ehttpc:request_async( ok = ehttpc:request_async(
Worker, Worker,
Method, Method,
@ -386,6 +381,16 @@ on_query_async(
), ),
{ok, Worker}. {ok, Worker}.
resolve_pool_worker(State, undefined) ->
resolve_pool_worker(State, self());
resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
case maps:get(pool_type, State, random) of
random ->
ehttpc_pool:pick_worker(PoolName);
hash ->
ehttpc_pool:pick_worker(PoolName, Key)
end.
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of case do_get_status(PoolName, Timeout) of
ok -> ok ->