diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src index 09ad7b779..da11b7d8a 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_mnesia, [{description, "EMQX Authentication with Mnesia"}, - {vsn, "4.3.12"}, % strict semver, bump manually + {vsn, "4.3.13"}, % strict semver, bump manually {modules, []}, {registered, []}, {applications, [kernel,stdlib,mnesia]}, diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src index 74d194dd3..fe2614a08 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.appup.src @@ -1,7 +1,8 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.11", + [{"4.3.12",[{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]}]}, + {"4.3.11", [{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mnesia,brutal_purge,soft_purge,[]}]}, {"4.3.10", @@ -50,7 +51,8 @@ {load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mnesia_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.11", + [{"4.3.12",[{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]}]}, + {"4.3.11", [{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mnesia,brutal_purge,soft_purge,[]}]}, {"4.3.10", diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl index bd1215d70..dc62ad77d 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.erl @@ -60,7 +60,7 @@ check(ClientInfo = #{ clientid := Clientid MatchSpec = ets:fun2ms(fun({?TABLE, {clientid, X}, Password, InterTime}) when X =:= Clientid-> Password; ({?TABLE, {username, X}, Password, InterTime}) when X =:= Username andalso X =/= undefined -> Password end), - Info = maps:without([password], ClientInfo), + Info = maps:without([password, ws_cookie], ClientInfo), case ets:select(?TABLE, MatchSpec) of [] -> ?LOG(debug, "[Mnesia] Auth ignored, Client: ~p", [Info]); diff --git a/apps/emqx_retainer/include/emqx_retainer.hrl b/apps/emqx_retainer/include/emqx_retainer.hrl index d25aecc6b..dbaf8937e 100644 --- a/apps/emqx_retainer/include/emqx_retainer.hrl +++ b/apps/emqx_retainer/include/emqx_retainer.hrl @@ -16,5 +16,6 @@ -define(APP, emqx_retainer). -define(TAB, ?APP). +-define(POOL, retainer_worker_pool). -record(retained, {topic, msg, expiry_time}). diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 1f1e5821b..5f206a155 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,6 +1,6 @@ {application, emqx_retainer, [{description, "EMQX Retainer"}, - {vsn, "4.4.5"}, % strict semver, bump manually! + {vsn, "4.4.6"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.appup.src b/apps/emqx_retainer/src/emqx_retainer.appup.src index 54413bfa2..3609fa78a 100644 --- a/apps/emqx_retainer/src/emqx_retainer.appup.src +++ b/apps/emqx_retainer/src/emqx_retainer.appup.src @@ -1,27 +1,25 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.3",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.2",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]} - ]}, - {"4.4.1",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]} - ]}, + [{<<"4\\.4\\.[1-5]">>, + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_retainer_sup,ensure_worker_pool_started,[]}}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]} + ]}, {"4.4.0", - [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_retainer_sup,ensure_worker_pool_started,[]}}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]} + ]}, {<<".*">>,[]}], - [{"4.4.3",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}]}, - {"4.4.2",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]} - ]}, - {"4.4.1",[{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]} - ]}, + [{<<"4\\.4\\.[1-5]">>, + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]} + ]}, {"4.4.0", - [{load_module,emqx_retainer,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}, - {load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer,brutal_purge,soft_purge,[]}, + {load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]} + ]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 84e49c5be..281aacb6b 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -69,7 +69,7 @@ on_session_subscribed(_, _, #{share := ShareName}) when ShareName =/= undefined ok; on_session_subscribed(_, Topic, #{rh := Rh, is_new := IsNew}) -> case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of - true -> emqx_pool:async_submit(fun ?MODULE:dispatch/2, [self(), Topic]); + true -> emqx_pool:async_submit(?POOL, fun ?MODULE:dispatch/2, [self(), Topic]); _ -> ok end. diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index 550858afa..02ced41ef 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -18,21 +18,51 @@ -behaviour(supervisor). --export([start_link/1]). +-include("emqx_retainer.hrl"). + +-export([ start_link/1 + , ensure_worker_pool_started/0 + , worker_pool_spec/0 + ]). -export([init/1]). start_link(Env) -> supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]). +ensure_worker_pool_started() -> + try + supervisor:start_child(?MODULE, worker_pool_spec()) + catch + _:_ -> ignore + end. + +-dialyzer({no_match, [init/1]}). init([Env]) -> - {ok, {{one_for_one, 10, 3600}, - [#{id => retainer, - start => {emqx_retainer, start_link, [Env]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_retainer]} || not is_managed_by_modules()]}}. + Retainer = #{ + id => retainer, + start => {emqx_retainer, start_link, [Env]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_retainer] + }, + WorkerPool = worker_pool_spec(), + ChildSpecs = case is_managed_by_modules() of + false -> [Retainer, WorkerPool]; + true -> [] + end, + {ok, {{one_for_one, 10, 3600}, ChildSpecs}}. + +worker_pool_spec() -> + #{ + id => ?POOL, + start => {emqx_pool_sup, start_link, [?POOL, random, {emqx_pool, start_link, []}]}, + restart => permanent, + shutdown => 5000, + type => supervisor, + modules => [emqx_pool_sup] + }. -ifdef(EMQX_ENTERPRISE). @@ -54,3 +84,4 @@ is_managed_by_modules() -> false. -endif. + diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl index 5b810ddae..8ed4f197d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -82,7 +82,7 @@ t_create_success(_Config) -> {Ref, token_created} -> ok after - 1_000 -> + 5_000 -> ct:fail("should have confirmed token creation; msgs: ~0p", [process_info(self(), messages)]) end, diff --git a/changes/v4.4.17-en.md b/changes/v4.4.17-en.md index 620ad6e38..16f235e3a 100644 --- a/changes/v4.4.17-en.md +++ b/changes/v4.4.17-en.md @@ -26,4 +26,10 @@ - Fix that `Erlang distribution` can't use TLS [#9981](https://github.com/emqx/emqx/pull/9981). About `Erlang distribution`, See [here](https://www.emqx.io/docs/en/v4.4/advanced/cluster.html#distributed-erlang) for details. -- Fixed MQTT bridge TLS connection could not verify wildcard certificate from peer[#10094](https://github.com/emqx/emqx/pull/10094). +- Fixed MQTT bridge TLS connection could not verify wildcard certificate from peer [#10094](https://github.com/emqx/emqx/pull/10094). + +- Fixed the problem that EMQX could not timely clear the disconnected MQTT connection information due to a large number of retained messages [#10189](https://github.com/emqx/emqx/pull/10189). + Before this fix, the `emqx_retainer` plugin and the connection cleanup process of emqx shared the same process pool. + Therefore, if the process pool was blocked by a large number of retain messages, many disconnected MQTT connections would not be cleaned up in time. + For more details, see [#9409](https://github.com/emqx/emqx/issues/9409). + In this fix, we created a separate process pool for the `emqx_retainer` plugin to avoid this problem. diff --git a/changes/v4.4.17-zh.md b/changes/v4.4.17-zh.md index df07cd348..c21a8fd8b 100644 --- a/changes/v4.4.17-zh.md +++ b/changes/v4.4.17-zh.md @@ -25,4 +25,10 @@ - 修复 `Erlang distribution` 无法使用 TLS 的问题 [#9981](https://github.com/emqx/emqx/pull/9981)。 关于 `Erlang distribution`, 详见 [这里](https://www.emqx.io/docs/zh/v4.4/advanced/cluster.html)。 -- 修正了 MQTT 桥接 TLS 连接无法验证对端的带通配符的证书[#10094](https://github.com/emqx/emqx/pull/10094)。 +- 修正了 MQTT 桥接 TLS 连接无法验证对端的带通配符的证书 [#10094](https://github.com/emqx/emqx/pull/10094)。 + +- 修复由于大量 retain 消息导致 EMQX 无法及时清除已掉线的 MQTT 连接信息的问题。[#10189](https://github.com/emqx/emqx/pull/10189)。 + 在此修复之前,`emqx_retainer` 插件和 emqx 的连接信息清理过程共用了同一个进程池,所以 + 如果该进程池被大量的 retain 消息下发任务阻塞时,许多已经掉线的 MQTT 连接将得不到及时清理。 + 问题详情见 [#9409](https://github.com/emqx/emqx/issues/9409)。 + 在此修复中,我们给 `emqx_retainer` 插件创建了单独的进程池,从而避免了该问题。 diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 8dc543d4d..96e8a8a17 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,13 +1,21 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.15", - [{load_module,emqx,brutal_purge,soft_purge,[]}, + [{"4.4.16", + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_relup,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, + {"4.4.15", + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.14", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -20,7 +28,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {"4.4.13", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -34,7 +43,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -48,7 +58,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -65,7 +76,8 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.4.10", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -91,7 +103,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.9", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -123,7 +136,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.8", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -156,7 +170,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.7", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -189,7 +204,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.6", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -222,7 +238,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.5", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -257,7 +274,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.4", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -298,7 +316,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.3", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -345,7 +364,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.2", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -393,7 +413,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.1", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -445,7 +466,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.0", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -499,13 +521,21 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {<<".*">>,[]}], - [{"4.4.15", - [{load_module,emqx,brutal_purge,soft_purge,[]}, + [{"4.4.16", + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_relup,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, + {"4.4.15", + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.14", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -518,7 +548,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {"4.4.13", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -532,7 +563,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -546,7 +578,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -563,7 +596,8 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {delete_module,emqx_cover}]}, {"4.4.10", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -586,7 +620,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.9", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -614,7 +649,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.8", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -643,7 +679,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.7", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -672,7 +709,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.6", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -701,7 +739,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.5", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -732,7 +771,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.4", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -769,7 +809,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.3", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -811,7 +852,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.2", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, @@ -854,7 +896,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.1", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]}, @@ -901,7 +944,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.0", - [{load_module,emqx_vm,brutal_purge,soft_purge,[]}, + [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_kernel_sup,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 4d8f9f464..09ce4a457 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -507,6 +507,9 @@ safe_io_device() -> standard_error end. +is_sensitive_key(ws_cookie) -> true; +is_sensitive_key("ws_cookie") -> true; +is_sensitive_key(<<"ws_cookie">>) -> true; is_sensitive_key(token) -> true; is_sensitive_key("token") -> true; is_sensitive_key(<<"token">>) -> true; diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index ae1ed1d23..0872b4186 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -28,8 +28,10 @@ -export([ submit/1 , submit/2 + , submit/3 , async_submit/1 , async_submit/2 + , async_submit/3 ]). -ifdef(TEST). @@ -56,7 +58,7 @@ %% @doc Start pool. -spec(start_link(atom(), pos_integer()) -> startlink_ret()). start_link(Pool, Id) -> - gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + gen_server:start_link({local, emqx_misc:proc_name(Pool, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). %% @doc Submit work to the pool. @@ -68,9 +70,9 @@ submit(Task) -> submit(Fun, Args) -> call({submit, {Fun, Args}}). -%% @private -call(Req) -> - gen_server:call(worker(), Req, infinity). +-spec(submit(atom(), fun(), list(any())) -> any()). +submit(Pool, Fun, Args) -> + call(Pool, {submit, {Fun, Args}}). %% @doc Submit work to the pool asynchronously. -spec(async_submit(task()) -> ok). @@ -81,14 +83,30 @@ async_submit(Task) -> async_submit(Fun, Args) -> cast({async_submit, {Fun, Args}}). +-spec(async_submit(atom(), fun(), list(any())) -> ok). +async_submit(Pool, Fun, Args) -> + cast(Pool, {async_submit, {Fun, Args}}). + +%% @private +call(Req) -> + gen_server:call(worker(), Req, infinity). + +call(Pool, Req) -> + gen_server:call(worker(Pool), Req, infinity). + %% @private cast(Msg) -> gen_server:cast(worker(), Msg). +cast(Pool, Msg) -> + gen_server:cast(worker(Pool), Msg). %% @private worker() -> gproc_pool:pick_worker(?POOL). +worker(Pool) -> + gproc_pool:pick_worker(Pool). + %%-------------------------------------------------------------------- %% gen_server callbacks %%--------------------------------------------------------------------