fix(cassandra): ensure async calls return connection pid

so the buffer worker can monitor it and perform retries
if the connection restarted
This commit is contained in:
Zaiming (Stone) Shi 2023-04-26 14:30:08 +02:00
parent 1c4f4037a5
commit c83d630c97
2 changed files with 21 additions and 8 deletions

View File

@ -404,7 +404,7 @@ t_setup_via_config_and_publish(Config) ->
end, end,
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(cassandra_connector_query_return, Trace0), Trace = ?of_kind(cassandra_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace), ?assertMatch([#{result := {ok, _Pid}}], Trace),
ok ok
end end
), ),
@ -443,7 +443,7 @@ t_setup_via_http_api_and_publish(Config) ->
end, end,
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(cassandra_connector_query_return, Trace0), Trace = ?of_kind(cassandra_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace), ?assertMatch([#{result := {ok, _Pid}}], Trace),
ok ok
end end
), ),
@ -603,7 +603,7 @@ t_missing_data(Config) ->
fun(Trace0) -> fun(Trace0) ->
%% 1. ecql driver will return `ok` first in async query %% 1. ecql driver will return `ok` first in async query
Trace = ?of_kind(cassandra_connector_query_return, Trace0), Trace = ?of_kind(cassandra_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace), ?assertMatch([#{result := {ok, _Pid}}], Trace),
%% 2. then it will return an error in callback function %% 2. then it will return an error in callback function
Trace1 = ?of_kind(handle_async_reply, Trace0), Trace1 = ?of_kind(handle_async_reply, Trace0),
?assertMatch([#{result := {error, {8704, _}}}], Trace1), ?assertMatch([#{result := {error, {8704, _}}}], Trace1),

View File

@ -278,7 +278,7 @@ proc_cql_params(query, SQL, Params, _State) ->
exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
Type == query; Type == prepared_query Type == query; Type == prepared_query
-> ->
case ecpool:pick_and_do(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}, no_handover) of case exec(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}) of
{error, Reason} = Result -> {error, Reason} = Result ->
?tp( ?tp(
error, error,
@ -292,7 +292,7 @@ exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
end. end.
exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
case ecpool:pick_and_do(PoolName, {?MODULE, batch_query, [Async, CQLs]}, no_handover) of case exec(PoolName, {?MODULE, batch_query, [Async, CQLs]}) of
{error, Reason} = Result -> {error, Reason} = Result ->
?tp( ?tp(
error, error,
@ -305,6 +305,13 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
Result Result
end. end.
%% Pick one of the pool members to do the query.
%% Using 'no_handoever' strategy,
%% meaning the buffer worker does the gen_server call or gen_server cast
%% towards the connection process.
exec(PoolName, Query) ->
ecpool:pick_and_do(PoolName, Query, no_handover).
on_get_status(_InstId, #{poolname := Pool} = State) -> on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true -> true ->
@ -343,17 +350,23 @@ do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepare
query(Conn, sync, CQL, Params) -> query(Conn, sync, CQL, Params) ->
ecql:query(Conn, CQL, Params); ecql:query(Conn, CQL, Params);
query(Conn, {async, Callback}, CQL, Params) -> query(Conn, {async, Callback}, CQL, Params) ->
ecql:async_query(Conn, CQL, Params, one, Callback). ok = ecql:async_query(Conn, CQL, Params, one, Callback),
%% return the connection pid for buffer worker to monitor
{ok, Conn}.
prepared_query(Conn, sync, PreparedKey, Params) -> prepared_query(Conn, sync, PreparedKey, Params) ->
ecql:execute(Conn, PreparedKey, Params); ecql:execute(Conn, PreparedKey, Params);
prepared_query(Conn, {async, Callback}, PreparedKey, Params) -> prepared_query(Conn, {async, Callback}, PreparedKey, Params) ->
ecql:async_execute(Conn, PreparedKey, Params, Callback). ok = ecql:async_execute(Conn, PreparedKey, Params, Callback),
%% return the connection pid for buffer worker to monitor
{ok, Conn}.
batch_query(Conn, sync, Rows) -> batch_query(Conn, sync, Rows) ->
ecql:batch(Conn, Rows); ecql:batch(Conn, Rows);
batch_query(Conn, {async, Callback}, Rows) -> batch_query(Conn, {async, Callback}, Rows) ->
ecql:async_batch(Conn, Rows, Callback). ok = ecql:async_batch(Conn, Rows, Callback),
%% return the connection pid for buffer worker to monitor
{ok, Conn}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks for ecpool %% callbacks for ecpool