Merge pull request #12770 from thalesmg/mv-metrics-m-20240321

feat(message validation): implement metrics
This commit is contained in:
Thales Macedo Garitezi 2024-03-26 08:59:05 -03:00 committed by GitHub
commit cc37030265
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 987 additions and 35 deletions

View File

@ -43,6 +43,7 @@
{emqx_management,4}.
{emqx_management,5}.
{emqx_metrics,1}.
{emqx_metrics,2}.
{emqx_mgmt_api_plugins,1}.
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_cluster,1}.

View File

@ -207,6 +207,10 @@
{counter, 'messages.publish'},
% Messages dropped due to no subscribers
{counter, 'messages.dropped'},
%% % Messages that failed validations
{counter, 'messages.validation_failed'},
%% % Messages that passed validations
{counter, 'messages.validation_succeeded'},
% QoS2 Messages expired
{counter, 'messages.dropped.await_pubrel_timeout'},
% Messages dropped
@ -712,4 +716,6 @@ reserved_idx('overload_protection.delay.timeout') -> 401;
reserved_idx('overload_protection.hibernation') -> 402;
reserved_idx('overload_protection.gc') -> 403;
reserved_idx('overload_protection.new_conn') -> 404;
reserved_idx('messages.validation_succeeded') -> 405;
reserved_idx('messages.validation_failed') -> 406;
reserved_idx(_) -> undefined.

View File

@ -20,6 +20,7 @@
-export([
introduced_in/0,
deprecated_since/0,
get_metrics/4
]).
@ -29,6 +30,9 @@
%% Release string reported as the point where this protocol version was introduced.
introduced_in() ->
"5.1.0".
%% Release string reported as the point from which this protocol version is deprecated.
deprecated_since() ->
"5.7.0".
-spec get_metrics(
[node()],
emqx_metrics_worker:handler_name(),

View File

@ -0,0 +1,55 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_metrics_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_metrics/4,
%% introduced in v2
reset_metrics/4
]).
-include("bpapi.hrl").
%% Release string reported as the point where this protocol version (v2) was introduced.
introduced_in() ->
"5.7.0".
%% Runs `emqx_metrics_worker:get_metrics(HandlerName, MetricId)' on every node
%% in `Nodes' through `erpc:multicall/5', waiting at most `Timeout'.
%% The value returned is whatever `erpc:multicall/5' returns for that call.
-spec get_metrics(
[node()],
emqx_metrics_worker:handler_name(),
emqx_metrics_worker:metric_id(),
timeout()
) -> emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()).
get_metrics(Nodes, HandlerName, MetricId, Timeout) ->
erpc:multicall(Nodes, emqx_metrics_worker, get_metrics, [HandlerName, MetricId], Timeout).
%%--------------------------------------------------------------------------------
%% Introduced in V2
%%--------------------------------------------------------------------------------
%% Runs `emqx_metrics_worker:reset_metrics(HandlerName, MetricId)' on every node
%% in `Nodes' through `erpc:multicall/5', waiting at most `Timeout'.
%% NOTE(review): the spec reuses `emqx_metrics_worker:metrics()' as the per-node
%% result type, same as `get_metrics/4'; confirm this matches what
%% `emqx_metrics_worker:reset_metrics/2' actually returns.
-spec reset_metrics(
[node()],
emqx_metrics_worker:handler_name(),
emqx_metrics_worker:metric_id(),
timeout()
) -> emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()).
reset_metrics(Nodes, HandlerName, MetricId, Timeout) ->
erpc:multicall(Nodes, emqx_metrics_worker, reset_metrics, [HandlerName, MetricId], Timeout).

View File

