feat: add test cases for batching query

This commit is contained in:
Shawn 2022-08-01 13:06:28 +08:00
parent 8f0954837b
commit a2afdeeb48
3 changed files with 113 additions and 21 deletions

View File

@ -14,5 +14,5 @@ the config operations (like config validation, config dump back to files), and t
And we put all the `specific` codes to the callback modules. And we put all the `specific` codes to the callback modules.
See See
* `test/emqx_test_resource.erl` for a minimal `emqx_resource` implementation; * `test/emqx_connector_demo.erl` for a minimal `emqx_resource` implementation;
* `test/emqx_resource_SUITE.erl` for examples of `emqx_resource` usage. * `test/emqx_resource_SUITE.erl` for examples of `emqx_resource` usage.

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_test_resource). -module(emqx_connector_demo).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
@ -25,9 +25,12 @@
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3,
on_get_status/2 on_get_status/2
]). ]).
-export([counter_loop/1]).
%% callbacks for emqx_resource config schema %% callbacks for emqx_resource config schema
-export([roots/0]). -export([roots/0]).
@ -53,19 +56,19 @@ on_start(InstId, #{name := Name, stop_error := true} = Opts) ->
{ok, Opts#{ {ok, Opts#{
id => InstId, id => InstId,
stop_error => true, stop_error => true,
pid => spawn_dummy_process(Name, Register) pid => spawn_counter_process(Name, Register)
}}; }};
on_start(InstId, #{name := Name} = Opts) -> on_start(InstId, #{name := Name} = Opts) ->
Register = maps:get(register, Opts, false), Register = maps:get(register, Opts, false),
{ok, Opts#{ {ok, Opts#{
id => InstId, id => InstId,
pid => spawn_dummy_process(Name, Register) pid => spawn_counter_process(Name, Register)
}}; }};
on_start(InstId, #{name := Name} = Opts) -> on_start(InstId, #{name := Name} = Opts) ->
Register = maps:get(register, Opts, false), Register = maps:get(register, Opts, false),
{ok, Opts#{ {ok, Opts#{
id => InstId, id => InstId,
pid => spawn_dummy_process(Name, Register) pid => spawn_counter_process(Name, Register)
}}. }}.
on_stop(_InstId, #{stop_error := true}) -> on_stop(_InstId, #{stop_error := true}) ->
@ -77,7 +80,44 @@ on_stop(_InstId, #{pid := Pid}) ->
on_query(_InstId, get_state, State) -> on_query(_InstId, get_state, State) ->
{ok, State}; {ok, State};
on_query(_InstId, get_state_failed, State) -> on_query(_InstId, get_state_failed, State) ->
{error, State}. {error, State};
on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
Pid ! {inc, N},
ok;
on_query(_InstId, get_counter, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, get},
receive
{ReqRef, Num} -> {ok, Num}
after 1000 ->
{error, timeout}
end.
on_batch_query(InstId, BatchReq, State) ->
%% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed.
case hd(BatchReq) of
{_From, {inc_counter, _}} ->
batch_inc_counter(InstId, BatchReq, State);
{_From, get_counter} ->
batch_get_counter(InstId, State)
end.
batch_inc_counter(InstId, BatchReq, State) ->
TotalN = lists:foldl(
fun
({_From, {inc_counter, N}}, Total) ->
Total + N;
({_From, Req}, _Total) ->
error({mixed_requests_not_allowed, {inc_counter, Req}})
end,
0,
BatchReq
),
on_query(InstId, {inc_counter, TotalN}, State).
batch_get_counter(InstId, State) ->
on_query(InstId, get_counter, State).
on_get_status(_InstId, #{health_check_error := true}) -> on_get_status(_InstId, #{health_check_error := true}) ->
disconnected; disconnected;
@ -88,18 +128,25 @@ on_get_status(_InstId, #{pid := Pid}) ->
false -> disconnected false -> disconnected
end. end.
spawn_dummy_process(Name, Register) -> spawn_counter_process(Name, Register) ->
ct:pal("---- Register Name: ~p", [Name]), Pid = spawn_link(?MODULE, counter_loop, [#{counter => 0}]),
spawn( true = maybe_register(Name, Pid, Register),
fun() -> Pid.
true =
case Register of counter_loop(#{counter := Num} = State) ->
true -> register(Name, self()); NewState =
_ -> true
end,
Ref = make_ref(),
receive receive
Ref -> ok {inc, N} ->
end #{counter => Num + N};
end {{FromPid, ReqRef}, get} ->
). FromPid ! {ReqRef, Num},
State
end,
counter_loop(NewState).
maybe_register(Name, Pid, true) ->
ct:pal("---- Register Name: ~p", [Name]),
ct:pal("---- whereis(): ~p", [whereis(Name)]),
erlang:register(Name, Pid);
maybe_register(_Name, _Pid, false) ->
true.

View File

@ -23,7 +23,7 @@
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-define(TEST_RESOURCE, emqx_test_resource). -define(TEST_RESOURCE, emqx_connector_demo).
-define(ID, <<"id">>). -define(ID, <<"id">>).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
@ -184,6 +184,51 @@ t_query(_) ->
ok = emqx_resource:remove_local(?ID). ok = emqx_resource:remove_local(?ID).
t_query_counter(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true}
),
{ok, 0} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:query(?ID, {inc_counter, 1}),
{ok, 1} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:query(?ID, {inc_counter, 5}),
{ok, 6} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:remove_local(?ID).
t_batch_query_counter(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true, batch_enabled => true}
),
{ok, 0} = emqx_resource:query(?ID, get_counter),
Parent = self(),
Pids = [
erlang:spawn(fun() ->
ok = emqx_resource:query(?ID, {inc_counter, 1}),
Parent ! {complete, self()}
end)
|| _ <- lists:seq(1, 1000)
],
[
receive
{complete, Pid} -> ok
after 1000 ->
ct:fail({wait_for_query_timeout, Pid})
end
|| Pid <- Pids
],
{ok, 1000} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:remove_local(?ID).
t_healthy_timeout(_) -> t_healthy_timeout(_) ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
?ID, ?ID,