diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 11ac8f582..d2f096a4c 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -41,6 +41,7 @@ {emqx_management,4}. {emqx_management,5}. {emqx_metrics,1}. +{emqx_metrics,2}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_api_plugins,2}. {emqx_mgmt_cluster,1}. diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 860037472..6b8b60209 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -207,6 +207,10 @@ {counter, 'messages.publish'}, % Messages dropped due to no subscribers {counter, 'messages.dropped'}, + %% % Messages that failed validations + {counter, 'messages.validation_failed'}, + %% % Messages that passed validations + {counter, 'messages.validation_succeeded'}, % QoS2 Messages expired {counter, 'messages.dropped.await_pubrel_timeout'}, % Messages dropped @@ -712,4 +716,6 @@ reserved_idx('overload_protection.delay.timeout') -> 401; reserved_idx('overload_protection.hibernation') -> 402; reserved_idx('overload_protection.gc') -> 403; reserved_idx('overload_protection.new_conn') -> 404; +reserved_idx('messages.validation_succeeded') -> 405; +reserved_idx('messages.validation_failed') -> 406; reserved_idx(_) -> undefined. diff --git a/apps/emqx/src/proto/emqx_metrics_proto_v1.erl b/apps/emqx/src/proto/emqx_metrics_proto_v1.erl index 9b9b5e9c9..cf5e5d3ba 100644 --- a/apps/emqx/src/proto/emqx_metrics_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_metrics_proto_v1.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, get_metrics/4 ]). @@ -29,6 +30,9 @@ introduced_in() -> "5.1.0". +deprecated_since() -> + "5.7.0". 
+ -spec get_metrics( [node()], emqx_metrics_worker:handler_name(), diff --git a/apps/emqx/src/proto/emqx_metrics_proto_v2.erl b/apps/emqx/src/proto/emqx_metrics_proto_v2.erl new file mode 100644 index 000000000..b0a380e5a --- /dev/null +++ b/apps/emqx/src/proto/emqx_metrics_proto_v2.erl @@ -0,0 +1,55 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_metrics_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + get_metrics/4, + + %% introduced in v2 + reset_metrics/4 +]). + +-include("bpapi.hrl"). + +introduced_in() -> + "5.7.0". + +-spec get_metrics( + [node()], + emqx_metrics_worker:handler_name(), + emqx_metrics_worker:metric_id(), + timeout() +) -> emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()). +get_metrics(Nodes, HandlerName, MetricId, Timeout) -> + erpc:multicall(Nodes, emqx_metrics_worker, get_metrics, [HandlerName, MetricId], Timeout). 
+ +%%-------------------------------------------------------------------------------- +%% Introduced in V2 +%%-------------------------------------------------------------------------------- + +-spec reset_metrics( + [node()], + emqx_metrics_worker:handler_name(), + emqx_metrics_worker:metric_id(), + timeout() +) -> emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()). +reset_metrics(Nodes, HandlerName, MetricId, Timeout) -> + erpc:multicall(Nodes, emqx_metrics_worker, reset_metrics, [HandlerName, MetricId], Timeout). diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index a40716b4d..e4fedfd40 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -38,6 +38,7 @@ -export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([upgrade_raw_conf/1]). +-export([tr_prometheus_collectors/1]). %% internal exports for `emqx_enterprise_schema' only. -export([ensure_unicode_path/2, convert_rotation/2, log_handler_common_confs/2]). diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 547ecca6b..13458b4b4 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -65,6 +65,8 @@ %, received_bytes sent, %, sent_bytes + validation_succeeded, + validation_failed, dropped ]). @@ -83,6 +85,8 @@ %received_bytes => received_bytes_rate, %sent_bytes => sent_bytes_rate, sent => sent_msg_rate, + validation_succeeded => validation_succeeded_rate, + validation_failed => validation_failed_rate, dropped => dropped_msg_rate }). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index b136742d0..6a9e868dd 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -18,6 +18,7 @@ -include("emqx_dashboard.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). 
-include_lib("emqx/include/logger.hrl"). -behaviour(gen_server). @@ -186,6 +187,7 @@ handle_cast(_Request, State = #state{}) -> handle_info({sample, Time}, State = #state{last = Last}) -> Now = sample(Time), {atomic, ok} = flush(Last, Now), + ?tp(dashboard_monitor_flushed, #{}), sample_timer(), {noreply, State#state{last = Now}}; handle_info(clean_expired, State) -> @@ -424,6 +426,8 @@ stats(received) -> emqx_metrics:val('messages.received'); stats(received_bytes) -> emqx_metrics:val('bytes.received'); stats(sent) -> emqx_metrics:val('messages.sent'); stats(sent_bytes) -> emqx_metrics:val('bytes.sent'); +stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded'); +stats(validation_failed) -> emqx_metrics:val('messages.validation_failed'); stats(dropped) -> emqx_metrics:val('messages.dropped'). %% ------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 7dc1e919e..3ffadc1b2 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -188,6 +188,10 @@ swagger_desc(sent_bytes) -> swagger_desc_format("Sent bytes "); swagger_desc(dropped) -> swagger_desc_format("Dropped messages "); +swagger_desc(validation_succeeded) -> + swagger_desc_format("Message validations succeeded "); +swagger_desc(validation_failed) -> + swagger_desc_format("Message validations failed "); swagger_desc(subscriptions) -> <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(topics) -> @@ -210,6 +214,10 @@ swagger_desc(sent_msg_rate) -> %swagger_desc(sent_bytes_rate) -> swagger_desc_format("Sent bytes ", per); swagger_desc(dropped_msg_rate) -> swagger_desc_format("Dropped messages ", per); +swagger_desc(validation_succeeded_rate) -> + swagger_desc_format("Message validations succeeded ", per); 
+swagger_desc(validation_failed_rate) -> + swagger_desc_format("Message validations failed ", per); swagger_desc(retained_msg_count) -> <<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(shared_subscriptions) -> diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 0d80f47b3..95fe2e809 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -23,6 +23,7 @@ -include("emqx_dashboard.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SERVER, "http://127.0.0.1:18083"). @@ -54,16 +55,35 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = emqx_mgmt_api_test_util:init_suite([emqx, emqx_conf, emqx_retainer]), + emqx_common_test_helpers:clear_screen(), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + {emqx_retainer, ?BASE_RETAINER_CONF}, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard( + "dashboard.listeners.http { enable = true, bind = 18083 }\n" + "dashboard.sample_interval = 1s" + ) + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(_TestCase, Config) -> + ok = snabbkaffe:start_trace(), + ct:timetrap({seconds, 30}), Config. -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite([emqx_retainer]). - -set_special_configs(emqx_retainer) -> - emqx_retainer:update_config(?BASE_RETAINER_CONF), - ok; -set_special_configs(_App) -> +end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), ok. 
%%-------------------------------------------------------------------- @@ -71,7 +91,11 @@ set_special_configs(_App) -> %%-------------------------------------------------------------------- t_monitor_samplers_all(_Config) -> - timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}), + infinity + ), Size = mnesia:table_info(emqx_dashboard_monitor, size), All = emqx_dashboard_monitor:samplers(all, infinity), All2 = emqx_dashboard_monitor:samplers(), @@ -80,25 +104,39 @@ t_monitor_samplers_all(_Config) -> ok. t_monitor_samplers_latest(_Config) -> - timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), - Samplers = emqx_dashboard_monitor:samplers(node(), 2), - Latest = emqx_dashboard_monitor:samplers(node(), 1), - ?assert(erlang:length(Samplers) == 2), - ?assert(erlang:length(Latest) == 1), - ?assert(hd(Latest) == lists:nth(2, Samplers)), + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}), + infinity + ), + ?retry(1_000, 10, begin + Samplers = emqx_dashboard_monitor:samplers(node(), 2), + Latest = emqx_dashboard_monitor:samplers(node(), 1), + ?assert(erlang:length(Samplers) == 2, #{samplers => Samplers}), + ?assert(erlang:length(Latest) == 1, #{latest => Latest}), + ?assert(hd(Latest) == lists:nth(2, Samplers)) + end), ok. t_monitor_sampler_format(_Config) -> - timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + {ok, _} = + snabbkaffe:block_until( + ?match_event(#{?snk_kind := dashboard_monitor_flushed}), + infinity + ), Latest = hd(emqx_dashboard_monitor:samplers(node(), 1)), SamplerKeys = maps:keys(Latest), [?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST], ok. 
t_monitor_api(_) -> - timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), - {ok, Samplers} = request(["monitor"], "latest=20"), - ?assert(erlang:length(Samplers) >= 2), + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}), + infinity + ), + {ok, Samplers} = request(["monitor"], "latest=200"), + ?assert(erlang:length(Samplers) >= 2, #{samplers => Samplers}), Fun = fun(Sampler) -> Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)], @@ -110,7 +148,11 @@ t_monitor_api(_) -> ok. t_monitor_current_api(_) -> - timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}), + infinity + ), {ok, Rate} = request(["monitor_current"]), [ ?assert(maps:is_key(atom_to_binary(Key, utf8), Rate)) @@ -210,7 +252,11 @@ t_monitor_reset(_) -> ?assert(maps:is_key(atom_to_binary(Key, utf8), Rate)) || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST ], - timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(1, #{?snk_kind := dashboard_monitor_flushed}), + infinity + ), {ok, Samplers} = request(["monitor"], "latest=1"), ?assertEqual(1, erlang:length(Samplers)), ok. diff --git a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl index 8d516795c..77bdf0c02 100644 --- a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl +++ b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl @@ -111,6 +111,10 @@ fields(Name) -> translations() -> emqx_conf_schema:translations(). +translation("prometheus") -> + [ + {"collectors", fun tr_prometheus_collectors/1} + ]; translation(Name) -> emqx_conf_schema:translation(Name). @@ -189,3 +193,9 @@ audit_log_conf() -> } )} ]. 
+ +tr_prometheus_collectors(Conf) -> + [ + {'/prometheus/message_validation', emqx_prometheus_message_validation} + | emqx_conf_schema:tr_prometheus_collectors(Conf) + ]. diff --git a/apps/emqx_message_validation/src/emqx_message_validation.erl b/apps/emqx_message_validation/src/emqx_message_validation.erl index f0fdb6dee..7e0d2fd11 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation.erl @@ -140,10 +140,13 @@ on_message_publish(Message = #message{topic = Topic, headers = Headers}) -> Validations -> case run_validations(Validations, Message) of ok -> + emqx_metrics:inc('messages.validation_succeeded'), {ok, Message}; drop -> + emqx_metrics:inc('messages.validation_failed'), {stop, Message#message{headers = Headers#{allow_publish => false}}}; disconnect -> + emqx_metrics:inc('messages.validation_failed'), {stop, Message#message{ headers = Headers#{ allow_publish => false, @@ -380,14 +383,17 @@ run_validations(Validations, Message) -> emqx_rule_runtime:clear_rule_payload(), Fun = fun(Validation, Acc) -> #{name := Name} = Validation, + emqx_message_validation_registry:inc_matched(Name), case run_validation(Validation, Message) of ok -> + emqx_message_validation_registry:inc_succeeded(Name), {cont, Acc}; ignore -> trace_failure(Validation, "validation_failed", #{ validation => Name, action => ignore }), + emqx_message_validation_registry:inc_failed(Name), run_message_validation_failed_hook(Message, Validation), {cont, Acc}; FailureAction -> @@ -395,6 +401,7 @@ run_validations(Validations, Message) -> validation => Name, action => FailureAction }), + emqx_message_validation_registry:inc_failed(Name), run_message_validation_failed_hook(Message, Validation), {halt, FailureAction} end diff --git a/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl b/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl index 272abbbcf..de99ef714 100644 --- 
a/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl @@ -24,6 +24,8 @@ '/message_validations'/2, '/message_validations/reorder'/2, '/message_validations/validation/:name'/2, + '/message_validations/validation/:name/metrics'/2, + '/message_validations/validation/:name/metrics/reset'/2, '/message_validations/validation/:name/enable/:enable'/2 ]). @@ -32,6 +34,7 @@ %%------------------------------------------------------------------------------------------------- -define(TAGS, [<<"Message Validation">>]). +-define(METRIC_NAME, message_validation). %%------------------------------------------------------------------------------------------------- %% `minirest' and `minirest_trails' API @@ -47,6 +50,8 @@ paths() -> "/message_validations", "/message_validations/reorder", "/message_validations/validation/:name", + "/message_validations/validation/:name/metrics", + "/message_validations/validation/:name/metrics/reset", "/message_validations/validation/:name/enable/:enable" ]. 
@@ -173,6 +178,43 @@ schema("/message_validations/validation/:name") -> } } }; +schema("/message_validations/validation/:name/metrics") -> + #{ + 'operationId' => '/message_validations/validation/:name/metrics', + get => #{ + tags => ?TAGS, + summary => <<"Get validation metrics">>, + description => ?DESC("get_validation_metrics"), + parameters => [param_path_name()], + responses => + #{ + 200 => + emqx_dashboard_swagger:schema_with_examples( + ref(get_metrics), + #{ + sample => + #{value => example_return_metrics()} + } + ), + 404 => error_schema('NOT_FOUND', "Validation not found") + } + } + }; +schema("/message_validations/validation/:name/metrics/reset") -> + #{ + 'operationId' => '/message_validations/validation/:name/metrics/reset', + post => #{ + tags => ?TAGS, + summary => <<"Reset validation metrics">>, + description => ?DESC("reset_validation_metrics"), + parameters => [param_path_name()], + responses => + #{ + 204 => <<"No content">>, + 404 => error_schema('NOT_FOUND', "Validation not found") + } + } + }; schema("/message_validations/validation/:name/enable/:enable") -> #{ 'operationId' => '/message_validations/validation/:name/enable/:enable', @@ -230,6 +272,22 @@ fields(before) -> fields(reorder) -> [ {order, mk(array(binary()), #{required => true, in => body})} + ]; +fields(get_metrics) -> + [ + {metrics, mk(ref(metrics), #{})}, + {node_metrics, mk(ref(node_metrics), #{})} + ]; +fields(metrics) -> + [ + {matched, mk(non_neg_integer(), #{})}, + {succeeded, mk(non_neg_integer(), #{})}, + {failed, mk(non_neg_integer(), #{})} + ]; +fields(node_metrics) -> + [ + {node, mk(binary(), #{})} + | fields(metrics) ]. %%------------------------------------------------------------------------------------------------- @@ -299,6 +357,47 @@ fields(reorder) -> not_found() ). 
+'/message_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) -> + with_validation( + Name, + fun() -> + Nodes = emqx:running_nodes(), + Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, Name, 5_000), + NodeResults = lists:zip(Nodes, Results), + NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok], + NodeErrors == [] orelse + ?SLOG(warning, #{ + msg => "rpc_get_validation_metrics_errors", + errors => NodeErrors + }), + NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults], + Response = #{ + metrics => aggregate_metrics(NodeMetrics), + node_metrics => NodeMetrics + }, + ?OK(Response) + end, + not_found() + ). + +'/message_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) -> + with_validation( + Name, + fun() -> + Nodes = emqx:running_nodes(), + Results = emqx_metrics_proto_v2:reset_metrics(Nodes, ?METRIC_NAME, Name, 5_000), + NodeResults = lists:zip(Nodes, Results), + NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok], + NodeErrors == [] orelse + ?SLOG(warning, #{ + msg => "rpc_reset_validation_metrics_errors", + errors => NodeErrors + }), + ?NO_CONTENT + end, + not_found() + ). + %%------------------------------------------------------------------------------------------------- %% Internal fns %%------------------------------------------------------------------------------------------------- @@ -335,6 +434,10 @@ example_return_lookup() -> %% TODO #{}. +example_return_metrics() -> + %% TODO + #{}. + error_schema(Code, Message) -> error_schema(Code, Message, _ExtraFields = []). @@ -409,3 +512,51 @@ make_serializable(Validation) -> } = hocon_tconf:make_serializable(Schema, RawConfig, #{}), Serialized. 
+ +format_metrics(Node, #{ + counters := #{ + 'matched' := Matched, + 'succeeded' := Succeeded, + 'failed' := Failed + }, + rate := #{ + 'matched' := #{ + current := MatchedRate, + last5m := Matched5mRate, + max := MatchedMaxRate + } + } +}) -> + #{ + metrics => #{ + 'matched' => Matched, + 'succeeded' => Succeeded, + 'failed' => Failed, + rate => MatchedRate, + rate_last5m => Matched5mRate, + rate_max => MatchedMaxRate + }, + node => Node + }; +format_metrics(Node, _) -> + #{ + metrics => #{ + 'matched' => 0, + 'succeeded' => 0, + 'failed' => 0, + rate => 0, + rate_last5m => 0, + rate_max => 0 + }, + node => Node + }. + +aggregate_metrics(NodeMetrics) -> + ErrorLogger = fun(_) -> ok end, + lists:foldl( + fun(#{metrics := Metrics}, Acc) -> + emqx_utils_maps:best_effort_recursive_sum(Metrics, Acc, ErrorLogger) + end, + #{}, + NodeMetrics + ). diff --git a/apps/emqx_message_validation/src/emqx_message_validation_registry.erl b/apps/emqx_message_validation/src/emqx_message_validation_registry.erl index 6a8e4a376..fd715e750 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_registry.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation_registry.erl @@ -15,6 +15,12 @@ matching_validations/1, + %% metrics + get_metrics/1, + inc_matched/1, + inc_succeeded/1, + inc_failed/1, + start_link/0, metrics_worker_spec/0 ]). @@ -36,7 +42,7 @@ -define(METRIC_NAME, message_validation). -define(METRICS, [ 'matched', - 'passed', + 'succeeded', 'failed' ]). -define(RATE_METRICS, ['matched']). @@ -102,6 +108,22 @@ matching_validations(Topic) -> metrics_worker_spec() -> emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME). +-spec get_metrics(validation_name()) -> emqx_metrics_worker:metrics(). +get_metrics(Name) -> + emqx_metrics_worker:get_metrics(?METRIC_NAME, Name). + +-spec inc_matched(validation_name()) -> ok. +inc_matched(Name) -> + emqx_metrics_worker:inc(?METRIC_NAME, Name, 'matched'). 
+ +-spec inc_succeeded(validation_name()) -> ok. +inc_succeeded(Name) -> + emqx_metrics_worker:inc(?METRIC_NAME, Name, 'succeeded'). + +-spec inc_failed(validation_name()) -> ok. +inc_failed(Name) -> + emqx_metrics_worker:inc(?METRIC_NAME, Name, 'failed'). + %%------------------------------------------------------------------------------ %% `gen_server' API %%------------------------------------------------------------------------------ diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index b43c8ce35..8bd2ae647 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -53,6 +53,7 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_all_validations(), snabbkaffe:stop(), + reset_all_global_metrics(), emqx_common_test_helpers:call_janitor(), ok. @@ -70,6 +71,14 @@ clear_all_validations() -> emqx_message_validation:list() ). +reset_all_global_metrics() -> + lists:foreach( + fun({Name, _}) -> + emqx_metrics:set(Name, 0) + end, + emqx_metrics:all() + ). + maybe_json_decode(X) -> case emqx_utils_json:safe_decode(X, [return_maps]) of {ok, Decoded} -> Decoded; @@ -196,6 +205,30 @@ disable(Name) -> ct:pal("disable result:\n ~p", [Res]), simplify_result(Res). +get_metrics(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), "validation", Name, "metrics"]), + Res = request(get, Path, _Params = []), + ct:pal("get metrics result:\n ~p", [Res]), + simplify_result(Res). + +reset_metrics(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), "validation", Name, "metrics", "reset"]), + Res = request(post, Path, _Params = []), + ct:pal("reset metrics result:\n ~p", [Res]), + simplify_result(Res). 
+ +all_metrics() -> + Path = emqx_mgmt_api_test_util:api_path(["metrics"]), + Res = request(get, Path, _Params = []), + ct:pal("all metrics result:\n ~p", [Res]), + simplify_result(Res). + +monitor_metrics() -> + Path = emqx_mgmt_api_test_util:api_path(["monitor"]), + Res = request(get, Path, _Params = []), + ct:pal("monitor metrics result:\n ~p", [Res]), + simplify_result(Res). + connect(ClientId) -> connect(ClientId, _IsPersistent = false). @@ -363,6 +396,48 @@ trace_rule(Data, Envs, _Args) -> get_traced_failures_from_rule_engine() -> ets:tab2list(?RECORDED_EVENTS_TAB). +assert_all_metrics(Line, Expected) -> + Keys = maps:keys(Expected), + ?retry( + 100, + 10, + begin + Res = all_metrics(), + ?assertMatch({200, _}, Res), + {200, [Metrics]} = Res, + ?assertEqual(Expected, maps:with(Keys, Metrics), #{line => Line}) + end + ), + ok. + +-define(assertAllMetrics(Expected), assert_all_metrics(?LINE, Expected)). + +%% check that dashboard monitor contains the success and failure metric keys +assert_monitor_metrics() -> + ok = snabbkaffe:start_trace(), + %% hack: force monitor to flush data now + {_, {ok, _}} = + ?wait_async_action( + emqx_dashboard_monitor ! {sample, erlang:system_time(millisecond)}, + #{?snk_kind := dashboard_monitor_flushed} + ), + Res = monitor_metrics(), + ?assertMatch({200, _}, Res), + {200, Metrics} = Res, + lists:foreach( + fun(M) -> + ?assertMatch( + #{ + <<"validation_failed">> := _, + <<"validation_succeeded">> := _ + }, + M + ) + end, + Metrics + ), + ok. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -703,6 +778,236 @@ t_enable_disable_via_api_endpoint(_Config) -> ok. 
+t_metrics(_Config) -> + %% extra validation that always passes at the head to check global metrics + Name0 = <<"bar">>, + Check0 = sql_check(<<"select 1 where true">>), + Validation0 = validation(Name0, [Check0]), + {201, _} = insert(Validation0), + + Name1 = <<"foo">>, + Check1 = sql_check(<<"select payload.x as x where x > 5">>), + Validation1 = validation(Name1, [Check1]), + + %% Non existent + ?assertMatch({404, _}, get_metrics(Name1)), + ?assertAllMetrics(#{ + <<"messages.dropped">> => 0, + <<"messages.validation_failed">> => 0, + <<"messages.validation_succeeded">> => 0 + }), + + {201, _} = insert(Validation1), + + ?assertMatch( + {200, #{ + <<"metrics">> := + #{ + <<"matched">> := 0, + <<"succeeded">> := 0, + <<"failed">> := 0, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + }, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"matched">> := 0, + <<"succeeded">> := 0, + <<"failed">> := 0, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + } + } + ] + }}, + get_metrics(Name1) + ), + ?assertAllMetrics(#{ + <<"messages.dropped">> => 0, + <<"messages.validation_failed">> => 0, + <<"messages.validation_succeeded">> => 0 + }), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + ok = publish(C, <<"t/1">>, #{x => 10}), + ?assertReceive({publish, _}), + + ?retry( + 100, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := + #{ + <<"matched">> := 1, + <<"succeeded">> := 1, + <<"failed">> := 0 + }, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"matched">> := 1, + <<"succeeded">> := 1, + <<"failed">> := 0 + } + } + ] + }}, + get_metrics(Name1) + ) + ), + ?assertAllMetrics(#{ + <<"messages.dropped">> => 0, + <<"messages.validation_failed">> => 0, + <<"messages.validation_succeeded">> => 1 + }), + + ok = publish(C, <<"t/1">>, #{x => 5}), + ?assertNotReceive({publish, _}), + + ?retry( + 100, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := + #{ + 
<<"matched">> := 2, + <<"succeeded">> := 1, + <<"failed">> := 1 + }, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"matched">> := 2, + <<"succeeded">> := 1, + <<"failed">> := 1 + } + } + ] + }}, + get_metrics(Name1) + ) + ), + ?assertAllMetrics(#{ + <<"messages.dropped">> => 0, + <<"messages.validation_failed">> => 1, + <<"messages.validation_succeeded">> => 1 + }), + + ?assertMatch({204, _}, reset_metrics(Name1)), + ?retry( + 100, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := + #{ + <<"matched">> := 0, + <<"succeeded">> := 0, + <<"failed">> := 0, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + }, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"matched">> := 0, + <<"succeeded">> := 0, + <<"failed">> := 0, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + } + } + ] + }}, + get_metrics(Name1) + ) + ), + ?assertAllMetrics(#{ + <<"messages.dropped">> => 0, + <<"messages.validation_failed">> => 1, + <<"messages.validation_succeeded">> => 1 + }), + + %% updating a validation resets its metrics + ok = publish(C, <<"t/1">>, #{x => 5}), + ?assertNotReceive({publish, _}), + ok = publish(C, <<"t/1">>, #{x => 10}), + ?assertReceive({publish, _}), + ?retry( + 100, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := + #{ + <<"matched">> := 2, + <<"succeeded">> := 1, + <<"failed">> := 1 + }, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"matched">> := 2, + <<"succeeded">> := 1, + <<"failed">> := 1 + } + } + ] + }}, + get_metrics(Name1) + ) + ), + {200, _} = update(Validation1), + ?retry( + 100, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := + #{ + <<"matched">> := 0, + <<"succeeded">> := 0, + <<"failed">> := 0 + }, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"matched">> := 0, + <<"succeeded">> := 0, + <<"failed">> := 0 + } + } + ] + }}, + get_metrics(Name1) + ) + ), + + assert_monitor_metrics(), + + ok. 
+ t_duplicated_schema_checks(_Config) -> Name1 = <<"foo">>, SerdeName = <<"myserde">>, diff --git a/apps/emqx_prometheus/include/emqx_prometheus.hrl b/apps/emqx_prometheus/include/emqx_prometheus.hrl index 242f3c960..76b5c4669 100644 --- a/apps/emqx_prometheus/include/emqx_prometheus.hrl +++ b/apps/emqx_prometheus/include/emqx_prometheus.hrl @@ -22,12 +22,27 @@ -define(PROMETHEUS_AUTH_COLLECTOR, emqx_prometheus_auth). -define(PROMETHEUS_DATA_INTEGRATION_REGISTRY, '/prometheus/data_integration'). -define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration). +-define(PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, '/prometheus/message_validation'). +-define(PROMETHEUS_MESSAGE_VALIDATION_COLLECTOR, emqx_prometheus_message_validation). --define(PROMETHEUS_ALL_REGISTRYS, [ - ?PROMETHEUS_DEFAULT_REGISTRY, - ?PROMETHEUS_AUTH_REGISTRY, - ?PROMETHEUS_DATA_INTEGRATION_REGISTRY +-if(?EMQX_RELEASE_EDITION == ee). +-define(PROMETHEUS_EE_REGISTRIES, [ + ?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY ]). +%% ELSE if(?EMQX_RELEASE_EDITION == ee). +-else. +-define(PROMETHEUS_EE_REGISTRIES, []). +%% END if(?EMQX_RELEASE_EDITION == ee). +-endif. + +-define(PROMETHEUS_ALL_REGISTRIES, + ?PROMETHEUS_EE_REGISTRIES ++ + [ + ?PROMETHEUS_DEFAULT_REGISTRY, + ?PROMETHEUS_AUTH_REGISTRY, + ?PROMETHEUS_DATA_INTEGRATION_REGISTRY + ] +). -define(PROM_DATA_MODE__NODE, node). -define(PROM_DATA_MODE__ALL_NODES_AGGREGATED, all_nodes_aggregated). diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index f53c05414..6ee3b973f 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -43,11 +43,13 @@ namespace/0 ]). +%% handlers -export([ setting/2, stats/2, auth/2, - data_integration/2 + data_integration/2, + message_validation/2 ]). -export([lookup_from_local_nodes/3]). @@ -67,7 +69,17 @@ paths() -> "/prometheus/auth", "/prometheus/stats", "/prometheus/data_integration" - ]. 
+ ] ++ paths_ee(). + +-if(?EMQX_RELEASE_EDITION == ee). +paths_ee() -> + ["/prometheus/message_validation"]. +%% ELSE if(?EMQX_RELEASE_EDITION == ee). +-else. +paths_ee() -> + []. +%% END if(?EMQX_RELEASE_EDITION == ee). +-endif. schema("/prometheus") -> #{ @@ -126,6 +138,19 @@ schema("/prometheus/data_integration") -> responses => #{200 => prometheus_data_schema()} } + }; +schema("/prometheus/message_validation") -> + #{ + 'operationId' => message_validation, + get => + #{ + description => ?DESC(get_prom_message_validation), + tags => ?TAGS, + parameters => [ref(mode)], + security => security(), + responses => + #{200 => prometheus_data_schema()} + } }. security() -> @@ -198,6 +223,9 @@ auth(get, #{headers := Headers, query_string := Qs}) -> data_integration(get, #{headers := Headers, query_string := Qs}) -> collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)). +message_validation(get, #{headers := Headers, query_string := Qs}) -> + collect(emqx_prometheus_message_validation, collect_opts(Headers, Qs)). + %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- diff --git a/apps/emqx_prometheus/src/emqx_prometheus_config.erl b/apps/emqx_prometheus/src/emqx_prometheus_config.erl index 51bf2b6ac..f50c0a6a3 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_config.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_config.erl @@ -123,7 +123,7 @@ all_collectors() -> prometheus_registry:collectors(Registry) ++ AccIn end, _InitAcc = [], - ?PROMETHEUS_ALL_REGISTRYS + ?PROMETHEUS_ALL_REGISTRIES ). 
update_push_gateway(Prometheus) ->
diff --git a/apps/emqx_prometheus/src/emqx_prometheus_message_validation.erl b/apps/emqx_prometheus/src/emqx_prometheus_message_validation.erl
new file mode 100644
index 000000000..eefa126da
--- /dev/null
+++ b/apps/emqx_prometheus/src/emqx_prometheus_message_validation.erl
@@ -0,0 +1,222 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% Prometheus collector for message validation metrics. Everything below the
+%% `-module' attribute is compiled only in the enterprise edition.
+-module(emqx_prometheus_message_validation).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+%% for bpapi
+-behaviour(emqx_prometheus_cluster).
+
+%% Please don't remove this attribute, prometheus uses it to
+%% automatically register collectors.
+-behaviour(prometheus_collector).
+
+-include("emqx_prometheus.hrl").
+-include_lib("prometheus/include/prometheus.hrl").
+
+-import(
+    prometheus_model_helpers,
+    [
+        create_mf/5,
+        gauge_metrics/1,
+        counter_metrics/1
+    ]
+).
+
+%% `prometheus_collector' callbacks, plus `collect_metrics/2', which is handed
+%% to `create_mf/5' via `?MODULE'.
+-export([
+    deregister_cleanup/1,
+    collect_mf/2,
+    collect_metrics/2
+]).
+
+%% `emqx_prometheus' API
+-export([collect/1]).
+
+%% `emqx_prometheus_cluster' API
+-export([
+    fetch_from_local_node/1,
+    fetch_cluster_consistented_data/0,
+    aggre_or_zip_init_acc/0,
+    logic_sum_metrics/0
+]).
+
+%%--------------------------------------------------------------------
+%% Macro definitions
+%%--------------------------------------------------------------------
+
+%% Map lookups: `?MG' fails if the key is missing, `?MG0' defaults to 0.
+-define(MG(K, MAP), maps:get(K, MAP)).
+-define(MG0(K, MAP), maps:get(K, MAP, 0)).
+
+%% Key under which this module stores its points in the raw data map.
+-define(metrics_data_key, message_validation_metrics_data).
+
+%% Names of the metrics reported for each validation.
+-define(key_enabled, emqx_message_validation_enable).
+-define(key_matched, emqx_message_validation_matched).
+-define(key_failed, emqx_message_validation_failed).
+-define(key_succeeded, emqx_message_validation_succeeded).
+
+%%--------------------------------------------------------------------
+%% `emqx_prometheus_cluster' API
+%%--------------------------------------------------------------------
+
+%% Builds this node's points for every validation returned by
+%% `emqx_message_validation:list/0' and tags the result with the node name.
+fetch_from_local_node(Mode) ->
+    Points = to_validation_data(Mode, emqx_message_validation:list()),
+    {node(), #{?metrics_data_key => Points}}.
+
+%% This collector contributes no cluster-consistent data.
+fetch_cluster_consistented_data() ->
+    #{}.
+
+%% Initial accumulator: every metric name mapped to an empty list of points.
+aggre_or_zip_init_acc() ->
+    EmptyPoints = maps:from_keys(message_validation_metric(names), []),
+    #{?metrics_data_key => EmptyPoints}.
+
+%% NOTE(review): presumably the metrics that `emqx_prometheus_cluster' combines
+%% with a logical rather than an arithmetic sum -- confirm against that module.
+logic_sum_metrics() ->
+    [?key_enabled].
+
+%%--------------------------------------------------------------------
+%% Collector API
+%%--------------------------------------------------------------------
+
+%% @private
+deregister_cleanup(_) -> ok.
+
+%% @private
+%% Emits one metric family per entry of `message_validation_metrics_meta/0'
+%% for the message validation registry; any other registry is ignored.
+-spec collect_mf(_Registry, Callback) -> ok when
+    _Registry :: prometheus_registry:registry(),
+    Callback :: prometheus_collector:collect_mf_callback().
+collect_mf(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, Callback) ->
+    RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
+    ValidationPoints = ?MG(?metrics_data_key, RawData),
+    add_collect_family(Callback, message_validation_metrics_meta(), ValidationPoints);
+collect_mf(_, _) ->
+    ok.
+
+%% Entry point used by `emqx_prometheus': returns either a JSON-friendly map
+%% or the Prometheus text exposition of the message validation registry.
+collect(<<"json">>) ->
+    RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
+    Points = ?MG(?metrics_data_key, RawData),
+    #{message_validations => collect_json_data(Points)};
+collect(<<"prometheus">>) ->
+    prometheus_text_format:format(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY).
+
+%%====================
+%% API Helpers
+
+%% Calls `Callback' once per `{Name, Type}' pair; always returns `ok'.
+add_collect_family(Callback, MetricWithType, Data) ->
+    lists:foreach(
+        fun({Name, Type}) -> add_collect_family(Name, Data, Callback, Type) end,
+        MetricWithType
+    ).
+
+%% Builds a single metric family (with an empty help text) and hands it to
+%% `Callback'.
+add_collect_family(Name, Data, Callback, Type) ->
+    %% TODO: help document from Name
+    Help = <<"">>,
+    MetricFamily = create_mf(Name, Help, Type, ?MODULE, Data),
+    Callback(MetricFamily).
+
+%% Callback invoked with the metric name and the data given to `create_mf/5'.
+collect_metrics(Name, Metrics) ->
+    collect_mv(Name, Metrics).
+
+%%--------------------------------------------------------------------
+%% Collector
+%%--------------------------------------------------------------------
+
+%%========================================
+%% Message Validation Metrics
+%%========================================
+%% `enable' is reported as a gauge, the three counters as counters; any other
+%% key fails with `function_clause'.
+collect_mv(?key_enabled = Key, Data) ->
+    gauge_metrics(?MG(Key, Data));
+collect_mv(Key, Data) when
+    Key =:= ?key_matched; Key =:= ?key_failed; Key =:= ?key_succeeded
+->
+    counter_metrics(?MG(Key, Data)).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%%========================================
+%% Message Validation Metrics
+%%========================================
+
+%% Metric names paired with their Prometheus type.
+message_validation_metrics_meta() ->
+    [
+        {?key_enabled, gauge},
+        {?key_matched, counter},
+        {?key_failed, counter},
+        {?key_succeeded, counter}
+    ].
+
+message_validation_metric(names) ->
+    Meta = message_validation_metrics_meta(),
+    emqx_prometheus_cluster:metric_names(Meta).
+
+%% Folds all validations into a map of `MetricName => [Point]', starting from
+%% an empty point list for every metric name.
+to_validation_data(Mode, Validations) ->
+    EmptyPoints = maps:from_keys(message_validation_metric(names), []),
+    AddValidation = fun(#{name := Name} = Validation, PointsAcc) ->
+        Metrics = get_validation_metrics(Validation),
+        merge_acc_with_validations(Mode, Name, Metrics, PointsAcc)
+    end,
+    lists:foldl(AddValidation, EmptyPoints, Validations).
+
+%% Prepends one point per metric of a single validation to the accumulator.
+%% Every metric key must already be present in `PointsAcc'.
+merge_acc_with_validations(Mode, Id, ValidationMetrics, PointsAcc) ->
+    Prepend = fun(MetricKey, Value, Acc) ->
+        Point = validation_point(Mode, Id, Value),
+        Acc#{MetricKey => [Point | ?MG(MetricKey, Acc)]}
+    end,
+    maps:fold(Prepend, PointsAcc, ValidationMetrics).
+
+%% A point is `{Labels, Value}', labelled with the validation name.
+validation_point(Mode, Name, V) ->
+    Labels = with_node_label(Mode, [{validation_name, Name}]),
+    {Labels, V}.
+
+%% Reads the counters of one validation from the registry; counters that are
+%% absent are reported as 0.
+get_validation_metrics(#{name := Name, enable := Enabled} = _Validation) ->
+    #{counters := Counters} = emqx_message_validation_registry:get_metrics(Name),
+    CounterOrZero = fun(CounterName) -> ?MG0(CounterName, Counters) end,
+    #{
+        ?key_enabled => emqx_prometheus_cluster:boolean_to_number(Enabled),
+        ?key_matched => CounterOrZero('matched'),
+        ?key_failed => CounterOrZero('failed'),
+        ?key_succeeded => CounterOrZero('succeeded')
+    }.
+
+%%--------------------------------------------------------------------
+%% Collect functions
+%%--------------------------------------------------------------------
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% merge / zip formatting funcs for type `application/json`
+
+collect_json_data(Data) ->
+    ZipFun = fun zip_json_message_validation_metrics/3,
+    emqx_prometheus_cluster:collect_json_data(Data, ZipFun).
+
+%% With an empty accumulator, each point becomes a map of its labels plus
+%% `Key => Metric' (in reverse order of `Points'). Otherwise the maps built
+%% for this key are merged pairwise into the accumulated ones.
+zip_json_message_validation_metrics(Key, Points, []) ->
+    ToMap = fun({Labels, Metric}) ->
+        (maps:from_list(Labels))#{Key => Metric}
+    end,
+    lists:reverse(lists:map(ToMap, Points));
+zip_json_message_validation_metrics(Key, Points, AllResultsAcc) ->
+    PointToMap = emqx_prometheus_cluster:point_to_map_fun(Key),
+    ThisKeyResult = lists:foldl(PointToMap, [], Points),
+    lists:zipwith(fun maps:merge/2, AllResultsAcc, ThisKeyResult).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Helper funcs
+
+%% Only the unaggregated all-nodes mode adds a `node' label.
+with_node_label(Mode, Labels) when
+    Mode =:= ?PROM_DATA_MODE__NODE; Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED
+->
+    Labels;
+with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
+    [{node, node()} | Labels].
+
+%% END if(?EMQX_RELEASE_EDITION == ee).
+-endif.
diff --git a/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl
index 72cbf8f96..c7b48db57 100644
--- a/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl
+++ b/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl
@@ -78,11 +78,12 @@ rule_engine {
 ">>).
 
all() -> - [ + lists:flatten([ {group, '/prometheus/stats'}, {group, '/prometheus/auth'}, - {group, '/prometheus/data_integration'} - ]. + {group, '/prometheus/data_integration'}, + [{group, '/prometheus/message_validation'} || emqx_release:edition() == ee] + ]). groups() -> TCs = emqx_common_test_helpers:all(?MODULE), @@ -99,6 +100,7 @@ groups() -> {'/prometheus/stats', ModeGroups}, {'/prometheus/auth', ModeGroups}, {'/prometheus/data_integration', ModeGroups}, + {'/prometheus/message_validation', ModeGroups}, {?PROM_DATA_MODE__NODE, AcceptGroups}, {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups}, {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups}, @@ -107,6 +109,7 @@ groups() -> ]. init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), meck:new(emqx_retainer, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_retainer, retained_count, fun() -> 0 end), meck:expect( @@ -121,7 +124,7 @@ init_per_suite(Config) -> application:load(emqx_auth), Apps = emqx_cth_suite:start( - [ + lists:flatten([ emqx, {emqx_conf, ?EMQX_CONF}, emqx_auth, @@ -129,8 +132,12 @@ init_per_suite(Config) -> emqx_rule_engine, emqx_bridge_http, emqx_connector, + [ + {emqx_message_validation, #{config => message_validation_config()}} + || emqx_release:edition() == ee + ], {emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()} - ], + ]), #{ work_dir => filename:join(?config(priv_dir, Config), ?MODULE) } @@ -159,6 +166,8 @@ init_per_group('/prometheus/auth', Config) -> [{module, emqx_prometheus_auth} | Config]; init_per_group('/prometheus/data_integration', Config) -> [{module, emqx_prometheus_data_integration} | Config]; +init_per_group('/prometheus/message_validation', Config) -> + [{module, emqx_prometheus_message_validation} | Config]; init_per_group(?PROM_DATA_MODE__NODE, Config) -> [{mode, ?PROM_DATA_MODE__NODE} | Config]; init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) -> @@ -346,6 +355,8 @@ 
metric_meta(<<"emqx_schema_registrys_count">>) -> ?meta(0, 0, 0);
 metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2);
 metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2);
 metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2);
+%% `/prometheus/message_validation`
+metric_meta(<<"emqx_message_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
 %% normal emqx metrics
 metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1);
 metric_meta(_) -> #{}.
@@ -810,5 +821,50 @@ assert_json_data__data_integration_overview(M, _) ->
     ).
 -endif.
 
+%% Checks the JSON payload returned for `/prometheus/message_validation':
+%% every point must carry the validation name and all four metrics.
+assert_json_data__message_validations(Ms, _) ->
+    %% The suite starts `emqx_message_validation' with one validation (see
+    %% `message_validation_config/0'), so an empty list means nothing was
+    %% collected; without this check the `foreach' below passes vacuously.
+    ?assertMatch([_ | _], Ms),
+    lists:foreach(
+        fun(M) ->
+            ?assertMatch(
+                #{
+                    validation_name := _,
+                    emqx_message_validation_enable := _,
+                    emqx_message_validation_matched := _,
+                    emqx_message_validation_failed := _,
+                    emqx_message_validation_succeeded := _
+                },
+                M
+            )
+        end,
+        Ms
+    ).
+
+%% Root config for `emqx_message_validation' holding a single enabled
+%% validation (`my_validation') on the topic filter `t/#'.
+message_validation_config() ->
+    Validation = #{
+        <<"enable">> => true,
+        <<"name">> => <<"my_validation">>,
+        <<"topics">> => [<<"t/#">>],
+        <<"strategy">> => <<"all_pass">>,
+        <<"failure_action">> => <<"drop">>,
+        <<"checks">> => [
+            #{
+                <<"type">> => <<"sql">>,
+                <<"sql">> => <<"select * where true">>
+            }
+        ]
+    },
+    #{
+        <<"message_validation">> => #{
+            <<"validations">> => [Validation]
+        }
+    }.
+
 stop_apps(Apps) ->
     lists:foreach(fun application:stop/1, Apps).
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 3497e40a6..354e40c5f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -540,7 +540,7 @@ printable_function_name(Mod, Func) -> get_rule_metrics(Id) -> Nodes = emqx:running_nodes(), - Results = emqx_metrics_proto_v1:get_metrics(Nodes, rule_metrics, Id, ?RPC_GET_METRICS_TIMEOUT), + Results = emqx_metrics_proto_v2:get_metrics(Nodes, rule_metrics, Id, ?RPC_GET_METRICS_TIMEOUT), NodeResults = lists:zip(Nodes, Results), NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults], NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok], diff --git a/rel/i18n/emqx_message_validation_http_api.hocon b/rel/i18n/emqx_message_validation_http_api.hocon index 69a8ed78d..439522854 100644 --- a/rel/i18n/emqx_message_validation_http_api.hocon +++ b/rel/i18n/emqx_message_validation_http_api.hocon @@ -21,6 +21,12 @@ emqx_message_validation_http_api { enable_disable_validation.desc: """Enable or disable a particular validation""" + get_validation_metrics.desc: + """Get metrics for a particular validation""" + + reset_validation_metrics.desc: + """Reset metrics for a particular validation""" + param_path_name.desc: """Validation name""" diff --git a/rel/i18n/emqx_prometheus_api.hocon b/rel/i18n/emqx_prometheus_api.hocon index 0c48e3add..b910f1d62 100644 --- a/rel/i18n/emqx_prometheus_api.hocon +++ b/rel/i18n/emqx_prometheus_api.hocon @@ -25,4 +25,9 @@ get_prom_data_integration_data.desc: get_prom_data_integration_data.label: """Prometheus Metrics for Data Integration""" +get_prom_message_validation.desc: +"""Get Prometheus Metrics for Message Validation""" +get_prom_message_validation.label: +"""Prometheus Metrics for Message Validation""" + } diff --git a/scripts/find-apps.sh b/scripts/find-apps.sh index 89f0a66e5..565c60449 100755 --- 
a/scripts/find-apps.sh +++ b/scripts/find-apps.sh @@ -97,6 +97,10 @@ matrix() { entries+=("$(format_app_entry "$app" 1 emqx "$runner")") entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")") ;; + apps/emqx_prometheus) + entries+=("$(format_app_entry "$app" 1 emqx "$runner")") + entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")") + ;; apps/emqx_rule_engine) entries+=("$(format_app_entry "$app" 1 emqx "$runner")") entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")")