From 416b9f8d7c52299ef579af9574399a02f53ef535 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 3 Dec 2021 10:50:22 +0800 Subject: [PATCH] refactor(rule): generate swagger from hocon schema for /rules --- .../src/emqx_rule_api_schema.erl | 41 ++- .../src/emqx_rule_engine_api.erl | 282 +++++++----------- .../src/emqx_rule_engine_schema.erl | 63 +++- .../emqx_rule_engine/src/emqx_rule_events.erl | 26 +- .../src/emqx_rule_runtime.erl | 6 +- 5 files changed, 217 insertions(+), 201 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 448f63138..1fe75447e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -32,13 +32,40 @@ check_params(Params, Tag) -> roots() -> [ {"rule_creation", sc(ref("rule_creation"), #{desc => "Schema for creating rules"})} + , {"rule_info", sc(ref("rule_info"), #{desc => "Schema for rule info"})} + , {"rule_events", sc(ref("rule_events"), #{desc => "Schema for rule events"})} , {"rule_test", sc(ref("rule_test"), #{desc => "Schema for testing rules"})} ]. fields("rule_creation") -> - [ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})} + [ {"id", sc(binary(), + #{ desc => "The Id of the rule", nullable => false + , example => "my_rule_id" + })} ] ++ emqx_rule_engine_schema:fields("rules"); +fields("rule_info") -> + [ {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})} + , {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})} + , {"from", sc(hoconsc:array(binary()), + #{desc => "The topics of the rule", example => "t/#"})} + , {"created_at", sc(binary(), + #{ desc => "The created time of the rule" + , example => "2021-12-01T15:00:43.153+08:00" + })} + ] ++ fields("rule_creation"); + +%% TODO: we can delete this API if the Dashboard not denpends on it +fields("rule_events") -> + ETopics = [emqx_rule_events:event_topic(E) || E <- emqx_rule_events:event_names()], + [ {"event", sc(hoconsc:enum(ETopics), #{desc => "The event topics", nullable => false})} + , {"title", sc(binary(), #{desc => "The title", example => "some title"})} + , {"description", sc(binary(), #{desc => "The description", example => "some desc"})} + , {"columns", sc(map(), #{desc => "The columns"})} + , {"test_columns", sc(map(), #{desc => "The test columns"})} + , {"sql_example", sc(binary(), #{desc => "The sql_example"})} + ]; + fields("rule_test") -> [ {"context", sc(hoconsc:union([ ref("ctx_pub") , ref("ctx_sub") @@ -53,6 +80,18 @@ fields("rule_test") -> , {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})} ]; +fields("metrics") -> + [ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})} + , {"rate", sc(float(), #{desc => "The rate of matched, times/second"})} + , {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})} + , {"rate_last5m", sc(float(), + #{desc => "The average rate of matched in last 5 mins, times/second"})} + ]; + +fields("node_metrics") -> + [ {"node", sc(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})} + ] ++ fields("metrics"); + fields("ctx_pub") -> [ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})} , {"id", sc(binary(), #{desc => "Message ID"})} diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index e5fd2de1c..75238fb71 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -18,16 +18,17 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("typerefl/include/types.hrl"). -behaviour(minirest_api). --export([api_spec/0]). +-import(hoconsc, [mk/2, ref/2, array/1]). --export([ crud_rules/2 - , list_events/2 - , crud_rules_by_id/2 - , rule_test/2 - ]). +%% Swagger specs from hocon schema +-export([api_spec/0, paths/0, schema/1, namespace/0]). + +%% API callbacks +-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2]). -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))). -define(ERR_BADARGS(REASON), @@ -43,210 +44,130 @@ {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}} end). +namespace() -> "rule". + api_spec() -> - { - [ api_rules_list_create() - , api_rules_crud() - , api_rule_test() - , api_events_list() - ], - [] - }. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). -api_rules_list_create() -> - Metadata = #{ +paths() -> ["/rule_events", "/rule_test", "/rules", "/rules/:id"]. + +error_schema(Code, Message) -> + [ {code, mk(string(), #{example => Code})} + , {message, mk(string(), #{example => Message})} + ]. + +rule_creation_schema() -> + ref(emqx_rule_api_schema, "rule_creation"). + +rule_update_schema() -> + ref(emqx_rule_engine_schema, "rules"). + +rule_test_schema() -> + ref(emqx_rule_api_schema, "rule_test"). + +rule_info_schema() -> + ref(emqx_rule_api_schema, "rule_info"). + +schema("/rules") -> + #{ + operationId => '/rules', get => #{ + tags => [<<"rules">>], description => <<"List all rules">>, + summary => <<"List Rules">>, responses => #{ - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), <<"List rules successfully">>)}}, + 200 => mk(array(rule_info_schema()), #{desc => "List of rules"}) + }}, post => #{ - description => <<"Create a new rule using given Id to all nodes in the cluster">>, - 'requestBody' => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>), + tags => [<<"rules">>], + description => <<"Create a new rule using given Id">>, + summary => <<"Create a Rule">>, + requestBody => rule_creation_schema(), responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), - <<"201">> => - emqx_mgmt_util:schema(resp_schema(), <<"Create rule successfully">>)}} - }, - {"/rules", Metadata, crud_rules}. + 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 201 => rule_info_schema() + }} + }; -api_events_list() -> - Metadata = #{ +schema("/rule_events") -> + #{ + operationId => '/rule_events', get => #{ + tags => [<<"rules">>], description => <<"List all events can be used in rules">>, + summary => <<"List Events">>, responses => #{ - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), <<"List events successfully">>)}} - }, - {"/rule_events", Metadata, list_events}. + 200 => mk(ref(emqx_rule_api_schema, "rule_events"), #{}) + } + } + }; -api_rules_crud() -> - Metadata = #{ +schema("/rules/:id") -> + #{ + operationId => '/rules/:id', get => #{ + tags => [<<"rules">>], description => <<"Get a rule by given Id">>, - parameters => [param_path_id()], + summary => <<"Get a Rule">>, + parameters => param_path_id(), responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(<<"Rule not found">>, ['NOT_FOUND']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), <<"Get rule successfully">>)}}, + 404 => error_schema('NOT_FOUND', "Rule not found"), + 200 => rule_info_schema() + } + }, put => #{ + tags => [<<"rules">>], description => <<"Update a rule by given Id to all nodes in the cluster">>, - parameters => [param_path_id()], - 'requestBody' => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>), + summary => <<"Update a Rule">>, + parameters => param_path_id(), + requestBody => rule_update_schema(), responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), - <<"Update rule successfully">>)}}, + 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 200 => rule_info_schema() + } + }, delete => #{ + tags => [<<"rules">>], description => <<"Delete a rule by given Id from all nodes in the cluster">>, - parameters => [param_path_id()], + summary => <<"Delete a Rule">>, + parameters => param_path_id(), responses => #{ - <<"204">> => - emqx_mgmt_util:schema(<<"Delete rule successfully">>)}} - }, - {"/rules/:id", Metadata, crud_rules_by_id}. + 204 => <<"Delete rule successfully">> + } + } + }; -api_rule_test() -> - Metadata = #{ +schema("/rule_test") -> + #{ + operationId => '/rule_test', post => #{ + tags => [<<"rules">>], description => <<"Test a rule">>, - 'requestBody' => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>), + summary => <<"Test a Rule">>, + requestBody => rule_test_schema(), responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), - <<"412">> => - emqx_mgmt_util:error_schema(<<"SQL Not Match">>, ['NOT_MATCH']), - <<"200">> => - emqx_mgmt_util:schema(rule_test_resp_schema(), <<"Rule Test Pass">>)}} - }, - {"/rule_test", Metadata, rule_test}. - -put_req_schema() -> - #{type => object, - properties => #{ - sql => #{ - description => <<"The SQL">>, - type => string, - example => <<"SELECT * from \"t/1\"">> - }, - enable => #{ - description => <<"Enable or disable the rule">>, - type => boolean, - example => true - }, - outputs => #{ - description => <<"The outputs of the rule">>, - type => array, - items => #{ - 'oneOf' => [ - #{ - type => string, - example => <<"channel_id_of_my_bridge">>, - description => <<"The channel id of an emqx bridge">> - }, - #{ - type => object, - properties => #{ - function => #{ - type => string, - example => <<"console">> - } - } - } - ] + 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 412 => error_schema('NOT_MATCH', "SQL Not Match"), + 200 => <<"Rule Test Pass">> } - }, - description => #{ - description => <<"The description for the rule">>, - type => string, - example => <<"A simple rule that handles MQTT messages from topic \"t/1\"">> } - } }. -post_req_schema() -> - Req = #{properties := Prop} = put_req_schema(), - Req#{properties => Prop#{ - id => #{ - description => <<"The Id for the rule">>, - example => <<"my_rule">>, - type => string - } - }}. - -resp_schema() -> - Req = #{properties := Prop} = put_req_schema(), - Req#{properties => Prop#{ - id => #{ - description => <<"The Id for the rule">>, - type => string - }, - created_at => #{ - description => <<"The time that this rule was created, in rfc3339 format">>, - type => string, - example => <<"2021-09-18T13:57:29+08:00">> - } - }}. - -rule_test_req_schema() -> - #{type => object, properties => #{ - sql => #{ - description => <<"The SQL">>, - type => string, - example => <<"SELECT * from \"t/1\"">> - }, - context => #{ - type => object, - properties => #{ - event_type => #{ - description => <<"Event Type">>, - type => string, - enum => [<<"message_publish">>, <<"message_acked">>, <<"message_delivered">>, - <<"message_dropped">>, <<"session_subscribed">>, <<"session_unsubscribed">>, - <<"client_connected">>, <<"client_disconnected">>], - example => <<"message_publish">> - }, - clientid => #{ - description => <<"The Client ID">>, - type => string, - example => <<"\"c_emqx\"">> - }, - topic => #{ - description => <<"The Topic">>, - type => string, - example => <<"t/1">> - } - } - } - }}. - -rule_test_resp_schema() -> - #{type => object}. - param_path_id() -> - #{ - name => id, - in => path, - schema => #{type => string}, - required => true - }. + [{id, mk(binary(), #{in => path, example => <<"my_rule_id">>})}]. %%------------------------------------------------------------------------------ %% Rules API %%------------------------------------------------------------------------------ -list_events(#{}, _Params) -> +'/rule_events'(get, _Params) -> {200, emqx_rule_events:event_info()}. -crud_rules(get, _Params) -> +'/rules'(get, _Params) -> Records = emqx_rule_engine:get_rules_ordered_by_ts(), {200, format_rule_resp(Records)}; -crud_rules(post, #{body := #{<<"id">> := Id} = Params}) -> +'/rules'(post, #{body := #{<<"id">> := Id} = Params}) -> ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> @@ -263,13 +184,13 @@ crud_rules(post, #{body := #{<<"id">> := Id} = Params}) -> end end. -rule_test(post, #{body := Params}) -> +'/rule_test'(post, #{body := Params}) -> ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of {ok, Result} -> {200, Result}; {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}} end). -crud_rules_by_id(get, #{bindings := #{id := Id}}) -> +'/rules/:id'(get, #{bindings := #{id := Id}}) -> case emqx_rule_engine:get_rule(Id) of {ok, Rule} -> {200, format_rule_resp(Rule)}; @@ -277,7 +198,7 @@ crud_rules_by_id(get, #{bindings := #{id := Id}}) -> {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}} end; -crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) -> +'/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) -> ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> @@ -289,7 +210,7 @@ crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) -> {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} end; -crud_rules_by_id(delete, #{bindings := #{id := Id}}) -> +'/rules/:id'(delete, #{bindings := #{id := Id}}) -> ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx:remove_config(ConfPath, #{}) of {ok, _} -> {204}; @@ -315,11 +236,13 @@ format_rule_resp(#{ id := Id, created_at := CreatedAt, sql := SQL, enabled := Enabled, description := Descr}) -> + NodeMetrics = get_rule_metrics(Id), #{id => Id, from => Topics, outputs => format_output(Output), sql => SQL, - metrics => get_rule_metrics(Id), + metrics => aggregate_metrics(NodeMetrics), + node_metrics => NodeMetrics, enabled => Enabled, created_at => format_datetime(CreatedAt, millisecond), description => Descr @@ -353,5 +276,14 @@ get_rule_metrics(Id) -> [Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id])) || Node <- mria_mnesia:running_nodes()]. +aggregate_metrics(AllMetrics) -> + InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0}, + lists:foldl(fun + (#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1}, + #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) -> + #{matched => Match1 + Match0, rate => Rate1 + Rate0, + rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0} + end, InitMetrics, AllMetrics). + get_one_rule(AllRules, Id) -> [R || R = #{id := Id0} <- AllRules, Id0 == Id]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 995044fc7..93661ab53 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -44,19 +44,17 @@ fields("rules") -> SQL query to transform the messages.
Example: SELECT * FROM \"test/topic\" WHERE payload.x = 1
""" + , example => "SELECT * FROM \"test/topic\" WHERE payload.x = 1" , nullable => false - , validator => fun ?MODULE:validate_sql/1})} - , {"outputs", sc(hoconsc:array(hoconsc:union( - [ binary() - , ref("builtin_output_republish") - , ref("builtin_output_console") - ])), + , validator => fun ?MODULE:validate_sql/1 + })} + , {"outputs", sc(hoconsc:array(hoconsc:union(outputs())), #{ desc => """ A list of outputs of the rule.
An output can be a string that refers to the channel Id of a emqx bridge, or a object that refers to a function.
There a some built-in functions like \"republish\" and \"console\", and we also support user -provided functions like \"ModuleName:FunctionName\".
+provided functions in the format: \"{module}:{function}\".
The outputs in the list is executed one by one in order. This means that if one of the output is executing slowly, all of the outputs comes after it will not be executed until it returns.
@@ -66,9 +64,19 @@ If there's any error when running an output, there will be an error message, and counter of the function output or the bridge channel will increase. """ , default => [] + , example => [ + <<"http:my_http_bridge">>, + #{function => republish, args => #{ + topic => <<"t/1">>, payload => <<"${payload}">>}}, + #{function => console} + ] })} , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})} - , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})} + , {"description", sc(binary(), + #{ desc => "The description of the rule" + , example => "Some description" + , default => <<>> + })} ]; fields("builtin_output_republish") -> @@ -106,6 +114,27 @@ fields("builtin_output_console") -> % default => #{}})} ]; +fields("user_provided_function") -> + [ {function, sc(binary(), + #{ desc => """ +The user provided function. Should be in the format: '{module}:{function}'.
+Where the is the erlang callback module and the {function} is the erlang function.
+To write your own function, checkout the function console and +republish in the source file: +apps/emqx_rule_engine/src/emqx_rule_outputs.erl as an example. +""" + , example => "module:function" + })} + , {args, sc(map(), + #{ desc => """ +The args will be passed as the 3rd argument to module:function/3, +checkout the function console and republish in the source file: +apps/emqx_rule_engine/src/emqx_rule_outputs.erl as an example. +""" + , default => #{} + })} + ]; + fields("republish_args") -> [ {topic, sc(binary(), #{ desc =>""" @@ -113,8 +142,9 @@ The target topic of message to be re-published.
Template with variables is allowed, see description of the 'republish_args'. """ , nullable => false + , example => <<"a/1">> })} - , {qos, sc(binary(), + , {qos, sc(qos(), #{ desc => """ The qos of the message to be re-published. Template with with variables is allowed, see description of the 'republish_args.
@@ -122,8 +152,9 @@ Defaults to ${qos}. If variable ${qos} is not found from the selected result of 0 is used. """ , default => <<"${qos}">> + , example => <<"${qos}">> })} - , {retain, sc(binary(), + , {retain, sc(hoconsc:union([binary(), boolean()]), #{ desc => """ The retain flag of the message to be re-published. Template with with variables is allowed, see description of the 'republish_args.
@@ -131,6 +162,7 @@ Defaults to ${retain}. If variable ${retain} is not found from the selected resu of the rule, false is used. """ , default => <<"${retain}">> + , example => <<"${retain}">> })} , {payload, sc(binary(), #{ desc => """ @@ -140,9 +172,20 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re of the rule, then the string \"undefined\" is used. """ , default => <<"${payload}">> + , example => <<"${payload}">> })} ]. +outputs() -> + [ binary() + , ref("builtin_output_republish") + , ref("builtin_output_console") + , ref("user_provided_function") + ]. + +qos() -> + hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]). + validate_sql(Sql) -> case emqx_rule_sqlparser:parse(Sql) of {ok, _Result} -> ok; diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 0aff9f018..c61629b39 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -25,7 +25,9 @@ , load/1 , unload/0 , unload/1 + , event_names/0 , event_name/1 + , event_topic/1 , eventmsg_publish/1 ]). @@ -45,17 +47,6 @@ , columns_with_exam/1 ]). --define(SUPPORTED_HOOK, - [ 'client.connected' - , 'client.disconnected' - , 'session.subscribed' - , 'session.unsubscribed' - , 'message.publish' - , 'message.delivered' - , 'message.acked' - , 'message.dropped' - ]). - -ifdef(TEST). -export([ reason/1 , hook_fun/1 @@ -63,6 +54,17 @@ ]). -endif. +event_names() -> + [ 'client.connected' + , 'client.disconnected' + , 'session.subscribed' + , 'session.unsubscribed' + , 'message.publish' + , 'message.delivered' + , 'message.acked' + , 'message.dropped' + ]. + reload() -> lists:foreach(fun(Rule) -> ok = emqx_rule_engine:load_hooks_for_rule(Rule) @@ -78,7 +80,7 @@ load(Topic) -> unload() -> lists:foreach(fun(HookPoint) -> emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}) - end, ?SUPPORTED_HOOK). + end, event_names()). unload(Topic) -> HookPoint = event_name(Topic), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 7b68b3ee3..1cabf3e32 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -247,9 +247,9 @@ handle_output(OutId, Selected, Envs) -> }) end. -do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) -> - ?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}), - emqx_bridge:send_message(ChannelId, Selected); +do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> + ?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}), + emqx_bridge:send_message(BridgeId, Selected); do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> Mod:Func(Selected, Envs, Args).