366 lines
14 KiB
Erlang
366 lines
14 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_api_schema).
|
|
|
|
-behaviour(hocon_schema).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-export([check_params/2]).
|
|
|
|
-export([namespace/0, roots/0, fields/1]).
|
|
|
|
-type tag() :: rule_creation | rule_test | rule_engine.
|
|
|
|
%% @doc Check `Params' against the schema root selected by `Tag'.
%%
%% `Params' is wrapped in a map keyed by the binary form of `Tag' and passed to
%% `hocon_tconf:check_plain/4' with this module as the schema, the options
%% `atom_key => true' and `required => false', and `[Tag]' as the root list.
%% Returns `{ok, Checked}' with the value found under `Tag' in the result.
%% A thrown reason is logged at `error' level and returned as `{error, Reason}'.
-spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
check_params(Params, Tag) ->
    RootKey = atom_to_binary(Tag),
    RawConf = #{RootKey => Params},
    CheckOpts = #{atom_key => true, required => false},
    %% Only the check itself is protected; the `of' section is not.
    try hocon_tconf:check_plain(?MODULE, RawConf, CheckOpts, [Tag]) of
        #{Tag := Validated} ->
            {ok, Validated}
    catch
        throw:Why ->
            ?SLOG(error, #{
                msg => "check_rule_params_failed",
                reason => Why
            }),
            {error, Why}
    end.
|
|
|
|
%%======================================================================================
|
|
%% Hocon Schema Definitions
|
|
|
|
%% @doc Namespace of this schema module (exported alongside roots/0 and fields/1).
namespace() ->
    "rule_engine".
|
|
|
|
%% @doc Root fields of this schema.
%% Every root is a reference to the struct of the same name in fields/1.
roots() ->
    RuleEngine =
        {"rule_engine", sc(ref("rule_engine"), #{desc => ?DESC("root_rule_engine")})},
    RuleCreation =
        {"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})},
    RuleInfo =
        {"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})},
    RuleEvents =
        {"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})},
    RuleTest =
        {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})},
    [RuleEngine, RuleCreation, RuleInfo, RuleEvents, RuleTest].
|
|
|
|
%% "rule_engine" and "rule_creation" are both delegated to emqx_rule_engine_schema.
fields("rule_engine") ->
    EngineSettings = emqx_rule_engine_schema:rule_engine_settings(),
    EngineSettings;
fields("rule_creation") ->
    RuleFields = emqx_rule_engine_schema:fields("rules"),
    RuleFields;
|
|
%% "rule_info": the rule id, "from" and "created_at", followed by every
%% "rule_creation" field.
fields("rule_info") ->
    From =
        {"from",
            sc(hoconsc:array(binary()), #{
                desc => ?DESC("ri_from"),
                example => "t/#"
            })},
    CreatedAt =
        {"created_at",
            sc(binary(), #{
                desc => ?DESC("ri_created_at"),
                example => "2021-12-01T15:00:43.153+08:00"
            })},
    [rule_id(), From, CreatedAt | fields("rule_creation")];
|
|
%% "rule_metrics": the rule id, one "metrics" struct and an array of
%% "node_metrics" structs.
fields("rule_metrics") ->
    Metrics = {"metrics", sc(ref("metrics"), #{desc => ?DESC("ri_metrics")})},
    NodeMetricsType = hoconsc:array(ref("node_metrics")),
    NodeMetrics = {"node_metrics", sc(NodeMetricsType, #{desc => ?DESC("ri_node_metrics")})},
    [rule_id(), Metrics, NodeMetrics];
|
|
%% TODO: we can delete this API if the Dashboard does not depend on it
|
|
%% "rule_events": "event" is required and restricted to the enum returned by
%% emqx_rule_events:event_topics_enum/0; the remaining fields are descriptive.
fields("rule_events") ->
    EventEnum = hoconsc:enum(emqx_rule_events:event_topics_enum()),
    BinT = binary(),
    MapT = map(),
    [
        {"event", sc(EventEnum, #{desc => ?DESC("rs_event"), required => true})},
        {"title", sc(BinT, #{desc => ?DESC("rs_title"), example => "some title"})},
        {"description", sc(BinT, #{desc => ?DESC("rs_description"), example => "some desc"})},
        {"columns", sc(MapT, #{desc => ?DESC("rs_columns")})},
        {"test_columns", sc(MapT, #{desc => ?DESC("rs_test_columns")})},
        {"sql_example", sc(BinT, #{desc => ?DESC("rs_sql_example")})}
    ];
|
|
%% "rule_test": a "context" that is a union of the ctx_* structs (default is an
%% empty map) plus the required "sql" text.
fields("rule_test") ->
    %% Order of the union members is kept exactly as declared.
    ContextStructs = [
        "ctx_pub",
        "ctx_sub",
        "ctx_unsub",
        "ctx_delivered",
        "ctx_acked",
        "ctx_dropped",
        "ctx_connected",
        "ctx_disconnected",
        "ctx_connack",
        "ctx_check_authz_complete",
        "ctx_bridge_mqtt",
        "ctx_delivery_dropped"
    ],
    ContextType = hoconsc:union([ref(Struct) || Struct <- ContextStructs]),
    ContextMeta = #{desc => ?DESC("test_context"), default => #{}},
    [
        {"context", sc(ContextType, ContextMeta)},
        {"sql", sc(binary(), #{desc => ?DESC("test_sql"), required => true})}
    ];
|
|
%% "metrics": non-negative integer counters and float rates.
fields("metrics") ->
    CounterT = non_neg_integer(),
    RateT = float(),
    [
        {"matched", sc(CounterT, #{desc => ?DESC("metrics_sql_matched")})},
        {"matched.rate", sc(RateT, #{desc => ?DESC("metrics_sql_matched_rate")})},
        {"matched.rate.max", sc(RateT, #{desc => ?DESC("metrics_sql_matched_rate_max")})},
        {"matched.rate.last5m", sc(RateT, #{desc => ?DESC("metrics_sql_matched_rate_last5m")})},
        {"passed", sc(CounterT, #{desc => ?DESC("metrics_sql_passed")})},
        {"failed", sc(CounterT, #{desc => ?DESC("metrics_sql_failed")})},
        {"failed.exception", sc(CounterT, #{desc => ?DESC("metrics_sql_failed_exception")})},
        {"failed.unknown", sc(CounterT, #{desc => ?DESC("metrics_sql_failed_unknown")})},
        {"actions.total", sc(CounterT, #{desc => ?DESC("metrics_actions_total")})},
        {"actions.success", sc(CounterT, #{desc => ?DESC("metrics_actions_success")})},
        {"actions.failed", sc(CounterT, #{desc => ?DESC("metrics_actions_failed")})},
        {"actions.failed.out_of_service",
            sc(CounterT, #{desc => ?DESC("metrics_actions_failed_out_of_service")})},
        {"actions.failed.unknown",
            sc(CounterT, #{desc => ?DESC("metrics_actions_failed_unknown")})}
    ];
|
|
%% "node_metrics": a "node" name followed by every "metrics" field.
fields("node_metrics") ->
    Node = {"node", sc(binary(), #{desc => ?DESC("node_node"), example => "emqx@127.0.0.1"})},
    [Node | fields("metrics")];
|
|
%% Context for 'message.publish': event markers and "id", then the common
%% message fields.
fields("ctx_pub") ->
    EventName = 'message.publish',
    Specific = [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)},
        {"id", sc(binary(), #{desc => ?DESC("event_id")})}
    ],
    Specific ++ msg_event_common_fields();
|
|
%% Context for 'session.subscribed': event markers, then the common message fields.
fields("ctx_sub") ->
    EventName = 'session.subscribed',
    Specific = [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)}
    ],
    Specific ++ msg_event_common_fields();
|
|
%% Context for 'session.unsubscribed': the "ctx_sub" fields with the event
%% markers replaced by the ones for this event.
fields("ctx_unsub") ->
    EventName = 'session.unsubscribed',
    %% Drop the markers inherited from "ctx_sub" before adding our own.
    Inherited = without(["event_type", "event_topic", "event"], fields("ctx_sub")),
    Specific = [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)}
    ],
    Specific ++ Inherited;
|
|
%% Context for 'message.delivered': event markers, "id" and the sender's
%% clientid/username, then the common message fields.
fields("ctx_delivered") ->
    EventName = 'message.delivered',
    BinT = binary(),
    Specific = [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)},
        {"id", sc(BinT, #{desc => ?DESC("event_id")})},
        {"from_clientid", sc(BinT, #{desc => ?DESC("event_from_clientid")})},
        {"from_username", sc(BinT, #{desc => ?DESC("event_from_username")})}
    ],
    Specific ++ msg_event_common_fields();
|
|
%% Context for 'message.acked': the "ctx_delivered" fields with the event
%% markers replaced by the ones for this event.
fields("ctx_acked") ->
    EventName = 'message.acked',
    %% Drop the markers inherited from "ctx_delivered" before adding our own.
    Inherited = without(["event_type", "event_topic", "event"], fields("ctx_delivered")),
    Specific = [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)}
    ],
    Specific ++ Inherited;
|
|
%% Context for 'message.dropped': event markers, "id" and "reason", then the
%% common message fields.
fields("ctx_dropped") ->
    EventName = 'message.dropped',
    BinT = binary(),
    Specific = [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)},
        {"id", sc(BinT, #{desc => ?DESC("event_id")})},
        {"reason", sc(BinT, #{desc => ?DESC("event_ctx_dropped")})}
    ],
    Specific ++ msg_event_common_fields();
|
|
%% Context for 'client.connected'.
%% "clean_start" defaults to true and "is_bridge" to false.
fields("ctx_connected") ->
    EventName = 'client.connected',
    BinT = binary(),
    IntT = integer(),
    BoolT = boolean(),
    [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)},
        {"clientid", sc(BinT, #{desc => ?DESC("event_clientid")})},
        {"username", sc(BinT, #{desc => ?DESC("event_username")})},
        {"mountpoint", sc(BinT, #{desc => ?DESC("event_mountpoint")})},
        {"peername", sc(BinT, #{desc => ?DESC("event_peername")})},
        {"sockname", sc(BinT, #{desc => ?DESC("event_sockname")})},
        {"proto_name", sc(BinT, #{desc => ?DESC("event_proto_name")})},
        {"proto_ver", sc(BinT, #{desc => ?DESC("event_proto_ver")})},
        {"keepalive", sc(IntT, #{desc => ?DESC("event_keepalive")})},
        {"clean_start", sc(BoolT, #{desc => ?DESC("event_clean_start"), default => true})},
        {"expiry_interval", sc(IntT, #{desc => ?DESC("event_expiry_interval")})},
        {"is_bridge", sc(BoolT, #{desc => ?DESC("event_is_bridge"), default => false})},
        {"connected_at", sc(IntT, #{desc => ?DESC("event_connected_at")})}
    ];
|
|
%% Context for 'client.disconnected'.
fields("ctx_disconnected") ->
    EventName = 'client.disconnected',
    BinT = binary(),
    [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)},
        {"clientid", sc(BinT, #{desc => ?DESC("event_clientid")})},
        {"username", sc(BinT, #{desc => ?DESC("event_username")})},
        {"reason", sc(BinT, #{desc => ?DESC("event_ctx_disconnected_reason")})},
        {"peername", sc(BinT, #{desc => ?DESC("event_peername")})},
        {"sockname", sc(BinT, #{desc => ?DESC("event_sockname")})},
        {"disconnected_at", sc(integer(), #{desc => ?DESC("event_ctx_disconnected_da")})}
    ];
|
|
%% Context for 'client.connack'. "clean_start" defaults to true.
fields("ctx_connack") ->
    EventName = 'client.connack',
    BinT = binary(),
    IntT = integer(),
    [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)},
        {"reason_code", sc(BinT, #{desc => ?DESC("event_ctx_connack_reason_code")})},
        {"clientid", sc(BinT, #{desc => ?DESC("event_clientid")})},
        {"clean_start", sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})},
        {"username", sc(BinT, #{desc => ?DESC("event_username")})},
        {"peername", sc(BinT, #{desc => ?DESC("event_peername")})},
        {"sockname", sc(BinT, #{desc => ?DESC("event_sockname")})},
        {"proto_name", sc(BinT, #{desc => ?DESC("event_proto_name")})},
        {"proto_ver", sc(BinT, #{desc => ?DESC("event_proto_ver")})},
        {"keepalive", sc(IntT, #{desc => ?DESC("event_keepalive")})},
        {"expiry_interval", sc(IntT, #{desc => ?DESC("event_expiry_interval")})},
        {"connected_at", sc(IntT, #{desc => ?DESC("event_connected_at")})}
    ];
|
|
%% Context for 'client.check_authz_complete'.
fields("ctx_check_authz_complete") ->
    EventName = 'client.check_authz_complete',
    BinT = binary(),
    [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)},
        {"clientid", sc(BinT, #{desc => ?DESC("event_clientid")})},
        {"username", sc(BinT, #{desc => ?DESC("event_username")})},
        {"peerhost", sc(BinT, #{desc => ?DESC("event_peerhost")})},
        {"topic", sc(BinT, #{desc => ?DESC("event_topic")})},
        {"action", sc(BinT, #{desc => ?DESC("event_action")})},
        {"authz_source", sc(BinT, #{desc => ?DESC("event_authz_source")})},
        {"result", sc(BinT, #{desc => ?DESC("event_result")})}
    ];
|
|
%% Context for '$bridges/mqtt:*'.
%% Unlike the other contexts, "event" gets the binary form of the event name
%% (see the is_binary clause of event_sc/1); "event_type" still gets the atom.
fields("ctx_bridge_mqtt") ->
    EventName = '$bridges/mqtt:*',
    EventNameBin = atom_to_binary(EventName),
    BinT = binary(),
    [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventNameBin)},
        {"id", sc(BinT, #{desc => ?DESC("event_id")})},
        {"payload", sc(BinT, #{desc => ?DESC("event_payload")})},
        {"topic", sc(BinT, #{desc => ?DESC("event_topic")})},
        {"server", sc(BinT, #{desc => ?DESC("event_server")})},
        {"dup", sc(BinT, #{desc => ?DESC("event_dup")})},
        {"retain", sc(BinT, #{desc => ?DESC("event_retain")})},
        {"message_received_at", publish_received_at_sc()},
        qos()
    ];
|
|
%% Context for 'delivery.dropped': event markers, "id", "reason" and the
%% sender's clientid/username, then the common message fields.
fields("ctx_delivery_dropped") ->
    EventName = 'delivery.dropped',
    BinT = binary(),
    Specific = [
        {"event_type", event_type_sc(EventName)},
        {"event", event_sc(EventName)},
        {"id", sc(BinT, #{desc => ?DESC("event_id")})},
        {"reason", sc(BinT, #{desc => ?DESC("event_ctx_dropped")})},
        {"from_clientid", sc(BinT, #{desc => ?DESC("event_from_clientid")})},
        {"from_username", sc(BinT, #{desc => ?DESC("event_from_username")})}
    ],
    Specific ++ msg_event_common_fields().
|
|
|
|
%% The shared "qos" field; its type comes from emqx_schema:qos/0.
qos() ->
    QosType = emqx_schema:qos(),
    {"qos", sc(QosType, #{desc => ?DESC("event_qos")})}.
|
|
|
|
%% The shared, required "id" field of a rule (a binary).
rule_id() ->
    Meta = #{
        desc => ?DESC("rule_id"),
        required => true,
        example => "293fb66f"
    },
    {"id", sc(binary(), Meta)}.
|
|
|
|
%% Shorthand for hoconsc:mk/2.
sc(Type, Meta) ->
    hoconsc:mk(Type, Meta).
|
|
|
|
%% Reference to the struct `Field' defined in this module's fields/1.
ref(Field) ->
    hoconsc:ref(?MODULE, Field).
