Merge branch 'release-57' into sync-r57-m-20240514

This commit is contained in:
Thales Macedo Garitezi 2024-05-14 15:03:04 -03:00
commit 2a0ac34656
40 changed files with 352 additions and 257 deletions

View File

@ -60,7 +60,7 @@
'message.publish', 'message.publish',
'message.puback', 'message.puback',
'message.dropped', 'message.dropped',
'message.validation_failed', 'schema.validation_failed',
'message.delivered', 'message.delivered',
'message.acked', 'message.acked',
'delivery.dropped', 'delivery.dropped',
@ -184,7 +184,7 @@ when
-callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) -> -callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) ->
callback_result(). callback_result().
-callback 'message.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) -> -callback 'schema.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) ->
callback_result(). callback_result().
-callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when -callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when

View File

@ -34,6 +34,9 @@
%% For CLI HTTP API outputs %% For CLI HTTP API outputs
-export([best_effort_json/1, best_effort_json/2, best_effort_json_obj/1]). -export([best_effort_json/1, best_effort_json/2, best_effort_json_obj/1]).
%% For emqx_trace_json_formatter
-export([format_msg/3]).
-ifdef(TEST). -ifdef(TEST).
-include_lib("proper/include/proper.hrl"). -include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -110,7 +110,7 @@ persist(Msg) ->
). ).
needs_persistence(Msg) -> needs_persistence(Msg) ->
not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)). not emqx_message:get_flag(dup, Msg).
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
store_message(Msg) -> store_message(Msg) ->

View File

@ -23,6 +23,8 @@
%% logger_formatter:config/0 is not exported. %% logger_formatter:config/0 is not exported.
-type config() :: map(). -type config() :: map().
-define(DEFAULT_FORMATTER, fun logger:format_otp_report/1).
%%%----------------------------------------------------------------- %%%-----------------------------------------------------------------
%%% Callback Function %%% Callback Function
%%%----------------------------------------------------------------- %%%-----------------------------------------------------------------
@ -31,9 +33,10 @@
LogEvent :: logger:log_event(), LogEvent :: logger:log_event(),
Config :: config(). Config :: config().
format( format(
LogMap0, LogMap,
#{payload_encode := PEncode} #{payload_encode := PEncode} = Config
) -> ) ->
LogMap0 = maybe_format_msg(LogMap, Config),
LogMap1 = emqx_trace_formatter:evaluate_lazy_values(LogMap0), LogMap1 = emqx_trace_formatter:evaluate_lazy_values(LogMap0),
%% We just make some basic transformations on the input LogMap and then do %% We just make some basic transformations on the input LogMap and then do
%% an external call to create the JSON text %% an external call to create the JSON text
@ -46,6 +49,42 @@ format(
%%% Helper Functions %%% Helper Functions
%%%----------------------------------------------------------------- %%%-----------------------------------------------------------------
maybe_format_msg(#{msg := Msg, meta := Meta} = LogMap, Config) ->
try do_maybe_format_msg(Msg, Meta, Config) of
Map when is_map(Map) ->
LogMap#{meta => maps:merge(Meta, Map), msg => maps:get(msg, Map, "no_message")};
Bin when is_binary(Bin) ->
LogMap#{msg => Bin}
catch
C:R:S ->
Meta#{
msg => "emqx_logger_jsonfmt_format_error",
fmt_raw_input => Msg,
fmt_error => C,
fmt_reason => R,
fmt_stacktrace => S,
more => #{
original_log_entry => LogMap,
config => Config
}
}
end.
do_maybe_format_msg(String, _Meta, _Config) when is_list(String); is_binary(String) ->
unicode:characters_to_binary(String);
do_maybe_format_msg(undefined, _Meta, _Config) ->
#{};
do_maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) ->
case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of
true ->
%% reporting a map without a customised format function
Report;
false ->
emqx_logger_jsonfmt:format_msg(Msg, Meta, Config)
end;
do_maybe_format_msg(Msg, Meta, Config) ->
emqx_logger_jsonfmt:format_msg(Msg, Meta, Config).
prepare_log_map(LogMap, PEncode) -> prepare_log_map(LogMap, PEncode) ->
NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)], NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
maps:from_list(NewKeyValuePairs). maps:from_list(NewKeyValuePairs).

View File

@ -383,8 +383,8 @@ default_appspec(emqx_dashboard, _SuiteOpts) ->
}; };
default_appspec(emqx_schema_registry, _SuiteOpts) -> default_appspec(emqx_schema_registry, _SuiteOpts) ->
#{schema_mod => emqx_schema_registry_schema, config => #{}}; #{schema_mod => emqx_schema_registry_schema, config => #{}};
default_appspec(emqx_message_validation, _SuiteOpts) -> default_appspec(emqx_schema_validation, _SuiteOpts) ->
#{schema_mod => emqx_message_validation_schema, config => #{}}; #{schema_mod => emqx_schema_validation_schema, config => #{}};
default_appspec(_, _) -> default_appspec(_, _) ->
#{}. #{}.

View File

@ -27,6 +27,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% SUITE boilerplate %% SUITE boilerplate
@ -66,19 +67,20 @@ groups() ->
init_per_group(persistence_disabled, Config) -> init_per_group(persistence_disabled, Config) ->
[ [
{emqx_config, "session_persistence { enable = false }"}, {emqx_config, ?EMQX_CONFIG ++ "session_persistence { enable = false }"},
{persistence, false} {persistence, false}
| Config | Config
]; ];
init_per_group(persistence_enabled, Config) -> init_per_group(persistence_enabled, Config) ->
[ [
{emqx_config, {emqx_config,
"session_persistence {\n" ?EMQX_CONFIG ++
" enable = true\n" "session_persistence {\n"
" last_alive_update_interval = 100ms\n" " enable = true\n"
" renew_streams_interval = 100ms\n" " last_alive_update_interval = 100ms\n"
" session_gc_interval = 2s\n" " renew_streams_interval = 100ms\n"
"}"}, " session_gc_interval = 2s\n"
"}"},
{persistence, ds} {persistence, ds}
| Config | Config
]; ];
@ -1334,6 +1336,43 @@ do_t_will_message(Config, Opts) ->
), ),
ok. ok.
t_sys_message_delivery(Config) ->
ConnFun = ?config(conn_fun, Config),
SysTopicFilter = emqx_topic:join(["$SYS", "brokers", '+', "uptime"]),
SysTopic = emqx_topic:join(["$SYS", "brokers", atom_to_list(node()), "uptime"]),
ClientId = ?config(client_id, Config),
{ok, Client1} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config
]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [1]} = emqtt:subscribe(Client1, SysTopicFilter, [{qos, 1}, {rh, 2}]),
?assertMatch(
[
#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime1},
#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime2}
],
receive_messages(2)
),
ok = emqtt:disconnect(Client1),
{ok, Client2} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false}
| Config
]),
{ok, _} = emqtt:ConnFun(Client2),
?assertMatch(
[#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime3}],
receive_messages(1)
).
get_topicwise_order(Msgs) -> get_topicwise_order(Msgs) ->
maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs). maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).

View File

