Merge pull request #12749 from thalesmg/mv-followup1-m-20240320

Follow up features and fixes for message validation - part 1
This commit is contained in:
Thales Macedo Garitezi 2024-03-21 16:03:49 -03:00 committed by GitHub
commit e17f663fa5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 609 additions and 72 deletions

View File

@ -59,6 +59,7 @@
'message.publish', 'message.publish',
'message.puback', 'message.puback',
'message.dropped', 'message.dropped',
'message.validation_failed',
'message.delivered', 'message.delivered',
'message.acked', 'message.acked',
'delivery.dropped', 'delivery.dropped',
@ -182,6 +183,9 @@ when
-callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) -> -callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) ->
callback_result(). callback_result().
-callback 'message.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) ->
callback_result().
-callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when -callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when
Msg :: emqx_types:message(). Msg :: emqx_types:message().

View File

@ -15,6 +15,7 @@
-define(EE_SCHEMA_MODULES, [ -define(EE_SCHEMA_MODULES, [
emqx_license_schema, emqx_license_schema,
emqx_schema_registry_schema, emqx_schema_registry_schema,
emqx_message_validation_schema,
emqx_ft_schema emqx_ft_schema
]). ]).

View File

@ -3,6 +3,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_validation). -module(emqx_message_validation).
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -214,10 +215,7 @@ evaluate_sql_check(Check, Validation, Message) ->
fields := Fields, fields := Fields,
conditions := Conditions conditions := Conditions
} = Check, } = Check,
#{ #{name := Name} = Validation,
name := Name,
log_failure := #{level := FailureLogLevel}
} = Validation,
{Data, _} = emqx_rule_events:eventmsg_publish(Message), {Data, _} = emqx_rule_events:eventmsg_publish(Message),
try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of
{ok, _} -> {ok, _} ->
@ -226,37 +224,24 @@ evaluate_sql_check(Check, Validation, Message) ->
false false
catch catch
throw:Reason -> throw:Reason ->
?TRACE( trace_failure(Validation, "validation_sql_check_throw", #{
FailureLogLevel, validation => Name,
?TRACE_TAG, reason => Reason
"validation_sql_check_throw", }),
#{
validation => Name,
reason => Reason
}
),
false; false;
Class:Error:Stacktrace -> Class:Error:Stacktrace ->
?TRACE( trace_failure(Validation, "validation_sql_check_failure", #{
FailureLogLevel, validation => Name,
?TRACE_TAG, kind => Class,
"validation_sql_check_failure", reason => Error,
#{ stacktrace => Stacktrace
validation => Name, }),
kind => Class,
reason => Error,
stacktrace => Stacktrace
}
),
false false
end. end.
evaluate_schema_check(Check, Validation, #message{payload = Data}) -> evaluate_schema_check(Check, Validation, #message{payload = Data}) ->
#{schema := SerdeName} = Check, #{schema := SerdeName} = Check,
#{ #{name := Name} = Validation,
name := Name,
log_failure := #{level := FailureLogLevel}
} = Validation,
ExtraArgs = ExtraArgs =
case Check of case Check of
#{type := protobuf, message_name := MessageName} -> #{type := protobuf, message_name := MessageName} ->
@ -268,29 +253,19 @@ evaluate_schema_check(Check, Validation, #message{payload = Data}) ->
emqx_schema_registry_serde:schema_check(SerdeName, Data, ExtraArgs) emqx_schema_registry_serde:schema_check(SerdeName, Data, ExtraArgs)
catch catch
error:{serde_not_found, _} -> error:{serde_not_found, _} ->
?TRACE( trace_failure(Validation, "validation_schema_check_schema_not_found", #{
FailureLogLevel, validation => Name,
?TRACE_TAG, schema_name => SerdeName
"validation_schema_check_schema_not_found", }),
#{
validation => Name,
schema_name => SerdeName
}
),
false; false;
Class:Error:Stacktrace -> Class:Error:Stacktrace ->
?TRACE( trace_failure(Validation, "validation_schema_check_failure", #{
FailureLogLevel, validation => Name,
?TRACE_TAG, schema_name => SerdeName,
"validation_schema_check_failure", kind => Class,
#{ reason => Error,
validation => Name, stacktrace => Stacktrace
schema_name => SerdeName, }),
kind => Class,
reason => Error,
stacktrace => Stacktrace
}
),
false false
end. end.
@ -404,20 +379,23 @@ run_validations(Validations, Message) ->
try try
emqx_rule_runtime:clear_rule_payload(), emqx_rule_runtime:clear_rule_payload(),
Fun = fun(Validation, Acc) -> Fun = fun(Validation, Acc) ->
#{ #{name := Name} = Validation,
name := Name,
log_failure := #{level := FailureLogLevel}
} = Validation,
case run_validation(Validation, Message) of case run_validation(Validation, Message) of
ok -> ok ->
{cont, Acc}; {cont, Acc};
ignore ->
trace_failure(Validation, "validation_failed", #{
validation => Name,
action => ignore
}),
run_message_validation_failed_hook(Message, Validation),
{cont, Acc};
FailureAction -> FailureAction ->
?TRACE( trace_failure(Validation, "validation_failed", #{
FailureLogLevel, validation => Name,
?TRACE_TAG, action => FailureAction
"validation_failed", }),
#{validation => Name, action => FailureAction} run_message_validation_failed_hook(Message, Validation),
),
{halt, FailureAction} {halt, FailureAction}
end end
end, end,
@ -454,3 +432,23 @@ run_check(#{type := sql} = Check, Validation, Message) ->
evaluate_sql_check(Check, Validation, Message); evaluate_sql_check(Check, Validation, Message);
run_check(Check, Validation, Message) -> run_check(Check, Validation, Message) ->
evaluate_schema_check(Check, Validation, Message). evaluate_schema_check(Check, Validation, Message).
trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) ->
#{
name := _Name,
failure_action := _Action
} = Validation,
?tp(message_validation_failed, #{log_level => none, name => _Name, action => _Action}),
ok;
trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) ->
#{
name := _Name,
failure_action := _Action
} = Validation,
?tp(message_validation_failed, #{log_level => Level, name => _Name, action => _Action}),
?TRACE(Level, ?TRACE_TAG, Msg, Meta).
run_message_validation_failed_hook(Message, Validation) ->
#{name := Name} = Validation,
ValidationContext = #{name => Name},
emqx_hooks:run('message.validation_failed', [Message, ValidationContext]).

View File

@ -23,7 +23,8 @@
-export([ -export([
'/message_validations'/2, '/message_validations'/2,
'/message_validations/reorder'/2, '/message_validations/reorder'/2,
'/message_validations/validation/:name'/2 '/message_validations/validation/:name'/2,
'/message_validations/validation/:name/enable/:enable'/2
]). ]).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
@ -45,7 +46,8 @@ paths() ->
[ [
"/message_validations", "/message_validations",
"/message_validations/reorder", "/message_validations/reorder",
"/message_validations/validation/:name" "/message_validations/validation/:name",
"/message_validations/validation/:name/enable/:enable"
]. ].
schema("/message_validations") -> schema("/message_validations") ->
@ -170,6 +172,22 @@ schema("/message_validations/validation/:name") ->
404 => error_schema('NOT_FOUND', "Validation not found") 404 => error_schema('NOT_FOUND', "Validation not found")
} }
} }
};
schema("/message_validations/validation/:name/enable/:enable") ->
#{
'operationId' => '/message_validations/validation/:name/enable/:enable',
post => #{
tags => ?TAGS,
summary => <<"Enable or disable validation">>,
description => ?DESC("enable_disable_validation"),
parameters => [param_path_name(), param_path_enable()],
responses =>
#{
204 => <<"No content">>,
404 => error_schema('NOT_FOUND', "Validation not found"),
400 => error_schema('BAD_REQUEST', "Bad params")
}
}
}. }.
param_path_name() -> param_path_name() ->
@ -184,6 +202,17 @@ param_path_name() ->
} }
)}. )}.
param_path_enable() ->
{enable,
mk(
boolean(),
#{
in => path,
required => true,
desc => ?DESC("param_path_enable")
}
)}.
fields(front) -> fields(front) ->
[{position, mk(front, #{default => front, required => true, in => body})}]; [{position, mk(front, #{default => front, required => true, in => body})}];
fields(rear) -> fields(rear) ->
@ -261,6 +290,15 @@ fields(reorder) ->
'/message_validations/reorder'(post, #{body := #{<<"order">> := Order}}) -> '/message_validations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
do_reorder(Order). do_reorder(Order).
'/message_validations/validation/:name/enable/:enable'(post, #{
bindings := #{name := Name, enable := Enable}
}) ->
with_validation(
Name,
fun(Validation) -> do_enable_disable(Validation, Enable) end,
not_found()
).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% Internal fns %% Internal fns
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
@ -328,6 +366,15 @@ do_reorder(Order) ->
?BAD_REQUEST(Error) ?BAD_REQUEST(Error)
end. end.
do_enable_disable(Validation, Enable) ->
RawValidation = make_serializable(Validation),
case emqx_message_validation:update(RawValidation#{<<"enable">> => Enable}) of
{ok, _} ->
?NO_CONTENT;
{error, Reason} ->
?BAD_REQUEST(Reason)
end.
with_validation(Name, FoundFn, NotFoundFn) -> with_validation(Name, FoundFn, NotFoundFn) ->
case emqx_message_validation:lookup(Name) of case emqx_message_validation:lookup(Name) of
{ok, Validation} -> {ok, Validation} ->
@ -345,3 +392,20 @@ return(Response) ->
not_found() -> not_found() ->
return(?NOT_FOUND(<<"Validation not found">>)). return(?NOT_FOUND(<<"Validation not found">>)).
make_serializable(Validation) ->
Schema = emqx_message_validation_schema,
RawConfig = #{
<<"message_validation">> => #{
<<"validations">> =>
[emqx_utils_maps:binary_key_map(Validation)]
}
},
#{
<<"message_validation">> := #{
<<"validations">> :=
[Serialized]
}
} =
hocon_tconf:make_serializable(Schema, RawConfig, #{}),
Serialized.

View File

@ -153,24 +153,26 @@ do_reindex_positions(Validations) ->
do_insert(Pos, Validation) -> do_insert(Pos, Validation) ->
#{ #{
enable := Enabled,
name := Name, name := Name,
topics := Topics topics := Topics
} = Validation, } = Validation,
maybe_create_metrics(Name), maybe_create_metrics(Name),
do_insert_into_tab(Name, Validation, Pos), do_insert_into_tab(Name, Validation, Pos),
update_topic_index(Name, Pos, Topics), Enabled andalso update_topic_index(Name, Pos, Topics),
ok. ok.
do_update(OldValidation, Pos, NewValidation) -> do_update(OldValidation, Pos, NewValidation) ->
#{topics := OldTopics} = OldValidation, #{topics := OldTopics} = OldValidation,
#{ #{
enable := Enabled,
name := Name, name := Name,
topics := NewTopics topics := NewTopics
} = NewValidation, } = NewValidation,
maybe_create_metrics(Name), maybe_create_metrics(Name),
do_insert_into_tab(Name, NewValidation, Pos), do_insert_into_tab(Name, NewValidation, Pos),
delete_topic_index(Name, OldTopics), delete_topic_index(Name, OldTopics),
update_topic_index(Name, Pos, NewTopics), Enabled andalso update_topic_index(Name, Pos, NewTopics),
ok. ok.
do_delete(Validation) -> do_delete(Validation) ->

View File

@ -75,7 +75,7 @@ fields(validation) ->
)}, )},
{failure_action, {failure_action,
mk( mk(
hoconsc:enum([drop, disconnect]), hoconsc:enum([drop, disconnect, ignore]),
#{desc => ?DESC("failure_action"), required => true} #{desc => ?DESC("failure_action"), required => true}
)}, )},
{log_failure, {log_failure,
@ -91,12 +91,7 @@ fields(validation) ->
#{ #{
required => true, required => true,
desc => ?DESC("checks"), desc => ?DESC("checks"),
validator => fun validator => fun validate_unique_schema_checks/1
([]) ->
{error, "at least one check must be defined"};
(_) ->
ok
end
} }
)} )}
]; ];
@ -104,7 +99,7 @@ fields(log_failure) ->
[ [
{level, {level,
mk( mk(
hoconsc:enum([error, warning, notice, info, debug]), hoconsc:enum([error, warning, notice, info, debug, none]),
#{desc => ?DESC("log_failure_at"), default => info} #{desc => ?DESC("log_failure_at"), default => info}
)} )}
]; ];
@ -232,3 +227,45 @@ do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(N
{error, <<"duplicated name: ", Name/binary>>}; {error, <<"duplicated name: ", Name/binary>>};
do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) -> do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) ->
do_validate_unique_names(Rest, Acc#{Name => true}). do_validate_unique_names(Rest, Acc#{Name => true}).
validate_unique_schema_checks([]) ->
{error, "at least one check must be defined"};
validate_unique_schema_checks(Checks) ->
Seen = sets:new([{version, 2}]),
Duplicated = sets:new([{version, 2}]),
do_validate_unique_schema_checks(Checks, Seen, Duplicated).
do_validate_unique_schema_checks(_Checks = [], _Seen, Duplicated) ->
case sets:to_list(Duplicated) of
[] ->
ok;
DuplicatedChecks0 ->
DuplicatedChecks =
lists:map(
fun({Type, SerdeName}) ->
[atom_to_binary(Type), ":", SerdeName]
end,
DuplicatedChecks0
),
Msg = iolist_to_binary([
<<"duplicated schema checks: ">>,
lists:join(", ", DuplicatedChecks)
]),
{error, Msg}
end;
do_validate_unique_schema_checks(
[#{<<"type">> := Type, <<"schema">> := SerdeName} | Rest],
Seen0,
Duplicated0
) ->
Check = {Type, SerdeName},
case sets:is_element(Check, Seen0) of
true ->
Duplicated = sets:add_element(Check, Duplicated0),
do_validate_unique_schema_checks(Rest, Seen0, Duplicated);
false ->
Seen = sets:add_element(Check, Seen0),
do_validate_unique_schema_checks(Rest, Seen, Duplicated0)
end;
do_validate_unique_schema_checks([_Check | Rest], Seen, Duplicated) ->
do_validate_unique_schema_checks(Rest, Seen, Duplicated).

View File

@ -14,6 +14,8 @@
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
-define(RECORDED_EVENTS_TAB, recorded_actions).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% CT boilerplate %% CT boilerplate
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -50,6 +52,7 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
clear_all_validations(), clear_all_validations(),
snabbkaffe:stop(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok. ok.
@ -181,6 +184,18 @@ reorder(Order) ->
ct:pal("reorder result:\n ~p", [Res]), ct:pal("reorder result:\n ~p", [Res]),
simplify_result(Res). simplify_result(Res).
enable(Name) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), "validation", Name, "enable", "true"]),
Res = request(post, Path, _Params = []),
ct:pal("enable result:\n ~p", [Res]),
simplify_result(Res).
disable(Name) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), "validation", Name, "enable", "false"]),
Res = request(post, Path, _Params = []),
ct:pal("disable result:\n ~p", [Res]),
simplify_result(Res).
connect(ClientId) -> connect(ClientId) ->
connect(ClientId, _IsPersistent = false). connect(ClientId, _IsPersistent = false).
@ -315,6 +330,39 @@ assert_index_order(ExpectedOrder, Topic, Comment) ->
Comment Comment
). ).
create_failure_tracing_rule() ->
Params = #{
enable => true,
sql => <<"select * from \"$events/message_validation_failed\" ">>,
actions => [make_trace_fn_action()]
},
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
Res = request(post, Path, Params),
ct:pal("create failure tracing rule result:\n ~p", [Res]),
case Res of
{ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} ->
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
simplify_result(Res);
_ ->
simplify_result(Res)
end.
make_trace_fn_action() ->
persistent_term:put({?MODULE, test_pid}, self()),
Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>,
emqx_utils_ets:new(?RECORDED_EVENTS_TAB, [named_table, public, ordered_set]),
#{function => Fn, args => #{}}.
trace_rule(Data, Envs, _Args) ->
Now = erlang:monotonic_time(),
ets:insert(?RECORDED_EVENTS_TAB, {Now, #{data => Data, envs => Envs}}),
TestPid = persistent_term:get({?MODULE, test_pid}),
TestPid ! {action, #{data => Data, envs => Envs}},
ok.
get_traced_failures_from_rule_engine() ->
ets:tab2list(?RECORDED_EVENTS_TAB).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -508,6 +556,168 @@ t_reorder(_Config) ->
ok. ok.
t_enable_disable_via_update(_Config) ->
Topic = <<"t">>,
Name1 = <<"foo">>,
AlwaysFailCheck = sql_check(<<"select * where false">>),
Validation1 = validation(Name1, [AlwaysFailCheck], #{<<"topics">> => Topic}),
{201, _} = insert(Validation1#{<<"enable">> => false}),
?assertIndexOrder([], Topic),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, Topic),
ok = publish(C, Topic, #{}),
?assertReceive({publish, _}),
{200, _} = update(Validation1#{<<"enable">> => true}),
?assertIndexOrder([Name1], Topic),
ok = publish(C, Topic, #{}),
?assertNotReceive({publish, _}),
{200, _} = update(Validation1#{<<"enable">> => false}),
?assertIndexOrder([], Topic),
ok = publish(C, Topic, #{}),
?assertReceive({publish, _}),
%% Test index after delete; ensure it's in the index before
{200, _} = update(Validation1#{<<"enable">> => true}),
?assertIndexOrder([Name1], Topic),
{204, _} = delete(Name1),
?assertIndexOrder([], Topic),
ok.
t_log_failure_none(_Config) ->
?check_trace(
begin
Name1 = <<"foo">>,
AlwaysFailCheck = sql_check(<<"select * where false">>),
Validation1 = validation(
Name1,
[AlwaysFailCheck],
#{<<"log_failure">> => #{<<"level">> => <<"none">>}}
),
{201, _} = insert(Validation1),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
ok = publish(C, <<"t/1">>, #{}),
?assertNotReceive({publish, _}),
ok
end,
fun(Trace) ->
?assertMatch([#{log_level := none}], ?of_kind(message_validation_failed, Trace)),
ok
end
),
ok.
t_action_ignore(_Config) ->
Name1 = <<"foo">>,
?check_trace(
begin
AlwaysFailCheck = sql_check(<<"select * where false">>),
Validation1 = validation(
Name1,
[AlwaysFailCheck],
#{<<"failure_action">> => <<"ignore">>}
),
{201, _} = insert(Validation1),
{201, _} = create_failure_tracing_rule(),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
ok = publish(C, <<"t/1">>, #{}),
?assertReceive({publish, _}),
ok
end,
fun(Trace) ->
?assertMatch([#{action := ignore}], ?of_kind(message_validation_failed, Trace)),
ok
end
),
?assertMatch(
[{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}],
get_traced_failures_from_rule_engine()
),
ok.
t_enable_disable_via_api_endpoint(_Config) ->
Topic = <<"t">>,
Name1 = <<"foo">>,
AlwaysFailCheck = sql_check(<<"select * where false">>),
Validation1 = validation(Name1, [AlwaysFailCheck], #{<<"topics">> => Topic}),
{201, _} = insert(Validation1),
?assertIndexOrder([Name1], Topic),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, Topic),
ok = publish(C, Topic, #{}),
?assertNotReceive({publish, _}),
%% already enabled
{204, _} = enable(Name1),
?assertIndexOrder([Name1], Topic),
?assertMatch({200, #{<<"enable">> := true}}, lookup(Name1)),
ok = publish(C, Topic, #{}),
?assertNotReceive({publish, _}),
{204, _} = disable(Name1),
?assertIndexOrder([], Topic),
?assertMatch({200, #{<<"enable">> := false}}, lookup(Name1)),
ok = publish(C, Topic, #{}),
?assertReceive({publish, _}),
%% already disabled
{204, _} = disable(Name1),
?assertIndexOrder([], Topic),
?assertMatch({200, #{<<"enable">> := false}}, lookup(Name1)),
ok = publish(C, Topic, #{}),
?assertReceive({publish, _}),
%% Re-enable
{204, _} = enable(Name1),
?assertIndexOrder([Name1], Topic),
?assertMatch({200, #{<<"enable">> := true}}, lookup(Name1)),
ok = publish(C, Topic, #{}),
?assertNotReceive({publish, _}),
ok.
t_duplicated_schema_checks(_Config) ->
Name1 = <<"foo">>,
SerdeName = <<"myserde">>,
Check = schema_check(json, SerdeName),
Validation1 = validation(Name1, [Check, sql_check(), Check]),
?assertMatch({400, _}, insert(Validation1)),
Validation2 = validation(Name1, [Check, sql_check()]),
?assertMatch({201, _}, insert(Validation2)),
?assertMatch({400, _}, update(Validation1)),
ok.
%% Check the `all_pass' strategy %% Check the `all_pass' strategy
t_all_pass(_Config) -> t_all_pass(_Config) ->
Name1 = <<"foo">>, Name1 = <<"foo">>,
@ -549,6 +759,8 @@ t_any_pass(_Config) ->
%% Checks that multiple validations are run in order. %% Checks that multiple validations are run in order.
t_multiple_validations(_Config) -> t_multiple_validations(_Config) ->
{201, _} = create_failure_tracing_rule(),
Name1 = <<"foo">>, Name1 = <<"foo">>,
Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>), Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>),
Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}), Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}),
@ -564,14 +776,24 @@ t_multiple_validations(_Config) ->
ok = publish(C, <<"t/1">>, #{x => 11, y => 1}), ok = publish(C, <<"t/1">>, #{x => 11, y => 1}),
?assertReceive({publish, _}), ?assertReceive({publish, _}),
%% Barred by `Name1'
ok = publish(C, <<"t/1">>, #{x => 7, y => 0}), ok = publish(C, <<"t/1">>, #{x => 7, y => 0}),
?assertNotReceive({publish, _}), ?assertNotReceive({publish, _}),
?assertNotReceive({disconnected, _, _}), ?assertNotReceive({disconnected, _, _}),
%% Barred by `Name2'
unlink(C), unlink(C),
ok = publish(C, <<"t/1">>, #{x => 0, y => 1}), ok = publish(C, <<"t/1">>, #{x => 0, y => 1}),
?assertNotReceive({publish, _}), ?assertNotReceive({publish, _}),
?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}), ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}),
?assertMatch(
[
{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}},
{_, #{data := #{validation := Name2, event := 'message.validation_failed'}}}
],
get_traced_failures_from_rule_engine()
),
ok. ok.
t_schema_check_non_existent_serde(_Config) -> t_schema_check_non_existent_serde(_Config) ->

View File

@ -52,6 +52,18 @@ sql_check(SQL) ->
<<"sql">> => SQL <<"sql">> => SQL
}. }.
schema_check(Type, SerdeName) ->
schema_check(Type, SerdeName, _Overrides = #{}).
schema_check(Type, SerdeName, Overrides) ->
emqx_utils_maps:deep_merge(
#{
<<"type">> => emqx_utils_conv:bin(Type),
<<"schema">> => SerdeName
},
Overrides
).
eval_sql(Message, SQL) -> eval_sql(Message, SQL) ->
{ok, Check} = emqx_message_validation:parse_sql_check(SQL), {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
Validation = #{log_failure => #{level => warning}, name => <<"validation">>}, Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
@ -217,3 +229,114 @@ check_test_() ->
{"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))}, {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
{"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))} {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
]. ].
duplicated_check_test_() ->
[
{"duplicated sql checks are not checked",
?_assertMatch(
[#{<<"checks">> := [_, _]}],
parse_and_check([
validation(<<"foo">>, [sql_check(), sql_check()])
])
)},
{"different serdes with same name",
?_assertMatch(
[#{<<"checks">> := [_, _, _]}],
parse_and_check([
validation(<<"foo">>, [
schema_check(json, <<"a">>),
schema_check(avro, <<"a">>),
schema_check(
protobuf,
<<"a">>,
#{<<"message_name">> => <<"a">>}
)
])
])
)},
{"duplicated serdes 1",
?_assertThrow(
{_Schema, [
#{
reason := <<"duplicated schema checks: json:a">>,
kind := validation_error,
path := "message_validation.validations.1.checks"
}
]},
parse_and_check([
validation(<<"foo">>, [
schema_check(json, <<"a">>),
schema_check(json, <<"a">>)
])
])
)},
{"duplicated serdes 2",
?_assertThrow(
{_Schema, [
#{
reason := <<"duplicated schema checks: json:a">>,
kind := validation_error,
path := "message_validation.validations.1.checks"
}
]},
parse_and_check([
validation(<<"foo">>, [
schema_check(json, <<"a">>),
sql_check(),
schema_check(json, <<"a">>)
])
])
)},
{"duplicated serdes 3",
?_assertThrow(
{_Schema, [
#{
reason := <<"duplicated schema checks: json:a">>,
kind := validation_error,
path := "message_validation.validations.1.checks"
}
]},
parse_and_check([
validation(<<"foo">>, [
schema_check(json, <<"a">>),
schema_check(json, <<"a">>),
sql_check()
])
])
)},
{"duplicated serdes 4",
?_assertThrow(
{_Schema, [
#{
reason := <<"duplicated schema checks: json:a">>,
kind := validation_error,
path := "message_validation.validations.1.checks"
}
]},
parse_and_check([
validation(<<"foo">>, [
schema_check(json, <<"a">>),
schema_check(json, <<"a">>),
schema_check(json, <<"a">>)
])
])
)},
{"duplicated serdes 4",
?_assertThrow(
{_Schema, [
#{
reason := <<"duplicated schema checks: ", _/binary>>,
kind := validation_error,
path := "message_validation.validations.1.checks"
}
]},
parse_and_check([
validation(<<"foo">>, [
schema_check(json, <<"a">>),
schema_check(json, <<"a">>),
schema_check(avro, <<"b">>),
schema_check(avro, <<"b">>)
])
])
)}
].

View File

@ -44,6 +44,7 @@
on_session_unsubscribed/4, on_session_unsubscribed/4,
on_message_publish/2, on_message_publish/2,
on_message_dropped/4, on_message_dropped/4,
on_message_validation_failed/3,
on_message_delivered/3, on_message_delivered/3,
on_message_acked/3, on_message_acked/3,
on_delivery_dropped/4, on_delivery_dropped/4,
@ -78,6 +79,7 @@ event_names() ->
'message.delivered', 'message.delivered',
'message.acked', 'message.acked',
'message.dropped', 'message.dropped',
'message.validation_failed',
'delivery.dropped' 'delivery.dropped'
]. ].
@ -93,6 +95,7 @@ event_topics_enum() ->
'$events/message_delivered', '$events/message_delivered',
'$events/message_acked', '$events/message_acked',
'$events/message_dropped', '$events/message_dropped',
'$events/message_validation_failed',
'$events/delivery_dropped' '$events/delivery_dropped'
% '$events/message_publish' % not possible to use in SELECT FROM % '$events/message_publish' % not possible to use in SELECT FROM
]. ].
@ -221,6 +224,19 @@ on_message_dropped(Message, _, Reason, Conf) ->
end, end,
{ok, Message}. {ok, Message}.
on_message_validation_failed(Message, ValidationContext, Conf) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
apply_event(
'message.validation_failed',
fun() -> eventmsg_validation_failed(Message, ValidationContext) end,
Conf
)
end,
{ok, Message}.
on_message_delivered(ClientInfo, Message, Conf) -> on_message_delivered(ClientInfo, Message, Conf) ->
case ignore_sys_message(Message) of case ignore_sys_message(Message) of
true -> true ->
@ -477,6 +493,38 @@ eventmsg_dropped(
#{headers => Headers} #{headers => Headers}
). ).
eventmsg_validation_failed(
Message = #message{
id = Id,
from = ClientId,
qos = QoS,
flags = Flags,
topic = Topic,
headers = Headers,
payload = Payload,
timestamp = Timestamp
},
ValidationContext
) ->
#{name := ValidationName} = ValidationContext,
with_basic_columns(
'message.validation_failed',
#{
id => emqx_guid:to_hexstr(Id),
validation => ValidationName,
clientid => ClientId,
username => emqx_message:get_header(username, Message, undefined),
payload => Payload,
peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
topic => Topic,
qos => QoS,
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
},
#{headers => Headers}
).
eventmsg_delivered( eventmsg_delivered(
_ClientInfo = #{ _ClientInfo = #{
peerhost := PeerHost, peerhost := PeerHost,
@ -627,6 +675,7 @@ event_info() ->
event_info_message_deliver(), event_info_message_deliver(),
event_info_message_acked(), event_info_message_acked(),
event_info_message_dropped(), event_info_message_dropped(),
event_info_message_validation_failed(),
event_info_client_connected(), event_info_client_connected(),
event_info_client_disconnected(), event_info_client_disconnected(),
event_info_client_connack(), event_info_client_connack(),
@ -666,6 +715,13 @@ event_info_message_dropped() ->
<<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
<<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
). ).
event_info_message_validation_failed() ->
event_info_common(
'message.validation_failed',
{<<"message validation failed">>, <<"TODO"/utf8>>},
{<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>},
<<"SELECT * FROM \"$events/message_validation_failed\" WHERE topic =~ 't/#'">>
).
event_info_delivery_dropped() -> event_info_delivery_dropped() ->
event_info_common( event_info_common(
'delivery.dropped', 'delivery.dropped',
@ -737,6 +793,9 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
test_columns('message.dropped') -> test_columns('message.dropped') ->
[{<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]}] ++ [{<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]}] ++
test_columns('message.publish'); test_columns('message.publish');
test_columns('message.validation_failed') ->
[{<<"validation">>, <<"myvalidation">>}] ++
test_columns('message.publish');
test_columns('message.publish') -> test_columns('message.publish') ->
[ [
{<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]}, {<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]},
@ -840,6 +899,23 @@ columns_with_exam('message.dropped') ->
{<<"timestamp">>, erlang:system_time(millisecond)}, {<<"timestamp">>, erlang:system_time(millisecond)},
{<<"node">>, node()} {<<"node">>, node()}
]; ];
columns_with_exam('message.validation_failed') ->
[
{<<"event">>, 'message.validation_failed'},
{<<"validation">>, <<"my_validation">>},
{<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
{<<"clientid">>, <<"c_emqx">>},
{<<"username">>, <<"u_emqx">>},
{<<"payload">>, <<"{\"msg\": \"hello\"}">>},
{<<"peerhost">>, <<"192.168.0.10">>},
{<<"topic">>, <<"t/a">>},
{<<"qos">>, 1},
{<<"flags">>, #{}},
{<<"publish_received_at">>, erlang:system_time(millisecond)},
columns_example_props(pub_props),
{<<"timestamp">>, erlang:system_time(millisecond)},
{<<"node">>, node()}
];
columns_with_exam('delivery.dropped') -> columns_with_exam('delivery.dropped') ->
[ [
{<<"event">>, 'delivery.dropped'}, {<<"event">>, 'delivery.dropped'},
@ -1030,6 +1106,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3; hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3; hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4; hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
hook_fun('message.validation_failed') -> fun ?MODULE:on_message_validation_failed/3;
hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4; hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2; hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
hook_fun(Event) -> error({invalid_event, Event}). hook_fun(Event) -> error({invalid_event, Event}).
@ -1054,6 +1131,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
event_name(<<"$events/message_delivered">>) -> 'message.delivered'; event_name(<<"$events/message_delivered">>) -> 'message.delivered';
event_name(<<"$events/message_acked">>) -> 'message.acked'; event_name(<<"$events/message_acked">>) -> 'message.acked';
event_name(<<"$events/message_dropped">>) -> 'message.dropped'; event_name(<<"$events/message_dropped">>) -> 'message.dropped';
event_name(<<"$events/message_validation_failed">>) -> 'message.validation_failed';
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
event_name(_) -> 'message.publish'. event_name(_) -> 'message.publish'.
@ -1067,6 +1145,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.delivered') -> <<"$events/message_delivered">>;
event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.acked') -> <<"$events/message_acked">>;
event_topic('message.dropped') -> <<"$events/message_dropped">>; event_topic('message.dropped') -> <<"$events/message_dropped">>;
event_topic('message.validation_failed') -> <<"$events/message_validation_failed">>;
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
event_topic('message.publish') -> <<"$events/message_publish">>. event_topic('message.publish') -> <<"$events/message_publish">>.

View File

@ -18,7 +18,13 @@ emqx_message_validation_http_api {
reorder_validations.desc: reorder_validations.desc:
"""Reorder of all validations""" """Reorder of all validations"""
enable_disable_validation.desc:
"""Enable or disable a particular validation"""
param_path_name.desc: param_path_name.desc:
"""Validation name""" """Validation name"""
param_path_enable.desc:
"""Enable or disable validation"""
} }

View File

@ -71,7 +71,8 @@ emqx_message_validation_schema {
"""How to proceed if the validation fails. """How to proceed if the validation fails.
<code>drop</code>: The offending message is simply dropped without further processing. <code>drop</code>: The offending message is simply dropped without further processing.
<code>disconnect</code>: The message is not published, and the publishing client is disconnected.""" <code>disconnect</code>: The message is not published, and the publishing client is disconnected.
<code>ignore</code>: Only the failure is logged and traced. No other action is taken."""
failure_action.label: failure_action.label:
"""Failure action""" """Failure action"""