Merge pull request #6832 from savonarola/fix-resource-leakage
fix(emqx_resource): fix resource leakage
This commit is contained in:
commit
7c5e638732
|
@ -189,7 +189,11 @@ do_create_dry_run(ResourceType, Config) ->
|
|||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||
{ok, ResourceState} ->
|
||||
case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
|
||||
{ok, _} -> ok;
|
||||
{ok, _} ->
|
||||
case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of
|
||||
{error, _} = Error -> Error;
|
||||
_ -> ok
|
||||
end;
|
||||
{error, Reason, _} -> {error, Reason}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include("emqx_authn.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
|
@ -61,12 +60,12 @@ t_create_remove(_) ->
|
|||
{error, _} = emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => <<"test_resource">>}),
|
||||
#{unknown => test_resource}),
|
||||
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{name => <<"test_resource">>}),
|
||||
#{name => test_resource}),
|
||||
|
||||
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||
|
||||
|
@ -81,7 +80,7 @@ t_query(_) ->
|
|||
{ok, _} = emqx_resource:create_local(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{name => <<"test_resource">>}),
|
||||
#{name => test_resource}),
|
||||
|
||||
Pid = self(),
|
||||
Success = fun() -> Pid ! success end,
|
||||
|
@ -105,19 +104,25 @@ t_healthy(_) ->
|
|||
{ok, _} = emqx_resource:create_local(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{name => <<"test_resource">>}),
|
||||
#{name => test_resource}),
|
||||
|
||||
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||
|
||||
ok = emqx_resource:health_check(?ID),
|
||||
|
||||
[#{status := started}] = emqx_resource:list_instances_verbose(),
|
||||
?assertMatch(
|
||||
[#{status := started}],
|
||||
emqx_resource:list_instances_verbose()),
|
||||
|
||||
erlang:exit(Pid, shutdown),
|
||||
|
||||
{error, dead} = emqx_resource:health_check(?ID),
|
||||
?assertEqual(
|
||||
{error, dead},
|
||||
emqx_resource:health_check(?ID)),
|
||||
|
||||
[#{status := stopped}] = emqx_resource:list_instances_verbose(),
|
||||
?assertMatch(
|
||||
[#{status := stopped}],
|
||||
emqx_resource:list_instances_verbose()),
|
||||
|
||||
ok = emqx_resource:remove_local(?ID).
|
||||
|
||||
|
@ -125,12 +130,12 @@ t_stop_start(_) ->
|
|||
{error, _} = emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => <<"test_resource">>}),
|
||||
#{unknown => test_resource}),
|
||||
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{name => <<"test_resource">>}),
|
||||
#{name => test_resource}),
|
||||
|
||||
#{pid := Pid0} = emqx_resource:query(?ID, get_state),
|
||||
|
||||
|
@ -160,10 +165,23 @@ t_list_filter(_) ->
|
|||
#{name => grouped_a}),
|
||||
|
||||
[Id1] = emqx_resource:list_group_instances(<<"default">>),
|
||||
{ok, #{config := #{name := a}}} = emqx_resource:get_instance(Id1),
|
||||
?assertMatch(
|
||||
{ok, #{config := #{name := a}}},
|
||||
emqx_resource:get_instance(Id1)),
|
||||
|
||||
[Id2] = emqx_resource:list_group_instances(<<"group">>),
|
||||
{ok, #{config := #{name := grouped_a}}} = emqx_resource:get_instance(Id2).
|
||||
?assertMatch(
|
||||
{ok, #{config := #{name := grouped_a}}},
|
||||
emqx_resource:get_instance(Id2)).
|
||||
|
||||
t_create_dry_run_local(_) ->
|
||||
?assertEqual(
|
||||
ok,
|
||||
emqx_resource:create_dry_run_local(
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource, register => true})),
|
||||
|
||||
?assertEqual(undefined, whereis(test_resource)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helpers
|
||||
|
|
|
@ -31,16 +31,23 @@
|
|||
%% callbacks for emqx_resource config schema
|
||||
-export([roots/0]).
|
||||
|
||||
roots() -> [{"name", fun name/1}].
|
||||
roots() -> [{name, fun name/1},
|
||||
{register, fun register/1}].
|
||||
|
||||
name(type) -> binary();
|
||||
name(type) -> atom();
|
||||
name(nullable) -> false;
|
||||
name(_) -> undefined.
|
||||
|
||||
on_start(InstId, #{name := Name}) ->
|
||||
register(type) -> boolean();
|
||||
register(nullable) -> false;
|
||||
register(default) -> false;
|
||||
register(_) -> undefined.
|
||||
|
||||
on_start(InstId, #{name := Name} = Opts) ->
|
||||
Register = maps:get(register, Opts, false),
|
||||
{ok, #{name => Name,
|
||||
id => InstId,
|
||||
pid => spawn_dummy_process()}}.
|
||||
pid => spawn_dummy_process(Name, Register)}}.
|
||||
|
||||
on_stop(_InstId, #{pid := Pid}) ->
|
||||
erlang:exit(Pid, shutdown),
|
||||
|
@ -59,9 +66,13 @@ on_health_check(_InstId, State = #{pid := Pid}) ->
|
|||
on_config_merge(OldConfig, NewConfig, _Params) ->
|
||||
maps:merge(OldConfig, NewConfig).
|
||||
|
||||
spawn_dummy_process() ->
|
||||
spawn_dummy_process(Name, Register) ->
|
||||
spawn(
|
||||
fun() ->
|
||||
true = case Register of
|
||||
true -> register(Name, self());
|
||||
_ -> true
|
||||
end,
|
||||
Ref = make_ref(),
|
||||
receive
|
||||
Ref -> ok
|
||||
|
|
Loading…
Reference in New Issue