677 lines
23 KiB
Erlang
677 lines
23 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_engine_api).
|
|
|
|
-include("rule_engine.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-logger_header("[RuleEngineAPI]").
|
|
|
|
-import(minirest, [return/1]).
|
|
|
|
-rest_api(#{name => create_rule,
|
|
method => 'POST',
|
|
path => "/rules/",
|
|
func => create_rule,
|
|
descr => "Create a rule"
|
|
}).
|
|
|
|
-rest_api(#{name => update_rule,
|
|
method => 'PUT',
|
|
path => "/rules/:bin:id",
|
|
func => update_rule,
|
|
descr => "Update a rule"
|
|
}).
|
|
|
|
-rest_api(#{name => list_rules,
|
|
method => 'GET',
|
|
path => "/rules/",
|
|
func => list_rules,
|
|
descr => "A list of all rules"
|
|
}).
|
|
|
|
-rest_api(#{name => show_rule,
|
|
method => 'GET',
|
|
path => "/rules/:bin:id",
|
|
func => show_rule,
|
|
descr => "Show a rule"
|
|
}).
|
|
|
|
-rest_api(#{name => delete_rule,
|
|
method => 'DELETE',
|
|
path => "/rules/:bin:id",
|
|
func => delete_rule,
|
|
descr => "Delete a rule"
|
|
}).
|
|
|
|
-rest_api(#{name => reset_metrics,
|
|
method => 'PUT',
|
|
path => "/rules/:bin:id/reset_metrics",
|
|
func => reset_metrics,
|
|
descr => "reset a rule metrics"
|
|
}).
|
|
|
|
-rest_api(#{name => list_actions,
|
|
method => 'GET',
|
|
path => "/actions/",
|
|
func => list_actions,
|
|
descr => "A list of all actions"
|
|
}).
|
|
|
|
-rest_api(#{name => show_action,
|
|
method => 'GET',
|
|
path => "/actions/:atom:name",
|
|
func => show_action,
|
|
descr => "Show an action"
|
|
}).
|
|
|
|
-rest_api(#{name => list_resources,
|
|
method => 'GET',
|
|
path => "/resources/",
|
|
func => list_resources,
|
|
descr => "A list of all resources"
|
|
}).
|
|
|
|
-rest_api(#{name => create_resource,
|
|
method => 'POST',
|
|
path => "/resources/",
|
|
func => create_resource,
|
|
descr => "Create a resource"
|
|
}).
|
|
|
|
-rest_api(#{name => update_resource,
|
|
method => 'PUT',
|
|
path => "/resources/:bin:id",
|
|
func => update_resource,
|
|
descr => "Update a resource"
|
|
}).
|
|
|
|
-rest_api(#{name => show_resource,
|
|
method => 'GET',
|
|
path => "/resources/:bin:id",
|
|
func => show_resource,
|
|
descr => "Show a resource"
|
|
}).
|
|
|
|
-rest_api(#{name => get_resource_status,
|
|
method => 'GET',
|
|
path => "/resource_status/:bin:id",
|
|
func => get_resource_status,
|
|
descr => "Get status of a resource"
|
|
}).
|
|
|
|
-rest_api(#{name => start_resource,
|
|
method => 'POST',
|
|
path => "/resources/:bin:id",
|
|
func => start_resource,
|
|
descr => "Start a resource"
|
|
}).
|
|
|
|
-rest_api(#{name => delete_resource,
|
|
method => 'DELETE',
|
|
path => "/resources/:bin:id",
|
|
func => delete_resource,
|
|
descr => "Delete a resource"
|
|
}).
|
|
|
|
-rest_api(#{name => list_resource_types,
|
|
method => 'GET',
|
|
path => "/resource_types/",
|
|
func => list_resource_types,
|
|
descr => "List all resource types"
|
|
}).
|
|
|
|
-rest_api(#{name => show_resource_type,
|
|
method => 'GET',
|
|
path => "/resource_types/:atom:name",
|
|
func => show_resource_type,
|
|
descr => "Show a resource type"
|
|
}).
|
|
|
|
-rest_api(#{name => list_resources_by_type,
|
|
method => 'GET',
|
|
path => "/resource_types/:atom:type/resources",
|
|
func => list_resources_by_type,
|
|
descr => "List all resources of a resource type"
|
|
}).
|
|
|
|
-rest_api(#{name => list_events,
|
|
method => 'GET',
|
|
path => "/rule_events/",
|
|
func => list_events,
|
|
descr => "List all events with detailed info"
|
|
}).
|
|
|
|
-export([ create_rule/2
|
|
, update_rule/2
|
|
, list_rules/2
|
|
, show_rule/2
|
|
, delete_rule/2
|
|
, reset_metrics/2
|
|
, reset_metrics_local/1
|
|
]).
|
|
|
|
-export([ list_actions/2
|
|
, show_action/2
|
|
]).
|
|
|
|
-export([ create_resource/2
|
|
, list_resources/2
|
|
, show_resource/2
|
|
, get_resource_status/2
|
|
, start_resource/2
|
|
, delete_resource/2
|
|
, update_resource/2
|
|
]).
|
|
|
|
-export([ list_resource_types/2
|
|
, list_resources_by_type/2
|
|
, show_resource_type/2
|
|
]).
|
|
|
|
-export([list_events/2]).
|
|
-export([query/3]).
|
|
|
|
-define(RULE_QS_SCHEMA, {?RULE_TAB,
|
|
[
|
|
{<<"enabled">>, atom},
|
|
{<<"for">>, binary},
|
|
{<<"_like_id">>, binary},
|
|
{<<"_like_for">>, binary},
|
|
{<<"_match_for">>, binary},
|
|
{<<"_like_description">>, binary}
|
|
]}).
|
|
|
|
%% Human-readable error messages used in HTTP error replies.
%% Every macro expands to a single expression that evaluates to a binary.
-define(ERR_NO_RULE(ID),
        list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))).