@ -78,7 +78,6 @@ init_per_group(persistence_enabled = Group, Config) ->
], ],
#{work_dir => emqx_cth_suite:work_dir(Group, Config)} #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
), ),
emqx_logger:set_log_level(debug),
[ [
{apps, Apps}, {apps, Apps},
{persistence_enabled, true} {persistence_enabled, true}
@ -89,7 +88,6 @@ init_per_group(persistence_disabled = Group, Config) ->
[{emqx, "session_persistence.enable = false"}], [{emqx, "session_persistence.enable = false"}],
#{work_dir => emqx_cth_suite:work_dir(Group, Config)} #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
), ),
emqx_logger:set_log_level(debug),
[ [
{apps, Apps}, {apps, Apps},
{persistence_enabled, false} {persistence_enabled, false}

View File

@ -35,7 +35,7 @@
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
namespace() -> namespace() ->
"bridge_s3". "bridge_s3_aggreg_upload".
roots() -> roots() ->
[]. [].

View File

@ -189,9 +189,9 @@ swagger_desc(sent_bytes) ->
swagger_desc(dropped) -> swagger_desc(dropped) ->
swagger_desc_format("Dropped messages "); swagger_desc_format("Dropped messages ");
swagger_desc(validation_succeeded) -> swagger_desc(validation_succeeded) ->
swagger_desc_format("Message validations succeeded "); swagger_desc_format("Schema validations succeeded ");
swagger_desc(validation_failed) -> swagger_desc(validation_failed) ->
swagger_desc_format("Message validations failed "); swagger_desc_format("Schema validations failed ");
swagger_desc(persisted) -> swagger_desc(persisted) ->
swagger_desc_format("Messages saved to the durable storage "); swagger_desc_format("Messages saved to the durable storage ");
swagger_desc(subscriptions) -> swagger_desc(subscriptions) ->
@ -217,9 +217,9 @@ swagger_desc(sent_msg_rate) ->
swagger_desc(dropped_msg_rate) -> swagger_desc(dropped_msg_rate) ->
swagger_desc_format("Dropped messages ", per); swagger_desc_format("Dropped messages ", per);
swagger_desc(validation_succeeded_rate) -> swagger_desc(validation_succeeded_rate) ->
swagger_desc_format("Message validations succeeded ", per); swagger_desc_format("Schema validations succeeded ", per);
swagger_desc(validation_failed_rate) -> swagger_desc(validation_failed_rate) ->
swagger_desc_format("Message validations failed ", per); swagger_desc_format("Schema validations failed ", per);
swagger_desc(persisted_rate) -> swagger_desc(persisted_rate) ->
swagger_desc_format("Messages saved to the durable storage ", per); swagger_desc_format("Messages saved to the durable storage ", per);
swagger_desc(retained_msg_count) -> swagger_desc(retained_msg_count) ->

View File

@ -339,13 +339,6 @@ get_streams(Shard, TopicFilter, StartTime) ->
case generation_get(Shard, GenId) of case generation_get(Shard, GenId) of
#{module := Mod, data := GenData} -> #{module := Mod, data := GenData} ->
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
?tp(get_streams_get_gen_topic, #{
gen_id => GenId,
topic => TopicFilter,
start_time => StartTime,
streams => Streams,
gen_data => GenData
}),
[ [
{GenId, ?stream_v2(GenId, InnerStream)} {GenId, ?stream_v2(GenId, InnerStream)}
|| InnerStream <- Streams || InnerStream <- Streams

View File

@ -15,7 +15,7 @@
-define(EE_SCHEMA_MODULES, [ -define(EE_SCHEMA_MODULES, [
emqx_license_schema, emqx_license_schema,
emqx_schema_registry_schema, emqx_schema_registry_schema,
emqx_message_validation_schema, emqx_schema_validation_schema,
emqx_ft_schema emqx_ft_schema
]). ]).
@ -196,6 +196,6 @@ audit_log_conf() ->
tr_prometheus_collectors(Conf) -> tr_prometheus_collectors(Conf) ->
[ [
{'/prometheus/message_validation', emqx_prometheus_message_validation} {'/prometheus/schema_validation', emqx_prometheus_schema_validation}
| emqx_conf_schema:tr_prometheus_collectors(Conf) | emqx_conf_schema:tr_prometheus_collectors(Conf)
]. ].

View File

@ -88,7 +88,7 @@
[ [
emqx_license, emqx_license,
emqx_enterprise, emqx_enterprise,
emqx_message_validation, emqx_schema_validation,
emqx_connector_aggregator, emqx_connector_aggregator,
emqx_bridge_kafka, emqx_bridge_kafka,
emqx_bridge_pulsar, emqx_bridge_pulsar,

View File

@ -1193,7 +1193,7 @@ t_mqueue_messages(Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME), ClientId = atom_to_binary(?FUNCTION_NAME),
Topic = <<"t/test_mqueue_msgs">>, Topic = <<"t/test_mqueue_msgs">>,
Count = emqx_mgmt:default_row_limit(), Count = emqx_mgmt:default_row_limit(),
{ok, _Client} = client_with_mqueue(ClientId, Topic, Count), ok = client_with_mqueue(ClientId, Topic, Count),
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]), Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])), ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
@ -1244,14 +1244,16 @@ client_with_mqueue(ClientId, Topic, Count) ->
{ok, Client} = emqtt:start_link([ {ok, Client} = emqtt:start_link([
{proto_ver, v5}, {proto_ver, v5},
{clientid, ClientId}, {clientid, ClientId},
{clean_start, false}, {clean_start, true},
{properties, #{'Session-Expiry-Interval' => 120}} {properties, #{'Session-Expiry-Interval' => 120}}
]), ]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, Topic, 1), {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
ct:sleep(300),
ok = emqtt:disconnect(Client), ok = emqtt:disconnect(Client),
ct:sleep(100),
publish_msgs(Topic, Count), publish_msgs(Topic, Count),
{ok, Client}. ok.
client_with_inflight(ClientId, Topic, Count) -> client_with_inflight(ClientId, Topic, Count) ->
{ok, Client} = emqtt:start_link([ {ok, Client} = emqtt:start_link([
@ -1275,13 +1277,18 @@ publish_msgs(Topic, Count) ->
test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) -> test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
Qs0 = io_lib:format("payload=~s", [PayloadEncoding]), Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
{ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
#{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
#{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)), {Msgs, StartPos, Pos} = ?retry(500, 10, begin
?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)), {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
?assertEqual(length(Msgs), Count), #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
#{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
?assertEqual(length(Msgs), Count),
{Msgs, StartPos, Pos}
end),
lists:foreach( lists:foreach(
fun({Seq, #{<<"payload">> := P} = M}) -> fun({Seq, #{<<"payload">> := P} = M}) ->

View File

@ -269,6 +269,8 @@ t_http_test_json_formatter(_Config) ->
action_id => action_id =>
<<"action:http:emqx_bridge_http_test_lib:connector:http:emqx_bridge_http_test_lib">> <<"action:http:emqx_bridge_http_test_lib:connector:http:emqx_bridge_http_test_lib">>
}), }),
%% We should handle report style logging
?SLOG(error, #{msg => "recursive_republish_detected"}, #{topic => Topic}),
ok = emqx_trace_handler_SUITE:filesync(Name, topic), ok = emqx_trace_handler_SUITE:filesync(Name, topic),
{ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), {ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")),
{ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), {ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")),
@ -410,6 +412,19 @@ t_http_test_json_formatter(_Config) ->
}, },
NextFun() NextFun()
), ),
?assertMatch(
#{
<<"level">> := <<"error">>,
<<"meta">> :=
#{
<<"msg">> := <<"recursive_republish_detected">>,
<<"topic">> := <<"/x/y/z">>
},
<<"msg">> := <<"recursive_republish_detected">>,
<<"time">> := _
},
NextFun()
),
{ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))), {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))),
?assertEqual(<<>>, Delete), ?assertEqual(<<>>, Delete),

View File

@ -1,14 +0,0 @@
{application, emqx_message_validation, [
{description, "EMQX Message Validation"},
{vsn, "0.1.0"},
{registered, [emqx_message_validation_sup, emqx_message_validation_registry]},
{mod, {emqx_message_validation_app, []}},
{applications, [
kernel,
stdlib
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.1.1"}}}, {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.1.2"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}} {emqx_resource, {path, "../../apps/emqx_resource"}}
]}. ]}.

View File

@ -22,12 +22,12 @@
-define(PROMETHEUS_AUTH_COLLECTOR, emqx_prometheus_auth). -define(PROMETHEUS_AUTH_COLLECTOR, emqx_prometheus_auth).
-define(PROMETHEUS_DATA_INTEGRATION_REGISTRY, '/prometheus/data_integration'). -define(PROMETHEUS_DATA_INTEGRATION_REGISTRY, '/prometheus/data_integration').
-define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration). -define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration).
-define(PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, '/prometheus/message_validation'). -define(PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, '/prometheus/schema_validation').
-define(PROMETHEUS_MESSAGE_VALIDATION_COLLECTOR, emqx_prometheus_message_validation). -define(PROMETHEUS_SCHEMA_VALIDATION_COLLECTOR, emqx_prometheus_schema_validation).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
-define(PROMETHEUS_EE_REGISTRIES, [ -define(PROMETHEUS_EE_REGISTRIES, [
?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY ?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY
]). ]).
%% ELSE if(?EMQX_RELEASE_EDITION == ee). %% ELSE if(?EMQX_RELEASE_EDITION == ee).
-else. -else.

View File

