From 7aa287c6c1b9c3ff21a704715a6347e00eca5c44 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Mar 2024 13:37:01 -0300 Subject: [PATCH 1/7] fix: add message validation schema to `emqx_enterprise_schema` --- apps/emqx_enterprise/src/emqx_enterprise_schema.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl index 316100b67..8d516795c 100644 --- a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl +++ b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl @@ -15,6 +15,7 @@ -define(EE_SCHEMA_MODULES, [ emqx_license_schema, emqx_schema_registry_schema, + emqx_message_validation_schema, emqx_ft_schema ]). From e767f01e0a5f9fa5334ffddedfb4667b7715f707 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Mar 2024 13:57:12 -0300 Subject: [PATCH 2/7] fix(message_validation): take `enable` into account --- .../src/emqx_message_validation_registry.erl | 6 ++-- ...emqx_message_validation_http_api_SUITE.erl | 36 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/apps/emqx_message_validation/src/emqx_message_validation_registry.erl b/apps/emqx_message_validation/src/emqx_message_validation_registry.erl index 125429565..6a8e4a376 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_registry.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation_registry.erl @@ -153,24 +153,26 @@ do_reindex_positions(Validations) -> do_insert(Pos, Validation) -> #{ + enable := Enabled, name := Name, topics := Topics } = Validation, maybe_create_metrics(Name), do_insert_into_tab(Name, Validation, Pos), - update_topic_index(Name, Pos, Topics), + Enabled andalso update_topic_index(Name, Pos, Topics), ok. do_update(OldValidation, Pos, NewValidation) -> #{topics := OldTopics} = OldValidation, #{ + enable := Enabled, name := Name, topics := NewTopics } = NewValidation, maybe_create_metrics(Name), do_insert_into_tab(Name, NewValidation, Pos), delete_topic_index(Name, OldTopics), - update_topic_index(Name, Pos, NewTopics), + Enabled andalso update_topic_index(Name, Pos, NewTopics), ok. do_delete(Validation) -> diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index 57c097005..d042ee817 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -508,6 +508,42 @@ t_reorder(_Config) -> ok. +t_enable_disable_via_update(_Config) -> + Topic = <<"t">>, + + Name1 = <<"foo">>, + AlwaysFailCheck = sql_check(<<"select * where false">>), + Validation1 = validation(Name1, [AlwaysFailCheck], #{<<"topics">> => Topic}), + + {201, _} = insert(Validation1#{<<"enable">> => false}), + ?assertIndexOrder([], Topic), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, Topic), + + ok = publish(C, Topic, #{}), + ?assertReceive({publish, _}), + + {200, _} = update(Validation1#{<<"enable">> => true}), + ?assertIndexOrder([Name1], Topic), + + ok = publish(C, Topic, #{}), + ?assertNotReceive({publish, _}), + + {200, _} = update(Validation1#{<<"enable">> => false}), + ?assertIndexOrder([], Topic), + + ok = publish(C, Topic, #{}), + ?assertReceive({publish, _}), + + %% Test index after delete; ensure it's in the index before + {200, _} = update(Validation1#{<<"enable">> => true}), + ?assertIndexOrder([Name1], Topic), + {204, _} = delete(Name1), + ?assertIndexOrder([], Topic), + + ok. + %% Check the `all_pass' strategy t_all_pass(_Config) -> Name1 = <<"foo">>, From 8753e3583f07910031319f5536c7359323d2aa6b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Mar 2024 14:09:11 -0300 Subject: [PATCH 3/7] feat(message_validation): add `none` log level --- .../src/emqx_message_validation.erl | 103 ++++++++---------- .../src/emqx_message_validation_schema.erl | 2 +- ...emqx_message_validation_http_api_SUITE.erl | 34 ++++++ 3 files changed, 79 insertions(+), 60 deletions(-) diff --git a/apps/emqx_message_validation/src/emqx_message_validation.erl b/apps/emqx_message_validation/src/emqx_message_validation.erl index 6ef042405..8bf0b879e 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation.erl @@ -3,6 +3,7 @@ %%-------------------------------------------------------------------- -module(emqx_message_validation). +-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -214,10 +215,7 @@ evaluate_sql_check(Check, Validation, Message) -> fields := Fields, conditions := Conditions } = Check, - #{ - name := Name, - log_failure := #{level := FailureLogLevel} - } = Validation, + #{name := Name} = Validation, {Data, _} = emqx_rule_events:eventmsg_publish(Message), try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of {ok, _} -> @@ -226,37 +224,24 @@ evaluate_sql_check(Check, Validation, Message) -> false catch throw:Reason -> - ?TRACE( - FailureLogLevel, - ?TRACE_TAG, - "validation_sql_check_throw", - #{ - validation => Name, - reason => Reason - } - ), + trace_failure(Validation, "validation_sql_check_throw", #{ + validation => Name, + reason => Reason + }), false; Class:Error:Stacktrace -> - ?TRACE( - FailureLogLevel, - ?TRACE_TAG, - "validation_sql_check_failure", - #{ - validation => Name, - kind => Class, - reason => Error, - stacktrace => Stacktrace - } - ), + trace_failure(Validation, "validation_sql_check_failure", #{ + validation => Name, + kind => Class, + reason => Error, + stacktrace => Stacktrace + }), false end. evaluate_schema_check(Check, Validation, #message{payload = Data}) -> #{schema := SerdeName} = Check, - #{ - name := Name, - log_failure := #{level := FailureLogLevel} - } = Validation, + #{name := Name} = Validation, ExtraArgs = case Check of #{type := protobuf, message_name := MessageName} -> @@ -268,29 +253,19 @@ evaluate_schema_check(Check, Validation, #message{payload = Data}) -> emqx_schema_registry_serde:schema_check(SerdeName, Data, ExtraArgs) catch error:{serde_not_found, _} -> - ?TRACE( - FailureLogLevel, - ?TRACE_TAG, - "validation_schema_check_schema_not_found", - #{ - validation => Name, - schema_name => SerdeName - } - ), + trace_failure(Validation, "validation_schema_check_schema_not_found", #{ + validation => Name, + schema_name => SerdeName + }), false; Class:Error:Stacktrace -> - ?TRACE( - FailureLogLevel, - ?TRACE_TAG, - "validation_schema_check_failure", - #{ - validation => Name, - schema_name => SerdeName, - kind => Class, - reason => Error, - stacktrace => Stacktrace - } - ), + trace_failure(Validation, "validation_schema_check_failure", #{ + validation => Name, + schema_name => SerdeName, + kind => Class, + reason => Error, + stacktrace => Stacktrace + }), false end. @@ -404,20 +379,15 @@ run_validations(Validations, Message) -> try emqx_rule_runtime:clear_rule_payload(), Fun = fun(Validation, Acc) -> - #{ - name := Name, - log_failure := #{level := FailureLogLevel} - } = Validation, + #{name := Name} = Validation, case run_validation(Validation, Message) of ok -> {cont, Acc}; FailureAction -> - ?TRACE( - FailureLogLevel, - ?TRACE_TAG, - "validation_failed", - #{validation => Name, action => FailureAction} - ), + trace_failure(Validation, "validation_failed", #{ + validation => Name, + action => FailureAction + }), {halt, FailureAction} end end, @@ -454,3 +424,18 @@ run_check(#{type := sql} = Check, Validation, Message) -> evaluate_sql_check(Check, Validation, Message); run_check(Check, Validation, Message) -> evaluate_schema_check(Check, Validation, Message). + +trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) -> + #{ + name := _Name, + failure_action := _Action + } = Validation, + ?tp(message_validation_failure, #{log_level => none, name => _Name, action => _Action}), + ok; +trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) -> + #{ + name := _Name, + failure_action := _Action + } = Validation, + ?tp(message_validation_failure, #{log_level => Level, name => _Name, action => _Action}), + ?TRACE(Level, ?TRACE_TAG, Msg, Meta). diff --git a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl index 6218814b0..cd1711e0a 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl @@ -104,7 +104,7 @@ fields(log_failure) -> [ {level, mk( - hoconsc:enum([error, warning, notice, info, debug]), + hoconsc:enum([error, warning, notice, info, debug, none]), #{desc => ?DESC("log_failure_at"), default => info} )} ]; diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index d042ee817..e3207320f 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -50,6 +50,7 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_all_validations(), + snabbkaffe:stop(), emqx_common_test_helpers:call_janitor(), ok. @@ -542,6 +543,39 @@ t_enable_disable_via_update(_Config) -> {204, _} = delete(Name1), ?assertIndexOrder([], Topic), + ok. + +t_log_failure_none(_Config) -> + ?check_trace( + begin + Name1 = <<"foo">>, + AlwaysFailCheck = sql_check(<<"select * where false">>), + Validation1 = validation( + Name1, + [AlwaysFailCheck], + #{<<"log_failure">> => #{<<"level">> => <<"none">>}} + ), + + {201, _} = insert(Validation1), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + ok = publish(C, <<"t/1">>, #{}), + ?assertNotReceive({publish, _}), + + ok + end, + fun(Trace) -> + ?assertMatch([#{log_level := none}], ?of_kind(message_validation_failure, Trace)), + ok + end + ), + ok. + + + + ok. %% Check the `all_pass' strategy From 4944cc080ef19c90811021a366f9079924e315cc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Mar 2024 14:31:43 -0300 Subject: [PATCH 4/7] feat(message_validation): add `ignore` failure action --- .../src/emqx_message_validation.erl | 6 +++++ .../src/emqx_message_validation_schema.erl | 2 +- ...emqx_message_validation_http_api_SUITE.erl | 22 +++++++++++++++++++ rel/i18n/emqx_message_validation_schema.hocon | 3 ++- 4 files changed, 31 insertions(+), 2 deletions(-) diff --git a/apps/emqx_message_validation/src/emqx_message_validation.erl b/apps/emqx_message_validation/src/emqx_message_validation.erl index 8bf0b879e..74de48eac 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation.erl @@ -383,6 +383,12 @@ run_validations(Validations, Message) -> case run_validation(Validation, Message) of ok -> {cont, Acc}; + ignore -> + trace_failure(Validation, "validation_failed", #{ + validation => Name, + action => ignore + }), + {cont, Acc}; FailureAction -> trace_failure(Validation, "validation_failed", #{ validation => Name, diff --git a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl index cd1711e0a..618b73481 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl @@ -75,7 +75,7 @@ fields(validation) -> )}, {failure_action, mk( - hoconsc:enum([drop, disconnect]), + hoconsc:enum([drop, disconnect, ignore]), #{desc => ?DESC("failure_action"), required => true} )}, {log_failure, diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index e3207320f..141fc74af 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -573,9 +573,31 @@ t_log_failure_none(_Config) -> ), ok. +t_action_ignore(_Config) -> + ?check_trace( + begin + Name1 = <<"foo">>, + AlwaysFailCheck = sql_check(<<"select * where false">>), + Validation1 = validation( + Name1, + [AlwaysFailCheck], + #{<<"failure_action">> => <<"ignore">>} + ), + {201, _} = insert(Validation1), + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + ok = publish(C, <<"t/1">>, #{}), + ?assertReceive({publish, _}), + ok + end, + fun(Trace) -> + ?assertMatch([#{action := ignore}], ?of_kind(message_validation_failure, Trace)), + ok + end + ), ok. %% Check the `all_pass' strategy diff --git a/rel/i18n/emqx_message_validation_schema.hocon b/rel/i18n/emqx_message_validation_schema.hocon index be746368a..670a16b80 100644 --- a/rel/i18n/emqx_message_validation_schema.hocon +++ b/rel/i18n/emqx_message_validation_schema.hocon @@ -71,7 +71,8 @@ emqx_message_validation_schema { """How to proceed if the validation fails. drop: The offending message is simply dropped without further processing. - disconnect: The message is not published, and the publishing client is disconnected.""" + disconnect: The message is not published, and the publishing client is disconnected. + ignore: Only the failure is logged and traced. No other action is taken.""" failure_action.label: """Failure action""" From b8cd1c90203f8370ad510e4176962b8b46fe817e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Mar 2024 14:59:32 -0300 Subject: [PATCH 5/7] feat(message validation api): add enable/disable HTTP API --- .../src/emqx_message_validation_http_api.erl | 68 ++++++++++++++++++- ...emqx_message_validation_http_api_SUITE.erl | 61 +++++++++++++++++ .../emqx_message_validation_http_api.hocon | 6 ++ 3 files changed, 133 insertions(+), 2 deletions(-) diff --git a/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl b/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl index b024ca09e..272abbbcf 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl @@ -23,7 +23,8 @@ -export([ '/message_validations'/2, '/message_validations/reorder'/2, - '/message_validations/validation/:name'/2 + '/message_validations/validation/:name'/2, + '/message_validations/validation/:name/enable/:enable'/2 ]). %%------------------------------------------------------------------------------------------------- @@ -45,7 +46,8 @@ paths() -> [ "/message_validations", "/message_validations/reorder", - "/message_validations/validation/:name" + "/message_validations/validation/:name", + "/message_validations/validation/:name/enable/:enable" ]. schema("/message_validations") -> @@ -170,6 +172,22 @@ schema("/message_validations/validation/:name") -> 404 => error_schema('NOT_FOUND', "Validation not found") } } + }; +schema("/message_validations/validation/:name/enable/:enable") -> + #{ + 'operationId' => '/message_validations/validation/:name/enable/:enable', + post => #{ + tags => ?TAGS, + summary => <<"Enable or disable validation">>, + description => ?DESC("enable_disable_validation"), + parameters => [param_path_name(), param_path_enable()], + responses => + #{ + 204 => <<"No content">>, + 404 => error_schema('NOT_FOUND', "Validation not found"), + 400 => error_schema('BAD_REQUEST', "Bad params") + } + } }. param_path_name() -> @@ -184,6 +202,17 @@ param_path_name() -> } )}. +param_path_enable() -> + {enable, + mk( + boolean(), + #{ + in => path, + required => true, + desc => ?DESC("param_path_enable") + } + )}. + fields(front) -> [{position, mk(front, #{default => front, required => true, in => body})}]; fields(rear) -> @@ -261,6 +290,15 @@ fields(reorder) -> '/message_validations/reorder'(post, #{body := #{<<"order">> := Order}}) -> do_reorder(Order). +'/message_validations/validation/:name/enable/:enable'(post, #{ + bindings := #{name := Name, enable := Enable} +}) -> + with_validation( + Name, + fun(Validation) -> do_enable_disable(Validation, Enable) end, + not_found() + ). + %%------------------------------------------------------------------------------------------------- %% Internal fns %%------------------------------------------------------------------------------------------------- @@ -328,6 +366,15 @@ do_reorder(Order) -> ?BAD_REQUEST(Error) end. +do_enable_disable(Validation, Enable) -> + RawValidation = make_serializable(Validation), + case emqx_message_validation:update(RawValidation#{<<"enable">> => Enable}) of + {ok, _} -> + ?NO_CONTENT; + {error, Reason} -> + ?BAD_REQUEST(Reason) + end. + with_validation(Name, FoundFn, NotFoundFn) -> case emqx_message_validation:lookup(Name) of {ok, Validation} -> @@ -345,3 +392,20 @@ return(Response) -> not_found() -> return(?NOT_FOUND(<<"Validation not found">>)). + +make_serializable(Validation) -> + Schema = emqx_message_validation_schema, + RawConfig = #{ + <<"message_validation">> => #{ + <<"validations">> => + [emqx_utils_maps:binary_key_map(Validation)] + } + }, + #{ + <<"message_validation">> := #{ + <<"validations">> := + [Serialized] + } + } = + hocon_tconf:make_serializable(Schema, RawConfig, #{}), + Serialized. diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index 141fc74af..8f052a147 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -182,6 +182,18 @@ reorder(Order) -> ct:pal("reorder result:\n ~p", [Res]), simplify_result(Res). +enable(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), "validation", Name, "enable", "true"]), + Res = request(post, Path, _Params = []), + ct:pal("enable result:\n ~p", [Res]), + simplify_result(Res). + +disable(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), "validation", Name, "enable", "false"]), + Res = request(post, Path, _Params = []), + ct:pal("disable result:\n ~p", [Res]), + simplify_result(Res). + connect(ClientId) -> connect(ClientId, _IsPersistent = false). @@ -600,6 +612,55 @@ t_action_ignore(_Config) -> ), ok. +t_enable_disable_via_api_endpoint(_Config) -> + Topic = <<"t">>, + + Name1 = <<"foo">>, + AlwaysFailCheck = sql_check(<<"select * where false">>), + Validation1 = validation(Name1, [AlwaysFailCheck], #{<<"topics">> => Topic}), + + {201, _} = insert(Validation1), + ?assertIndexOrder([Name1], Topic), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, Topic), + + ok = publish(C, Topic, #{}), + ?assertNotReceive({publish, _}), + + %% already enabled + {204, _} = enable(Name1), + ?assertIndexOrder([Name1], Topic), + ?assertMatch({200, #{<<"enable">> := true}}, lookup(Name1)), + + ok = publish(C, Topic, #{}), + ?assertNotReceive({publish, _}), + + {204, _} = disable(Name1), + ?assertIndexOrder([], Topic), + ?assertMatch({200, #{<<"enable">> := false}}, lookup(Name1)), + + ok = publish(C, Topic, #{}), + ?assertReceive({publish, _}), + + %% already disabled + {204, _} = disable(Name1), + ?assertIndexOrder([], Topic), + ?assertMatch({200, #{<<"enable">> := false}}, lookup(Name1)), + + ok = publish(C, Topic, #{}), + ?assertReceive({publish, _}), + + %% Re-enable + {204, _} = enable(Name1), + ?assertIndexOrder([Name1], Topic), + ?assertMatch({200, #{<<"enable">> := true}}, lookup(Name1)), + + ok = publish(C, Topic, #{}), + ?assertNotReceive({publish, _}), + + ok. + %% Check the `all_pass' strategy t_all_pass(_Config) -> Name1 = <<"foo">>, diff --git a/rel/i18n/emqx_message_validation_http_api.hocon b/rel/i18n/emqx_message_validation_http_api.hocon index 6e549fe31..69a8ed78d 100644 --- a/rel/i18n/emqx_message_validation_http_api.hocon +++ b/rel/i18n/emqx_message_validation_http_api.hocon @@ -18,7 +18,13 @@ emqx_message_validation_http_api { reorder_validations.desc: """Reorder of all validations""" + enable_disable_validation.desc: + """Enable or disable a particular validation""" + param_path_name.desc: """Validation name""" + param_path_enable.desc: + """Enable or disable validation""" + } From 62030e89429b5a95d7e0ff37061db52ee83a120a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Mar 2024 15:26:00 -0300 Subject: [PATCH 6/7] feat(message validation): forbid repeated schema checks Fixes https://emqx.atlassian.net/browse/EMQX-12054 --- .../src/emqx_message_validation_schema.erl | 49 ++++++- ...emqx_message_validation_http_api_SUITE.erl | 15 +++ .../test/emqx_message_validation_tests.erl | 123 ++++++++++++++++++ 3 files changed, 181 insertions(+), 6 deletions(-) diff --git a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl index 618b73481..9a27915d8 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl @@ -91,12 +91,7 @@ fields(validation) -> #{ required => true, desc => ?DESC("checks"), - validator => fun - ([]) -> - {error, "at least one check must be defined"}; - (_) -> - ok - end + validator => fun validate_unique_schema_checks/1 } )} ]; @@ -232,3 +227,45 @@ do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(N {error, <<"duplicated name: ", Name/binary>>}; do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) -> do_validate_unique_names(Rest, Acc#{Name => true}). + +validate_unique_schema_checks([]) -> + {error, "at least one check must be defined"}; +validate_unique_schema_checks(Checks) -> + Seen = sets:new([{version, 2}]), + Duplicated = sets:new([{version, 2}]), + do_validate_unique_schema_checks(Checks, Seen, Duplicated). + +do_validate_unique_schema_checks(_Checks = [], _Seen, Duplicated) -> + case sets:to_list(Duplicated) of + [] -> + ok; + DuplicatedChecks0 -> + DuplicatedChecks = + lists:map( + fun({Type, SerdeName}) -> + [atom_to_binary(Type), ":", SerdeName] + end, + DuplicatedChecks0 + ), + Msg = iolist_to_binary([ + <<"duplicated schema checks: ">>, + lists:join(", ", DuplicatedChecks) + ]), + {error, Msg} + end; +do_validate_unique_schema_checks( + [#{<<"type">> := Type, <<"schema">> := SerdeName} | Rest], + Seen0, + Duplicated0 +) -> + Check = {Type, SerdeName}, + case sets:is_element(Check, Seen0) of + true -> + Duplicated = sets:add_element(Check, Duplicated0), + do_validate_unique_schema_checks(Rest, Seen0, Duplicated); + false -> + Seen = sets:add_element(Check, Seen0), + do_validate_unique_schema_checks(Rest, Seen, Duplicated0) + end; +do_validate_unique_schema_checks([_Check | Rest], Seen, Duplicated) -> + do_validate_unique_schema_checks(Rest, Seen, Duplicated). diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index 8f052a147..9e22900fb 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -661,6 +661,21 @@ t_enable_disable_via_api_endpoint(_Config) -> ok. +t_duplicated_schema_checks(_Config) -> + Name1 = <<"foo">>, + SerdeName = <<"myserde">>, + Check = schema_check(json, SerdeName), + + Validation1 = validation(Name1, [Check, sql_check(), Check]), + ?assertMatch({400, _}, insert(Validation1)), + + Validation2 = validation(Name1, [Check, sql_check()]), + ?assertMatch({201, _}, insert(Validation2)), + + ?assertMatch({400, _}, update(Validation1)), + + ok. + %% Check the `all_pass' strategy t_all_pass(_Config) -> Name1 = <<"foo">>, diff --git a/apps/emqx_message_validation/test/emqx_message_validation_tests.erl b/apps/emqx_message_validation/test/emqx_message_validation_tests.erl index 5c0d7d1d8..c344f6202 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_tests.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_tests.erl @@ -52,6 +52,18 @@ sql_check(SQL) -> <<"sql">> => SQL }. +schema_check(Type, SerdeName) -> + schema_check(Type, SerdeName, _Overrides = #{}). + +schema_check(Type, SerdeName, Overrides) -> + emqx_utils_maps:deep_merge( + #{ + <<"type">> => emqx_utils_conv:bin(Type), + <<"schema">> => SerdeName + }, + Overrides + ). + eval_sql(Message, SQL) -> {ok, Check} = emqx_message_validation:parse_sql_check(SQL), Validation = #{log_failure => #{level => warning}, name => <<"validation">>}, @@ -217,3 +229,114 @@ check_test_() -> {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))}, {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))} ]. + +duplicated_check_test_() -> + [ + {"duplicated sql checks are not checked", + ?_assertMatch( + [#{<<"checks">> := [_, _]}], + parse_and_check([ + validation(<<"foo">>, [sql_check(), sql_check()]) + ]) + )}, + {"different serdes with same name", + ?_assertMatch( + [#{<<"checks">> := [_, _, _]}], + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(avro, <<"a">>), + schema_check( + protobuf, + <<"a">>, + #{<<"message_name">> => <<"a">>} + ) + ]) + ]) + )}, + {"duplicated serdes 1", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: json:a">>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(json, <<"a">>) + ]) + ]) + )}, + {"duplicated serdes 2", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: json:a">>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + sql_check(), + schema_check(json, <<"a">>) + ]) + ]) + )}, + {"duplicated serdes 3", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: json:a">>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(json, <<"a">>), + sql_check() + ]) + ]) + )}, + {"duplicated serdes 4", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: json:a">>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(json, <<"a">>), + schema_check(json, <<"a">>) + ]) + ]) + )}, + {"duplicated serdes 4", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: ", _/binary>>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(json, <<"a">>), + schema_check(avro, <<"b">>), + schema_check(avro, <<"b">>) + ]) + ]) + )} + ]. From 00aa7b5621cd3ad63e1a98867107230404148d08 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Mar 2024 17:27:19 -0300 Subject: [PATCH 7/7] feat: create new `message.validation_failed` hookpoint and rule engine event --- apps/emqx/src/emqx_hookpoints.erl | 4 + .../src/emqx_message_validation.erl | 11 ++- ...emqx_message_validation_http_api_SUITE.erl | 60 +++++++++++++- .../emqx_rule_engine/src/emqx_rule_events.erl | 79 +++++++++++++++++++ 4 files changed, 149 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_hookpoints.erl b/apps/emqx/src/emqx_hookpoints.erl index 70cf4b41a..1f7b6481b 100644 --- a/apps/emqx/src/emqx_hookpoints.erl +++ b/apps/emqx/src/emqx_hookpoints.erl @@ -59,6 +59,7 @@ 'message.publish', 'message.puback', 'message.dropped', + 'message.validation_failed', 'message.delivered', 'message.acked', 'delivery.dropped', @@ -182,6 +183,9 @@ when -callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) -> callback_result(). +-callback 'message.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) -> + callback_result(). + -callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when Msg :: emqx_types:message(). diff --git a/apps/emqx_message_validation/src/emqx_message_validation.erl b/apps/emqx_message_validation/src/emqx_message_validation.erl index 74de48eac..f0fdb6dee 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation.erl @@ -388,12 +388,14 @@ run_validations(Validations, Message) -> validation => Name, action => ignore }), + run_message_validation_failed_hook(Message, Validation), {cont, Acc}; FailureAction -> trace_failure(Validation, "validation_failed", #{ validation => Name, action => FailureAction }), + run_message_validation_failed_hook(Message, Validation), {halt, FailureAction} end end, @@ -436,12 +438,17 @@ trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) -> name := _Name, failure_action := _Action } = Validation, - ?tp(message_validation_failure, #{log_level => none, name => _Name, action => _Action}), + ?tp(message_validation_failed, #{log_level => none, name => _Name, action => _Action}), ok; trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) -> #{ name := _Name, failure_action := _Action } = Validation, - ?tp(message_validation_failure, #{log_level => Level, name => _Name, action => _Action}), + ?tp(message_validation_failed, #{log_level => Level, name => _Name, action => _Action}), ?TRACE(Level, ?TRACE_TAG, Msg, Meta). + +run_message_validation_failed_hook(Message, Validation) -> + #{name := Name} = Validation, + ValidationContext = #{name => Name}, + emqx_hooks:run('message.validation_failed', [Message, ValidationContext]). diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index 9e22900fb..b43c8ce35 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -14,6 +14,8 @@ -import(emqx_common_test_helpers, [on_exit/1]). +-define(RECORDED_EVENTS_TAB, recorded_actions). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -328,6 +330,39 @@ assert_index_order(ExpectedOrder, Topic, Comment) -> Comment ). +create_failure_tracing_rule() -> + Params = #{ + enable => true, + sql => <<"select * from \"$events/message_validation_failed\" ">>, + actions => [make_trace_fn_action()] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + Res = request(post, Path, Params), + ct:pal("create failure tracing rule result:\n ~p", [Res]), + case Res of + {ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} -> + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + simplify_result(Res); + _ -> + simplify_result(Res) + end. + +make_trace_fn_action() -> + persistent_term:put({?MODULE, test_pid}, self()), + Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>, + emqx_utils_ets:new(?RECORDED_EVENTS_TAB, [named_table, public, ordered_set]), + #{function => Fn, args => #{}}. + +trace_rule(Data, Envs, _Args) -> + Now = erlang:monotonic_time(), + ets:insert(?RECORDED_EVENTS_TAB, {Now, #{data => Data, envs => Envs}}), + TestPid = persistent_term:get({?MODULE, test_pid}), + TestPid ! {action, #{data => Data, envs => Envs}}, + ok. + +get_traced_failures_from_rule_engine() -> + ets:tab2list(?RECORDED_EVENTS_TAB). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -579,16 +614,16 @@ t_log_failure_none(_Config) -> ok end, fun(Trace) -> - ?assertMatch([#{log_level := none}], ?of_kind(message_validation_failure, Trace)), + ?assertMatch([#{log_level := none}], ?of_kind(message_validation_failed, Trace)), ok end ), ok. t_action_ignore(_Config) -> + Name1 = <<"foo">>, ?check_trace( begin - Name1 = <<"foo">>, AlwaysFailCheck = sql_check(<<"select * where false">>), Validation1 = validation( Name1, @@ -598,18 +633,25 @@ t_action_ignore(_Config) -> {201, _} = insert(Validation1), + {201, _} = create_failure_tracing_rule(), + C = connect(<<"c1">>), {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), ok = publish(C, <<"t/1">>, #{}), ?assertReceive({publish, _}), + ok end, fun(Trace) -> - ?assertMatch([#{action := ignore}], ?of_kind(message_validation_failure, Trace)), + ?assertMatch([#{action := ignore}], ?of_kind(message_validation_failed, Trace)), ok end ), + ?assertMatch( + [{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}], + get_traced_failures_from_rule_engine() + ), ok. t_enable_disable_via_api_endpoint(_Config) -> @@ -717,6 +759,8 @@ t_any_pass(_Config) -> %% Checks that multiple validations are run in order. t_multiple_validations(_Config) -> + {201, _} = create_failure_tracing_rule(), + Name1 = <<"foo">>, Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>), Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}), @@ -732,14 +776,24 @@ t_multiple_validations(_Config) -> ok = publish(C, <<"t/1">>, #{x => 11, y => 1}), ?assertReceive({publish, _}), + %% Barred by `Name1' ok = publish(C, <<"t/1">>, #{x => 7, y => 0}), ?assertNotReceive({publish, _}), ?assertNotReceive({disconnected, _, _}), + %% Barred by `Name2' unlink(C), ok = publish(C, <<"t/1">>, #{x => 0, y => 1}), ?assertNotReceive({publish, _}), ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}), + ?assertMatch( + [ + {_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}, + {_, #{data := #{validation := Name2, event := 'message.validation_failed'}}} + ], + get_traced_failures_from_rule_engine() + ), + ok. t_schema_check_non_existent_serde(_Config) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index df5b2af3c..90bede778 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -44,6 +44,7 @@ on_session_unsubscribed/4, on_message_publish/2, on_message_dropped/4, + on_message_validation_failed/3, on_message_delivered/3, on_message_acked/3, on_delivery_dropped/4, @@ -78,6 +79,7 @@ event_names() -> 'message.delivered', 'message.acked', 'message.dropped', + 'message.validation_failed', 'delivery.dropped' ]. @@ -93,6 +95,7 @@ event_topics_enum() -> '$events/message_delivered', '$events/message_acked', '$events/message_dropped', + '$events/message_validation_failed', '$events/delivery_dropped' % '$events/message_publish' % not possible to use in SELECT FROM ]. @@ -221,6 +224,19 @@ on_message_dropped(Message, _, Reason, Conf) -> end, {ok, Message}. +on_message_validation_failed(Message, ValidationContext, Conf) -> + case ignore_sys_message(Message) of + true -> + ok; + false -> + apply_event( + 'message.validation_failed', + fun() -> eventmsg_validation_failed(Message, ValidationContext) end, + Conf + ) + end, + {ok, Message}. + on_message_delivered(ClientInfo, Message, Conf) -> case ignore_sys_message(Message) of true -> @@ -477,6 +493,38 @@ eventmsg_dropped( #{headers => Headers} ). +eventmsg_validation_failed( + Message = #message{ + id = Id, + from = ClientId, + qos = QoS, + flags = Flags, + topic = Topic, + headers = Headers, + payload = Payload, + timestamp = Timestamp + }, + ValidationContext +) -> + #{name := ValidationName} = ValidationContext, + with_basic_columns( + 'message.validation_failed', + #{ + id => emqx_guid:to_hexstr(Id), + validation => ValidationName, + clientid => ClientId, + username => emqx_message:get_header(username, Message, undefined), + payload => Payload, + peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)), + topic => Topic, + qos => QoS, + flags => Flags, + pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), + publish_received_at => Timestamp + }, + #{headers => Headers} + ). + eventmsg_delivered( _ClientInfo = #{ peerhost := PeerHost, @@ -627,6 +675,7 @@ event_info() -> event_info_message_deliver(), event_info_message_acked(), event_info_message_dropped(), + event_info_message_validation_failed(), event_info_client_connected(), event_info_client_disconnected(), event_info_client_connack(), @@ -666,6 +715,13 @@ event_info_message_dropped() -> <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> ). +event_info_message_validation_failed() -> + event_info_common( + 'message.validation_failed', + {<<"message validation failed">>, <<"TODO"/utf8>>}, + {<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>}, + <<"SELECT * FROM \"$events/message_validation_failed\" WHERE topic =~ 't/#'">> + ). event_info_delivery_dropped() -> event_info_common( 'delivery.dropped', @@ -737,6 +793,9 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> test_columns('message.dropped') -> [{<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]}] ++ test_columns('message.publish'); +test_columns('message.validation_failed') -> + [{<<"validation">>, <<"myvalidation">>}] ++ + test_columns('message.publish'); test_columns('message.publish') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]}, @@ -840,6 +899,23 @@ columns_with_exam('message.dropped') -> {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; +columns_with_exam('message.validation_failed') -> + [ + {<<"event">>, 'message.validation_failed'}, + {<<"validation">>, <<"my_validation">>}, + {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, + {<<"clientid">>, <<"c_emqx">>}, + {<<"username">>, <<"u_emqx">>}, + {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, + {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"topic">>, <<"t/a">>}, + {<<"qos">>, 1}, + {<<"flags">>, #{}}, + {<<"publish_received_at">>, erlang:system_time(millisecond)}, + columns_example_props(pub_props), + {<<"timestamp">>, erlang:system_time(millisecond)}, + {<<"node">>, node()} + ]; columns_with_exam('delivery.dropped') -> [ {<<"event">>, 'delivery.dropped'}, @@ -1030,6 +1106,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4; hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3; hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3; hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4; +hook_fun('message.validation_failed') -> fun ?MODULE:on_message_validation_failed/3; hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4; hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2; hook_fun(Event) -> error({invalid_event, Event}). @@ -1054,6 +1131,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed'; event_name(<<"$events/message_delivered">>) -> 'message.delivered'; event_name(<<"$events/message_acked">>) -> 'message.acked'; event_name(<<"$events/message_dropped">>) -> 'message.dropped'; +event_name(<<"$events/message_validation_failed">>) -> 'message.validation_failed'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. @@ -1067,6 +1145,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; +event_topic('message.validation_failed') -> <<"$events/message_validation_failed">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('message.publish') -> <<"$events/message_publish">>.