fix: tmp probing resources are not removed after testing resources

This commit is contained in:
Shawn 2022-12-14 13:52:24 +08:00
parent 4760d8715e
commit 781c8949b3
2 changed files with 132 additions and 8 deletions

View File

@ -16,6 +16,8 @@
-module(emqx_rule_engine_api).
-behaviour(gen_server).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
@ -156,6 +158,17 @@
descr => "List all events with detailed info"
}).
-export([start_link/0]).
% gen_server Callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([ create_rule/2
, update_rule/2
, list_rules/2
@ -207,10 +220,90 @@
<<"Bad Arguments: ", R0/binary>>
end).
-define(T_CALL, 30000).
start_link() ->
%% The caller process (the cowboy process serves the HTTP request) may times out and dies
%% before some time-consuming operations complete, e.g. creating rules/resources or testing
%% the connectivity on unreachable resources.
%% To avoid this problem, we delegate the operations to a gen_server.
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
create_rule(_Bindings, Params) ->
delegate_call({create_rule, _Bindings, Params}).
update_rule(_Bindings, Params) ->
delegate_call({update_rule, _Bindings, Params}).
delete_rule(_Bindings, Params) ->
delegate_call({delete_rule, _Bindings, Params}).
create_resource(_Bindings, Params) ->
delegate_call({create_resource, _Bindings, Params}).
update_resource(_Bindings, Params) ->
delegate_call({update_resource, _Bindings, Params}).
start_resource(_Bindings, Params) ->
delegate_call({start_resource, _Bindings, Params}).
delete_resource(_Bindings, Params) ->
delegate_call({delete_resource, _Bindings, Params}).
%% delegate API calls to a single process.
delegate_call(Req) ->
gen_server:call(?MODULE, Req, ?T_CALL).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
{ok, #{}}.
handle_call({create_rule, _Bindings, Params}, _From, State) ->
{reply, delegate_create_rule(_Bindings, Params), State};
handle_call({update_rule, _Bindings, Params}, _From, State) ->
{reply, delegate_update_rule(_Bindings, Params), State};
handle_call({delete_rule, _Bindings, Params}, _From, State) ->
{reply, delegate_delete_rule(_Bindings, Params), State};
handle_call({create_resource, _Bindings, Params}, _From, State) ->
{reply, delegate_create_resource(_Bindings, Params), State};
handle_call({start_resource, _Bindings, Params}, _From, State) ->
{reply, delegate_start_resource(_Bindings, Params), State};
handle_call({update_resource, _Bindings, Params}, _From, State) ->
{reply, delegate_update_resource(_Bindings, Params), State};
handle_call({delete_resource, _Bindings, Params}, _From, State) ->
{reply, delegate_delete_resource(_Bindings, Params), State};
handle_call(Req, _From, State) ->
?LOG(error, "unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Rules API
%%------------------------------------------------------------------------------
create_rule(_Bindings, Params) ->
delegate_create_rule(_Bindings, Params) ->
if_test(fun() -> test_rule_sql(Params) end,
fun() -> do_create_rule(Params) end,
Params).
@ -250,7 +343,7 @@ do_create_rule2(ParsedParams) ->
return({error, 400, ?ERR_BADARGS(Reason)})
end.
update_rule(#{id := Id0}, Params) ->
delegate_update_rule(#{id := Id0}, Params) ->
Id = urldecode(Id0),
case parse_rule_params(Params, #{id => Id}) of
{ok, ParsedParams} ->
@ -280,7 +373,7 @@ show_rule(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
reply_with(fun emqx_rule_registry:get_rule/1, Id).
delete_rule(#{id := Id0}, _Params) ->
delegate_delete_rule(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
ok = emqx_rule_engine:delete_rule(Id),
return(ok).
@ -309,7 +402,7 @@ show_action(#{name := Name}, _Params) ->
%%------------------------------------------------------------------------------
%% Resources API
%%------------------------------------------------------------------------------
create_resource(#{}, Params) ->
delegate_create_resource(#{}, Params) ->
case parse_resource_params(Params) of
{ok, ParsedParams} ->
if_test(fun() -> do_create_resource(test_resource, maps:without([id], ParsedParams)) end,
@ -382,7 +475,7 @@ get_resource_status(#{id := Id0}, _Params) ->
return({error, 400, ?ERR_NO_RESOURCE(Id)})
end.
start_resource(#{id := Id0}, _Params) ->
delegate_start_resource(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
case emqx_rule_engine:start_resource(Id) of
ok ->
@ -394,7 +487,7 @@ start_resource(#{id := Id0}, _Params) ->
return({error, 400, ?ERR_BADARGS(Reason)})
end.
update_resource(#{id := Id0}, NewParams) ->
delegate_update_resource(#{id := Id0}, NewParams) ->
Id = urldecode(Id0),
P1 = case proplists:get_value(<<"description">>, NewParams) of
undefined -> #{};
@ -419,7 +512,7 @@ update_resource(#{id := Id0}, NewParams) ->
return({error, 400, ?ERR_BADARGS(Reason)})
end.
delete_resource(#{id := Id0}, _Params) ->
delegate_delete_resource(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
case emqx_rule_engine:delete_resource(Id) of
ok -> return(ok);

View File

@ -24,6 +24,8 @@
-export([ start_locker/0
, start_jwt_sup/0
, ensure_api_delegator_started/0
, ensure_api_delegator_stopped/0
]).
-export([init/1]).
@ -58,7 +60,8 @@ init([]) ->
type => worker,
modules => [emqx_rule_monitor]},
JWTSup = jwt_sup_child_spec(),
{ok, {SupFlags, [Registry, Metrics, Monitor, JWTSup]}}.
API = api_delegator_sup_spec(),
{ok, {SupFlags, [Registry, Metrics, Monitor, JWTSup, API]}}.
start_locker() ->
Locker = #{id => emqx_rule_locker,
@ -97,3 +100,31 @@ ensure_table(Name, Opts) ->
error:badarg ->
ok
end.
%% This is called by the emqx_rule_engine.appup.src when release upgrade
ensure_api_delegator_started() ->
case supervisor:start_child(?MODULE, api_delegator_sup_spec()) of
{ok, _} -> ok;
{error, already_present} -> ok;
{error, {already_started, _Pid}} -> ok;
{error, _} = Err -> throw({failed_to_start_ensure_api, Err})
end.
%% This is called by the emqx_rule_engine.appup.src when release downgrade
ensure_api_delegator_stopped() ->
case supervisor:terminate_child(?MODULE, emqx_rule_engine_api) of
ok ->
%% don't crash if delete failed
supervisor:delete_child(?MODULE, emqx_rule_engine_api);
{error, not_found} -> ok
end.
api_delegator_sup_spec() ->
#{
id => emqx_rule_engine_api,
start => {emqx_rule_engine_api, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_rule_engine_api]
}.