diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index f0d73e581..cd57630b4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.3.9"}, % strict semver, bump manually! + {vsn, "4.3.8"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 98ca43579..0ed0fee3a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,13 +1,7 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, - {"4.3.7", + [{"4.3.7", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -84,13 +78,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, - {"4.3.7", + [{"4.3.7", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 6c50aca2f..c8e69a17f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -36,7 +36,6 @@ , create_resource/1 , test_resource/1 , start_resource/1 - , is_source_alive/1 , get_resource_status/1 , get_resource_params/1 , delete_resource/1 @@ -315,37 +314,24 @@ start_resource(ResId) -> end. -spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}). -test_resource(#{type := Type} = Params) -> +test_resource(#{type := Type, config := Config0}) -> case emqx_rule_registry:find_resource_type(Type) of - {ok, #resource_type{}} -> - ResId = maps:get(id, Params, resource_id()), + {ok, #resource_type{on_create = {ModC, Create}, + on_destroy = {ModD, Destroy}, + params_spec = ParamSpec}} -> + Config = emqx_rule_validator:validate_params(Config0, ParamSpec), + ResId = resource_id(), try - _ = create_resource(maps:put(id, ResId, Params)), - true = is_source_alive(ResId), + _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), + _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), ok - catch E:R:S -> - ?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]), - {error, R} - after - _ = ?CLUSTER_CALL(delete_resource, [ResId]) + catch + throw:Reason -> {error, Reason} end; not_found -> {error, {resource_type_not_found, Type}} end. -is_source_alive(ResId) -> - case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of - {ResL, []} -> - is_source_alive_(ResL); - {_, _Errors} -> - false - end. - -is_source_alive_([]) -> true; -is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); -is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; -is_source_alive_([_Error | _ResL]) -> false. - -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). get_resource_status(ResId) -> case emqx_rule_registry:find_resource_params(ResId) of diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 313591ff3..39ac1e9c2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -296,7 +296,7 @@ do_create_resource(Create, ParsedParams) -> list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data = lists:map(fun(Res = #{id := ResId}) -> - Status = emqx_rule_engine:is_source_alive(ResId), + Status = get_aggregated_status(ResId), maps:put(status, Status, Res) end, Data0), return({ok, Data}). @@ -304,6 +304,14 @@ list_resources(#{}, _Params) -> list_resources_by_type(#{type := Type}, _Params) -> return_all(emqx_rule_registry:get_resources_by_type(Type)). +get_aggregated_status(ResId) -> + lists:all(fun(Node) -> + case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of + {ok, #{is_alive := true}} -> true; + _ -> false + end + end, ekka_mnesia:running_nodes()). + show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} ->