From 82550a585a4a0ce7a3e72733d67d683c03f00645 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 10 Aug 2022 00:30:42 +0800 Subject: [PATCH] fix: add test cases for query async --- apps/emqx_resource/include/emqx_resource.hrl | 6 + apps/emqx_resource/src/emqx_resource.erl | 13 +- .../src/emqx_resource_worker.erl | 48 +++++-- .../test/emqx_connector_demo.erl | 17 ++- .../test/emqx_resource_SUITE.erl | 122 +++++++++++++++--- 5 files changed, 171 insertions(+), 35 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index a59877a30..75cba14ad 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -23,6 +23,12 @@ -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. -type callback_mode() :: always_sync | async_if_possible. +-type result() :: term(). +-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. +-type query_opts() :: #{ + %% The key used for picking a resource worker + pick_key => term() +}. -type resource_data() :: #{ id := resource_id(), mod := module(), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index d17f4ce19..f3f2d5fb9 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -75,7 +75,10 @@ %% stop the instance stop/1, %% query the instance - query/2 + query/2, + %% query the instance without batching and queuing messages. + simple_sync_query/2, + simple_async_query/3 ]). %% Direct calls to the callback module @@ -232,6 +235,14 @@ query(ResId, Request) -> query(ResId, Request, Opts) -> emqx_resource_worker:query(ResId, Request, Opts). +-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). +simple_sync_query(ResId, Request) -> + emqx_resource_worker:simple_sync_query(ResId, Request). + +-spec simple_async_query(resource_id(), Request :: term(), reply_fun()) -> Result :: term(). +simple_async_query(ResId, Request, ReplyFun) -> + emqx_resource_worker:simple_async_query(ResId, Request, ReplyFun). + -spec start(resource_id()) -> ok | {error, Reason :: term()}. start(ResId) -> start(ResId, #{}). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index e20345c2b..d19353a29 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -34,6 +34,11 @@ resume/1 ]). +-export([ + simple_sync_query/2, + simple_async_query/3 +]). + -export([ callback_mode/0, init/1, @@ -68,13 +73,7 @@ -type id() :: binary(). -type query() :: {query, from(), request()}. -type request() :: term(). --type result() :: term(). --type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. -type from() :: pid() | reply_fun(). --type query_opts() :: #{ - %% The key used for picking a resource worker - pick_key => term() -}. -export_type([query_opts/0]). @@ -92,6 +91,19 @@ query(Id, Request, Opts) -> Timeout = maps:get(timeout, Opts, infinity), pick_call(Id, PickKey, {query, Request}, Timeout). +%% simple query the resource without batching and queuing messages. +-spec simple_sync_query(id(), request()) -> Result :: term(). +simple_sync_query(Id, Request) -> + Result = call_query(sync, Id, ?QUERY(self(), Request), 1), + _ = handle_query_result(Id, Result, false), + Result. + +-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). +simple_async_query(Id, Request, ReplyFun) -> + Result = call_query(async, Id, ?QUERY(ReplyFun, Request), 1), + _ = handle_query_result(Id, Result, false), + Result. + -spec block(pid() | atom()) -> ok. block(ServerRef) -> gen_statem:cast(ServerRef, block). @@ -188,7 +200,7 @@ estimate_size(QItem) -> maybe_quick_return(sync, From, _ReplyFun) -> From; maybe_quick_return(async, From, ReplyFun) -> - ok = gen_statem:reply(From), + gen_statem:reply(From, ok), ReplyFun. pick_call(Id, Key, Query, Timeout) -> @@ -295,7 +307,11 @@ reply_caller(Id, Reply) -> reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) -> handle_query_result(Id, Result, BlockWorker); reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> - ?SAFE_CALL(ReplyFun(Result, Args)), + _ = + case Result of + {async_return, _} -> ok; + _ -> apply(ReplyFun, Args ++ [Result]) + end, handle_query_result(Id, Result, BlockWorker); reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> gen_statem:reply(From, Result), @@ -316,6 +332,10 @@ handle_query_result(Id, {error, _}, BlockWorker) -> handle_query_result(Id, {resource_down, _}, _BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down), true; +handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) -> + true; +handle_query_result(_Id, {async_return, ok}, BlockWorker) -> + BlockWorker; handle_query_result(Id, Result, BlockWorker) -> %% assert true = is_ok_result(Result), @@ -352,16 +372,16 @@ call_query(QM, Id, Query, QueryLen) -> end ). -apply_query_fun(sync, Mod, Id, ?QUERY(_From, Request) = _Query, ResSt) -> +apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) -> +apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ReplyFun = fun ?MODULE:reply_after_query/4, ?APPLY_RESOURCE( begin - _ = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt), - ok_async + Result = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt), + {async_return, Result} end, Request ); @@ -375,8 +395,8 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> ReplyFun = fun ?MODULE:batch_reply_after_query/4, ?APPLY_RESOURCE( begin - _ = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt), - ok_async + Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt), + {async_return, Result} end, Batch ). diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 40734de68..3bea71993 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -26,11 +26,12 @@ on_start/2, on_stop/2, on_query/3, + on_query_async/4, on_batch_query/3, on_get_status/2 ]). --export([counter_loop/1]). +-export([counter_loop/1, set_callback_mode/1]). %% callbacks for emqx_resource config schema -export([roots/0]). @@ -50,7 +51,12 @@ register(required) -> true; register(default) -> false; register(_) -> undefined. -callback_mode() -> always_sync. +-define(CM_KEY, {?MODULE, callback_mode}). +callback_mode() -> + persistent_term:get(?CM_KEY). + +set_callback_mode(Mode) -> + persistent_term:put(?CM_KEY, Mode). on_start(_InstId, #{create_error := true}) -> error("some error"); @@ -91,6 +97,10 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> {error, timeout} end. +on_query_async(_InstId, Query, ReplyFun, State) -> + Result = on_query(_InstId, Query, State), + apply_reply(ReplyFun, Result). + on_batch_query(InstId, BatchReq, State) -> %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed. case hd(BatchReq) of @@ -147,3 +157,6 @@ maybe_register(Name, Pid, true) -> erlang:register(Name, Pid); maybe_register(_Name, _Pid, false) -> true. + +apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) -> + apply(ReplyFun, Args ++ [Result]). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index fb2bdfd7c..5177e792c 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -28,6 +28,7 @@ -define(ID, <<"id">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). +-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). all() -> emqx_common_test_helpers:all(?MODULE). @@ -36,6 +37,7 @@ groups() -> []. init_per_testcase(_, Config) -> + emqx_connector_demo:set_callback_mode(always_sync), Config. end_per_testcase(_, _Config) -> _ = emqx_resource:remove(?ID). @@ -213,7 +215,7 @@ t_batch_query_counter(_) -> ), ?check_trace( - #{timetrap => 10000, timeout => 1000}, + ?TRACE_OPTS, emqx_resource:query(?ID, get_counter), fun(Result, Trace) -> ?assertMatch({ok, 0}, Result), @@ -223,7 +225,7 @@ t_batch_query_counter(_) -> ), ?check_trace( - #{timetrap => 10000, timeout => 1000}, + ?TRACE_OPTS, inc_counter_in_parallel(1000), fun(Trace) -> QueryTrace = ?of_kind(call_batch_query, Trace), @@ -234,23 +236,90 @@ t_batch_query_counter(_) -> ok = emqx_resource:remove_local(?ID). -inc_counter_in_parallel(N) -> - Parent = self(), - Pids = [ - erlang:spawn(fun() -> - ok = emqx_resource:query(?ID, {inc_counter, 1}), - Parent ! {complete, self()} - end) - || _ <- lists:seq(1, N) - ], - [ - receive - {complete, Pid} -> ok - after 1000 -> - ct:fail({wait_for_query_timeout, Pid}) +t_query_counter_async(_) -> + {ok, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, register => true}, + #{query_mode => async} + ), + ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), + ?check_trace( + ?TRACE_OPTS, + inc_counter_in_parallel(1000), + fun(Trace) -> + %% the callback_mode if 'emqx_connector_demo' is 'always_sync'. + QueryTrace = ?of_kind(call_query, Trace), + ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) end - || Pid <- Pids - ]. + ), + %% wait for 1s to make sure all the aysnc query is sent to the resource. + timer:sleep(1000), + %% simple query ignores the query_mode and batching settings in the resource_worker + ?check_trace( + ?TRACE_OPTS, + emqx_resource:simple_sync_query(?ID, get_counter), + fun(Result, Trace) -> + ?assertMatch({ok, 1000}, Result), + %% the callback_mode if 'emqx_connector_demo' is 'always_sync'. + QueryTrace = ?of_kind(call_query, Trace), + ?assertMatch([#{query := {query, _, get_counter}}], QueryTrace) + end + ), + {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), + ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C), + ok = emqx_resource:remove_local(?ID). + +t_query_counter_async_2(_) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + + Tab0 = ets:new(?FUNCTION_NAME, [bag, public]), + Insert = fun(Tab, Result) -> + ets:insert(Tab, {make_ref(), Result}) + end, + {ok, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, register => true}, + #{query_mode => async, async_reply_fun => {Insert, [Tab0]}} + ), + ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), + ?check_trace( + ?TRACE_OPTS, + inc_counter_in_parallel(1000), + fun(Trace) -> + QueryTrace = ?of_kind(call_query_async, Trace), + ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + end + ), + + %% wait for 1s to make sure all the aysnc query is sent to the resource. + timer:sleep(1000), + %% simple query ignores the query_mode and batching settings in the resource_worker + ?check_trace( + ?TRACE_OPTS, + emqx_resource:simple_sync_query(?ID, get_counter), + fun(Result, Trace) -> + ?assertMatch({ok, 1000}, Result), + QueryTrace = ?of_kind(call_query, Trace), + ?assertMatch([#{query := {query, _, get_counter}}], QueryTrace) + end + ), + {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), + ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C), + ?assertMatch(1000, ets:info(Tab0, size)), + ?assert( + lists:all( + fun + ({_, ok}) -> true; + (_) -> false + end, + ets:tab2list(Tab0) + ) + ), + ok = emqx_resource:remove_local(?ID). t_healthy_timeout(_) -> {ok, _} = emqx_resource:create_local( @@ -480,6 +549,23 @@ t_auto_retry(_) -> %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ +inc_counter_in_parallel(N) -> + Parent = self(), + Pids = [ + erlang:spawn(fun() -> + emqx_resource:query(?ID, {inc_counter, 1}), + Parent ! {complete, self()} + end) + || _ <- lists:seq(1, N) + ], + [ + receive + {complete, Pid} -> ok + after 1000 -> + ct:fail({wait_for_query_timeout, Pid}) + end + || Pid <- Pids + ]. bin_config() -> <<"\"name\": \"test_resource\"">>.