-define(ERR_NO_ACTION(NAME),
        list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))).
-define(ERR_NO_RESOURCE(RESID),
        list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))).
-define(ERR_NO_RESOURCE_TYPE(TYPE),
        list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))).
-define(ERR_DEP_RULES_EXISTS(RULEIDS),
        list_to_binary(io_lib:format("Found rules ~0p depends on this resource, disable them first", [(RULEIDS)]))).
%% Formats REASON with ~0p and prefixes it with "Bad Arguments: ".
%% Deliberately binds no variable: binding a temporary inside the expansion
%% leaks it into the caller's scope, so a second use of the macro in the same
%% scope with a different reason would fail with badmatch.
-define(ERR_BADARGS(REASON),
        iolist_to_binary(["Bad Arguments: ", io_lib:format("~0p", [(REASON)])])).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Rules API
|
|
%%------------------------------------------------------------------------------
|
|
%% Handler for `POST /rules/`.
%% When the request carries a truthy "test" parameter (see if_test/3) the SQL
%% is only tested; otherwise the rule is created.
create_rule(_Bindings, Params) ->
    TestOnly = fun() -> test_rule_sql(Params) end,
    Create = fun() -> do_create_rule(Params) end,
    if_test(TestOnly, Create, Params).
|
|
|
|
%% Runs the SQL tester against the request parameters (converted to maps via
%% a JSON encode/decode round trip) and translates the outcome into a reply:
%% 404 when the SQL does not match, 400 for any other error.
test_rule_sql(Params) ->
    TestParams = json_term_to_map(Params),
    case emqx_rule_sqltester:test(TestParams) of
        {error, nomatch} ->
            return({error, 404, <<"SQL Not Match">>});
        {error, Reason} ->
            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)});
        {ok, Result} ->
            return({ok, Result})
    end.
|
|
|
|
%% Parses the request parameters and creates the rule.
%% If the caller supplied an id that is already registered, replies with
%% 400 "Already Exists"; parameter errors are logged and replied as 400.
do_create_rule(Params) ->
    case parse_rule_params(Params) of
        {error, Reason} ->
            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)});
        {ok, #{id := RuleId} = ParsedParams} ->
            %% An explicit id was given: refuse to overwrite an existing rule.
            case emqx_rule_registry:get_rule(RuleId) of
                not_found -> do_create_rule2(ParsedParams);
                {ok, _} -> return({error, 400, <<"Already Exists">>})
            end;
        {ok, ParsedParams} ->
            do_create_rule2(ParsedParams)
    end.
|
|
|
|
%% Asks the rule engine to create the rule and maps the result to a reply.
%% An unknown action and any other failure are both replied as 400; only the
%% latter is logged.
do_create_rule2(ParsedParams) ->
    Result = emqx_rule_engine:create_rule(ParsedParams),
    case Result of
        {error, {action_not_found, ActionName}} ->
            return({error, 400, ?ERR_NO_ACTION(ActionName)});
        {error, Reason} ->
            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)});
        {ok, Rule} ->
            return({ok, record_to_map(Rule)})
    end.
