diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 3b82c915c..46fb20666 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -65,6 +65,8 @@ , action_instance_params/0 ]). +-define(T_RETRY, 60000). + %%------------------------------------------------------------------------------ %% Load resource/action providers from all available applications %%------------------------------------------------------------------------------ @@ -217,7 +219,7 @@ delete_rule(RuleId) -> catch Error:Reason:ST -> ?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}]), - refresh_actions(Actions, fun(_) -> true end) + refresh_actions(Actions) end; not_found -> ok @@ -388,16 +390,8 @@ refresh_resource(Type) when is_atom(Type) -> lists:foreach(fun refresh_resource/1, emqx_rule_registry:get_resources_by_type(Type)); -refresh_resource(#resource{id = ResId, config = Config, type = Type}) -> - {ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type), - try cluster_call(init_resource, [M, F, ResId, Config]) - catch Error:Reason:ST -> - logger:critical( - "Can not re-stablish resource ~p: ~0p. The resource is disconnected." - "Fix the issue and establish it manually.\n" - "Stacktrace: ~0p", - [ResId, {Error, Reason}, ST]) - end. +refresh_resource(#resource{id = ResId}) -> + emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY). -spec(refresh_rules() -> ok). refresh_rules() -> @@ -414,7 +408,7 @@ refresh_rules() -> refresh_rule(#rule{id = RuleId, actions = Actions}) -> ok = emqx_rule_metrics:create_rule_metrics(RuleId), - refresh_actions(Actions, fun(_) -> true end). + refresh_actions(Actions). -spec(refresh_resource_status() -> ok). refresh_resource_status() -> @@ -529,10 +523,7 @@ cluster_call(Func, Args) -> end. init_resource(Module, OnCreate, ResId, Config) -> - Params = ?RAISE( - begin - Module:OnCreate(ResId, Config) - end, + Params = ?RAISE(Module:OnCreate(ResId, Config), {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}), ResParams = #resource_params{id = ResId, params = Params, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 6f2e0018f..d561340bb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -27,7 +27,7 @@ -export([init/1]). start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). + supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> Opts = [public, named_table, set, {read_concurrency, true}], @@ -45,7 +45,13 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_rule_metrics]}, - {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}. + Monitor = #{id => emqx_rule_monitor, + start => {emqx_rule_monitor, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_rule_monitor]}, + {ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}. start_locker() -> Locker = #{id => emqx_rule_locker, diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl new file mode 100644 index 000000000..b4c1ff2fc --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -0,0 +1,117 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_monitor). + +-behavior(gen_server). + +-include("rule_engine.hrl"). +-include_lib("emqx/include/logger.hrl"). +-logger_header("[Rule Monitor]"). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-export([ start_link/0 + , stop/0 + , ensure_resource_retrier/2 + , retry_loop/3 + ]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +stop() -> + gen_server:stop(?MODULE). + +init([]) -> + _ = erlang:process_flag(trap_exit, true), + {ok, #{retryers => #{}}}. + +ensure_resource_retrier(ResId, Interval) -> + gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). + +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + +handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> + Objects = maps:get(Tag, State, #{}), + NewState = case maps:find(Obj, Objects) of + error -> + update_object(Tag, Obj, + create_restart_handler(Tag, Obj, Interval), State); + {ok, _Pid} -> + State + end, + {noreply, NewState}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) -> + case maps:take(Pid, Retryers) of + {{Tag, Obj}, Retryers2} -> + Objects = maps:get(Tag, State, #{}), + {noreply, State#{Tag => maps:remove(Obj, Objects), + retryers => Retryers2}}; + error -> + ?LOG(error, "got unexpected proc down: ~p ~p", [Pid, Reason]), + {noreply, State} + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +update_object(Tag, Obj, Retryer, State) -> + Objects = maps:get(Tag, State, #{}), + Retryers = maps:get(retryers, State, #{}), + State#{ + Tag => Objects#{Obj => Retryer}, + retryers => Retryers#{Retryer => {Tag, Obj}} + }. + +create_restart_handler(Tag, Obj, Interval) -> + ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]), + %% spawn a dedicated process to handle the restarting asynchronously + spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). + +retry_loop(resource, ResId, Interval) -> + case emqx_rule_registry:find_resource(ResId) of + {ok, #resource{type = Type, config = Config}} -> + try + {ok, #resource_type{on_create = {M, F}}} = + emqx_rule_registry:find_resource_type(Type), + emqx_rule_engine:init_resource(M, F, ResId, Config) + catch + Err:Reason:ST -> + ?LOG(warning, "init_resource failed: ~p, ~0p", + [{Err, Reason}, ST]), + timer:sleep(Interval), + ?MODULE:retry_loop(resource, ResId, Interval) + end; + not_found -> + ok + end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl new file mode 100644 index 000000000..fe44d8df7 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -0,0 +1,107 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_monitor_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("emqx/include/emqx.hrl"). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [ {group, resource} + ]. + +suite() -> + [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. + +groups() -> + [{resource, [sequence], + [ t_restart_resource + ]} + ]. + +init_per_suite(Config) -> + ok = ekka_mnesia:start(), + ok = emqx_rule_registry:mnesia(boot), + Config. + +end_per_suite(_Config) -> + ok. + +init_per_testcase(t_restart_resource, Config) -> + Opts = [public, named_table, set, {read_concurrency, true}], + _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), + ets:new(t_restart_resource, [named_table, public]), + ets:insert(t_restart_resource, {failed_count, 0}), + ets:insert(t_restart_resource, {succ_count, 0}), + Config; + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(t_restart_resource, Config) -> + ets:delete(t_restart_resource), + Config; +end_per_testcase(_, Config) -> + Config. + +t_restart_resource(_) -> + {ok, _} = emqx_rule_monitor:start_link(), + ok = emqx_rule_registry:register_resource_types( + [#resource_type{ + name = test_res_1, + provider = ?APP, + params_spec = #{}, + on_create = {?MODULE, on_resource_create}, + on_destroy = {?MODULE, on_resource_destroy}, + on_status = {?MODULE, on_get_resource_status}, + title = #{en => <<"Test Resource">>}, + description = #{en => <<"Test Resource">>}}]), + ok = emqx_rule_engine:load_providers(), + {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( + #{type => test_res_1, + config => #{}, + description => <<"debug resource">>}), + [{_, 1}] = ets:lookup(t_restart_resource, failed_count), + [{_, 0}] = ets:lookup(t_restart_resource, succ_count), + ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]), + emqx_rule_monitor:ensure_resource_retrier(ResId, 100), + timer:sleep(1000), + [{_, 5}] = ets:lookup(t_restart_resource, failed_count), + [{_, 1}] = ets:lookup(t_restart_resource, succ_count), + #{retryers := Pids} = sys:get_state(whereis(emqx_rule_monitor)), + ?assertEqual(0, map_size(Pids)), + ok = emqx_rule_engine:unload_providers(), + emqx_rule_registry:remove_resource(ResId), + emqx_rule_monitor:stop(), + ok. + +on_resource_create(Id, _) -> + case ets:lookup(t_restart_resource, failed_count) of + [{_, 5}] -> + ets:insert(t_restart_resource, {succ_count, 1}), + #{}; + [{_, N}] -> + ets:insert(t_restart_resource, {failed_count, N+1}), + error({incorrect_params, Id}) + end. +on_resource_destroy(_Id, _) -> ok. +on_get_resource_status(_Id, _) -> #{}.