From 30bdffe3187be7db88f85821417a909bd7b9e536 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 10 Apr 2023 15:08:10 +0800 Subject: [PATCH] feat: support async and batch callback for cassandra connector --- lib-ee/emqx_ee_bridge/rebar.config | 2 +- .../test/emqx_ee_bridge_cassa_SUITE.erl | 65 +++------ .../src/emqx_ee_connector_cassa.erl | 125 ++++++++++++++++-- 3 files changed, 133 insertions(+), 59 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index 985b387d4..2df1f9f6a 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -3,7 +3,7 @@ , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} - , {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.4.2"}}} + , {ecql, {git, "https://github.com/emqx/ecql.git", {branch, "batch-support"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl index f1ea6e930..826df9be8 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl @@ -72,10 +72,10 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), - NonBatchCases = [t_write_timeout], + NonBatchCases = [t_write_timeout, t_simple_sql_query], QueryModeGroups = [{group, async}, {group, sync}], BatchingGroups = [ - %{group, with_batch}, + {group, with_batch}, {group, without_batch} ], [ @@ -404,12 +404,7 @@ t_setup_via_config_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(cassandra_connector_query_return, Trace0), - case ?config(enable_batch, Config) of - true -> - ?assertMatch([#{result := {_, [ok]}}], Trace); - false -> - ?assertMatch([#{result := ok}], Trace) - end, + ?assertMatch([#{result := ok}], Trace), ok end ), @@ -448,12 +443,7 @@ t_setup_via_http_api_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(cassandra_connector_query_return, Trace0), - case ?config(enable_batch, Config) of - true -> - ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace); - false -> - ?assertMatch([#{result := ok}], Trace) - end, + ?assertMatch([#{result := ok}], Trace), ok end ), @@ -540,8 +530,8 @@ t_write_failure(Config) -> fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), Trace = ?of_kind(buffer_worker_flush_nack, Trace0), - ?assertMatch([#{result := {error, _}} | _], Trace), - [#{result := {error, Error}} | _] = Trace, + ?assertMatch([#{result := {async_return, {error, _}}} | _], Trace), + [#{result := {async_return, {error, Error}}} | _] = Trace, case Error of {resource_error, _} -> ok; @@ -576,7 +566,6 @@ t_write_failure(Config) -> % ok. t_simple_sql_query(Config) -> - EnableBatch = ?config(enable_batch, Config), QueryMode = ?config(query_mode, Config), ?assertMatch( {ok, _}, @@ -592,12 +581,7 @@ t_simple_sql_query(Config) -> {ok, Res} = receive_result(Ref, 2_000), Res end, - case EnableBatch of - true -> - ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); - false -> - ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result) - end, + ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result), ok. t_missing_data(Config) -> @@ -607,27 +591,25 @@ t_missing_data(Config) -> ), %% emqx_ee_connector_cassa will send missed data as a `null` atom %% to ecql driver - {_, {ok, Event}} = - ?wait_async_action( - send_message(Config, #{}), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - ?assertMatch( - %% TODO: match error msgs + send_message(Config, #{}), + ?block_until( #{ - result := - {error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}} + ?snk_kind := buffer_worker_flush_ack, + result := {async_return, ok} }, - Event + 2_000 + ), + ?block_until( + #{ + ?snk_kind := handle_async_reply_enter, + result := {error, {8704, _}} + }, + 2_000 ), ok. t_bad_sql_parameter(Config) -> QueryMode = ?config(query_mode, Config), - EnableBatch = ?config(enable_batch, Config), - Name = ?config(cassa_name, Config), - ResourceId = emqx_bridge_resource:resource_id(cassandra, Name), ?assertMatch( {ok, _}, create_bridge( @@ -656,14 +638,7 @@ t_bad_sql_parameter(Config) -> ct:fail("no response received") end end, - case EnableBatch of - true -> - ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); - false -> - ?assertMatch( - {error, {unrecoverable_error, _}}, Result - ) - end, + ?assertMatch({error, _}, Result), ok. t_nasty_sql_string(Config) -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl index a6a77f233..4b3804c01 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl @@ -33,8 +33,9 @@ on_start/2, on_stop/2, on_query/3, - %% TODO: not_supported_now - %%on_batch_query/3, + on_query_async/4, + on_batch_query/3, + on_batch_query_async/4, on_get_status/2 ]). @@ -45,7 +46,7 @@ ]). %% callbacks for query executing --export([query/3, prepared_query/3]). +-export([query/4, prepared_query/4, batch_query/3]). -export([do_get_status/1]). @@ -96,7 +97,7 @@ keyspace(_) -> undefined. %%-------------------------------------------------------------------- %% callbacks for emqx_resource -callback_mode() -> always_sync. +callback_mode() -> async_if_possible. -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. on_start( @@ -172,6 +173,28 @@ on_stop(InstId, #{poolname := PoolName}) -> on_query( InstId, Request, + State +) -> + do_signle_query(InstId, Request, sync, State). + +-spec on_query_async( + emqx_resource:resource_id(), + request(), + {function(), list()}, + state() +) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. +on_query_async( + InstId, + Request, + Callback, + State +) -> + do_signle_query(InstId, Request, {async, Callback}, State). + +do_signle_query( + InstId, + Request, + Async, #{poolname := PoolName} = State ) -> {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request), @@ -187,7 +210,59 @@ on_query( } ), {PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State), - Res = exec_cql_query(InstId, PoolName, Type, PreparedKeyOrSQL1, Data), + Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrSQL1, Data), + handle_result(Res). + +-spec on_batch_query( + emqx_resource:resource_id(), + [request()], + state() +) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. +on_batch_query( + InstId, + Requests, + State +) -> + do_batch_query(InstId, Requests, sync, State). + +-spec on_batch_query_async( + emqx_resource:resource_id(), + [request()], + {function(), list()}, + state() +) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. +on_batch_query_async( + InstId, + Requests, + Callback, + State +) -> + do_batch_query(InstId, Requests, {async, Callback}, State). + +do_batch_query( + InstId, + Requests, + Async, + #{poolname := PoolName} = State +) -> + CQLs = + lists:map( + fun(Request) -> + {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request), + proc_cql_params(Type, PreparedKeyOrSQL, Params, State) + end, + Requests + ), + ?tp( + debug, + cassandra_connector_received_cql_batch_query, + #{ + connector => InstId, + cqls => CQLs, + state => State + } + ), + Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs), handle_result(Res). parse_request_to_cql({send_message, Params}) -> @@ -203,17 +278,32 @@ proc_cql_params( Params, #{prepare_statement := Prepares, params_tokens := ParamsTokens} ) -> - PreparedKey = maps:get(PreparedKey0, Prepares), + %% assert + _PreparedKey = maps:get(PreparedKey0, Prepares), Tokens = maps:get(PreparedKey0, ParamsTokens), - {PreparedKey, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}; + {PreparedKey0, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}; proc_cql_params(query, SQL, Params, _State) -> {SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'), {SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}. -exec_cql_query(InstId, PoolName, Type, PreparedKey, Data) when +exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when Type == query; Type == prepared_query -> - case ecpool:pick_and_do(PoolName, {?MODULE, Type, [PreparedKey, Data]}, no_handover) of + case ecpool:pick_and_do(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}, no_handover) of + {error, Reason} = Result -> + ?tp( + error, + cassandra_connector_query_return, + #{connector => InstId, error => Reason} + ), + Result; + Result -> + ?tp(debug, cassandra_connector_query_return, #{result => Result}), + Result + end. + +exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> + case ecpool:pick_and_do(PoolName, {?MODULE, batch_query, [Async, CQLs]}, no_handover) of {error, Reason} = Result -> ?tp( error, @@ -261,11 +351,20 @@ do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepare %%-------------------------------------------------------------------- %% callbacks query -query(Conn, SQL, Params) -> - ecql:query(Conn, SQL, Params). +query(Conn, sync, CQL, Params) -> + ecql:query(Conn, CQL, Params); +query(Conn, {async, Callback}, CQL, Params) -> + ecql:async_query(Conn, CQL, Params, one, Callback). -prepared_query(Conn, PreparedKey, Params) -> - ecql:execute(Conn, PreparedKey, Params). +prepared_query(Conn, sync, PreparedKey, Params) -> + ecql:execute(Conn, PreparedKey, Params); +prepared_query(Conn, {async, Callback}, PreparedKey, Params) -> + ecql:async_execute(Conn, PreparedKey, Params, Callback). + +batch_query(Conn, sync, Rows) -> + ecql:batch(Conn, Rows); +batch_query(Conn, {async, Callback}, Rows) -> + ecql:async_batch(Conn, Rows, Callback). %%-------------------------------------------------------------------- %% callbacks for ecpool