|
|
|
|
%% Handler for `PUT /rules/:id`.
%% The id comes from the URL path (URL-decoded) and seeds the parsed
%% parameter map; every failure is replied as 400.
update_rule(#{id := Id0}, Params) ->
    DecodedId = urldecode(Id0),
    case parse_rule_params(Params, #{id => DecodedId}) of
        {error, Reason} ->
            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)});
        {ok, ParsedParams} ->
            case emqx_rule_engine:update_rule(ParsedParams) of
                {error, {not_found, RuleId}} ->
                    return({error, 400, ?ERR_NO_RULE(RuleId)});
                {error, Reason} ->
                    ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
                    return({error, 400, ?ERR_BADARGS(Reason)});
                {ok, Rule} ->
                    return({ok, record_to_map(Rule)})
            end
    end.
|
|
|
|
%% Handler for `GET /rules/`.
%% The optional "enable_paging" parameter (default: true) selects between a
%% paged query, sorted by descending created_at, and a plain listing of all
%% rules. The flag is accepted both as a boolean atom and as the binaries
%% <<"true">> / <<"false">>, in the same way if_test/3 treats its "test"
%% flag. Any other value is replied as 400 instead of crashing the handler
%% with a case_clause error.
list_rules(_Bindings, Params) ->
    case proplists:get_value(<<"enable_paging">>, Params, true) of
        Paging when Paging =:= true; Paging =:= <<"true">> ->
            SortFun = fun(#{created_at := C1}, #{created_at := C2}) -> C1 > C2 end,
            Page = emqx_mgmt_api:node_query(node(), Params, ?RULE_QS_SCHEMA,
                                            {?MODULE, query}, SortFun),
            return({ok, Page});
        Paging when Paging =:= false; Paging =:= <<"false">> ->
            return_all(emqx_rule_registry:get_rules_ordered_by_ts());
        Invalid ->
            return({error, 400, ?ERR_BADARGS({invalid_enable_paging, Invalid})})
    end.
|
|
|
|
%% Handler for `GET /rules/:id`: looks the rule up by its URL-decoded id and
%% replies with its map form, or 404 (see reply_with/2).
show_rule(#{id := Id0}, _Params) ->
    reply_with(fun emqx_rule_registry:get_rule/1, urldecode(Id0)).
|
|
|
|
%% Handler for `DELETE /rules/:id`.
%% Asserts that the rule engine answers `ok`; any other result crashes the
%% handler with a badmatch.
delete_rule(#{id := Id0}, _Params) ->
    ok = emqx_rule_engine:delete_rule(urldecode(Id0)),
    return(ok).
|
|
|
|
%% Resets the metrics of one rule on the local node.
%% Takes the id as it appears in the URL and URL-decodes it before use.
reset_metrics_local(Id0) ->
    emqx_rule_metrics:reset_metrics(urldecode(Id0)).
|
|
|
|
%% Handler for `PUT /rules/:id/reset_metrics`.
%% The raw (still URL-encoded) id is handed to reset_metrics_local/1, which
%% performs the decoding itself. Decoding here as well would decode the id
%% twice, turning an id that contains a literal "%XX" sequence into a
%% different id.
%% NOTE(review): ?CLUSTER_CALL is defined outside this file; presumably it
%% invokes ?MODULE:reset_metrics_local/1 on the cluster nodes -- confirm.
%% Its result is intentionally ignored, as before.
reset_metrics(#{id := Id0}, _Params) ->
    _ = ?CLUSTER_CALL(reset_metrics_local, [Id0]),
    return(ok).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Actions API
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% Handler for `GET /actions/`: all registered actions, sorted by title.
%% Hidden actions are dropped by return_all/1 (via record_to_map/1).
list_actions(#{}, _Params) ->
    Actions = emqx_rule_registry:get_actions(),
    Sorted = sort_by_title(action, Actions),
    return_all(Sorted).
|
|
|
|
%% Handler for `GET /actions/:name`: replies with the action's map form or
%% 404 (see reply_with/2).
show_action(#{name := Name}, _Params) ->
    Finder = fun emqx_rule_registry:find_action/1,
    reply_with(Finder, Name).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Resources API
|
|
%%------------------------------------------------------------------------------
|
|
%% Handler for `POST /resources/`.
%% With a truthy "test" parameter (see if_test/3) the resource is only
%% tested, and the id is removed from the parsed parameters first; otherwise
%% the resource is created. Parameter errors are logged and replied as 400.
create_resource(#{}, Params) ->
    case parse_resource_params(Params) of
        {error, Reason} ->
            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)});
        {ok, ParsedParams} ->
            TestOnly = fun() ->
                           do_create_resource(test_resource, maps:without([id], ParsedParams))
                       end,
            Create = fun() -> do_create_resource(create_resource, ParsedParams) end,
            if_test(TestOnly, Create, Params)
    end.
|
|
|
|
%% Rejects the request with 400 "Already Exists" when an explicit id is given
%% and a resource with that id is already registered; otherwise continues
%% with do_create_resource2/2. `Create` names the emqx_rule_engine function
%% to invoke.
do_create_resource(Create, #{id := ResId} = ParsedParams) ->
    case emqx_rule_registry:find_resource(ResId) of
        not_found -> do_create_resource2(Create, ParsedParams);
        {ok, _} -> return({error, 400, <<"Already Exists">>})
    end;
do_create_resource(Create, ParsedParams) when is_map(ParsedParams) ->
    %% No id supplied, so there is nothing to check for duplicates.
    do_create_resource2(Create, ParsedParams).
|
|
|
|
%% Calls emqx_rule_engine:Op(ParsedParams), where Op is the function name
%% passed by the caller, and maps the result to a reply: unknown resource
%% type -> 400, initialisation failure -> 500, any other error -> logged 400.
do_create_resource2(Op, ParsedParams) ->
    Outcome = emqx_rule_engine:Op(ParsedParams),
    case Outcome of
        ok ->
            return(ok);
        {ok, Resource} ->
            return({ok, record_to_map(Resource)});
        {error, {init_resource, _}} ->
            return({error, 500, <<"Init resource failure!">>});
        {error, {resource_type_not_found, Type}} ->
            return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)});
        {error, Reason} ->
            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)})
    end.