@ -49,7 +49,7 @@
stats/2, stats/2,
auth/2, auth/2,
data_integration/2, data_integration/2,
message_validation/2 schema_validation/2
]). ]).
-export([lookup_from_local_nodes/3]). -export([lookup_from_local_nodes/3]).
@ -73,7 +73,7 @@ paths() ->
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
paths_ee() -> paths_ee() ->
["/prometheus/message_validation"]. ["/prometheus/schema_validation"].
%% ELSE if(?EMQX_RELEASE_EDITION == ee). %% ELSE if(?EMQX_RELEASE_EDITION == ee).
-else. -else.
paths_ee() -> paths_ee() ->
@ -139,12 +139,12 @@ schema("/prometheus/data_integration") ->
#{200 => prometheus_data_schema()} #{200 => prometheus_data_schema()}
} }
}; };
schema("/prometheus/message_validation") -> schema("/prometheus/schema_validation") ->
#{ #{
'operationId' => message_validation, 'operationId' => schema_validation,
get => get =>
#{ #{
description => ?DESC(get_prom_message_validation), description => ?DESC(get_prom_schema_validation),
tags => ?TAGS, tags => ?TAGS,
parameters => [ref(mode)], parameters => [ref(mode)],
security => security(), security => security(),
@ -223,8 +223,8 @@ auth(get, #{headers := Headers, query_string := Qs}) ->
data_integration(get, #{headers := Headers, query_string := Qs}) -> data_integration(get, #{headers := Headers, query_string := Qs}) ->
collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)). collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)).
message_validation(get, #{headers := Headers, query_string := Qs}) -> schema_validation(get, #{headers := Headers, query_string := Qs}) ->
collect(emqx_prometheus_message_validation, collect_opts(Headers, Qs)). collect(emqx_prometheus_schema_validation, collect_opts(Headers, Qs)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal funcs

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_prometheus_message_validation). -module(emqx_prometheus_schema_validation).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
%% for bpapi %% for bpapi
@ -48,19 +48,19 @@
-define(MG(K, MAP), maps:get(K, MAP)). -define(MG(K, MAP), maps:get(K, MAP)).
-define(MG0(K, MAP), maps:get(K, MAP, 0)). -define(MG0(K, MAP), maps:get(K, MAP, 0)).
-define(metrics_data_key, message_validation_metrics_data). -define(metrics_data_key, schema_validation_metrics_data).
-define(key_enabled, emqx_message_validation_enable). -define(key_enabled, emqx_schema_validation_enable).
-define(key_matched, emqx_message_validation_matched). -define(key_matched, emqx_schema_validation_matched).
-define(key_failed, emqx_message_validation_failed). -define(key_failed, emqx_schema_validation_failed).
-define(key_succeeded, emqx_message_validation_succeeded). -define(key_succeeded, emqx_schema_validation_succeeded).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% `emqx_prometheus_cluster' API %% `emqx_prometheus_cluster' API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
fetch_from_local_node(Mode) -> fetch_from_local_node(Mode) ->
Validations = emqx_message_validation:list(), Validations = emqx_schema_validation:list(),
{node(), #{ {node(), #{
?metrics_data_key => to_validation_data(Mode, Validations) ?metrics_data_key => to_validation_data(Mode, Validations)
}}. }}.
@ -70,7 +70,7 @@ fetch_cluster_consistented_data() ->
aggre_or_zip_init_acc() -> aggre_or_zip_init_acc() ->
#{ #{
?metrics_data_key => maps:from_keys(message_validation_metric(names), []) ?metrics_data_key => maps:from_keys(schema_validation_metric(names), [])
}. }.
logic_sum_metrics() -> logic_sum_metrics() ->
@ -89,12 +89,12 @@ deregister_cleanup(_) -> ok.
-spec collect_mf(_Registry, Callback) -> ok when -spec collect_mf(_Registry, Callback) -> ok when
_Registry :: prometheus_registry:registry(), _Registry :: prometheus_registry:registry(),
Callback :: prometheus_collector:collect_mf_callback(). Callback :: prometheus_collector:collect_mf_callback().
collect_mf(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, Callback) -> collect_mf(?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, Callback) ->
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
%% Message Validation Metrics %% Schema Validation Metrics
RuleMetricDs = ?MG(?metrics_data_key, RawData), RuleMetricDs = ?MG(?metrics_data_key, RawData),
ok = add_collect_family(Callback, message_validation_metrics_meta(), RuleMetricDs), ok = add_collect_family(Callback, schema_validation_metrics_meta(), RuleMetricDs),
ok; ok;
collect_mf(_, _) -> collect_mf(_, _) ->
@ -104,10 +104,10 @@ collect_mf(_, _) ->
collect(<<"json">>) -> collect(<<"json">>) ->
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
#{ #{
message_validations => collect_json_data(?MG(?metrics_data_key, RawData)) schema_validations => collect_json_data(?MG(?metrics_data_key, RawData))
}; };
collect(<<"prometheus">>) -> collect(<<"prometheus">>) ->
prometheus_text_format:format(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY). prometheus_text_format:format(?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY).
%%==================== %%====================
%% API Helpers %% API Helpers
@ -128,7 +128,7 @@ collect_metrics(Name, Metrics) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%======================================== %%========================================
%% Message Validation Metrics %% Schema Validation Metrics
%%======================================== %%========================================
collect_mv(K = ?key_enabled, Data) -> gauge_metrics(?MG(K, Data)); collect_mv(K = ?key_enabled, Data) -> gauge_metrics(?MG(K, Data));
collect_mv(K = ?key_matched, Data) -> counter_metrics(?MG(K, Data)); collect_mv(K = ?key_matched, Data) -> counter_metrics(?MG(K, Data));
@ -140,10 +140,10 @@ collect_mv(K = ?key_succeeded, Data) -> counter_metrics(?MG(K, Data)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%======================================== %%========================================
%% Message Validation Metrics %% Schema Validation Metrics
%%======================================== %%========================================
message_validation_metrics_meta() -> schema_validation_metrics_meta() ->
[ [
{?key_enabled, gauge}, {?key_enabled, gauge},
{?key_matched, counter}, {?key_matched, counter},
@ -151,15 +151,15 @@ message_validation_metrics_meta() ->
{?key_succeeded, counter} {?key_succeeded, counter}
]. ].
message_validation_metric(names) -> schema_validation_metric(names) ->
emqx_prometheus_cluster:metric_names(message_validation_metrics_meta()). emqx_prometheus_cluster:metric_names(schema_validation_metrics_meta()).
to_validation_data(Mode, Validations) -> to_validation_data(Mode, Validations) ->
lists:foldl( lists:foldl(
fun(#{name := Name} = Validation, Acc) -> fun(#{name := Name} = Validation, Acc) ->
merge_acc_with_validations(Mode, Name, get_validation_metrics(Validation), Acc) merge_acc_with_validations(Mode, Name, get_validation_metrics(Validation), Acc)
end, end,
maps:from_keys(message_validation_metric(names), []), maps:from_keys(schema_validation_metric(names), []),
Validations Validations
). ).
@ -176,7 +176,7 @@ validation_point(Mode, Name, V) ->
{with_node_label(Mode, [{validation_name, Name}]), V}. {with_node_label(Mode, [{validation_name, Name}]), V}.
get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) -> get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) ->
#{counters := Counters} = emqx_message_validation_registry:get_metrics(Name), #{counters := Counters} = emqx_schema_validation_registry:get_metrics(Name),
#{ #{
?key_enabled => emqx_prometheus_cluster:boolean_to_number(Enabled), ?key_enabled => emqx_prometheus_cluster:boolean_to_number(Enabled),
?key_matched => ?MG0('matched', Counters), ?key_matched => ?MG0('matched', Counters),
@ -192,9 +192,9 @@ get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) ->
%% merge / zip formatting funcs for type `application/json` %% merge / zip formatting funcs for type `application/json`
collect_json_data(Data) -> collect_json_data(Data) ->
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_message_validation_metrics/3). emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_schema_validation_metrics/3).
zip_json_message_validation_metrics(Key, Points, [] = _AccIn) -> zip_json_schema_validation_metrics(Key, Points, [] = _AccIn) ->
lists:foldl( lists:foldl(
fun({Labels, Metric}, AccIn2) -> fun({Labels, Metric}, AccIn2) ->
LabelsKVMap = maps:from_list(Labels), LabelsKVMap = maps:from_list(Labels),
@ -204,7 +204,7 @@ zip_json_message_validation_metrics(Key, Points, [] = _AccIn) ->
[], [],
Points Points
); );
zip_json_message_validation_metrics(Key, Points, AllResultsAcc) -> zip_json_schema_validation_metrics(Key, Points, AllResultsAcc) ->
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points), ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
lists:zipwith(fun maps:merge/2, AllResultsAcc, ThisKeyResult). lists:zipwith(fun maps:merge/2, AllResultsAcc, ThisKeyResult).

View File