@ -38,6 +38,7 @@
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([upgrade_raw_conf/1]).
-export([tr_prometheus_collectors/1]).
%% internal exports for `emqx_enterprise_schema' only.
-export([ensure_unicode_path/2, convert_rotation/2, log_handler_common_confs/2]).

View File

@ -65,6 +65,8 @@
%, received_bytes
sent,
%, sent_bytes
validation_succeeded,
validation_failed,
dropped
]).
@ -83,6 +85,8 @@
%received_bytes => received_bytes_rate,
%sent_bytes => sent_bytes_rate,
sent => sent_msg_rate,
validation_succeeded => validation_succeeded_rate,
validation_failed => validation_failed_rate,
dropped => dropped_msg_rate
}).

View File

@ -18,6 +18,7 @@
-include("emqx_dashboard.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(gen_server).
@ -186,6 +187,7 @@ handle_cast(_Request, State = #state{}) ->
handle_info({sample, Time}, State = #state{last = Last}) ->
Now = sample(Time),
{atomic, ok} = flush(Last, Now),
?tp(dashboard_monitor_flushed, #{}),
sample_timer(),
{noreply, State#state{last = Now}};
handle_info(clean_expired, State) ->
@ -424,6 +426,8 @@ stats(received) -> emqx_metrics:val('messages.received');
stats(received_bytes) -> emqx_metrics:val('bytes.received');
stats(sent) -> emqx_metrics:val('messages.sent');
stats(sent_bytes) -> emqx_metrics:val('bytes.sent');
stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded');
stats(validation_failed) -> emqx_metrics:val('messages.validation_failed');
stats(dropped) -> emqx_metrics:val('messages.dropped').
%% -------------------------------------------------------------------------------------------------

View File

@ -188,6 +188,10 @@ swagger_desc(sent_bytes) ->
swagger_desc_format("Sent bytes ");
swagger_desc(dropped) ->
swagger_desc_format("Dropped messages ");
swagger_desc(validation_succeeded) ->
swagger_desc_format("Message validations succeeded ");
swagger_desc(validation_failed) ->
swagger_desc_format("Message validations failed ");
swagger_desc(subscriptions) ->
<<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(topics) ->
@ -210,6 +214,10 @@ swagger_desc(sent_msg_rate) ->
%swagger_desc(sent_bytes_rate) -> swagger_desc_format("Sent bytes ", per);
swagger_desc(dropped_msg_rate) ->
swagger_desc_format("Dropped messages ", per);
swagger_desc(validation_succeeded_rate) ->
swagger_desc_format("Message validations succeeded ", per);
swagger_desc(validation_failed_rate) ->
swagger_desc_format("Message validations failed ", per);
swagger_desc(retained_msg_count) ->
<<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(shared_subscriptions) ->

View File

@ -23,6 +23,7 @@
-include("emqx_dashboard.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(SERVER, "http://127.0.0.1:18083").
@ -54,16 +55,35 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_mgmt_api_test_util:init_suite([emqx, emqx_conf, emqx_retainer]),
emqx_common_test_helpers:clear_screen(),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_retainer, ?BASE_RETAINER_CONF},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard(
"dashboard.listeners.http { enable = true, bind = 18083 }\n"
"dashboard.sample_interval = 1s"
)
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config].
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(),
ct:timetrap({seconds, 30}),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_retainer]).
set_special_configs(emqx_retainer) ->
emqx_retainer:update_config(?BASE_RETAINER_CONF),
ok;
set_special_configs(_App) ->
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
ok.
%%--------------------------------------------------------------------
@ -71,7 +91,11 @@ set_special_configs(_App) ->
%%--------------------------------------------------------------------
t_monitor_samplers_all(_Config) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
infinity
),
Size = mnesia:table_info(emqx_dashboard_monitor, size),
All = emqx_dashboard_monitor:samplers(all, infinity),
All2 = emqx_dashboard_monitor:samplers(),
@ -80,25 +104,39 @@ t_monitor_samplers_all(_Config) ->
ok.
t_monitor_samplers_latest(_Config) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
Samplers = emqx_dashboard_monitor:samplers(node(), 2),
Latest = emqx_dashboard_monitor:samplers(node(), 1),
?assert(erlang:length(Samplers) == 2),
?assert(erlang:length(Latest) == 1),
?assert(hd(Latest) == lists:nth(2, Samplers)),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
infinity
),
?retry(1_000, 10, begin
Samplers = emqx_dashboard_monitor:samplers(node(), 2),
Latest = emqx_dashboard_monitor:samplers(node(), 1),
?assert(erlang:length(Samplers) == 2, #{samplers => Samplers}),
?assert(erlang:length(Latest) == 1, #{latest => Latest}),
?assert(hd(Latest) == lists:nth(2, Samplers))
end),
ok.
t_monitor_sampler_format(_Config) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, _} =
snabbkaffe:block_until(
?match_event(#{?snk_kind := dashboard_monitor_flushed}),
infinity
),
Latest = hd(emqx_dashboard_monitor:samplers(node(), 1)),
SamplerKeys = maps:keys(Latest),
[?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST],
ok.
t_monitor_api(_) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, Samplers} = request(["monitor"], "latest=20"),
?assert(erlang:length(Samplers) >= 2),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
infinity
),
{ok, Samplers} = request(["monitor"], "latest=200"),
?assert(erlang:length(Samplers) >= 2, #{samplers => Samplers}),
Fun =
fun(Sampler) ->
Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)],
@ -110,7 +148,11 @@ t_monitor_api(_) ->
ok.
t_monitor_current_api(_) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
infinity
),
{ok, Rate} = request(["monitor_current"]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
@ -210,7 +252,11 @@ t_monitor_reset(_) ->
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
],
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(1, #{?snk_kind := dashboard_monitor_flushed}),
infinity
),
{ok, Samplers} = request(["monitor"], "latest=1"),
?assertEqual(1, erlang:length(Samplers)),
ok.

View File

@ -111,6 +111,10 @@ fields(Name) ->
translations() ->
emqx_conf_schema:translations().
%% Translations for one schema root. The "prometheus" root gets the collector
%% translation defined in this module; any other root is delegated to
%% `emqx_conf_schema:translation/1'.
translation("prometheus") ->
    CollectorsTr = fun tr_prometheus_collectors/1,
    [{"collectors", CollectorsTr}];
translation(OtherRoot) ->
    emqx_conf_schema:translation(OtherRoot).
@ -189,3 +193,9 @@ audit_log_conf() ->
}
)}
].
%% Collector list for the prometheus config: the message validation
%% registry/collector pair is put in front of whatever
%% `emqx_conf_schema:tr_prometheus_collectors/1' yields for `Conf'.
tr_prometheus_collectors(Conf) ->
    BaseCollectors = emqx_conf_schema:tr_prometheus_collectors(Conf),
    MessageValidation =
        {'/prometheus/message_validation', emqx_prometheus_message_validation},
    [MessageValidation | BaseCollectors].

View File

@ -140,10 +140,13 @@ on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
Validations ->
case run_validations(Validations, Message) of
ok ->
emqx_metrics:inc('messages.validation_succeeded'),
{ok, Message};
drop ->
emqx_metrics:inc('messages.validation_failed'),
{stop, Message#message{headers = Headers#{allow_publish => false}}};
disconnect ->
emqx_metrics:inc('messages.validation_failed'),
{stop, Message#message{
headers = Headers#{
allow_publish => false,
@ -380,14 +383,17 @@ run_validations(Validations, Message) ->
emqx_rule_runtime:clear_rule_payload(),
Fun = fun(Validation, Acc) ->
#{name := Name} = Validation,
emqx_message_validation_registry:inc_matched(Name),
case run_validation(Validation, Message) of
ok ->
emqx_message_validation_registry:inc_succeeded(Name),
{cont, Acc};
ignore ->
trace_failure(Validation, "validation_failed", #{
validation => Name,
action => ignore
}),
emqx_message_validation_registry:inc_failed(Name),
run_message_validation_failed_hook(Message, Validation),
{cont, Acc};
FailureAction ->
@ -395,6 +401,7 @@ run_validations(Validations, Message) ->
validation => Name,
action => FailureAction
}),
emqx_message_validation_registry:inc_failed(Name),
run_message_validation_failed_hook(Message, Validation),
{halt, FailureAction}
end

View File