|
|
|
|
%% Required "event_type" field whose type is the atom derived from `Event'
%% by event_to_event_type/1.
event_type_sc(Event) ->
    sc(event_to_event_type(Event), #{desc => ?DESC("event_event_type"), required => true}).
|
|
|
|
%% Turn an event name atom into its event type atom by replacing "." with "_",
%% e.g. 'message.publish' becomes message_publish.
%% binary:replace/3 is called without the `global' option, so only the first
%% "." is replaced; names without a "." are returned unchanged.
-spec event_to_event_type(atom()) -> atom().
event_to_event_type(Event) ->
    EventBin = atom_to_binary(Event),
    Underscored = binary:replace(EventBin, <<".">>, <<"_">>),
    binary_to_atom(Underscored).
|
|
|
|
%% Hidden "event" field that defaults to `Event'.
%% A binary event (the only exception is `$bridges/...') is typed as binary();
%% any other event is used as its own type.
event_sc(Event) ->
    Type =
        case is_binary(Event) of
            true -> binary();
            false -> Event
        end,
    sc(Type, #{default => Event, importance => ?IMPORTANCE_HIDDEN}).
|
|
|
|
%% Remove every entry keyed by one of `FieldNames' from the field list.
%% Names are processed left to right with proplists:delete/2; a name that is
%% not present leaves the list unchanged.
without([], Fields) ->
    Fields;
without([Name | OtherNames], Fields) ->
    without(OtherNames, proplists:delete(Name, Fields)).
|
|
|
|
%% Integer field schema used for "publish_received_at" and "message_received_at".
publish_received_at_sc() ->
    Meta = #{desc => ?DESC("event_publish_received_at")},
    sc(integer(), Meta).
|
|
|
|
%% Fields appended to the message-related ctx_* structs in fields/1.
msg_event_common_fields() ->
    BinT = binary(),
    [
        {"clientid", sc(BinT, #{desc => ?DESC("event_clientid")})},
        {"username", sc(BinT, #{desc => ?DESC("event_username")})},
        {"payload", sc(BinT, #{desc => ?DESC("event_payload")})},
        {"peerhost", sc(BinT, #{desc => ?DESC("event_peerhost")})},
        {"topic", sc(BinT, #{desc => ?DESC("event_topic")})},
        {"publish_received_at", publish_received_at_sc()},
        qos()
    ].
|