Merge pull request #9650 from thalesmg/fix-gcp-pubsub-leak-rv44

fix(gcp_pubsub): ensure jwt token is deleted after worker stops and always ensure refresh timer (rv4.4)
This commit is contained in:
Zaiming (Stone) Shi 2022-12-30 17:06:35 +01:00 committed by GitHub
commit 00412149f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 154 additions and 50 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine,
[{description, "EMQ X Rule Engine"},
{vsn, "4.4.12"}, % strict semver, bump manually!
{vsn, "4.4.13"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]},
{applications, [kernel,stdlib,rulesql,getopt,jose]},

View File

@ -1,13 +1,20 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.11",[
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
]},
[{"4.4.12",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]},
{update,emqx_rule_engine_jwt_sup,supervisor},
{load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}]},
{"4.4.11",
[{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]},
{update,emqx_rule_engine_jwt_sup,supervisor},
{load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.10",
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
@ -213,13 +220,20 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.11",[
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
]},
[{"4.4.12",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]},
{update,emqx_rule_engine_jwt_sup,supervisor},
{load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}]},
{"4.4.11",
[{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]},
{update,emqx_rule_engine_jwt_sup,supervisor},
{load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.10",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}},

View File

@ -22,6 +22,7 @@
%% API
-export([ lookup_jwt/1
, lookup_jwt/2
, delete_jwt/2
]).
-type jwt() :: binary().
@ -43,3 +44,13 @@ lookup_jwt(TId, ResourceId) ->
error:badarg ->
{error, not_found}
end.
-spec delete_jwt(ets:table(), resource_id()) -> ok.
delete_jwt(TId, ResourceId) ->
try
ets:delete(TId, {ResourceId, jwt}),
ok
catch
error:badarg ->
ok
end.

View File

@ -62,8 +62,10 @@ ensure_worker_present(Id, Config) ->
-spec ensure_worker_deleted(worker_id()) -> ok.
ensure_worker_deleted(Id) ->
case supervisor:terminate_child(?MODULE, Id) of
ok -> ok;
{error, not_found} -> ok
ok ->
ok = supervisor:delete_child(?MODULE, Id);
{error, not_found} ->
ok
end.
jwt_worker_child_spec(Id, Config) ->
@ -72,7 +74,7 @@ jwt_worker_child_spec(Id, Config) ->
, restart => transient
, type => worker
, significant => false
, shutdown => brutal_kill
, shutdown => 5_000
, modules => [emqx_rule_engine_jwt_worker]
}.

View File

@ -31,8 +31,11 @@
, handle_info/2
, format_status/1
, format_status/2
, terminate/2
]).
-export([force_refresh/1]).
-include_lib("jose/include/jose_jwk.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
@ -49,7 +52,7 @@
, alg := binary()
}.
-type jwt() :: binary().
-type state() :: #{ refresh_timer := undefined | timer:tref()
-type state() :: #{ refresh_timer := undefined | timer:tref() | reference()
, resource_id := resource_id()
, expiration := timer:time()
, table := ets:table()
@ -88,6 +91,11 @@ ensure_jwt(Worker) ->
gen_server:cast(Worker, {ensure_jwt, Ref}),
Ref.
-spec force_refresh(pid()) -> ok.
force_refresh(Worker) ->
_ = erlang:send(Worker, {timeout, force_refresh, ?refresh_jwt}),
ok.
%%-----------------------------------------------------------------------------------------
%% gen_server API
%%-----------------------------------------------------------------------------------------
@ -95,6 +103,7 @@ ensure_jwt(Worker) ->
-spec init(config()) -> {ok, state(), {continue, {make_key, binary()}}}
| {stop, {error, term()}}.
init(#{private_key := PrivateKeyPEM} = Config) ->
process_flag(trap_exit, true),
State0 = maps:without([private_key], Config),
State = State0#{ jwk => undefined
, jwt => undefined
@ -139,7 +148,7 @@ handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) ->
handle_cast(_Req, State) ->
{noreply, State}.
handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) ->
handle_info({timeout, _TRef, ?refresh_jwt}, State0) ->
State = generate_and_store_jwt(State0),
{noreply, State};
handle_info(_Msg, State) ->
@ -152,6 +161,11 @@ format_status(_Opt, [_PDict, State0]) ->
State = censor_secrets(State0),
[{data, [{"State", State}]}].
terminate(_Reason, State) ->
#{resource_id := ResourceId, table := TId} = State,
emqx_rule_engine_jwt:delete_jwt(TId, ResourceId),
ok.
%%-----------------------------------------------------------------------------------------
%% Helper fns
%%-----------------------------------------------------------------------------------------
@ -195,14 +209,13 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
ok.
-spec ensure_timer(state()) -> state().
ensure_timer(State = #{ refresh_timer := undefined
ensure_timer(State = #{ refresh_timer := OldTimer
, expiration := ExpirationMS0
}) ->
ExpirationMS = max(5_000, ExpirationMS0 - 5_000),
ok = cancel_timer(OldTimer),
ExpirationMS = max(5_000, ExpirationMS0 - 60_000),
TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt),
State#{refresh_timer => TRef};
ensure_timer(State) ->
State.
State#{refresh_timer => TRef}.
-spec censor_secrets(state()) -> map().
censor_secrets(State) ->
@ -214,3 +227,10 @@ censor_secrets(State) ->
Value
end,
State).
-spec cancel_timer(undefined | timer:tref() | reference()) -> ok.
cancel_timer(undefined) ->
ok;
cancel_timer(TRef) ->
_ = erlang:cancel_timer(TRef),
ok.

View File