|
|
|
|
%% Handler for `GET /resources/`: every registered resource in map form,
%% each extended with a `status` key holding the result of
%% emqx_rule_engine:is_resource_alive/1 for that resource.
list_resources(#{}, _Params) ->
    Resources = emqx_rule_registry:get_resources(),
    ResourceMaps = lists:foldr(fun maybe_record_to_map/2, [], Resources),
    WithStatus = fun(#{id := ResId} = ResMap) ->
                     ResMap#{status => emqx_rule_engine:is_resource_alive(ResId)}
                 end,
    return({ok, lists:map(WithStatus, ResourceMaps)}).
|
|
|
|
%% Handler for `GET /resource_types/:type/resources`: all resources of the
%% given resource type.
list_resources_by_type(#{type := Type}, _Params) ->
    Resources = emqx_rule_registry:get_resources_by_type(Type),
    return_all(Resources).
|
|
|
|
%% Handler for `GET /resources/:id`.
%% Replies with the resource in map form plus a `status` list holding one
%% `#{node, is_alive}` entry per running node, or 404 when the resource is
%% not registered.
show_resource(#{id := Id0}, _Params) ->
    ResId = urldecode(Id0),
    case emqx_rule_registry:find_resource(ResId) of
        not_found ->
            return({error, 404, <<"Not Found">>});
        {ok, Resource} ->
            NodeStatus =
                [#{node => Node,
                   is_alive => emqx_rule_engine:is_resource_alive(Node, ResId, #{fetch => false})}
                 || Node <- ekka_mnesia:running_nodes()],
            ResMap = record_to_map(Resource),
            return({ok, ResMap#{status => NodeStatus}})
    end.
|
|
|
|
%% Handler for `GET /resource_status/:id`.
%% A resource that is not initialised is replied as 400 with a
%% "Resource ... Not Found" message.
get_resource_status(#{id := Id0}, _Params) ->
    ResId = urldecode(Id0),
    case emqx_rule_engine:get_resource_status(ResId) of
        {error, resource_not_initialized} ->
            return({error, 400, ?ERR_NO_RESOURCE(ResId)});
        {ok, Status} ->
            return({ok, Status})
    end.
|
|
|
|
%% Handler for `POST /resources/:id`: starts the resource.
%% An unknown resource and any other failure are replied as 400; only the
%% latter is logged.
start_resource(#{id := Id0}, _Params) ->
    DecodedId = urldecode(Id0),
    case emqx_rule_engine:start_resource(DecodedId) of
        {error, {resource_not_found, ResId}} ->
            return({error, 400, ?ERR_NO_RESOURCE(ResId)});
        {error, Reason} ->
            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)});
        ok ->
            return(ok)
    end.
|
|
|
|
%% Handler for `PUT /resources/:id`.
%% Only "description" and "config" are taken from the request. A missing
%% key, or an empty config object (`[{}]`), contributes nothing to the
%% update map passed to the rule engine.
update_resource(#{id := Id0}, NewParams) ->
    Id = urldecode(Id0),
    DescrPatch =
        case proplists:get_value(<<"description">>, NewParams) of
            undefined -> #{};
            Descr -> #{<<"description">> => Descr}
        end,
    ConfigPatch =
        case proplists:get_value(<<"config">>, NewParams) of
            undefined -> #{};
            [{}] -> #{};
            Config -> #{<<"config">> => ?RAISE(json_term_to_map(Config), {invalid_config, Config})}
        end,
    Patch = maps:merge(DescrPatch, ConfigPatch),
    case emqx_rule_engine:update_resource(Id, Patch) of
        ok ->
            return(ok);
        {error, {dependent_rules_exists, RuleIds}} ->
            return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)});
        {error, {init_resource, _}} ->
            return({error, 500, <<"Init resource failure:", Id/binary>>});
        {error, not_found} ->
            return({error, 400, <<"Resource not found:", Id/binary>>});
        {error, Reason} ->
            ?LOG(error, "Resource update failed: ~0p", [Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)})
    end.
|
|
|
|
%% Handler for `DELETE /resources/:id`.
%% Deleting a resource that does not exist is reported as success. A resource
%% that rules still depend on is replied as 400 listing those rules. Any
%% other failure is logged -- like the other handlers in this module do --
%% and replied as 400.
delete_resource(#{id := Id0}, _Params) ->
    Id = urldecode(Id0),
    case emqx_rule_engine:delete_resource(Id) of
        ok ->
            return(ok);
        {error, not_found} ->
            return(ok);
        {error, {dependent_rules_exists, RuleIds}} ->
            return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)});
        {error, Reason} ->
            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
            return({error, 400, ?ERR_BADARGS(Reason)})
    end.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Resource Types API
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% Handler for `GET /resource_types/`: all resource types, sorted by title.
list_resource_types(#{}, _Params) ->
    Types = emqx_rule_registry:get_resource_types(),
    Sorted = sort_by_title(resource_type, Types),
    return_all(Sorted).
|
|
|
|
%% Handler for `GET /resource_types/:name`: replies with the resource type's
%% map form or 404 (see reply_with/2).
show_resource_type(#{name := Name}, _Params) ->
    Finder = fun emqx_rule_registry:find_resource_type/1,
    reply_with(Finder, Name).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Events API
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% Handler for `GET /rule_events/`: replies with whatever
%% emqx_rule_events:event_info/0 returns.
list_events(#{}, _Params) ->
    EventInfo = emqx_rule_events:event_info(),
    return({ok, EventInfo}).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% Calls `True()` when the "test" parameter is `true` or <<"true">>, and
%% `False()` in every other case, including when the parameter is absent.
if_test(True, False, Params) ->
    Flag = proplists:get_value(<<"test">>, Params),
    case lists:member(Flag, [true, <<"true">>]) of
        true -> True();
        false -> False()
    end.
|
|
|
|
%% Converts the records to maps, dropping those record_to_map/1 marks as
%% `ignore`, and replies with the resulting list in the original order.
return_all(Records) ->
    return({ok, lists:foldr(fun maybe_record_to_map/2, [], Records)}).
|
|
|
|
%% Fold step: prepends the map form of `Rec` to `Acc`, unless
%% record_to_map/1 returns `ignore`, in which case `Acc` is left unchanged.
maybe_record_to_map(Rec, Acc) ->
    prepend_unless_ignored(record_to_map(Rec), Acc).

prepend_unless_ignored(ignore, Acc) -> Acc;
prepend_unless_ignored(Map, Acc) -> [Map | Acc].
|
|
|
|
%% Looks `Key` up with the given finder fun and replies with the record's
%% map form, or 404 "Not Found" when the finder returns `not_found`.
reply_with(Find, Key) ->
    Lookup = Find(Key),
    case Lookup of
        not_found ->
            return({error, 404, <<"Not Found">>});
        {ok, Record} ->
            return({ok, record_to_map(Record)})
    end.
|
|
|
|
%% Converts a rule, action, resource or resource-type record into the map
%% that is returned to API clients. Returns `ignore` for hidden actions.
%% Rule maps additionally carry per-node metrics (see get_rule_metrics/1)
%% and their actions in printable form (see printable_actions/1).
record_to_map(#rule{} = Rule) ->
    RuleId = Rule#rule.id,
    Actions = printable_actions(Rule#rule.actions),
    Metrics = get_rule_metrics(RuleId),
    #{id => RuleId,
      for => Rule#rule.for,
      rawsql => Rule#rule.rawsql,
      actions => Actions,
      on_action_failed => Rule#rule.on_action_failed,
      metrics => Metrics,
      enabled => Rule#rule.enabled,
      created_at => Rule#rule.created_at,
      description => Rule#rule.description};
record_to_map(#action{hidden = true}) ->
    ignore;
record_to_map(#action{} = Action) ->
    #{name => Action#action.name,
      category => Action#action.category,
      app => Action#action.app,
      for => Action#action.for,
      types => Action#action.types,
      %% The record's params_spec is exposed under the key `params`.
      params => Action#action.params_spec,
      title => Action#action.title,
      description => Action#action.description};
record_to_map(#resource{} = Resource) ->
    #{id => Resource#resource.id,
      type => Resource#resource.type,
      config => Resource#resource.config,
      description => Resource#resource.description};
record_to_map(#resource_type{} = ResType) ->
    #{name => ResType#resource_type.name,
      provider => ResType#resource_type.provider,
      params => ResType#resource_type.params_spec,
      title => ResType#resource_type.title,
      description => ResType#resource_type.description}.
|
|
|
|
%% Converts a list of #action_instance{} records into maps, recursing into
%% their fallbacks. List elements that are not #action_instance{} records
%% are skipped.
printable_actions(Actions) ->
    [printable_action(Instance) || #action_instance{} = Instance <- Actions].

%% Map form of one action instance, including its per-node metrics.
printable_action(#action_instance{id = Id, name = Name, args = Args, fallbacks = Fallbacks}) ->
    Metrics = get_action_metrics(Id),
    PrintableFallbacks = printable_actions(Fallbacks),
    #{id => Id,
      name => Name,
      params => Args,
      metrics => Metrics,
      fallbacks => PrintableFallbacks}.
|
|
|
|
%% Parses rule parameters, starting from a rule whose description defaults
%% to the empty binary.
parse_rule_params(Params) ->
    Defaults = #{description => <<"">>},
    parse_rule_params(Params, Defaults).
|
|
%% Folds the request parameters (a list of {Key, Value} pairs with binary
%% keys) into the `Rule` map. Unknown keys are ignored.
%%
%% Returns `{ok, RuleMap}` or `{error, Reason}`. An invalid "enabled" or
%% "on_action_failed" value makes enabled/1 and on_failed/1 raise an error;
%% those two errors are converted to `{error, Reason}` here so that the
%% callers' existing `{error, Reason}` branch replies with 400 instead of
%% the exception escaping the handler.
%% Errors raised while parsing "actions" are not converted here.
parse_rule_params(Params, Rule) ->
    try
        do_parse_rule_params(Params, Rule)
    catch
        error:{invalid_enabled, Value} ->
            {error, {invalid_enabled, Value}};
        error:{invalid_on_failed, Value} ->
            {error, {invalid_on_failed, Value}}
    end.

%% Recursive worker for parse_rule_params/2.
do_parse_rule_params([], Rule) ->
    {ok, Rule};
do_parse_rule_params([{<<"id">>, <<>>} | _], _) ->
    {error, {empty_string_not_allowed, id}};
do_parse_rule_params([{<<"id">>, Id} | Params], Rule) ->
    do_parse_rule_params(Params, Rule#{id => Id});
do_parse_rule_params([{<<"rawsql">>, RawSQL} | Params], Rule) ->
    do_parse_rule_params(Params, Rule#{rawsql => RawSQL});
do_parse_rule_params([{<<"enabled">>, Enabled} | Params], Rule) ->
    do_parse_rule_params(Params, Rule#{enabled => enabled(Enabled)});
do_parse_rule_params([{<<"on_action_failed">>, OnFailed} | Params], Rule) ->
    do_parse_rule_params(Params, Rule#{on_action_failed => on_failed(OnFailed)});
do_parse_rule_params([{<<"actions">>, Actions} | Params], Rule) ->
    do_parse_rule_params(Params, Rule#{actions => parse_actions(Actions)});
do_parse_rule_params([{<<"description">>, Descr} | Params], Rule) ->
    do_parse_rule_params(Params, Rule#{description => Descr});
do_parse_rule_params([_ | Params], Rule) ->
    do_parse_rule_params(Params, Rule).
|
|
|
|
%% Maps the "on_action_failed" request value to its atom; raises
%% `{invalid_on_failed, Value}` for anything else.
on_failed(OnFailed) ->
    case OnFailed of
        <<"continue">> -> continue;
        <<"stop">> -> stop;
        _ -> error({invalid_on_failed, OnFailed})
    end.
|
|
|
|
%% Accepts only the booleans `true` and `false`; raises
%% `{invalid_enabled, Value}` for anything else.
enabled(true) -> true;
enabled(false) -> false;
enabled(Other) -> error({invalid_enabled, Other}).
|
|
|
|
%% Parses every action in the list, converting each one to a map first
%% (see json_term_to_map/1).
parse_actions(Actions) ->
    [begin
         ActionMap = json_term_to_map(RawAction),
         parse_action(ActionMap)
     end || RawAction <- Actions].
|
|
|
|
%% Builds the internal form of one action from its JSON map.
%% "name" is required and must correspond to an already existing atom;
%% "params" defaults to an empty map and "fallbacks" to an empty list.
parse_action(Action) ->
    NameBin = maps:get(<<"name">>, Action),
    Name = binary_to_existing_atom(NameBin, utf8),
    Args = maps:get(<<"params">>, Action, #{}),
    Fallbacks = parse_actions(maps:get(<<"fallbacks">>, Action, [])),
    #{name => Name, args => Args, fallbacks => Fallbacks}.
|
|
|
|
%% Parses resource parameters, starting from an empty config map and an
%% empty description.
parse_resource_params(Params) ->
    Defaults = #{config => #{}, description => <<"">>},
    parse_resource_params(Params, Defaults).
|
|
%% Folds the request parameters (a list of {Key, Value} pairs with binary
%% keys) into the `Res` map. Unknown keys are ignored.
%% Returns `{ok, ResMap}`, `{error, {empty_string_not_allowed, id}}` or
%% `{error, {resource_type_not_found, Type}}`.
parse_resource_params([], Res) ->
    {ok, Res};
parse_resource_params([{<<"id">>, <<>>} | _], _Res) ->
    {error, {empty_string_not_allowed, id}};
parse_resource_params([{<<"id">>, Id} | Params], Res) ->
    parse_resource_params(Params, Res#{id => Id});
parse_resource_params([{<<"type">>, ResourceType} | Params], Res) ->
    %% Only the atom conversion is protected. The recursive call sits in the
    %% unprotected `of` section, so a badarg raised while parsing the
    %% remaining parameters is no longer misreported as an unknown resource
    %% type, and the recursion stays a tail call.
    try binary_to_existing_atom(ResourceType, utf8) of
        Type ->
            parse_resource_params(Params, Res#{type => Type})
    catch
        error:badarg ->
            {error, {resource_type_not_found, ResourceType}}
    end;
parse_resource_params([{<<"config">>, Config} | Params], Res) ->
    parse_resource_params(Params, Res#{config => json_term_to_map(Config)});
parse_resource_params([{<<"description">>, Descr} | Params], Res) ->
    parse_resource_params(Params, Res#{description => Descr});
parse_resource_params([_ | Params], Res) ->
    parse_resource_params(Params, Res).
|
|
|
|
%% Converts a JSON term into its map representation by encoding it to JSON
%% and decoding it again with the `return_maps` option.
json_term_to_map(List) ->
    Json = emqx_json:encode(List),
    emqx_json:decode(Json, [return_maps]).
|
|
|
|
%% Sorts action or resource-type records by their `title` field
%% (see sort_by/2). `Kind` selects which record definition supplies the
%% field position.
sort_by_title(Kind, Records) ->
    TitlePos =
        case Kind of
            action -> #action.title;
            resource_type -> #resource_type.title
        end,
    sort_by(TitlePos, Records).
|
|
|
|
%% Sorts tuples in ascending order of the `en` entry of the map stored at
%% element `Pos`. Tuples whose map has no `en` key are ordered as if the
%% value were 0.
sort_by(Pos, TplList) ->
    EnValue = fun(Rec) -> maps:get(en, element(Pos, Rec), 0) end,
    InOrder = fun(RecA, RecB) -> EnValue(RecA) =< EnValue(RecB) end,
    lists:sort(InOrder, TplList).
|
|
|
|
%% Collects the rule's metrics from every running node via rpc, tagging each
%% result with its node. Nodes whose rpc call fails are left out.
get_rule_metrics(Id) ->
    Collect =
        fun(Node) ->
            case rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]) of
                {badrpc, _} -> [];
                Metrics -> [maps:put(node, Node, Metrics)]
            end
        end,
    lists:flatmap(Collect, ekka_mnesia:running_nodes()).
|
|
|
|
%% Collects the action's metrics from every running node via rpc, tagging
%% each result with its node. Nodes whose rpc call fails are left out.
get_action_metrics(Id) ->
    Collect =
        fun(Node) ->
            case rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]) of
                {badrpc, _} -> [];
                Metrics -> [maps:put(node, Node, Metrics)]
            end
        end,
    lists:flatmap(Collect, ekka_mnesia:running_nodes()).
|
|
|
|
%% Query callback passed to emqx_mgmt_api:node_query/5 by list_rules/2.
%% Without fuzzy conditions the rule table is selected directly with the
%% match spec; with fuzzy conditions the table is traversed and each batch
%% is filtered through match_fun/2.
query({Qs, Fuzzy}, Start, Limit) ->
    Ms = qs2ms(Qs),
    case Fuzzy of
        [] ->
            emqx_mgmt_api:select_table(?RULE_TAB, Ms, Start, Limit, fun record_to_map/1);
        _ ->
            MatchFun = match_fun(Ms, Fuzzy),
            emqx_mgmt_api:traverse_table(?RULE_TAB, MatchFun, Start, Limit, fun record_to_map/1)
    end.
|
|
|
|
%% Builds a match spec over #rule{} records from the exact-match query
%% conditions. Fields without a condition stay as the wildcard '_'.
qs2ms(Qs) ->
    Wildcard = #rule{for = '_', enabled = '_', _ = '_'},
    Head = lists:foldl(fun match_ms/2, Wildcard, Qs),
    [{Head, [], ['$_']}].
|
|
|
|
%% Applies one exact-match condition to the match head. Only `for` and
%% `enabled` equality conditions are recognised; anything else leaves the
%% head unchanged.
match_ms(Condition, MatchHead) ->
    case Condition of
        {for, '=:=', Value} -> MatchHead#rule{for = Value};
        {enabled, '=:=', Value} -> MatchHead#rule{enabled = Value};
        _ -> MatchHead
    end.
|
|
|
|
%% Returns a fun that filters a batch of rows: first through the match spec
%% (compiled once, up front), then through the fuzzy conditions.
match_fun(Ms, Fuzzy) ->
    Compiled = ets:match_spec_compile(Ms),
    fun(Rows) ->
        [Row || Row <- ets:match_spec_run(Rows, Compiled),
                run_fuzzy_match(Row, Fuzzy)]
    end.
|
|
|
|
%% Returns true when the rule satisfies every fuzzy condition in the list.
%% Evaluation stops at the first condition that fails.
run_fuzzy_match(_, []) ->
    true;
run_fuzzy_match(Rule, [Condition | Rest]) ->
    fuzzy_condition(Rule, Condition) andalso run_fuzzy_match(Rule, Rest).

%% Evaluates a single fuzzy condition against a rule:
%%   {id | description | for, like, Pattern} -- substring match
%%   {for, match, Pattern}                   -- emqx_topic:match/2 on each topic
%% Any other `like` condition never matches.
fuzzy_condition(#rule{id = Id}, {id, like, Pattern}) ->
    binary:match(Id, Pattern) /= nomatch;
fuzzy_condition(#rule{description = Desc}, {description, like, Pattern}) ->
    binary:match(Desc, Pattern) /= nomatch;
fuzzy_condition(#rule{for = Topics}, {for, match, Pattern}) ->
    lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics);
fuzzy_condition(#rule{for = Topics}, {for, like, Pattern}) ->
    lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics);
fuzzy_condition(_Rule, {_Key, like, _SubStr}) ->
    false.
|
|
|
|
%% URL-decodes a path segment by delegating to emqx_http_lib:uri_decode/1.
urldecode(Encoded) ->
    emqx_http_lib:uri_decode(Encoded).
|