@ -24,6 +24,8 @@
'/message_validations'/2,
'/message_validations/reorder'/2,
'/message_validations/validation/:name'/2,
'/message_validations/validation/:name/metrics'/2,
'/message_validations/validation/:name/metrics/reset'/2,
'/message_validations/validation/:name/enable/:enable'/2
]).
@ -32,6 +34,7 @@
%%-------------------------------------------------------------------------------------------------
-define(TAGS, [<<"Message Validation">>]).
-define(METRIC_NAME, message_validation).
%%-------------------------------------------------------------------------------------------------
%% `minirest' and `minirest_trails' API
@ -47,6 +50,8 @@ paths() ->
"/message_validations",
"/message_validations/reorder",
"/message_validations/validation/:name",
"/message_validations/validation/:name/metrics",
"/message_validations/validation/:name/metrics/reset",
"/message_validations/validation/:name/enable/:enable"
].
@ -173,6 +178,43 @@ schema("/message_validations/validation/:name") ->
}
}
};
schema("/message_validations/validation/:name/metrics") ->
#{
'operationId' => '/message_validations/validation/:name/metrics',
get => #{
tags => ?TAGS,
summary => <<"Get validation metrics">>,
description => ?DESC("get_validation_metrics"),
parameters => [param_path_name()],
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
ref(get_metrics),
#{
sample =>
#{value => example_return_metrics()}
}
),
404 => error_schema('NOT_FOUND', "Validation not found")
}
}
};
schema("/message_validations/validation/:name/metrics/reset") ->
#{
'operationId' => '/message_validations/validation/:name/metrics/reset',
post => #{
tags => ?TAGS,
summary => <<"Reset validation metrics">>,
description => ?DESC("reset_validation_metrics"),
parameters => [param_path_name()],
responses =>
#{
204 => <<"No content">>,
404 => error_schema('NOT_FOUND', "Validation not found")
}
}
};
schema("/message_validations/validation/:name/enable/:enable") ->
#{
'operationId' => '/message_validations/validation/:name/enable/:enable',
@ -230,6 +272,22 @@ fields(before) ->
fields(reorder) ->
[
{order, mk(array(binary()), #{required => true, in => body})}
];
%% Response body of the validation metrics endpoint: cluster-wide aggregated
%% metrics plus one entry per node that answered.
fields(get_metrics) ->
    [
        {metrics, mk(ref(metrics), #{})},
        %% the handler returns a list with one element per node
        {node_metrics, mk(array(ref(node_metrics)), #{})}
    ];
%% Counter and rate values, as produced by `format_metrics/2'.
fields(metrics) ->
    [
        {matched, mk(non_neg_integer(), #{})},
        {succeeded, mk(non_neg_integer(), #{})},
        {failed, mk(non_neg_integer(), #{})},
        {rate, mk(float(), #{})},
        {rate_last5m, mk(float(), #{})},
        {rate_max, mk(float(), #{})}
    ];
%% One per-node entry: the node name and that node's metrics nested under
%% `metrics', mirroring the map built by `format_metrics/2'.
fields(node_metrics) ->
    [
        {node, mk(binary(), #{})},
        {metrics, mk(ref(metrics), #{})}
    ].
%%-------------------------------------------------------------------------------------------------
@ -299,6 +357,47 @@ fields(reorder) ->
not_found()
).
%% GET handler: collects the metrics of validation `Name' from all running
%% nodes, and returns both the per-node values and their aggregate.
%% Nodes whose RPC did not return `{ok, _}' are logged and left out of the
%% response. The `not_found()' response is handed to `with_validation/3'.
'/message_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) ->
    with_validation(
        Name,
        fun() ->
            Nodes = emqx:running_nodes(),
            Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, Name, 5_000),
            NodeResults = zip_and_log_rpc_errors(
                Nodes, Results, "rpc_get_validation_metrics_errors"
            ),
            NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults],
            Response = #{
                metrics => aggregate_metrics(NodeMetrics),
                node_metrics => NodeMetrics
            },
            ?OK(Response)
        end,
        not_found()
    ).

%% POST handler: asks all running nodes to reset the metrics of validation
%% `Name'. RPC failures are logged only; the reply is always "no content" once
%% the callback runs.
'/message_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) ->
    with_validation(
        Name,
        fun() ->
            Nodes = emqx:running_nodes(),
            Results = emqx_metrics_proto_v2:reset_metrics(Nodes, ?METRIC_NAME, Name, 5_000),
            _ = zip_and_log_rpc_errors(
                Nodes, Results, "rpc_reset_validation_metrics_errors"
            ),
            ?NO_CONTENT
        end,
        not_found()
    ).

%% Pairs each node with its multicall result and logs (warning level, under
%% message `LogMsg') every pair whose result tag is not `ok'.
%% Returns the zipped `{Node, Result}' list.
zip_and_log_rpc_errors(Nodes, Results, LogMsg) ->
    NodeResults = lists:zip(Nodes, Results),
    case [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok] of
        [] ->
            ok;
        NodeErrors ->
            ?SLOG(warning, #{
                msg => LogMsg,
                errors => NodeErrors
            })
    end,
    NodeResults.
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
@ -335,6 +434,10 @@ example_return_lookup() ->
%% TODO
#{}.
%% Example body for the validation metrics endpoint. The shape follows what the
%% GET handler builds: aggregated `metrics' plus a `node_metrics' list whose
%% entries carry the node name and that node's `metrics'.
example_return_metrics() ->
    Metrics = #{
        matched => 2,
        succeeded => 1,
        failed => 1,
        rate => 1.23,
        rate_last5m => 0.88,
        rate_max => 1.87
    },
    #{
        metrics => Metrics,
        node_metrics => [
            #{
                node => <<"emqx@127.0.0.1">>,
                metrics => Metrics
            }
        ]
    }.
error_schema(Code, Message) ->
error_schema(Code, Message, _ExtraFields = []).
@ -409,3 +512,51 @@ make_serializable(Validation) ->
} =
hocon_tconf:make_serializable(Schema, RawConfig, #{}),
Serialized.
%% Builds the per-node entry of the metrics response: the node name plus a
%% flat map of counters and `matched' rates.
format_metrics(Node, RawMetrics) ->
    #{
        metrics => flatten_validation_metrics(RawMetrics),
        node => Node
    }.

%% Flattens the metrics term into counters + rate values. Anything that does
%% not carry all expected counters and the `matched' rate is reported as zeros.
flatten_validation_metrics(#{
    counters := #{matched := Matched, succeeded := Succeeded, failed := Failed},
    rate := #{matched := #{current := Current, last5m := Last5m, max := Max}}
}) ->
    #{
        matched => Matched,
        succeeded => Succeeded,
        failed => Failed,
        rate => Current,
        rate_last5m => Last5m,
        rate_max => Max
    };
flatten_validation_metrics(_Unexpected) ->
    #{
        matched => 0,
        succeeded => 0,
        failed => 0,
        rate => 0,
        rate_last5m => 0,
        rate_max => 0
    }.
