diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl index 3e442a926..2e3510aed 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl @@ -404,7 +404,7 @@ t_setup_via_config_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(cassandra_connector_query_return, Trace0), - ?assertMatch([#{result := ok}], Trace), + ?assertMatch([#{result := {ok, _Pid}}], Trace), ok end ), @@ -443,7 +443,7 @@ t_setup_via_http_api_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(cassandra_connector_query_return, Trace0), - ?assertMatch([#{result := ok}], Trace), + ?assertMatch([#{result := {ok, _Pid}}], Trace), ok end ), @@ -603,7 +603,7 @@ t_missing_data(Config) -> fun(Trace0) -> %% 1. ecql driver will return `ok` first in async query Trace = ?of_kind(cassandra_connector_query_return, Trace0), - ?assertMatch([#{result := ok}], Trace), + ?assertMatch([#{result := {ok, _Pid}}], Trace), %% 2. then it will return an error in callback function Trace1 = ?of_kind(handle_async_reply, Trace0), ?assertMatch([#{result := {error, {8704, _}}}], Trace1), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl index 86b908038..c4f3e9b87 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl @@ -278,7 +278,7 @@ proc_cql_params(query, SQL, Params, _State) -> exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when Type == query; Type == prepared_query -> - case ecpool:pick_and_do(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}, no_handover) of + case exec(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}) of {error, Reason} = Result -> ?tp( error, @@ -292,7 +292,7 @@ exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when end. exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> - case ecpool:pick_and_do(PoolName, {?MODULE, batch_query, [Async, CQLs]}, no_handover) of + case exec(PoolName, {?MODULE, batch_query, [Async, CQLs]}) of {error, Reason} = Result -> ?tp( error, @@ -305,6 +305,13 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> Result end. +%% Pick one of the pool members to do the query. +%% Using 'no_handoever' strategy, +%% meaning the buffer worker does the gen_server call or gen_server cast +%% towards the connection process. +exec(PoolName, Query) -> + ecpool:pick_and_do(PoolName, Query, no_handover). + on_get_status(_InstId, #{poolname := Pool} = State) -> case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of true -> @@ -343,17 +350,23 @@ do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepare query(Conn, sync, CQL, Params) -> ecql:query(Conn, CQL, Params); query(Conn, {async, Callback}, CQL, Params) -> - ecql:async_query(Conn, CQL, Params, one, Callback). + ok = ecql:async_query(Conn, CQL, Params, one, Callback), + %% return the connection pid for buffer worker to monitor + {ok, Conn}. prepared_query(Conn, sync, PreparedKey, Params) -> ecql:execute(Conn, PreparedKey, Params); prepared_query(Conn, {async, Callback}, PreparedKey, Params) -> - ecql:async_execute(Conn, PreparedKey, Params, Callback). + ok = ecql:async_execute(Conn, PreparedKey, Params, Callback), + %% return the connection pid for buffer worker to monitor + {ok, Conn}. batch_query(Conn, sync, Rows) -> ecql:batch(Conn, Rows); batch_query(Conn, {async, Callback}, Rows) -> - ecql:async_batch(Conn, Rows, Callback). + ok = ecql:async_batch(Conn, Rows, Callback), + %% return the connection pid for buffer worker to monitor + {ok, Conn}. %%-------------------------------------------------------------------- %% callbacks for ecpool diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index a85aa36af..f8ed3dff0 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -47,7 +47,7 @@ while [ "$#" -gt 0 ]; do exit 0 ;; --app) - WHICH_APP="$2" + WHICH_APP="${2%/}" shift 2 ;; --only-up)