commit
dae7371f3b
|
@ -132,8 +132,8 @@ jobs:
|
||||||
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
||||||
- uses: actions/upload-artifact@v3
|
- uses: actions/upload-artifact@v3
|
||||||
with:
|
with:
|
||||||
name: ${EMQX_NAME}-${{ matrix.otp }}
|
name: ${{ env.EMQX_NAME }}-${{ matrix.otp }}
|
||||||
path: _packages/${EMQX_NAME}/
|
path: _packages/${{ env.EMQX_NAME }}/
|
||||||
|
|
||||||
linux:
|
linux:
|
||||||
runs-on: ubuntu-20.04
|
runs-on: ubuntu-20.04
|
||||||
|
|
2
Makefile
2
Makefile
|
@ -21,7 +21,7 @@ REL_PROFILES := emqx emqx-edge
|
||||||
PKG_PROFILES := emqx-pkg emqx-edge-pkg
|
PKG_PROFILES := emqx-pkg emqx-edge-pkg
|
||||||
PROFILES := $(REL_PROFILES) $(PKG_PROFILES) default
|
PROFILES := $(REL_PROFILES) $(PKG_PROFILES) default
|
||||||
|
|
||||||
CT_READABLE ?= false
|
CT_READABLE ?= true
|
||||||
|
|
||||||
export REBAR_GIT_CLONE_OPTIONS += --depth=1
|
export REBAR_GIT_CLONE_OPTIONS += --depth=1
|
||||||
|
|
||||||
|
|
|
@ -79,18 +79,19 @@
|
||||||
, action_instance_params/0
|
, action_instance_params/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(T_RETRY, 60000).
|
|
||||||
|
|
||||||
%% redefine this macro to confine the appup scope
|
%% redefine this macro to confine the appup scope
|
||||||
-undef(RAISE).
|
-undef(RAISE).
|
||||||
-define(RAISE(_EXP_, _ERROR_CONTEXT_),
|
-define(RAISE(_EXP_, _ERROR_CONTEXT_),
|
||||||
|
?RAISE(_EXP_, do_nothing, _ERROR_CONTEXT_)).
|
||||||
|
-define(RAISE(_EXP_, _EXP_ON_FAIL_, _ERROR_CONTEXT_),
|
||||||
fun() ->
|
fun() ->
|
||||||
try (_EXP_)
|
try (_EXP_)
|
||||||
catch
|
catch
|
||||||
throw : Reason ->
|
throw : Reason ->
|
||||||
throw({_ERROR_CONTEXT_, Reason});
|
throw({_ERROR_CONTEXT_, Reason});
|
||||||
_EXCLASS_:_EXCPTION_:_ST_ ->
|
_EXCLASS_:_EXCPTION_:_ST_ ->
|
||||||
throw({_ERROR_CONTEXT_, {_EXCPTION_, _EXCPTION_, _ST_}})
|
_EXP_ON_FAIL_,
|
||||||
|
throw({_ERROR_CONTEXT_, {_EXCLASS_, _EXCPTION_, _ST_}})
|
||||||
end
|
end
|
||||||
end()).
|
end()).
|
||||||
|
|
||||||
|
@ -498,7 +499,12 @@ refresh_resource(Type) when is_atom(Type) ->
|
||||||
refresh_resource(#resource{id = ResId, type = Type, config = Config}) ->
|
refresh_resource(#resource{id = ResId, type = Type, config = Config}) ->
|
||||||
{ok, #resource_type{on_create = {M, F}}} =
|
{ok, #resource_type{on_create = {M, F}}} =
|
||||||
emqx_rule_registry:find_resource_type(Type),
|
emqx_rule_registry:find_resource_type(Type),
|
||||||
ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config).
|
try
|
||||||
|
init_resource_with_retrier(M, F, ResId, Config)
|
||||||
|
catch
|
||||||
|
throw:Reason ->
|
||||||
|
?LOG_SENSITIVE(warning, "refresh_resource failed: ~0p", [Reason])
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(refresh_rules_when_boot() -> ok).
|
-spec(refresh_rules_when_boot() -> ok).
|
||||||
refresh_rules_when_boot() ->
|
refresh_rules_when_boot() ->
|
||||||
|
@ -677,17 +683,12 @@ init_resource(Module, OnCreate, ResId, Config) ->
|
||||||
emqx_rule_registry:add_resource_params(ResParams).
|
emqx_rule_registry:add_resource_params(ResParams).
|
||||||
|
|
||||||
init_resource_with_retrier(Module, OnCreate, ResId, Config) ->
|
init_resource_with_retrier(Module, OnCreate, ResId, Config) ->
|
||||||
try
|
Params = ?RAISE(Module:OnCreate(ResId, Config),
|
||||||
Params = Module:OnCreate(ResId, Config),
|
emqx_rule_monitor:ensure_resource_retrier(ResId), {Module, OnCreate}),
|
||||||
ResParams = #resource_params{id = ResId,
|
ResParams = #resource_params{id = ResId,
|
||||||
params = Params,
|
params = Params,
|
||||||
status = #{is_alive => true}},
|
status = #{is_alive => true}},
|
||||||
emqx_rule_registry:add_resource_params(ResParams)
|
emqx_rule_registry:add_resource_params(ResParams).
|
||||||
catch Class:Reason:ST ->
|
|
||||||
Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY),
|
|
||||||
emqx_rule_monitor:ensure_resource_retrier(ResId, Interval),
|
|
||||||
erlang:raise(Class, {init_resource, Reason}, ST)
|
|
||||||
end.
|
|
||||||
|
|
||||||
init_action(Module, OnCreate, ActionInstId, Params) ->
|
init_action(Module, OnCreate, ActionInstId, Params) ->
|
||||||
ok = emqx_rule_metrics:create_metrics(ActionInstId),
|
ok = emqx_rule_metrics:create_metrics(ActionInstId),
|
||||||
|
|
|
@ -32,10 +32,18 @@
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
, stop/0
|
, stop/0
|
||||||
, async_refresh_resources_rules/0
|
, async_refresh_resources_rules/0
|
||||||
, ensure_resource_retrier/2
|
, ensure_resource_retrier/1
|
||||||
, retry_loop/3
|
, retry_loop/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% fot test
|
||||||
|
-export([ put_retry_interval/1
|
||||||
|
, get_retry_interval/0
|
||||||
|
, erase_retry_interval/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(T_RETRY, 60000).
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
@ -46,10 +54,22 @@ init([]) ->
|
||||||
_ = erlang:process_flag(trap_exit, true),
|
_ = erlang:process_flag(trap_exit, true),
|
||||||
{ok, #{retryers => #{}}}.
|
{ok, #{retryers => #{}}}.
|
||||||
|
|
||||||
|
put_retry_interval(I) when is_integer(I) andalso I >= 10 ->
|
||||||
|
_ = persistent_term:put({?MODULE, resource_restart_interval}, I),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
erase_retry_interval() ->
|
||||||
|
_ = persistent_term:erase({?MODULE, resource_restart_interval}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
get_retry_interval() ->
|
||||||
|
persistent_term:get({?MODULE, resource_restart_interval}, ?T_RETRY).
|
||||||
|
|
||||||
async_refresh_resources_rules() ->
|
async_refresh_resources_rules() ->
|
||||||
gen_server:cast(?MODULE, async_refresh).
|
gen_server:cast(?MODULE, async_refresh).
|
||||||
|
|
||||||
ensure_resource_retrier(ResId, Interval) ->
|
ensure_resource_retrier(ResId) ->
|
||||||
|
Interval = get_retry_interval(),
|
||||||
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
|
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
|
||||||
|
|
||||||
handle_call(_Msg, _From, State) ->
|
handle_call(_Msg, _From, State) ->
|
||||||
|
@ -111,11 +131,12 @@ update_object(Tag, Obj, Retryer, State) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
create_restart_handler(Tag, Obj, Interval) ->
|
create_restart_handler(Tag, Obj, Interval) ->
|
||||||
?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]),
|
?LOG(info, "starting_a_retry_loop for ~p ~p, with delay interval: ~p", [Tag, Obj, Interval]),
|
||||||
%% spawn a dedicated process to handle the restarting asynchronously
|
%% spawn a dedicated process to handle the restarting asynchronously
|
||||||
spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]).
|
spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]).
|
||||||
|
|
||||||
retry_loop(resource, ResId, Interval) ->
|
retry_loop(resource, ResId, Interval) ->
|
||||||
|
timer:sleep(Interval),
|
||||||
case emqx_rule_registry:find_resource(ResId) of
|
case emqx_rule_registry:find_resource(ResId) of
|
||||||
{ok, #resource{type = Type, config = Config}} ->
|
{ok, #resource{type = Type, config = Config}} ->
|
||||||
try
|
try
|
||||||
|
@ -124,10 +145,15 @@ retry_loop(resource, ResId, Interval) ->
|
||||||
ok = emqx_rule_engine:init_resource(M, F, ResId, Config),
|
ok = emqx_rule_engine:init_resource(M, F, ResId, Config),
|
||||||
refresh_and_enable_rules_of_resource(ResId)
|
refresh_and_enable_rules_of_resource(ResId)
|
||||||
catch
|
catch
|
||||||
Err:Reason:ST ->
|
Err:Reason:Stacktrace ->
|
||||||
?LOG(warning, "init_resource failed: ~p, ~0p",
|
%% do not log stacktrace if it's a throw
|
||||||
[{Err, Reason}, ST]),
|
LogContext =
|
||||||
timer:sleep(Interval),
|
case Err of
|
||||||
|
throw -> Reason;
|
||||||
|
_ -> {Reason, Stacktrace}
|
||||||
|
end,
|
||||||
|
?LOG_SENSITIVE(warning, "init_resource_retry_failed ~p, ~0p", [ResId, LogContext]),
|
||||||
|
%% keep looping
|
||||||
?MODULE:retry_loop(resource, ResId, Interval)
|
?MODULE:retry_loop(resource, ResId, Interval)
|
||||||
end;
|
end;
|
||||||
not_found ->
|
not_found ->
|
||||||
|
|
|
@ -48,7 +48,7 @@ end_per_suite(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(t_restart_resource, Config) ->
|
init_per_testcase(t_restart_resource, Config) ->
|
||||||
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100),
|
emqx_rule_monitor:put_retry_interval(100),
|
||||||
Opts = [public, named_table, set, {read_concurrency, true}],
|
Opts = [public, named_table, set, {read_concurrency, true}],
|
||||||
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
|
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
|
||||||
ets:new(t_restart_resource, [named_table, public]),
|
ets:new(t_restart_resource, [named_table, public]),
|
||||||
|
@ -77,7 +77,6 @@ init_per_testcase(_, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(t_restart_resource, Config) ->
|
end_per_testcase(t_restart_resource, Config) ->
|
||||||
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000),
|
|
||||||
ets:delete(t_restart_resource),
|
ets:delete(t_restart_resource),
|
||||||
common_end_per_testcases(),
|
common_end_per_testcases(),
|
||||||
Config;
|
Config;
|
||||||
|
@ -91,7 +90,9 @@ end_per_testcase(_, Config) ->
|
||||||
|
|
||||||
common_init_per_testcase() ->
|
common_init_per_testcase() ->
|
||||||
{ok, _} = emqx_rule_monitor:start_link().
|
{ok, _} = emqx_rule_monitor:start_link().
|
||||||
|
|
||||||
common_end_per_testcases() ->
|
common_end_per_testcases() ->
|
||||||
|
emqx_rule_monitor:erase_retry_interval(),
|
||||||
emqx_rule_monitor:stop().
|
emqx_rule_monitor:stop().
|
||||||
|
|
||||||
t_restart_resource(_) ->
|
t_restart_resource(_) ->
|
||||||
|
|
|
@ -349,7 +349,7 @@ t_1000_msg_send(_) ->
|
||||||
receive
|
receive
|
||||||
{deliver, Topic, _Msg}->
|
{deliver, Topic, _Msg}->
|
||||||
ok
|
ok
|
||||||
after 100 ->
|
after 5000 ->
|
||||||
?assert(false, "waiting message timeout")
|
?assert(false, "waiting message timeout")
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
|
11
build
11
build
|
@ -62,7 +62,18 @@ log() {
|
||||||
echo "===< $msg"
|
echo "===< $msg"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
delete_unwanted_file() {
|
||||||
|
if [ -e "${1}" ]; then
|
||||||
|
log "Deleting file: ${1}"
|
||||||
|
rm -f "${1}"
|
||||||
|
else
|
||||||
|
log "Cannot delete file: ${1} -- file not found"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
make_rel() {
|
make_rel() {
|
||||||
|
./rebar3 as "$PROFILE" release
|
||||||
|
delete_unwanted_file _build/"${PROFILE}"/rel/emqx/lib/certifi*/priv/cacerts.pem
|
||||||
./rebar3 as "$PROFILE" tar
|
./rebar3 as "$PROFILE" tar
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,13 +36,15 @@
|
||||||
For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`,
|
For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`,
|
||||||
meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend.
|
meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend.
|
||||||
|
|
||||||
|
|
||||||
- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267).
|
- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267).
|
||||||
Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client
|
Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client
|
||||||
performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a
|
performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a
|
||||||
stale connection had to be 'discarded' (for clean session) or 'takenover' (for non-clean session).
|
stale connection had to be 'discarded' (for clean session) or 'takenover' (for non-clean session).
|
||||||
Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takenover` to `on` to enable the event in these scenarios.
|
Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takenover` to `on` to enable the event in these scenarios.
|
||||||
|
|
||||||
|
- For Rule-Engine resource creation failure, delay before the first retry [#9313](https://github.com/emqx/emqx/pull/9313).
|
||||||
|
Prior to this change, the retry delay was added *after* the retry failure.
|
||||||
|
|
||||||
## Bug fixes
|
## Bug fixes
|
||||||
|
|
||||||
- Fix that after uploading a backup file with an non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
- Fix that after uploading a backup file with an non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
||||||
|
|
|
@ -37,6 +37,9 @@
|
||||||
但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。
|
但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。
|
||||||
可将 `broker.client_disconnect_discarded` 和 `broker.client_disconnect_takovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。
|
可将 `broker.client_disconnect_discarded` 和 `broker.client_disconnect_takovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。
|
||||||
|
|
||||||
|
- 规则引擎资源创建失败后,第一次重试前增加一个延迟 [#9313](https://github.com/emqx/emqx/pull/9313)。
|
||||||
|
在此之前,重试的延迟发生在重试失败之后。
|
||||||
|
|
||||||
## 修复
|
## 修复
|
||||||
|
|
||||||
- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
||||||
|
|
|
@ -13,8 +13,8 @@ type: application
|
||||||
|
|
||||||
# This is the chart version. This version number should be incremented each time you make changes
|
# This is the chart version. This version number should be incremented each time you make changes
|
||||||
# to the chart and its templates, including the app version.
|
# to the chart and its templates, including the app version.
|
||||||
version: 4.4.10
|
version: 4.4.11
|
||||||
|
|
||||||
# This is the version number of the application being deployed. This version number should be
|
# This is the version number of the application being deployed. This version number should be
|
||||||
# incremented each time you make changes to the application.
|
# incremented each time you make changes to the application.
|
||||||
appVersion: 4.4.10
|
appVersion: 4.4.11
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-ifndef(EMQX_ENTERPRISE).
|
-ifndef(EMQX_ENTERPRISE).
|
||||||
|
|
||||||
-define(EMQX_RELEASE, {opensource, "4.4.10"}).
|
-define(EMQX_RELEASE, {opensource, "4.4.11-alpha.1"}).
|
||||||
|
|
||||||
-else.
|
-else.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue