Merge pull request #13046 from thalesmg/refactor-mv-rename-sv-r57-20240514
refactor: rename `message_validation` to `schema_validation`
This commit is contained in:
commit
1730a41337
|
@ -59,7 +59,7 @@
|
|||
'message.publish',
|
||||
'message.puback',
|
||||
'message.dropped',
|
||||
'message.validation_failed',
|
||||
'schema.validation_failed',
|
||||
'message.delivered',
|
||||
'message.acked',
|
||||
'delivery.dropped',
|
||||
|
@ -183,7 +183,7 @@ when
|
|||
-callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) ->
|
||||
callback_result().
|
||||
|
||||
-callback 'message.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) ->
|
||||
-callback 'schema.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) ->
|
||||
callback_result().
|
||||
|
||||
-callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when
|
||||
|
|
|
@ -383,8 +383,8 @@ default_appspec(emqx_dashboard, _SuiteOpts) ->
|
|||
};
|
||||
default_appspec(emqx_schema_registry, _SuiteOpts) ->
|
||||
#{schema_mod => emqx_schema_registry_schema, config => #{}};
|
||||
default_appspec(emqx_message_validation, _SuiteOpts) ->
|
||||
#{schema_mod => emqx_message_validation_schema, config => #{}};
|
||||
default_appspec(emqx_schema_validation, _SuiteOpts) ->
|
||||
#{schema_mod => emqx_schema_validation_schema, config => #{}};
|
||||
default_appspec(_, _) ->
|
||||
#{}.
|
||||
|
||||
|
|
|
@ -189,9 +189,9 @@ swagger_desc(sent_bytes) ->
|
|||
swagger_desc(dropped) ->
|
||||
swagger_desc_format("Dropped messages ");
|
||||
swagger_desc(validation_succeeded) ->
|
||||
swagger_desc_format("Message validations succeeded ");
|
||||
swagger_desc_format("Schema validations succeeded ");
|
||||
swagger_desc(validation_failed) ->
|
||||
swagger_desc_format("Message validations failed ");
|
||||
swagger_desc_format("Schema validations failed ");
|
||||
swagger_desc(persisted) ->
|
||||
swagger_desc_format("Messages saved to the durable storage ");
|
||||
swagger_desc(subscriptions) ->
|
||||
|
@ -217,9 +217,9 @@ swagger_desc(sent_msg_rate) ->
|
|||
swagger_desc(dropped_msg_rate) ->
|
||||
swagger_desc_format("Dropped messages ", per);
|
||||
swagger_desc(validation_succeeded_rate) ->
|
||||
swagger_desc_format("Message validations succeeded ", per);
|
||||
swagger_desc_format("Schema validations succeeded ", per);
|
||||
swagger_desc(validation_failed_rate) ->
|
||||
swagger_desc_format("Message validations failed ", per);
|
||||
swagger_desc_format("Schema validations failed ", per);
|
||||
swagger_desc(persisted_rate) ->
|
||||
swagger_desc_format("Messages saved to the durable storage ", per);
|
||||
swagger_desc(retained_msg_count) ->
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
-define(EE_SCHEMA_MODULES, [
|
||||
emqx_license_schema,
|
||||
emqx_schema_registry_schema,
|
||||
emqx_message_validation_schema,
|
||||
emqx_schema_validation_schema,
|
||||
emqx_ft_schema
|
||||
]).
|
||||
|
||||
|
@ -196,6 +196,6 @@ audit_log_conf() ->
|
|||
|
||||
tr_prometheus_collectors(Conf) ->
|
||||
[
|
||||
{'/prometheus/message_validation', emqx_prometheus_message_validation}
|
||||
{'/prometheus/schema_validation', emqx_prometheus_schema_validation}
|
||||
| emqx_conf_schema:tr_prometheus_collectors(Conf)
|
||||
].
|
||||
|
|
|
@ -88,7 +88,7 @@
|
|||
[
|
||||
emqx_license,
|
||||
emqx_enterprise,
|
||||
emqx_message_validation,
|
||||
emqx_schema_validation,
|
||||
emqx_connector_aggregator,
|
||||
emqx_bridge_kafka,
|
||||
emqx_bridge_pulsar,
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
{application, emqx_message_validation, [
|
||||
{description, "EMQX Message Validation"},
|
||||
{vsn, "0.1.0"},
|
||||
{registered, [emqx_message_validation_sup, emqx_message_validation_registry]},
|
||||
{mod, {emqx_message_validation_app, []}},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
||||
{links, []}
|
||||
]}.
|
|
@ -22,12 +22,12 @@
|
|||
-define(PROMETHEUS_AUTH_COLLECTOR, emqx_prometheus_auth).
|
||||
-define(PROMETHEUS_DATA_INTEGRATION_REGISTRY, '/prometheus/data_integration').
|
||||
-define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration).
|
||||
-define(PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, '/prometheus/message_validation').
|
||||
-define(PROMETHEUS_MESSAGE_VALIDATION_COLLECTOR, emqx_prometheus_message_validation).
|
||||
-define(PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, '/prometheus/schema_validation').
|
||||
-define(PROMETHEUS_SCHEMA_VALIDATION_COLLECTOR, emqx_prometheus_schema_validation).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
-define(PROMETHEUS_EE_REGISTRIES, [
|
||||
?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY
|
||||
?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY
|
||||
]).
|
||||
%% ELSE if(?EMQX_RELEASE_EDITION == ee).
|
||||
-else.
|
||||
|
|
|
@ -49,7 +49,7 @@
|
|||
stats/2,
|
||||
auth/2,
|
||||
data_integration/2,
|
||||
message_validation/2
|
||||
schema_validation/2
|
||||
]).
|
||||
|
||||
-export([lookup_from_local_nodes/3]).
|
||||
|
@ -73,7 +73,7 @@ paths() ->
|
|||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
paths_ee() ->
|
||||
["/prometheus/message_validation"].
|
||||
["/prometheus/schema_validation"].
|
||||
%% ELSE if(?EMQX_RELEASE_EDITION == ee).
|
||||
-else.
|
||||
paths_ee() ->
|
||||
|
@ -139,12 +139,12 @@ schema("/prometheus/data_integration") ->
|
|||
#{200 => prometheus_data_schema()}
|
||||
}
|
||||
};
|
||||
schema("/prometheus/message_validation") ->
|
||||
schema("/prometheus/schema_validation") ->
|
||||
#{
|
||||
'operationId' => message_validation,
|
||||
'operationId' => schema_validation,
|
||||
get =>
|
||||
#{
|
||||
description => ?DESC(get_prom_message_validation),
|
||||
description => ?DESC(get_prom_schema_validation),
|
||||
tags => ?TAGS,
|
||||
parameters => [ref(mode)],
|
||||
security => security(),
|
||||
|
@ -223,8 +223,8 @@ auth(get, #{headers := Headers, query_string := Qs}) ->
|
|||
data_integration(get, #{headers := Headers, query_string := Qs}) ->
|
||||
collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)).
|
||||
|
||||
message_validation(get, #{headers := Headers, query_string := Qs}) ->
|
||||
collect(emqx_prometheus_message_validation, collect_opts(Headers, Qs)).
|
||||
schema_validation(get, #{headers := Headers, query_string := Qs}) ->
|
||||
collect(emqx_prometheus_schema_validation, collect_opts(Headers, Qs)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal funcs
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_prometheus_message_validation).
|
||||
-module(emqx_prometheus_schema_validation).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
%% for bpapi
|
||||
|
@ -48,19 +48,19 @@
|
|||
-define(MG(K, MAP), maps:get(K, MAP)).
|
||||
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
||||
|
||||
-define(metrics_data_key, message_validation_metrics_data).
|
||||
-define(metrics_data_key, schema_validation_metrics_data).
|
||||
|
||||
-define(key_enabled, emqx_message_validation_enable).
|
||||
-define(key_matched, emqx_message_validation_matched).
|
||||
-define(key_failed, emqx_message_validation_failed).
|
||||
-define(key_succeeded, emqx_message_validation_succeeded).
|
||||
-define(key_enabled, emqx_schema_validation_enable).
|
||||
-define(key_matched, emqx_schema_validation_matched).
|
||||
-define(key_failed, emqx_schema_validation_failed).
|
||||
-define(key_succeeded, emqx_schema_validation_succeeded).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% `emqx_prometheus_cluster' API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
fetch_from_local_node(Mode) ->
|
||||
Validations = emqx_message_validation:list(),
|
||||
Validations = emqx_schema_validation:list(),
|
||||
{node(), #{
|
||||
?metrics_data_key => to_validation_data(Mode, Validations)
|
||||
}}.
|
||||
|
@ -70,7 +70,7 @@ fetch_cluster_consistented_data() ->
|
|||
|
||||
aggre_or_zip_init_acc() ->
|
||||
#{
|
||||
?metrics_data_key => maps:from_keys(message_validation_metric(names), [])
|
||||
?metrics_data_key => maps:from_keys(schema_validation_metric(names), [])
|
||||
}.
|
||||
|
||||
logic_sum_metrics() ->
|
||||
|
@ -89,12 +89,12 @@ deregister_cleanup(_) -> ok.
|
|||
-spec collect_mf(_Registry, Callback) -> ok when
|
||||
_Registry :: prometheus_registry:registry(),
|
||||
Callback :: prometheus_collector:collect_mf_callback().
|
||||
collect_mf(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, Callback) ->
|
||||
collect_mf(?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, Callback) ->
|
||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||
|
||||
%% Message Validation Metrics
|
||||
%% Schema Validation Metrics
|
||||
RuleMetricDs = ?MG(?metrics_data_key, RawData),
|
||||
ok = add_collect_family(Callback, message_validation_metrics_meta(), RuleMetricDs),
|
||||
ok = add_collect_family(Callback, schema_validation_metrics_meta(), RuleMetricDs),
|
||||
|
||||
ok;
|
||||
collect_mf(_, _) ->
|
||||
|
@ -104,10 +104,10 @@ collect_mf(_, _) ->
|
|||
collect(<<"json">>) ->
|
||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||
#{
|
||||
message_validations => collect_json_data(?MG(?metrics_data_key, RawData))
|
||||
schema_validations => collect_json_data(?MG(?metrics_data_key, RawData))
|
||||
};
|
||||
collect(<<"prometheus">>) ->
|
||||
prometheus_text_format:format(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY).
|
||||
prometheus_text_format:format(?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY).
|
||||
|
||||
%%====================
|
||||
%% API Helpers
|
||||
|
@ -128,7 +128,7 @@ collect_metrics(Name, Metrics) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%%========================================
|
||||
%% Message Validation Metrics
|
||||
%% Schema Validation Metrics
|
||||
%%========================================
|
||||
collect_mv(K = ?key_enabled, Data) -> gauge_metrics(?MG(K, Data));
|
||||
collect_mv(K = ?key_matched, Data) -> counter_metrics(?MG(K, Data));
|
||||
|
@ -140,10 +140,10 @@ collect_mv(K = ?key_succeeded, Data) -> counter_metrics(?MG(K, Data)).
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%%========================================
|
||||
%% Message Validation Metrics
|
||||
%% Schema Validation Metrics
|
||||
%%========================================
|
||||
|
||||
message_validation_metrics_meta() ->
|
||||
schema_validation_metrics_meta() ->
|
||||
[
|
||||
{?key_enabled, gauge},
|
||||
{?key_matched, counter},
|
||||
|
@ -151,15 +151,15 @@ message_validation_metrics_meta() ->
|
|||
{?key_succeeded, counter}
|
||||
].
|
||||
|
||||
message_validation_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(message_validation_metrics_meta()).
|
||||
schema_validation_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(schema_validation_metrics_meta()).
|
||||
|
||||
to_validation_data(Mode, Validations) ->
|
||||
lists:foldl(
|
||||
fun(#{name := Name} = Validation, Acc) ->
|
||||
merge_acc_with_validations(Mode, Name, get_validation_metrics(Validation), Acc)
|
||||
end,
|
||||
maps:from_keys(message_validation_metric(names), []),
|
||||
maps:from_keys(schema_validation_metric(names), []),
|
||||
Validations
|
||||
).
|
||||
|
||||
|
@ -176,7 +176,7 @@ validation_point(Mode, Name, V) ->
|
|||
{with_node_label(Mode, [{validation_name, Name}]), V}.
|
||||
|
||||
get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) ->
|
||||
#{counters := Counters} = emqx_message_validation_registry:get_metrics(Name),
|
||||
#{counters := Counters} = emqx_schema_validation_registry:get_metrics(Name),
|
||||
#{
|
||||
?key_enabled => emqx_prometheus_cluster:boolean_to_number(Enabled),
|
||||
?key_matched => ?MG0('matched', Counters),
|
||||
|
@ -192,9 +192,9 @@ get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) ->
|
|||
%% merge / zip formatting funcs for type `application/json`
|
||||
|
||||
collect_json_data(Data) ->
|
||||
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_message_validation_metrics/3).
|
||||
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_schema_validation_metrics/3).
|
||||
|
||||
zip_json_message_validation_metrics(Key, Points, [] = _AccIn) ->
|
||||
zip_json_schema_validation_metrics(Key, Points, [] = _AccIn) ->
|
||||
lists:foldl(
|
||||
fun({Labels, Metric}, AccIn2) ->
|
||||
LabelsKVMap = maps:from_list(Labels),
|
||||
|
@ -204,7 +204,7 @@ zip_json_message_validation_metrics(Key, Points, [] = _AccIn) ->
|
|||
[],
|
||||
Points
|
||||
);
|
||||
zip_json_message_validation_metrics(Key, Points, AllResultsAcc) ->
|
||||
zip_json_schema_validation_metrics(Key, Points, AllResultsAcc) ->
|
||||
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
|
||||
lists:zipwith(fun maps:merge/2, AllResultsAcc, ThisKeyResult).
|
||||
|
|
@ -82,7 +82,7 @@ all() ->
|
|||
{group, '/prometheus/stats'},
|
||||
{group, '/prometheus/auth'},
|
||||
{group, '/prometheus/data_integration'},
|
||||
[{group, '/prometheus/message_validation'} || emqx_release:edition() == ee]
|
||||
[{group, '/prometheus/schema_validation'} || emqx_release:edition() == ee]
|
||||
]).
|
||||
|
||||
groups() ->
|
||||
|
@ -100,7 +100,7 @@ groups() ->
|
|||
{'/prometheus/stats', ModeGroups},
|
||||
{'/prometheus/auth', ModeGroups},
|
||||
{'/prometheus/data_integration', ModeGroups},
|
||||
{'/prometheus/message_validation', ModeGroups},
|
||||
{'/prometheus/schema_validation', ModeGroups},
|
||||
{?PROM_DATA_MODE__NODE, AcceptGroups},
|
||||
{?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups},
|
||||
{?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups},
|
||||
|
@ -133,7 +133,7 @@ init_per_suite(Config) ->
|
|||
emqx_bridge_http,
|
||||
emqx_connector,
|
||||
[
|
||||
{emqx_message_validation, #{config => message_validation_config()}}
|
||||
{emqx_schema_validation, #{config => schema_validation_config()}}
|
||||
|| emqx_release:edition() == ee
|
||||
],
|
||||
{emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()}
|
||||
|
@ -166,8 +166,8 @@ init_per_group('/prometheus/auth', Config) ->
|
|||
[{module, emqx_prometheus_auth} | Config];
|
||||
init_per_group('/prometheus/data_integration', Config) ->
|
||||
[{module, emqx_prometheus_data_integration} | Config];
|
||||
init_per_group('/prometheus/message_validation', Config) ->
|
||||
[{module, emqx_prometheus_message_validation} | Config];
|
||||
init_per_group('/prometheus/schema_validation', Config) ->
|
||||
[{module, emqx_prometheus_schema_validation} | Config];
|
||||
init_per_group(?PROM_DATA_MODE__NODE, Config) ->
|
||||
[{mode, ?PROM_DATA_MODE__NODE} | Config];
|
||||
init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) ->
|
||||
|
@ -239,7 +239,7 @@ assert_data(_Module, {Code, Header, RawDataBinary}, #{type := <<"prometheus">>,
|
|||
assert_prom_data(DataL, Mode);
|
||||
assert_data(Module, {Code, JsonData}, #{type := <<"json">>, mode := Mode}) ->
|
||||
?assertEqual(Code, 200),
|
||||
?assert(is_map(JsonData), true),
|
||||
?assertMatch(#{}, JsonData),
|
||||
assert_json_data(Module, JsonData, Mode).
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%
|
||||
|
@ -355,8 +355,8 @@ metric_meta(<<"emqx_schema_registrys_count">>) -> ?meta(0, 0, 0);
|
|||
metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
||||
metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
||||
metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
||||
%% `/prometheus/message_validation`
|
||||
metric_meta(<<"emqx_message_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
||||
%% `/prometheus/schema_validation`
|
||||
metric_meta(<<"emqx_schema_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
||||
%% normal emqx metrics
|
||||
metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1);
|
||||
metric_meta(_) -> #{}.
|
||||
|
@ -821,16 +821,16 @@ assert_json_data__data_integration_overview(M, _) ->
|
|||
).
|
||||
-endif.
|
||||
|
||||
assert_json_data__message_validations(Ms, _) ->
|
||||
assert_json_data__schema_validations(Ms, _) ->
|
||||
lists:foreach(
|
||||
fun(M) ->
|
||||
?assertMatch(
|
||||
#{
|
||||
validation_name := _,
|
||||
emqx_message_validation_enable := _,
|
||||
emqx_message_validation_matched := _,
|
||||
emqx_message_validation_failed := _,
|
||||
emqx_message_validation_succeeded := _
|
||||
emqx_schema_validation_enable := _,
|
||||
emqx_schema_validation_matched := _,
|
||||
emqx_schema_validation_failed := _,
|
||||
emqx_schema_validation_succeeded := _
|
||||
},
|
||||
M
|
||||
)
|
||||
|
@ -838,7 +838,7 @@ assert_json_data__message_validations(Ms, _) ->
|
|||
Ms
|
||||
).
|
||||
|
||||
message_validation_config() ->
|
||||
schema_validation_config() ->
|
||||
Validation = #{
|
||||
<<"enable">> => true,
|
||||
<<"name">> => <<"my_validation">>,
|
||||
|
@ -853,7 +853,7 @@ message_validation_config() ->
|
|||
]
|
||||
},
|
||||
#{
|
||||
<<"message_validation">> => #{
|
||||
<<"schema_validation">> => #{
|
||||
<<"validations">> => [Validation]
|
||||
}
|
||||
}.
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
on_session_unsubscribed/4,
|
||||
on_message_publish/2,
|
||||
on_message_dropped/4,
|
||||
on_message_validation_failed/3,
|
||||
on_schema_validation_failed/3,
|
||||
on_message_delivered/3,
|
||||
on_message_acked/3,
|
||||
on_delivery_dropped/4,
|
||||
|
@ -79,7 +79,7 @@ event_names() ->
|
|||
'message.delivered',
|
||||
'message.acked',
|
||||
'message.dropped',
|
||||
'message.validation_failed',
|
||||
'schema.validation_failed',
|
||||
'delivery.dropped'
|
||||
].
|
||||
|
||||
|
@ -95,7 +95,7 @@ event_topics_enum() ->
|
|||
'$events/message_delivered',
|
||||
'$events/message_acked',
|
||||
'$events/message_dropped',
|
||||
'$events/message_validation_failed',
|
||||
'$events/schema_validation_failed',
|
||||
'$events/delivery_dropped'
|
||||
% '$events/message_publish' % not possible to use in SELECT FROM
|
||||
].
|
||||
|
@ -224,13 +224,13 @@ on_message_dropped(Message, _, Reason, Conf) ->
|
|||
end,
|
||||
{ok, Message}.
|
||||
|
||||
on_message_validation_failed(Message, ValidationContext, Conf) ->
|
||||
on_schema_validation_failed(Message, ValidationContext, Conf) ->
|
||||
case ignore_sys_message(Message) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
apply_event(
|
||||
'message.validation_failed',
|
||||
'schema.validation_failed',
|
||||
fun() -> eventmsg_validation_failed(Message, ValidationContext) end,
|
||||
Conf
|
||||
)
|
||||
|
@ -508,7 +508,7 @@ eventmsg_validation_failed(
|
|||
) ->
|
||||
#{name := ValidationName} = ValidationContext,
|
||||
with_basic_columns(
|
||||
'message.validation_failed',
|
||||
'schema.validation_failed',
|
||||
#{
|
||||
id => emqx_guid:to_hexstr(Id),
|
||||
validation => ValidationName,
|
||||
|
@ -687,16 +687,16 @@ event_info() ->
|
|||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
%% ELSE (?EMQX_RELEASE_EDITION == ee).
|
||||
event_info_message_validation_failed() ->
|
||||
event_info_schema_validation_failed() ->
|
||||
event_info_common(
|
||||
'message.validation_failed',
|
||||
{<<"message validation failed">>, <<"TODO"/utf8>>},
|
||||
'schema.validation_failed',
|
||||
{<<"schema validation failed">>, <<"TODO"/utf8>>},
|
||||
{<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>},
|
||||
<<"SELECT * FROM \"$events/message_validation_failed\" WHERE topic =~ 't/#'">>
|
||||
<<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">>
|
||||
).
|
||||
ee_event_info() ->
|
||||
[
|
||||
event_info_message_validation_failed()
|
||||
event_info_schema_validation_failed()
|
||||
].
|
||||
-else.
|
||||
%% END (?EMQX_RELEASE_EDITION == ee).
|
||||
|
@ -873,7 +873,7 @@ test_columns(Event) ->
|
|||
ee_test_columns(Event).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
ee_test_columns('message.validation_failed') ->
|
||||
ee_test_columns('schema.validation_failed') ->
|
||||
[{<<"validation">>, <<"myvalidation">>}] ++
|
||||
test_columns('message.publish').
|
||||
%% ELSE (?EMQX_RELEASE_EDITION == ee).
|
||||
|
@ -922,9 +922,9 @@ columns_with_exam('message.dropped') ->
|
|||
{<<"timestamp">>, erlang:system_time(millisecond)},
|
||||
{<<"node">>, node()}
|
||||
];
|
||||
columns_with_exam('message.validation_failed') ->
|
||||
columns_with_exam('schema.validation_failed') ->
|
||||
[
|
||||
{<<"event">>, 'message.validation_failed'},
|
||||
{<<"event">>, 'schema.validation_failed'},
|
||||
{<<"validation">>, <<"my_validation">>},
|
||||
{<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
|
||||
{<<"clientid">>, <<"c_emqx">>},
|
||||
|
@ -1129,7 +1129,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
|
|||
hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
|
||||
hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
|
||||
hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
|
||||
hook_fun('message.validation_failed') -> fun ?MODULE:on_message_validation_failed/3;
|
||||
hook_fun('schema.validation_failed') -> fun ?MODULE:on_schema_validation_failed/3;
|
||||
hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
|
||||
hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
|
||||
hook_fun(Event) -> error({invalid_event, Event}).
|
||||
|
@ -1154,7 +1154,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
|
|||
event_name(<<"$events/message_delivered">>) -> 'message.delivered';
|
||||
event_name(<<"$events/message_acked">>) -> 'message.acked';
|
||||
event_name(<<"$events/message_dropped">>) -> 'message.dropped';
|
||||
event_name(<<"$events/message_validation_failed">>) -> 'message.validation_failed';
|
||||
event_name(<<"$events/schema_validation_failed">>) -> 'schema.validation_failed';
|
||||
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
|
||||
event_name(_) -> 'message.publish'.
|
||||
|
||||
|
@ -1168,7 +1168,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
|
|||
event_topic('message.delivered') -> <<"$events/message_delivered">>;
|
||||
event_topic('message.acked') -> <<"$events/message_acked">>;
|
||||
event_topic('message.dropped') -> <<"$events/message_dropped">>;
|
||||
event_topic('message.validation_failed') -> <<"$events/message_validation_failed">>;
|
||||
event_topic('schema.validation_failed') -> <<"$events/schema_validation_failed">>;
|
||||
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
|
||||
event_topic('message.publish') -> <<"$events/message_publish">>.
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
inc_action_metrics/2
|
||||
]).
|
||||
|
||||
%% Internal exports used by message validation
|
||||
%% Internal exports used by schema validation
|
||||
-export([evaluate_select/3, clear_rule_payload/0]).
|
||||
|
||||
-import(
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# EMQX Message Validation
|
||||
# EMQX Schema Validation
|
||||
|
||||
This application encapsulates the functionality to validate incoming or internally
|
||||
triggered published payloads and take an action upon failure, which can be to just drop
|
||||
|
@ -7,7 +7,7 @@ the message without further processing, or to disconnect the offending client as
|
|||
# Documentation
|
||||
|
||||
Refer to [Message
|
||||
Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-validation.html)
|
||||
Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/schema-validation.html)
|
||||
for more information about the semantics and checks available.
|
||||
|
||||
# HTTP APIs
|
||||
|
@ -16,7 +16,7 @@ APIs are provided for validation management, which includes creating,
|
|||
updating, looking up, deleting, listing validations.
|
||||
|
||||
Refer to [API Docs -
|
||||
Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Validation)
|
||||
Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Schema-Validation)
|
||||
for more detailed information.
|
||||
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
{application, emqx_schema_validation, [
|
||||
{description, "EMQX Schema Validation"},
|
||||
{vsn, "0.1.0"},
|
||||
{registered, [emqx_schema_validation_sup, emqx_schema_validation_registry]},
|
||||
{mod, {emqx_schema_validation_app, []}},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
||||
{links, []}
|
||||
]}.
|
|
@ -1,7 +1,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_validation).
|
||||
-module(emqx_schema_validation).
|
||||
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
|
@ -47,8 +47,8 @@
|
|||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-define(TRACE_TAG, "MESSAGE_VALIDATION").
|
||||
-define(CONF_ROOT, message_validation).
|
||||
-define(TRACE_TAG, "SCHEMA_VALIDATION").
|
||||
-define(CONF_ROOT, schema_validation).
|
||||
-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
|
||||
|
||||
-type validation_name() :: binary().
|
||||
|
@ -72,7 +72,7 @@ load() ->
|
|||
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
|
||||
lists:foreach(
|
||||
fun({Pos, Validation}) ->
|
||||
ok = emqx_message_validation_registry:insert(Pos, Validation)
|
||||
ok = emqx_schema_validation_registry:insert(Pos, Validation)
|
||||
end,
|
||||
lists:enumerate(Validations)
|
||||
).
|
||||
|
@ -81,7 +81,7 @@ unload() ->
|
|||
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
|
||||
lists:foreach(
|
||||
fun(Validation) ->
|
||||
ok = emqx_message_validation_registry:delete(Validation)
|
||||
ok = emqx_schema_validation_registry:delete(Validation)
|
||||
end,
|
||||
Validations
|
||||
).
|
||||
|
@ -146,7 +146,7 @@ unregister_hooks() ->
|
|||
-spec on_message_publish(emqx_types:message()) ->
|
||||
{ok, emqx_types:message()} | {stop, emqx_types:message()}.
|
||||
on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
|
||||
case emqx_message_validation_registry:matching_validations(Topic) of
|
||||
case emqx_schema_validation_registry:matching_validations(Topic) of
|
||||
[] ->
|
||||
ok;
|
||||
Validations ->
|
||||
|
@ -184,19 +184,19 @@ pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
|
|||
|
||||
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
|
||||
{Pos, Validation} = fetch_with_index(New, Name),
|
||||
ok = emqx_message_validation_registry:insert(Pos, Validation),
|
||||
ok = emqx_schema_validation_registry:insert(Pos, Validation),
|
||||
ok;
|
||||
post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
|
||||
{_Pos, OldValidation} = fetch_with_index(Old, Name),
|
||||
{Pos, NewValidation} = fetch_with_index(New, Name),
|
||||
ok = emqx_message_validation_registry:update(OldValidation, Pos, NewValidation),
|
||||
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
|
||||
ok;
|
||||
post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
|
||||
{_Pos, Validation} = fetch_with_index(Old, Name),
|
||||
ok = emqx_message_validation_registry:delete(Validation),
|
||||
ok = emqx_schema_validation_registry:delete(Validation),
|
||||
ok;
|
||||
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
|
||||
ok = emqx_message_validation_registry:reindex_positions(New),
|
||||
ok = emqx_schema_validation_registry:reindex_positions(New),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -395,26 +395,26 @@ run_validations(Validations, Message) ->
|
|||
emqx_rule_runtime:clear_rule_payload(),
|
||||
Fun = fun(Validation, Acc) ->
|
||||
#{name := Name} = Validation,
|
||||
emqx_message_validation_registry:inc_matched(Name),
|
||||
emqx_schema_validation_registry:inc_matched(Name),
|
||||
case run_validation(Validation, Message) of
|
||||
ok ->
|
||||
emqx_message_validation_registry:inc_succeeded(Name),
|
||||
emqx_schema_validation_registry:inc_succeeded(Name),
|
||||
{cont, Acc};
|
||||
ignore ->
|
||||
trace_failure(Validation, "validation_failed", #{
|
||||
validation => Name,
|
||||
action => ignore
|
||||
}),
|
||||
emqx_message_validation_registry:inc_failed(Name),
|
||||
run_message_validation_failed_hook(Message, Validation),
|
||||
emqx_schema_validation_registry:inc_failed(Name),
|
||||
run_schema_validation_failed_hook(Message, Validation),
|
||||
{cont, Acc};
|
||||
FailureAction ->
|
||||
trace_failure(Validation, "validation_failed", #{
|
||||
validation => Name,
|
||||
action => FailureAction
|
||||
}),
|
||||
emqx_message_validation_registry:inc_failed(Name),
|
||||
run_message_validation_failed_hook(Message, Validation),
|
||||
emqx_schema_validation_registry:inc_failed(Name),
|
||||
run_schema_validation_failed_hook(Message, Validation),
|
||||
{halt, FailureAction}
|
||||
end
|
||||
end,
|
||||
|
@ -457,17 +457,17 @@ trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) ->
|
|||
name := _Name,
|
||||
failure_action := _Action
|
||||
} = Validation,
|
||||
?tp(message_validation_failed, #{log_level => none, name => _Name, action => _Action}),
|
||||
?tp(schema_validation_failed, #{log_level => none, name => _Name, action => _Action}),
|
||||
ok;
|
||||
trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) ->
|
||||
#{
|
||||
name := _Name,
|
||||
failure_action := _Action
|
||||
} = Validation,
|
||||
?tp(message_validation_failed, #{log_level => Level, name => _Name, action => _Action}),
|
||||
?tp(schema_validation_failed, #{log_level => Level, name => _Name, action => _Action}),
|
||||
?TRACE(Level, ?TRACE_TAG, Msg, Meta).
|
||||
|
||||
run_message_validation_failed_hook(Message, Validation) ->
|
||||
run_schema_validation_failed_hook(Message, Validation) ->
|
||||
#{name := Name} = Validation,
|
||||
ValidationContext = #{name => Name},
|
||||
emqx_hooks:run('message.validation_failed', [Message, ValidationContext]).
|
||||
emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).
|
|
@ -1,7 +1,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_validation_app).
|
||||
-module(emqx_schema_validation_app).
|
||||
|
||||
-behaviour(application).
|
||||
|
||||
|
@ -18,15 +18,15 @@
|
|||
|
||||
-spec start(application:start_type(), term()) -> {ok, pid()}.
|
||||
start(_Type, _Args) ->
|
||||
{ok, Sup} = emqx_message_validation_sup:start_link(),
|
||||
ok = emqx_message_validation:add_handler(),
|
||||
ok = emqx_message_validation:register_hooks(),
|
||||
ok = emqx_message_validation:load(),
|
||||
{ok, Sup} = emqx_schema_validation_sup:start_link(),
|
||||
ok = emqx_schema_validation:add_handler(),
|
||||
ok = emqx_schema_validation:register_hooks(),
|
||||
ok = emqx_schema_validation:load(),
|
||||
{ok, Sup}.
|
||||
|
||||
-spec stop(term()) -> ok.
|
||||
stop(_State) ->
|
||||
ok = emqx_message_validation:unload(),
|
||||
ok = emqx_message_validation:unregister_hooks(),
|
||||
ok = emqx_message_validation:remove_handler(),
|
||||
ok = emqx_schema_validation:unload(),
|
||||
ok = emqx_schema_validation:unregister_hooks(),
|
||||
ok = emqx_schema_validation:remove_handler(),
|
||||
ok.
|
|
@ -1,7 +1,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_validation_http_api).
|
||||
-module(emqx_schema_validation_http_api).
|
||||
|
||||
-behaviour(minirest_api).
|
||||
|
||||
|
@ -21,43 +21,43 @@
|
|||
|
||||
%% `minirest' handlers
|
||||
-export([
|
||||
'/message_validations'/2,
|
||||
'/message_validations/reorder'/2,
|
||||
'/message_validations/validation/:name'/2,
|
||||
'/message_validations/validation/:name/metrics'/2,
|
||||
'/message_validations/validation/:name/metrics/reset'/2,
|
||||
'/message_validations/validation/:name/enable/:enable'/2
|
||||
'/schema_validations'/2,
|
||||
'/schema_validations/reorder'/2,
|
||||
'/schema_validations/validation/:name'/2,
|
||||
'/schema_validations/validation/:name/metrics'/2,
|
||||
'/schema_validations/validation/:name/metrics/reset'/2,
|
||||
'/schema_validations/validation/:name/enable/:enable'/2
|
||||
]).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% Type definitions
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
-define(TAGS, [<<"Message Validation">>]).
|
||||
-define(METRIC_NAME, message_validation).
|
||||
-define(TAGS, [<<"Schema Validation">>]).
|
||||
-define(METRIC_NAME, schema_validation).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% `minirest' and `minirest_trails' API
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "message_validation_http_api".
|
||||
namespace() -> "schema_validation_http_api".
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
|
||||
paths() ->
|
||||
[
|
||||
"/message_validations",
|
||||
"/message_validations/reorder",
|
||||
"/message_validations/validation/:name",
|
||||
"/message_validations/validation/:name/metrics",
|
||||
"/message_validations/validation/:name/metrics/reset",
|
||||
"/message_validations/validation/:name/enable/:enable"
|
||||
"/schema_validations",
|
||||
"/schema_validations/reorder",
|
||||
"/schema_validations/validation/:name",
|
||||
"/schema_validations/validation/:name/metrics",
|
||||
"/schema_validations/validation/:name/metrics/reset",
|
||||
"/schema_validations/validation/:name/enable/:enable"
|
||||
].
|
||||
|
||||
schema("/message_validations") ->
|
||||
schema("/schema_validations") ->
|
||||
#{
|
||||
'operationId' => '/message_validations',
|
||||
'operationId' => '/schema_validations',
|
||||
get => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"List validations">>,
|
||||
|
@ -67,7 +67,7 @@ schema("/message_validations") ->
|
|||
200 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
array(
|
||||
emqx_message_validation_schema:api_schema(list)
|
||||
emqx_schema_validation_schema:api_schema(list)
|
||||
),
|
||||
example_return_list()
|
||||
)
|
||||
|
@ -78,14 +78,14 @@ schema("/message_validations") ->
|
|||
summary => <<"Append a new validation">>,
|
||||
description => ?DESC("append_validation"),
|
||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_message_validation_schema:api_schema(post),
|
||||
emqx_schema_validation_schema:api_schema(post),
|
||||
example_input_create()
|
||||
),
|
||||
responses =>
|
||||
#{
|
||||
201 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_message_validation_schema:api_schema(post),
|
||||
emqx_schema_validation_schema:api_schema(post),
|
||||
example_return_create()
|
||||
),
|
||||
400 => error_schema('ALREADY_EXISTS', "Validation already exists")
|
||||
|
@ -96,14 +96,14 @@ schema("/message_validations") ->
|
|||
summary => <<"Update a validation">>,
|
||||
description => ?DESC("update_validation"),
|
||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_message_validation_schema:api_schema(put),
|
||||
emqx_schema_validation_schema:api_schema(put),
|
||||
example_input_update()
|
||||
),
|
||||
responses =>
|
||||
#{
|
||||
200 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_message_validation_schema:api_schema(put),
|
||||
emqx_schema_validation_schema:api_schema(put),
|
||||
example_return_update()
|
||||
),
|
||||
404 => error_schema('NOT_FOUND', "Validation not found"),
|
||||
|
@ -111,9 +111,9 @@ schema("/message_validations") ->
|
|||
}
|
||||
}
|
||||
};
|
||||
schema("/message_validations/reorder") ->
|
||||
schema("/schema_validations/reorder") ->
|
||||
#{
|
||||
'operationId' => '/message_validations/reorder',
|
||||
'operationId' => '/schema_validations/reorder',
|
||||
post => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Reorder all validations">>,
|
||||
|
@ -140,9 +140,9 @@ schema("/message_validations/reorder") ->
|
|||
}
|
||||
}
|
||||
};
|
||||
schema("/message_validations/validation/:name") ->
|
||||
schema("/schema_validations/validation/:name") ->
|
||||
#{
|
||||
'operationId' => '/message_validations/validation/:name',
|
||||
'operationId' => '/schema_validations/validation/:name',
|
||||
get => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Lookup a validation">>,
|
||||
|
@ -153,7 +153,7 @@ schema("/message_validations/validation/:name") ->
|
|||
200 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
array(
|
||||
emqx_message_validation_schema:api_schema(lookup)
|
||||
emqx_schema_validation_schema:api_schema(lookup)
|
||||
),
|
||||
example_return_lookup()
|
||||
),
|
||||
|
@ -172,9 +172,9 @@ schema("/message_validations/validation/:name") ->
|
|||
}
|
||||
}
|
||||
};
|
||||
schema("/message_validations/validation/:name/metrics") ->
|
||||
schema("/schema_validations/validation/:name/metrics") ->
|
||||
#{
|
||||
'operationId' => '/message_validations/validation/:name/metrics',
|
||||
'operationId' => '/schema_validations/validation/:name/metrics',
|
||||
get => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Get validation metrics">>,
|
||||
|
@ -191,9 +191,9 @@ schema("/message_validations/validation/:name/metrics") ->
|
|||
}
|
||||
}
|
||||
};
|
||||
schema("/message_validations/validation/:name/metrics/reset") ->
|
||||
schema("/schema_validations/validation/:name/metrics/reset") ->
|
||||
#{
|
||||
'operationId' => '/message_validations/validation/:name/metrics/reset',
|
||||
'operationId' => '/schema_validations/validation/:name/metrics/reset',
|
||||
post => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Reset validation metrics">>,
|
||||
|
@ -206,9 +206,9 @@ schema("/message_validations/validation/:name/metrics/reset") ->
|
|||
}
|
||||
}
|
||||
};
|
||||
schema("/message_validations/validation/:name/enable/:enable") ->
|
||||
schema("/schema_validations/validation/:name/enable/:enable") ->
|
||||
#{
|
||||
'operationId' => '/message_validations/validation/:name/enable/:enable',
|
||||
'operationId' => '/schema_validations/validation/:name/enable/:enable',
|
||||
post => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Enable or disable validation">>,
|
||||
|
@ -285,29 +285,29 @@ fields(node_metrics) ->
|
|||
%% `minirest' handlers
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
'/message_validations'(get, _Params) ->
|
||||
?OK(emqx_message_validation:list());
|
||||
'/message_validations'(post, #{body := Params = #{<<"name">> := Name}}) ->
|
||||
'/schema_validations'(get, _Params) ->
|
||||
?OK(emqx_schema_validation:list());
|
||||
'/schema_validations'(post, #{body := Params = #{<<"name">> := Name}}) ->
|
||||
with_validation(
|
||||
Name,
|
||||
return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)),
|
||||
fun() ->
|
||||
case emqx_message_validation:insert(Params) of
|
||||
case emqx_schema_validation:insert(Params) of
|
||||
{ok, _} ->
|
||||
{ok, Res} = emqx_message_validation:lookup(Name),
|
||||
{ok, Res} = emqx_schema_validation:lookup(Name),
|
||||
{201, Res};
|
||||
{error, Error} ->
|
||||
?BAD_REQUEST(Error)
|
||||
end
|
||||
end
|
||||
);
|
||||
'/message_validations'(put, #{body := Params = #{<<"name">> := Name}}) ->
|
||||
'/schema_validations'(put, #{body := Params = #{<<"name">> := Name}}) ->
|
||||
with_validation(
|
||||
Name,
|
||||
fun() ->
|
||||
case emqx_message_validation:update(Params) of
|
||||
case emqx_schema_validation:update(Params) of
|
||||
{ok, _} ->
|
||||
{ok, Res} = emqx_message_validation:lookup(Name),
|
||||
{ok, Res} = emqx_schema_validation:lookup(Name),
|
||||
{200, Res};
|
||||
{error, Error} ->
|
||||
?BAD_REQUEST(Error)
|
||||
|
@ -316,17 +316,17 @@ fields(node_metrics) ->
|
|||
not_found()
|
||||
).
|
||||
|
||||
'/message_validations/validation/:name'(get, #{bindings := #{name := Name}}) ->
|
||||
'/schema_validations/validation/:name'(get, #{bindings := #{name := Name}}) ->
|
||||
with_validation(
|
||||
Name,
|
||||
fun(Validation) -> ?OK(Validation) end,
|
||||
not_found()
|
||||
);
|
||||
'/message_validations/validation/:name'(delete, #{bindings := #{name := Name}}) ->
|
||||
'/schema_validations/validation/:name'(delete, #{bindings := #{name := Name}}) ->
|
||||
with_validation(
|
||||
Name,
|
||||
fun() ->
|
||||
case emqx_message_validation:delete(Name) of
|
||||
case emqx_schema_validation:delete(Name) of
|
||||
{ok, _} ->
|
||||
?NO_CONTENT;
|
||||
{error, Error} ->
|
||||
|
@ -336,10 +336,10 @@ fields(node_metrics) ->
|
|||
not_found()
|
||||
).
|
||||
|
||||
'/message_validations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
|
||||
'/schema_validations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
|
||||
do_reorder(Order).
|
||||
|
||||
'/message_validations/validation/:name/enable/:enable'(post, #{
|
||||
'/schema_validations/validation/:name/enable/:enable'(post, #{
|
||||
bindings := #{name := Name, enable := Enable}
|
||||
}) ->
|
||||
with_validation(
|
||||
|
@ -348,7 +348,7 @@ fields(node_metrics) ->
|
|||
not_found()
|
||||
).
|
||||
|
||||
'/message_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) ->
|
||||
'/schema_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) ->
|
||||
with_validation(
|
||||
Name,
|
||||
fun() ->
|
||||
|
@ -371,7 +371,7 @@ fields(node_metrics) ->
|
|||
not_found()
|
||||
).
|
||||
|
||||
'/message_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) ->
|
||||
'/schema_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) ->
|
||||
with_validation(
|
||||
Name,
|
||||
fun() ->
|
||||
|
@ -516,7 +516,7 @@ error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(
|
|||
ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message).
|
||||
|
||||
do_reorder(Order) ->
|
||||
case emqx_message_validation:reorder(Order) of
|
||||
case emqx_schema_validation:reorder(Order) of
|
||||
{ok, _} ->
|
||||
?NO_CONTENT;
|
||||
{error,
|
||||
|
@ -538,7 +538,7 @@ do_reorder(Order) ->
|
|||
|
||||
do_enable_disable(Validation, Enable) ->
|
||||
RawValidation = make_serializable(Validation),
|
||||
case emqx_message_validation:update(RawValidation#{<<"enable">> => Enable}) of
|
||||
case emqx_schema_validation:update(RawValidation#{<<"enable">> => Enable}) of
|
||||
{ok, _} ->
|
||||
?NO_CONTENT;
|
||||
{error, Reason} ->
|
||||
|
@ -546,7 +546,7 @@ do_enable_disable(Validation, Enable) ->
|
|||
end.
|
||||
|
||||
with_validation(Name, FoundFn, NotFoundFn) ->
|
||||
case emqx_message_validation:lookup(Name) of
|
||||
case emqx_schema_validation:lookup(Name) of
|
||||
{ok, Validation} ->
|
||||
{arity, Arity} = erlang:fun_info(FoundFn, arity),
|
||||
case Arity of
|
||||
|
@ -564,15 +564,15 @@ not_found() ->
|
|||
return(?NOT_FOUND(<<"Validation not found">>)).
|
||||
|
||||
make_serializable(Validation) ->
|
||||
Schema = emqx_message_validation_schema,
|
||||
Schema = emqx_schema_validation_schema,
|
||||
RawConfig = #{
|
||||
<<"message_validation">> => #{
|
||||
<<"schema_validation">> => #{
|
||||
<<"validations">> =>
|
||||
[emqx_utils_maps:binary_key_map(Validation)]
|
||||
}
|
||||
},
|
||||
#{
|
||||
<<"message_validation">> := #{
|
||||
<<"schema_validation">> := #{
|
||||
<<"validations">> :=
|
||||
[Serialized]
|
||||
}
|
|
@ -1,7 +1,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_validation_registry).
|
||||
-module(emqx_schema_validation_registry).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
|
@ -36,10 +36,10 @@
|
|||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-define(VALIDATION_TOPIC_INDEX, emqx_message_validation_index).
|
||||
-define(VALIDATION_TAB, emqx_message_validation_tab).
|
||||
-define(VALIDATION_TOPIC_INDEX, emqx_schema_validation_index).
|
||||
-define(VALIDATION_TAB, emqx_schema_validation_tab).
|
||||
|
||||
-define(METRIC_NAME, message_validation).
|
||||
-define(METRIC_NAME, schema_validation).
|
||||
-define(METRICS, [
|
||||
'matched',
|
||||
'succeeded',
|
||||
|
@ -106,7 +106,7 @@ matching_validations(Topic) ->
|
|||
|
||||
-spec metrics_worker_spec() -> supervisor:child_spec().
|
||||
metrics_worker_spec() ->
|
||||
emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME).
|
||||
emqx_metrics_worker:child_spec(schema_validation_metrics, ?METRIC_NAME).
|
||||
|
||||
-spec get_metrics(validation_name()) -> emqx_metrics_worker:metrics().
|
||||
get_metrics(Name) ->
|
||||
|
@ -243,7 +243,7 @@ transform_validation(Validation = #{checks := Checks}) ->
|
|||
Validation#{checks := lists:map(fun transform_check/1, Checks)}.
|
||||
|
||||
transform_check(#{type := sql, sql := SQL}) ->
|
||||
{ok, Check} = emqx_message_validation:parse_sql_check(SQL),
|
||||
{ok, Check} = emqx_schema_validation:parse_sql_check(SQL),
|
||||
Check;
|
||||
transform_check(Check) ->
|
||||
Check.
|
|
@ -1,7 +1,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_validation_schema).
|
||||
-module(emqx_schema_validation_schema).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
@ -26,12 +26,12 @@
|
|||
%% `hocon_schema' API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> message_validation.
|
||||
namespace() -> schema_validation.
|
||||
|
||||
roots() ->
|
||||
[{message_validation, mk(ref(message_validation), #{importance => ?IMPORTANCE_HIDDEN})}].
|
||||
[{schema_validation, mk(ref(schema_validation), #{importance => ?IMPORTANCE_HIDDEN})}].
|
||||
|
||||
fields(message_validation) ->
|
||||
fields(schema_validation) ->
|
||||
[
|
||||
{validations,
|
||||
mk(
|
||||
|
@ -199,7 +199,7 @@ ensure_array(L, _) when is_list(L) -> L;
|
|||
ensure_array(B, _) -> [B].
|
||||
|
||||
validate_sql(SQL) ->
|
||||
case emqx_message_validation:parse_sql_check(SQL) of
|
||||
case emqx_schema_validation:parse_sql_check(SQL) of
|
||||
{ok, _Parsed} ->
|
||||
ok;
|
||||
Error = {error, _} ->
|
|
@ -1,7 +1,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_validation_sup).
|
||||
-module(emqx_schema_validation_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
|
@ -23,8 +23,8 @@ start_link() ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
Registry = worker_spec(emqx_message_validation_registry),
|
||||
Metrics = emqx_message_validation_registry:metrics_worker_spec(),
|
||||
Registry = worker_spec(emqx_schema_validation_registry),
|
||||
Metrics = emqx_schema_validation_registry:metrics_worker_spec(),
|
||||
SupFlags = #{
|
||||
strategy => one_for_one,
|
||||
intensity => 10,
|
|
@ -1,7 +1,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_validation_http_api_SUITE).
|
||||
-module(emqx_schema_validation_http_api_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
@ -31,7 +31,7 @@ init_per_suite(Config) ->
|
|||
emqx,
|
||||
emqx_conf,
|
||||
emqx_rule_engine,
|
||||
emqx_message_validation,
|
||||
emqx_schema_validation,
|
||||
emqx_management,
|
||||
emqx_mgmt_api_test_util:emqx_dashboard(),
|
||||
emqx_schema_registry
|
||||
|
@ -66,9 +66,9 @@ end_per_testcase(_TestCase, _Config) ->
|
|||
clear_all_validations() ->
|
||||
lists:foreach(
|
||||
fun(#{name := Name}) ->
|
||||
{ok, _} = emqx_message_validation:delete(Name)
|
||||
{ok, _} = emqx_schema_validation:delete(Name)
|
||||
end,
|
||||
emqx_message_validation:list()
|
||||
emqx_schema_validation:list()
|
||||
).
|
||||
|
||||
reset_all_global_metrics() ->
|
||||
|
@ -146,7 +146,7 @@ schema_check(Type, SerdeName, Overrides) ->
|
|||
Overrides
|
||||
).
|
||||
|
||||
api_root() -> "message_validations".
|
||||
api_root() -> "schema_validations".
|
||||
|
||||
simplify_result(Res) ->
|
||||
case Res of
|
||||
|
@ -358,7 +358,7 @@ assert_index_order(ExpectedOrder, Topic, Comment) ->
|
|||
ExpectedOrder,
|
||||
[
|
||||
N
|
||||
|| #{name := N} <- emqx_message_validation_registry:matching_validations(Topic)
|
||||
|| #{name := N} <- emqx_schema_validation_registry:matching_validations(Topic)
|
||||
],
|
||||
Comment
|
||||
).
|
||||
|
@ -366,7 +366,7 @@ assert_index_order(ExpectedOrder, Topic, Comment) ->
|
|||
create_failure_tracing_rule() ->
|
||||
Params = #{
|
||||
enable => true,
|
||||
sql => <<"select * from \"$events/message_validation_failed\" ">>,
|
||||
sql => <<"select * from \"$events/schema_validation_failed\" ">>,
|
||||
actions => [make_trace_fn_action()]
|
||||
},
|
||||
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
|
||||
|
@ -689,7 +689,7 @@ t_log_failure_none(_Config) ->
|
|||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch([#{log_level := none}], ?of_kind(message_validation_failed, Trace)),
|
||||
?assertMatch([#{log_level := none}], ?of_kind(schema_validation_failed, Trace)),
|
||||
ok
|
||||
end
|
||||
),
|
||||
|
@ -719,12 +719,12 @@ t_action_ignore(_Config) ->
|
|||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch([#{action := ignore}], ?of_kind(message_validation_failed, Trace)),
|
||||
?assertMatch([#{action := ignore}], ?of_kind(schema_validation_failed, Trace)),
|
||||
ok
|
||||
end
|
||||
),
|
||||
?assertMatch(
|
||||
[{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}],
|
||||
[{_, #{data := #{validation := Name1, event := 'schema.validation_failed'}}}],
|
||||
get_traced_failures_from_rule_engine()
|
||||
),
|
||||
ok.
|
||||
|
@ -1093,8 +1093,8 @@ t_multiple_validations(_Config) ->
|
|||
|
||||
?assertMatch(
|
||||
[
|
||||
{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}},
|
||||
{_, #{data := #{validation := Name2, event := 'message.validation_failed'}}}
|
||||
{_, #{data := #{validation := Name1, event := 'schema.validation_failed'}}},
|
||||
{_, #{data := #{validation := Name2, event := 'schema.validation_failed'}}}
|
||||
],
|
||||
get_traced_failures_from_rule_engine()
|
||||
),
|
|
@ -1,22 +1,22 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_validation_tests).
|
||||
-module(emqx_schema_validation_tests).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(VALIDATIONS_PATH, "message_validation.validations").
|
||||
-define(VALIDATIONS_PATH, "schema_validation.validations").
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
parse_and_check(InnerConfigs) ->
|
||||
RootBin = <<"message_validation">>,
|
||||
RootBin = <<"schema_validation">>,
|
||||
InnerBin = <<"validations">>,
|
||||
RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
|
||||
#{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
|
||||
emqx_message_validation_schema,
|
||||
emqx_schema_validation_schema,
|
||||
RawConf,
|
||||
#{
|
||||
required => false,
|
||||
|
@ -65,9 +65,9 @@ schema_check(Type, SerdeName, Overrides) ->
|
|||
).
|
||||
|
||||
eval_sql(Message, SQL) ->
|
||||
{ok, Check} = emqx_message_validation:parse_sql_check(SQL),
|
||||
{ok, Check} = emqx_schema_validation:parse_sql_check(SQL),
|
||||
Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
|
||||
emqx_message_validation:evaluate_sql_check(Check, Validation, Message).
|
||||
emqx_schema_validation:evaluate_sql_check(Check, Validation, Message).
|
||||
|
||||
message() ->
|
||||
message(_Opts = #{}).
|
||||
|
@ -196,7 +196,7 @@ invalid_names_test_() ->
|
|||
{_Schema, [
|
||||
#{
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.name"
|
||||
path := "schema_validation.validations.1.name"
|
||||
}
|
||||
]},
|
||||
parse_and_check([validation(InvalidName, [sql_check()])])
|
||||
|
@ -239,7 +239,7 @@ duplicated_check_test_() ->
|
|||
#{
|
||||
reason := <<"duplicated topics: t/1">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.topics"
|
||||
path := "schema_validation.validations.1.topics"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
|
@ -256,7 +256,7 @@ duplicated_check_test_() ->
|
|||
#{
|
||||
reason := <<"duplicated topics: t/1">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.topics"
|
||||
path := "schema_validation.validations.1.topics"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
|
@ -273,7 +273,7 @@ duplicated_check_test_() ->
|
|||
#{
|
||||
reason := <<"duplicated topics: t/1, t/2">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.topics"
|
||||
path := "schema_validation.validations.1.topics"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
|
@ -320,7 +320,7 @@ duplicated_check_test_() ->
|
|||
#{
|
||||
reason := <<"duplicated schema checks: json:a">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.checks"
|
||||
path := "schema_validation.validations.1.checks"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
|
@ -336,7 +336,7 @@ duplicated_check_test_() ->
|
|||
#{
|
||||
reason := <<"duplicated schema checks: json:a">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.checks"
|
||||
path := "schema_validation.validations.1.checks"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
|
@ -353,7 +353,7 @@ duplicated_check_test_() ->
|
|||
#{
|
||||
reason := <<"duplicated schema checks: json:a">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.checks"
|
||||
path := "schema_validation.validations.1.checks"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
|
@ -370,7 +370,7 @@ duplicated_check_test_() ->
|
|||
#{
|
||||
reason := <<"duplicated schema checks: json:a">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.checks"
|
||||
path := "schema_validation.validations.1.checks"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
|
@ -387,7 +387,7 @@ duplicated_check_test_() ->
|
|||
#{
|
||||
reason := <<"duplicated schema checks: ", _/binary>>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.checks"
|
||||
path := "schema_validation.validations.1.checks"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
2
mix.exs
2
mix.exs
|
@ -189,7 +189,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
:emqx_s3,
|
||||
:emqx_bridge_s3,
|
||||
:emqx_schema_registry,
|
||||
:emqx_message_validation,
|
||||
:emqx_schema_validation,
|
||||
:emqx_enterprise,
|
||||
:emqx_bridge_kinesis,
|
||||
:emqx_bridge_azure_event_hub,
|
||||
|
|
|
@ -116,7 +116,7 @@ is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
|
|||
is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
|
||||
is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
|
||||
is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
|
||||
is_community_umbrella_app("apps/emqx_message_validation") -> false;
|
||||
is_community_umbrella_app("apps/emqx_schema_validation") -> false;
|
||||
is_community_umbrella_app("apps/emqx_eviction_agent") -> false;
|
||||
is_community_umbrella_app("apps/emqx_node_rebalance") -> false;
|
||||
is_community_umbrella_app(_) -> true.
|
||||
|
|
|
@ -25,9 +25,9 @@ get_prom_data_integration_data.desc:
|
|||
get_prom_data_integration_data.label:
|
||||
"""Prometheus Metrics for Data Integration"""
|
||||
|
||||
get_prom_message_validation.desc:
|
||||
"""Get Prometheus Metrics for Message Validation"""
|
||||
get_prom_message_validation.label:
|
||||
"""Prometheus Metrics for Message Validation"""
|
||||
get_prom_schema_validation.desc:
|
||||
"""Get Prometheus Metrics for Schema Validation"""
|
||||
get_prom_schema_validation.label:
|
||||
"""Prometheus Metrics for Schema Validation"""
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
emqx_message_validation_http_api {
|
||||
emqx_schema_validation_http_api {
|
||||
|
||||
list_validations.desc:
|
||||
"""List validations"""
|
|
@ -1,4 +1,4 @@
|
|||
emqx_message_validation_schema {
|
||||
emqx_schema_validation_schema {
|
||||
|
||||
check_avro_type.desc:
|
||||
"""Avro schema check"""
|
Loading…
Reference in New Issue