%%-------------------------------------------------------------------- %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_rule_engine_api). -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -logger_header("[RuleEngineAPI]"). -import(minirest, [return/1]). -rest_api(#{name => create_rule, method => 'POST', path => "/rules/", func => create_rule, descr => "Create a rule" }). -rest_api(#{name => update_rule, method => 'PUT', path => "/rules/:bin:id", func => update_rule, descr => "Update a rule" }). -rest_api(#{name => list_rules, method => 'GET', path => "/rules/", func => list_rules, descr => "A list of all rules" }). -rest_api(#{name => show_rule, method => 'GET', path => "/rules/:bin:id", func => show_rule, descr => "Show a rule" }). -rest_api(#{name => delete_rule, method => 'DELETE', path => "/rules/:bin:id", func => delete_rule, descr => "Delete a rule" }). -rest_api(#{name => reset_metrics, method => 'PUT', path => "/rules/:bin:id/reset_metrics", func => reset_metrics, descr => "reset a rule metrics" }). -rest_api(#{name => list_actions, method => 'GET', path => "/actions/", func => list_actions, descr => "A list of all actions" }). -rest_api(#{name => show_action, method => 'GET', path => "/actions/:atom:name", func => show_action, descr => "Show an action" }). -rest_api(#{name => list_resources, method => 'GET', path => "/resources/", func => list_resources, descr => "A list of all resources" }). -rest_api(#{name => create_resource, method => 'POST', path => "/resources/", func => create_resource, descr => "Create a resource" }). -rest_api(#{name => update_resource, method => 'PUT', path => "/resources/:bin:id", func => update_resource, descr => "Update a resource" }). -rest_api(#{name => show_resource, method => 'GET', path => "/resources/:bin:id", func => show_resource, descr => "Show a resource" }). -rest_api(#{name => get_resource_status, method => 'GET', path => "/resource_status/:bin:id", func => get_resource_status, descr => "Get status of a resource" }). -rest_api(#{name => start_resource, method => 'POST', path => "/resources/:bin:id", func => start_resource, descr => "Start a resource" }). -rest_api(#{name => delete_resource, method => 'DELETE', path => "/resources/:bin:id", func => delete_resource, descr => "Delete a resource" }). -rest_api(#{name => list_resource_types, method => 'GET', path => "/resource_types/", func => list_resource_types, descr => "List all resource types" }). -rest_api(#{name => show_resource_type, method => 'GET', path => "/resource_types/:atom:name", func => show_resource_type, descr => "Show a resource type" }). -rest_api(#{name => list_resources_by_type, method => 'GET', path => "/resource_types/:atom:type/resources", func => list_resources_by_type, descr => "List all resources of a resource type" }). -rest_api(#{name => list_events, method => 'GET', path => "/rule_events/", func => list_events, descr => "List all events with detailed info" }). -export([ create_rule/2 , update_rule/2 , list_rules/2 , show_rule/2 , delete_rule/2 , reset_metrics/2 , reset_metrics_local/1 ]). -export([ list_actions/2 , show_action/2 ]). -export([ create_resource/2 , list_resources/2 , show_resource/2 , get_resource_status/2 , start_resource/2 , delete_resource/2 , update_resource/2 ]). -export([ list_resource_types/2 , list_resources_by_type/2 , show_resource_type/2 ]). -export([list_events/2]). -export([query/3]). -define(RULE_QS_SCHEMA, {?RULE_TAB, [ {<<"enabled">>, atom}, {<<"for">>, binary}, {<<"_like_id">>, binary}, {<<"_like_for">>, binary}, {<<"_match_for">>, binary}, {<<"_like_description">>, binary} ]}). -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))). -define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))). -define(ERR_NO_RESOURCE(RESID), list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))). -define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))). -define(ERR_DEP_RULES_EXISTS(RULEIDS), list_to_binary(io_lib:format("Found rules ~0p depends on this resource, disable them first", [(RULEIDS)]))). -define(ERR_BADARGS(REASON), begin R0 = list_to_binary(io_lib:format("~0p", [REASON])), <<"Bad Arguments: ", R0/binary>> end). %%------------------------------------------------------------------------------ %% Rules API %%------------------------------------------------------------------------------ create_rule(_Bindings, Params) -> if_test(fun() -> test_rule_sql(Params) end, fun() -> do_create_rule(Params) end, Params). test_rule_sql(Params) -> case emqx_rule_sqltester:test(emqx_json:decode(emqx_json:encode(Params), [return_maps])) of {ok, Result} -> return({ok, Result}); {error, nomatch} -> return({error, 404, <<"SQL Not Match">>}); {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. do_create_rule(Params) -> case parse_rule_params(Params) of {ok, ParsedParams} -> case maps:find(id, ParsedParams) of {ok, RuleId} -> case emqx_rule_registry:get_rule(RuleId) of {ok, _} -> return({error, 400, <<"Already Exists">>}); not_found -> do_create_rule2(ParsedParams) end; error -> do_create_rule2(ParsedParams) end; {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. do_create_rule2(ParsedParams) -> case emqx_rule_engine:create_rule(ParsedParams) of {ok, Rule} -> return({ok, record_to_map(Rule)}); {error, {action_not_found, ActionName}} -> return({error, 400, ?ERR_NO_ACTION(ActionName)}); {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. update_rule(#{id := Id0}, Params) -> Id = urldecode(Id0), case parse_rule_params(Params, #{id => Id}) of {ok, ParsedParams} -> case emqx_rule_engine:update_rule(ParsedParams) of {ok, Rule} -> return({ok, record_to_map(Rule)}); {error, {not_found, RuleId}} -> return({error, 400, ?ERR_NO_RULE(RuleId)}); {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end; {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. list_rules(_Bindings, Params) -> case proplists:get_value(<<"enable_paging">>, Params, true) of true -> SortFun = fun(#{created_at := C1}, #{created_at := C2}) -> C1 > C2 end, return({ok, emqx_mgmt_api:node_query(node(), Params, ?RULE_QS_SCHEMA, {?MODULE, query}, SortFun)}); false -> return_all(emqx_rule_registry:get_rules_ordered_by_ts()) end. show_rule(#{id := Id0}, _Params) -> Id = urldecode(Id0), reply_with(fun emqx_rule_registry:get_rule/1, Id). delete_rule(#{id := Id0}, _Params) -> Id = urldecode(Id0), ok = emqx_rule_engine:delete_rule(Id), return(ok). reset_metrics_local(Id0) -> Id = urldecode(Id0), emqx_rule_metrics:reset_metrics(Id). reset_metrics(#{id := Id0}, _Params) -> Id = urldecode(Id0), _ = ?CLUSTER_CALL(reset_metrics_local, [Id]), return(ok). %%------------------------------------------------------------------------------ %% Actions API %%------------------------------------------------------------------------------ list_actions(#{}, _Params) -> return_all( sort_by_title(action, emqx_rule_registry:get_actions())). show_action(#{name := Name}, _Params) -> reply_with(fun emqx_rule_registry:find_action/1, Name). %%------------------------------------------------------------------------------ %% Resources API %%------------------------------------------------------------------------------ create_resource(#{}, Params) -> case parse_resource_params(Params) of {ok, ParsedParams} -> if_test(fun() -> do_create_resource(test_resource, maps:without([id], ParsedParams)) end, fun() -> do_create_resource(create_resource, ParsedParams) end, Params); {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. do_create_resource(Create, ParsedParams) -> case maps:find(id, ParsedParams) of {ok, ResId} -> case emqx_rule_registry:find_resource(ResId) of {ok, _} -> return({error, 400, <<"Already Exists">>}); not_found -> do_create_resource2(Create, ParsedParams) end; error -> do_create_resource2(Create, ParsedParams) end. do_create_resource2(Create, ParsedParams) -> case emqx_rule_engine:Create(ParsedParams) of ok -> return(ok); {ok, Resource} -> return({ok, record_to_map(Resource)}); {error, {resource_type_not_found, Type}} -> return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)}); {error, {init_resource, _}} -> return({error, 500, <<"Init resource failure!">>}); {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data = lists:map(fun(Res = #{id := ResId}) -> Status = emqx_rule_engine:is_resource_alive(ResId), maps:put(status, Status, Res) end, Data0), return({ok, Data}). list_resources_by_type(#{type := Type}, _Params) -> return_all(emqx_rule_registry:get_resources_by_type(Type)). show_resource(#{id := Id0}, _Params) -> Id = urldecode(Id0), case emqx_rule_registry:find_resource(Id) of {ok, R} -> StatusFun = fun(Node) -> #{ node => Node, is_alive => emqx_rule_engine:is_resource_alive(Node, Id, #{fetch => false}) } end, Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()], return({ok, maps:put(status, Status, record_to_map(R))}); not_found -> return({error, 404, <<"Not Found">>}) end. get_resource_status(#{id := Id0}, _Params) -> Id = urldecode(Id0), case emqx_rule_engine:get_resource_status(Id) of {ok, Status} -> return({ok, Status}); {error, resource_not_initialized} -> return({error, 400, ?ERR_NO_RESOURCE(Id)}) end. start_resource(#{id := Id0}, _Params) -> Id = urldecode(Id0), case emqx_rule_engine:start_resource(Id) of ok -> return(ok); {error, {resource_not_found, ResId}} -> return({error, 400, ?ERR_NO_RESOURCE(ResId)}); {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. update_resource(#{id := Id0}, NewParams) -> Id = urldecode(Id0), P1 = case proplists:get_value(<<"description">>, NewParams) of undefined -> #{}; Value -> #{<<"description">> => Value} end, P2 = case proplists:get_value(<<"config">>, NewParams) of undefined -> #{}; [{}] -> #{}; Config -> #{<<"config">> => ?RAISE(json_term_to_map(Config), {invalid_config, Config})} end, case emqx_rule_engine:update_resource(Id, maps:merge(P1, P2)) of ok -> return(ok); {error, not_found} -> return({error, 400, <<"Resource not found:", Id/binary>>}); {error, {init_resource, _}} -> return({error, 500, <<"Init resource failure:", Id/binary>>}); {error, {dependent_rules_exists, RuleIds}} -> return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)}); {error, Reason} -> ?LOG(error, "Resource update failed: ~0p", [Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. delete_resource(#{id := Id0}, _Params) -> Id = urldecode(Id0), case emqx_rule_engine:delete_resource(Id) of ok -> return(ok); {error, not_found} -> return(ok); {error, {dependent_rules_exists, RuleIds}} -> return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)}); {error, Reason} -> return({error, 400, ?ERR_BADARGS(Reason)}) end. %%------------------------------------------------------------------------------ %% Resource Types API %%------------------------------------------------------------------------------ list_resource_types(#{}, _Params) -> return_all( sort_by_title(resource_type, emqx_rule_registry:get_resource_types())). show_resource_type(#{name := Name}, _Params) -> reply_with(fun emqx_rule_registry:find_resource_type/1, Name). %%------------------------------------------------------------------------------ %% Events API %%------------------------------------------------------------------------------ list_events(#{}, _Params) -> return({ok, emqx_rule_events:event_info()}). %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ if_test(True, False, Params) -> case proplists:get_value(<<"test">>, Params) of Test when Test =:= true; Test =:= <<"true">> -> True(); _ -> False() end. return_all(Records) -> Data = lists:foldr(fun maybe_record_to_map/2, [], Records), return({ok, Data}). maybe_record_to_map(Rec, Acc) -> case record_to_map(Rec) of ignore -> Acc; Map -> [Map | Acc] end. reply_with(Find, Key) -> case Find(Key) of {ok, R} -> return({ok, record_to_map(R)}); not_found -> return({error, 404, <<"Not Found">>}) end. record_to_map(#rule{id = Id, for = Hook, rawsql = RawSQL, actions = Actions, on_action_failed = OnFailed, enabled = Enabled, created_at = CreatedAt, description = Descr}) -> #{id => Id, for => Hook, rawsql => RawSQL, actions => printable_actions(Actions), on_action_failed => OnFailed, metrics => get_rule_metrics(Id), enabled => Enabled, created_at => CreatedAt, description => Descr }; record_to_map(#action{hidden = true}) -> ignore; record_to_map(#action{name = Name, category = Category, app = App, for = Hook, types = Types, params_spec = Params, title = Title, description = Descr}) -> #{name => Name, category => Category, app => App, for => Hook, types => Types, params => Params, title => Title, description => Descr }; record_to_map(#resource{id = Id, type = Type, config = Config, description = Descr}) -> #{id => Id, type => Type, config => Config, description => Descr }; record_to_map(#resource_type{name = Name, provider = Provider, params_spec = Params, title = Title, description = Descr}) -> #{name => Name, provider => Provider, params => Params, title => Title, description => Descr }. printable_actions(Actions) -> [#{id => Id, name => Name, params => Args, metrics => get_action_metrics(Id), fallbacks => printable_actions(Fallbacks)} || #action_instance{id = Id, name = Name, args = Args, fallbacks = Fallbacks} <- Actions]. parse_rule_params(Params) -> parse_rule_params(Params, #{description => <<"">>}). parse_rule_params([], Rule) -> {ok, Rule}; parse_rule_params([{<<"id">>, <<>>} | _], _) -> {error, {empty_string_not_allowed, id}}; parse_rule_params([{<<"id">>, Id} | Params], Rule) -> parse_rule_params(Params, Rule#{id => Id}); parse_rule_params([{<<"rawsql">>, RawSQL} | Params], Rule) -> parse_rule_params(Params, Rule#{rawsql => RawSQL}); parse_rule_params([{<<"enabled">>, Enabled} | Params], Rule) -> parse_rule_params(Params, Rule#{enabled => enabled(Enabled)}); parse_rule_params([{<<"on_action_failed">>, OnFailed} | Params], Rule) -> parse_rule_params(Params, Rule#{on_action_failed => on_failed(OnFailed)}); parse_rule_params([{<<"actions">>, Actions} | Params], Rule) -> parse_rule_params(Params, Rule#{actions => parse_actions(Actions)}); parse_rule_params([{<<"description">>, Descr} | Params], Rule) -> parse_rule_params(Params, Rule#{description => Descr}); parse_rule_params([_ | Params], Rule) -> parse_rule_params(Params, Rule). on_failed(<<"continue">>) -> continue; on_failed(<<"stop">>) -> stop; on_failed(OnFailed) -> error({invalid_on_failed, OnFailed}). enabled(Enabled) when is_boolean(Enabled) -> Enabled; enabled(Enabled) -> error({invalid_enabled, Enabled}). parse_actions(Actions) -> [parse_action(json_term_to_map(A)) || A <- Actions]. parse_action(Action) -> #{name => binary_to_existing_atom(maps:get(<<"name">>, Action), utf8), args => maps:get(<<"params">>, Action, #{}), fallbacks => parse_actions(maps:get(<<"fallbacks">>, Action, []))}. parse_resource_params(Params) -> parse_resource_params(Params, #{config => #{}, description => <<"">>}). parse_resource_params([], Res) -> {ok, Res}; parse_resource_params([{<<"id">>, <<>>} | _], _Res) -> {error, {empty_string_not_allowed, id}}; parse_resource_params([{<<"id">>, Id} | Params], Res) -> parse_resource_params(Params, Res#{id => Id}); parse_resource_params([{<<"type">>, ResourceType} | Params], Res) -> try parse_resource_params(Params, Res#{type => binary_to_existing_atom(ResourceType, utf8)}) catch error:badarg -> {error, {resource_type_not_found, ResourceType}} end; parse_resource_params([{<<"config">>, Config} | Params], Res) -> parse_resource_params(Params, Res#{config => json_term_to_map(Config)}); parse_resource_params([{<<"description">>, Descr} | Params], Res) -> parse_resource_params(Params, Res#{description => Descr}); parse_resource_params([_ | Params], Res) -> parse_resource_params(Params, Res). json_term_to_map(List) -> emqx_json:decode(emqx_json:encode(List), [return_maps]). sort_by_title(action, Actions) -> sort_by(#action.title, Actions); sort_by_title(resource_type, ResourceTypes) -> sort_by(#resource_type.title, ResourceTypes). sort_by(Pos, TplList) -> lists:sort( fun(RecA, RecB) -> maps:get(en, element(Pos, RecA), 0) =< maps:get(en, element(Pos, RecB), 0) end, TplList). get_rule_metrics(Id) -> lists:concat( [ case rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]) of {badrpc, _} -> []; Res -> [maps:put(node, Node, Res)] end || Node <- ekka_mnesia:running_nodes()]). get_action_metrics(Id) -> lists:concat( [ case rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]) of {badrpc, _} -> []; Res -> [maps:put(node, Node, Res)] end || Node <- ekka_mnesia:running_nodes()]). query({Qs, []}, Start, Limit) -> Ms = qs2ms(Qs), emqx_mgmt_api:select_table(?RULE_TAB, Ms, Start, Limit, fun record_to_map/1); query({Qs, Fuzzy}, Start, Limit) -> Ms = qs2ms(Qs), MatchFun = match_fun(Ms, Fuzzy), emqx_mgmt_api:traverse_table(?RULE_TAB, MatchFun, Start, Limit, fun record_to_map/1). qs2ms(Qs) -> Init = #rule{for = '_', enabled = '_', _ = '_'}, MatchHead = lists:foldl(fun(Q, Acc) -> match_ms(Q, Acc) end, Init, Qs), [{MatchHead, [], ['$_']}]. match_ms({for, '=:=', Value}, MatchHead) -> MatchHead#rule{for = Value}; match_ms({enabled, '=:=', Value}, MatchHead) -> MatchHead#rule{enabled = Value}; match_ms(_, MatchHead) -> MatchHead. match_fun(Ms, Fuzzy) -> MsC = ets:match_spec_compile(Ms), fun(Rows) -> Ls = ets:match_spec_run(Rows, MsC), lists:filter(fun(E) -> run_fuzzy_match(E, Fuzzy) end, Ls) end. run_fuzzy_match(_, []) -> true; run_fuzzy_match(E = #rule{id = Id}, [{id, like, Pattern}|Fuzzy]) -> binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy); run_fuzzy_match(E = #rule{description = Desc}, [{description, like, Pattern}|Fuzzy]) -> binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy); run_fuzzy_match(E = #rule{for = Topics}, [{for, match, Pattern}|Fuzzy]) -> lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics) andalso run_fuzzy_match(E, Fuzzy); run_fuzzy_match(E = #rule{for = Topics}, [{for, like, Pattern}|Fuzzy]) -> lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics) andalso run_fuzzy_match(E, Fuzzy); run_fuzzy_match(_E, [{_Key, like, _SubStr}| _Fuzzy]) -> false. urldecode(S) -> emqx_http_lib:uri_decode(S).