fix: test cases for batch query sync

This commit is contained in:
Shawn 2022-08-09 13:00:21 +08:00
parent 145ff66a9a
commit efd6c56dd9
3 changed files with 38 additions and 12 deletions

View File

@ -22,6 +22,7 @@
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include("emqx_resource_utils.hrl"). -include("emqx_resource_utils.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(gen_statem). -behaviour(gen_statem).
@ -351,9 +352,11 @@ call_query(QM, Id, Query, QueryLen) ->
end end
). ).
apply_query_fun(sync, Mod, Id, ?QUERY(_From, Request), ResSt) -> apply_query_fun(sync, Mod, Id, ?QUERY(_From, Request) = _Query, ResSt) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) -> apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
ReplyFun = fun ?MODULE:reply_after_query/4, ReplyFun = fun ?MODULE:reply_after_query/4,
?APPLY_RESOURCE( ?APPLY_RESOURCE(
begin begin
@ -363,9 +366,11 @@ apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) ->
Request Request
); );
apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Requests = [Request || ?QUERY(_From, Request) <- Batch], Requests = [Request || ?QUERY(_From, Request) <- Batch],
?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Requests = [Request || ?QUERY(_From, Request) <- Batch], Requests = [Request || ?QUERY(_From, Request) <- Batch],
ReplyFun = fun ?MODULE:batch_reply_after_query/4, ReplyFun = fun ?MODULE:batch_reply_after_query/4,
?APPLY_RESOURCE( ?APPLY_RESOURCE(

View File

@ -94,18 +94,18 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
on_batch_query(InstId, BatchReq, State) -> on_batch_query(InstId, BatchReq, State) ->
%% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed. %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed.
case hd(BatchReq) of case hd(BatchReq) of
{_From, {inc_counter, _}} -> {inc_counter, _} ->
batch_inc_counter(InstId, BatchReq, State); batch_inc_counter(InstId, BatchReq, State);
{_From, get_counter} -> get_counter ->
batch_get_counter(InstId, State) batch_get_counter(InstId, State)
end. end.
batch_inc_counter(InstId, BatchReq, State) -> batch_inc_counter(InstId, BatchReq, State) ->
TotalN = lists:foldl( TotalN = lists:foldl(
fun fun
({_From, {inc_counter, N}}, Total) -> ({inc_counter, N}, Total) ->
Total + N; Total + N;
({_From, Req}, _Total) -> (Req, _Total) ->
error({mixed_requests_not_allowed, {inc_counter, Req}}) error({mixed_requests_not_allowed, {inc_counter, Req}})
end, end,
0, 0,

View File

@ -22,6 +22,7 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(TEST_RESOURCE, emqx_connector_demo). -define(TEST_RESOURCE, emqx_connector_demo).
-define(ID, <<"id">>). -define(ID, <<"id">>).
@ -207,17 +208,40 @@ t_batch_query_counter(_) ->
?ID, ?ID,
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, register => true, batch_enabled => true} #{name => test_resource, register => true},
#{batch_enabled => true}
), ),
{ok, 0} = emqx_resource:query(?ID, get_counter), ?check_trace(
#{timetrap => 10000, timeout => 1000},
emqx_resource:query(?ID, get_counter),
fun(Result, Trace) ->
?assertMatch({ok, 0}, Result),
QueryTrace = ?of_kind(call_batch_query, Trace),
?assertMatch([#{batch := [{query, _, get_counter}]}], QueryTrace)
end
),
?check_trace(
#{timetrap => 10000, timeout => 1000},
inc_counter_in_parallel(1000),
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query, Trace),
?assertMatch([#{batch := BatchReq} | _] when length(BatchReq) > 1, QueryTrace)
end
),
{ok, 1000} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:remove_local(?ID).
inc_counter_in_parallel(N) ->
Parent = self(), Parent = self(),
Pids = [ Pids = [
erlang:spawn(fun() -> erlang:spawn(fun() ->
ok = emqx_resource:query(?ID, {inc_counter, 1}), ok = emqx_resource:query(?ID, {inc_counter, 1}),
Parent ! {complete, self()} Parent ! {complete, self()}
end) end)
|| _ <- lists:seq(1, 1000) || _ <- lists:seq(1, N)
], ],
[ [
receive receive
@ -226,10 +250,7 @@ t_batch_query_counter(_) ->
ct:fail({wait_for_query_timeout, Pid}) ct:fail({wait_for_query_timeout, Pid})
end end
|| Pid <- Pids || Pid <- Pids
], ].
{ok, 1000} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:remove_local(?ID).
t_healthy_timeout(_) -> t_healthy_timeout(_) ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(