From a2afdeeb48c3f7ee5c1313017fde4b6caf85281d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 1 Aug 2022 13:06:28 +0800 Subject: [PATCH] feat: add test cases for batching query --- apps/emqx_resource/README.md | 2 +- ...t_resource.erl => emqx_connector_demo.erl} | 85 ++++++++++++++----- .../test/emqx_resource_SUITE.erl | 47 +++++++++- 3 files changed, 113 insertions(+), 21 deletions(-) rename apps/emqx_resource/test/{emqx_test_resource.erl => emqx_connector_demo.erl} (54%) diff --git a/apps/emqx_resource/README.md b/apps/emqx_resource/README.md index 04f3c2205..0f61df7ff 100644 --- a/apps/emqx_resource/README.md +++ b/apps/emqx_resource/README.md @@ -14,5 +14,5 @@ the config operations (like config validation, config dump back to files), and t And we put all the `specific` codes to the callback modules. See -* `test/emqx_test_resource.erl` for a minimal `emqx_resource` implementation; +* `test/emqx_connector_demo.erl` for a minimal `emqx_resource` implementation; * `test/emqx_resource_SUITE.erl` for examples of `emqx_resource` usage. diff --git a/apps/emqx_resource/test/emqx_test_resource.erl b/apps/emqx_resource/test/emqx_connector_demo.erl similarity index 54% rename from apps/emqx_resource/test/emqx_test_resource.erl rename to apps/emqx_resource/test/emqx_connector_demo.erl index 569579d27..e9c77e915 100644 --- a/apps/emqx_resource/test/emqx_test_resource.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_test_resource). +-module(emqx_connector_demo). -include_lib("typerefl/include/types.hrl"). @@ -25,9 +25,12 @@ on_start/2, on_stop/2, on_query/3, + on_batch_query/3, on_get_status/2 ]). +-export([counter_loop/1]). + %% callbacks for emqx_resource config schema -export([roots/0]). @@ -53,19 +56,19 @@ on_start(InstId, #{name := Name, stop_error := true} = Opts) -> {ok, Opts#{ id => InstId, stop_error => true, - pid => spawn_dummy_process(Name, Register) + pid => spawn_counter_process(Name, Register) }}; on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), {ok, Opts#{ id => InstId, - pid => spawn_dummy_process(Name, Register) + pid => spawn_counter_process(Name, Register) }}; on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), {ok, Opts#{ id => InstId, - pid => spawn_dummy_process(Name, Register) + pid => spawn_counter_process(Name, Register) }}. on_stop(_InstId, #{stop_error := true}) -> @@ -77,7 +80,44 @@ on_stop(_InstId, #{pid := Pid}) -> on_query(_InstId, get_state, State) -> {ok, State}; on_query(_InstId, get_state_failed, State) -> - {error, State}. + {error, State}; +on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> + Pid ! {inc, N}, + ok; +on_query(_InstId, get_counter, #{pid := Pid}) -> + ReqRef = make_ref(), + From = {self(), ReqRef}, + Pid ! {From, get}, + receive + {ReqRef, Num} -> {ok, Num} + after 1000 -> + {error, timeout} + end. + +on_batch_query(InstId, BatchReq, State) -> + %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed. + case hd(BatchReq) of + {_From, {inc_counter, _}} -> + batch_inc_counter(InstId, BatchReq, State); + {_From, get_counter} -> + batch_get_counter(InstId, State) + end. + +batch_inc_counter(InstId, BatchReq, State) -> + TotalN = lists:foldl( + fun + ({_From, {inc_counter, N}}, Total) -> + Total + N; + ({_From, Req}, _Total) -> + error({mixed_requests_not_allowed, {inc_counter, Req}}) + end, + 0, + BatchReq + ), + on_query(InstId, {inc_counter, TotalN}, State). + +batch_get_counter(InstId, State) -> + on_query(InstId, get_counter, State). on_get_status(_InstId, #{health_check_error := true}) -> disconnected; @@ -88,18 +128,25 @@ on_get_status(_InstId, #{pid := Pid}) -> false -> disconnected end. -spawn_dummy_process(Name, Register) -> +spawn_counter_process(Name, Register) -> + Pid = spawn_link(?MODULE, counter_loop, [#{counter => 0}]), + true = maybe_register(Name, Pid, Register), + Pid. + +counter_loop(#{counter := Num} = State) -> + NewState = + receive + {inc, N} -> + #{counter => Num + N}; + {{FromPid, ReqRef}, get} -> + FromPid ! {ReqRef, Num}, + State + end, + counter_loop(NewState). + +maybe_register(Name, Pid, true) -> ct:pal("---- Register Name: ~p", [Name]), - spawn( - fun() -> - true = - case Register of - true -> register(Name, self()); - _ -> true - end, - Ref = make_ref(), - receive - Ref -> ok - end - end - ). + ct:pal("---- whereis(): ~p", [whereis(Name)]), + erlang:register(Name, Pid); +maybe_register(_Name, _Pid, false) -> + true. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 278f556ef..d05d4baf7 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -23,7 +23,7 @@ -include("emqx_resource.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). --define(TEST_RESOURCE, emqx_test_resource). +-define(TEST_RESOURCE, emqx_connector_demo). -define(ID, <<"id">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). @@ -184,6 +184,51 @@ t_query(_) -> ok = emqx_resource:remove_local(?ID). +t_query_counter(_) -> + {ok, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, register => true} + ), + + {ok, 0} = emqx_resource:query(?ID, get_counter), + ok = emqx_resource:query(?ID, {inc_counter, 1}), + {ok, 1} = emqx_resource:query(?ID, get_counter), + ok = emqx_resource:query(?ID, {inc_counter, 5}), + {ok, 6} = emqx_resource:query(?ID, get_counter), + + ok = emqx_resource:remove_local(?ID). + +t_batch_query_counter(_) -> + {ok, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, register => true, batch_enabled => true} + ), + + {ok, 0} = emqx_resource:query(?ID, get_counter), + Parent = self(), + Pids = [ + erlang:spawn(fun() -> + ok = emqx_resource:query(?ID, {inc_counter, 1}), + Parent ! {complete, self()} + end) + || _ <- lists:seq(1, 1000) + ], + [ + receive + {complete, Pid} -> ok + after 1000 -> + ct:fail({wait_for_query_timeout, Pid}) + end + || Pid <- Pids + ], + {ok, 1000} = emqx_resource:query(?ID, get_counter), + + ok = emqx_resource:remove_local(?ID). + t_healthy_timeout(_) -> {ok, _} = emqx_resource:create_local( ?ID,