refactor(rule): generate swagger from hocon schema for /rules

This commit is contained in:
Shawn 2021-12-03 10:50:22 +08:00
parent 24bded99d5
commit 416b9f8d7c
5 changed files with 217 additions and 201 deletions

View File

@ -32,13 +32,40 @@ check_params(Params, Tag) ->
roots() ->
[ {"rule_creation", sc(ref("rule_creation"), #{desc => "Schema for creating rules"})}
, {"rule_info", sc(ref("rule_info"), #{desc => "Schema for rule info"})}
, {"rule_events", sc(ref("rule_events"), #{desc => "Schema for rule events"})}
, {"rule_test", sc(ref("rule_test"), #{desc => "Schema for testing rules"})}
].
fields("rule_creation") ->
[ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})}
[ {"id", sc(binary(),
#{ desc => "The Id of the rule", nullable => false
, example => "my_rule_id"
})}
] ++ emqx_rule_engine_schema:fields("rules");
fields("rule_info") ->
[ {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
, {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})}
, {"from", sc(hoconsc:array(binary()),
#{desc => "The topics of the rule", example => "t/#"})}
, {"created_at", sc(binary(),
#{ desc => "The created time of the rule"
, example => "2021-12-01T15:00:43.153+08:00"
})}
] ++ fields("rule_creation");
%% TODO: we can delete this API if the Dashboard not denpends on it
fields("rule_events") ->
ETopics = [emqx_rule_events:event_topic(E) || E <- emqx_rule_events:event_names()],
[ {"event", sc(hoconsc:enum(ETopics), #{desc => "The event topics", nullable => false})}
, {"title", sc(binary(), #{desc => "The title", example => "some title"})}
, {"description", sc(binary(), #{desc => "The description", example => "some desc"})}
, {"columns", sc(map(), #{desc => "The columns"})}
, {"test_columns", sc(map(), #{desc => "The test columns"})}
, {"sql_example", sc(binary(), #{desc => "The sql_example"})}
];
fields("rule_test") ->
[ {"context", sc(hoconsc:union([ ref("ctx_pub")
, ref("ctx_sub")
@ -53,6 +80,18 @@ fields("rule_test") ->
, {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})}
];
fields("metrics") ->
[ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})}
, {"rate", sc(float(), #{desc => "The rate of matched, times/second"})}
, {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})}
, {"rate_last5m", sc(float(),
#{desc => "The average rate of matched in last 5 mins, times/second"})}
];
fields("node_metrics") ->
[ {"node", sc(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})}
] ++ fields("metrics");
fields("ctx_pub") ->
[ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})}
, {"id", sc(binary(), #{desc => "Message ID"})}

View File

@ -18,16 +18,17 @@
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("typerefl/include/types.hrl").
-behaviour(minirest_api).
-export([api_spec/0]).
-import(hoconsc, [mk/2, ref/2, array/1]).
-export([ crud_rules/2
, list_events/2
, crud_rules_by_id/2
, rule_test/2
]).
%% Swagger specs from hocon schema
-export([api_spec/0, paths/0, schema/1, namespace/0]).
%% API callbacks
-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2]).
-define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
-define(ERR_BADARGS(REASON),
@ -43,210 +44,130 @@
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}}
end).
namespace() -> "rule".
api_spec() ->
{
[ api_rules_list_create()
, api_rules_crud()
, api_rule_test()
, api_events_list()
],
[]
}.
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
api_rules_list_create() ->
Metadata = #{
paths() -> ["/rule_events", "/rule_test", "/rules", "/rules/:id"].
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
, {message, mk(string(), #{example => Message})}
].
rule_creation_schema() ->
ref(emqx_rule_api_schema, "rule_creation").
rule_update_schema() ->
ref(emqx_rule_engine_schema, "rules").
rule_test_schema() ->
ref(emqx_rule_api_schema, "rule_test").
rule_info_schema() ->
ref(emqx_rule_api_schema, "rule_info").
schema("/rules") ->
#{
operationId => '/rules',
get => #{
tags => [<<"rules">>],
description => <<"List all rules">>,
summary => <<"List Rules">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:array_schema(resp_schema(), <<"List rules successfully">>)}},
200 => mk(array(rule_info_schema()), #{desc => "List of rules"})
}},
post => #{
description => <<"Create a new rule using given Id to all nodes in the cluster">>,
'requestBody' => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>),
tags => [<<"rules">>],
description => <<"Create a new rule using given Id">>,
summary => <<"Create a Rule">>,
requestBody => rule_creation_schema(),
responses => #{
<<"400">> =>
emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
<<"201">> =>
emqx_mgmt_util:schema(resp_schema(), <<"Create rule successfully">>)}}
},
{"/rules", Metadata, crud_rules}.
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
201 => rule_info_schema()
}}
};
api_events_list() ->
Metadata = #{
schema("/rule_events") ->
#{
operationId => '/rule_events',
get => #{
tags => [<<"rules">>],
description => <<"List all events can be used in rules">>,
summary => <<"List Events">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:array_schema(resp_schema(), <<"List events successfully">>)}}
},
{"/rule_events", Metadata, list_events}.
200 => mk(ref(emqx_rule_api_schema, "rule_events"), #{})
}
}
};
api_rules_crud() ->
Metadata = #{
schema("/rules/:id") ->
#{
operationId => '/rules/:id',
get => #{
tags => [<<"rules">>],
description => <<"Get a rule by given Id">>,
parameters => [param_path_id()],
summary => <<"Get a Rule">>,
parameters => param_path_id(),
responses => #{
<<"404">> =>
emqx_mgmt_util:error_schema(<<"Rule not found">>, ['NOT_FOUND']),
<<"200">> =>
emqx_mgmt_util:schema(resp_schema(), <<"Get rule successfully">>)}},
404 => error_schema('NOT_FOUND', "Rule not found"),
200 => rule_info_schema()
}
},
put => #{
tags => [<<"rules">>],
description => <<"Update a rule by given Id to all nodes in the cluster">>,
parameters => [param_path_id()],
'requestBody' => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>),
summary => <<"Update a Rule">>,
parameters => param_path_id(),
requestBody => rule_update_schema(),
responses => #{
<<"400">> =>
emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
<<"200">> =>
emqx_mgmt_util:schema(resp_schema(),
<<"Update rule successfully">>)}},
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
200 => rule_info_schema()
}
},
delete => #{
tags => [<<"rules">>],
description => <<"Delete a rule by given Id from all nodes in the cluster">>,
parameters => [param_path_id()],
summary => <<"Delete a Rule">>,
parameters => param_path_id(),
responses => #{
<<"204">> =>
emqx_mgmt_util:schema(<<"Delete rule successfully">>)}}
},
{"/rules/:id", Metadata, crud_rules_by_id}.
204 => <<"Delete rule successfully">>
}
}
};
api_rule_test() ->
Metadata = #{
schema("/rule_test") ->
#{
operationId => '/rule_test',
post => #{
tags => [<<"rules">>],
description => <<"Test a rule">>,
'requestBody' => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>),
summary => <<"Test a Rule">>,
requestBody => rule_test_schema(),
responses => #{
<<"400">> =>
emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
<<"412">> =>
emqx_mgmt_util:error_schema(<<"SQL Not Match">>, ['NOT_MATCH']),
<<"200">> =>
emqx_mgmt_util:schema(rule_test_resp_schema(), <<"Rule Test Pass">>)}}
},
{"/rule_test", Metadata, rule_test}.
put_req_schema() ->
#{type => object,
properties => #{
sql => #{
description => <<"The SQL">>,
type => string,
example => <<"SELECT * from \"t/1\"">>
},
enable => #{
description => <<"Enable or disable the rule">>,
type => boolean,
example => true
},
outputs => #{
description => <<"The outputs of the rule">>,
type => array,
items => #{
'oneOf' => [
#{
type => string,
example => <<"channel_id_of_my_bridge">>,
description => <<"The channel id of an emqx bridge">>
},
#{
type => object,
properties => #{
function => #{
type => string,
example => <<"console">>
}
}
}
]
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
412 => error_schema('NOT_MATCH', "SQL Not Match"),
200 => <<"Rule Test Pass">>
}
},
description => #{
description => <<"The description for the rule">>,
type => string,
example => <<"A simple rule that handles MQTT messages from topic \"t/1\"">>
}
}
}.
post_req_schema() ->
Req = #{properties := Prop} = put_req_schema(),
Req#{properties => Prop#{
id => #{
description => <<"The Id for the rule">>,
example => <<"my_rule">>,
type => string
}
}}.
resp_schema() ->
Req = #{properties := Prop} = put_req_schema(),
Req#{properties => Prop#{
id => #{
description => <<"The Id for the rule">>,
type => string
},
created_at => #{
description => <<"The time that this rule was created, in rfc3339 format">>,
type => string,
example => <<"2021-09-18T13:57:29+08:00">>
}
}}.
rule_test_req_schema() ->
#{type => object, properties => #{
sql => #{
description => <<"The SQL">>,
type => string,
example => <<"SELECT * from \"t/1\"">>
},
context => #{
type => object,
properties => #{
event_type => #{
description => <<"Event Type">>,
type => string,
enum => [<<"message_publish">>, <<"message_acked">>, <<"message_delivered">>,
<<"message_dropped">>, <<"session_subscribed">>, <<"session_unsubscribed">>,
<<"client_connected">>, <<"client_disconnected">>],
example => <<"message_publish">>
},
clientid => #{
description => <<"The Client ID">>,
type => string,
example => <<"\"c_emqx\"">>
},
topic => #{
description => <<"The Topic">>,
type => string,
example => <<"t/1">>
}
}
}
}}.
rule_test_resp_schema() ->
#{type => object}.
param_path_id() ->
#{
name => id,
in => path,
schema => #{type => string},
required => true
}.
[{id, mk(binary(), #{in => path, example => <<"my_rule_id">>})}].
%%------------------------------------------------------------------------------
%% Rules API
%%------------------------------------------------------------------------------
list_events(#{}, _Params) ->
'/rule_events'(get, _Params) ->
{200, emqx_rule_events:event_info()}.
crud_rules(get, _Params) ->
'/rules'(get, _Params) ->
Records = emqx_rule_engine:get_rules_ordered_by_ts(),
{200, format_rule_resp(Records)};
crud_rules(post, #{body := #{<<"id">> := Id} = Params}) ->
'/rules'(post, #{body := #{<<"id">> := Id} = Params}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx_rule_engine:get_rule(Id) of
{ok, _Rule} ->
@ -263,13 +184,13 @@ crud_rules(post, #{body := #{<<"id">> := Id} = Params}) ->
end
end.
rule_test(post, #{body := Params}) ->
'/rule_test'(post, #{body := Params}) ->
?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of
{ok, Result} -> {200, Result};
{error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}
end).
crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
'/rules/:id'(get, #{bindings := #{id := Id}}) ->
case emqx_rule_engine:get_rule(Id) of
{ok, Rule} ->
{200, format_rule_resp(Rule)};
@ -277,7 +198,7 @@ crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end;
crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) ->
'/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
@ -289,7 +210,7 @@ crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) ->
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
end;
crud_rules_by_id(delete, #{bindings := #{id := Id}}) ->
'/rules/:id'(delete, #{bindings := #{id := Id}}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx:remove_config(ConfPath, #{}) of
{ok, _} -> {204};
@ -315,11 +236,13 @@ format_rule_resp(#{ id := Id, created_at := CreatedAt,
sql := SQL,
enabled := Enabled,
description := Descr}) ->
NodeMetrics = get_rule_metrics(Id),
#{id => Id,
from => Topics,
outputs => format_output(Output),
sql => SQL,
metrics => get_rule_metrics(Id),
metrics => aggregate_metrics(NodeMetrics),
node_metrics => NodeMetrics,
enabled => Enabled,
created_at => format_datetime(CreatedAt, millisecond),
description => Descr
@ -353,5 +276,14 @@ get_rule_metrics(Id) ->
[Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id]))
|| Node <- mria_mnesia:running_nodes()].
aggregate_metrics(AllMetrics) ->
InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0},
lists:foldl(fun
(#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1},
#{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) ->
#{matched => Match1 + Match0, rate => Rate1 + Rate0,
rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0}
end, InitMetrics, AllMetrics).
get_one_rule(AllRules, Id) ->
[R || R = #{id := Id0} <- AllRules, Id0 == Id].

View File

@ -44,19 +44,17 @@ fields("rules") ->
SQL query to transform the messages.<br>
Example: <code>SELECT * FROM \"test/topic\" WHERE payload.x = 1</code><br>
"""
, example => "SELECT * FROM \"test/topic\" WHERE payload.x = 1"
, nullable => false
, validator => fun ?MODULE:validate_sql/1})}
, {"outputs", sc(hoconsc:array(hoconsc:union(
[ binary()
, ref("builtin_output_republish")
, ref("builtin_output_console")
])),
, validator => fun ?MODULE:validate_sql/1
})}
, {"outputs", sc(hoconsc:array(hoconsc:union(outputs())),
#{ desc => """
A list of outputs of the rule.<br>
An output can be a string that refers to the channel Id of a emqx bridge, or a object
that refers to a function.<br>
There a some built-in functions like \"republish\" and \"console\", and we also support user
provided functions like \"ModuleName:FunctionName\".<br>
provided functions in the format: \"{module}:{function}\".<br>
The outputs in the list is executed one by one in order.
This means that if one of the output is executing slowly, all of the outputs comes after it will not
be executed until it returns.<br>
@ -66,9 +64,19 @@ If there's any error when running an output, there will be an error message, and
counter of the function output or the bridge channel will increase.
"""
, default => []
, example => [
<<"http:my_http_bridge">>,
#{function => republish, args => #{
topic => <<"t/1">>, payload => <<"${payload}">>}},
#{function => console}
]
})}
, {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
, {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
, {"description", sc(binary(),
#{ desc => "The description of the rule"
, example => "Some description"
, default => <<>>
})}
];
fields("builtin_output_republish") ->
@ -106,6 +114,27 @@ fields("builtin_output_console") ->
% default => #{}})}
];
fields("user_provided_function") ->
[ {function, sc(binary(),
#{ desc => """
The user provided function. Should be in the format: '{module}:{function}'.<br>
Where the <module> is the erlang callback module and the {function} is the erlang function.<br>
To write your own function, checkout the function <code>console</code> and
<code>republish</code> in the source file:
<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example.
"""
, example => "module:function"
})}
, {args, sc(map(),
#{ desc => """
The args will be passed as the 3rd argument to module:function/3,
checkout the function <code>console</code> and <code>republish</code> in the source file:
<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example.
"""
, default => #{}
})}
];
fields("republish_args") ->
[ {topic, sc(binary(),
#{ desc =>"""
@ -113,8 +142,9 @@ The target topic of message to be re-published.<br>
Template with variables is allowed, see description of the 'republish_args'.
"""
, nullable => false
, example => <<"a/1">>
})}
, {qos, sc(binary(),
, {qos, sc(qos(),
#{ desc => """
The qos of the message to be re-published.
Template with with variables is allowed, see description of the 'republish_args.<br>
@ -122,8 +152,9 @@ Defaults to ${qos}. If variable ${qos} is not found from the selected result of
0 is used.
"""
, default => <<"${qos}">>
, example => <<"${qos}">>
})}
, {retain, sc(binary(),
, {retain, sc(hoconsc:union([binary(), boolean()]),
#{ desc => """
The retain flag of the message to be re-published.
Template with with variables is allowed, see description of the 'republish_args.<br>
@ -131,6 +162,7 @@ Defaults to ${retain}. If variable ${retain} is not found from the selected resu
of the rule, false is used.
"""
, default => <<"${retain}">>
, example => <<"${retain}">>
})}
, {payload, sc(binary(),
#{ desc => """
@ -140,9 +172,20 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
of the rule, then the string \"undefined\" is used.
"""
, default => <<"${payload}">>
, example => <<"${payload}">>
})}
].
outputs() ->
[ binary()
, ref("builtin_output_republish")
, ref("builtin_output_console")
, ref("user_provided_function")
].
qos() ->
hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of
{ok, _Result} -> ok;

View File

@ -25,7 +25,9 @@
, load/1
, unload/0
, unload/1
, event_names/0
, event_name/1
, event_topic/1
, eventmsg_publish/1
]).
@ -45,17 +47,6 @@
, columns_with_exam/1
]).
-define(SUPPORTED_HOOK,
[ 'client.connected'
, 'client.disconnected'
, 'session.subscribed'
, 'session.unsubscribed'
, 'message.publish'
, 'message.delivered'
, 'message.acked'
, 'message.dropped'
]).
-ifdef(TEST).
-export([ reason/1
, hook_fun/1
@ -63,6 +54,17 @@
]).
-endif.
event_names() ->
[ 'client.connected'
, 'client.disconnected'
, 'session.subscribed'
, 'session.unsubscribed'
, 'message.publish'
, 'message.delivered'
, 'message.acked'
, 'message.dropped'
].
reload() ->
lists:foreach(fun(Rule) ->
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
@ -78,7 +80,7 @@ load(Topic) ->
unload() ->
lists:foreach(fun(HookPoint) ->
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
end, ?SUPPORTED_HOOK).
end, event_names()).
unload(Topic) ->
HookPoint = event_name(Topic),

View File

@ -247,9 +247,9 @@ handle_output(OutId, Selected, Envs) ->
})
end.
do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) ->
?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}),
emqx_bridge:send_message(ChannelId, Selected);
do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}),
emqx_bridge:send_message(BridgeId, Selected);
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
Mod:Func(Selected, Envs, Args).