Merge pull request #6780 from EMQ-YangM/moreTest
test(emqx_resource): improve emqx_resource test coverage from 73% to 80%
This commit is contained in:
commit
6affac0fe9
|
@ -1,23 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-module(emqx_resource_api).
|
|
||||||
|
|
||||||
-export([stringify/1]).
|
|
||||||
|
|
||||||
stringify(Bin) when is_binary(Bin) -> Bin;
|
|
||||||
stringify(Str) when is_list(Str) -> list_to_binary(Str);
|
|
||||||
stringify(Reason) ->
|
|
||||||
iolist_to_binary(io_lib:format("~p", [Reason])).
|
|
|
@ -15,38 +15,38 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_resource_health_check).
|
-module(emqx_resource_health_check).
|
||||||
|
|
||||||
-export([ start_link/2
|
-export([ start_link/3
|
||||||
, create_checker/2
|
, create_checker/3
|
||||||
, delete_checker/1
|
, delete_checker/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ start_health_check/2
|
-export([ start_health_check/3
|
||||||
, health_check_timeout_checker/3
|
, health_check_timeout_checker/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(SUP, emqx_resource_health_check_sup).
|
-define(SUP, emqx_resource_health_check_sup).
|
||||||
-define(ID(NAME), {resource_health_check, NAME}).
|
-define(ID(NAME), {resource_health_check, NAME}).
|
||||||
|
|
||||||
child_spec(Name, Sleep) ->
|
child_spec(Name, Sleep, Timeout) ->
|
||||||
#{id => ?ID(Name),
|
#{id => ?ID(Name),
|
||||||
start => {?MODULE, start_link, [Name, Sleep]},
|
start => {?MODULE, start_link, [Name, Sleep, Timeout]},
|
||||||
restart => transient,
|
restart => transient,
|
||||||
shutdown => 5000, type => worker, modules => [?MODULE]}.
|
shutdown => 5000, type => worker, modules => [?MODULE]}.
|
||||||
|
|
||||||
start_link(Name, Sleep) ->
|
start_link(Name, Sleep, Timeout) ->
|
||||||
Pid = proc_lib:spawn_link(?MODULE, start_health_check, [Name, Sleep]),
|
Pid = proc_lib:spawn_link(?MODULE, start_health_check, [Name, Sleep, Timeout]),
|
||||||
{ok, Pid}.
|
{ok, Pid}.
|
||||||
|
|
||||||
create_checker(Name, Sleep) ->
|
create_checker(Name, Sleep, Timeout) ->
|
||||||
create_checker(Name, Sleep, false).
|
create_checker(Name, Sleep, false, Timeout).
|
||||||
|
|
||||||
create_checker(Name, Sleep, Retry) ->
|
create_checker(Name, Sleep, Retry, Timeout) ->
|
||||||
case supervisor:start_child(?SUP, child_spec(Name, Sleep)) of
|
case supervisor:start_child(?SUP, child_spec(Name, Sleep, Timeout)) of
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
{error, already_present} -> ok;
|
{error, already_present} -> ok;
|
||||||
{error, {already_started, _}} when Retry == false ->
|
{error, {already_started, _}} when Retry == false ->
|
||||||
ok = delete_checker(Name),
|
ok = delete_checker(Name),
|
||||||
create_checker(Name, Sleep, true);
|
create_checker(Name, Sleep, true, Timeout);
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -56,9 +56,9 @@ delete_checker(Name) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
start_health_check(Name, Sleep) ->
|
start_health_check(Name, Sleep, Timeout) ->
|
||||||
Pid = self(),
|
Pid = self(),
|
||||||
_ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep]),
|
_ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep, Timeout]),
|
||||||
health_check(Name).
|
health_check(Name).
|
||||||
|
|
||||||
health_check(Name) ->
|
health_check(Name) ->
|
||||||
|
@ -75,12 +75,12 @@ health_check(Name) ->
|
||||||
end,
|
end,
|
||||||
health_check(Name).
|
health_check(Name).
|
||||||
|
|
||||||
health_check_timeout_checker(Pid, Name, SleepTime) ->
|
health_check_timeout_checker(Pid, Name, SleepTime, Timeout) ->
|
||||||
SelfPid = self(),
|
SelfPid = self(),
|
||||||
Pid ! {SelfPid, begin_health_check},
|
Pid ! {SelfPid, begin_health_check},
|
||||||
receive
|
receive
|
||||||
health_check_finish -> timer:sleep(SleepTime)
|
health_check_finish -> timer:sleep(SleepTime)
|
||||||
after 10000 ->
|
after Timeout ->
|
||||||
emqx_alarm:activate(Name, #{name => Name},
|
emqx_alarm:activate(Name, #{name => Name},
|
||||||
<<Name/binary, " health check timout">>),
|
<<Name/binary, " health check timout">>),
|
||||||
emqx_resource:set_resource_status_stoped(Name),
|
emqx_resource:set_resource_status_stoped(Name),
|
||||||
|
@ -88,4 +88,4 @@ health_check_timeout_checker(Pid, Name, SleepTime) ->
|
||||||
health_check_finish -> timer:sleep(SleepTime)
|
health_check_finish -> timer:sleep(SleepTime)
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
health_check_timeout_checker(Pid, Name, SleepTime).
|
health_check_timeout_checker(Pid, Name, SleepTime, Timeout).
|
||||||
|
|
|
@ -249,7 +249,8 @@ start_and_check(InstId, ResourceType, Config, Opts, Data) ->
|
||||||
case maps:get(async_create, Opts, false) of
|
case maps:get(async_create, Opts, false) of
|
||||||
false -> do_health_check(Data2);
|
false -> do_health_check(Data2);
|
||||||
true -> emqx_resource_health_check:create_checker(InstId,
|
true -> emqx_resource_health_check:create_checker(InstId,
|
||||||
maps:get(health_check_interval, Opts, 15000))
|
maps:get(health_check_interval, Opts, 15000),
|
||||||
|
maps:get(health_check_timeout, Opts, 10000))
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
|
|
||||||
-export([ min/2
|
-export([ min/2
|
||||||
, max/2
|
, max/2
|
||||||
, equals/2
|
|
||||||
, enum/1
|
|
||||||
, not_empty/1
|
, not_empty/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -29,15 +27,6 @@ max(Type, Max) ->
|
||||||
min(Type, Min) ->
|
min(Type, Min) ->
|
||||||
limit(Type, '>=', Min).
|
limit(Type, '>=', Min).
|
||||||
|
|
||||||
equals(Type, Expected) ->
|
|
||||||
limit(Type, '==', Expected).
|
|
||||||
|
|
||||||
enum(Items) ->
|
|
||||||
fun(Value) ->
|
|
||||||
return(lists:member(Value, Items),
|
|
||||||
err_limit({enum, {is_member_of, Items}, {got, Value}}))
|
|
||||||
end.
|
|
||||||
|
|
||||||
not_empty(ErrMsg) ->
|
not_empty(ErrMsg) ->
|
||||||
fun(<<>>) -> {error, ErrMsg};
|
fun(<<>>) -> {error, ErrMsg};
|
||||||
(_) -> ok
|
(_) -> ok
|
||||||
|
|
|
@ -35,12 +35,12 @@ init_per_testcase(_, Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
code:ensure_loaded(?TEST_RESOURCE),
|
code:ensure_loaded(?TEST_RESOURCE),
|
||||||
ok = emqx_common_test_helpers:start_apps([]),
|
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||||
{ok, _} = application:ensure_all_started(emqx_resource),
|
{ok, _} = application:ensure_all_started(emqx_resource),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_resource]).
|
ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_conf]).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Tests
|
%% Tests
|
||||||
|
@ -62,15 +62,53 @@ t_create_remove(_) ->
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{unknown => test_resource}),
|
#{unknown => test_resource}),
|
||||||
|
|
||||||
|
{ok, _} = emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource}),
|
||||||
|
|
||||||
|
emqx_resource:recreate(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
#{}),
|
||||||
|
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||||
|
|
||||||
|
?assert(is_process_alive(Pid)),
|
||||||
|
|
||||||
|
ok = emqx_resource:remove(?ID),
|
||||||
|
{error, _} = emqx_resource:remove(?ID),
|
||||||
|
|
||||||
|
?assertNot(is_process_alive(Pid)).
|
||||||
|
|
||||||
|
t_create_remove_local(_) ->
|
||||||
|
{error, _} = emqx_resource:check_and_create_local(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{unknown => test_resource}),
|
||||||
|
|
||||||
{ok, _} = emqx_resource:create_local(
|
{ok, _} = emqx_resource:create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => test_resource}),
|
#{name => test_resource}),
|
||||||
|
|
||||||
|
emqx_resource:recreate_local(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
#{}),
|
||||||
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||||
|
|
||||||
?assert(is_process_alive(Pid)),
|
?assert(is_process_alive(Pid)),
|
||||||
|
|
||||||
|
emqx_resource:set_resource_status_stoped(?ID),
|
||||||
|
|
||||||
|
emqx_resource:recreate_local(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
#{}),
|
||||||
|
|
||||||
ok = emqx_resource:remove_local(?ID),
|
ok = emqx_resource:remove_local(?ID),
|
||||||
{error, _} = emqx_resource:remove_local(?ID),
|
{error, _} = emqx_resource:remove_local(?ID),
|
||||||
|
|
||||||
|
@ -88,6 +126,8 @@ t_query(_) ->
|
||||||
|
|
||||||
#{pid := _} = emqx_resource:query(?ID, get_state),
|
#{pid := _} = emqx_resource:query(?ID, get_state),
|
||||||
#{pid := _} = emqx_resource:query(?ID, get_state, {[{Success, []}], [{Failure, []}]}),
|
#{pid := _} = emqx_resource:query(?ID, get_state, {[{Success, []}], [{Failure, []}]}),
|
||||||
|
#{pid := _} = emqx_resource:query(?ID, get_state, undefined),
|
||||||
|
#{pid := _} = emqx_resource:query(?ID, get_state_failed, undefined),
|
||||||
|
|
||||||
receive
|
receive
|
||||||
Message -> ?assertEqual(success, Message)
|
Message -> ?assertEqual(success, Message)
|
||||||
|
@ -100,14 +140,28 @@ t_query(_) ->
|
||||||
|
|
||||||
ok = emqx_resource:remove_local(?ID).
|
ok = emqx_resource:remove_local(?ID).
|
||||||
|
|
||||||
|
t_healthy_timeout(_) ->
|
||||||
|
{ok, _} = emqx_resource:create_local(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => <<"test_resource">>},
|
||||||
|
#{async_create => true, health_check_timeout => 200}),
|
||||||
|
timer:sleep(500),
|
||||||
|
|
||||||
|
ok = emqx_resource:remove_local(?ID).
|
||||||
|
|
||||||
t_healthy(_) ->
|
t_healthy(_) ->
|
||||||
{ok, _} = emqx_resource:create_local(
|
{ok, _} = emqx_resource:create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => <<"test_resource">>}, #{async_create => true}),
|
#{name => <<"test_resource">>},
|
||||||
timer:sleep(300),
|
#{async_create => true}),
|
||||||
emqx_resource_health_check:create_checker(?ID, 15000),
|
timer:sleep(400),
|
||||||
|
|
||||||
|
emqx_resource_health_check:create_checker(?ID, 15000, 10000),
|
||||||
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||||
|
timer:sleep(300),
|
||||||
|
emqx_resource:set_resource_status_stoped(?ID),
|
||||||
|
|
||||||
ok = emqx_resource:health_check(?ID),
|
ok = emqx_resource:health_check(?ID),
|
||||||
|
|
||||||
|
@ -128,15 +182,55 @@ t_healthy(_) ->
|
||||||
ok = emqx_resource:remove_local(?ID).
|
ok = emqx_resource:remove_local(?ID).
|
||||||
|
|
||||||
t_stop_start(_) ->
|
t_stop_start(_) ->
|
||||||
|
{error, _} = emqx_resource:check_and_create(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{unknown => test_resource}),
|
||||||
|
|
||||||
|
{ok, _} = emqx_resource:check_and_create(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{<<"name">> => <<"test_resource">>}),
|
||||||
|
|
||||||
|
{ok, _} = emqx_resource:check_and_recreate(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{<<"name">> => <<"test_resource">>},
|
||||||
|
#{}),
|
||||||
|
|
||||||
|
#{pid := Pid0} = emqx_resource:query(?ID, get_state),
|
||||||
|
|
||||||
|
?assert(is_process_alive(Pid0)),
|
||||||
|
|
||||||
|
ok = emqx_resource:stop(?ID),
|
||||||
|
|
||||||
|
?assertNot(is_process_alive(Pid0)),
|
||||||
|
|
||||||
|
?assertMatch({error, {emqx_resource, #{reason := stopped}}},
|
||||||
|
emqx_resource:query(?ID, get_state)),
|
||||||
|
|
||||||
|
ok = emqx_resource:restart(?ID),
|
||||||
|
|
||||||
|
#{pid := Pid1} = emqx_resource:query(?ID, get_state),
|
||||||
|
|
||||||
|
?assert(is_process_alive(Pid1)).
|
||||||
|
|
||||||
|
t_stop_start_local(_) ->
|
||||||
{error, _} = emqx_resource:check_and_create_local(
|
{error, _} = emqx_resource:check_and_create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{unknown => test_resource}),
|
#{unknown => test_resource}),
|
||||||
|
|
||||||
{ok, _} = emqx_resource:create_local(
|
{ok, _} = emqx_resource:check_and_create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => test_resource}),
|
#{<<"name">> => <<"test_resource">>}),
|
||||||
|
|
||||||
|
{ok, _} = emqx_resource:check_and_recreate_local(
|
||||||
|
?ID,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{<<"name">> => <<"test_resource">>},
|
||||||
|
#{}),
|
||||||
|
|
||||||
#{pid := Pid0} = emqx_resource:query(?ID, get_state),
|
#{pid := Pid0} = emqx_resource:query(?ID, get_state),
|
||||||
|
|
||||||
|
@ -184,6 +278,25 @@ t_create_dry_run_local(_) ->
|
||||||
|
|
||||||
?assertEqual(undefined, whereis(test_resource)).
|
?assertEqual(undefined, whereis(test_resource)).
|
||||||
|
|
||||||
|
t_create_dry_run_local_failed(_) ->
|
||||||
|
{Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
|
||||||
|
#{cteate_error => true}),
|
||||||
|
?assertEqual(error, Res),
|
||||||
|
|
||||||
|
{Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
|
||||||
|
#{name => test_resource, health_check_error => true}),
|
||||||
|
?assertEqual(error, Res),
|
||||||
|
|
||||||
|
{Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
|
||||||
|
#{name => test_resource, stop_error => true}),
|
||||||
|
?assertEqual(error, Res).
|
||||||
|
|
||||||
|
t_test_func(_) ->
|
||||||
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),
|
||||||
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:min(int, 3), [4])),
|
||||||
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:max(array, 10), [[a,b,c,d]])),
|
||||||
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:max(string, 10), ["less10"])).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -43,21 +43,43 @@ register(nullable) -> false;
|
||||||
register(default) -> false;
|
register(default) -> false;
|
||||||
register(_) -> undefined.
|
register(_) -> undefined.
|
||||||
|
|
||||||
|
on_start(_InstId, #{create_error := true}) ->
|
||||||
|
error("some error");
|
||||||
|
on_start(InstId, #{name := Name, stop_error := true} = Opts) ->
|
||||||
|
Register = maps:get(register, Opts, false),
|
||||||
|
{ok, #{name => Name,
|
||||||
|
id => InstId,
|
||||||
|
stop_error => true,
|
||||||
|
pid => spawn_dummy_process(Name, Register)}};
|
||||||
|
on_start(InstId, #{name := Name, health_check_error := true} = Opts) ->
|
||||||
|
Register = maps:get(register, Opts, false),
|
||||||
|
{ok, #{name => Name,
|
||||||
|
id => InstId,
|
||||||
|
health_check_error => true,
|
||||||
|
pid => spawn_dummy_process(Name, Register)}};
|
||||||
on_start(InstId, #{name := Name} = Opts) ->
|
on_start(InstId, #{name := Name} = Opts) ->
|
||||||
Register = maps:get(register, Opts, false),
|
Register = maps:get(register, Opts, false),
|
||||||
{ok, #{name => Name,
|
{ok, #{name => Name,
|
||||||
id => InstId,
|
id => InstId,
|
||||||
pid => spawn_dummy_process(Name, Register)}}.
|
pid => spawn_dummy_process(Name, Register)}}.
|
||||||
|
|
||||||
|
on_stop(_InstId, #{stop_error := true}) ->
|
||||||
|
{error, stop_error};
|
||||||
on_stop(_InstId, #{pid := Pid}) ->
|
on_stop(_InstId, #{pid := Pid}) ->
|
||||||
erlang:exit(Pid, shutdown),
|
erlang:exit(Pid, shutdown),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
on_query(_InstId, get_state, AfterQuery, State) ->
|
on_query(_InstId, get_state, AfterQuery, State) ->
|
||||||
emqx_resource:query_success(AfterQuery),
|
emqx_resource:query_success(AfterQuery),
|
||||||
|
State;
|
||||||
|
on_query(_InstId, get_state_failed, AfterQuery, State) ->
|
||||||
|
emqx_resource:query_failed(AfterQuery),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
|
on_health_check(_InstId, State = #{health_check_error := true}) ->
|
||||||
|
{error, dead, State};
|
||||||
on_health_check(_InstId, State = #{pid := Pid}) ->
|
on_health_check(_InstId, State = #{pid := Pid}) ->
|
||||||
|
timer:sleep(300),
|
||||||
case is_process_alive(Pid) of
|
case is_process_alive(Pid) of
|
||||||
true -> {ok, State};
|
true -> {ok, State};
|
||||||
false -> {error, dead, State}
|
false -> {error, dead, State}
|
||||||
|
|
Loading…
Reference in New Issue