%% Folds the `metrics' maps of all per-node entries into one map using
%% `emqx_utils_maps:best_effort_recursive_sum/3', starting from an empty map.
%% Errors reported by the summing helper are deliberately ignored.
aggregate_metrics(NodeMetrics) ->
    IgnoreErrors = fun(_) -> ok end,
    AddNode = fun(#{metrics := Metrics}, Sum) ->
        emqx_utils_maps:best_effort_recursive_sum(Metrics, Sum, IgnoreErrors)
    end,
    lists:foldl(AddNode, #{}, NodeMetrics).

View File

@ -15,6 +15,12 @@
matching_validations/1,
%% metrics
get_metrics/1,
inc_matched/1,
inc_succeeded/1,
inc_failed/1,
start_link/0,
metrics_worker_spec/0
]).
@ -36,7 +42,7 @@
-define(METRIC_NAME, message_validation).
-define(METRICS, [
'matched',
'passed',
'succeeded',
'failed'
]).
-define(RATE_METRICS, ['matched']).
@ -102,6 +108,22 @@ matching_validations(Topic) ->
metrics_worker_spec() ->
emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME).
%% Reads the metrics stored under the message validation handler for the
%% validation called `ValidationName'.
-spec get_metrics(validation_name()) -> emqx_metrics_worker:metrics().
get_metrics(ValidationName) ->
    Handler = ?METRIC_NAME,
    emqx_metrics_worker:get_metrics(Handler, ValidationName).
%% Bumps the `matched' counter of the given validation.
-spec inc_matched(validation_name()) -> ok.
inc_matched(ValidationName) ->
    emqx_metrics_worker:inc(?METRIC_NAME, ValidationName, matched).

%% Bumps the `succeeded' counter of the given validation.
-spec inc_succeeded(validation_name()) -> ok.
inc_succeeded(ValidationName) ->
    emqx_metrics_worker:inc(?METRIC_NAME, ValidationName, succeeded).

%% Bumps the `failed' counter of the given validation.
-spec inc_failed(validation_name()) -> ok.
inc_failed(ValidationName) ->
    emqx_metrics_worker:inc(?METRIC_NAME, ValidationName, failed).
%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------

View File

@ -53,6 +53,7 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, _Config) ->
clear_all_validations(),
snabbkaffe:stop(),
reset_all_global_metrics(),
emqx_common_test_helpers:call_janitor(),
ok.
@ -70,6 +71,14 @@ clear_all_validations() ->
emqx_message_validation:list()
).
%% Sets every metric listed by `emqx_metrics:all/0' back to 0, so that global
%% counters do not leak from one test case into the next.
reset_all_global_metrics() ->
    ZeroOut = fun({MetricName, _Value}) ->
        emqx_metrics:set(MetricName, 0)
    end,
    lists:foreach(ZeroOut, emqx_metrics:all()).
maybe_json_decode(X) ->
case emqx_utils_json:safe_decode(X, [return_maps]) of
{ok, Decoded} -> Decoded;
@ -196,6 +205,30 @@ disable(Name) ->
ct:pal("disable result:\n ~p", [Res]),
simplify_result(Res).
%% GET <api_root>/validation/<Name>/metrics; the raw result is printed and
%% then passed through `simplify_result/1'.
get_metrics(Name) ->
    Segments = [api_root(), "validation", Name, "metrics"],
    Url = emqx_mgmt_api_test_util:api_path(Segments),
    Response = request(get, Url, _Params = []),
    ct:pal("get metrics result:\n ~p", [Response]),
    simplify_result(Response).
%% POST <api_root>/validation/<Name>/metrics/reset; the raw result is printed
%% and then passed through `simplify_result/1'.
reset_metrics(Name) ->
    Segments = [api_root(), "validation", Name, "metrics", "reset"],
    Url = emqx_mgmt_api_test_util:api_path(Segments),
    Response = request(post, Url, _Params = []),
    ct:pal("reset metrics result:\n ~p", [Response]),
    simplify_result(Response).
%% GET on the "metrics" API path; the raw result is printed and then passed
%% through `simplify_result/1'.
all_metrics() ->
    Url = emqx_mgmt_api_test_util:api_path(["metrics"]),
    Response = request(get, Url, _Params = []),
    ct:pal("all metrics result:\n ~p", [Response]),
    simplify_result(Response).
%% GET on the "monitor" API path; the raw result is printed and then passed
%% through `simplify_result/1'.
monitor_metrics() ->
    Url = emqx_mgmt_api_test_util:api_path(["monitor"]),
    Response = request(get, Url, _Params = []),
    ct:pal("monitor metrics result:\n ~p", [Response]),
    simplify_result(Response).
connect(ClientId) ->
connect(ClientId, _IsPersistent = false).
@ -363,6 +396,48 @@ trace_rule(Data, Envs, _Args) ->
get_traced_failures_from_rule_engine() ->
ets:tab2list(?RECORDED_EVENTS_TAB).
%% Retries (10 attempts, 100 between attempts, as given to `?retry') until the
%% metrics API returns 200 with a single-element list whose values for the keys
%% of `Expected' equal `Expected'. `Line' is attached to the assertion so that
%% failures point at the calling line.
assert_all_metrics(Line, Expected) ->
    ExpectedKeys = maps:keys(Expected),
    CheckOnce = fun() ->
        Response = all_metrics(),
        ?assertMatch({200, _}, Response),
        {200, [NodeMetrics]} = Response,
        ?assertEqual(Expected, maps:with(ExpectedKeys, NodeMetrics), #{line => Line})
    end,
    ?retry(100, 10, CheckOnce()),
    ok.
-define(assertAllMetrics(Expected), assert_all_metrics(?LINE, Expected)).
%% Checks that every sample returned by the dashboard monitor API carries both
%% the `validation_failed' and the `validation_succeeded' keys.
assert_monitor_metrics() ->
    ok = snabbkaffe:start_trace(),
    %% hack: send the sample message ourselves so the monitor flushes right now,
    %% and wait for the flush trace point before querying the API
    {_, {ok, _}} =
        ?wait_async_action(
            emqx_dashboard_monitor ! {sample, erlang:system_time(millisecond)},
            #{?snk_kind := dashboard_monitor_flushed}
        ),
    Response = monitor_metrics(),
    ?assertMatch({200, _}, Response),
    {200, Samples} = Response,
    HasValidationKeys = fun(Sample) ->
        ?assertMatch(
            #{
                <<"validation_failed">> := _,
                <<"validation_succeeded">> := _
            },
            Sample
        )
    end,
    lists:foreach(HasValidationKeys, Samples),
    ok.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -703,6 +778,236 @@ t_enable_disable_via_api_endpoint(_Config) ->
