Merge remote-tracking branch 'origin/release-v43' into release-v44

This commit is contained in:
Zaiming (Stone) Shi 2022-11-07 22:16:05 +01:00
commit 45eed402d4
5 changed files with 43 additions and 14 deletions

View File

@ -79,8 +79,6 @@
, action_instance_params/0
]).
-define(T_RETRY, 60000).
%% redefine this macro to confine the appup scope
-undef(RAISE).
-define(RAISE(_EXP_, _ERROR_CONTEXT_),
@ -684,8 +682,7 @@ init_resource_with_retrier(Module, OnCreate, ResId, Config) ->
status = #{is_alive => true}},
emqx_rule_registry:add_resource_params(ResParams)
catch Class:Reason:ST ->
Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY),
emqx_rule_monitor:ensure_resource_retrier(ResId, Interval),
emqx_rule_monitor:ensure_resource_retrier(ResId),
erlang:raise(Class, {init_resource, Reason}, ST)
end.

View File

@ -32,10 +32,18 @@
-export([ start_link/0
, stop/0
, async_refresh_resources_rules/0
, ensure_resource_retrier/2
, ensure_resource_retrier/1
, retry_loop/3
]).
%% fot test
-export([ put_retry_interval/1
, get_retry_interval/0
, erase_retry_interval/0
]).
-define(T_RETRY, 60000).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -46,10 +54,22 @@ init([]) ->
_ = erlang:process_flag(trap_exit, true),
{ok, #{retryers => #{}}}.
put_retry_interval(I) when is_integer(I) andalso I >= 10 ->
_ = persistent_term:put({?MODULE, resource_restart_interval}, I),
ok.
erase_retry_interval() ->
_ = persistent_term:erase({?MODULE, resource_restart_interval}),
ok.
get_retry_interval() ->
persistent_term:get({?MODULE, resource_restart_interval}, ?T_RETRY).
async_refresh_resources_rules() ->
gen_server:cast(?MODULE, async_refresh).
ensure_resource_retrier(ResId, Interval) ->
ensure_resource_retrier(ResId) ->
Interval = get_retry_interval(),
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
handle_call(_Msg, _From, State) ->
@ -111,11 +131,12 @@ update_object(Tag, Obj, Retryer, State) ->
}.
create_restart_handler(Tag, Obj, Interval) ->
?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]),
?LOG(info, "starting_a_retry_loop for ~p ~p, with delay interval: ~p", [Tag, Obj, Interval]),
%% spawn a dedicated process to handle the restarting asynchronously
spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]).
retry_loop(resource, ResId, Interval) ->
timer:sleep(Interval),
case emqx_rule_registry:find_resource(ResId) of
{ok, #resource{type = Type, config = Config}} ->
try
@ -124,10 +145,15 @@ retry_loop(resource, ResId, Interval) ->
ok = emqx_rule_engine:init_resource(M, F, ResId, Config),
refresh_and_enable_rules_of_resource(ResId)
catch
Err:Reason:ST ->
?LOG(warning, "init_resource failed: ~p, ~0p",
[{Err, Reason}, ST]),
timer:sleep(Interval),
Err:Reason:Stacktrace ->
%% do not log stacktrace if it's a throw
LogContext =
case Err of
throw -> Reason;
_ -> {Reason, Stacktrace}
end,
?LOG_SENSITIVE(warning, "init_resource_retry_failed ~p, ~0p", [ResId, LogContext]),
%% keep looping
?MODULE:retry_loop(resource, ResId, Interval)
end;
not_found ->

View File

@ -48,7 +48,7 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(t_restart_resource, Config) ->
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100),
emqx_rule_monitor:put_retry_interval(100),
Opts = [public, named_table, set, {read_concurrency, true}],
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
ets:new(t_restart_resource, [named_table, public]),
@ -77,7 +77,6 @@ init_per_testcase(_, Config) ->
Config.
end_per_testcase(t_restart_resource, Config) ->
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000),
ets:delete(t_restart_resource),
common_end_per_testcases(),
Config;
@ -91,7 +90,9 @@ end_per_testcase(_, Config) ->
common_init_per_testcase() ->
{ok, _} = emqx_rule_monitor:start_link().
common_end_per_testcases() ->
emqx_rule_monitor:erase_retry_interval(),
emqx_rule_monitor:stop().
t_restart_resource(_) ->

View File

@ -36,13 +36,15 @@
For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`,
meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend.
- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267).
Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client
performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a
stale connection had to be 'discarded' (for clean session) or 'takenover' (for non-clean session).
Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takenover` to `on` to enable the event in these scenarios.
- For Rule-Engine resource creation failure, delay before the first retry [#9313](https://github.com/emqx/emqx/pull/9313).
Prior to this change, the retry delay was added *after* the retry failure.
## Bug fixes
- Fix that after uploading a backup file with an non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).

View File

@ -37,6 +37,9 @@
但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。
可将 `broker.client_disconnect_discarded``broker.client_disconnect_takovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。
- 规则引擎资源创建失败后,第一次重试前增加一个延迟 [#9313](https://github.com/emqx/emqx/pull/9313)。
在此之前,重试的延迟发生在重试失败之后。
## 修复
- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。