From efd6c56dd9554a05b8b0772856008cdae40ab6ad Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 9 Aug 2022 13:00:21 +0800 Subject: [PATCH] fix: test cases for batch query sync --- .../src/emqx_resource_worker.erl | 7 +++- .../test/emqx_connector_demo.erl | 8 ++--- .../test/emqx_resource_SUITE.erl | 35 +++++++++++++++---- 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 2115fed86..e20345c2b 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -22,6 +22,7 @@ -include("emqx_resource.hrl"). -include("emqx_resource_utils.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(gen_statem). @@ -351,9 +352,11 @@ call_query(QM, Id, Query, QueryLen) -> end ). -apply_query_fun(sync, Mod, Id, ?QUERY(_From, Request), ResSt) -> +apply_query_fun(sync, Mod, Id, ?QUERY(_From, Request) = _Query, ResSt) -> + ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) -> + ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ReplyFun = fun ?MODULE:reply_after_query/4, ?APPLY_RESOURCE( begin @@ -363,9 +366,11 @@ apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) -> Request ); apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> + ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> + ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], ReplyFun = fun ?MODULE:batch_reply_after_query/4, ?APPLY_RESOURCE( diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 740f110ec..40734de68 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -94,18 +94,18 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> on_batch_query(InstId, BatchReq, State) -> %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed. case hd(BatchReq) of - {_From, {inc_counter, _}} -> + {inc_counter, _} -> batch_inc_counter(InstId, BatchReq, State); - {_From, get_counter} -> + get_counter -> batch_get_counter(InstId, State) end. batch_inc_counter(InstId, BatchReq, State) -> TotalN = lists:foldl( fun - ({_From, {inc_counter, N}}, Total) -> + ({inc_counter, N}, Total) -> Total + N; - ({_From, Req}, _Total) -> + (Req, _Total) -> error({mixed_requests_not_allowed, {inc_counter, Req}}) end, 0, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 4c5a36f4b..fb2bdfd7c 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("common_test/include/ct.hrl"). -include("emqx_resource.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(TEST_RESOURCE, emqx_connector_demo). -define(ID, <<"id">>). @@ -207,17 +208,40 @@ t_batch_query_counter(_) -> ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, - #{name => test_resource, register => true, batch_enabled => true} + #{name => test_resource, register => true}, + #{batch_enabled => true} ), - {ok, 0} = emqx_resource:query(?ID, get_counter), + ?check_trace( + #{timetrap => 10000, timeout => 1000}, + emqx_resource:query(?ID, get_counter), + fun(Result, Trace) -> + ?assertMatch({ok, 0}, Result), + QueryTrace = ?of_kind(call_batch_query, Trace), + ?assertMatch([#{batch := [{query, _, get_counter}]}], QueryTrace) + end + ), + + ?check_trace( + #{timetrap => 10000, timeout => 1000}, + inc_counter_in_parallel(1000), + fun(Trace) -> + QueryTrace = ?of_kind(call_batch_query, Trace), + ?assertMatch([#{batch := BatchReq} | _] when length(BatchReq) > 1, QueryTrace) + end + ), + {ok, 1000} = emqx_resource:query(?ID, get_counter), + + ok = emqx_resource:remove_local(?ID). + +inc_counter_in_parallel(N) -> Parent = self(), Pids = [ erlang:spawn(fun() -> ok = emqx_resource:query(?ID, {inc_counter, 1}), Parent ! {complete, self()} end) - || _ <- lists:seq(1, 1000) + || _ <- lists:seq(1, N) ], [ receive @@ -226,10 +250,7 @@ t_batch_query_counter(_) -> ct:fail({wait_for_query_timeout, Pid}) end || Pid <- Pids - ], - {ok, 1000} = emqx_resource:query(?ID, get_counter), - - ok = emqx_resource:remove_local(?ID). + ]. t_healthy_timeout(_) -> {ok, _} = emqx_resource:create_local(