ok.
%% Exercises the per-validation metrics API (get / reset / reset-on-update)
%% together with the global `messages.validation_*' counters and the dashboard
%% monitor keys.
t_metrics(_Config) ->
%% extra validation that always passes at the head to check global metrics
Name0 = <<"bar">>,
Check0 = sql_check(<<"select 1 where true">>),
Validation0 = validation(Name0, [Check0]),
{201, _} = insert(Validation0),
%% validation under test: passes only when payload.x > 5
Name1 = <<"foo">>,
Check1 = sql_check(<<"select payload.x as x where x > 5">>),
Validation1 = validation(Name1, [Check1]),
%% Non existent
?assertMatch({404, _}, get_metrics(Name1)),
?assertAllMetrics(#{
<<"messages.dropped">> => 0,
<<"messages.validation_failed">> => 0,
<<"messages.validation_succeeded">> => 0
}),
%% freshly inserted validation: all counters are zero, rate keys are present
{201, _} = insert(Validation1),
?assertMatch(
{200, #{
<<"metrics">> :=
#{
<<"matched">> := 0,
<<"succeeded">> := 0,
<<"failed">> := 0,
<<"rate">> := _,
<<"rate_last5m">> := _,
<<"rate_max">> := _
},
<<"node_metrics">> :=
[
#{
<<"node">> := _,
<<"metrics">> := #{
<<"matched">> := 0,
<<"succeeded">> := 0,
<<"failed">> := 0,
<<"rate">> := _,
<<"rate_last5m">> := _,
<<"rate_max">> := _
}
}
]
}},
get_metrics(Name1)
),
?assertAllMetrics(#{
<<"messages.dropped">> => 0,
<<"messages.validation_failed">> => 0,
<<"messages.validation_succeeded">> => 0
}),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
%% x = 10 satisfies the check: message is delivered, success is counted
ok = publish(C, <<"t/1">>, #{x => 10}),
?assertReceive({publish, _}),
?retry(
100,
10,
?assertMatch(
{200, #{
<<"metrics">> :=
#{
<<"matched">> := 1,
<<"succeeded">> := 1,
<<"failed">> := 0
},
<<"node_metrics">> :=
[
#{
<<"node">> := _,
<<"metrics">> := #{
<<"matched">> := 1,
<<"succeeded">> := 1,
<<"failed">> := 0
}
}
]
}},
get_metrics(Name1)
)
),
?assertAllMetrics(#{
<<"messages.dropped">> => 0,
<<"messages.validation_failed">> => 0,
<<"messages.validation_succeeded">> => 1
}),
%% x = 5 does not satisfy `x > 5': message is not delivered, failure is counted
ok = publish(C, <<"t/1">>, #{x => 5}),
?assertNotReceive({publish, _}),
?retry(
100,
10,
?assertMatch(
{200, #{
<<"metrics">> :=
#{
<<"matched">> := 2,
<<"succeeded">> := 1,
<<"failed">> := 1
},
<<"node_metrics">> :=
[
#{
<<"node">> := _,
<<"metrics">> := #{
<<"matched">> := 2,
<<"succeeded">> := 1,
<<"failed">> := 1
}
}
]
}},
get_metrics(Name1)
)
),
?assertAllMetrics(#{
<<"messages.dropped">> => 0,
<<"messages.validation_failed">> => 1,
<<"messages.validation_succeeded">> => 1
}),
%% resetting via the API zeroes the per-validation counters ...
?assertMatch({204, _}, reset_metrics(Name1)),
?retry(
100,
10,
?assertMatch(
{200, #{
<<"metrics">> :=
#{
<<"matched">> := 0,
<<"succeeded">> := 0,
<<"failed">> := 0,
<<"rate">> := _,
<<"rate_last5m">> := _,
<<"rate_max">> := _
},
<<"node_metrics">> :=
[
#{
<<"node">> := _,
<<"metrics">> := #{
<<"matched">> := 0,
<<"succeeded">> := 0,
<<"failed">> := 0,
<<"rate">> := _,
<<"rate_last5m">> := _,
<<"rate_max">> := _
}
}
]
}},
get_metrics(Name1)
)
),
%% ... while the global counters are expected to keep their values
?assertAllMetrics(#{
<<"messages.dropped">> => 0,
<<"messages.validation_failed">> => 1,
<<"messages.validation_succeeded">> => 1
}),
%% updating a validation resets its metrics
%% (first produce non-zero counters again: one failure and one success)
ok = publish(C, <<"t/1">>, #{x => 5}),
?assertNotReceive({publish, _}),
ok = publish(C, <<"t/1">>, #{x => 10}),
?assertReceive({publish, _}),
?retry(
100,
10,
?assertMatch(
{200, #{
<<"metrics">> :=
#{
<<"matched">> := 2,
<<"succeeded">> := 1,
<<"failed">> := 1
},
<<"node_metrics">> :=
[
#{
<<"node">> := _,
<<"metrics">> := #{
<<"matched">> := 2,
<<"succeeded">> := 1,
<<"failed">> := 1
}
}
]
}},
get_metrics(Name1)
)
),
%% the update itself; counters are expected to go back to zero afterwards
{200, _} = update(Validation1),
?retry(
100,
10,
?assertMatch(
{200, #{
<<"metrics">> :=
#{
<<"matched">> := 0,
<<"succeeded">> := 0,
<<"failed">> := 0
},
<<"node_metrics">> :=
[
#{
<<"node">> := _,
<<"metrics">> := #{
<<"matched">> := 0,
<<"succeeded">> := 0,
<<"failed">> := 0
}
}
]
}},
get_metrics(Name1)
)
),
%% finally, the dashboard monitor samples must expose the validation keys
assert_monitor_metrics(),
ok.
t_duplicated_schema_checks(_Config) ->
Name1 = <<"foo">>,
SerdeName = <<"myserde">>,

View File

