diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f453bb73c..26f6ef738 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -39,10 +39,11 @@ , get_resource_status/1 , get_resource_params/1 , delete_resource/1 - , update_resource/1 + , update_resource/2 ]). -export([ init_resource/4 + , init_resource/5 , init_action/4 , clear_resource/3 , clear_rule/1 @@ -244,27 +245,60 @@ create_resource(#{type := Type, config := Config0} = Params) -> {error, {resource_type_not_found, Type}} end. -update_resource(#{id := Id, type := Type, config := NewConfig, - description := Description} = NewResource) -> +-spec(update_resource(resource_id(), map()) -> ok | {error, Reason :: term()}). +update_resource(ResId, NewParams) -> + try + lists:foreach(fun(#rule{id = RuleId, enabled = Enabled, actions = Actions}) -> + lists:foreach( + fun (#action_instance{args = #{<<"$resource">> := ResId1}}) + when ResId =:= ResId1, Enabled == true -> + throw({dependency_exists, RuleId}); + (_) -> ok + end, Actions) + end, ets:tab2list(?RULE_TAB)), + do_update_resource_check(ResId, NewParams) + catch _ : Reason -> + {error, Reason} + end. + +do_update_resource_check(Id, NewParams) -> + case emqx_rule_registry:find_resource(Id) of + {ok, #resource{id = Id, + type = Type, + config = OldConfig, + description = OldDescription} = _OldResource} -> + try + do_update_resource(#{id => Id, + config => case maps:find(<<"config">>, NewParams) of + {ok, NewConfig} -> NewConfig; + error -> OldConfig + end, + type => Type, + description => case maps:find(<<"description">>, NewParams) of + {ok, NewDescription} -> NewDescription; + error -> OldDescription + end}), + ok + catch _ : Reason -> + {error, Reason} + end; + _Other -> + {error, not_found} + end. + +do_update_resource(#{id := Id, type := Type, description:= NewDescription, config:= NewConfig}) -> case emqx_rule_registry:find_resource_type(Type) of {ok, #resource_type{on_create = {Module, Create}, + on_destroy = {Module, Destroy}, params_spec = ParamSpec}} -> Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), - case delete_resource(Id) of - {error, not_found} -> {error, not_found}; - _ -> %% deletion might fail because of an associted rule. - emqx_rule_registry:add_resource( - #resource{ - id = Id, - config = Config, - type = Type, - description = Description, - created_at = erlang:system_time(millisecond)}), - catch cluster_call(init_resource, [Module, Create, Id, Config, true]), - {ok, NewResource} - end; - not_found -> - {error, {resource_type_not_found, Type}} + cluster_call(init_resource, [Module, Create, Id, Config]), + emqx_rule_registry:add_resource(#resource{id = Id, + type = Type, + config = Config, + description = NewDescription, + created_at = erlang:system_time(millisecond)}), + cluster_call(clear_resource, [Module, Destroy, Id]) end. -spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}). @@ -499,7 +533,7 @@ init_resource(Module, OnCreate, ResId, Config, Restart) -> Params = ?RAISE( Module:OnCreate(ResId, Config), Restart andalso - timer:apply_after(timer:seconds(60), ?MODULE, do_init_resource, + timer:apply_after(timer:seconds(60), ?MODULE, init_resource, [Module, OnCreate, ResId, Config, Restart]), {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}), ResParams = #resource_params{id = ResId, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 03f34fa5b..987157d63 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -327,27 +327,34 @@ start_resource(#{id := Id}, _Params) -> return({error, 400, ?ERR_BADARGS(Reason)}) end. -update_resource(#{id := Id}, Params) -> - case parse_resource_params(Params) of - {ok, ParsedParams} -> - case emqx_rule_registry:find_resource(Id) of - {ok, #resource{id = Id, type = Type} = _OldResource} -> - Config = maps:get(config, ParsedParams), - Description = maps:get(description, ParsedParams), - _ = emqx_rule_engine:update_resource( - #{id => Id, - config => Config, - type => Type, - description => Description, - created_at => erlang:system_time(millisecond)}), - return(ok); - _Other -> - return({error, 400, ?ERR_NO_RESOURCE(Id)}) - end; +update_resource(#{id := Id}, NewParams) -> + P1 = case proplists:get_value(<<"description">>, NewParams) of + undefined -> #{}; + Value -> #{<<"description">> => Value} + end, + P2 = case proplists:get_value(<<"config">>, NewParams) of + undefined -> #{}; + <<"{}">> -> #{}; + Map -> #{<<"config">> => ?RAISE(maps:from_list(Map), {invalid_config, Map})} + end, + case emqx_rule_engine:update_resource(Id, maps:merge(P1, P2)) of + ok -> + return(ok); + {error, not_found} -> + ?LOG(error, "resource not found: ~0p", [Id]), + return({error, 400, list_to_binary("resource not found:" ++ binary_to_list(Id))}); + {error, {init_resource_failure, _}} -> + ?LOG(error, "init resource failure: ~0p", [Id]), + return({error, 500, list_to_binary("init resource failure:" ++ binary_to_list(Id))}); + {error, {dependency_exists, RuleId}} -> + ?LOG(error, "dependency exists: ~0p", [RuleId]), + return({error, 500, list_to_binary("resource dependency by rule:" ++ binary_to_list(RuleId))}); {error, Reason} -> - return({error, 400, ?ERR_BADARGS(Reason)}) + ?LOG(error, "update resource failed: ~0p", [Reason]), + return({error, 500, <<"update resource failed,error info have been written to logfile!">>}) end. + delete_resource(#{id := Id}, _Params) -> case emqx_rule_engine:delete_resource(Id) of ok -> return(ok); @@ -524,7 +531,14 @@ parse_resource_params([_ | Params], Res) -> parse_resource_params(Params, Res). json_term_to_map(List) -> - emqx_json:decode(emqx_json:encode(List), [return_maps]). + Data = lists:map(fun({K, V}) -> + case V of + {} ->{K, [{}]}; + _ -> {K, V} + end + end, + List), + emqx_json:decode(emqx_json:encode(Data), [return_maps]). sort_by_title(action, Actions) -> sort_by(#action.title, Actions); @@ -544,4 +558,4 @@ get_rule_metrics(Id) -> get_action_metrics(Id) -> [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. + || Node <- ekka_mnesia:running_nodes()]. \ No newline at end of file diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl index f165b95b1..8f950ab43 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl @@ -44,6 +44,12 @@ , {descr, $d, "descr", {binary, <<"">>}, "Description"} ]). +-define(OPTSPEC_RESOURCES_UPDATE, + [ {id, undefined, undefined, binary, "The resource id"} + , {config, $c, "config", {binary, undefined}, "Config"} + , {description, $d, "descr", {binary, undefined}, "Description"} + ]). + -define(OPTSPEC_RULES_CREATE, [ {sql, undefined, undefined, binary, "Filter Condition SQL"} , {actions, undefined, undefined, binary, "Action List in JSON format: [{\"name\": , \"params\": {: }}]"} @@ -61,7 +67,6 @@ , {on_action_failed, $g, "on_action_failed", {atom, undefined}, "'continue' or 'stop' when an action in the rule fails"} , {descr, $d, "descr", {binary, undefined}, "Description"} ]). - %%----------------------------------------------------------------------------- %% Load/Unload Commands %%----------------------------------------------------------------------------- @@ -148,6 +153,7 @@ actions(_Usage) -> %%------------------------------------------------------------------------------ %% 'resources' command %%------------------------------------------------------------------------------ + resources(["create" | Params]) -> with_opts(fun({Opts, _}) -> case emqx_rule_engine:create_resource(make_resource(Opts)) of @@ -158,6 +164,19 @@ resources(["create" | Params]) -> end end, Params, ?OPTSPEC_RESOURCES_CREATE, {?FUNCTION_NAME, create}); + +resources(["update" | Params]) -> + with_opts(fun({Opts, _}) -> + Id = maps:get(id, maps:from_list(Opts)), + Maps = make_updated_resource(Opts), + case emqx_rule_engine:update_resource(Id, Maps) of + ok -> + emqx_ctl:print("Resource update successfully~n"); + {error, Reason} -> + emqx_ctl:print("update resource failed, reason: ~p!~n", [Reason]) + end + end, Params, ?OPTSPEC_RESOURCES_UPDATE, {?FUNCTION_NAME, update}); + resources(["test" | Params]) -> with_opts(fun({Opts, _}) -> case emqx_rule_engine:test_resource(make_resource(Opts)) of @@ -192,7 +211,8 @@ resources(_Usage) -> emqx_ctl:usage([{"resources create", "Create a resource"}, {"resources list [-t ]", "List resources"}, {"resources show ", "Show a resource"}, - {"resources delete ", "Delete a resource"} + {"resources delete ", "Delete a resource"}, + {"resources update [-c ] [-d ]", "Update a resource"} ]). %%------------------------------------------------------------------------------ @@ -302,6 +322,17 @@ make_resource(Opts) -> config => ?RAISE(emqx_json:decode(Config, [return_maps]), {invalid_config, Config}), description => get_value(descr, Opts)}, id, <<"">>, Opts). +make_updated_resource(Opts) -> + P1 = case proplists:get_value(description, Opts) of + undefined -> #{}; + Value -> #{<<"description">> => Value} + end, + P2 = case proplists:get_value(config, Opts) of + undefined -> #{}; + Map -> #{<<"config">> => ?RAISE((emqx_json:decode(Map, [return_maps])), {invalid_config, Map})} + end, + maps:merge(P1, P2). + printable_actions(Actions) when is_list(Actions) -> emqx_json:encode([#{id => Id, name => Name, params => Args, metrics => get_action_metrics(Id), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index a857ad60a..fe4c78fb1 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -260,6 +260,7 @@ init_per_testcase(_TestCase, Config) -> %ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]), Config. + end_per_testcase(t_events, Config) -> ets:delete(events_record_tab), ok = emqx_rule_registry:remove_rule(?config(hook_points_rules, Config)), @@ -443,24 +444,53 @@ t_crud_resources_api(_Config) -> ResId = maps:get(id, Resources1), {ok, #{code := 0, data := Resources}} = emqx_rule_engine_api:list_resources(#{},[]), ?assert(length(Resources) > 0), - {ok, #{code := 0, data := Resources2}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]), ?assertEqual(ResId, maps:get(id, Resources2)), - + % {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId}, - [{<<"id">>, ResId}, - {<<"type">>, <<"built_in">>}, - {<<"config">>, [{<<"a">>, 2}]}, + [{<<"config">>, [{<<"a">>, 2}]}, {<<"description">>, <<"2">>}]), {ok, #{code := 0, data := Resources3}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]), ?assertEqual(ResId, maps:get(id, Resources3)), ?assertEqual(#{<<"a">> => 2}, maps:get(config, Resources3)), ?assertEqual(<<"2">>, maps:get(description, Resources3)), - + {ok, #{code := 0, data := Resources3}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]), + ?assertEqual(ResId, maps:get(id, Resources3)), + % + {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId}, + [{<<"config">>, [{<<"a">>, 3}]}]), + {ok, #{code := 0, data := Resources4}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]), + ?assertEqual(ResId, maps:get(id, Resources4)), + ?assertEqual(#{<<"a">> => 3}, maps:get(config, Resources4)), + ?assertEqual(<<"2">>, maps:get(description, Resources4)), + % Only config + {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId}, + [{<<"config">>, [{<<"a">>, 1}, + {<<"b">>, 2}, + {<<"c">>, 3}]}]), + {ok, #{code := 0, data := Resources5}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]), + ?assertEqual(ResId, maps:get(id, Resources5)), + ?assertEqual(#{<<"a">> => 1, <<"b">> => 2, <<"c">> => 3}, maps:get(config, Resources5)), + ?assertEqual(<<"2">>, maps:get(description, Resources5)), + % Only description + {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId}, + [{<<"description">>, <<"new5">>}]), + {ok, #{code := 0, data := Resources6}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]), + ?assertEqual(ResId, maps:get(id, Resources6)), + ?assertEqual(#{<<"a">> => 1, <<"b">> => 2, <<"c">> => 3}, maps:get(config, Resources6)), + ?assertEqual(<<"new5">>, maps:get(description, Resources6)), + % None + {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId},[]), + {ok, #{code := 0, data := Resources7}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]), + ?assertEqual(ResId, maps:get(id, Resources7)), + ?assertEqual(#{<<"a">> => 1, <<"b">> => 2, <<"c">> => 3}, maps:get(config, Resources7)), + ?assertEqual(<<"new5">>, maps:get(description, Resources7)), + % ?assertMatch({ok, #{code := 0}}, emqx_rule_engine_api:delete_resource(#{id => ResId},#{})), ?assertMatch({ok, #{code := 404}}, emqx_rule_engine_api:show_resource(#{id => ResId},[])), ok. + t_list_resource_types_api(_Config) -> {ok, #{code := 0, data := ResourceTypes}} = emqx_rule_engine_api:list_resource_types(#{},[]), ?assert(length(ResourceTypes) > 0),