feat: support async and batch callback for cassandra connector

This commit is contained in:
JianBo He 2023-04-10 15:08:10 +08:00
parent e186477531
commit 30bdffe318
3 changed files with 133 additions and 59 deletions

View File

@ -3,7 +3,7 @@
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}
, {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.4.2"}}} , {ecql, {git, "https://github.com/emqx/ecql.git", {branch, "batch-support"}}}
, {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -72,10 +72,10 @@ all() ->
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout], NonBatchCases = [t_write_timeout, t_simple_sql_query],
QueryModeGroups = [{group, async}, {group, sync}], QueryModeGroups = [{group, async}, {group, sync}],
BatchingGroups = [ BatchingGroups = [
%{group, with_batch}, {group, with_batch},
{group, without_batch} {group, without_batch}
], ],
[ [
@ -404,12 +404,7 @@ t_setup_via_config_and_publish(Config) ->
end, end,
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(cassandra_connector_query_return, Trace0), Trace = ?of_kind(cassandra_connector_query_return, Trace0),
case ?config(enable_batch, Config) of ?assertMatch([#{result := ok}], Trace),
true ->
?assertMatch([#{result := {_, [ok]}}], Trace);
false ->
?assertMatch([#{result := ok}], Trace)
end,
ok ok
end end
), ),
@ -448,12 +443,7 @@ t_setup_via_http_api_and_publish(Config) ->
end, end,
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(cassandra_connector_query_return, Trace0), Trace = ?of_kind(cassandra_connector_query_return, Trace0),
case ?config(enable_batch, Config) of ?assertMatch([#{result := ok}], Trace),
true ->
?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
false ->
?assertMatch([#{result := ok}], Trace)
end,
ok ok
end end
), ),
@ -540,8 +530,8 @@ t_write_failure(Config) ->
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]), ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(buffer_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace), ?assertMatch([#{result := {async_return, {error, _}}} | _], Trace),
[#{result := {error, Error}} | _] = Trace, [#{result := {async_return, {error, Error}}} | _] = Trace,
case Error of case Error of
{resource_error, _} -> {resource_error, _} ->
ok; ok;
@ -576,7 +566,6 @@ t_write_failure(Config) ->
% ok. % ok.
t_simple_sql_query(Config) -> t_simple_sql_query(Config) ->
EnableBatch = ?config(enable_batch, Config),
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
@ -592,12 +581,7 @@ t_simple_sql_query(Config) ->
{ok, Res} = receive_result(Ref, 2_000), {ok, Res} = receive_result(Ref, 2_000),
Res Res
end, end,
case EnableBatch of ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result),
true ->
?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
false ->
?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result)
end,
ok. ok.
t_missing_data(Config) -> t_missing_data(Config) ->
@ -607,27 +591,25 @@ t_missing_data(Config) ->
), ),
%% emqx_ee_connector_cassa will send missed data as a `null` atom %% emqx_ee_connector_cassa will send missed data as a `null` atom
%% to ecql driver %% to ecql driver
{_, {ok, Event}} =
?wait_async_action(
send_message(Config, #{}), send_message(Config, #{}),
#{?snk_kind := buffer_worker_flush_ack}, ?block_until(
#{
?snk_kind := buffer_worker_flush_ack,
result := {async_return, ok}
},
2_000 2_000
), ),
?assertMatch( ?block_until(
%% TODO: match error msgs
#{ #{
result := ?snk_kind := handle_async_reply_enter,
{error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}} result := {error, {8704, _}}
}, },
Event 2_000
), ),
ok. ok.
t_bad_sql_parameter(Config) -> t_bad_sql_parameter(Config) ->
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
EnableBatch = ?config(enable_batch, Config),
Name = ?config(cassa_name, Config),
ResourceId = emqx_bridge_resource:resource_id(cassandra, Name),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
create_bridge( create_bridge(
@ -656,14 +638,7 @@ t_bad_sql_parameter(Config) ->
ct:fail("no response received") ct:fail("no response received")
end end
end, end,
case EnableBatch of ?assertMatch({error, _}, Result),
true ->
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->
?assertMatch(
{error, {unrecoverable_error, _}}, Result
)
end,
ok. ok.
t_nasty_sql_string(Config) -> t_nasty_sql_string(Config) ->

View File

@ -33,8 +33,9 @@
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
%% TODO: not_supported_now on_query_async/4,
%%on_batch_query/3, on_batch_query/3,
on_batch_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -45,7 +46,7 @@
]). ]).
%% callbacks for query executing %% callbacks for query executing
-export([query/3, prepared_query/3]). -export([query/4, prepared_query/4, batch_query/3]).
-export([do_get_status/1]). -export([do_get_status/1]).
@ -96,7 +97,7 @@ keyspace(_) -> undefined.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks for emqx_resource %% callbacks for emqx_resource
callback_mode() -> always_sync. callback_mode() -> async_if_possible.
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
on_start( on_start(
@ -172,6 +173,28 @@ on_stop(InstId, #{poolname := PoolName}) ->
on_query( on_query(
InstId, InstId,
Request, Request,
State
) ->
do_signle_query(InstId, Request, sync, State).
-spec on_query_async(
emqx_resource:resource_id(),
request(),
{function(), list()},
state()
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
on_query_async(
InstId,
Request,
Callback,
State
) ->
do_signle_query(InstId, Request, {async, Callback}, State).
do_signle_query(
InstId,
Request,
Async,
#{poolname := PoolName} = State #{poolname := PoolName} = State
) -> ) ->
{Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request), {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
@ -187,7 +210,59 @@ on_query(
} }
), ),
{PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State), {PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State),
Res = exec_cql_query(InstId, PoolName, Type, PreparedKeyOrSQL1, Data), Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrSQL1, Data),
handle_result(Res).
-spec on_batch_query(
emqx_resource:resource_id(),
[request()],
state()
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
on_batch_query(
InstId,
Requests,
State
) ->
do_batch_query(InstId, Requests, sync, State).
-spec on_batch_query_async(
emqx_resource:resource_id(),
[request()],
{function(), list()},
state()
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
on_batch_query_async(
InstId,
Requests,
Callback,
State
) ->
do_batch_query(InstId, Requests, {async, Callback}, State).
do_batch_query(
InstId,
Requests,
Async,
#{poolname := PoolName} = State
) ->
CQLs =
lists:map(
fun(Request) ->
{Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
proc_cql_params(Type, PreparedKeyOrSQL, Params, State)
end,
Requests
),
?tp(
debug,
cassandra_connector_received_cql_batch_query,
#{
connector => InstId,
cqls => CQLs,
state => State
}
),
Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs),
handle_result(Res). handle_result(Res).
parse_request_to_cql({send_message, Params}) -> parse_request_to_cql({send_message, Params}) ->
@ -203,17 +278,32 @@ proc_cql_params(
Params, Params,
#{prepare_statement := Prepares, params_tokens := ParamsTokens} #{prepare_statement := Prepares, params_tokens := ParamsTokens}
) -> ) ->
PreparedKey = maps:get(PreparedKey0, Prepares), %% assert
_PreparedKey = maps:get(PreparedKey0, Prepares),
Tokens = maps:get(PreparedKey0, ParamsTokens), Tokens = maps:get(PreparedKey0, ParamsTokens),
{PreparedKey, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}; {PreparedKey0, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))};
proc_cql_params(query, SQL, Params, _State) -> proc_cql_params(query, SQL, Params, _State) ->
{SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'), {SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'),
{SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}. {SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}.
exec_cql_query(InstId, PoolName, Type, PreparedKey, Data) when exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
Type == query; Type == prepared_query Type == query; Type == prepared_query
-> ->
case ecpool:pick_and_do(PoolName, {?MODULE, Type, [PreparedKey, Data]}, no_handover) of case ecpool:pick_and_do(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}, no_handover) of
{error, Reason} = Result ->
?tp(
error,
cassandra_connector_query_return,
#{connector => InstId, error => Reason}
),
Result;
Result ->
?tp(debug, cassandra_connector_query_return, #{result => Result}),
Result
end.
exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
case ecpool:pick_and_do(PoolName, {?MODULE, batch_query, [Async, CQLs]}, no_handover) of
{error, Reason} = Result -> {error, Reason} = Result ->
?tp( ?tp(
error, error,
@ -261,11 +351,20 @@ do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepare
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks query %% callbacks query
query(Conn, SQL, Params) -> query(Conn, sync, CQL, Params) ->
ecql:query(Conn, SQL, Params). ecql:query(Conn, CQL, Params);
query(Conn, {async, Callback}, CQL, Params) ->
ecql:async_query(Conn, CQL, Params, one, Callback).
prepared_query(Conn, PreparedKey, Params) -> prepared_query(Conn, sync, PreparedKey, Params) ->
ecql:execute(Conn, PreparedKey, Params). ecql:execute(Conn, PreparedKey, Params);
prepared_query(Conn, {async, Callback}, PreparedKey, Params) ->
ecql:async_execute(Conn, PreparedKey, Params, Callback).
batch_query(Conn, sync, Rows) ->
ecql:batch(Conn, Rows);
batch_query(Conn, {async, Callback}, Rows) ->
ecql:async_batch(Conn, Rows, Callback).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks for ecpool %% callbacks for ecpool