@ -22,12 +22,27 @@
-define(PROMETHEUS_AUTH_COLLECTOR, emqx_prometheus_auth).
-define(PROMETHEUS_DATA_INTEGRATION_REGISTRY, '/prometheus/data_integration').
-define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration).
-define(PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, '/prometheus/message_validation').
-define(PROMETHEUS_MESSAGE_VALIDATION_COLLECTOR, emqx_prometheus_message_validation).
-define(PROMETHEUS_ALL_REGISTRYS, [
?PROMETHEUS_DEFAULT_REGISTRY,
?PROMETHEUS_AUTH_REGISTRY,
?PROMETHEUS_DATA_INTEGRATION_REGISTRY
-if(?EMQX_RELEASE_EDITION == ee).
-define(PROMETHEUS_EE_REGISTRIES, [
?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY
]).
%% ELSE if(?EMQX_RELEASE_EDITION == ee).
-else.
-define(PROMETHEUS_EE_REGISTRIES, []).
%% END if(?EMQX_RELEASE_EDITION == ee).
-endif.
-define(PROMETHEUS_ALL_REGISTRIES,
?PROMETHEUS_EE_REGISTRIES ++
[
?PROMETHEUS_DEFAULT_REGISTRY,
?PROMETHEUS_AUTH_REGISTRY,
?PROMETHEUS_DATA_INTEGRATION_REGISTRY
]
).
-define(PROM_DATA_MODE__NODE, node).
-define(PROM_DATA_MODE__ALL_NODES_AGGREGATED, all_nodes_aggregated).

View File

@ -43,11 +43,13 @@
namespace/0
]).
%% handlers
-export([
setting/2,
stats/2,
auth/2,
data_integration/2
data_integration/2,
message_validation/2
]).
-export([lookup_from_local_nodes/3]).
@ -67,7 +69,17 @@ paths() ->
"/prometheus/auth",
"/prometheus/stats",
"/prometheus/data_integration"
].
] ++ paths_ee().
%% Extra API paths appended to `paths/0', selected at compile time by
%% `?EMQX_RELEASE_EDITION'.
-if(?EMQX_RELEASE_EDITION == ee).
%% `ee' build: adds the message validation endpoint.
paths_ee() ->
["/prometheus/message_validation"].
%% ELSE if(?EMQX_RELEASE_EDITION == ee).
-else.
%% any other build: no extra paths.
paths_ee() ->
[].
%% END if(?EMQX_RELEASE_EDITION == ee).
-endif.
schema("/prometheus") ->
#{
@ -126,6 +138,19 @@ schema("/prometheus/data_integration") ->
responses =>
#{200 => prometheus_data_schema()}
}
};
schema("/prometheus/message_validation") ->
#{
'operationId' => message_validation,
get =>
#{
description => ?DESC(get_prom_message_validation),
tags => ?TAGS,
parameters => [ref(mode)],
security => security(),
responses =>
#{200 => prometheus_data_schema()}
}
}.
security() ->
@ -198,6 +223,9 @@ auth(get, #{headers := Headers, query_string := Qs}) ->
data_integration(get, #{headers := Headers, query_string := Qs}) ->
collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)).
%% GET handler for the message validation prometheus endpoint: derives the
%% collect options from the request headers and query string and runs the
%% `emqx_prometheus_message_validation' collector with them.
message_validation(get, #{headers := Headers, query_string := Qs}) ->
    Opts = collect_opts(Headers, Qs),
    collect(emqx_prometheus_message_validation, Opts).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------

View File

@ -123,7 +123,7 @@ all_collectors() ->
prometheus_registry:collectors(Registry) ++ AccIn
end,
_InitAcc = [],
?PROMETHEUS_ALL_REGISTRYS
?PROMETHEUS_ALL_REGISTRIES
).
update_push_gateway(Prometheus) ->

View File

@ -0,0 +1,222 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_prometheus_message_validation).
-if(?EMQX_RELEASE_EDITION == ee).
%% for bpapi
-behaviour(emqx_prometheus_cluster).
%% Please don't remove this attribute, prometheus uses it to
%% automatically register collectors.
-behaviour(prometheus_collector).
-include("emqx_prometheus.hrl").
-include_lib("prometheus/include/prometheus.hrl").
-import(
prometheus_model_helpers,
[
create_mf/5,
gauge_metrics/1,
counter_metrics/1
]
).
-export([
deregister_cleanup/1,
collect_mf/2,
collect_metrics/2
]).
%% `emqx_prometheus' API
-export([collect/1]).
%% `emqx_prometheus_cluster' API
-export([
fetch_from_local_node/1,
fetch_cluster_consistented_data/0,
aggre_or_zip_init_acc/0,
logic_sum_metrics/0
]).
%%--------------------------------------------------------------------
%% Type definitions
%%--------------------------------------------------------------------
-define(MG(K, MAP), maps:get(K, MAP)).
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
-define(metrics_data_key, message_validation_metrics_data).
-define(key_enabled, emqx_message_validation_enable).
-define(key_matched, emqx_message_validation_matched).
-define(key_failed, emqx_message_validation_failed).
-define(key_succeeded, emqx_message_validation_succeeded).
%%--------------------------------------------------------------------
%% `emqx_prometheus_cluster' API
%%--------------------------------------------------------------------
%% Builds this node's data points for all configured message validations and
%% returns them, keyed by `?metrics_data_key', tagged with the local node name.
fetch_from_local_node(Mode) ->
    Validations = emqx_message_validation:list(),
    LocalNode = node(),
    Points = to_validation_data(Mode, Validations),
    {LocalNode, #{?metrics_data_key => Points}}.
%% This collector contributes no cluster-consistent data: always an empty map.
fetch_cluster_consistented_data() ->
    #{}.

%% Initial accumulator: under `?metrics_data_key', every metric name known to
%% this collector mapped to an empty list of points.
aggre_or_zip_init_acc() ->
    EmptyPoints = maps:from_keys(message_validation_metric(names), []),
    #{?metrics_data_key => EmptyPoints}.

%% Metric keys returned for the `logic_sum_metrics' callback; only the
%% "enabled" gauge is listed.
%% NOTE(review): presumably these are combined logically rather than summed
%% when aggregating across nodes -- confirm against `emqx_prometheus_cluster'.
logic_sum_metrics() ->
    [?key_enabled].
%%--------------------------------------------------------------------
%% Collector API
%%--------------------------------------------------------------------
%% @private
%% Nothing to clean up when the collector is deregistered.
deregister_cleanup(_Registry) ->
    ok.

%% @private
%% For the message validation registry, fetches the raw data through
%% `emqx_prometheus_cluster' and reports one metric family per entry of
%% `message_validation_metrics_meta/0'. Any other registry is ignored.
-spec collect_mf(_Registry, Callback) -> ok when
    _Registry :: prometheus_registry:registry(),
    Callback :: prometheus_collector:collect_mf_callback().
collect_mf(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, Callback) ->
    RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
    %% Message Validation Metrics
    ValidationPoints = maps:get(?metrics_data_key, RawData),
    ok = add_collect_family(Callback, message_validation_metrics_meta(), ValidationPoints);
collect_mf(_OtherRegistry, _Callback) ->
    ok.
%% @private
%% Renders the collector output in the requested format: for `<<"json">>' a map
%% with the zipped data points under `message_validations', for
%% `<<"prometheus">>' the text format of the message validation registry.
collect(<<"json">>) ->
    RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
    Points = maps:get(?metrics_data_key, RawData),
    #{message_validations => collect_json_data(Points)};
collect(<<"prometheus">>) ->
    Registry = ?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY,
    prometheus_text_format:format(Registry).
%%====================
%% API Helpers
%% Reports one metric family per `{Name, Type}' pair in `MetricWithType'.
%% Entries that are not such a pair are skipped.
add_collect_family(Callback, MetricWithType, Data) ->
    Report = fun
        ({Name, Type}) -> add_collect_family(Name, Data, Callback, Type);
        (_NotAPair) -> ok
    end,
    lists:foreach(Report, MetricWithType).

%% Creates the metric family `Name' of the given `Type' (with this module as
%% collector and an empty help text) and hands it to `Callback'.
add_collect_family(Name, Data, Callback, Type) ->
    %% TODO: help document from Name
    Help = <<"">>,
    MetricFamily = create_mf(Name, Help, Type, ?MODULE, Data),
    Callback(MetricFamily).
%% Exported metrics callback: delegates straight to `collect_mv/2'.
collect_metrics(Name, Metrics) ->
    collect_mv(Name, Metrics).

