fix(emqx_resource): fix resource leakage
This commit is contained in:
parent
8ac63f823d
commit
acc4ad0542
|
@ -189,7 +189,11 @@ do_create_dry_run(ResourceType, Config) ->
|
||||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||||
{ok, ResourceState} ->
|
{ok, ResourceState} ->
|
||||||
case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
|
case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
|
||||||
{ok, _} -> ok;
|
{ok, _} ->
|
||||||
|
case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of
|
||||||
|
{error, _} = Error -> Error;
|
||||||
|
_ -> ok
|
||||||
|
end;
|
||||||
{error, Reason, _} -> {error, Reason}
|
{error, Reason, _} -> {error, Reason}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
-include("emqx_authn.hrl").
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
@ -61,12 +60,12 @@ t_create_remove(_) ->
|
||||||
{error, _} = emqx_resource:check_and_create_local(
|
{error, _} = emqx_resource:check_and_create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{unknown => <<"test_resource">>}),
|
#{unknown => test_resource}),
|
||||||
|
|
||||||
{ok, _} = emqx_resource:create_local(
|
{ok, _} = emqx_resource:create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => <<"test_resource">>}),
|
#{name => test_resource}),
|
||||||
|
|
||||||
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||||
|
|
||||||
|
@ -81,7 +80,7 @@ t_query(_) ->
|
||||||
{ok, _} = emqx_resource:create_local(
|
{ok, _} = emqx_resource:create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => <<"test_resource">>}),
|
#{name => test_resource}),
|
||||||
|
|
||||||
Pid = self(),
|
Pid = self(),
|
||||||
Success = fun() -> Pid ! success end,
|
Success = fun() -> Pid ! success end,
|
||||||
|
@ -105,19 +104,25 @@ t_healthy(_) ->
|
||||||
{ok, _} = emqx_resource:create_local(
|
{ok, _} = emqx_resource:create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => <<"test_resource">>}),
|
#{name => test_resource}),
|
||||||
|
|
||||||
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||||
|
|
||||||
ok = emqx_resource:health_check(?ID),
|
ok = emqx_resource:health_check(?ID),
|
||||||
|
|
||||||
[#{status := started}] = emqx_resource:list_instances_verbose(),
|
?assertMatch(
|
||||||
|
[#{status := started}],
|
||||||
|
emqx_resource:list_instances_verbose()),
|
||||||
|
|
||||||
erlang:exit(Pid, shutdown),
|
erlang:exit(Pid, shutdown),
|
||||||
|
|
||||||
{error, dead} = emqx_resource:health_check(?ID),
|
?assertEqual(
|
||||||
|
{error, dead},
|
||||||
|
emqx_resource:health_check(?ID)),
|
||||||
|
|
||||||
[#{status := stopped}] = emqx_resource:list_instances_verbose(),
|
?assertMatch(
|
||||||
|
[#{status := stopped}],
|
||||||
|
emqx_resource:list_instances_verbose()),
|
||||||
|
|
||||||
ok = emqx_resource:remove_local(?ID).
|
ok = emqx_resource:remove_local(?ID).
|
||||||
|
|
||||||
|
@ -125,12 +130,12 @@ t_stop_start(_) ->
|
||||||
{error, _} = emqx_resource:check_and_create_local(
|
{error, _} = emqx_resource:check_and_create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{unknown => <<"test_resource">>}),
|
#{unknown => test_resource}),
|
||||||
|
|
||||||
{ok, _} = emqx_resource:create_local(
|
{ok, _} = emqx_resource:create_local(
|
||||||
?ID,
|
?ID,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => <<"test_resource">>}),
|
#{name => test_resource}),
|
||||||
|
|
||||||
#{pid := Pid0} = emqx_resource:query(?ID, get_state),
|
#{pid := Pid0} = emqx_resource:query(?ID, get_state),
|
||||||
|
|
||||||
|
@ -160,10 +165,23 @@ t_list_filter(_) ->
|
||||||
#{name => grouped_a}),
|
#{name => grouped_a}),
|
||||||
|
|
||||||
[Id1] = emqx_resource:list_group_instances(<<"default">>),
|
[Id1] = emqx_resource:list_group_instances(<<"default">>),
|
||||||
{ok, #{config := #{name := a}}} = emqx_resource:get_instance(Id1),
|
?assertMatch(
|
||||||
|
{ok, #{config := #{name := a}}},
|
||||||
|
emqx_resource:get_instance(Id1)),
|
||||||
|
|
||||||
[Id2] = emqx_resource:list_group_instances(<<"group">>),
|
[Id2] = emqx_resource:list_group_instances(<<"group">>),
|
||||||
{ok, #{config := #{name := grouped_a}}} = emqx_resource:get_instance(Id2).
|
?assertMatch(
|
||||||
|
{ok, #{config := #{name := grouped_a}}},
|
||||||
|
emqx_resource:get_instance(Id2)).
|
||||||
|
|
||||||
|
t_create_dry_run_local(_) ->
|
||||||
|
?assertEqual(
|
||||||
|
ok,
|
||||||
|
emqx_resource:create_dry_run_local(
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource, register => true})),
|
||||||
|
|
||||||
|
?assertEqual(undefined, whereis(test_resource)).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
|
|
|
@ -31,16 +31,23 @@
|
||||||
%% callbacks for emqx_resource config schema
|
%% callbacks for emqx_resource config schema
|
||||||
-export([roots/0]).
|
-export([roots/0]).
|
||||||
|
|
||||||
roots() -> [{"name", fun name/1}].
|
roots() -> [{name, fun name/1},
|
||||||
|
{register, fun register/1}].
|
||||||
|
|
||||||
name(type) -> binary();
|
name(type) -> atom();
|
||||||
name(nullable) -> false;
|
name(nullable) -> false;
|
||||||
name(_) -> undefined.
|
name(_) -> undefined.
|
||||||
|
|
||||||
on_start(InstId, #{name := Name}) ->
|
register(type) -> boolean();
|
||||||
|
register(nullable) -> false;
|
||||||
|
register(default) -> false;
|
||||||
|
register(_) -> undefined.
|
||||||
|
|
||||||
|
on_start(InstId, #{name := Name} = Opts) ->
|
||||||
|
Register = maps:get(register, Opts, false),
|
||||||
{ok, #{name => Name,
|
{ok, #{name => Name,
|
||||||
id => InstId,
|
id => InstId,
|
||||||
pid => spawn_dummy_process()}}.
|
pid => spawn_dummy_process(Name, Register)}}.
|
||||||
|
|
||||||
on_stop(_InstId, #{pid := Pid}) ->
|
on_stop(_InstId, #{pid := Pid}) ->
|
||||||
erlang:exit(Pid, shutdown),
|
erlang:exit(Pid, shutdown),
|
||||||
|
@ -59,9 +66,13 @@ on_health_check(_InstId, State = #{pid := Pid}) ->
|
||||||
on_config_merge(OldConfig, NewConfig, _Params) ->
|
on_config_merge(OldConfig, NewConfig, _Params) ->
|
||||||
maps:merge(OldConfig, NewConfig).
|
maps:merge(OldConfig, NewConfig).
|
||||||
|
|
||||||
spawn_dummy_process() ->
|
spawn_dummy_process(Name, Register) ->
|
||||||
spawn(
|
spawn(
|
||||||
fun() ->
|
fun() ->
|
||||||
|
true = case Register of
|
||||||
|
true -> register(Name, self());
|
||||||
|
_ -> true
|
||||||
|
end,
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
receive
|
receive
|
||||||
Ref -> ok
|
Ref -> ok
|
||||||
|
|
Loading…
Reference in New Issue