From c3237b6281db2576d2f40aae39ee3e01119cc24b Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 24 Mar 2022 10:39:54 +0800 Subject: [PATCH 1/3] fix(rule): connection test when creating a resource --- .../src/emqx_rule_engine.app.src | 2 +- .../src/emqx_rule_engine.appup.src | 8 ++- .../emqx_rule_engine/src/emqx_rule_engine.erl | 49 ++++++++++++++----- 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index cd57630b4..f0d73e581 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.3.8"}, % strict semver, bump manually! + {vsn, "4.3.9"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 0ed0fee3a..010171ab2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,7 +1,9 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.7", + [{"4.3.8", + [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {"4.3.7", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -78,7 +80,9 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.7", + [{"4.3.8", + [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {"4.3.7", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index c8e69a17f..1a5fecea1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -314,24 +314,49 @@ start_resource(ResId) -> end. -spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}). -test_resource(#{type := Type, config := Config0}) -> +test_resource(#{type := Type} = Params) -> case emqx_rule_registry:find_resource_type(Type) of - {ok, #resource_type{on_create = {ModC, Create}, - on_destroy = {ModD, Destroy}, - params_spec = ParamSpec}} -> - Config = emqx_rule_validator:validate_params(Config0, ParamSpec), - ResId = resource_id(), - try - _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), - ok - catch - throw:Reason -> {error, Reason} + {ok, #resource_type{}} -> + ResId = maps:get(id, Params, resource_id()), + CreateFun = fun() -> _ = create_resource(Params) end, + StatusFun = + fun() -> + case get_resource_status(ResId) of + {ok, #{is_alive := true}} -> + ignore; + {ok, #{is_alive := false}} -> + error(not_alive); + {error, R} -> + error(R) + end + end, + DeleteFun = fun() -> _ = ?CLUSTER_CALL(delete_resource, [ResId]) end, + case + %% create error or status failed + (ok == safe_test_resource(CreateFun, create_resource)) + andalso + safe_test_resource(StatusFun, get_resource_status) + of + ok -> + _ = safe_test_resource(DeleteFun, delete_resource), + ok; + _ -> + _ = safe_test_resource(DeleteFun, delete_resource), + {error, {resource_error, not_available}} end; not_found -> {error, {resource_type_not_found, Type}} end. +safe_test_resource(Fun, ErrorLogInfo) -> + try + _ = Fun(), + ok + catch E:R:S -> + ?LOG(warning, "safe exec fun error, ~0p, ~0p:~0p ~0p", [ErrorLogInfo, E, R, S]), + {error, R} + end. + -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). get_resource_status(ResId) -> case emqx_rule_registry:find_resource_params(ResId) of From 46cfcf662e7f1b06846f576be0e996a2a834228b Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 29 Mar 2022 11:24:13 +0800 Subject: [PATCH 2/3] fix(rule): safe apply & test resource in cluster --- .../src/emqx_rule_engine.appup.src | 6 ++- .../emqx_rule_engine/src/emqx_rule_engine.erl | 53 ++++++++----------- .../src/emqx_rule_engine_api.erl | 10 +--- 3 files changed, 26 insertions(+), 43 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 010171ab2..76af48e8f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.8", - [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -81,7 +82,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.8", - [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 1a5fecea1..6c50aca2f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -36,6 +36,7 @@ , create_resource/1 , test_resource/1 , start_resource/1 + , is_source_alive/1 , get_resource_status/1 , get_resource_params/1 , delete_resource/1 @@ -318,45 +319,33 @@ test_resource(#{type := Type} = Params) -> case emqx_rule_registry:find_resource_type(Type) of {ok, #resource_type{}} -> ResId = maps:get(id, Params, resource_id()), - CreateFun = fun() -> _ = create_resource(Params) end, - StatusFun = - fun() -> - case get_resource_status(ResId) of - {ok, #{is_alive := true}} -> - ignore; - {ok, #{is_alive := false}} -> - error(not_alive); - {error, R} -> - error(R) - end - end, - DeleteFun = fun() -> _ = ?CLUSTER_CALL(delete_resource, [ResId]) end, - case - %% create error or status failed - (ok == safe_test_resource(CreateFun, create_resource)) - andalso - safe_test_resource(StatusFun, get_resource_status) - of - ok -> - _ = safe_test_resource(DeleteFun, delete_resource), - ok; - _ -> - _ = safe_test_resource(DeleteFun, delete_resource), - {error, {resource_error, not_available}} + try + _ = create_resource(maps:put(id, ResId, Params)), + true = is_source_alive(ResId), + ok + catch E:R:S -> + ?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]), + {error, R} + after + _ = ?CLUSTER_CALL(delete_resource, [ResId]) end; not_found -> {error, {resource_type_not_found, Type}} end. -safe_test_resource(Fun, ErrorLogInfo) -> - try - _ = Fun(), - ok - catch E:R:S -> - ?LOG(warning, "safe exec fun error, ~0p, ~0p:~0p ~0p", [ErrorLogInfo, E, R, S]), - {error, R} +is_source_alive(ResId) -> + case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of + {ResL, []} -> + is_source_alive_(ResL); + {_, _Errors} -> + false end. +is_source_alive_([]) -> true; +is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); +is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; +is_source_alive_([_Error | _ResL]) -> false. + -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). get_resource_status(ResId) -> case emqx_rule_registry:find_resource_params(ResId) of diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 39ac1e9c2..313591ff3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -296,7 +296,7 @@ do_create_resource(Create, ParsedParams) -> list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data = lists:map(fun(Res = #{id := ResId}) -> - Status = get_aggregated_status(ResId), + Status = emqx_rule_engine:is_source_alive(ResId), maps:put(status, Status, Res) end, Data0), return({ok, Data}). @@ -304,14 +304,6 @@ list_resources(#{}, _Params) -> list_resources_by_type(#{type := Type}, _Params) -> return_all(emqx_rule_registry:get_resources_by_type(Type)). -get_aggregated_status(ResId) -> - lists:all(fun(Node) -> - case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of - {ok, #{is_alive := true}} -> true; - _ -> false - end - end, ekka_mnesia:running_nodes()). - show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} -> From d297e883c24d40dda582a8d8a4803b70d0d493d7 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 29 Mar 2022 17:37:00 +0800 Subject: [PATCH 3/3] fix: appup --- apps/emqx_rule_engine/src/emqx_rule_engine.appup.src | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 76af48e8f..98ca43579 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,10 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.8", - [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -82,7 +85,10 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.8", - [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},