From 4bfd7dd14a3a3ef63ecdb047d8efce6a972c3c63 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 21 Mar 2023 17:05:13 +0800 Subject: [PATCH 1/5] fix: create dedicated pool for retainer --- apps/emqx_retainer/include/emqx_retainer.hrl | 1 + apps/emqx_retainer/src/emqx_retainer.erl | 2 +- apps/emqx_retainer/src/emqx_retainer_sup.erl | 31 +++++++++++++++----- src/emqx_pool.erl | 26 +++++++++++++--- 4 files changed, 47 insertions(+), 13 deletions(-) diff --git a/apps/emqx_retainer/include/emqx_retainer.hrl b/apps/emqx_retainer/include/emqx_retainer.hrl index d25aecc6b..dbaf8937e 100644 --- a/apps/emqx_retainer/include/emqx_retainer.hrl +++ b/apps/emqx_retainer/include/emqx_retainer.hrl @@ -16,5 +16,6 @@ -define(APP, emqx_retainer). -define(TAB, ?APP). +-define(POOL, retainer_worker_pool). -record(retained, {topic, msg, expiry_time}). diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 84e49c5be..281aacb6b 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -69,7 +69,7 @@ on_session_subscribed(_, _, #{share := ShareName}) when ShareName =/= undefined ok; on_session_subscribed(_, Topic, #{rh := Rh, is_new := IsNew}) -> case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of - true -> emqx_pool:async_submit(fun ?MODULE:dispatch/2, [self(), Topic]); + true -> emqx_pool:async_submit(?POOL, fun ?MODULE:dispatch/2, [self(), Topic]); _ -> ok end. diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index 550858afa..01e8770e6 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -18,21 +18,36 @@ -behaviour(supervisor). --export([start_link/1]). +-include("emqx_retainer.hrl"). +-export([start_link/1]). -export([init/1]). start_link(Env) -> supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]). init([Env]) -> - {ok, {{one_for_one, 10, 3600}, - [#{id => retainer, - start => {emqx_retainer, start_link, [Env]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_retainer]} || not is_managed_by_modules()]}}. + Retainer = #{ + id => retainer, + start => {emqx_retainer, start_link, [Env]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_retainer] + }, + WorkerPool = #{ + id => ?POOL, + start => {emqx_pool_sup, start_link, [?POOL, random, {emqx_pool, start_link, []}]}, + restart => permanent, + shutdown => 5000, + type => supervisor, + modules => [emqx_pool_sup] + }, + ChildSpecs = case is_managed_by_modules() of + false -> [Retainer, WorkerPool]; + true -> [WorkerPool] + end, + {ok, {{one_for_one, 10, 3600}, ChildSpecs}}. -ifdef(EMQX_ENTERPRISE). diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index ae1ed1d23..0872b4186 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -28,8 +28,10 @@ -export([ submit/1 , submit/2 + , submit/3 , async_submit/1 , async_submit/2 + , async_submit/3 ]). -ifdef(TEST). @@ -56,7 +58,7 @@ %% @doc Start pool. -spec(start_link(atom(), pos_integer()) -> startlink_ret()). start_link(Pool, Id) -> - gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + gen_server:start_link({local, emqx_misc:proc_name(Pool, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). %% @doc Submit work to the pool. @@ -68,9 +70,9 @@ submit(Task) -> submit(Fun, Args) -> call({submit, {Fun, Args}}). -%% @private -call(Req) -> - gen_server:call(worker(), Req, infinity). +-spec(submit(atom(), fun(), list(any())) -> any()). +submit(Pool, Fun, Args) -> + call(Pool, {submit, {Fun, Args}}). %% @doc Submit work to the pool asynchronously. -spec(async_submit(task()) -> ok). @@ -81,14 +83,30 @@ async_submit(Task) -> async_submit(Fun, Args) -> cast({async_submit, {Fun, Args}}). +-spec(async_submit(atom(), fun(), list(any())) -> ok). +async_submit(Pool, Fun, Args) -> + cast(Pool, {async_submit, {Fun, Args}}). + +%% @private +call(Req) -> + gen_server:call(worker(), Req, infinity). + +call(Pool, Req) -> + gen_server:call(worker(Pool), Req, infinity). + %% @private cast(Msg) -> gen_server:cast(worker(), Msg). +cast(Pool, Msg) -> + gen_server:cast(worker(Pool), Msg). %% @private worker() -> gproc_pool:pick_worker(?POOL). +worker(Pool) -> + gproc_pool:pick_worker(Pool). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- From 3e24d287a357ca3b62dc370e8b6e62e6c27077b5 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 21 Mar 2023 17:08:39 +0800 Subject: [PATCH 2/5] fix: hide ws_cookie from logs --- apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl | 2 +- .../test/emqx_rule_engine_jwt_worker_SUITE.erl | 2 +- src/emqx.appup.src | 14 ++++++++++++-- src/emqx_misc.erl | 3 +++ 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl index bd1215d70..dc62ad77d 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl @@ -60,7 +60,7 @@ check(ClientInfo = #{ clientid := Clientid MatchSpec = ets:fun2ms(fun({?TABLE, {clientid, X}, Password, InterTime}) when X =:= Clientid-> Password; ({?TABLE, {username, X}, Password, InterTime}) when X =:= Username andalso X =/= undefined -> Password end), - Info = maps:without([password], ClientInfo), + Info = maps:without([password, ws_cookie], ClientInfo), case ets:select(?TABLE, MatchSpec) of [] -> ?LOG(debug, "[Mnesia] Auth ignored, Client: ~p", [Info]); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl index 5b810ddae..8ed4f197d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -82,7 +82,7 @@ t_create_success(_Config) -> {Ref, token_created} -> ok after - 1_000 -> + 5_000 -> ct:fail("should have confirmed token creation; msgs: ~0p", [process_info(self(), messages)]) end, diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 8dc543d4d..2530b9ec6 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,8 +1,13 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.15", + [{"4.4.16", + [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_relup,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, + {"4.4.15", [{load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, @@ -499,8 +504,13 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {<<".*">>,[]}], - [{"4.4.15", + [{"4.4.16", + [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_relup,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, + {"4.4.15", [{load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 4d8f9f464..09ce4a457 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -507,6 +507,9 @@ safe_io_device() -> standard_error end. +is_sensitive_key(ws_cookie) -> true; +is_sensitive_key("ws_cookie") -> true; +is_sensitive_key(<<"ws_cookie">>) -> true; is_sensitive_key(token) -> true; is_sensitive_key("token") -> true; is_sensitive_key(<<"token">>) -> true; From 0eee57e4f7cd0b198bf835fca46229e2efa9768e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 21 Mar 2023 17:15:26 +0800 Subject: [PATCH 3/5] chore: update appup files --- .../src/emqx_auth_mnesia.app.src | 2 +- .../src/emqx_auth_mnesia.appup.src | 6 +- apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- .../emqx_retainer/src/emqx_retainer.appup.src | 44 +++++--- src/emqx.appup.src | 102 ++++++++++++------ 5 files changed, 104 insertions(+), 52 deletions(-) diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src index 09ad7b779..da11b7d8a 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_mnesia, [{description, "EMQX Authentication with Mnesia"}, - {vsn, "4.3.12"}, % strict semver, bump manually + {vsn, "4.3.13"}, % strict semver, bump manually {modules, []}, {registered, []}, {applications, [kernel,stdlib,mnesia]}, diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src index 74d194dd3..fe2614a08 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src @@ -1,7 +1,8 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.11", + [{"4.3.12",[{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]}]}, + {"4.3.11", [{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mnesia,brutal_purge,soft_purge,[]}]}, {"4.3.10", @@ -50,7 +51,8 @@ {load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mnesia_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.11", + [{"4.3.12",[{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]}]}, + {"4.3.11", [{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mnesia,brutal_purge,soft_purge,[]}]}, {"4.3.10", diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 1f1e5821b..5f206a155 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,6 +1,6 @@ {application, emqx_retainer, [{description, "EMQX Retainer"}, - {vsn, "4.4.5"}, % strict semver, bump manually! + {vsn, "4.4.6"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.appup.src b/apps/emqx_retainer/src/emqx_retainer.appup.src index 54413bfa2..e732189a1 100644 --- a/apps/emqx_retainer/src/emqx_retainer.appup.src +++ b/apps/emqx_retainer/src/emqx_retainer.appup.src @@ -1,25 +1,41 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.3",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.2",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]} - ]}, - {"4.4.1",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]} - ]}, + [{"4.4.5", + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, + {"4.4.4", + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, + {"4.4.3", + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, + {"4.4.2", + [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, + {"4.4.1", + [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, {"4.4.0", [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.4.3",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.2",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]} - ]}, - {"4.4.1",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]} - ]}, + [{"4.4.5", + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, + {"4.4.4", + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, + {"4.4.3", + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, + {"4.4.2", + [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, + {"4.4.1", + [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, {"4.4.0", [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}, diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 2530b9ec6..96e8a8a17 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,17 +2,20 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.16", - [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.15", - [{load_module,emqx,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.14", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -25,7 +28,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {"4.4.13", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -39,7 +43,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -53,7 +58,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -70,7 +76,8 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.4.10", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -96,7 +103,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.9", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -128,7 +136,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.8", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -161,7 +170,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.7", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -194,7 +204,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.6", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -227,7 +238,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.5", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -262,7 +274,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.4", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -303,7 +316,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.3", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -350,7 +364,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.2", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -398,7 +413,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.1", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -450,7 +466,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.0", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -505,17 +522,20 @@ [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {<<".*">>,[]}], [{"4.4.16", - [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.15", - [{load_module,emqx,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.14", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -528,7 +548,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {"4.4.13", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -542,7 +563,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -556,7 +578,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -573,7 +596,8 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {delete_module,emqx_cover}]}, {"4.4.10", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -596,7 +620,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.9", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -624,7 +649,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.8", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -653,7 +679,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.7", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -682,7 +709,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.6", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -711,7 +739,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.5", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -742,7 +771,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.4", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -779,7 +809,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.3", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -821,7 +852,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.2", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -864,7 +896,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.1", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]}, @@ -911,7 +944,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.0", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]}, From 0b2660f64d871dc6f060fa66a6cc0e12425996ed Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 21 Mar 2023 18:47:28 +0800 Subject: [PATCH 4/5] chore: start emqx_retainer_worker_pool when relup --- .../emqx_retainer/src/emqx_retainer.appup.src | 50 ++++++------------- apps/emqx_retainer/src/emqx_retainer_sup.erl | 31 +++++++++--- 2 files changed, 39 insertions(+), 42 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer.appup.src b/apps/emqx_retainer/src/emqx_retainer.appup.src index e732189a1..3609fa78a 100644 --- a/apps/emqx_retainer/src/emqx_retainer.appup.src +++ b/apps/emqx_retainer/src/emqx_retainer.appup.src @@ -1,43 +1,25 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.5", + [{<<"4\\.4\\.[1-5]">>, [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.4", - [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.3", - [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.2", - [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, - {"4.4.1", - [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, + {apply,{emqx_retainer_sup,ensure_worker_pool_started,[]}}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]} + ]}, {"4.4.0", - [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_retainer_sup,ensure_worker_pool_started,[]}}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]} + ]}, {<<".*">>,[]}], - [{"4.4.5", + [{<<"4\\.4\\.[1-5]">>, [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.4", - [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.3", - [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.2", - [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, - {"4.4.1", - [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]} + ]}, {"4.4.0", - [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]} + ]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index 01e8770e6..3dadd4d6d 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -20,12 +20,23 @@ -include("emqx_retainer.hrl"). --export([start_link/1]). +-export([ start_link/1 + , ensure_worker_pool_started/0 + , worker_pool_spec/0 + ]). + -export([init/1]). start_link(Env) -> supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]). +ensure_worker_pool_started() -> + try + supervisor:start_child(?MODULE, worker_pool_spec()) + catch + _:_ -> ignore + end. + init([Env]) -> Retainer = #{ id => retainer, @@ -35,19 +46,22 @@ init([Env]) -> type => worker, modules => [emqx_retainer] }, - WorkerPool = #{ + WorkerPool = worker_pool_spec(), + ChildSpecs = case is_managed_by_modules() of + false -> [Retainer, WorkerPool]; + true -> [] + end, + {ok, {{one_for_one, 10, 3600}, ChildSpecs}}. + +worker_pool_spec() -> + #{ id => ?POOL, start => {emqx_pool_sup, start_link, [?POOL, random, {emqx_pool, start_link, []}]}, restart => permanent, shutdown => 5000, type => supervisor, modules => [emqx_pool_sup] - }, - ChildSpecs = case is_managed_by_modules() of - false -> [Retainer, WorkerPool]; - true -> [WorkerPool] - end, - {ok, {{one_for_one, 10, 3600}, ChildSpecs}}. + }. -ifdef(EMQX_ENTERPRISE). @@ -69,3 +83,4 @@ is_managed_by_modules() -> false. -endif. + From cb61e5deca5899bf44ed43bdea12b094188717f1 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 22 Mar 2023 10:51:10 +0800 Subject: [PATCH 5/5] chore: update the change logs --- apps/emqx_retainer/src/emqx_retainer_sup.erl | 1 + changes/v4.4.17-en.md | 8 +++++++- changes/v4.4.17-zh.md | 8 +++++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index 3dadd4d6d..02ced41ef 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -37,6 +37,7 @@ ensure_worker_pool_started() -> _:_ -> ignore end. +-dialyzer({no_match, [init/1]}). init([Env]) -> Retainer = #{ id => retainer, diff --git a/changes/v4.4.17-en.md b/changes/v4.4.17-en.md index 620ad6e38..16f235e3a 100644 --- a/changes/v4.4.17-en.md +++ b/changes/v4.4.17-en.md @@ -26,4 +26,10 @@ - Fix that `Erlang distribution` can't use TLS [#9981](https://github.com/emqx/emqx/pull/9981). About `Erlang distribution`, See [here](https://www.emqx.io/docs/en/v4.4/advanced/cluster.html#distributed-erlang) for details. -- Fixed MQTT bridge TLS connection could not verify wildcard certificate from peer[#10094](https://github.com/emqx/emqx/pull/10094). +- Fixed MQTT bridge TLS connection could not verify wildcard certificate from peer [#10094](https://github.com/emqx/emqx/pull/10094). + +- Fixed the problem that EMQX could not timely clear the disconnected MQTT connection information due to a large number of retained messages [#10189](https://github.com/emqx/emqx/pull/10189). + Before this fix, the `emqx_retainer` plugin and the connection cleanup process of emqx shared the same process pool. + Therefore, if the process pool was blocked by a large number of retain messages, many disconnected MQTT connections would not be cleaned up in time. + For more details, see [#9409](https://github.com/emqx/emqx/issues/9409). + In this fix, we created a separate process pool for the `emqx_retainer` plugin to avoid this problem. diff --git a/changes/v4.4.17-zh.md b/changes/v4.4.17-zh.md index df07cd348..c21a8fd8b 100644 --- a/changes/v4.4.17-zh.md +++ b/changes/v4.4.17-zh.md @@ -25,4 +25,10 @@ - 修复 `Erlang distribution` 无法使用 TLS 的问题 [#9981](https://github.com/emqx/emqx/pull/9981)。 关于 `Erlang distribution`, 详见 [这里](https://www.emqx.io/docs/zh/v4.4/advanced/cluster.html)。 -- 修正了 MQTT 桥接 TLS 连接无法验证对端的带通配符的证书[#10094](https://github.com/emqx/emqx/pull/10094)。 +- 修正了 MQTT 桥接 TLS 连接无法验证对端的带通配符的证书 [#10094](https://github.com/emqx/emqx/pull/10094)。 + +- 修复由于大量 retain 消息导致 EMQX 无法及时清除已掉线的 MQTT 连接信息的问题。[#10189](https://github.com/emqx/emqx/pull/10189)。 + 在此修复之前,`emqx_retainer` 插件和 emqx 的连接信息清理过程共用了同一个进程池,所以 + 如果该进程池被大量的 retain 消息下发任务阻塞时,许多已经掉线的 MQTT 连接将得不到及时清理。 + 问题详情见 [#9409](https://github.com/emqx/emqx/issues/9409)。 + 在此修复中,我们给 `emqx_retainer` 插件创建了单独的进程池,从而避免了该问题。