@ -82,7 +82,7 @@ all() ->
{group, '/prometheus/stats'}, {group, '/prometheus/stats'},
{group, '/prometheus/auth'}, {group, '/prometheus/auth'},
{group, '/prometheus/data_integration'}, {group, '/prometheus/data_integration'},
[{group, '/prometheus/message_validation'} || emqx_release:edition() == ee] [{group, '/prometheus/schema_validation'} || emqx_release:edition() == ee]
]). ]).
groups() -> groups() ->
@ -100,7 +100,7 @@ groups() ->
{'/prometheus/stats', ModeGroups}, {'/prometheus/stats', ModeGroups},
{'/prometheus/auth', ModeGroups}, {'/prometheus/auth', ModeGroups},
{'/prometheus/data_integration', ModeGroups}, {'/prometheus/data_integration', ModeGroups},
{'/prometheus/message_validation', ModeGroups}, {'/prometheus/schema_validation', ModeGroups},
{?PROM_DATA_MODE__NODE, AcceptGroups}, {?PROM_DATA_MODE__NODE, AcceptGroups},
{?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups}, {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups},
{?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups}, {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups},
@ -133,7 +133,7 @@ init_per_suite(Config) ->
emqx_bridge_http, emqx_bridge_http,
emqx_connector, emqx_connector,
[ [
{emqx_message_validation, #{config => message_validation_config()}} {emqx_schema_validation, #{config => schema_validation_config()}}
|| emqx_release:edition() == ee || emqx_release:edition() == ee
], ],
{emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()} {emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()}
@ -166,8 +166,8 @@ init_per_group('/prometheus/auth', Config) ->
[{module, emqx_prometheus_auth} | Config]; [{module, emqx_prometheus_auth} | Config];
init_per_group('/prometheus/data_integration', Config) -> init_per_group('/prometheus/data_integration', Config) ->
[{module, emqx_prometheus_data_integration} | Config]; [{module, emqx_prometheus_data_integration} | Config];
init_per_group('/prometheus/message_validation', Config) -> init_per_group('/prometheus/schema_validation', Config) ->
[{module, emqx_prometheus_message_validation} | Config]; [{module, emqx_prometheus_schema_validation} | Config];
init_per_group(?PROM_DATA_MODE__NODE, Config) -> init_per_group(?PROM_DATA_MODE__NODE, Config) ->
[{mode, ?PROM_DATA_MODE__NODE} | Config]; [{mode, ?PROM_DATA_MODE__NODE} | Config];
init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) -> init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) ->
@ -239,7 +239,7 @@ assert_data(_Module, {Code, Header, RawDataBinary}, #{type := <<"prometheus">>,
assert_prom_data(DataL, Mode); assert_prom_data(DataL, Mode);
assert_data(Module, {Code, JsonData}, #{type := <<"json">>, mode := Mode}) -> assert_data(Module, {Code, JsonData}, #{type := <<"json">>, mode := Mode}) ->
?assertEqual(Code, 200), ?assertEqual(Code, 200),
?assert(is_map(JsonData), true), ?assertMatch(#{}, JsonData),
assert_json_data(Module, JsonData, Mode). assert_json_data(Module, JsonData, Mode).
%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%
@ -355,8 +355,8 @@ metric_meta(<<"emqx_schema_registrys_count">>) -> ?meta(0, 0, 0);
metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2); metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2);
metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2); metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2);
metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2); metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2);
%% `/prometheus/message_validation` %% `/prometheus/schema_validation`
metric_meta(<<"emqx_message_validation_", _Tail/binary>>) -> ?meta(1, 1, 2); metric_meta(<<"emqx_schema_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
%% normal emqx metrics %% normal emqx metrics
metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1); metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1);
metric_meta(_) -> #{}. metric_meta(_) -> #{}.
@ -821,16 +821,16 @@ assert_json_data__data_integration_overview(M, _) ->
). ).
-endif. -endif.
assert_json_data__message_validations(Ms, _) -> assert_json_data__schema_validations(Ms, _) ->
lists:foreach( lists:foreach(
fun(M) -> fun(M) ->
?assertMatch( ?assertMatch(
#{ #{
validation_name := _, validation_name := _,
emqx_message_validation_enable := _, emqx_schema_validation_enable := _,
emqx_message_validation_matched := _, emqx_schema_validation_matched := _,
emqx_message_validation_failed := _, emqx_schema_validation_failed := _,
emqx_message_validation_succeeded := _ emqx_schema_validation_succeeded := _
}, },
M M
) )
@ -838,7 +838,7 @@ assert_json_data__message_validations(Ms, _) ->
Ms Ms
). ).
message_validation_config() -> schema_validation_config() ->
Validation = #{ Validation = #{
<<"enable">> => true, <<"enable">> => true,
<<"name">> => <<"my_validation">>, <<"name">> => <<"my_validation">>,
@ -853,7 +853,7 @@ message_validation_config() ->
] ]
}, },
#{ #{
<<"message_validation">> => #{ <<"schema_validation">> => #{
<<"validations">> => [Validation] <<"validations">> => [Validation]
} }
}. }.

View File