%%--------------------------------------------------------------------
%% Collector
%%--------------------------------------------------------------------

%%========================================
%% Message Validation Metrics
%%========================================

%% Picks the points stored under the metric key and wraps them with the
%% helper matching the metric type: gauge for "enabled", counter for the rest.
collect_mv(?key_enabled = Key, Data) ->
    gauge_metrics(maps:get(Key, Data));
collect_mv(?key_matched = Key, Data) ->
    counter_metrics(maps:get(Key, Data));
collect_mv(?key_failed = Key, Data) ->
    counter_metrics(maps:get(Key, Data));
collect_mv(?key_succeeded = Key, Data) ->
    counter_metrics(maps:get(Key, Data)).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%========================================
%% Message Validation Metrics
%%========================================
%% Metric keys exposed by this collector, each paired with its prometheus type.
message_validation_metrics_meta() ->
    Counters = [{Key, counter} || Key <- [?key_matched, ?key_failed, ?key_succeeded]],
    [{?key_enabled, gauge} | Counters].

%% `names': the metric names derived from the meta list above by
%% `emqx_prometheus_cluster:metric_names/1'.
message_validation_metric(names) ->
    Meta = message_validation_metrics_meta(),
    emqx_prometheus_cluster:metric_names(Meta).
%% Turns the list of validations into a map from metric name to the list of
%% data points, starting from an empty list for every known metric name.
to_validation_data(Mode, Validations) ->
    EmptyAcc = maps:from_keys(message_validation_metric(names), []),
    AddValidation = fun(#{name := Name} = Validation, Acc) ->
        merge_acc_with_validations(Mode, Name, get_validation_metrics(Validation), Acc)
    end,
    lists:foldl(AddValidation, EmptyAcc, Validations).
%% For every metric of one validation, prepends a data point to the list
%% already accumulated under that metric key. The key must already exist in
%% the accumulator.
merge_acc_with_validations(Mode, Id, ValidationMetrics, PointsAcc) ->
    Prepend = fun(Key, Value, Acc) ->
        Point = validation_point(Mode, Id, Value),
        Existing = maps:get(Key, Acc),
        Acc#{Key => [Point | Existing]}
    end,
    maps:fold(Prepend, PointsAcc, ValidationMetrics).

%% A data point: the label list (validation name, plus whatever
%% `with_node_label/2' adds for `Mode') paired with the value.
validation_point(Mode, Name, V) ->
    Labels = with_node_label(Mode, [{validation_name, Name}]),
    {Labels, V}.
%% Reads the counters of one validation from the registry and maps them, along
%% with the numeric form of its `enable' flag, onto the prometheus metric keys.
%% Counters missing from the registry data default to 0.
get_validation_metrics(#{name := Name, enable := Enabled}) ->
    #{counters := Counters} = emqx_message_validation_registry:get_metrics(Name),
    CounterOrZero = fun(CounterName) -> maps:get(CounterName, Counters, 0) end,
    #{
        ?key_enabled => emqx_prometheus_cluster:boolean_to_number(Enabled),
        ?key_matched => CounterOrZero(matched),
        ?key_failed => CounterOrZero(failed),
        ?key_succeeded => CounterOrZero(succeeded)
    }.
%%--------------------------------------------------------------------
%% Collect functions
%%--------------------------------------------------------------------
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `application/json`
%% Hands the points to `emqx_prometheus_cluster:collect_json_data/2' with the
%% zip function below.
collect_json_data(Data) ->
    ZipFun = fun zip_json_message_validation_metrics/3,
    emqx_prometheus_cluster:collect_json_data(Data, ZipFun).

%% Zip step for one metric key.
%% With an empty accumulator, every point becomes a map of its labels plus
%% `Key => Metric'; the resulting list is in reverse order of `Points'.
%% With a non-empty accumulator, the maps produced for this key are merged
%% pairwise into the maps accumulated so far.
zip_json_message_validation_metrics(Key, Points, []) ->
    ToMap = fun({Labels, Metric}) ->
        maps:put(Key, Metric, maps:from_list(Labels))
    end,
    lists:reverse(lists:map(ToMap, Points));
zip_json_message_validation_metrics(Key, Points, Accumulated) ->
    PointToMap = emqx_prometheus_cluster:point_to_map_fun(Key),
    ForThisKey = lists:foldl(PointToMap, [], Points),
    lists:zipwith(fun maps:merge/2, Accumulated, ForThisKey).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper funcs
%% Adds a `{node, node()}' label only in the unaggregated all-nodes mode; the
%% single-node and aggregated modes keep the labels untouched.
with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
    [{node, node()} | Labels];
with_node_label(Mode, Labels) when
    Mode =:= ?PROM_DATA_MODE__NODE;
    Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED
->
    Labels.
%% END if(?EMQX_RELEASE_EDITION == ee).
-endif.

View File

@ -78,11 +78,12 @@ rule_engine {
">>).
%% Common Test entry point: the list of test groups to run.
%% The message validation group is contributed by a list comprehension whose
%% only qualifier is the edition check, so it yields either one group or an
%% empty list; `lists:flatten/1' then splices the result into the top-level list.
all() ->
    lists:flatten([
        {group, '/prometheus/stats'},
        {group, '/prometheus/auth'},
        {group, '/prometheus/data_integration'},
        [{group, '/prometheus/message_validation'} || emqx_release:edition() == ee]
    ]).
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
@ -99,6 +100,7 @@ groups() ->
{'/prometheus/stats', ModeGroups},
{'/prometheus/auth', ModeGroups},
{'/prometheus/data_integration', ModeGroups},
{'/prometheus/message_validation', ModeGroups},
{?PROM_DATA_MODE__NODE, AcceptGroups},
{?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups},
{?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups},
@ -107,6 +109,7 @@ groups() ->
].
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
meck:new(emqx_retainer, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_retainer, retained_count, fun() -> 0 end),
meck:expect(
@ -121,7 +124,7 @@ init_per_suite(Config) ->
application:load(emqx_auth),
Apps = emqx_cth_suite:start(
[
lists:flatten([
emqx,
{emqx_conf, ?EMQX_CONF},
emqx_auth,
@ -129,8 +132,12 @@ init_per_suite(Config) ->
emqx_rule_engine,
emqx_bridge_http,
emqx_connector,
[
{emqx_message_validation, #{config => message_validation_config()}}
|| emqx_release:edition() == ee
],
{emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()}
],
]),
#{
work_dir => filename:join(?config(priv_dir, Config), ?MODULE)
}
@ -159,6 +166,8 @@ init_per_group('/prometheus/auth', Config) ->
[{module, emqx_prometheus_auth} | Config];
init_per_group('/prometheus/data_integration', Config) ->
[{module, emqx_prometheus_data_integration} | Config];
init_per_group('/prometheus/message_validation', Config) ->
[{module, emqx_prometheus_message_validation} | Config];
init_per_group(?PROM_DATA_MODE__NODE, Config) ->
[{mode, ?PROM_DATA_MODE__NODE} | Config];
init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) ->
@ -346,6 +355,8 @@ metric_meta(<<"emqx_schema_registrys_count">>) -> ?meta(0, 0, 0);
metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2);
metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2);
metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2);
%% `/prometheus/message_validation`
metric_meta(<<"emqx_message_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
%% normal emqx metrics
metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1);
metric_meta(_) -> #{}.
@ -810,5 +821,42 @@ assert_json_data__data_integration_overview(M, _) ->
).
-endif.
%% Asserts that every JSON object in `Ms' carries the validation name and all
%% four message validation metric keys. The second argument is ignored.
assert_json_data__message_validations(Ms, _) ->
    AssertHasAllKeys = fun(M) ->
        ?assertMatch(
            #{
                validation_name := _,
                emqx_message_validation_enable := _,
                emqx_message_validation_matched := _,
                emqx_message_validation_failed := _,
                emqx_message_validation_succeeded := _
            },
            M
        )
    end,
    lists:foreach(AssertHasAllKeys, Ms).
%% Raw (binary-keyed) configuration with a single enabled validation named
%% `my_validation' on topic filter `t/#', using one SQL check, the `all_pass'
%% strategy and the `drop' failure action.
message_validation_config() ->
    SqlCheck = #{
        <<"type">> => <<"sql">>,
        <<"sql">> => <<"select * where true">>
    },
    Validation = #{
        <<"name">> => <<"my_validation">>,
        <<"enable">> => true,
        <<"topics">> => [<<"t/#">>],
        <<"strategy">> => <<"all_pass">>,
        <<"failure_action">> => <<"drop">>,
        <<"checks">> => [SqlCheck]
    },
    Validations = #{<<"validations">> => [Validation]},
    #{<<"message_validation">> => Validations}.