@ -140,12 +140,22 @@ t_refresh(_Config) ->
{ok, SecondJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId),
?assertNot(is_expired(SecondJWT)),
?assert(is_expired(FirstJWT)),
{FirstJWT, SecondJWT}
%% check yet another refresh to ensure the timer was properly
%% reset.
?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh,
jwt := JWT1} when JWT1 =/= SecondJWT
andalso JWT1 =/= FirstJWT, 15_000),
{ok, ThirdJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId),
?assertNot(is_expired(ThirdJWT)),
?assert(is_expired(SecondJWT)),
{FirstJWT, SecondJWT, ThirdJWT}
end,
fun({FirstJWT, SecondJWT}, Trace) ->
?assertMatch([_, _ | _],
fun({FirstJWT, SecondJWT, ThirdJWT}, Trace) ->
?assertMatch([_, _, _ | _],
?of_kind(rule_engine_jwt_worker_token_stored, Trace)),
?assertNotEqual(FirstJWT, SecondJWT),
?assertNotEqual(SecondJWT, ThirdJWT),
?assertNotEqual(FirstJWT, ThirdJWT),
ok
end),
ok.
@ -225,7 +235,7 @@ t_lookup_badarg(_Config) ->
t_start_supervised_worker(_Config) ->
{ok, _} = emqx_rule_engine_jwt_sup:start_link(),
Config = #{resource_id := ResourceId} = generate_config(),
Config = #{resource_id := ResourceId, table := TId} = generate_config(),
{ok, Pid} = emqx_rule_engine_jwt_sup:ensure_worker_present(ResourceId, Config),
Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Pid),
receive
@ -237,6 +247,7 @@ t_start_supervised_worker(_Config) ->
end,
MRef = monitor(process, Pid),
?assert(is_process_alive(Pid)),
?assertMatch({ok, _}, emqx_rule_engine_jwt:lookup_jwt(TId, ResourceId)),
ok = emqx_rule_engine_jwt_sup:ensure_worker_deleted(ResourceId),
receive
{'DOWN', MRef, process, Pid, _} ->
@ -245,4 +256,9 @@ t_start_supervised_worker(_Config) ->
1_000 ->
ct:fail("timeout")
end,
%% ensure it cleans up its own tokens to avoid leakage when
%% probing/testing rule resources.
?assertEqual({error, not_found}, emqx_rule_engine_jwt:lookup_jwt(TId, ResourceId)),
%% ensure the specs are removed from the supervision tree.
?assertEqual([], supervisor:which_children(emqx_rule_engine_jwt_sup)),
ok.

8
changes/v4.4.13-en.md Normal file
View File

@ -0,0 +1,8 @@
# v4.4.13
## Enhancements
## Bug Fixes
- Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9640](https://github.com/emqx/emqx/pull/9640)

8
changes/v4.4.13-zh.md Normal file
View File

@ -0,0 +1,8 @@
# v4.4.13
## 增强
## 修复
- 修复了测试GCP PubSub可能泄露内存的问题以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640)

View File

@ -51,7 +51,13 @@
<<"4.4.5">>,<<"4.4.6">>,<<"4.4.7">>,<<"4.4.8">>,<<"4.4.9">>,
<<"4.4.10">>,<<"4.4.11">>],
otp => <<"24.3.4.2-1">>}}.
{<<"4.5.0">>,
#{from_versions => [<<"4.4.8">>,<<"4.4.9">>,<<"4.4.10">>,
<<"4.4.11">>],
{<<"4.4.13">>,
#{from_versions =>
[<<"4.4.0">>,<<"4.4.1">>,<<"4.4.10">>,<<"4.4.11">>,<<"4.4.12">>,
<<"4.4.2">>,<<"4.4.3">>,<<"4.4.4">>,<<"4.4.5">>,<<"4.4.6">>,
<<"4.4.7">>,<<"4.4.8">>,<<"4.4.9">>],
otp => <<"24.3.4.2-1">>}}.
{<<"4.5.0">>,
#{from_versions =>
[<<"4.4.10">>,<<"4.4.11">>,<<"4.4.13">>,<<"4.4.8">>,<<"4.4.9">>],
otp => <<"24.3.4.2-1">>}}.

View File

@ -13,8 +13,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 4.4.12
version: 4.4.13
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 4.4.12
appVersion: 4.4.13

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.12"}).
-define(EMQX_RELEASE, {opensource, "4.4.13"}).
-else.

View File

@ -1,6 +1,6 @@
{application, emqx_dashboard,
[{description, "EMQX Web Dashboard"},
{vsn, "4.4.11"}, % strict semver, bump manually!
{vsn, "4.4.12"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel,stdlib,mnesia,minirest]},

View File

@ -6,7 +6,7 @@
%% the emqx `release' version, which in turn is comprised of several
%% apps, one of which is this. See `emqx_release.hrl' for more
%% info.
{vsn, "4.4.12"}, % strict semver, bump manually!
{vsn, "4.4.13"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [ kernel

View File

@ -1,7 +1,11 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.11",
[{"4.4.12",
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.11",
[{add_module,emqx_cover},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
@ -27,7 +31,8 @@
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.9",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -52,7 +57,8 @@
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.8",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -78,7 +84,8 @@
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.7",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -104,7 +111,8 @@
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.6",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -130,7 +138,8 @@
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.5",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -158,7 +167,8 @@
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.4",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -193,7 +203,8 @@
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.3",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -235,7 +246,8 @@
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.2",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -278,7 +290,8 @@
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.1",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -326,7 +339,8 @@
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{add_module,emqx_relup},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.0",
[{add_module,emqx_cover},
{add_module,emqx_ocsp_cache},
@ -376,9 +390,14 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{apply, {application, set_env, [gen_rpc, insecure_auth_fallback_allowed, true]}}]},
{apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{<<".*">>,[]}],
[{"4.4.11",
[{"4.4.12",
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.11",
[{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},