Merge pull request #7283 from terry-xiaoyu/default_value_for_rates

Fix HTTP APIs for getting rule events and testing rule SQLs
This commit is contained in:
Xinyu Liu 2022-03-11 20:17:58 +08:00 committed by GitHub
commit 4025e79d1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 98 additions and 52 deletions

View File

@ -8,7 +8,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-8:1.13.3-24.2.1-1-al
export EMQX_DEFAULT_RUNNER = alpine:3.14
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
export EMQX_DASHBOARD_VERSION ?= v0.21.0
export EMQX_DASHBOARD_VERSION ?= v0.23.0
export DOCKERFILE := deploy/docker/Dockerfile
export EMQX_REL_FORM ?= tgz
ifeq ($(OS),Windows_NT)

View File

@ -16,8 +16,9 @@
%% Bad Request
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_MATCH, 'NOT_MATCH').
-define(ALREADY_EXISTS, 'ALREADY_EXISTS').
-define(ALREADY_EXISTS, 'ALREADY_EXISTS').
-define(BAD_CONFIG_SCHEMA, 'BAD_CONFIG_SCHEMA').
-define(BAD_LISTENER_ID, 'BAD_LISTENER_ID').
-define(BAD_NODE_NAME, 'BAD_NODE_NAME').
@ -49,7 +50,8 @@
%% All codes
-define(ERROR_CODES,
[ {'BAD_REQUEST', <<"Request parameters are not legal">>}
, {'ALREADY_EXISTS', <<"Resource already existed">>}
, {'NOT_MATCH', <<"Conditions are not matched">>}
, {'ALREADY_EXISTS', <<"Resource already existed">>}
, {'BAD_CONFIG_SCHEMA', <<"Configuration data is not legal">>}
, {'BAD_LISTENER_ID', <<"Bad listener ID">>}
, {'BAD_NODE_NAME', <<"Bad Node Name">>}

View File

@ -30,7 +30,7 @@
%% callbacks for emqtt
-export([ handle_puback/2
, handle_publish/2
, handle_publish/3
, handle_disconnected/2
]).
@ -52,7 +52,7 @@ start(Config) ->
Mountpoint = maps:get(receive_mountpoint, Config, undefined),
Subscriptions = maps:get(subscriptions, Config, undefined),
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
Handlers = make_hdlr(Parent, Vars),
Handlers = make_hdlr(Parent, Vars, #{server => ip_port_to_server(Host, Port)}),
Config1 = Config#{
msg_handler => Handlers,
host => Host,
@ -161,12 +161,12 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
?SLOG(warning, #{msg => "publish_to_remote_node_falied",
packet_id => PktId, reason_code => RC}).
handle_publish(Msg, undefined) ->
handle_publish(Msg, undefined, _Opts) ->
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
"_'ingress'_is_not_configured",
message => Msg});
handle_publish(#{properties := Props} = Msg0, Vars) ->
Msg = format_msg_received(Msg0),
handle_publish(#{properties := Props} = Msg0, Vars, Opts) ->
Msg = format_msg_received(Msg0, Opts),
?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}),
case Vars of
@ -179,9 +179,9 @@ handle_publish(#{properties := Props} = Msg0, Vars) ->
handle_disconnected(Reason, Parent) ->
Parent ! {disconnected, self(), Reason}.
make_hdlr(Parent, Vars) ->
make_hdlr(Parent, Vars, Opts) ->
#{puback => {fun ?MODULE:handle_puback/2, [Parent]},
publish => {fun ?MODULE:handle_publish/2, [Vars]},
publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}.
@ -212,8 +212,9 @@ maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopi
end.
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
qos := QoS, retain := Retain, topic := Topic}, #{server := Server}) ->
#{ id => emqx_guid:to_hexstr(emqx_guid:gen())
, server => Server
, payload => Payload
, topic => Topic
, qos => QoS
@ -236,3 +237,10 @@ printable_maps(Headers) ->
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).
ip_port_to_server(Host, Port) ->
HostStr = case inet:ntoa(Host) of
{error, einval} -> Host;
IPStr -> IPStr
end,
list_to_binary(io_lib:format("~s:~w", [HostStr, Port])).

View File