%% Stops each application in `Apps' in list order; the individual results of
%% `application:stop/1' are discarded and `ok' is returned.
stop_apps(Apps) ->
    StopOne = fun(App) -> application:stop(App) end,
    lists:foreach(StopOne, Apps).

View File

@ -540,7 +540,7 @@ printable_function_name(Mod, Func) ->
get_rule_metrics(Id) ->
Nodes = emqx:running_nodes(),
Results = emqx_metrics_proto_v1:get_metrics(Nodes, rule_metrics, Id, ?RPC_GET_METRICS_TIMEOUT),
Results = emqx_metrics_proto_v2:get_metrics(Nodes, rule_metrics, Id, ?RPC_GET_METRICS_TIMEOUT),
NodeResults = lists:zip(Nodes, Results),
NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults],
NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok],

View File

@ -21,6 +21,12 @@ emqx_message_validation_http_api {
enable_disable_validation.desc:
"""Enable or disable a particular validation"""
get_validation_metrics.desc:
"""Get metrics for a particular validation"""
reset_validation_metrics.desc:
"""Reset metrics for a particular validation"""
param_path_name.desc:
"""Validation name"""

View File

@ -25,4 +25,9 @@ get_prom_data_integration_data.desc:
get_prom_data_integration_data.label:
"""Prometheus Metrics for Data Integration"""
get_prom_message_validation.desc:
"""Get Prometheus Metrics for Message Validation"""
get_prom_message_validation.label:
"""Prometheus Metrics for Message Validation"""
}

View File

@ -97,6 +97,10 @@ matrix() {
entries+=("$(format_app_entry "$app" 1 emqx "$runner")")
entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")")
;;
apps/emqx_prometheus)
entries+=("$(format_app_entry "$app" 1 emqx "$runner")")
entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")")
;;
apps/emqx_rule_engine)
entries+=("$(format_app_entry "$app" 1 emqx "$runner")")
entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")")