@ -45,7 +45,7 @@
on_session_unsubscribed/4, on_session_unsubscribed/4,
on_message_publish/2, on_message_publish/2,
on_message_dropped/4, on_message_dropped/4,
on_message_validation_failed/3, on_schema_validation_failed/3,
on_message_delivered/3, on_message_delivered/3,
on_message_acked/3, on_message_acked/3,
on_delivery_dropped/4, on_delivery_dropped/4,
@ -80,7 +80,7 @@ event_names() ->
'message.delivered', 'message.delivered',
'message.acked', 'message.acked',
'message.dropped', 'message.dropped',
'message.validation_failed', 'schema.validation_failed',
'delivery.dropped' 'delivery.dropped'
]. ].
@ -96,7 +96,7 @@ event_topics_enum() ->
'$events/message_delivered', '$events/message_delivered',
'$events/message_acked', '$events/message_acked',
'$events/message_dropped', '$events/message_dropped',
'$events/message_validation_failed', '$events/schema_validation_failed',
'$events/delivery_dropped' '$events/delivery_dropped'
% '$events/message_publish' % not possible to use in SELECT FROM % '$events/message_publish' % not possible to use in SELECT FROM
]. ].
@ -237,13 +237,13 @@ on_message_dropped(Message, _, Reason, Conf) ->
end, end,
{ok, Message}. {ok, Message}.
on_message_validation_failed(Message, ValidationContext, Conf) -> on_schema_validation_failed(Message, ValidationContext, Conf) ->
case ignore_sys_message(Message) of case ignore_sys_message(Message) of
true -> true ->
ok; ok;
false -> false ->
apply_event( apply_event(
'message.validation_failed', 'schema.validation_failed',
fun() -> eventmsg_validation_failed(Message, ValidationContext) end, fun() -> eventmsg_validation_failed(Message, ValidationContext) end,
Conf Conf
) )
@ -550,7 +550,7 @@ eventmsg_validation_failed(
) -> ) ->
#{name := ValidationName} = ValidationContext, #{name := ValidationName} = ValidationContext,
with_basic_columns( with_basic_columns(
'message.validation_failed', 'schema.validation_failed',
#{ #{
id => emqx_guid:to_hexstr(Id), id => emqx_guid:to_hexstr(Id),
validation => ValidationName, validation => ValidationName,
@ -730,16 +730,16 @@ event_info() ->
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
%% ELSE (?EMQX_RELEASE_EDITION == ee). %% ELSE (?EMQX_RELEASE_EDITION == ee).
event_info_message_validation_failed() -> event_info_schema_validation_failed() ->
event_info_common( event_info_common(
'message.validation_failed', 'schema.validation_failed',
{<<"message validation failed">>, <<"TODO"/utf8>>}, {<<"schema validation failed">>, <<"TODO"/utf8>>},
{<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>}, {<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>},
<<"SELECT * FROM \"$events/message_validation_failed\" WHERE topic =~ 't/#'">> <<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">>
). ).
ee_event_info() -> ee_event_info() ->
[ [
event_info_message_validation_failed() event_info_schema_validation_failed()
]. ].
-else. -else.
%% END (?EMQX_RELEASE_EDITION == ee). %% END (?EMQX_RELEASE_EDITION == ee).
@ -931,7 +931,7 @@ test_columns(Event) ->
ee_test_columns(Event). ee_test_columns(Event).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
ee_test_columns('message.validation_failed') -> ee_test_columns('schema.validation_failed') ->
[{<<"validation">>, <<"myvalidation">>}] ++ [{<<"validation">>, <<"myvalidation">>}] ++
test_columns('message.publish'). test_columns('message.publish').
%% ELSE (?EMQX_RELEASE_EDITION == ee). %% ELSE (?EMQX_RELEASE_EDITION == ee).
@ -980,9 +980,9 @@ columns_with_exam('message.dropped') ->
{<<"timestamp">>, erlang:system_time(millisecond)}, {<<"timestamp">>, erlang:system_time(millisecond)},
{<<"node">>, node()} {<<"node">>, node()}
]; ];
columns_with_exam('message.validation_failed') -> columns_with_exam('schema.validation_failed') ->
[ [
{<<"event">>, 'message.validation_failed'}, {<<"event">>, 'schema.validation_failed'},
{<<"validation">>, <<"my_validation">>}, {<<"validation">>, <<"my_validation">>},
{<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
{<<"clientid">>, <<"c_emqx">>}, {<<"clientid">>, <<"c_emqx">>},
@ -1200,7 +1200,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3; hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3; hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4; hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
hook_fun('message.validation_failed') -> fun ?MODULE:on_message_validation_failed/3; hook_fun('schema.validation_failed') -> fun ?MODULE:on_schema_validation_failed/3;
hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4; hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2; hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
hook_fun(Event) -> error({invalid_event, Event}). hook_fun(Event) -> error({invalid_event, Event}).
@ -1231,7 +1231,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
event_name(<<"$events/message_delivered">>) -> 'message.delivered'; event_name(<<"$events/message_delivered">>) -> 'message.delivered';
event_name(<<"$events/message_acked">>) -> 'message.acked'; event_name(<<"$events/message_acked">>) -> 'message.acked';
event_name(<<"$events/message_dropped">>) -> 'message.dropped'; event_name(<<"$events/message_dropped">>) -> 'message.dropped';
event_name(<<"$events/message_validation_failed">>) -> 'message.validation_failed'; event_name(<<"$events/schema_validation_failed">>) -> 'schema.validation_failed';
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
event_name(_) -> 'message.publish'. event_name(_) -> 'message.publish'.
@ -1246,7 +1246,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.delivered') -> <<"$events/message_delivered">>;
event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.acked') -> <<"$events/message_acked">>;
event_topic('message.dropped') -> <<"$events/message_dropped">>; event_topic('message.dropped') -> <<"$events/message_dropped">>;
event_topic('message.validation_failed') -> <<"$events/message_validation_failed">>; event_topic('schema.validation_failed') -> <<"$events/schema_validation_failed">>;
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
event_topic('message.publish') -> <<"$events/message_publish">>. event_topic('message.publish') -> <<"$events/message_publish">>.

View File

@ -27,7 +27,7 @@
inc_action_metrics/2 inc_action_metrics/2
]). ]).
%% Internal exports used by message validation %% Internal exports used by schema validation
-export([evaluate_select/3, clear_rule_payload/0]). -export([evaluate_select/3, clear_rule_payload/0]).
-import( -import(

View File

@ -1,4 +1,4 @@
# EMQX Message Validation # EMQX Schema Validation
This application encapsulates the functionality to validate incoming or internally This application encapsulates the functionality to validate incoming or internally
triggered published payloads and take an action upon failure, which can be to just drop triggered published payloads and take an action upon failure, which can be to just drop
@ -7,7 +7,7 @@ the message without further processing, or to disconnect the offending client as
# Documentation # Documentation
Refer to [Message Refer to [Message
Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-validation.html) Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/schema-validation.html)
for more information about the semantics and checks available. for more information about the semantics and checks available.
# HTTP APIs # HTTP APIs
@ -16,7 +16,7 @@ APIs are provided for validation management, which includes creating,
updating, looking up, deleting, listing validations. updating, looking up, deleting, listing validations.
Refer to [API Docs - Refer to [API Docs -
Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Validation) Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Schema-Validation)
for more detailed information. for more detailed information.

View File

@ -0,0 +1,14 @@
{application, emqx_schema_validation, [
{description, "EMQX Schema Validation"},
{vsn, "0.1.0"},
{registered, [emqx_schema_validation_sup, emqx_schema_validation_registry]},
{mod, {emqx_schema_validation_app, []}},
{applications, [
kernel,
stdlib
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation). -module(emqx_schema_validation).
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
@ -47,8 +47,8 @@
%% Type declarations %% Type declarations
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-define(TRACE_TAG, "MESSAGE_VALIDATION"). -define(TRACE_TAG, "SCHEMA_VALIDATION").
-define(CONF_ROOT, message_validation). -define(CONF_ROOT, schema_validation).
-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]). -define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
-type validation_name() :: binary(). -type validation_name() :: binary().
@ -72,7 +72,7 @@ load() ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []), Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
lists:foreach( lists:foreach(
fun({Pos, Validation}) -> fun({Pos, Validation}) ->
ok = emqx_message_validation_registry:insert(Pos, Validation) ok = emqx_schema_validation_registry:insert(Pos, Validation)
end, end,
lists:enumerate(Validations) lists:enumerate(Validations)
). ).
@ -81,7 +81,7 @@ unload() ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []), Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
lists:foreach( lists:foreach(
fun(Validation) -> fun(Validation) ->
ok = emqx_message_validation_registry:delete(Validation) ok = emqx_schema_validation_registry:delete(Validation)
end, end,
Validations Validations
). ).
@ -146,7 +146,7 @@ unregister_hooks() ->
-spec on_message_publish(emqx_types:message()) -> -spec on_message_publish(emqx_types:message()) ->
{ok, emqx_types:message()} | {stop, emqx_types:message()}. {ok, emqx_types:message()} | {stop, emqx_types:message()}.
on_message_publish(Message = #message{topic = Topic, headers = Headers}) -> on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
case emqx_message_validation_registry:matching_validations(Topic) of case emqx_schema_validation_registry:matching_validations(Topic) of
[] -> [] ->
ok; ok;
Validations -> Validations ->
@ -184,19 +184,19 @@ pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) -> post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
{Pos, Validation} = fetch_with_index(New, Name), {Pos, Validation} = fetch_with_index(New, Name),
ok = emqx_message_validation_registry:insert(Pos, Validation), ok = emqx_schema_validation_registry:insert(Pos, Validation),
ok; ok;
post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) -> post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
{_Pos, OldValidation} = fetch_with_index(Old, Name), {_Pos, OldValidation} = fetch_with_index(Old, Name),
{Pos, NewValidation} = fetch_with_index(New, Name), {Pos, NewValidation} = fetch_with_index(New, Name),
ok = emqx_message_validation_registry:update(OldValidation, Pos, NewValidation), ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
ok; ok;
post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
{_Pos, Validation} = fetch_with_index(Old, Name), {_Pos, Validation} = fetch_with_index(Old, Name),
ok = emqx_message_validation_registry:delete(Validation), ok = emqx_schema_validation_registry:delete(Validation),
ok; ok;
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) -> post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
ok = emqx_message_validation_registry:reindex_positions(New), ok = emqx_schema_validation_registry:reindex_positions(New),
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -395,26 +395,26 @@ run_validations(Validations, Message) ->
emqx_rule_runtime:clear_rule_payload(), emqx_rule_runtime:clear_rule_payload(),
Fun = fun(Validation, Acc) -> Fun = fun(Validation, Acc) ->
#{name := Name} = Validation, #{name := Name} = Validation,
emqx_message_validation_registry:inc_matched(Name), emqx_schema_validation_registry:inc_matched(Name),
case run_validation(Validation, Message) of case run_validation(Validation, Message) of
ok -> ok ->
emqx_message_validation_registry:inc_succeeded(Name), emqx_schema_validation_registry:inc_succeeded(Name),
{cont, Acc}; {cont, Acc};
ignore -> ignore ->
trace_failure(Validation, "validation_failed", #{ trace_failure(Validation, "validation_failed", #{
validation => Name, validation => Name,
action => ignore action => ignore
}), }),
emqx_message_validation_registry:inc_failed(Name), emqx_schema_validation_registry:inc_failed(Name),
run_message_validation_failed_hook(Message, Validation), run_schema_validation_failed_hook(Message, Validation),
{cont, Acc}; {cont, Acc};
FailureAction -> FailureAction ->
trace_failure(Validation, "validation_failed", #{ trace_failure(Validation, "validation_failed", #{
validation => Name, validation => Name,
action => FailureAction action => FailureAction
}), }),
emqx_message_validation_registry:inc_failed(Name), emqx_schema_validation_registry:inc_failed(Name),
run_message_validation_failed_hook(Message, Validation), run_schema_validation_failed_hook(Message, Validation),
{halt, FailureAction} {halt, FailureAction}
end end
end, end,
@ -457,17 +457,17 @@ trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) ->
name := _Name, name := _Name,
failure_action := _Action failure_action := _Action
} = Validation, } = Validation,
?tp(message_validation_failed, #{log_level => none, name => _Name, action => _Action}), ?tp(schema_validation_failed, #{log_level => none, name => _Name, action => _Action}),
ok; ok;
trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) -> trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) ->
#{ #{
name := _Name, name := _Name,
failure_action := _Action failure_action := _Action
} = Validation, } = Validation,
?tp(message_validation_failed, #{log_level => Level, name => _Name, action => _Action}), ?tp(schema_validation_failed, #{log_level => Level, name => _Name, action => _Action}),
?TRACE(Level, ?TRACE_TAG, Msg, Meta). ?TRACE(Level, ?TRACE_TAG, Msg, Meta).
run_message_validation_failed_hook(Message, Validation) -> run_schema_validation_failed_hook(Message, Validation) ->
#{name := Name} = Validation, #{name := Name} = Validation,
ValidationContext = #{name => Name}, ValidationContext = #{name => Name},
emqx_hooks:run('message.validation_failed', [Message, ValidationContext]). emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation_app). -module(emqx_schema_validation_app).
-behaviour(application). -behaviour(application).
@ -18,15 +18,15 @@
-spec start(application:start_type(), term()) -> {ok, pid()}. -spec start(application:start_type(), term()) -> {ok, pid()}.
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_message_validation_sup:start_link(), {ok, Sup} = emqx_schema_validation_sup:start_link(),
ok = emqx_message_validation:add_handler(), ok = emqx_schema_validation:add_handler(),
ok = emqx_message_validation:register_hooks(), ok = emqx_schema_validation:register_hooks(),
ok = emqx_message_validation:load(), ok = emqx_schema_validation:load(),
{ok, Sup}. {ok, Sup}.
-spec stop(term()) -> ok. -spec stop(term()) -> ok.
stop(_State) -> stop(_State) ->
ok = emqx_message_validation:unload(), ok = emqx_schema_validation:unload(),
ok = emqx_message_validation:unregister_hooks(), ok = emqx_schema_validation:unregister_hooks(),
ok = emqx_message_validation:remove_handler(), ok = emqx_schema_validation:remove_handler(),
ok. ok.

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation_http_api). -module(emqx_schema_validation_http_api).
-behaviour(minirest_api). -behaviour(minirest_api).
@ -21,43 +21,43 @@
%% `minirest' handlers %% `minirest' handlers
-export([ -export([
'/message_validations'/2, '/schema_validations'/2,
'/message_validations/reorder'/2, '/schema_validations/reorder'/2,
'/message_validations/validation/:name'/2, '/schema_validations/validation/:name'/2,
'/message_validations/validation/:name/metrics'/2, '/schema_validations/validation/:name/metrics'/2,
'/message_validations/validation/:name/metrics/reset'/2, '/schema_validations/validation/:name/metrics/reset'/2,
'/message_validations/validation/:name/enable/:enable'/2 '/schema_validations/validation/:name/enable/:enable'/2
]). ]).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% Type definitions %% Type definitions
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
-define(TAGS, [<<"Message Validation">>]). -define(TAGS, [<<"Schema Validation">>]).
-define(METRIC_NAME, message_validation). -define(METRIC_NAME, schema_validation).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `minirest' and `minirest_trails' API %% `minirest' and `minirest_trails' API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
namespace() -> "message_validation_http_api". namespace() -> "schema_validation_http_api".
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() -> paths() ->
[ [
"/message_validations", "/schema_validations",
"/message_validations/reorder", "/schema_validations/reorder",
"/message_validations/validation/:name", "/schema_validations/validation/:name",
"/message_validations/validation/:name/metrics", "/schema_validations/validation/:name/metrics",
"/message_validations/validation/:name/metrics/reset", "/schema_validations/validation/:name/metrics/reset",
"/message_validations/validation/:name/enable/:enable" "/schema_validations/validation/:name/enable/:enable"
]. ].
schema("/message_validations") -> schema("/schema_validations") ->
#{ #{
'operationId' => '/message_validations', 'operationId' => '/schema_validations',
get => #{ get => #{
tags => ?TAGS, tags => ?TAGS,
summary => <<"List validations">>, summary => <<"List validations">>,
@ -67,7 +67,7 @@ schema("/message_validations") ->
200 => 200 =>
emqx_dashboard_swagger:schema_with_examples( emqx_dashboard_swagger:schema_with_examples(
array( array(
emqx_message_validation_schema:api_schema(list) emqx_schema_validation_schema:api_schema(list)
), ),
example_return_list() example_return_list()
) )
@ -78,14 +78,14 @@ schema("/message_validations") ->
summary => <<"Append a new validation">>, summary => <<"Append a new validation">>,
description => ?DESC("append_validation"), description => ?DESC("append_validation"),
'requestBody' => emqx_dashboard_swagger:schema_with_examples( 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_message_validation_schema:api_schema(post), emqx_schema_validation_schema:api_schema(post),
example_input_create() example_input_create()
), ),
responses => responses =>
#{ #{
201 => 201 =>
emqx_dashboard_swagger:schema_with_examples( emqx_dashboard_swagger:schema_with_examples(
emqx_message_validation_schema:api_schema(post), emqx_schema_validation_schema:api_schema(post),
example_return_create() example_return_create()
), ),
400 => error_schema('ALREADY_EXISTS', "Validation already exists") 400 => error_schema('ALREADY_EXISTS', "Validation already exists")
@ -96,14 +96,14 @@ schema("/message_validations") ->
summary => <<"Update a validation">>, summary => <<"Update a validation">>,
description => ?DESC("update_validation"), description => ?DESC("update_validation"),
'requestBody' => emqx_dashboard_swagger:schema_with_examples( 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_message_validation_schema:api_schema(put), emqx_schema_validation_schema:api_schema(put),
example_input_update() example_input_update()
), ),
responses => responses =>
#{ #{
200 => 200 =>
emqx_dashboard_swagger:schema_with_examples( emqx_dashboard_swagger:schema_with_examples(
emqx_message_validation_schema:api_schema(put), emqx_schema_validation_schema:api_schema(put),
example_return_update() example_return_update()
), ),
404 => error_schema('NOT_FOUND', "Validation not found"), 404 => error_schema('NOT_FOUND', "Validation not found"),
@ -111,9 +111,9 @@ schema("/message_validations") ->
} }
} }
}; };
schema("/message_validations/reorder") -> schema("/schema_validations/reorder") ->
#{ #{
'operationId' => '/message_validations/reorder', 'operationId' => '/schema_validations/reorder',
post => #{ post => #{
tags => ?TAGS, tags => ?TAGS,
summary => <<"Reorder all validations">>, summary => <<"Reorder all validations">>,
@ -140,9 +140,9 @@ schema("/message_validations/reorder") ->
} }
} }
}; };
schema("/message_validations/validation/:name") -> schema("/schema_validations/validation/:name") ->
#{ #{
'operationId' => '/message_validations/validation/:name', 'operationId' => '/schema_validations/validation/:name',
get => #{ get => #{
tags => ?TAGS, tags => ?TAGS,
summary => <<"Lookup a validation">>, summary => <<"Lookup a validation">>,
@ -153,7 +153,7 @@ schema("/message_validations/validation/:name") ->
200 => 200 =>
emqx_dashboard_swagger:schema_with_examples( emqx_dashboard_swagger:schema_with_examples(
array( array(
emqx_message_validation_schema:api_schema(lookup) emqx_schema_validation_schema:api_schema(lookup)
), ),
example_return_lookup() example_return_lookup()
), ),
@ -172,9 +172,9 @@ schema("/message_validations/validation/:name") ->
} }
} }
}; };
schema("/message_validations/validation/:name/metrics") -> schema("/schema_validations/validation/:name/metrics") ->
#{ #{
'operationId' => '/message_validations/validation/:name/metrics', 'operationId' => '/schema_validations/validation/:name/metrics',
get => #{ get => #{
tags => ?TAGS, tags => ?TAGS,
summary => <<"Get validation metrics">>, summary => <<"Get validation metrics">>,
@ -191,9 +191,9 @@ schema("/message_validations/validation/:name/metrics") ->
} }
} }
}; };
schema("/message_validations/validation/:name/metrics/reset") -> schema("/schema_validations/validation/:name/metrics/reset") ->
#{ #{
'operationId' => '/message_validations/validation/:name/metrics/reset', 'operationId' => '/schema_validations/validation/:name/metrics/reset',
post => #{ post => #{
tags => ?TAGS, tags => ?TAGS,
summary => <<"Reset validation metrics">>, summary => <<"Reset validation metrics">>,
@ -206,9 +206,9 @@ schema("/message_validations/validation/:name/metrics/reset") ->
} }
} }
}; };
schema("/message_validations/validation/:name/enable/:enable") -> schema("/schema_validations/validation/:name/enable/:enable") ->
#{ #{
'operationId' => '/message_validations/validation/:name/enable/:enable', 'operationId' => '/schema_validations/validation/:name/enable/:enable',
post => #{ post => #{
tags => ?TAGS, tags => ?TAGS,
summary => <<"Enable or disable validation">>, summary => <<"Enable or disable validation">>,
@ -285,29 +285,29 @@ fields(node_metrics) ->
%% `minirest' handlers %% `minirest' handlers
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
'/message_validations'(get, _Params) -> '/schema_validations'(get, _Params) ->
?OK(emqx_message_validation:list()); ?OK(emqx_schema_validation:list());
'/message_validations'(post, #{body := Params = #{<<"name">> := Name}}) -> '/schema_validations'(post, #{body := Params = #{<<"name">> := Name}}) ->
with_validation( with_validation(
Name, Name,
return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)), return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)),
fun() -> fun() ->
case emqx_message_validation:insert(Params) of case emqx_schema_validation:insert(Params) of
{ok, _} -> {ok, _} ->
{ok, Res} = emqx_message_validation:lookup(Name), {ok, Res} = emqx_schema_validation:lookup(Name),
{201, Res}; {201, Res};
{error, Error} -> {error, Error} ->
?BAD_REQUEST(Error) ?BAD_REQUEST(Error)
end end
end end
); );
'/message_validations'(put, #{body := Params = #{<<"name">> := Name}}) -> '/schema_validations'(put, #{body := Params = #{<<"name">> := Name}}) ->
with_validation( with_validation(
Name, Name,
fun() -> fun() ->
case emqx_message_validation:update(Params) of case emqx_schema_validation:update(Params) of
{ok, _} -> {ok, _} ->
{ok, Res} = emqx_message_validation:lookup(Name), {ok, Res} = emqx_schema_validation:lookup(Name),
{200, Res}; {200, Res};
{error, Error} -> {error, Error} ->
?BAD_REQUEST(Error) ?BAD_REQUEST(Error)
@ -316,17 +316,17 @@ fields(node_metrics) ->
not_found() not_found()
). ).
'/message_validations/validation/:name'(get, #{bindings := #{name := Name}}) -> '/schema_validations/validation/:name'(get, #{bindings := #{name := Name}}) ->
with_validation( with_validation(
Name, Name,
fun(Validation) -> ?OK(Validation) end, fun(Validation) -> ?OK(Validation) end,
not_found() not_found()
); );
'/message_validations/validation/:name'(delete, #{bindings := #{name := Name}}) -> '/schema_validations/validation/:name'(delete, #{bindings := #{name := Name}}) ->
with_validation( with_validation(
Name, Name,
fun() -> fun() ->
case emqx_message_validation:delete(Name) of case emqx_schema_validation:delete(Name) of
{ok, _} -> {ok, _} ->
?NO_CONTENT; ?NO_CONTENT;
{error, Error} -> {error, Error} ->
@ -336,10 +336,10 @@ fields(node_metrics) ->
not_found() not_found()
). ).
'/message_validations/reorder'(post, #{body := #{<<"order">> := Order}}) -> '/schema_validations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
do_reorder(Order). do_reorder(Order).
'/message_validations/validation/:name/enable/:enable'(post, #{ '/schema_validations/validation/:name/enable/:enable'(post, #{
bindings := #{name := Name, enable := Enable} bindings := #{name := Name, enable := Enable}
}) -> }) ->
with_validation( with_validation(
@ -348,7 +348,7 @@ fields(node_metrics) ->
not_found() not_found()
). ).
'/message_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) -> '/schema_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) ->
with_validation( with_validation(
Name, Name,
fun() -> fun() ->
@ -371,7 +371,7 @@ fields(node_metrics) ->
not_found() not_found()
). ).
'/message_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) -> '/schema_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) ->
with_validation( with_validation(
Name, Name,
fun() -> fun() ->
@ -516,7 +516,7 @@ error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(
ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message). ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message).
do_reorder(Order) -> do_reorder(Order) ->
case emqx_message_validation:reorder(Order) of case emqx_schema_validation:reorder(Order) of
{ok, _} -> {ok, _} ->
?NO_CONTENT; ?NO_CONTENT;
{error, {error,
@ -538,7 +538,7 @@ do_reorder(Order) ->
do_enable_disable(Validation, Enable) -> do_enable_disable(Validation, Enable) ->
RawValidation = make_serializable(Validation), RawValidation = make_serializable(Validation),
case emqx_message_validation:update(RawValidation#{<<"enable">> => Enable}) of case emqx_schema_validation:update(RawValidation#{<<"enable">> => Enable}) of
{ok, _} -> {ok, _} ->
?NO_CONTENT; ?NO_CONTENT;
{error, Reason} -> {error, Reason} ->
@ -546,7 +546,7 @@ do_enable_disable(Validation, Enable) ->
end. end.
with_validation(Name, FoundFn, NotFoundFn) -> with_validation(Name, FoundFn, NotFoundFn) ->
case emqx_message_validation:lookup(Name) of case emqx_schema_validation:lookup(Name) of
{ok, Validation} -> {ok, Validation} ->
{arity, Arity} = erlang:fun_info(FoundFn, arity), {arity, Arity} = erlang:fun_info(FoundFn, arity),
case Arity of case Arity of
@ -564,15 +564,15 @@ not_found() ->
return(?NOT_FOUND(<<"Validation not found">>)). return(?NOT_FOUND(<<"Validation not found">>)).
make_serializable(Validation) -> make_serializable(Validation) ->
Schema = emqx_message_validation_schema, Schema = emqx_schema_validation_schema,
RawConfig = #{ RawConfig = #{
<<"message_validation">> => #{ <<"schema_validation">> => #{
<<"validations">> => <<"validations">> =>
[emqx_utils_maps:binary_key_map(Validation)] [emqx_utils_maps:binary_key_map(Validation)]
} }
}, },
#{ #{
<<"message_validation">> := #{ <<"schema_validation">> := #{
<<"validations">> := <<"validations">> :=
[Serialized] [Serialized]
} }

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation_registry). -module(emqx_schema_validation_registry).
-behaviour(gen_server). -behaviour(gen_server).
@ -36,10 +36,10 @@
%% Type declarations %% Type declarations
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-define(VALIDATION_TOPIC_INDEX, emqx_message_validation_index). -define(VALIDATION_TOPIC_INDEX, emqx_schema_validation_index).
-define(VALIDATION_TAB, emqx_message_validation_tab). -define(VALIDATION_TAB, emqx_schema_validation_tab).
-define(METRIC_NAME, message_validation). -define(METRIC_NAME, schema_validation).
-define(METRICS, [ -define(METRICS, [
'matched', 'matched',
'succeeded', 'succeeded',
@ -106,7 +106,7 @@ matching_validations(Topic) ->
-spec metrics_worker_spec() -> supervisor:child_spec(). -spec metrics_worker_spec() -> supervisor:child_spec().
metrics_worker_spec() -> metrics_worker_spec() ->
emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME). emqx_metrics_worker:child_spec(schema_validation_metrics, ?METRIC_NAME).
-spec get_metrics(validation_name()) -> emqx_metrics_worker:metrics(). -spec get_metrics(validation_name()) -> emqx_metrics_worker:metrics().
get_metrics(Name) -> get_metrics(Name) ->
@ -243,7 +243,7 @@ transform_validation(Validation = #{checks := Checks}) ->
Validation#{checks := lists:map(fun transform_check/1, Checks)}. Validation#{checks := lists:map(fun transform_check/1, Checks)}.
transform_check(#{type := sql, sql := SQL}) -> transform_check(#{type := sql, sql := SQL}) ->
{ok, Check} = emqx_message_validation:parse_sql_check(SQL), {ok, Check} = emqx_schema_validation:parse_sql_check(SQL),
Check; Check;
transform_check(Check) -> transform_check(Check) ->
Check. Check.

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation_schema). -module(emqx_schema_validation_schema).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
@ -26,12 +26,12 @@
%% `hocon_schema' API %% `hocon_schema' API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
namespace() -> message_validation. namespace() -> schema_validation.
roots() -> roots() ->
[{message_validation, mk(ref(message_validation), #{importance => ?IMPORTANCE_HIDDEN})}]. [{schema_validation, mk(ref(schema_validation), #{importance => ?IMPORTANCE_HIDDEN})}].
fields(message_validation) -> fields(schema_validation) ->
[ [
{validations, {validations,
mk( mk(
@ -199,7 +199,7 @@ ensure_array(L, _) when is_list(L) -> L;
ensure_array(B, _) -> [B]. ensure_array(B, _) -> [B].
validate_sql(SQL) -> validate_sql(SQL) ->
case emqx_message_validation:parse_sql_check(SQL) of case emqx_schema_validation:parse_sql_check(SQL) of
{ok, _Parsed} -> {ok, _Parsed} ->
ok; ok;
Error = {error, _} -> Error = {error, _} ->

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation_sup). -module(emqx_schema_validation_sup).
-behaviour(supervisor). -behaviour(supervisor).
@ -23,8 +23,8 @@ start_link() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
init([]) -> init([]) ->
Registry = worker_spec(emqx_message_validation_registry), Registry = worker_spec(emqx_schema_validation_registry),
Metrics = emqx_message_validation_registry:metrics_worker_spec(), Metrics = emqx_schema_validation_registry:metrics_worker_spec(),
SupFlags = #{ SupFlags = #{
strategy => one_for_one, strategy => one_for_one,
intensity => 10, intensity => 10,

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation_http_api_SUITE). -module(emqx_schema_validation_http_api_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -31,7 +31,7 @@ init_per_suite(Config) ->
emqx, emqx,
emqx_conf, emqx_conf,
emqx_rule_engine, emqx_rule_engine,
emqx_message_validation, emqx_schema_validation,
emqx_management, emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard(), emqx_mgmt_api_test_util:emqx_dashboard(),
emqx_schema_registry emqx_schema_registry
@ -66,9 +66,9 @@ end_per_testcase(_TestCase, _Config) ->
clear_all_validations() -> clear_all_validations() ->
lists:foreach( lists:foreach(
fun(#{name := Name}) -> fun(#{name := Name}) ->
{ok, _} = emqx_message_validation:delete(Name) {ok, _} = emqx_schema_validation:delete(Name)
end, end,
emqx_message_validation:list() emqx_schema_validation:list()
). ).
reset_all_global_metrics() -> reset_all_global_metrics() ->
@ -146,7 +146,7 @@ schema_check(Type, SerdeName, Overrides) ->
Overrides Overrides
). ).
api_root() -> "message_validations". api_root() -> "schema_validations".
simplify_result(Res) -> simplify_result(Res) ->
case Res of case Res of
@ -358,7 +358,7 @@ assert_index_order(ExpectedOrder, Topic, Comment) ->
ExpectedOrder, ExpectedOrder,
[ [
N N
|| #{name := N} <- emqx_message_validation_registry:matching_validations(Topic) || #{name := N} <- emqx_schema_validation_registry:matching_validations(Topic)
], ],
Comment Comment
). ).
@ -366,7 +366,7 @@ assert_index_order(ExpectedOrder, Topic, Comment) ->
create_failure_tracing_rule() -> create_failure_tracing_rule() ->
Params = #{ Params = #{
enable => true, enable => true,
sql => <<"select * from \"$events/message_validation_failed\" ">>, sql => <<"select * from \"$events/schema_validation_failed\" ">>,
actions => [make_trace_fn_action()] actions => [make_trace_fn_action()]
}, },
Path = emqx_mgmt_api_test_util:api_path(["rules"]), Path = emqx_mgmt_api_test_util:api_path(["rules"]),
@ -689,7 +689,7 @@ t_log_failure_none(_Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
?assertMatch([#{log_level := none}], ?of_kind(message_validation_failed, Trace)), ?assertMatch([#{log_level := none}], ?of_kind(schema_validation_failed, Trace)),
ok ok
end end
), ),
@ -719,12 +719,12 @@ t_action_ignore(_Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
?assertMatch([#{action := ignore}], ?of_kind(message_validation_failed, Trace)), ?assertMatch([#{action := ignore}], ?of_kind(schema_validation_failed, Trace)),
ok ok
end end
), ),
?assertMatch( ?assertMatch(
[{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}], [{_, #{data := #{validation := Name1, event := 'schema.validation_failed'}}}],
get_traced_failures_from_rule_engine() get_traced_failures_from_rule_engine()
), ),
ok. ok.
@ -1093,8 +1093,8 @@ t_multiple_validations(_Config) ->
?assertMatch( ?assertMatch(
[ [
{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}, {_, #{data := #{validation := Name1, event := 'schema.validation_failed'}}},
{_, #{data := #{validation := Name2, event := 'message.validation_failed'}}} {_, #{data := #{validation := Name2, event := 'schema.validation_failed'}}}
], ],
get_traced_failures_from_rule_engine() get_traced_failures_from_rule_engine()
), ),

View File

@ -1,22 +1,22 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation_tests). -module(emqx_schema_validation_tests).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(VALIDATIONS_PATH, "message_validation.validations"). -define(VALIDATIONS_PATH, "schema_validation.validations").
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
parse_and_check(InnerConfigs) -> parse_and_check(InnerConfigs) ->
RootBin = <<"message_validation">>, RootBin = <<"schema_validation">>,
InnerBin = <<"validations">>, InnerBin = <<"validations">>,
RawConf = #{RootBin => #{InnerBin => InnerConfigs}}, RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
#{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain( #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
emqx_message_validation_schema, emqx_schema_validation_schema,
RawConf, RawConf,
#{ #{
required => false, required => false,
@ -65,9 +65,9 @@ schema_check(Type, SerdeName, Overrides) ->
). ).
eval_sql(Message, SQL) -> eval_sql(Message, SQL) ->
{ok, Check} = emqx_message_validation:parse_sql_check(SQL), {ok, Check} = emqx_schema_validation:parse_sql_check(SQL),
Validation = #{log_failure => #{level => warning}, name => <<"validation">>}, Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
emqx_message_validation:evaluate_sql_check(Check, Validation, Message). emqx_schema_validation:evaluate_sql_check(Check, Validation, Message).
message() -> message() ->
message(_Opts = #{}). message(_Opts = #{}).
@ -196,7 +196,7 @@ invalid_names_test_() ->
{_Schema, [ {_Schema, [
#{ #{
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.name" path := "schema_validation.validations.1.name"
} }
]}, ]},
parse_and_check([validation(InvalidName, [sql_check()])]) parse_and_check([validation(InvalidName, [sql_check()])])
@ -239,7 +239,7 @@ duplicated_check_test_() ->
#{ #{
reason := <<"duplicated topics: t/1">>, reason := <<"duplicated topics: t/1">>,
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.topics" path := "schema_validation.validations.1.topics"
} }
]}, ]},
parse_and_check([ parse_and_check([
@ -256,7 +256,7 @@ duplicated_check_test_() ->
#{ #{
reason := <<"duplicated topics: t/1">>, reason := <<"duplicated topics: t/1">>,
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.topics" path := "schema_validation.validations.1.topics"
} }
]}, ]},
parse_and_check([ parse_and_check([
@ -273,7 +273,7 @@ duplicated_check_test_() ->
#{ #{
reason := <<"duplicated topics: t/1, t/2">>, reason := <<"duplicated topics: t/1, t/2">>,
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.topics" path := "schema_validation.validations.1.topics"
} }
]}, ]},
parse_and_check([ parse_and_check([
@ -320,7 +320,7 @@ duplicated_check_test_() ->
#{ #{
reason := <<"duplicated schema checks: json:a">>, reason := <<"duplicated schema checks: json:a">>,
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.checks" path := "schema_validation.validations.1.checks"
} }
]}, ]},
parse_and_check([ parse_and_check([
@ -336,7 +336,7 @@ duplicated_check_test_() ->
#{ #{
reason := <<"duplicated schema checks: json:a">>, reason := <<"duplicated schema checks: json:a">>,
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.checks" path := "schema_validation.validations.1.checks"
} }
]}, ]},
parse_and_check([ parse_and_check([
@ -353,7 +353,7 @@ duplicated_check_test_() ->
#{ #{
reason := <<"duplicated schema checks: json:a">>, reason := <<"duplicated schema checks: json:a">>,
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.checks" path := "schema_validation.validations.1.checks"
} }
]}, ]},
parse_and_check([ parse_and_check([
@ -370,7 +370,7 @@ duplicated_check_test_() ->
#{ #{
reason := <<"duplicated schema checks: json:a">>, reason := <<"duplicated schema checks: json:a">>,
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.checks" path := "schema_validation.validations.1.checks"
} }
]}, ]},
parse_and_check([ parse_and_check([
@ -387,7 +387,7 @@ duplicated_check_test_() ->
#{ #{
reason := <<"duplicated schema checks: ", _/binary>>, reason := <<"duplicated schema checks: ", _/binary>>,
kind := validation_error, kind := validation_error,
path := "message_validation.validations.1.checks" path := "schema_validation.validations.1.checks"
} }
]}, ]},
parse_and_check([ parse_and_check([

View File

@ -0,0 +1 @@
Reduced log spamming when connection goes down in a Postgres/Timescale/Matrix connector.

View File

@ -79,7 +79,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by ehttpc and emqtt # in conflict by ehttpc and emqtt
{:gun, github: "emqx/gun", tag: "1.3.11", override: true}, {:gun, github: "emqx/gun", tag: "1.3.11", override: true},
# in conflict by emqx_connector and system_monitor # in conflict by emqx_connector and system_monitor
{:epgsql, github: "emqx/epgsql", tag: "4.7.1.1", override: true}, {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true},
# in conflict by emqx and observer_cli # in conflict by emqx and observer_cli
{:recon, github: "ferd/recon", tag: "2.5.1", override: true}, {:recon, github: "ferd/recon", tag: "2.5.1", override: true},
{:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true}, {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true},
@ -189,7 +189,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_s3, :emqx_s3,
:emqx_bridge_s3, :emqx_bridge_s3,
:emqx_schema_registry, :emqx_schema_registry,
:emqx_message_validation, :emqx_schema_validation,
:emqx_enterprise, :emqx_enterprise,
:emqx_bridge_kinesis, :emqx_bridge_kinesis,
:emqx_bridge_azure_event_hub, :emqx_bridge_azure_event_hub,

View File

@ -116,7 +116,7 @@ is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false; is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
is_community_umbrella_app("apps/emqx_gateway_jt808") -> false; is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false; is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
is_community_umbrella_app("apps/emqx_message_validation") -> false; is_community_umbrella_app("apps/emqx_schema_validation") -> false;
is_community_umbrella_app("apps/emqx_eviction_agent") -> false; is_community_umbrella_app("apps/emqx_eviction_agent") -> false;
is_community_umbrella_app("apps/emqx_node_rebalance") -> false; is_community_umbrella_app("apps/emqx_node_rebalance") -> false;
is_community_umbrella_app(_) -> true. is_community_umbrella_app(_) -> true.

View File

@ -25,9 +25,9 @@ get_prom_data_integration_data.desc:
get_prom_data_integration_data.label: get_prom_data_integration_data.label:
"""Prometheus Metrics for Data Integration""" """Prometheus Metrics for Data Integration"""
get_prom_message_validation.desc: get_prom_schema_validation.desc:
"""Get Prometheus Metrics for Message Validation""" """Get Prometheus Metrics for Schema Validation"""
get_prom_message_validation.label: get_prom_schema_validation.label:
"""Prometheus Metrics for Message Validation""" """Prometheus Metrics for Schema Validation"""
} }

View File

@ -1,4 +1,4 @@
emqx_message_validation_http_api { emqx_schema_validation_http_api {
list_validations.desc: list_validations.desc:
"""List validations""" """List validations"""

View File

@ -1,4 +1,4 @@
emqx_message_validation_schema { emqx_schema_validation_schema {
check_avro_type.desc: check_avro_type.desc:
"""Avro schema check""" """Avro schema check"""