From 7f4ffee7b51f51ba743f60bd4c376f8d7205264f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 26 Oct 2022 15:42:54 -0300 Subject: [PATCH 1/9] feat: add jwt worker for rule actions Will be used in GCP PubSub resource for EE. --- .../emqx_rule_engine/include/rule_actions.hrl | 2 + .../src/emqx_rule_engine.app.src | 2 +- .../src/emqx_rule_engine.appup.src | 110 ++++++-- .../src/emqx_rule_engine_jwt_sup.erl | 57 ++++ .../src/emqx_rule_engine_jwt_worker.erl | 220 ++++++++++++++++ .../src/emqx_rule_engine_sup.erl | 44 +++- .../src/emqx_rule_metrics.erl | 5 +- .../emqx_rule_engine_jwt_worker_SUITE.erl | 246 ++++++++++++++++++ changes/v4.4.11-en.md | 2 + changes/v4.4.11-zh.md | 4 +- test/emqx_node_helpers.erl | 24 +- 11 files changed, 684 insertions(+), 32 deletions(-) create mode 100644 apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl create mode 100644 apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl create mode 100644 apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl diff --git a/apps/emqx_rule_engine/include/rule_actions.hrl b/apps/emqx_rule_engine/include/rule_actions.hrl index 4571849d3..cd47e2123 100644 --- a/apps/emqx_rule_engine/include/rule_actions.hrl +++ b/apps/emqx_rule_engine/include/rule_actions.hrl @@ -28,3 +28,5 @@ -define(bound_v(Key, ENVS0), maps:get(Key, maps:get(?BINDING_KEYS, ENVS0, #{}))). + +-define(JWT_TABLE, emqx_rule_engine_jwt_table). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index b02edb456..1490c3e94 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ [{description, "EMQ X Rule Engine"}, {vsn, "4.4.11"}, % strict semver, bump manually! {modules, []}, - {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, + {registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]}, {applications, [kernel,stdlib,rulesql,getopt]}, {mod, {emqx_rule_engine_app, []}}, {env, []}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index c877c23a2..e14f41440 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,11 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.10", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -12,7 +16,11 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -23,7 +31,11 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -35,7 +47,11 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -48,7 +64,11 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -62,7 +82,11 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -76,7 +100,11 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -92,7 +120,11 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.2", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -109,7 +141,11 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.1", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -126,7 +162,11 @@ {add_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -149,37 +189,47 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {"4.4.9", [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {"4.4.8", [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {<<"4\\.4\\.[6-7]">>, [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -188,11 +238,14 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {"4.4.5", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -202,11 +255,14 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {"4.4.4", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -216,10 +272,13 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {"4.4.3", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -232,10 +291,13 @@ {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {"4.4.2", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -249,10 +311,13 @@ {delete_module,emqx_rule_date}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {"4.4.1", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -266,10 +331,13 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {"4.4.0", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -283,5 +351,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, - {delete_module,emqx_rule_date}]}, + {delete_module,emqx_rule_date}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl new file mode 100644 index 000000000..bdf38378c --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_jwt_sup). + +-behaviour(supervisor). + +-export([ start_link/0 + , start_worker/2 + , stop_worker/1 + ]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + SupFlags = #{ strategy => one_for_one + , intensity => 10 + , period => 5 + , auto_shutdown => never + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +start_worker(Id, Config) -> + Ref = erlang:alias([reply]), + ChildSpec = jwt_worker_child_spec(Id, Config, Ref), + {ok, Pid} = supervisor:start_child(?MODULE, ChildSpec), + {Ref, Pid}. + +stop_worker(Id) -> + supervisor:terminate_child(?MODULE, Id). + +jwt_worker_child_spec(Id, Config, Ref) -> + #{ id => Id + , start => {emqx_rule_engine_jwt_worker, start_link, [Config, Ref]} + , restart => permanent + , type => worker + , significant => false + , shutdown => brutal_kill + , modules => [emqx_rule_engine_jwt_worker] + }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl new file mode 100644 index 000000000..8c5f1b0af --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl @@ -0,0 +1,220 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_jwt_worker). + +-behaviour(gen_server). + +%% API +-export([ start_link/2 + , lookup_jwt/1 + , lookup_jwt/2 + ]). + +%% gen_server API +-export([ init/1 + , handle_continue/2 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , format_status/1 + , format_status/2 + ]). + +-include_lib("jose/include/jose_jwk.hrl"). +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("emqx_rule_engine/include/rule_actions.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-type config() :: #{ private_key := binary() + , resource_id := resource_id() + , expiration := timer:time() + , table := ets:table() + , iss := binary() + , sub := binary() + , aud := binary() + , kid := binary() + , alg := binary() + }. +-type jwt() :: binary(). +-type state() :: #{ refresh_timer := undefined | timer:tref() + , resource_id := resource_id() + , expiration := timer:time() + , table := ets:table() + , jwt := undefined | jwt() + %% only undefined during startup + , jwk := undefined | jose_jwk:key() + , iss := binary() + , sub := binary() + , aud := binary() + , kid := binary() + , alg := binary() + }. + +-define(refresh_jwt, refresh_jwt). + +%%----------------------------------------------------------------------------------------- +%% API +%%----------------------------------------------------------------------------------------- + +-spec start_link(config(), reference()) -> gen_server:start_ret(). +start_link(#{ private_key := _ + , expiration := _ + , resource_id := _ + , table := _ + , iss := _ + , sub := _ + , aud := _ + , kid := _ + , alg := _ + } = Config, + Ref) -> + gen_server:start_link(?MODULE, {Config, Ref}, []). + +-spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}. +lookup_jwt(ResourceId) -> + ?MODULE:lookup_jwt(?JWT_TABLE, ResourceId). + +-spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}. +lookup_jwt(TId, ResourceId) -> + try + case ets:lookup(TId, {ResourceId, jwt}) of + [{{ResourceId, jwt}, JWT}] -> + {ok, JWT}; + [] -> + {error, not_found} + end + catch + error:badarg -> + {error, not_found} + end. + +%%----------------------------------------------------------------------------------------- +%% gen_server API +%%----------------------------------------------------------------------------------------- + +-spec init({config(), Ref}) -> {ok, state(), {continue, {make_key, binary(), Ref}}} + | {stop, {error, term()}} + when Ref :: reference(). +init({#{private_key := PrivateKeyPEM} = Config, Ref}) -> + {ok, _} = application:ensure_all_started(jose), + State0 = maps:without([private_key], Config), + State = State0#{ jwk => undefined + , jwt => undefined + , refresh_timer => undefined + }, + {ok, State, {continue, {make_key, PrivateKeyPEM, Ref}}}. + +handle_continue({make_key, PrivateKeyPEM, Ref}, State0) -> + case jose_jwk:from_pem(PrivateKeyPEM) of + JWK = #jose_jwk{} -> + State = State0#{jwk := JWK}, + {noreply, State, {continue, {create_token, Ref}}}; + [] -> + Ref ! {Ref, {error, {invalid_private_key, empty_key}}}, + {stop, {error, empty_key}, State0}; + {error, Reason} -> + Ref ! {Ref, {error, {invalid_private_key, Reason}}}, + {stop, {error, Reason}, State0}; + Error -> + Ref ! {Ref, {error, {invalid_private_key, Error}}}, + {stop, {error, Error}, State0} + end; +handle_continue({create_token, Ref}, State0) -> + JWT = do_generate_jwt(State0), + store_jwt(State0, JWT), + State1 = State0#{jwt := JWT}, + State = ensure_timer(State1), + Ref ! {Ref, token_created}, + {noreply, State}. + +handle_call(_Req, _From, State) -> + {reply, {error, bad_call}, State}. + +handle_cast(_Req, State) -> + {noreply, State}. + +handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) -> + JWT = do_generate_jwt(State0), + store_jwt(State0, JWT), + ?tp(rule_engine_jwt_worker_refresh, #{}), + State1 = State0#{jwt := JWT}, + State = ensure_timer(State1#{refresh_timer := undefined}), + {noreply, State}; +handle_info(_Msg, State) -> + {noreply, State}. + +format_status(State) -> + censor_secrets(State). + +format_status(_Opt, [_PDict, State0]) -> + State = censor_secrets(State0), + [{data, [{"State", State}]}]. + +%%----------------------------------------------------------------------------------------- +%% Helper fns +%%----------------------------------------------------------------------------------------- + +-spec do_generate_jwt(state()) -> jwt(). +do_generate_jwt(#{ expiration := ExpirationMS + , iss := Iss + , sub := Sub + , aud := Aud + , kid := KId + , alg := Alg + , jwk := JWK + } = _State) -> + Headers = #{ <<"alg">> => Alg + , <<"kid">> => KId + }, + Now = erlang:system_time(seconds), + ExpirationS = erlang:convert_time_unit(ExpirationMS, millisecond, second), + Claims = #{ <<"iss">> => Iss + , <<"sub">> => Sub + , <<"aud">> => Aud + , <<"iat">> => Now + , <<"exp">> => Now + ExpirationS + }, + JWT0 = jose_jwt:sign(JWK, Headers, Claims), + {_, JWT} = jose_jws:compact(JWT0), + JWT. + +-spec store_jwt(state(), jwt()) -> ok. +store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> + true = ets:insert(TId, {{ResourceId, jwt}, JWT}), + ?tp(jwt_worker_token_stored, #{resource_id => ResourceId}), + ok. + +-spec ensure_timer(state()) -> state(). +ensure_timer(State = #{ refresh_timer := undefined + , expiration := ExpirationMS0 + }) -> + ExpirationMS = max(5_000, ExpirationMS0 - 5_000), + TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt), + State#{refresh_timer => TRef}; +ensure_timer(State) -> + State. + +-spec censor_secrets(state()) -> map(). +censor_secrets(State) -> + maps:map( + fun(Key, _Value) when Key =:= jwt; + Key =:= jwk -> + "******"; + (_Key, Value) -> + Value + end, + State). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 03e3fd11a..087dbcbfb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -22,7 +22,9 @@ -export([start_link/0]). --export([start_locker/0]). +-export([ start_locker/0 + , start_jwt_sup/0 + ]). -export([init/1]). @@ -31,8 +33,12 @@ start_link() -> init([]) -> Opts = [public, named_table, set, {read_concurrency, true}], - _ = ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]), - _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), + ensure_table(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]), + ensure_table(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), + SupFlags = #{ strategy => one_for_one + , intensity => 10 + , period => 10 + }, Registry = #{id => emqx_rule_registry, start => {emqx_rule_registry, start_link, []}, restart => permanent, @@ -51,7 +57,8 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_rule_monitor]}, - {ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}. + JWTSup = jwt_sup_child_spec(), + {ok, {SupFlags, [Registry, Metrics, Monitor, JWTSup]}}. start_locker() -> Locker = #{id => emqx_rule_locker, @@ -61,3 +68,32 @@ start_locker() -> type => worker, modules => [emqx_rule_locker]}, supervisor:start_child(?MODULE, Locker). + +start_jwt_sup() -> + JWTSup = jwt_sup_child_spec(), + supervisor:start_child(?MODULE, JWTSup). + +jwt_sup_child_spec() -> + #{ id => emqx_rule_engine_jwt_sup + , start => {emqx_rule_engine_jwt_sup, start_link, []} + , type => supervisor + , restart => permanent + , shutdown => 5_000 + , modules => [emqx_rule_engine_jwt_sup] + }. + +ensure_table(Name, Opts) -> + try + case ets:whereis(name) of + undefined -> + _ = ets:new(Name, Opts), + ok; + _ -> + ok + end + catch + %% stil the table exists (somehow can happen in hot-upgrade, + %% it seems). + error:badarg -> + ok + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index 2cebf96e5..4d8de9cff 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include("rule_engine.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API functions -export([ start_link/0 @@ -222,7 +223,9 @@ inc(Id, Metric, Val) -> counters:add(couters_ref(Id), metrics_idx(Metric), Val); Ref -> counters:add(Ref, metrics_idx(Metric), Val) - end. + end, + ?tp(rule_metrics_inc, #{id => Id, metric => Metric, value => Val}), + ok. inc_actions_taken(Id) -> inc_actions_taken(Id, 1). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl new file mode 100644 index 000000000..8cc350b52 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -0,0 +1,246 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_jwt_worker_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("jose/include/jose_jwt.hrl"). +-include_lib("jose/include/jose_jws.hrl"). + +-compile([export_all, nowarn_export_all]). + +%%----------------------------------------------------------------------------- +%% CT boilerplate +%%----------------------------------------------------------------------------- + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +%%----------------------------------------------------------------------------- +%% Helper fns +%%----------------------------------------------------------------------------- + +generate_private_key_pem() -> + PublicExponent = 65537, + Size = 2048, + Key = public_key:generate_key({rsa, Size, PublicExponent}), + DERKey = public_key:der_encode('PrivateKeyInfo', Key), + public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]). + +generate_config() -> + PrivateKeyPEM = generate_private_key_pem(), + ResourceID = emqx_guid:gen(), + #{ private_key => PrivateKeyPEM + , expiration => timer:hours(1) + , resource_id => ResourceID + , table => ets:new(test_jwt_table, [ordered_set, public]) + , iss => <<"issuer">> + , sub => <<"subject">> + , aud => <<"audience">> + , kid => <<"key id">> + , alg => <<"RS256">> + }. + +is_expired(JWT) -> + #jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT), + Now = erlang:system_time(seconds), + Now >= Exp. + +%%----------------------------------------------------------------------------- +%% Test cases +%%----------------------------------------------------------------------------- + +t_create_success(_Config) -> + Ref = alias([reply]), + Config = generate_config(), + ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config, Ref)), + receive + {Ref, token_created} -> + ok + after + 1_000 -> + ct:fail("should have confirmed token creation; msgs: ~0p", + [process_info(self(), messages)]) + end, + ok. + +t_empty_key(_Config) -> + Ref = alias([reply]), + Config0 = generate_config(), + Config = Config0#{private_key := <<>>}, + process_flag(trap_exit, true), + ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config, Ref)), + receive + {Ref, {error, {invalid_private_key, empty_key}}} -> + ok + after + 1_000 -> + ct:fail("should have errored; msgs: ~0p", + [process_info(self(), messages)]) + end, + ok. + +t_invalid_pem(_Config) -> + Ref = alias([reply]), + Config0 = generate_config(), + InvalidPEM = public_key:pem_encode([{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}, + {'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}]), + Config = Config0#{private_key := InvalidPEM}, + process_flag(trap_exit, true), + ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config, Ref)), + receive + {Ref, {error, {invalid_private_key, _}}} -> + ok + after + 1_000 -> + ct:fail("should have errored; msgs: ~0p", + [process_info(self(), messages)]) + end, + ok. + +t_refresh(_Config) -> + Ref = alias([reply]), + Config0 = #{ table := Table + , resource_id := ResourceId + } = generate_config(), + Config = Config0#{expiration => 5_000}, + ?check_trace( + begin + {{ok, _Pid}, {ok, _Event}} = + ?wait_async_action( + emqx_rule_engine_jwt_worker:start_link(Config, Ref), + #{?snk_kind := jwt_worker_token_stored}, + 5_000), + {ok, FirstJWT} = emqx_rule_engine_jwt_worker:lookup_jwt(Table, ResourceId), + ?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh}, 15_000), + {ok, SecondJWT} = emqx_rule_engine_jwt_worker:lookup_jwt(Table, ResourceId), + ?assertNot(is_expired(SecondJWT)), + ?assert(is_expired(FirstJWT)), + {FirstJWT, SecondJWT} + end, + fun({FirstJWT, SecondJWT}, Trace) -> + ?assertMatch([_, _ | _], + ?of_kind(jwt_worker_token_stored, Trace)), + ?assertNotEqual(FirstJWT, SecondJWT), + ok + end), + ok. + +t_format_status(_Config) -> + Ref = alias([reply]), + Config = generate_config(), + {ok, Pid} = emqx_rule_engine_jwt_worker:start_link(Config, Ref), + {status, _, _, Props} = sys:get_status(Pid), + [State] = [State + || Info = [_ | _] <- Props, + {data, Data = [_ | _]} <- Info, + {"State", State} <- Data], + ?assertMatch( + #{ jwt := "******" + , jwk := "******" + }, + State), + ok. + +t_lookup_ok(_Config) -> + Ref = alias([reply]), + Config = #{ table := Table + , resource_id := ResourceId + , private_key := PrivateKeyPEM + , aud := Aud + , iss := Iss + , sub := Sub + , kid := KId + } = generate_config(), + {ok, _} = emqx_rule_engine_jwt_worker:start_link(Config, Ref), + receive + {Ref, token_created} -> + ok + after + 500 -> + error(timeout) + end, + Res = emqx_rule_engine_jwt_worker:lookup_jwt(Table, ResourceId), + ?assertMatch({ok, _}, Res), + {ok, JWT} = Res, + ?assert(is_binary(JWT)), + JWK = jose_jwk:from_pem(PrivateKeyPEM), + {IsValid, ParsedJWT, JWS} = jose_jwt:verify_strict(JWK, [<<"RS256">>], JWT), + ?assertMatch( + #jose_jwt{ + fields = #{ <<"aud">> := Aud + , <<"iss">> := Iss + , <<"sub">> := Sub + , <<"exp">> := _ + , <<"iat">> := _ + }}, + ParsedJWT), + ?assertNot(is_expired(JWT)), + ?assertMatch( + #jose_jws{ + alg = {_, 'RS256'}, + fields = #{ <<"kid">> := KId + , <<"typ">> := <<"JWT">> + }}, + JWS), + ?assert(IsValid), + ok. + +t_lookup_not_found(_Config) -> + Table = ets:new(test_jwt_table, [ordered_set, public]), + InexistentResource = <<"xxx">>, + ?assertEqual({error, not_found}, + emqx_rule_engine_jwt_worker:lookup_jwt(Table, InexistentResource)), + ok. + +t_lookup_badarg(_Config) -> + InexistentTable = i_dont_exist, + InexistentResource = <<"xxx">>, + ?assertEqual({error, not_found}, + emqx_rule_engine_jwt_worker:lookup_jwt(InexistentTable, InexistentResource)), + ok. + +t_start_supervised_worker(_Config) -> + {ok, _} = emqx_rule_engine_jwt_sup:start_link(), + Config = #{resource_id := ResourceId} = generate_config(), + {Ref, Pid} = emqx_rule_engine_jwt_sup:start_worker(ResourceId, Config), + receive + {Ref, token_created} -> + ok + after + 1_000 -> + ct:fail("timeout") + end, + MRef = monitor(process, Pid), + ?assert(is_process_alive(Pid)), + ok = emqx_rule_engine_jwt_sup:stop_worker(ResourceId), + receive + {'DOWN', MRef, process, Pid, _} -> + ok + after + 1_000 -> + ct:fail("timeout") + end, + ok. diff --git a/changes/v4.4.11-en.md b/changes/v4.4.11-en.md index 55507c981..09e73b3c7 100644 --- a/changes/v4.4.11-en.md +++ b/changes/v4.4.11-en.md @@ -6,6 +6,8 @@ - Added support for OCSP (Online Certificate Status Protocol) Stapling - Added CRL (Certificate Revocation List) cache auto refresh +- Added a JWT worker for creating and refreshing JWT tokens in rule engine actions. [#9241](https://github.com/emqx/emqx/pull/9241) + ### Bug fixes - Fix get trace list crash when trace not initialize. [#9156](https://github.com/emqx/emqx/pull/9156) diff --git a/changes/v4.4.11-zh.md b/changes/v4.4.11-zh.md index 82c4558e1..4549c0ccf 100644 --- a/changes/v4.4.11-zh.md +++ b/changes/v4.4.11-zh.md @@ -5,7 +5,9 @@ - Erlang/OTP [SSL库漏洞修复](https://nvd.nist.gov/vuln/detail/CVE-2022-37026) - 增加了对 OCSP (Online Certificate Status Protocol) Stapling 的支持 - 增加了 CRL(证书吊销列表)缓存的自动刷新功能 - + +- 增加了一个JWT工作者,用于在规则引擎动作中创建和刷新JWT令牌。[#9241](https://github.com/emqx/emqx/pull/9241) + ### 修复 - 修复日志追踪模块没开启时,GET Trace 列表接口报错的问题。[#9156](https://github.com/emqx/emqx/pull/9156) diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index af7093dd7..c2d16275e 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -31,12 +31,23 @@ start_slave(Name) -> start_slave(Name, #{}). start_slave(Name, Opts) -> + SlaveMod = maps:get(slave_mod, Opts, ct_slave), Node = make_node_name(Name), - case ct_slave:start(Node, [{kill_if_fail, true}, - {monitor_master, true}, - {init_timeout, 10000}, - {startup_timeout, 10000}, - {erl_flags, ebin_path()}]) of + DoStart = + fun() -> + case SlaveMod of + ct_slave -> + ct_slave:start(Node, + [{kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 10000}, + {startup_timeout, 10000}, + {erl_flags, ebin_path()}]); + slave -> + slave:start_link(host(), Name, ebin_path()) + end + end, + case DoStart() of {ok, _} -> ok; {error, started_not_connected, _} -> @@ -115,6 +126,9 @@ setup_node(Node, #{} = Opts) -> ?assertEqual( node() , gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []]) ), + + ok = snabbkaffe:forward_trace(Node), + ok. %% Routes are replicated async. From 4a06c25178f3261bce956ee5f6b1d69c25e81438 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 27 Oct 2022 13:19:07 -0300 Subject: [PATCH 2/9] chore: add some docs and specs; treat some possible errors --- .../src/emqx_rule_engine_jwt_sup.erl | 24 ++++++++++++++++--- .../emqx_rule_engine_jwt_worker_SUITE.erl | 4 ++-- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl index bdf38378c..72ead42ab 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl @@ -25,6 +25,8 @@ -export([init/1]). +-type worker_id() :: term(). + start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -37,14 +39,30 @@ init([]) -> ChildSpecs = [], {ok, {SupFlags, ChildSpecs}}. +%% @doc Starts a new JWT worker. The worker will send the caller a +%% message when it creates and stores its first JWT, or if it fails to +%% do so, using a generated reference. +-spec start_worker(worker_id(), map()) -> + {ok, {reference(), supervisor:child()}} + | {error, already_present} + | {error, {already_started, supervisor:child()}}. start_worker(Id, Config) -> Ref = erlang:alias([reply]), ChildSpec = jwt_worker_child_spec(Id, Config, Ref), - {ok, Pid} = supervisor:start_child(?MODULE, ChildSpec), - {Ref, Pid}. + case supervisor:start_child(?MODULE, ChildSpec) of + {ok, Pid} -> + {ok, {Ref, Pid}}; + Error -> + Error + end. +%% @doc Stops a given JWT worker by its id. +-spec stop_worker(worker_id()) -> ok. stop_worker(Id) -> - supervisor:terminate_child(?MODULE, Id). + case supervisor:terminate_child(?MODULE, Id) of + ok -> ok; + {error, not_found} -> ok + end. jwt_worker_child_spec(Id, Config, Ref) -> #{ id => Id diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl index 8cc350b52..85d2409ef 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -225,12 +225,12 @@ t_lookup_badarg(_Config) -> t_start_supervised_worker(_Config) -> {ok, _} = emqx_rule_engine_jwt_sup:start_link(), Config = #{resource_id := ResourceId} = generate_config(), - {Ref, Pid} = emqx_rule_engine_jwt_sup:start_worker(ResourceId, Config), + {ok, {Ref, Pid}} = emqx_rule_engine_jwt_sup:start_worker(ResourceId, Config), receive {Ref, token_created} -> ok after - 1_000 -> + 5_000 -> ct:fail("timeout") end, MRef = monitor(process, Pid), From f926bcfbc0537fb6868915f0c7d651ca03d70955 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 31 Oct 2022 10:35:14 -0300 Subject: [PATCH 3/9] fix(appup): stop workers and supervisors before deleting code when downgrading --- .../src/emqx_rule_engine.appup.src | 80 ++++++++++++++----- 1 file changed, 60 insertions(+), 20 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index e14f41440..6d014b8ad 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.10", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -16,7 +17,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.9", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -31,7 +33,8 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -47,7 +50,8 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -64,7 +68,8 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -82,7 +87,8 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -100,7 +106,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -120,7 +127,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.2", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -141,7 +149,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.1", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -162,7 +171,8 @@ {add_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [{add_module,emqx_rule_engine_jwt_worker}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, {add_module,emqx_rule_engine_jwt_sup}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, @@ -194,8 +204,11 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.9", [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, @@ -208,8 +221,11 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.8", [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, @@ -223,8 +239,11 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {<<"4\\.4\\.[6-7]">>, [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -239,8 +258,11 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.5", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -256,8 +278,11 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.4", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -273,8 +298,11 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.3", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -292,8 +320,11 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.2", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -312,8 +343,11 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.1", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -332,8 +366,11 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.0", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -351,7 +388,10 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, {delete_module,emqx_rule_date}, {delete_module,emqx_rule_engine_jwt_sup}, - {delete_module,emqx_rule_engine_jwt_worker}]}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {<<".*">>,[]}]}. From 270f47aafc5b8e0e4243a4d67707feddb0bec2dc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 31 Oct 2022 10:36:59 -0300 Subject: [PATCH 4/9] refactor: start jose as an app dependency --- apps/emqx_rule_engine/rebar.config | 3 ++- apps/emqx_rule_engine/src/emqx_rule_engine.app.src | 2 +- apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/rebar.config b/apps/emqx_rule_engine/rebar.config index f9ad1b283..5154decf5 100644 --- a/apps/emqx_rule_engine/rebar.config +++ b/apps/emqx_rule_engine/rebar.config @@ -1,5 +1,6 @@ %% -*- mode: erlang -*- -{deps, []}. +{deps, [ {jose, {git, "https://github.com/emqx/erlang-jose", {tag, "emqx-1.11.3"}}} + ]}. %% Comple Opts {erl_opts, [warn_unused_vars, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 1490c3e94..31e08f10f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -3,7 +3,7 @@ {vsn, "4.4.11"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]}, - {applications, [kernel,stdlib,rulesql,getopt]}, + {applications, [kernel,stdlib,rulesql,getopt,jose]}, {mod, {emqx_rule_engine_app, []}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl index 8c5f1b0af..eef319901 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl @@ -110,7 +110,6 @@ lookup_jwt(TId, ResourceId) -> | {stop, {error, term()}} when Ref :: reference(). init({#{private_key := PrivateKeyPEM} = Config, Ref}) -> - {ok, _} = application:ensure_all_started(jose), State0 = maps:without([private_key], Config), State = State0#{ jwk => undefined , jwt => undefined From b4837302aab51181ef232fd31e1adb34a9dd4419 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 31 Oct 2022 11:01:58 -0300 Subject: [PATCH 5/9] refactor: move api to emqx_rule_engine_jwt module --- .../src/emqx_rule_engine_jwt.erl | 45 +++++++++++++++++++ .../src/emqx_rule_engine_jwt_worker.erl | 20 --------- .../emqx_rule_engine_jwt_worker_SUITE.erl | 10 ++--- 3 files changed, 50 insertions(+), 25 deletions(-) create mode 100644 apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl new file mode 100644 index 000000000..db82c50ad --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl @@ -0,0 +1,45 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_jwt). + +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("emqx_rule_engine/include/rule_actions.hrl"). + +%% API +-export([ lookup_jwt/1 + , lookup_jwt/2 + ]). + +-type jwt() :: binary(). + +-spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}. +lookup_jwt(ResourceId) -> + ?MODULE:lookup_jwt(?JWT_TABLE, ResourceId). + +-spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}. +lookup_jwt(TId, ResourceId) -> + try + case ets:lookup(TId, {ResourceId, jwt}) of + [{{ResourceId, jwt}, JWT}] -> + {ok, JWT}; + [] -> + {error, not_found} + end + catch + error:badarg -> + {error, not_found} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl index eef319901..855c3e076 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl @@ -20,8 +20,6 @@ %% API -export([ start_link/2 - , lookup_jwt/1 - , lookup_jwt/2 ]). %% gen_server API @@ -84,24 +82,6 @@ start_link(#{ private_key := _ Ref) -> gen_server:start_link(?MODULE, {Config, Ref}, []). --spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}. -lookup_jwt(ResourceId) -> - ?MODULE:lookup_jwt(?JWT_TABLE, ResourceId). - --spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}. -lookup_jwt(TId, ResourceId) -> - try - case ets:lookup(TId, {ResourceId, jwt}) of - [{{ResourceId, jwt}, JWT}] -> - {ok, JWT}; - [] -> - {error, not_found} - end - catch - error:badarg -> - {error, not_found} - end. - %%----------------------------------------------------------------------------------------- %% gen_server API %%----------------------------------------------------------------------------------------- diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl index 85d2409ef..bd701765b 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -133,9 +133,9 @@ t_refresh(_Config) -> emqx_rule_engine_jwt_worker:start_link(Config, Ref), #{?snk_kind := jwt_worker_token_stored}, 5_000), - {ok, FirstJWT} = emqx_rule_engine_jwt_worker:lookup_jwt(Table, ResourceId), + {ok, FirstJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), ?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh}, 15_000), - {ok, SecondJWT} = emqx_rule_engine_jwt_worker:lookup_jwt(Table, ResourceId), + {ok, SecondJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), ?assertNot(is_expired(SecondJWT)), ?assert(is_expired(FirstJWT)), {FirstJWT, SecondJWT} @@ -182,7 +182,7 @@ t_lookup_ok(_Config) -> 500 -> error(timeout) end, - Res = emqx_rule_engine_jwt_worker:lookup_jwt(Table, ResourceId), + Res = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), ?assertMatch({ok, _}, Res), {ok, JWT} = Res, ?assert(is_binary(JWT)), @@ -212,14 +212,14 @@ t_lookup_not_found(_Config) -> Table = ets:new(test_jwt_table, [ordered_set, public]), InexistentResource = <<"xxx">>, ?assertEqual({error, not_found}, - emqx_rule_engine_jwt_worker:lookup_jwt(Table, InexistentResource)), + emqx_rule_engine_jwt:lookup_jwt(Table, InexistentResource)), ok. t_lookup_badarg(_Config) -> InexistentTable = i_dont_exist, InexistentResource = <<"xxx">>, ?assertEqual({error, not_found}, - emqx_rule_engine_jwt_worker:lookup_jwt(InexistentTable, InexistentResource)), + emqx_rule_engine_jwt:lookup_jwt(InexistentTable, InexistentResource)), ok. t_start_supervised_worker(_Config) -> From a33977e635f254e4a7a40dd73def20a6b5f5822d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 31 Oct 2022 15:08:25 -0300 Subject: [PATCH 6/9] feat: add ets table creation to jwt supervisor --- .../src/emqx_rule_engine_jwt_sup.erl | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl index 72ead42ab..18cc48c9e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl @@ -25,12 +25,15 @@ -export([init/1]). +-include_lib("emqx_rule_engine/include/rule_actions.hrl"). + -type worker_id() :: term(). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + ensure_jwt_table(), SupFlags = #{ strategy => one_for_one , intensity => 10 , period => 5 @@ -73,3 +76,15 @@ jwt_worker_child_spec(Id, Config, Ref) -> , shutdown => brutal_kill , modules => [emqx_rule_engine_jwt_worker] }. + +-spec ensure_jwt_table() -> ok. +ensure_jwt_table() -> + case ets:whereis(?JWT_TABLE) of + undefined -> + Opts = [named_table, public, + {read_concurrency, true}, ordered_set], + _ = ets:new(?JWT_TABLE, Opts), + ok; + _ -> + ok + end. From d714f785903702e42fe1438af8d2e1bc50ba280c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 3 Nov 2022 11:12:06 -0300 Subject: [PATCH 7/9] refactor: use `include` for headers --- apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl index db82c50ad..828c77f93 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl @@ -16,8 +16,8 @@ -module(emqx_rule_engine_jwt). --include_lib("emqx_rule_engine/include/rule_engine.hrl"). --include_lib("emqx_rule_engine/include/rule_actions.hrl"). +-include("rule_engine.hrl"). +-include("rule_actions.hrl"). %% API -export([ lookup_jwt/1 From f5c655ec1b77cb90ba3a201948a9fbb41710ffb2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 3 Nov 2022 13:21:42 -0300 Subject: [PATCH 8/9] refactor: add api to ensure jwt token is created --- .../src/emqx_rule_engine_jwt_sup.erl | 33 ++++---- .../src/emqx_rule_engine_jwt_worker.erl | 79 +++++++++++-------- .../emqx_rule_engine_jwt_worker_SUITE.erl | 68 ++++++++-------- 3 files changed, 99 insertions(+), 81 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl index 18cc48c9e..4f3e24cce 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl @@ -19,8 +19,8 @@ -behaviour(supervisor). -export([ start_link/0 - , start_worker/2 - , stop_worker/1 + , ensure_worker_present/2 + , ensure_worker_deleted/1 ]). -export([init/1]). @@ -45,32 +45,31 @@ init([]) -> %% @doc Starts a new JWT worker. The worker will send the caller a %% message when it creates and stores its first JWT, or if it fails to %% do so, using a generated reference. --spec start_worker(worker_id(), map()) -> - {ok, {reference(), supervisor:child()}} - | {error, already_present} - | {error, {already_started, supervisor:child()}}. -start_worker(Id, Config) -> - Ref = erlang:alias([reply]), - ChildSpec = jwt_worker_child_spec(Id, Config, Ref), +-spec ensure_worker_present(worker_id(), map()) -> + {ok, supervisor:child()}. +ensure_worker_present(Id, Config) -> + ChildSpec = jwt_worker_child_spec(Id, Config), case supervisor:start_child(?MODULE, ChildSpec) of {ok, Pid} -> - {ok, {Ref, Pid}}; - Error -> - Error + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid}; + {error, already_present} -> + supervisor:restart_child(?MODULE, Id) end. %% @doc Stops a given JWT worker by its id. --spec stop_worker(worker_id()) -> ok. -stop_worker(Id) -> +-spec ensure_worker_deleted(worker_id()) -> ok. +ensure_worker_deleted(Id) -> case supervisor:terminate_child(?MODULE, Id) of ok -> ok; {error, not_found} -> ok end. -jwt_worker_child_spec(Id, Config, Ref) -> +jwt_worker_child_spec(Id, Config) -> #{ id => Id - , start => {emqx_rule_engine_jwt_worker, start_link, [Config, Ref]} - , restart => permanent + , start => {emqx_rule_engine_jwt_worker, start_link, [Config]} + , restart => transient , type => worker , significant => false , shutdown => brutal_kill diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl index 855c3e076..4190a3536 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl @@ -19,7 +19,8 @@ -behaviour(gen_server). %% API --export([ start_link/2 +-export([ start_link/1 + , ensure_jwt/1 ]). %% gen_server API @@ -68,7 +69,7 @@ %% API %%----------------------------------------------------------------------------------------- --spec start_link(config(), reference()) -> gen_server:start_ret(). +-spec start_link(config()) -> gen_server:start_ret(). start_link(#{ private_key := _ , expiration := _ , resource_id := _ @@ -78,60 +79,68 @@ start_link(#{ private_key := _ , aud := _ , kid := _ , alg := _ - } = Config, - Ref) -> - gen_server:start_link(?MODULE, {Config, Ref}, []). + } = Config) -> + gen_server:start_link(?MODULE, Config, []). + +-spec ensure_jwt(pid()) -> reference(). +ensure_jwt(Worker) -> + Ref = alias([reply]), + gen_server:cast(Worker, {ensure_jwt, Ref}), + Ref. %%----------------------------------------------------------------------------------------- %% gen_server API %%----------------------------------------------------------------------------------------- --spec init({config(), Ref}) -> {ok, state(), {continue, {make_key, binary(), Ref}}} - | {stop, {error, term()}} - when Ref :: reference(). -init({#{private_key := PrivateKeyPEM} = Config, Ref}) -> +-spec init(config()) -> {ok, state(), {continue, {make_key, binary()}}} + | {stop, {error, term()}}. +init(#{private_key := PrivateKeyPEM} = Config) -> State0 = maps:without([private_key], Config), State = State0#{ jwk => undefined , jwt => undefined , refresh_timer => undefined }, - {ok, State, {continue, {make_key, PrivateKeyPEM, Ref}}}. + {ok, State, {continue, {make_key, PrivateKeyPEM}}}. -handle_continue({make_key, PrivateKeyPEM, Ref}, State0) -> +handle_continue({make_key, PrivateKeyPEM}, State0) -> case jose_jwk:from_pem(PrivateKeyPEM) of JWK = #jose_jwk{} -> State = State0#{jwk := JWK}, - {noreply, State, {continue, {create_token, Ref}}}; + {noreply, State, {continue, create_token}}; [] -> - Ref ! {Ref, {error, {invalid_private_key, empty_key}}}, - {stop, {error, empty_key}, State0}; + ?tp(rule_engine_jwt_worker_startup_error, #{error => empty_key}), + {stop, {shutdown, {error, empty_key}}, State0}; {error, Reason} -> - Ref ! {Ref, {error, {invalid_private_key, Reason}}}, - {stop, {error, Reason}, State0}; - Error -> - Ref ! {Ref, {error, {invalid_private_key, Error}}}, - {stop, {error, Error}, State0} + Error = {invalid_private_key, Reason}, + ?tp(rule_engine_jwt_worker_startup_error, #{error => Error}), + {stop, {shutdown, {error, Error}}, State0}; + Error0 -> + Error = {invalid_private_key, Error0}, + ?tp(rule_engine_jwt_worker_startup_error, #{error => Error}), + {stop, {shutdown, {error, Error}}, State0} end; -handle_continue({create_token, Ref}, State0) -> - JWT = do_generate_jwt(State0), - store_jwt(State0, JWT), - State1 = State0#{jwt := JWT}, - State = ensure_timer(State1), - Ref ! {Ref, token_created}, +handle_continue(create_token, State0) -> + State = generate_and_store_jwt(State0), {noreply, State}. handle_call(_Req, _From, State) -> {reply, {error, bad_call}, State}. +handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) -> + State = + case JWT of + undefined -> + generate_and_store_jwt(State0); + _ -> + State0 + end, + From ! {From, token_created}, + {noreply, State}; handle_cast(_Req, State) -> {noreply, State}. handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) -> - JWT = do_generate_jwt(State0), - store_jwt(State0, JWT), - ?tp(rule_engine_jwt_worker_refresh, #{}), - State1 = State0#{jwt := JWT}, - State = ensure_timer(State1#{refresh_timer := undefined}), + State = generate_and_store_jwt(State0), {noreply, State}; handle_info(_Msg, State) -> {noreply, State}. @@ -171,10 +180,18 @@ do_generate_jwt(#{ expiration := ExpirationMS {_, JWT} = jose_jws:compact(JWT0), JWT. +-spec generate_and_store_jwt(state()) -> state(). +generate_and_store_jwt(State0) -> + JWT = do_generate_jwt(State0), + store_jwt(State0, JWT), + ?tp(rule_engine_jwt_worker_refresh, #{jwt => JWT}), + State1 = State0#{jwt := JWT}, + ensure_timer(State1). + -spec store_jwt(state(), jwt()) -> ok. store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> true = ets:insert(TId, {{ResourceId, jwt}, JWT}), - ?tp(jwt_worker_token_stored, #{resource_id => ResourceId}), + ?tp(rule_engine_jwt_worker_token_stored, #{resource_id => ResourceId}), ok. -spec ensure_timer(state()) -> state(). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl index bd701765b..fc84293e3 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -73,9 +73,11 @@ is_expired(JWT) -> %%----------------------------------------------------------------------------- t_create_success(_Config) -> - Ref = alias([reply]), Config = generate_config(), - ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config, Ref)), + Res = emqx_rule_engine_jwt_worker:start_link(Config), + ?assertMatch({ok, _}, Res), + {ok, Worker} = Res, + Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Worker), receive {Ref, token_created} -> ok @@ -87,41 +89,40 @@ t_create_success(_Config) -> ok. t_empty_key(_Config) -> - Ref = alias([reply]), Config0 = generate_config(), Config = Config0#{private_key := <<>>}, process_flag(trap_exit, true), - ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config, Ref)), - receive - {Ref, {error, {invalid_private_key, empty_key}}} -> - ok - after - 1_000 -> - ct:fail("should have errored; msgs: ~0p", - [process_info(self(), messages)]) - end, + ?check_trace( + ?wait_async_action( + ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config)), + #{?snk_kind := rule_engine_jwt_worker_startup_error}, + 1_000), + fun(Trace) -> + ?assertMatch([#{error := empty_key}], + ?of_kind(rule_engine_jwt_worker_startup_error, Trace)), + ok + end), ok. t_invalid_pem(_Config) -> - Ref = alias([reply]), Config0 = generate_config(), InvalidPEM = public_key:pem_encode([{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}, {'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}]), Config = Config0#{private_key := InvalidPEM}, process_flag(trap_exit, true), - ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config, Ref)), - receive - {Ref, {error, {invalid_private_key, _}}} -> - ok - after - 1_000 -> - ct:fail("should have errored; msgs: ~0p", - [process_info(self(), messages)]) - end, + ?check_trace( + ?wait_async_action( + ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config)), + #{?snk_kind := rule_engine_jwt_worker_startup_error}, + 1_000), + fun(Trace) -> + ?assertMatch([#{error := {invalid_private_key, _}}], + ?of_kind(rule_engine_jwt_worker_startup_error, Trace)), + ok + end), ok. t_refresh(_Config) -> - Ref = alias([reply]), Config0 = #{ table := Table , resource_id := ResourceId } = generate_config(), @@ -130,11 +131,12 @@ t_refresh(_Config) -> begin {{ok, _Pid}, {ok, _Event}} = ?wait_async_action( - emqx_rule_engine_jwt_worker:start_link(Config, Ref), - #{?snk_kind := jwt_worker_token_stored}, + emqx_rule_engine_jwt_worker:start_link(Config), + #{?snk_kind := rule_engine_jwt_worker_token_stored}, 5_000), {ok, FirstJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), - ?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh}, 15_000), + ?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh, + jwt := JWT0} when JWT0 =/= FirstJWT, 15_000), {ok, SecondJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), ?assertNot(is_expired(SecondJWT)), ?assert(is_expired(FirstJWT)), @@ -142,16 +144,15 @@ t_refresh(_Config) -> end, fun({FirstJWT, SecondJWT}, Trace) -> ?assertMatch([_, _ | _], - ?of_kind(jwt_worker_token_stored, Trace)), + ?of_kind(rule_engine_jwt_worker_token_stored, Trace)), ?assertNotEqual(FirstJWT, SecondJWT), ok end), ok. t_format_status(_Config) -> - Ref = alias([reply]), Config = generate_config(), - {ok, Pid} = emqx_rule_engine_jwt_worker:start_link(Config, Ref), + {ok, Pid} = emqx_rule_engine_jwt_worker:start_link(Config), {status, _, _, Props} = sys:get_status(Pid), [State] = [State || Info = [_ | _] <- Props, @@ -165,7 +166,6 @@ t_format_status(_Config) -> ok. t_lookup_ok(_Config) -> - Ref = alias([reply]), Config = #{ table := Table , resource_id := ResourceId , private_key := PrivateKeyPEM @@ -174,7 +174,8 @@ t_lookup_ok(_Config) -> , sub := Sub , kid := KId } = generate_config(), - {ok, _} = emqx_rule_engine_jwt_worker:start_link(Config, Ref), + {ok, Worker} = emqx_rule_engine_jwt_worker:start_link(Config), + Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Worker), receive {Ref, token_created} -> ok @@ -225,7 +226,8 @@ t_lookup_badarg(_Config) -> t_start_supervised_worker(_Config) -> {ok, _} = emqx_rule_engine_jwt_sup:start_link(), Config = #{resource_id := ResourceId} = generate_config(), - {ok, {Ref, Pid}} = emqx_rule_engine_jwt_sup:start_worker(ResourceId, Config), + {ok, Pid} = emqx_rule_engine_jwt_sup:ensure_worker_present(ResourceId, Config), + Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Pid), receive {Ref, token_created} -> ok @@ -235,7 +237,7 @@ t_start_supervised_worker(_Config) -> end, MRef = monitor(process, Pid), ?assert(is_process_alive(Pid)), - ok = emqx_rule_engine_jwt_sup:stop_worker(ResourceId), + ok = emqx_rule_engine_jwt_sup:ensure_worker_deleted(ResourceId), receive {'DOWN', MRef, process, Pid, _} -> ok From d3c6ade64059743d7806f816a594b37b7fab4d74 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 3 Nov 2022 16:18:45 -0300 Subject: [PATCH 9/9] docs: rewrite fn doc --- apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl index 4f3e24cce..b393dd08b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl @@ -42,9 +42,9 @@ init([]) -> ChildSpecs = [], {ok, {SupFlags, ChildSpecs}}. -%% @doc Starts a new JWT worker. The worker will send the caller a -%% message when it creates and stores its first JWT, or if it fails to -%% do so, using a generated reference. +%% @doc Starts a new JWT worker. The caller should use +%% `emqx_rule_engine_jwt_sup:ensure_jwt/1' to ensure that a JWT has +%% been stored, if synchronization is needed. -spec ensure_worker_present(worker_id(), map()) -> {ok, supervisor:child()}. ensure_worker_present(Id, Config) ->