emqx/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

366 lines
14 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_api_schema).
-behaviour(hocon_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-export([check_params/2]).
-export([namespace/0, roots/0, fields/1]).
-type tag() :: rule_creation | rule_test | rule_engine.
-spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
check_params(Params, Tag) ->
BTag = atom_to_binary(Tag),
Opts = #{atom_key => true, required => false},
try hocon_tconf:check_plain(?MODULE, #{BTag => Params}, Opts, [Tag]) of
#{Tag := Checked} -> {ok, Checked}
catch
throw:Reason ->
?SLOG(error, #{
msg => "check_rule_params_failed",
reason => Reason
}),
{error, Reason}
end.
%%======================================================================================
%% Hocon Schema Definitions
namespace() -> "rule_engine".
roots() ->
[
{"rule_engine", sc(ref("rule_engine"), #{desc => ?DESC("root_rule_engine")})},
{"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})},
{"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})},
{"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})},
{"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}
].
fields("rule_engine") ->
emqx_rule_engine_schema:rule_engine_settings();
fields("rule_creation") ->
emqx_rule_engine_schema:fields("rules");
fields("rule_info") ->
[
rule_id(),
{"from",
sc(
hoconsc:array(binary()),
#{desc => ?DESC("ri_from"), example => "t/#"}
)},
{"created_at",
sc(
binary(),
#{
desc => ?DESC("ri_created_at"),
example => "2021-12-01T15:00:43.153+08:00"
}
)}
] ++ fields("rule_creation");
fields("rule_metrics") ->
[
rule_id(),
{"metrics", sc(ref("metrics"), #{desc => ?DESC("ri_metrics")})},
{"node_metrics",
sc(
hoconsc:array(ref("node_metrics")),
#{desc => ?DESC("ri_node_metrics")}
)}
];
%% TODO: we can delete this API if the Dashboard not depends on it
fields("rule_events") ->
ETopics = emqx_rule_events:event_topics_enum(),
[
{"event", sc(hoconsc:enum(ETopics), #{desc => ?DESC("rs_event"), required => true})},
{"title", sc(binary(), #{desc => ?DESC("rs_title"), example => "some title"})},
{"description", sc(binary(), #{desc => ?DESC("rs_description"), example => "some desc"})},
{"columns", sc(map(), #{desc => ?DESC("rs_columns")})},
{"test_columns", sc(map(), #{desc => ?DESC("rs_test_columns")})},
{"sql_example", sc(binary(), #{desc => ?DESC("rs_sql_example")})}
];
fields("rule_test") ->
[
{"context",
sc(
hoconsc:union([
ref("ctx_pub"),
ref("ctx_sub"),
ref("ctx_unsub"),
ref("ctx_delivered"),
ref("ctx_acked"),
ref("ctx_dropped"),
ref("ctx_connected"),
ref("ctx_disconnected"),
ref("ctx_connack"),
ref("ctx_check_authz_complete"),
ref("ctx_bridge_mqtt"),
ref("ctx_delivery_dropped")
]),
#{
desc => ?DESC("test_context"),
default => #{}
}
)},
{"sql", sc(binary(), #{desc => ?DESC("test_sql"), required => true})}
];
fields("metrics") ->
[
{"matched",
sc(non_neg_integer(), #{
desc => ?DESC("metrics_sql_matched")
})},
{"matched.rate", sc(float(), #{desc => ?DESC("metrics_sql_matched_rate")})},
{"matched.rate.max", sc(float(), #{desc => ?DESC("metrics_sql_matched_rate_max")})},
{"matched.rate.last5m",
sc(
float(),
#{desc => ?DESC("metrics_sql_matched_rate_last5m")}
)},
{"passed", sc(non_neg_integer(), #{desc => ?DESC("metrics_sql_passed")})},
{"failed", sc(non_neg_integer(), #{desc => ?DESC("metrics_sql_failed")})},
{"failed.exception",
sc(non_neg_integer(), #{
desc => ?DESC("metrics_sql_failed_exception")
})},
{"failed.unknown",
sc(non_neg_integer(), #{
desc => ?DESC("metrics_sql_failed_unknown")
})},
{"actions.total",
sc(non_neg_integer(), #{
desc => ?DESC("metrics_actions_total")
})},
{"actions.success",
sc(non_neg_integer(), #{
desc => ?DESC("metrics_actions_success")
})},
{"actions.failed",
sc(non_neg_integer(), #{
desc => ?DESC("metrics_actions_failed")
})},
{"actions.failed.out_of_service",
sc(non_neg_integer(), #{
desc => ?DESC("metrics_actions_failed_out_of_service")
})},
{"actions.failed.unknown",
sc(non_neg_integer(), #{
desc => ?DESC("metrics_actions_failed_unknown")
})}
];
fields("node_metrics") ->
[{"node", sc(binary(), #{desc => ?DESC("node_node"), example => "emqx@127.0.0.1"})}] ++
fields("metrics");
fields("ctx_pub") ->
Event = 'message.publish',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})}
| msg_event_common_fields()
];
fields("ctx_sub") ->
Event = 'session.subscribed',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)}
| msg_event_common_fields()
];
fields("ctx_unsub") ->
Event = 'session.unsubscribed',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)}
| without(["event_type", "event_topic", "event"], fields("ctx_sub"))
];
fields("ctx_delivered") ->
Event = 'message.delivered',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
{"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
| msg_event_common_fields()
];
fields("ctx_acked") ->
Event = 'message.acked',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)}
| without(["event_type", "event_topic", "event"], fields("ctx_delivered"))
];
fields("ctx_dropped") ->
Event = 'message.dropped',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})}
| msg_event_common_fields()
];
fields("ctx_connected") ->
Event = 'client.connected',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"mountpoint", sc(binary(), #{desc => ?DESC("event_mountpoint")})},
{"peername", sc(binary(), #{desc => ?DESC("event_peername")})},
{"sockname", sc(binary(), #{desc => ?DESC("event_sockname")})},
{"proto_name", sc(binary(), #{desc => ?DESC("event_proto_name")})},
{"proto_ver", sc(binary(), #{desc => ?DESC("event_proto_ver")})},
{"keepalive", sc(integer(), #{desc => ?DESC("event_keepalive")})},
{"clean_start", sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})},
{"expiry_interval", sc(integer(), #{desc => ?DESC("event_expiry_interval")})},
{"is_bridge", sc(boolean(), #{desc => ?DESC("event_is_bridge"), default => false})},
{"connected_at",
sc(integer(), #{
desc => ?DESC("event_connected_at")
})}
];
fields("ctx_disconnected") ->
Event = 'client.disconnected',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_disconnected_reason")})},
{"peername", sc(binary(), #{desc => ?DESC("event_peername")})},
{"sockname", sc(binary(), #{desc => ?DESC("event_sockname")})},
{"disconnected_at",
sc(integer(), #{
desc => ?DESC("event_ctx_disconnected_da")
})}
];
fields("ctx_connack") ->
Event = 'client.connack',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)},
{"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_connack_reason_code")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"clean_start", sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"peername", sc(binary(), #{desc => ?DESC("event_peername")})},
{"sockname", sc(binary(), #{desc => ?DESC("event_sockname")})},
{"proto_name", sc(binary(), #{desc => ?DESC("event_proto_name")})},
{"proto_ver", sc(binary(), #{desc => ?DESC("event_proto_ver")})},
{"keepalive", sc(integer(), #{desc => ?DESC("event_keepalive")})},
{"expiry_interval", sc(integer(), #{desc => ?DESC("event_expiry_interval")})},
{"connected_at",
sc(integer(), #{
desc => ?DESC("event_connected_at")
})}
];
fields("ctx_check_authz_complete") ->
Event = 'client.check_authz_complete',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"action", sc(binary(), #{desc => ?DESC("event_action")})},
{"authz_source", sc(binary(), #{desc => ?DESC("event_authz_source")})},
{"result", sc(binary(), #{desc => ?DESC("event_result")})}
];
fields("ctx_bridge_mqtt") ->
Event = '$bridges/mqtt:*',
EventBin = atom_to_binary(Event),
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(EventBin)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"server", sc(binary(), #{desc => ?DESC("event_server")})},
{"dup", sc(binary(), #{desc => ?DESC("event_dup")})},
{"retain", sc(binary(), #{desc => ?DESC("event_retain")})},
{"message_received_at", publish_received_at_sc()},
qos()
];
fields("ctx_delivery_dropped") ->
Event = 'delivery.dropped',
[
{"event_type", event_type_sc(Event)},
{"event", event_sc(Event)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})},
{"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
{"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
| msg_event_common_fields()
].
qos() ->
{"qos", sc(emqx_schema:qos(), #{desc => ?DESC("event_qos")})}.
rule_id() ->
{"id",
sc(
binary(),
#{
desc => ?DESC("rule_id"),
required => true,
example => "293fb66f"
}
)}.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).
event_type_sc(Event) ->
EventType = event_to_event_type(Event),
sc(EventType, #{desc => ?DESC("event_event_type"), required => true}).
-spec event_to_event_type(atom()) -> atom().
event_to_event_type(Event) ->
binary_to_atom(binary:replace(atom_to_binary(Event), <<".">>, <<"_">>)).
event_sc(Event) when is_binary(Event) ->
%% only exception is `$bridges/...'.
sc(binary(), #{default => Event, importance => ?IMPORTANCE_HIDDEN});
event_sc(Event) ->
sc(Event, #{default => Event, importance => ?IMPORTANCE_HIDDEN}).
without(FieldNames, Fields) ->
lists:foldl(fun proplists:delete/2, Fields, FieldNames).
publish_received_at_sc() ->
sc(integer(), #{desc => ?DESC("event_publish_received_at")}).
msg_event_common_fields() ->
[
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at", publish_received_at_sc()},
qos()
].