@ -28,6 +28,8 @@
, egress_desc/0
]).
-export([non_empty_string/1]).
-import(emqx_schema, [mk_duration/2]).
roots() ->
@ -101,6 +103,7 @@ fields("ingress") ->
[ {remote_topic,
sc(binary(),
#{ required => true
, validator => fun ?MODULE:non_empty_string/1
, desc => "Receive messages from which topic of the remote broker"
})}
, {remote_qos,
@ -110,7 +113,8 @@ fields("ingress") ->
})}
, {local_topic,
sc(binary(),
#{ desc => """
#{ validator => fun ?MODULE:non_empty_string/1
, desc => """
Send messages to which topic of the local broker.<br>
Template with variables is allowed.
"""
@ -135,10 +139,12 @@ fields("egress") ->
[ {local_topic,
sc(binary(),
#{ desc => "The local topic to be forwarded to the remote broker"
, validator => fun ?MODULE:non_empty_string/1
})}
, {remote_topic,
sc(binary(),
#{ default => <<"${topic}">>
, validator => fun ?MODULE:non_empty_string/1
, desc => """
Forward to which topic of the remote broker.<br>
Template with variables is allowed.
@ -233,5 +239,10 @@ Template with variables is allowed."""
qos() ->
hoconsc:union([emqx_schema:qos(), binary()]).
non_empty_string(<<>>) -> {error, empty_string_not_allowed};
non_empty_string("") -> {error, empty_string_not_allowed};
non_empty_string(S) when is_binary(S); is_list(S) -> ok;
non_empty_string(_) -> {error, invalid_string}.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -58,13 +58,13 @@
-export_type([metrics/0, handler_name/0, metric_id/0]).
-type rate() :: #{
current => float(),
max => float(),
last5m => float()
current := float(),
max := float(),
last5m := float()
}.
-type metrics() :: #{
counters => #{atom() => integer()},
rate => #{atom() => rate()}
counters := #{atom() => integer()},
rate := #{atom() => rate()}
}.
-type handler_name() :: atom().
-type metric_id() :: binary().
@ -158,10 +158,10 @@ init(Name) ->
{ok, #state{}}.
handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
{reply, #{}, State};
{reply, make_rate(0, 0, 0), State};
handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
{reply, case maps:get(Id, Rates, undefined) of
undefined -> #{};
undefined -> make_rate(0, 0, 0);
RatesPerId -> format_rates_of_id(RatesPerId)
end, State};
@ -303,7 +303,13 @@ format_rates_of_id(RatesPerId) ->
end, RatesPerId).
format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) ->
#{max => precision(Max, 2), current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
make_rate(Current, Max, Last5Min).
make_rate(Current, Max, Last5Min) ->
#{ current => precision(Current, 2)
, max => precision(Max, 2)
, last5m => precision(Last5Min, 2)
}.
precision(Float, N) ->
Base = math:pow(10, N),

View File

@ -15,10 +15,11 @@
-spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
check_params(Params, Tag) ->
BTag = atom_to_binary(Tag),
case emqx_hocon:check(?MODULE, #{BTag => Params}) of
{ok, #{Tag := Checked}} ->
{ok, Checked};
{error, Reason} ->
Opts = #{atom_key => true, required => false},
try hocon_tconf:check_plain(?MODULE, #{BTag => Params}, Opts, [Tag]) of
#{Tag := Checked} -> {ok, Checked}
catch
throw : Reason ->
?SLOG(error, #{msg => "check_rule_params_failed",
reason => Reason
}),
@ -72,6 +73,7 @@ fields("rule_test") ->
, ref("ctx_dropped")
, ref("ctx_connected")
, ref("ctx_disconnected")
, ref("ctx_bridge_mqtt")
]),
#{desc => "The context of the event for testing",
default => #{}})}
@ -204,7 +206,20 @@ fields("ctx_disconnected") ->
, {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})}
, {"disconnected_at", sc(integer(), #{
desc => "The Time that this Client is Disconnected"})}
].
];
fields("ctx_bridge_mqtt") ->
[ {"event_type", sc('$bridges/mqtt:*', #{desc => "Event Type", required => true})}
, {"id", sc(binary(), #{desc => "Message ID"})}
, {"payload", sc(binary(), #{desc => "The Message Payload"})}
, {"topic", sc(binary(), #{desc => "Message Topic"})}
, {"server", sc(binary(), #{desc => "The IP address (or hostname) and port of the MQTT broker,"
" in IP:Port format"})}
, {"dup", sc(binary(), #{desc => "The DUP flag of the MQTT message"})}
, {"retain", sc(binary(), #{desc => "If is a retain message"})}
, {"message_received_at", sc(integer(), #{
desc => "The Time that this Message is Received"})}
] ++ [qos()].
qos() ->
{"qos", sc(emqx_schema:qos(), #{desc => "The Message QoS"})}.

View File

@ -41,7 +41,7 @@
{ok, CheckedParams} ->
EXPR;
{error, REASON} ->
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}}
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(REASON)}}
end).
-define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS,
O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5),
@ -85,10 +85,8 @@ api_spec() ->
paths() -> ["/rule_events", "/rule_test", "/rules", "/rules/:id"].
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
, {message, mk(string(), #{example => Message})}
].
error_schema(Code, Message) when is_atom(Code) ->
emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)).
rule_creation_schema() ->
ref(emqx_rule_api_schema, "rule_creation").
@ -115,7 +113,7 @@ schema("/rules") ->
summary => <<"Create a Rule">>,
requestBody => rule_creation_schema(),
responses => #{
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
201 => rule_info_schema()
}}
};
@ -153,7 +151,7 @@ schema("/rules/:id") ->
parameters => param_path_id(),
requestBody => rule_creation_schema(),
responses => #{
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
200 => rule_info_schema()
}
},
@ -177,7 +175,7 @@ schema("/rule_test") ->
summary => <<"Test a Rule">>,
requestBody => rule_test_schema(),
responses => #{
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
412 => error_schema('NOT_MATCH', "SQL Not Match"),
200 => <<"Rule Test Pass">>
}
@ -201,13 +199,13 @@ param_path_id() ->
'/rules'(post, #{body := Params0}) ->
case maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))) of
<<>> ->
{400, #{code => 'BAD_ARGS', message => <<"empty rule id is not allowed">>}};
{400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
Id ->
Params = filter_out_request_body(Params0),
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx_rule_engine:get_rule(Id) of
{ok, _Rule} ->
{400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}};
{400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
not_found ->
case emqx_conf:update(ConfPath, Params, #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
@ -216,7 +214,7 @@ param_path_id() ->
{error, Reason} ->
?SLOG(error, #{msg => "create_rule_failed",
id => Id, reason => Reason}),
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
end
end
end.
@ -224,6 +222,8 @@ param_path_id() ->
'/rule_test'(post, #{body := Params}) ->
?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of
{ok, Result} -> {200, Result};
{error, {parse_error, Reason}} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
{error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}
end).
@ -245,7 +245,7 @@ param_path_id() ->
{error, Reason} ->
?SLOG(error, #{msg => "update_rule_failed",
id => Id, reason => Reason}),
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
end;
'/rules/:id'(delete, #{bindings := #{id := Id}}) ->
@ -255,7 +255,7 @@ param_path_id() ->
{error, Reason} ->
?SLOG(error, #{msg => "delete_rule_failed",
id => Id, reason => Reason}),
{500, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
{500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}}
end.
%%------------------------------------------------------------------------------

View File

@ -607,7 +607,7 @@ columns_with_exam(<<"$bridges/mqtt", _/binary>> = EventName) ->
[ {<<"event">>, EventName}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"server">>, <<"192.168.0.10:1883">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"dup">>, false}

View File

@ -22,21 +22,25 @@
, get_selected_data/3
]).
-spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, nomatch}.
-spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}.
test(#{sql := Sql, context := Context}) ->
{ok, Select} = emqx_rule_sqlparser:parse(Sql),
InTopic = maps:get(topic, Context, <<>>),
EventTopics = emqx_rule_sqlparser:select_from(Select),
case lists:all(fun is_publish_topic/1, EventTopics) of
true ->
%% test if the topic matches the topic filters in the rule
case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of
true -> test_rule(Sql, Select, Context, EventTopics);
false -> {error, nomatch}
case emqx_rule_sqlparser:parse(Sql) of
{ok, Select} ->
InTopic = maps:get(topic, Context, <<>>),
EventTopics = emqx_rule_sqlparser:select_from(Select),
case lists:all(fun is_publish_topic/1, EventTopics) of
true ->
%% test if the topic matches the topic filters in the rule
case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of
true -> test_rule(Sql, Select, Context, EventTopics);
false -> {error, nomatch}
end;
false ->
%% the rule is for both publish and events, test it directly
test_rule(Sql, Select, Context, EventTopics)
end;
false ->
%% the rule is for both publish and events, test it directly
test_rule(Sql, Select, Context, EventTopics)
{error, Reason} ->
{error, Reason}
end.
test_rule(Sql, Select, Context, EventTopics) ->