From f690db9849260f2731b5b9ec6cf8ee8f89eea5b2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 10 May 2024 12:12:11 -0300 Subject: [PATCH 1/8] fix(postgres): reduce log spamming when connection goes down Fixes https://emqx.atlassian.net/browse/EMQX-12334 See also: https://github.com/emqx/epgsql/pull/10 --- apps/emqx_postgresql/rebar.config | 2 +- changes/ee/fix-13018.en.md | 1 + mix.exs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changes/ee/fix-13018.en.md diff --git a/apps/emqx_postgresql/rebar.config b/apps/emqx_postgresql/rebar.config index 8568e49fe..120d432fc 100644 --- a/apps/emqx_postgresql/rebar.config +++ b/apps/emqx_postgresql/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.1.1"}}}, + {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.1.2"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}} ]}. diff --git a/changes/ee/fix-13018.en.md b/changes/ee/fix-13018.en.md new file mode 100644 index 000000000..e52600894 --- /dev/null +++ b/changes/ee/fix-13018.en.md @@ -0,0 +1 @@ +Reduced log spamming when connection goes down in a Postgres/Timescale/Matrix connector. diff --git a/mix.exs b/mix.exs index edcef686d..d5bb83e26 100644 --- a/mix.exs +++ b/mix.exs @@ -79,7 +79,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by ehttpc and emqtt {:gun, github: "emqx/gun", tag: "1.3.11", override: true}, # in conflict by emqx_connector and system_monitor - {:epgsql, github: "emqx/epgsql", tag: "4.7.1.1", override: true}, + {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true}, # in conflict by emqx and observer_cli {:recon, github: "ferd/recon", tag: "2.5.1", override: true}, {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true}, From 921d82f0835d19ec1ffb2692c0edf1c5f1b98311 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 13 May 2024 12:09:28 +0200 Subject: [PATCH 2/8] fix: JSON trace formatter should handle report style log entries Fixes: https://emqx.atlassian.net/browse/EMQX-12344 --- apps/emqx/src/emqx_logger_jsonfmt.erl | 3 ++ .../emqx_trace/emqx_trace_json_formatter.erl | 41 ++++++++++++++++++- .../test/emqx_mgmt_api_trace_SUITE.erl | 15 +++++++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_logger_jsonfmt.erl b/apps/emqx/src/emqx_logger_jsonfmt.erl index 856b6111c..776c8f753 100644 --- a/apps/emqx/src/emqx_logger_jsonfmt.erl +++ b/apps/emqx/src/emqx_logger_jsonfmt.erl @@ -34,6 +34,9 @@ %% For CLI HTTP API outputs -export([best_effort_json/1, best_effort_json/2, best_effort_json_obj/1]). +%% For emqx_trace_json_formatter +-export([format_msg/3]). + -ifdef(TEST). -include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index 82c5a31ee..3c3f9b2a4 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -23,6 +23,8 @@ %% logger_formatter:config/0 is not exported. -type config() :: map(). +-define(DEFAULT_FORMATTER, fun logger:format_otp_report/1). + %%%----------------------------------------------------------------- %%% Callback Function %%%----------------------------------------------------------------- @@ -31,9 +33,10 @@ LogEvent :: logger:log_event(), Config :: config(). format( - LogMap0, - #{payload_encode := PEncode} + LogMap, + #{payload_encode := PEncode} = Config ) -> + LogMap0 = maybe_format_msg(LogMap, Config), LogMap1 = emqx_trace_formatter:evaluate_lazy_values(LogMap0), %% We just make some basic transformations on the input LogMap and then do %% an external call to create the JSON text @@ -46,6 +49,40 @@ format( %%% Helper Functions %%%----------------------------------------------------------------- +maybe_format_msg(#{msg := Msg, meta := Meta} = LogMap, Config) -> + try do_maybe_format_msg(Msg, Meta, Config) of + Map when is_map(Map) -> + LogMap#{meta => maps:merge(Meta, Map), msg => maps:get(msg, Map, "no_message")}; + Bin when is_binary(Bin) -> + LogMap#{msg => Bin} + catch + C:R:S -> + Meta#{ + amsg => Msg, + msg => "emqx_logger_jsonfmt_format_error", + fmt_raw_input => Msg, + fmt_error => C, + fmt_reason => R, + fmt_stacktrace => S, + more => {Msg, Meta, Config} + } + end. + +do_maybe_format_msg(String, _Meta, _Config) when is_list(String); is_binary(String) -> + unicode:characters_to_binary(String); +do_maybe_format_msg(undefined, _Meta, _Config) -> + #{}; +do_maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) -> + case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of + true -> + %% reporting a map without a customised format function + Report; + false -> + emqx_logger_jsonfmt:format_msg(Msg, Meta, Config) + end; +do_maybe_format_msg(Msg, Meta, Config) -> + emqx_logger_jsonfmt:format_msg(Msg, Meta, Config). + prepare_log_map(LogMap, PEncode) -> NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)], maps:from_list(NewKeyValuePairs). diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index 4daf1c51a..2a3e1d18a 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -269,6 +269,8 @@ t_http_test_json_formatter(_Config) -> action_id => <<"action:http:emqx_bridge_http_test_lib:connector:http:emqx_bridge_http_test_lib">> }), + %% We should handle report style logging + ?SLOG(error, #{msg => "recursive_republish_detected"}, #{topic => Topic}), ok = emqx_trace_handler_SUITE:filesync(Name, topic), {ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), {ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), @@ -410,6 +412,19 @@ t_http_test_json_formatter(_Config) -> }, NextFun() ), + ?assertMatch( + #{ + <<"level">> := <<"error">>, + <<"meta">> := + #{ + <<"msg">> := <<"recursive_republish_detected">>, + <<"topic">> := <<"/x/y/z">> + }, + <<"msg">> := <<"recursive_republish_detected">>, + <<"time">> := _ + }, + NextFun() + ), {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))), ?assertEqual(<<>>, Delete), From a2a5fd1f8e8efd78b11a06bea33aac5486551711 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 13 May 2024 15:48:20 -0300 Subject: [PATCH 3/8] test(client mgmt api): attempt to fix flaky test ``` %%% emqx_mgmt_api_clients_SUITE ==> msgs_plain_encoding.t_mqueue_messages: FAILED %%% emqx_mgmt_api_clients_SUITE ==> Failure/Error: ?assertEqual(79, Count) expected: 79 got: 100 line: 1303 ``` --- .../test/emqx_mgmt_api_clients_SUITE.erl | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index c354552f9..ecf28d3cd 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -1193,7 +1193,7 @@ t_mqueue_messages(Config) -> ClientId = atom_to_binary(?FUNCTION_NAME), Topic = <<"t/test_mqueue_msgs">>, Count = emqx_mgmt:default_row_limit(), - {ok, _Client} = client_with_mqueue(ClientId, Topic, Count), + ok = client_with_mqueue(ClientId, Topic, Count), Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]), ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), @@ -1244,14 +1244,16 @@ client_with_mqueue(ClientId, Topic, Count) -> {ok, Client} = emqtt:start_link([ {proto_ver, v5}, {clientid, ClientId}, - {clean_start, false}, + {clean_start, true}, {properties, #{'Session-Expiry-Interval' => 120}} ]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, Topic, 1), + ct:sleep(300), ok = emqtt:disconnect(Client), + ct:sleep(100), publish_msgs(Topic, Count), - {ok, Client}. + ok. client_with_inflight(ClientId, Topic, Count) -> {ok, Client} = emqtt:start_link([ @@ -1275,13 +1277,18 @@ publish_msgs(Topic, Count) -> test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) -> Qs0 = io_lib:format("payload=~s", [PayloadEncoding]), - {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader), - #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp), - #{<<"start">> := StartPos, <<"position">> := Pos} = Meta, - ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)), - ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)), - ?assertEqual(length(Msgs), Count), + {Msgs, StartPos, Pos} = ?retry(500, 10, begin + {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader), + #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp), + #{<<"start">> := StartPos, <<"position">> := Pos} = Meta, + + ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)), + ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)), + ?assertEqual(length(Msgs), Count), + + {Msgs, StartPos, Pos} + end), lists:foreach( fun({Seq, #{<<"payload">> := P} = M}) -> From 66b7ac4c45d833f25ce688402f1309096f7eea45 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 May 2024 21:25:00 +0200 Subject: [PATCH 4/8] fix(sessds): persist $SYS messages as well Otherwise, persistent sessions will not be able to receive $SYS messages whatsoever. --- apps/emqx/src/emqx_persistent_message.erl | 2 +- .../test/emqx_persistent_session_SUITE.erl | 53 ++++++++++++++++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index c909c5c5f..dc991619b 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -110,7 +110,7 @@ persist(Msg) -> ). needs_persistence(Msg) -> - not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)). + not emqx_message:get_flag(dup, Msg). -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). store_message(Msg) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 674e1a8d9..f0b783250 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -27,6 +27,7 @@ -compile(nowarn_export_all). -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). +-define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n"). %%-------------------------------------------------------------------- %% SUITE boilerplate @@ -66,19 +67,20 @@ groups() -> init_per_group(persistence_disabled, Config) -> [ - {emqx_config, "session_persistence { enable = false }"}, + {emqx_config, ?EMQX_CONFIG ++ "session_persistence { enable = false }"}, {persistence, false} | Config ]; init_per_group(persistence_enabled, Config) -> [ {emqx_config, - "session_persistence {\n" - " enable = true\n" - " last_alive_update_interval = 100ms\n" - " renew_streams_interval = 100ms\n" - " session_gc_interval = 2s\n" - "}"}, + ?EMQX_CONFIG ++ + "session_persistence {\n" + " enable = true\n" + " last_alive_update_interval = 100ms\n" + " renew_streams_interval = 100ms\n" + " session_gc_interval = 2s\n" + "}"}, {persistence, ds} | Config ]; @@ -1334,6 +1336,43 @@ do_t_will_message(Config, Opts) -> ), ok. +t_sys_message_delivery(Config) -> + ConnFun = ?config(conn_fun, Config), + SysTopicFilter = emqx_topic:join(["$SYS", "brokers", '+', "uptime"]), + SysTopic = emqx_topic:join(["$SYS", "brokers", atom_to_list(node()), "uptime"]), + ClientId = ?config(client_id, Config), + + {ok, Client1} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [1]} = emqtt:subscribe(Client1, SysTopicFilter, [{qos, 1}, {rh, 2}]), + ?assertMatch( + [ + #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime1}, + #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime2} + ], + receive_messages(2) + ), + + ok = emqtt:disconnect(Client1), + + {ok, Client2} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), + ?assertMatch( + [#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime3}], + receive_messages(1) + ). + get_topicwise_order(Msgs) -> maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs). From ac3f5a083dc77ff1d61019076fdc4061f57b0378 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 13 May 2024 21:57:19 +0200 Subject: [PATCH 5/8] test: Reduce log spam in the failed test suites --- apps/emqx/test/emqx_takeover_SUITE.erl | 2 -- apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl | 7 ------- 2 files changed, 9 deletions(-) diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index c9fdeed51..b9fce3b7f 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -78,7 +78,6 @@ init_per_group(persistence_enabled = Group, Config) -> ], #{work_dir => emqx_cth_suite:work_dir(Group, Config)} ), - emqx_logger:set_log_level(debug), [ {apps, Apps}, {persistence_enabled, true} @@ -89,7 +88,6 @@ init_per_group(persistence_disabled = Group, Config) -> [{emqx, "session_persistence.enable = false"}], #{work_dir => emqx_cth_suite:work_dir(Group, Config)} ), - emqx_logger:set_log_level(debug), [ {apps, Apps}, {persistence_enabled, false} diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d9a0321b0..fdba6335f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -339,13 +339,6 @@ get_streams(Shard, TopicFilter, StartTime) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), - ?tp(get_streams_get_gen_topic, #{ - gen_id => GenId, - topic => TopicFilter, - start_time => StartTime, - streams => Streams, - gen_data => GenData - }), [ {GenId, ?stream_v2(GenId, InnerStream)} || InnerStream <- Streams From c10a48f4be61a43c2d9e86055ec5e1d3444a148d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 14 May 2024 10:49:21 +0200 Subject: [PATCH 6/8] fix(s3-aggreg): disambiguate action schema namespace Otherwise schemas for `s3` and `s3_aggregated_upload` collide when projected into OpenAPI schema. --- apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl index 02d43f4f0..cea54f71f 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl @@ -35,7 +35,7 @@ %%------------------------------------------------------------------------------------------------- namespace() -> - "bridge_s3". + "bridge_s3_aggreg_upload". roots() -> []. From f75c7a5cea73c989a34b3e0c90617c90c5cec33f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 14 May 2024 13:38:41 +0200 Subject: [PATCH 7/8] fix(trace log entry error): better structure and naming --- apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index 3c3f9b2a4..706956c3a 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -58,13 +58,15 @@ maybe_format_msg(#{msg := Msg, meta := Meta} = LogMap, Config) -> catch C:R:S -> Meta#{ - amsg => Msg, msg => "emqx_logger_jsonfmt_format_error", fmt_raw_input => Msg, fmt_error => C, fmt_reason => R, fmt_stacktrace => S, - more => {Msg, Meta, Config} + more => #{ + original_log_entry => LogMap, + config => Config + } } end. From 257dae521130fb276625a8edc9658224878f44cf Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 14 May 2024 10:13:39 -0300 Subject: [PATCH 8/8] refactor: rename `message_validation` to `schema_validation` Request from Product team. Fixes https://emqx.atlassian.net/browse/EMQX-12383 --- apps/emqx/src/emqx_hookpoints.erl | 4 +- apps/emqx/test/emqx_cth_suite.erl | 4 +- .../src/emqx_dashboard_monitor_api.erl | 8 +- .../src/emqx_enterprise_schema.erl | 4 +- apps/emqx_machine/priv/reboot_lists.eterm | 2 +- .../src/emqx_message_validation.app.src | 14 --- .../include/emqx_prometheus.hrl | 6 +- .../src/emqx_prometheus_api.erl | 14 +-- ... => emqx_prometheus_schema_validation.erl} | 46 ++++---- .../test/emqx_prometheus_data_SUITE.erl | 30 ++--- .../emqx_rule_engine/src/emqx_rule_events.erl | 34 +++--- .../src/emqx_rule_runtime.erl | 2 +- .../BSL.txt | 0 .../README.md | 6 +- .../rebar.config | 0 .../src/emqx_schema_validation.app.src | 14 +++ .../src/emqx_schema_validation.erl} | 40 +++---- .../src/emqx_schema_validation_app.erl} | 16 +-- .../src/emqx_schema_validation_http_api.erl} | 110 +++++++++--------- .../src/emqx_schema_validation_registry.erl} | 12 +- .../src/emqx_schema_validation_schema.erl} | 10 +- .../src/emqx_schema_validation_sup.erl} | 6 +- ...emqx_schema_validation_http_api_SUITE.erl} | 24 ++-- .../test/emqx_schema_validation_tests.erl} | 30 ++--- mix.exs | 2 +- rebar.config.erl | 2 +- rel/i18n/emqx_prometheus_api.hocon | 8 +- ... => emqx_schema_validation_http_api.hocon} | 2 +- ...on => emqx_schema_validation_schema.hocon} | 2 +- 29 files changed, 226 insertions(+), 226 deletions(-) delete mode 100644 apps/emqx_message_validation/src/emqx_message_validation.app.src rename apps/emqx_prometheus/src/{emqx_prometheus_message_validation.erl => emqx_prometheus_schema_validation.erl} (81%) rename apps/{emqx_message_validation => emqx_schema_validation}/BSL.txt (100%) rename apps/{emqx_message_validation => emqx_schema_validation}/README.md (90%) rename apps/{emqx_message_validation => emqx_schema_validation}/rebar.config (100%) create mode 100644 apps/emqx_schema_validation/src/emqx_schema_validation.app.src rename apps/{emqx_message_validation/src/emqx_message_validation.erl => emqx_schema_validation/src/emqx_schema_validation.erl} (91%) rename apps/{emqx_message_validation/src/emqx_message_validation_app.erl => emqx_schema_validation/src/emqx_schema_validation_app.erl} (67%) rename apps/{emqx_message_validation/src/emqx_message_validation_http_api.erl => emqx_schema_validation/src/emqx_schema_validation_http_api.erl} (83%) rename apps/{emqx_message_validation/src/emqx_message_validation_registry.erl => emqx_schema_validation/src/emqx_schema_validation_registry.erl} (95%) rename apps/{emqx_message_validation/src/emqx_message_validation_schema.erl => emqx_schema_validation/src/emqx_schema_validation_schema.erl} (97%) rename apps/{emqx_message_validation/src/emqx_message_validation_sup.erl => emqx_schema_validation/src/emqx_schema_validation_sup.erl} (88%) rename apps/{emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl => emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl} (97%) rename apps/{emqx_message_validation/test/emqx_message_validation_tests.erl => emqx_schema_validation/test/emqx_schema_validation_tests.erl} (92%) rename rel/i18n/{emqx_message_validation_http_api.hocon => emqx_schema_validation_http_api.hocon} (95%) rename rel/i18n/{emqx_message_validation_schema.hocon => emqx_schema_validation_schema.hocon} (98%) diff --git a/apps/emqx/src/emqx_hookpoints.erl b/apps/emqx/src/emqx_hookpoints.erl index 1f7b6481b..0fcf76f3f 100644 --- a/apps/emqx/src/emqx_hookpoints.erl +++ b/apps/emqx/src/emqx_hookpoints.erl @@ -59,7 +59,7 @@ 'message.publish', 'message.puback', 'message.dropped', - 'message.validation_failed', + 'schema.validation_failed', 'message.delivered', 'message.acked', 'delivery.dropped', @@ -183,7 +183,7 @@ when -callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) -> callback_result(). --callback 'message.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) -> +-callback 'schema.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) -> callback_result(). -callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 65b4364d0..cf52afce1 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -383,8 +383,8 @@ default_appspec(emqx_dashboard, _SuiteOpts) -> }; default_appspec(emqx_schema_registry, _SuiteOpts) -> #{schema_mod => emqx_schema_registry_schema, config => #{}}; -default_appspec(emqx_message_validation, _SuiteOpts) -> - #{schema_mod => emqx_message_validation_schema, config => #{}}; +default_appspec(emqx_schema_validation, _SuiteOpts) -> + #{schema_mod => emqx_schema_validation_schema, config => #{}}; default_appspec(_, _) -> #{}. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 1b6773b87..9d9b095f0 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -189,9 +189,9 @@ swagger_desc(sent_bytes) -> swagger_desc(dropped) -> swagger_desc_format("Dropped messages "); swagger_desc(validation_succeeded) -> - swagger_desc_format("Message validations succeeded "); + swagger_desc_format("Schema validations succeeded "); swagger_desc(validation_failed) -> - swagger_desc_format("Message validations failed "); + swagger_desc_format("Schema validations failed "); swagger_desc(persisted) -> swagger_desc_format("Messages saved to the durable storage "); swagger_desc(subscriptions) -> @@ -217,9 +217,9 @@ swagger_desc(sent_msg_rate) -> swagger_desc(dropped_msg_rate) -> swagger_desc_format("Dropped messages ", per); swagger_desc(validation_succeeded_rate) -> - swagger_desc_format("Message validations succeeded ", per); + swagger_desc_format("Schema validations succeeded ", per); swagger_desc(validation_failed_rate) -> - swagger_desc_format("Message validations failed ", per); + swagger_desc_format("Schema validations failed ", per); swagger_desc(persisted_rate) -> swagger_desc_format("Messages saved to the durable storage ", per); swagger_desc(retained_msg_count) -> diff --git a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl index 77bdf0c02..909fb4109 100644 --- a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl +++ b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl @@ -15,7 +15,7 @@ -define(EE_SCHEMA_MODULES, [ emqx_license_schema, emqx_schema_registry_schema, - emqx_message_validation_schema, + emqx_schema_validation_schema, emqx_ft_schema ]). @@ -196,6 +196,6 @@ audit_log_conf() -> tr_prometheus_collectors(Conf) -> [ - {'/prometheus/message_validation', emqx_prometheus_message_validation} + {'/prometheus/schema_validation', emqx_prometheus_schema_validation} | emqx_conf_schema:tr_prometheus_collectors(Conf) ]. diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index 6b04e71f6..8d5f83698 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -88,7 +88,7 @@ [ emqx_license, emqx_enterprise, - emqx_message_validation, + emqx_schema_validation, emqx_connector_aggregator, emqx_bridge_kafka, emqx_bridge_pulsar, diff --git a/apps/emqx_message_validation/src/emqx_message_validation.app.src b/apps/emqx_message_validation/src/emqx_message_validation.app.src deleted file mode 100644 index 077f839a9..000000000 --- a/apps/emqx_message_validation/src/emqx_message_validation.app.src +++ /dev/null @@ -1,14 +0,0 @@ -{application, emqx_message_validation, [ - {description, "EMQX Message Validation"}, - {vsn, "0.1.0"}, - {registered, [emqx_message_validation_sup, emqx_message_validation_registry]}, - {mod, {emqx_message_validation_app, []}}, - {applications, [ - kernel, - stdlib - ]}, - {env, []}, - {modules, []}, - - {links, []} -]}. diff --git a/apps/emqx_prometheus/include/emqx_prometheus.hrl b/apps/emqx_prometheus/include/emqx_prometheus.hrl index 76b5c4669..32ba2a8e2 100644 --- a/apps/emqx_prometheus/include/emqx_prometheus.hrl +++ b/apps/emqx_prometheus/include/emqx_prometheus.hrl @@ -22,12 +22,12 @@ -define(PROMETHEUS_AUTH_COLLECTOR, emqx_prometheus_auth). -define(PROMETHEUS_DATA_INTEGRATION_REGISTRY, '/prometheus/data_integration'). -define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration). --define(PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, '/prometheus/message_validation'). --define(PROMETHEUS_MESSAGE_VALIDATION_COLLECTOR, emqx_prometheus_message_validation). +-define(PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, '/prometheus/schema_validation'). +-define(PROMETHEUS_SCHEMA_VALIDATION_COLLECTOR, emqx_prometheus_schema_validation). -if(?EMQX_RELEASE_EDITION == ee). -define(PROMETHEUS_EE_REGISTRIES, [ - ?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY + ?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY ]). %% ELSE if(?EMQX_RELEASE_EDITION == ee). -else. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index 6ee3b973f..f9e499d82 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -49,7 +49,7 @@ stats/2, auth/2, data_integration/2, - message_validation/2 + schema_validation/2 ]). -export([lookup_from_local_nodes/3]). @@ -73,7 +73,7 @@ paths() -> -if(?EMQX_RELEASE_EDITION == ee). paths_ee() -> - ["/prometheus/message_validation"]. + ["/prometheus/schema_validation"]. %% ELSE if(?EMQX_RELEASE_EDITION == ee). -else. paths_ee() -> @@ -139,12 +139,12 @@ schema("/prometheus/data_integration") -> #{200 => prometheus_data_schema()} } }; -schema("/prometheus/message_validation") -> +schema("/prometheus/schema_validation") -> #{ - 'operationId' => message_validation, + 'operationId' => schema_validation, get => #{ - description => ?DESC(get_prom_message_validation), + description => ?DESC(get_prom_schema_validation), tags => ?TAGS, parameters => [ref(mode)], security => security(), @@ -223,8 +223,8 @@ auth(get, #{headers := Headers, query_string := Qs}) -> data_integration(get, #{headers := Headers, query_string := Qs}) -> collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)). -message_validation(get, #{headers := Headers, query_string := Qs}) -> - collect(emqx_prometheus_message_validation, collect_opts(Headers, Qs)). +schema_validation(get, #{headers := Headers, query_string := Qs}) -> + collect(emqx_prometheus_schema_validation, collect_opts(Headers, Qs)). %%-------------------------------------------------------------------- %% Internal funcs diff --git a/apps/emqx_prometheus/src/emqx_prometheus_message_validation.erl b/apps/emqx_prometheus/src/emqx_prometheus_schema_validation.erl similarity index 81% rename from apps/emqx_prometheus/src/emqx_prometheus_message_validation.erl rename to apps/emqx_prometheus/src/emqx_prometheus_schema_validation.erl index eefa126da..b0b477273 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_message_validation.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_schema_validation.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_prometheus_message_validation). +-module(emqx_prometheus_schema_validation). -if(?EMQX_RELEASE_EDITION == ee). %% for bpapi @@ -48,19 +48,19 @@ -define(MG(K, MAP), maps:get(K, MAP)). -define(MG0(K, MAP), maps:get(K, MAP, 0)). --define(metrics_data_key, message_validation_metrics_data). +-define(metrics_data_key, schema_validation_metrics_data). --define(key_enabled, emqx_message_validation_enable). --define(key_matched, emqx_message_validation_matched). --define(key_failed, emqx_message_validation_failed). --define(key_succeeded, emqx_message_validation_succeeded). +-define(key_enabled, emqx_schema_validation_enable). +-define(key_matched, emqx_schema_validation_matched). +-define(key_failed, emqx_schema_validation_failed). +-define(key_succeeded, emqx_schema_validation_succeeded). %%-------------------------------------------------------------------- %% `emqx_prometheus_cluster' API %%-------------------------------------------------------------------- fetch_from_local_node(Mode) -> - Validations = emqx_message_validation:list(), + Validations = emqx_schema_validation:list(), {node(), #{ ?metrics_data_key => to_validation_data(Mode, Validations) }}. @@ -70,7 +70,7 @@ fetch_cluster_consistented_data() -> aggre_or_zip_init_acc() -> #{ - ?metrics_data_key => maps:from_keys(message_validation_metric(names), []) + ?metrics_data_key => maps:from_keys(schema_validation_metric(names), []) }. logic_sum_metrics() -> @@ -89,12 +89,12 @@ deregister_cleanup(_) -> ok. -spec collect_mf(_Registry, Callback) -> ok when _Registry :: prometheus_registry:registry(), Callback :: prometheus_collector:collect_mf_callback(). -collect_mf(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, Callback) -> +collect_mf(?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, Callback) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), - %% Message Validation Metrics + %% Schema Validation Metrics RuleMetricDs = ?MG(?metrics_data_key, RawData), - ok = add_collect_family(Callback, message_validation_metrics_meta(), RuleMetricDs), + ok = add_collect_family(Callback, schema_validation_metrics_meta(), RuleMetricDs), ok; collect_mf(_, _) -> @@ -104,10 +104,10 @@ collect_mf(_, _) -> collect(<<"json">>) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), #{ - message_validations => collect_json_data(?MG(?metrics_data_key, RawData)) + schema_validations => collect_json_data(?MG(?metrics_data_key, RawData)) }; collect(<<"prometheus">>) -> - prometheus_text_format:format(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY). + prometheus_text_format:format(?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY). %%==================== %% API Helpers @@ -128,7 +128,7 @@ collect_metrics(Name, Metrics) -> %%-------------------------------------------------------------------- %%======================================== -%% Message Validation Metrics +%% Schema Validation Metrics %%======================================== collect_mv(K = ?key_enabled, Data) -> gauge_metrics(?MG(K, Data)); collect_mv(K = ?key_matched, Data) -> counter_metrics(?MG(K, Data)); @@ -140,10 +140,10 @@ collect_mv(K = ?key_succeeded, Data) -> counter_metrics(?MG(K, Data)). %%-------------------------------------------------------------------- %%======================================== -%% Message Validation Metrics +%% Schema Validation Metrics %%======================================== -message_validation_metrics_meta() -> +schema_validation_metrics_meta() -> [ {?key_enabled, gauge}, {?key_matched, counter}, @@ -151,15 +151,15 @@ message_validation_metrics_meta() -> {?key_succeeded, counter} ]. -message_validation_metric(names) -> - emqx_prometheus_cluster:metric_names(message_validation_metrics_meta()). +schema_validation_metric(names) -> + emqx_prometheus_cluster:metric_names(schema_validation_metrics_meta()). to_validation_data(Mode, Validations) -> lists:foldl( fun(#{name := Name} = Validation, Acc) -> merge_acc_with_validations(Mode, Name, get_validation_metrics(Validation), Acc) end, - maps:from_keys(message_validation_metric(names), []), + maps:from_keys(schema_validation_metric(names), []), Validations ). @@ -176,7 +176,7 @@ validation_point(Mode, Name, V) -> {with_node_label(Mode, [{validation_name, Name}]), V}. get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) -> - #{counters := Counters} = emqx_message_validation_registry:get_metrics(Name), + #{counters := Counters} = emqx_schema_validation_registry:get_metrics(Name), #{ ?key_enabled => emqx_prometheus_cluster:boolean_to_number(Enabled), ?key_matched => ?MG0('matched', Counters), @@ -192,9 +192,9 @@ get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) -> %% merge / zip formatting funcs for type `application/json` collect_json_data(Data) -> - emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_message_validation_metrics/3). + emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_schema_validation_metrics/3). -zip_json_message_validation_metrics(Key, Points, [] = _AccIn) -> +zip_json_schema_validation_metrics(Key, Points, [] = _AccIn) -> lists:foldl( fun({Labels, Metric}, AccIn2) -> LabelsKVMap = maps:from_list(Labels), @@ -204,7 +204,7 @@ zip_json_message_validation_metrics(Key, Points, [] = _AccIn) -> [], Points ); -zip_json_message_validation_metrics(Key, Points, AllResultsAcc) -> +zip_json_schema_validation_metrics(Key, Points, AllResultsAcc) -> ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points), lists:zipwith(fun maps:merge/2, AllResultsAcc, ThisKeyResult). diff --git a/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl index c7b48db57..62c454e64 100644 --- a/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl +++ b/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl @@ -82,7 +82,7 @@ all() -> {group, '/prometheus/stats'}, {group, '/prometheus/auth'}, {group, '/prometheus/data_integration'}, - [{group, '/prometheus/message_validation'} || emqx_release:edition() == ee] + [{group, '/prometheus/schema_validation'} || emqx_release:edition() == ee] ]). groups() -> @@ -100,7 +100,7 @@ groups() -> {'/prometheus/stats', ModeGroups}, {'/prometheus/auth', ModeGroups}, {'/prometheus/data_integration', ModeGroups}, - {'/prometheus/message_validation', ModeGroups}, + {'/prometheus/schema_validation', ModeGroups}, {?PROM_DATA_MODE__NODE, AcceptGroups}, {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups}, {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups}, @@ -133,7 +133,7 @@ init_per_suite(Config) -> emqx_bridge_http, emqx_connector, [ - {emqx_message_validation, #{config => message_validation_config()}} + {emqx_schema_validation, #{config => schema_validation_config()}} || emqx_release:edition() == ee ], {emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()} @@ -166,8 +166,8 @@ init_per_group('/prometheus/auth', Config) -> [{module, emqx_prometheus_auth} | Config]; init_per_group('/prometheus/data_integration', Config) -> [{module, emqx_prometheus_data_integration} | Config]; -init_per_group('/prometheus/message_validation', Config) -> - [{module, emqx_prometheus_message_validation} | Config]; +init_per_group('/prometheus/schema_validation', Config) -> + [{module, emqx_prometheus_schema_validation} | Config]; init_per_group(?PROM_DATA_MODE__NODE, Config) -> [{mode, ?PROM_DATA_MODE__NODE} | Config]; init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) -> @@ -239,7 +239,7 @@ assert_data(_Module, {Code, Header, RawDataBinary}, #{type := <<"prometheus">>, assert_prom_data(DataL, Mode); assert_data(Module, {Code, JsonData}, #{type := <<"json">>, mode := Mode}) -> ?assertEqual(Code, 200), - ?assert(is_map(JsonData), true), + ?assertMatch(#{}, JsonData), assert_json_data(Module, JsonData, Mode). %%%%%%%%%%%%%%%%%%%% @@ -355,8 +355,8 @@ metric_meta(<<"emqx_schema_registrys_count">>) -> ?meta(0, 0, 0); metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2); metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2); metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2); -%% `/prometheus/message_validation` -metric_meta(<<"emqx_message_validation_", _Tail/binary>>) -> ?meta(1, 1, 2); +%% `/prometheus/schema_validation` +metric_meta(<<"emqx_schema_validation_", _Tail/binary>>) -> ?meta(1, 1, 2); %% normal emqx metrics metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1); metric_meta(_) -> #{}. @@ -821,16 +821,16 @@ assert_json_data__data_integration_overview(M, _) -> ). -endif. -assert_json_data__message_validations(Ms, _) -> +assert_json_data__schema_validations(Ms, _) -> lists:foreach( fun(M) -> ?assertMatch( #{ validation_name := _, - emqx_message_validation_enable := _, - emqx_message_validation_matched := _, - emqx_message_validation_failed := _, - emqx_message_validation_succeeded := _ + emqx_schema_validation_enable := _, + emqx_schema_validation_matched := _, + emqx_schema_validation_failed := _, + emqx_schema_validation_succeeded := _ }, M ) @@ -838,7 +838,7 @@ assert_json_data__message_validations(Ms, _) -> Ms ). -message_validation_config() -> +schema_validation_config() -> Validation = #{ <<"enable">> => true, <<"name">> => <<"my_validation">>, @@ -853,7 +853,7 @@ message_validation_config() -> ] }, #{ - <<"message_validation">> => #{ + <<"schema_validation">> => #{ <<"validations">> => [Validation] } }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 1ff5146e1..d6ed2c5c3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -44,7 +44,7 @@ on_session_unsubscribed/4, on_message_publish/2, on_message_dropped/4, - on_message_validation_failed/3, + on_schema_validation_failed/3, on_message_delivered/3, on_message_acked/3, on_delivery_dropped/4, @@ -79,7 +79,7 @@ event_names() -> 'message.delivered', 'message.acked', 'message.dropped', - 'message.validation_failed', + 'schema.validation_failed', 'delivery.dropped' ]. @@ -95,7 +95,7 @@ event_topics_enum() -> '$events/message_delivered', '$events/message_acked', '$events/message_dropped', - '$events/message_validation_failed', + '$events/schema_validation_failed', '$events/delivery_dropped' % '$events/message_publish' % not possible to use in SELECT FROM ]. @@ -224,13 +224,13 @@ on_message_dropped(Message, _, Reason, Conf) -> end, {ok, Message}. -on_message_validation_failed(Message, ValidationContext, Conf) -> +on_schema_validation_failed(Message, ValidationContext, Conf) -> case ignore_sys_message(Message) of true -> ok; false -> apply_event( - 'message.validation_failed', + 'schema.validation_failed', fun() -> eventmsg_validation_failed(Message, ValidationContext) end, Conf ) @@ -508,7 +508,7 @@ eventmsg_validation_failed( ) -> #{name := ValidationName} = ValidationContext, with_basic_columns( - 'message.validation_failed', + 'schema.validation_failed', #{ id => emqx_guid:to_hexstr(Id), validation => ValidationName, @@ -687,16 +687,16 @@ event_info() -> -if(?EMQX_RELEASE_EDITION == ee). %% ELSE (?EMQX_RELEASE_EDITION == ee). -event_info_message_validation_failed() -> +event_info_schema_validation_failed() -> event_info_common( - 'message.validation_failed', - {<<"message validation failed">>, <<"TODO"/utf8>>}, + 'schema.validation_failed', + {<<"schema validation failed">>, <<"TODO"/utf8>>}, {<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>}, - <<"SELECT * FROM \"$events/message_validation_failed\" WHERE topic =~ 't/#'">> + <<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">> ). ee_event_info() -> [ - event_info_message_validation_failed() + event_info_schema_validation_failed() ]. -else. %% END (?EMQX_RELEASE_EDITION == ee). @@ -873,7 +873,7 @@ test_columns(Event) -> ee_test_columns(Event). -if(?EMQX_RELEASE_EDITION == ee). -ee_test_columns('message.validation_failed') -> +ee_test_columns('schema.validation_failed') -> [{<<"validation">>, <<"myvalidation">>}] ++ test_columns('message.publish'). %% ELSE (?EMQX_RELEASE_EDITION == ee). @@ -922,9 +922,9 @@ columns_with_exam('message.dropped') -> {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; -columns_with_exam('message.validation_failed') -> +columns_with_exam('schema.validation_failed') -> [ - {<<"event">>, 'message.validation_failed'}, + {<<"event">>, 'schema.validation_failed'}, {<<"validation">>, <<"my_validation">>}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"clientid">>, <<"c_emqx">>}, @@ -1129,7 +1129,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4; hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3; hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3; hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4; -hook_fun('message.validation_failed') -> fun ?MODULE:on_message_validation_failed/3; +hook_fun('schema.validation_failed') -> fun ?MODULE:on_schema_validation_failed/3; hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4; hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2; hook_fun(Event) -> error({invalid_event, Event}). @@ -1154,7 +1154,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed'; event_name(<<"$events/message_delivered">>) -> 'message.delivered'; event_name(<<"$events/message_acked">>) -> 'message.acked'; event_name(<<"$events/message_dropped">>) -> 'message.dropped'; -event_name(<<"$events/message_validation_failed">>) -> 'message.validation_failed'; +event_name(<<"$events/schema_validation_failed">>) -> 'schema.validation_failed'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. @@ -1168,7 +1168,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; -event_topic('message.validation_failed') -> <<"$events/message_validation_failed">>; +event_topic('schema.validation_failed') -> <<"$events/schema_validation_failed">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('message.publish') -> <<"$events/message_publish">>. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 9a5de7871..1429561a7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -27,7 +27,7 @@ inc_action_metrics/2 ]). -%% Internal exports used by message validation +%% Internal exports used by schema validation -export([evaluate_select/3, clear_rule_payload/0]). -import( diff --git a/apps/emqx_message_validation/BSL.txt b/apps/emqx_schema_validation/BSL.txt similarity index 100% rename from apps/emqx_message_validation/BSL.txt rename to apps/emqx_schema_validation/BSL.txt diff --git a/apps/emqx_message_validation/README.md b/apps/emqx_schema_validation/README.md similarity index 90% rename from apps/emqx_message_validation/README.md rename to apps/emqx_schema_validation/README.md index c32e74147..9882209ba 100644 --- a/apps/emqx_message_validation/README.md +++ b/apps/emqx_schema_validation/README.md @@ -1,4 +1,4 @@ -# EMQX Message Validation +# EMQX Schema Validation This application encapsulates the functionality to validate incoming or internally triggered published payloads and take an action upon failure, which can be to just drop @@ -7,7 +7,7 @@ the message without further processing, or to disconnect the offending client as # Documentation Refer to [Message -Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-validation.html) +Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/schema-validation.html) for more information about the semantics and checks available. # HTTP APIs @@ -16,7 +16,7 @@ APIs are provided for validation management, which includes creating, updating, looking up, deleting, listing validations. Refer to [API Docs - -Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Validation) +Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Schema-Validation) for more detailed information. diff --git a/apps/emqx_message_validation/rebar.config b/apps/emqx_schema_validation/rebar.config similarity index 100% rename from apps/emqx_message_validation/rebar.config rename to apps/emqx_schema_validation/rebar.config diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation.app.src b/apps/emqx_schema_validation/src/emqx_schema_validation.app.src new file mode 100644 index 000000000..a2bdf30cc --- /dev/null +++ b/apps/emqx_schema_validation/src/emqx_schema_validation.app.src @@ -0,0 +1,14 @@ +{application, emqx_schema_validation, [ + {description, "EMQX Schema Validation"}, + {vsn, "0.1.0"}, + {registered, [emqx_schema_validation_sup, emqx_schema_validation_registry]}, + {mod, {emqx_schema_validation_app, []}}, + {applications, [ + kernel, + stdlib + ]}, + {env, []}, + {modules, []}, + + {links, []} +]}. diff --git a/apps/emqx_message_validation/src/emqx_message_validation.erl b/apps/emqx_schema_validation/src/emqx_schema_validation.erl similarity index 91% rename from apps/emqx_message_validation/src/emqx_message_validation.erl rename to apps/emqx_schema_validation/src/emqx_schema_validation.erl index 8f0b41d2f..12aa1e733 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_message_validation). +-module(emqx_schema_validation). -include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). @@ -47,8 +47,8 @@ %% Type declarations %%------------------------------------------------------------------------------ --define(TRACE_TAG, "MESSAGE_VALIDATION"). --define(CONF_ROOT, message_validation). +-define(TRACE_TAG, "SCHEMA_VALIDATION"). +-define(CONF_ROOT, schema_validation). -define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]). -type validation_name() :: binary(). @@ -72,7 +72,7 @@ load() -> Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []), lists:foreach( fun({Pos, Validation}) -> - ok = emqx_message_validation_registry:insert(Pos, Validation) + ok = emqx_schema_validation_registry:insert(Pos, Validation) end, lists:enumerate(Validations) ). @@ -81,7 +81,7 @@ unload() -> Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []), lists:foreach( fun(Validation) -> - ok = emqx_message_validation_registry:delete(Validation) + ok = emqx_schema_validation_registry:delete(Validation) end, Validations ). @@ -146,7 +146,7 @@ unregister_hooks() -> -spec on_message_publish(emqx_types:message()) -> {ok, emqx_types:message()} | {stop, emqx_types:message()}. on_message_publish(Message = #message{topic = Topic, headers = Headers}) -> - case emqx_message_validation_registry:matching_validations(Topic) of + case emqx_schema_validation_registry:matching_validations(Topic) of [] -> ok; Validations -> @@ -184,19 +184,19 @@ pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) -> post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) -> {Pos, Validation} = fetch_with_index(New, Name), - ok = emqx_message_validation_registry:insert(Pos, Validation), + ok = emqx_schema_validation_registry:insert(Pos, Validation), ok; post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) -> {_Pos, OldValidation} = fetch_with_index(Old, Name), {Pos, NewValidation} = fetch_with_index(New, Name), - ok = emqx_message_validation_registry:update(OldValidation, Pos, NewValidation), + ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), ok; post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> {_Pos, Validation} = fetch_with_index(Old, Name), - ok = emqx_message_validation_registry:delete(Validation), + ok = emqx_schema_validation_registry:delete(Validation), ok; post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) -> - ok = emqx_message_validation_registry:reindex_positions(New), + ok = emqx_schema_validation_registry:reindex_positions(New), ok. %%------------------------------------------------------------------------------ @@ -395,26 +395,26 @@ run_validations(Validations, Message) -> emqx_rule_runtime:clear_rule_payload(), Fun = fun(Validation, Acc) -> #{name := Name} = Validation, - emqx_message_validation_registry:inc_matched(Name), + emqx_schema_validation_registry:inc_matched(Name), case run_validation(Validation, Message) of ok -> - emqx_message_validation_registry:inc_succeeded(Name), + emqx_schema_validation_registry:inc_succeeded(Name), {cont, Acc}; ignore -> trace_failure(Validation, "validation_failed", #{ validation => Name, action => ignore }), - emqx_message_validation_registry:inc_failed(Name), - run_message_validation_failed_hook(Message, Validation), + emqx_schema_validation_registry:inc_failed(Name), + run_schema_validation_failed_hook(Message, Validation), {cont, Acc}; FailureAction -> trace_failure(Validation, "validation_failed", #{ validation => Name, action => FailureAction }), - emqx_message_validation_registry:inc_failed(Name), - run_message_validation_failed_hook(Message, Validation), + emqx_schema_validation_registry:inc_failed(Name), + run_schema_validation_failed_hook(Message, Validation), {halt, FailureAction} end end, @@ -457,17 +457,17 @@ trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) -> name := _Name, failure_action := _Action } = Validation, - ?tp(message_validation_failed, #{log_level => none, name => _Name, action => _Action}), + ?tp(schema_validation_failed, #{log_level => none, name => _Name, action => _Action}), ok; trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) -> #{ name := _Name, failure_action := _Action } = Validation, - ?tp(message_validation_failed, #{log_level => Level, name => _Name, action => _Action}), + ?tp(schema_validation_failed, #{log_level => Level, name => _Name, action => _Action}), ?TRACE(Level, ?TRACE_TAG, Msg, Meta). -run_message_validation_failed_hook(Message, Validation) -> +run_schema_validation_failed_hook(Message, Validation) -> #{name := Name} = Validation, ValidationContext = #{name => Name}, - emqx_hooks:run('message.validation_failed', [Message, ValidationContext]). + emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]). diff --git a/apps/emqx_message_validation/src/emqx_message_validation_app.erl b/apps/emqx_schema_validation/src/emqx_schema_validation_app.erl similarity index 67% rename from apps/emqx_message_validation/src/emqx_message_validation_app.erl rename to apps/emqx_schema_validation/src/emqx_schema_validation_app.erl index 4e566eedd..107ae4e8f 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_app.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation_app.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_message_validation_app). +-module(emqx_schema_validation_app). -behaviour(application). @@ -18,15 +18,15 @@ -spec start(application:start_type(), term()) -> {ok, pid()}. start(_Type, _Args) -> - {ok, Sup} = emqx_message_validation_sup:start_link(), - ok = emqx_message_validation:add_handler(), - ok = emqx_message_validation:register_hooks(), - ok = emqx_message_validation:load(), + {ok, Sup} = emqx_schema_validation_sup:start_link(), + ok = emqx_schema_validation:add_handler(), + ok = emqx_schema_validation:register_hooks(), + ok = emqx_schema_validation:load(), {ok, Sup}. -spec stop(term()) -> ok. stop(_State) -> - ok = emqx_message_validation:unload(), - ok = emqx_message_validation:unregister_hooks(), - ok = emqx_message_validation:remove_handler(), + ok = emqx_schema_validation:unload(), + ok = emqx_schema_validation:unregister_hooks(), + ok = emqx_schema_validation:remove_handler(), ok. diff --git a/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl b/apps/emqx_schema_validation/src/emqx_schema_validation_http_api.erl similarity index 83% rename from apps/emqx_message_validation/src/emqx_message_validation_http_api.erl rename to apps/emqx_schema_validation/src/emqx_schema_validation_http_api.erl index ac12eacd4..60df2151f 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation_http_api.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_message_validation_http_api). +-module(emqx_schema_validation_http_api). -behaviour(minirest_api). @@ -21,43 +21,43 @@ %% `minirest' handlers -export([ - '/message_validations'/2, - '/message_validations/reorder'/2, - '/message_validations/validation/:name'/2, - '/message_validations/validation/:name/metrics'/2, - '/message_validations/validation/:name/metrics/reset'/2, - '/message_validations/validation/:name/enable/:enable'/2 + '/schema_validations'/2, + '/schema_validations/reorder'/2, + '/schema_validations/validation/:name'/2, + '/schema_validations/validation/:name/metrics'/2, + '/schema_validations/validation/:name/metrics/reset'/2, + '/schema_validations/validation/:name/enable/:enable'/2 ]). %%------------------------------------------------------------------------------------------------- %% Type definitions %%------------------------------------------------------------------------------------------------- --define(TAGS, [<<"Message Validation">>]). --define(METRIC_NAME, message_validation). +-define(TAGS, [<<"Schema Validation">>]). +-define(METRIC_NAME, schema_validation). %%------------------------------------------------------------------------------------------------- %% `minirest' and `minirest_trails' API %%------------------------------------------------------------------------------------------------- -namespace() -> "message_validation_http_api". +namespace() -> "schema_validation_http_api". api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> [ - "/message_validations", - "/message_validations/reorder", - "/message_validations/validation/:name", - "/message_validations/validation/:name/metrics", - "/message_validations/validation/:name/metrics/reset", - "/message_validations/validation/:name/enable/:enable" + "/schema_validations", + "/schema_validations/reorder", + "/schema_validations/validation/:name", + "/schema_validations/validation/:name/metrics", + "/schema_validations/validation/:name/metrics/reset", + "/schema_validations/validation/:name/enable/:enable" ]. -schema("/message_validations") -> +schema("/schema_validations") -> #{ - 'operationId' => '/message_validations', + 'operationId' => '/schema_validations', get => #{ tags => ?TAGS, summary => <<"List validations">>, @@ -67,7 +67,7 @@ schema("/message_validations") -> 200 => emqx_dashboard_swagger:schema_with_examples( array( - emqx_message_validation_schema:api_schema(list) + emqx_schema_validation_schema:api_schema(list) ), example_return_list() ) @@ -78,14 +78,14 @@ schema("/message_validations") -> summary => <<"Append a new validation">>, description => ?DESC("append_validation"), 'requestBody' => emqx_dashboard_swagger:schema_with_examples( - emqx_message_validation_schema:api_schema(post), + emqx_schema_validation_schema:api_schema(post), example_input_create() ), responses => #{ 201 => emqx_dashboard_swagger:schema_with_examples( - emqx_message_validation_schema:api_schema(post), + emqx_schema_validation_schema:api_schema(post), example_return_create() ), 400 => error_schema('ALREADY_EXISTS', "Validation already exists") @@ -96,14 +96,14 @@ schema("/message_validations") -> summary => <<"Update a validation">>, description => ?DESC("update_validation"), 'requestBody' => emqx_dashboard_swagger:schema_with_examples( - emqx_message_validation_schema:api_schema(put), + emqx_schema_validation_schema:api_schema(put), example_input_update() ), responses => #{ 200 => emqx_dashboard_swagger:schema_with_examples( - emqx_message_validation_schema:api_schema(put), + emqx_schema_validation_schema:api_schema(put), example_return_update() ), 404 => error_schema('NOT_FOUND', "Validation not found"), @@ -111,9 +111,9 @@ schema("/message_validations") -> } } }; -schema("/message_validations/reorder") -> +schema("/schema_validations/reorder") -> #{ - 'operationId' => '/message_validations/reorder', + 'operationId' => '/schema_validations/reorder', post => #{ tags => ?TAGS, summary => <<"Reorder all validations">>, @@ -140,9 +140,9 @@ schema("/message_validations/reorder") -> } } }; -schema("/message_validations/validation/:name") -> +schema("/schema_validations/validation/:name") -> #{ - 'operationId' => '/message_validations/validation/:name', + 'operationId' => '/schema_validations/validation/:name', get => #{ tags => ?TAGS, summary => <<"Lookup a validation">>, @@ -153,7 +153,7 @@ schema("/message_validations/validation/:name") -> 200 => emqx_dashboard_swagger:schema_with_examples( array( - emqx_message_validation_schema:api_schema(lookup) + emqx_schema_validation_schema:api_schema(lookup) ), example_return_lookup() ), @@ -172,9 +172,9 @@ schema("/message_validations/validation/:name") -> } } }; -schema("/message_validations/validation/:name/metrics") -> +schema("/schema_validations/validation/:name/metrics") -> #{ - 'operationId' => '/message_validations/validation/:name/metrics', + 'operationId' => '/schema_validations/validation/:name/metrics', get => #{ tags => ?TAGS, summary => <<"Get validation metrics">>, @@ -191,9 +191,9 @@ schema("/message_validations/validation/:name/metrics") -> } } }; -schema("/message_validations/validation/:name/metrics/reset") -> +schema("/schema_validations/validation/:name/metrics/reset") -> #{ - 'operationId' => '/message_validations/validation/:name/metrics/reset', + 'operationId' => '/schema_validations/validation/:name/metrics/reset', post => #{ tags => ?TAGS, summary => <<"Reset validation metrics">>, @@ -206,9 +206,9 @@ schema("/message_validations/validation/:name/metrics/reset") -> } } }; -schema("/message_validations/validation/:name/enable/:enable") -> +schema("/schema_validations/validation/:name/enable/:enable") -> #{ - 'operationId' => '/message_validations/validation/:name/enable/:enable', + 'operationId' => '/schema_validations/validation/:name/enable/:enable', post => #{ tags => ?TAGS, summary => <<"Enable or disable validation">>, @@ -285,29 +285,29 @@ fields(node_metrics) -> %% `minirest' handlers %%------------------------------------------------------------------------------------------------- -'/message_validations'(get, _Params) -> - ?OK(emqx_message_validation:list()); -'/message_validations'(post, #{body := Params = #{<<"name">> := Name}}) -> +'/schema_validations'(get, _Params) -> + ?OK(emqx_schema_validation:list()); +'/schema_validations'(post, #{body := Params = #{<<"name">> := Name}}) -> with_validation( Name, return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)), fun() -> - case emqx_message_validation:insert(Params) of + case emqx_schema_validation:insert(Params) of {ok, _} -> - {ok, Res} = emqx_message_validation:lookup(Name), + {ok, Res} = emqx_schema_validation:lookup(Name), {201, Res}; {error, Error} -> ?BAD_REQUEST(Error) end end ); -'/message_validations'(put, #{body := Params = #{<<"name">> := Name}}) -> +'/schema_validations'(put, #{body := Params = #{<<"name">> := Name}}) -> with_validation( Name, fun() -> - case emqx_message_validation:update(Params) of + case emqx_schema_validation:update(Params) of {ok, _} -> - {ok, Res} = emqx_message_validation:lookup(Name), + {ok, Res} = emqx_schema_validation:lookup(Name), {200, Res}; {error, Error} -> ?BAD_REQUEST(Error) @@ -316,17 +316,17 @@ fields(node_metrics) -> not_found() ). -'/message_validations/validation/:name'(get, #{bindings := #{name := Name}}) -> +'/schema_validations/validation/:name'(get, #{bindings := #{name := Name}}) -> with_validation( Name, fun(Validation) -> ?OK(Validation) end, not_found() ); -'/message_validations/validation/:name'(delete, #{bindings := #{name := Name}}) -> +'/schema_validations/validation/:name'(delete, #{bindings := #{name := Name}}) -> with_validation( Name, fun() -> - case emqx_message_validation:delete(Name) of + case emqx_schema_validation:delete(Name) of {ok, _} -> ?NO_CONTENT; {error, Error} -> @@ -336,10 +336,10 @@ fields(node_metrics) -> not_found() ). -'/message_validations/reorder'(post, #{body := #{<<"order">> := Order}}) -> +'/schema_validations/reorder'(post, #{body := #{<<"order">> := Order}}) -> do_reorder(Order). -'/message_validations/validation/:name/enable/:enable'(post, #{ +'/schema_validations/validation/:name/enable/:enable'(post, #{ bindings := #{name := Name, enable := Enable} }) -> with_validation( @@ -348,7 +348,7 @@ fields(node_metrics) -> not_found() ). -'/message_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) -> +'/schema_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) -> with_validation( Name, fun() -> @@ -371,7 +371,7 @@ fields(node_metrics) -> not_found() ). -'/message_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) -> +'/schema_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) -> with_validation( Name, fun() -> @@ -516,7 +516,7 @@ error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary( ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message). do_reorder(Order) -> - case emqx_message_validation:reorder(Order) of + case emqx_schema_validation:reorder(Order) of {ok, _} -> ?NO_CONTENT; {error, @@ -538,7 +538,7 @@ do_reorder(Order) -> do_enable_disable(Validation, Enable) -> RawValidation = make_serializable(Validation), - case emqx_message_validation:update(RawValidation#{<<"enable">> => Enable}) of + case emqx_schema_validation:update(RawValidation#{<<"enable">> => Enable}) of {ok, _} -> ?NO_CONTENT; {error, Reason} -> @@ -546,7 +546,7 @@ do_enable_disable(Validation, Enable) -> end. with_validation(Name, FoundFn, NotFoundFn) -> - case emqx_message_validation:lookup(Name) of + case emqx_schema_validation:lookup(Name) of {ok, Validation} -> {arity, Arity} = erlang:fun_info(FoundFn, arity), case Arity of @@ -564,15 +564,15 @@ not_found() -> return(?NOT_FOUND(<<"Validation not found">>)). make_serializable(Validation) -> - Schema = emqx_message_validation_schema, + Schema = emqx_schema_validation_schema, RawConfig = #{ - <<"message_validation">> => #{ + <<"schema_validation">> => #{ <<"validations">> => [emqx_utils_maps:binary_key_map(Validation)] } }, #{ - <<"message_validation">> := #{ + <<"schema_validation">> := #{ <<"validations">> := [Serialized] } diff --git a/apps/emqx_message_validation/src/emqx_message_validation_registry.erl b/apps/emqx_schema_validation/src/emqx_schema_validation_registry.erl similarity index 95% rename from apps/emqx_message_validation/src/emqx_message_validation_registry.erl rename to apps/emqx_schema_validation/src/emqx_schema_validation_registry.erl index fd715e750..c25007424 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_registry.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation_registry.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_message_validation_registry). +-module(emqx_schema_validation_registry). -behaviour(gen_server). @@ -36,10 +36,10 @@ %% Type declarations %%------------------------------------------------------------------------------ --define(VALIDATION_TOPIC_INDEX, emqx_message_validation_index). --define(VALIDATION_TAB, emqx_message_validation_tab). +-define(VALIDATION_TOPIC_INDEX, emqx_schema_validation_index). +-define(VALIDATION_TAB, emqx_schema_validation_tab). --define(METRIC_NAME, message_validation). +-define(METRIC_NAME, schema_validation). -define(METRICS, [ 'matched', 'succeeded', @@ -106,7 +106,7 @@ matching_validations(Topic) -> -spec metrics_worker_spec() -> supervisor:child_spec(). metrics_worker_spec() -> - emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME). + emqx_metrics_worker:child_spec(schema_validation_metrics, ?METRIC_NAME). -spec get_metrics(validation_name()) -> emqx_metrics_worker:metrics(). get_metrics(Name) -> @@ -243,7 +243,7 @@ transform_validation(Validation = #{checks := Checks}) -> Validation#{checks := lists:map(fun transform_check/1, Checks)}. transform_check(#{type := sql, sql := SQL}) -> - {ok, Check} = emqx_message_validation:parse_sql_check(SQL), + {ok, Check} = emqx_schema_validation:parse_sql_check(SQL), Check; transform_check(Check) -> Check. diff --git a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl b/apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl similarity index 97% rename from apps/emqx_message_validation/src/emqx_message_validation_schema.erl rename to apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl index 3c56c5ada..fa9461745 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_message_validation_schema). +-module(emqx_schema_validation_schema). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -26,12 +26,12 @@ %% `hocon_schema' API %%------------------------------------------------------------------------------ -namespace() -> message_validation. +namespace() -> schema_validation. roots() -> - [{message_validation, mk(ref(message_validation), #{importance => ?IMPORTANCE_HIDDEN})}]. + [{schema_validation, mk(ref(schema_validation), #{importance => ?IMPORTANCE_HIDDEN})}]. -fields(message_validation) -> +fields(schema_validation) -> [ {validations, mk( @@ -199,7 +199,7 @@ ensure_array(L, _) when is_list(L) -> L; ensure_array(B, _) -> [B]. validate_sql(SQL) -> - case emqx_message_validation:parse_sql_check(SQL) of + case emqx_schema_validation:parse_sql_check(SQL) of {ok, _Parsed} -> ok; Error = {error, _} -> diff --git a/apps/emqx_message_validation/src/emqx_message_validation_sup.erl b/apps/emqx_schema_validation/src/emqx_schema_validation_sup.erl similarity index 88% rename from apps/emqx_message_validation/src/emqx_message_validation_sup.erl rename to apps/emqx_schema_validation/src/emqx_schema_validation_sup.erl index 2e8d6b8c6..f20c9ccf8 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_sup.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation_sup.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_message_validation_sup). +-module(emqx_schema_validation_sup). -behaviour(supervisor). @@ -23,8 +23,8 @@ start_link() -> %%------------------------------------------------------------------------------ init([]) -> - Registry = worker_spec(emqx_message_validation_registry), - Metrics = emqx_message_validation_registry:metrics_worker_spec(), + Registry = worker_spec(emqx_schema_validation_registry), + Metrics = emqx_schema_validation_registry:metrics_worker_spec(), SupFlags = #{ strategy => one_for_one, intensity => 10, diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl similarity index 97% rename from apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl rename to apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl index 879da797c..0a5cd49cd 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_message_validation_http_api_SUITE). +-module(emqx_schema_validation_http_api_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -31,7 +31,7 @@ init_per_suite(Config) -> emqx, emqx_conf, emqx_rule_engine, - emqx_message_validation, + emqx_schema_validation, emqx_management, emqx_mgmt_api_test_util:emqx_dashboard(), emqx_schema_registry @@ -66,9 +66,9 @@ end_per_testcase(_TestCase, _Config) -> clear_all_validations() -> lists:foreach( fun(#{name := Name}) -> - {ok, _} = emqx_message_validation:delete(Name) + {ok, _} = emqx_schema_validation:delete(Name) end, - emqx_message_validation:list() + emqx_schema_validation:list() ). reset_all_global_metrics() -> @@ -146,7 +146,7 @@ schema_check(Type, SerdeName, Overrides) -> Overrides ). -api_root() -> "message_validations". +api_root() -> "schema_validations". simplify_result(Res) -> case Res of @@ -358,7 +358,7 @@ assert_index_order(ExpectedOrder, Topic, Comment) -> ExpectedOrder, [ N - || #{name := N} <- emqx_message_validation_registry:matching_validations(Topic) + || #{name := N} <- emqx_schema_validation_registry:matching_validations(Topic) ], Comment ). @@ -366,7 +366,7 @@ assert_index_order(ExpectedOrder, Topic, Comment) -> create_failure_tracing_rule() -> Params = #{ enable => true, - sql => <<"select * from \"$events/message_validation_failed\" ">>, + sql => <<"select * from \"$events/schema_validation_failed\" ">>, actions => [make_trace_fn_action()] }, Path = emqx_mgmt_api_test_util:api_path(["rules"]), @@ -689,7 +689,7 @@ t_log_failure_none(_Config) -> ok end, fun(Trace) -> - ?assertMatch([#{log_level := none}], ?of_kind(message_validation_failed, Trace)), + ?assertMatch([#{log_level := none}], ?of_kind(schema_validation_failed, Trace)), ok end ), @@ -719,12 +719,12 @@ t_action_ignore(_Config) -> ok end, fun(Trace) -> - ?assertMatch([#{action := ignore}], ?of_kind(message_validation_failed, Trace)), + ?assertMatch([#{action := ignore}], ?of_kind(schema_validation_failed, Trace)), ok end ), ?assertMatch( - [{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}], + [{_, #{data := #{validation := Name1, event := 'schema.validation_failed'}}}], get_traced_failures_from_rule_engine() ), ok. @@ -1093,8 +1093,8 @@ t_multiple_validations(_Config) -> ?assertMatch( [ - {_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}, - {_, #{data := #{validation := Name2, event := 'message.validation_failed'}}} + {_, #{data := #{validation := Name1, event := 'schema.validation_failed'}}}, + {_, #{data := #{validation := Name2, event := 'schema.validation_failed'}}} ], get_traced_failures_from_rule_engine() ), diff --git a/apps/emqx_message_validation/test/emqx_message_validation_tests.erl b/apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl similarity index 92% rename from apps/emqx_message_validation/test/emqx_message_validation_tests.erl rename to apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl index 2c33a20ba..a75e4b556 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_tests.erl +++ b/apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl @@ -1,22 +1,22 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_message_validation_tests). +-module(emqx_schema_validation_tests). -include_lib("eunit/include/eunit.hrl"). --define(VALIDATIONS_PATH, "message_validation.validations"). +-define(VALIDATIONS_PATH, "schema_validation.validations"). %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ parse_and_check(InnerConfigs) -> - RootBin = <<"message_validation">>, + RootBin = <<"schema_validation">>, InnerBin = <<"validations">>, RawConf = #{RootBin => #{InnerBin => InnerConfigs}}, #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain( - emqx_message_validation_schema, + emqx_schema_validation_schema, RawConf, #{ required => false, @@ -65,9 +65,9 @@ schema_check(Type, SerdeName, Overrides) -> ). eval_sql(Message, SQL) -> - {ok, Check} = emqx_message_validation:parse_sql_check(SQL), + {ok, Check} = emqx_schema_validation:parse_sql_check(SQL), Validation = #{log_failure => #{level => warning}, name => <<"validation">>}, - emqx_message_validation:evaluate_sql_check(Check, Validation, Message). + emqx_schema_validation:evaluate_sql_check(Check, Validation, Message). message() -> message(_Opts = #{}). @@ -196,7 +196,7 @@ invalid_names_test_() -> {_Schema, [ #{ kind := validation_error, - path := "message_validation.validations.1.name" + path := "schema_validation.validations.1.name" } ]}, parse_and_check([validation(InvalidName, [sql_check()])]) @@ -239,7 +239,7 @@ duplicated_check_test_() -> #{ reason := <<"duplicated topics: t/1">>, kind := validation_error, - path := "message_validation.validations.1.topics" + path := "schema_validation.validations.1.topics" } ]}, parse_and_check([ @@ -256,7 +256,7 @@ duplicated_check_test_() -> #{ reason := <<"duplicated topics: t/1">>, kind := validation_error, - path := "message_validation.validations.1.topics" + path := "schema_validation.validations.1.topics" } ]}, parse_and_check([ @@ -273,7 +273,7 @@ duplicated_check_test_() -> #{ reason := <<"duplicated topics: t/1, t/2">>, kind := validation_error, - path := "message_validation.validations.1.topics" + path := "schema_validation.validations.1.topics" } ]}, parse_and_check([ @@ -320,7 +320,7 @@ duplicated_check_test_() -> #{ reason := <<"duplicated schema checks: json:a">>, kind := validation_error, - path := "message_validation.validations.1.checks" + path := "schema_validation.validations.1.checks" } ]}, parse_and_check([ @@ -336,7 +336,7 @@ duplicated_check_test_() -> #{ reason := <<"duplicated schema checks: json:a">>, kind := validation_error, - path := "message_validation.validations.1.checks" + path := "schema_validation.validations.1.checks" } ]}, parse_and_check([ @@ -353,7 +353,7 @@ duplicated_check_test_() -> #{ reason := <<"duplicated schema checks: json:a">>, kind := validation_error, - path := "message_validation.validations.1.checks" + path := "schema_validation.validations.1.checks" } ]}, parse_and_check([ @@ -370,7 +370,7 @@ duplicated_check_test_() -> #{ reason := <<"duplicated schema checks: json:a">>, kind := validation_error, - path := "message_validation.validations.1.checks" + path := "schema_validation.validations.1.checks" } ]}, parse_and_check([ @@ -387,7 +387,7 @@ duplicated_check_test_() -> #{ reason := <<"duplicated schema checks: ", _/binary>>, kind := validation_error, - path := "message_validation.validations.1.checks" + path := "schema_validation.validations.1.checks" } ]}, parse_and_check([ diff --git a/mix.exs b/mix.exs index d4d85c3c0..621b327b4 100644 --- a/mix.exs +++ b/mix.exs @@ -189,7 +189,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_s3, :emqx_bridge_s3, :emqx_schema_registry, - :emqx_message_validation, + :emqx_schema_validation, :emqx_enterprise, :emqx_bridge_kinesis, :emqx_bridge_azure_event_hub, diff --git a/rebar.config.erl b/rebar.config.erl index 6d4027f10..8320cc62a 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -116,7 +116,7 @@ is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false; is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false; is_community_umbrella_app("apps/emqx_gateway_jt808") -> false; is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false; -is_community_umbrella_app("apps/emqx_message_validation") -> false; +is_community_umbrella_app("apps/emqx_schema_validation") -> false; is_community_umbrella_app("apps/emqx_eviction_agent") -> false; is_community_umbrella_app("apps/emqx_node_rebalance") -> false; is_community_umbrella_app(_) -> true. diff --git a/rel/i18n/emqx_prometheus_api.hocon b/rel/i18n/emqx_prometheus_api.hocon index b910f1d62..819d346d8 100644 --- a/rel/i18n/emqx_prometheus_api.hocon +++ b/rel/i18n/emqx_prometheus_api.hocon @@ -25,9 +25,9 @@ get_prom_data_integration_data.desc: get_prom_data_integration_data.label: """Prometheus Metrics for Data Integration""" -get_prom_message_validation.desc: -"""Get Prometheus Metrics for Message Validation""" -get_prom_message_validation.label: -"""Prometheus Metrics for Message Validation""" +get_prom_schema_validation.desc: +"""Get Prometheus Metrics for Schema Validation""" +get_prom_schema_validation.label: +"""Prometheus Metrics for Schema Validation""" } diff --git a/rel/i18n/emqx_message_validation_http_api.hocon b/rel/i18n/emqx_schema_validation_http_api.hocon similarity index 95% rename from rel/i18n/emqx_message_validation_http_api.hocon rename to rel/i18n/emqx_schema_validation_http_api.hocon index 439522854..a631ac904 100644 --- a/rel/i18n/emqx_message_validation_http_api.hocon +++ b/rel/i18n/emqx_schema_validation_http_api.hocon @@ -1,4 +1,4 @@ -emqx_message_validation_http_api { +emqx_schema_validation_http_api { list_validations.desc: """List validations""" diff --git a/rel/i18n/emqx_message_validation_schema.hocon b/rel/i18n/emqx_schema_validation_schema.hocon similarity index 98% rename from rel/i18n/emqx_message_validation_schema.hocon rename to rel/i18n/emqx_schema_validation_schema.hocon index 1aeec7233..986bb49d1 100644 --- a/rel/i18n/emqx_message_validation_schema.hocon +++ b/rel/i18n/emqx_schema_validation_schema.hocon @@ -1,4 +1,4 @@ -emqx_message_validation_schema { +emqx_schema_validation_schema { check_avro_type.desc: """Avro schema check"""