From 0f9c3b4cea5aea11277f665fb12fc58f6b622297 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 7 Jun 2024 15:27:50 -0300 Subject: [PATCH] feat(schema validation): check references schema names and types before changing config Fixes https://emqx.atlassian.net/browse/EMQX-12368 --- .../include/emqx_schema_registry.hrl | 2 +- .../src/emqx_schema_validation.erl | 14 +- .../src/emqx_schema_validation_config.erl | 175 +++++++++++++----- .../emqx_schema_validation_http_api_SUITE.erl | 127 ++++++++++--- changes/ee/feat-13210.en.md | 1 + 5 files changed, 250 insertions(+), 69 deletions(-) create mode 100644 changes/ee/feat-13210.en.md diff --git a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl index 11bbb0b72..b25042c20 100644 --- a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl +++ b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl @@ -26,7 +26,7 @@ -type encoded_data() :: iodata(). -type decoded_data() :: map(). --type serde_type() :: emqx_schema_registry_serde:serde_type(). +-type serde_type() :: avro | protobuf | json. -type serde_opts() :: map(). -record(serde, { diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation.erl b/apps/emqx_schema_validation/src/emqx_schema_validation.erl index 6a5e76573..755773f13 100644 --- a/apps/emqx_schema_validation/src/emqx_schema_validation.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_schema_validation). +-feature(maybe_expr, enable). + -include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -41,7 +43,13 @@ -define(TRACE_TAG, "SCHEMA_VALIDATION"). -type validation_name() :: binary(). --type validation() :: _TODO. +-type raw_validation() :: #{binary() => _}. +-type validation() :: #{ + name := validation_name(), + strategy := all_pass | any_pass, + failure_action := drop | disconnect | ignore, + log_failure := #{level := error | warning | notice | info | debug | none} +}. -export_type([ validation/0, @@ -65,12 +73,12 @@ reorder(Order) -> lookup(Name) -> emqx_schema_validation_config:lookup(Name). --spec insert(validation()) -> +-spec insert(raw_validation()) -> {ok, _} | {error, _}. insert(Validation) -> emqx_schema_validation_config:insert(Validation). --spec update(validation()) -> +-spec update(raw_validation()) -> {ok, _} | {error, _}. update(Validation) -> emqx_schema_validation_config:update(Validation). diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation_config.erl b/apps/emqx_schema_validation/src/emqx_schema_validation_config.erl index 2bfd11c6d..aef662887 100644 --- a/apps/emqx_schema_validation/src/emqx_schema_validation_config.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation_config.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_schema_validation_config). +-feature(maybe_expr, enable). + %% API -export([ add_handler/0, @@ -136,15 +138,25 @@ pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) -> pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) -> {ok, NewConfig}. -post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) -> - {Pos, Validation} = fetch_with_index(New, Name), - ok = emqx_schema_validation_registry:insert(Pos, Validation), - ok; -post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) -> - {_Pos, OldValidation} = fetch_with_index(Old, Name), - {Pos, NewValidation} = fetch_with_index(New, Name), - ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), - ok; +post_config_update( + ?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name} = RawValidation}, New, _Old, _AppEnvs +) -> + maybe + ok ?= assert_referenced_schemas_exist(RawValidation), + {Pos, Validation} = fetch_with_index(New, Name), + ok = emqx_schema_validation_registry:insert(Pos, Validation), + ok + end; +post_config_update( + ?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name} = RawValidation}, New, Old, _AppEnvs +) -> + maybe + ok ?= assert_referenced_schemas_exist(RawValidation), + {_Pos, OldValidation} = fetch_with_index(Old, Name), + {Pos, NewValidation} = fetch_with_index(New, Name), + ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), + ok + end; post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> {Pos, Validation} = fetch_with_index(Old, Name), ok = emqx_schema_validation_registry:delete(Validation, Pos), @@ -161,16 +173,19 @@ post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) -> OldValidations, fun(#{name := N}) -> N end ), - NewValidations = - lists:map( - fun(#{name := Name}) -> - {Pos, Validation} = fetch_with_index(ResultingValidations, Name), - ok = emqx_schema_validation_registry:insert(Pos, Validation), - #{name => Name, pos => Pos} - end, - NewValidations0 - ), - {ok, #{new_validations => NewValidations}}; + maybe + ok ?= multi_assert_referenced_schemas_exist(NewValidations0), + NewValidations = + lists:map( + fun(#{name := Name}) -> + {Pos, Validation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:insert(Pos, Validation), + #{name => Name, pos => Pos} + end, + NewValidations0 + ), + {ok, #{new_validations => NewValidations}} + end; post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) -> #{ new_validations := NewValidations, @@ -179,32 +194,46 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv } = prepare_config_replace(Input, Old), #{validations := ResultingValidations} = ResultingConfig, #{validations := OldValidations} = Old, - lists:foreach( - fun(Name) -> - {Pos, Validation} = fetch_with_index(OldValidations, Name), - ok = emqx_schema_validation_registry:delete(Validation, Pos) - end, - DeletedValidations - ), - lists:foreach( - fun(Name) -> - {Pos, Validation} = fetch_with_index(ResultingValidations, Name), - ok = emqx_schema_validation_registry:insert(Pos, Validation) - end, - NewValidations - ), - ChangedValidations = - lists:map( + NewOrChangedValidationNames = NewValidations ++ ChangedValidations0, + maybe + ok ?= + multi_assert_referenced_schemas_exist( + lists:filter( + fun(#{name := N}) -> + lists:member(N, NewOrChangedValidationNames) + end, + ResultingValidations + ) + ), + lists:foreach( fun(Name) -> - {_Pos, OldValidation} = fetch_with_index(OldValidations, Name), - {Pos, NewValidation} = fetch_with_index(ResultingValidations, Name), - ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), - #{name => Name, pos => Pos} + {Pos, Validation} = fetch_with_index(OldValidations, Name), + ok = emqx_schema_validation_registry:delete(Validation, Pos) end, - ChangedValidations0 + DeletedValidations ), - ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations, OldValidations), - {ok, #{changed_validations => ChangedValidations}}. + lists:foreach( + fun(Name) -> + {Pos, Validation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:insert(Pos, Validation) + end, + NewValidations + ), + ChangedValidations = + lists:map( + fun(Name) -> + {_Pos, OldValidation} = fetch_with_index(OldValidations, Name), + {Pos, NewValidation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), + #{name => Name, pos => Pos} + end, + ChangedValidations0 + ), + ok = emqx_schema_validation_registry:reindex_positions( + ResultingValidations, OldValidations + ), + {ok, #{changed_validations => ChangedValidations}} + end. %%------------------------------------------------------------------------------ %% `emqx_config_backup' API @@ -388,3 +417,65 @@ prepare_config_replace(NewConfig, OldConfig) -> changed_validations => ChangedValidations0 ++ ChangedValidations1, deleted_validations => DeletedValidations }. + +-spec assert_referenced_schemas_exist(raw_validation()) -> ok | {error, map()}. +assert_referenced_schemas_exist(RawValidation) -> + #{<<"checks">> := RawChecks} = RawValidation, + SchemasToCheck = + lists:filtermap( + fun + (#{<<"schema">> := SchemaName} = Check) -> + %% so far, only protobuf has inner types + InnerPath = + case maps:find(<<"message_type">>, Check) of + {ok, MessageType} -> [MessageType]; + error -> [] + end, + {true, {SchemaName, InnerPath}}; + (_Check) -> + false + end, + RawChecks + ), + do_assert_referenced_schemas_exist(SchemasToCheck). + +do_assert_referenced_schemas_exist(SchemasToCheck) -> + MissingSchemas = + lists:foldl( + fun({SchemaName, InnerPath}, Acc) -> + case emqx_schema_registry:is_existing_type(SchemaName, InnerPath) of + true -> + Acc; + false -> + [[SchemaName | InnerPath] | Acc] + end + end, + [], + SchemasToCheck + ), + case MissingSchemas of + [] -> + ok; + [_ | _] -> + {error, #{missing_schemas => MissingSchemas}} + end. + +-spec multi_assert_referenced_schemas_exist([validation()]) -> ok | {error, map()}. +multi_assert_referenced_schemas_exist(Validations) -> + SchemasToCheck = + lists:filtermap( + fun + (#{schema := SchemaName} = Check) -> + %% so far, only protobuf has inner types + InnerPath = + case maps:find(message_type, Check) of + {ok, MessageType} -> [MessageType]; + error -> [] + end, + {true, {SchemaName, InnerPath}}; + (_Check) -> + false + end, + [Check || #{checks := Checks} <- Validations, Check <- Checks] + ), + do_assert_referenced_schemas_exist(SchemasToCheck). diff --git a/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl index 41731fa1b..76a54ebcf 100644 --- a/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl +++ b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl @@ -356,25 +356,36 @@ protobuf_invalid_payloads() -> ]. protobuf_create_serde(SerdeName) -> - Source = - << - "message Person {\n" - " required string name = 1;\n" - " required int32 id = 2;\n" - " optional string email = 3;\n" - " }\n" - "message UnionValue {\n" - " oneof u {\n" - " int32 a = 1;\n" - " string b = 2;\n" - " }\n" - "}" - >>, + protobuf_upsert_serde(SerdeName, <<"Person">>). + +protobuf_upsert_serde(SerdeName, MessageType) -> + Source = protobuf_source(MessageType), Schema = #{type => protobuf, source => Source}, ok = emqx_schema_registry:add_schema(SerdeName, Schema), on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end), ok. +protobuf_source(MessageType) -> + iolist_to_binary( + [ + <<"message ">>, + MessageType, + <<" {\n">>, + << + " required string name = 1;\n" + " required int32 id = 2;\n" + " optional string email = 3;\n" + " }\n" + "message UnionValue {\n" + " oneof u {\n" + " int32 a = 1;\n" + " string b = 2;\n" + " }\n" + "}" + >> + ] + ). + %% Checks that the internal order in the registry/index matches expectation. assert_index_order(ExpectedOrder, Topic, Comment) -> ?assertEqual( @@ -1041,6 +1052,7 @@ t_duplicated_schema_checks(_Config) -> Name1 = <<"foo">>, SerdeName = <<"myserde">>, Check = schema_check(json, SerdeName), + json_create_serde(SerdeName), Validation1 = validation(Name1, [Check, sql_check(), Check]), ?assertMatch({400, _}, insert(Validation1)), @@ -1130,18 +1142,87 @@ t_multiple_validations(_Config) -> ok. +%% Test that we validate schema registry serde existency when using the HTTP API. t_schema_check_non_existent_serde(_Config) -> SerdeName = <<"idontexist">>, Name1 = <<"foo">>, + Check1 = schema_check(json, SerdeName), Validation1 = validation(Name1, [Check1]), - {201, _} = insert(Validation1), + ?assertMatch({400, _}, insert(Validation1)), - C = connect(<<"c1">>), - {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + Check2 = schema_check(avro, SerdeName), + Validation2 = validation(Name1, [Check2]), + ?assertMatch({400, _}, insert(Validation2)), - ok = publish(C, <<"t/1">>, #{i => 10, s => <<"s">>}), - ?assertNotReceive({publish, _}), + MessageType = <<"idontexisteither">>, + Check3 = schema_check(protobuf, SerdeName, #{<<"message_type">> => MessageType}), + Validation3 = validation(Name1, [Check3]), + ?assertMatch({400, _}, insert(Validation3)), + + protobuf_create_serde(SerdeName), + %% Still fails because reference message type doesn't exist. + ?assertMatch({400, _}, insert(Validation3)), + + ok. + +%% Test that we validate schema registry serde existency when loading configs. +t_schema_check_non_existent_serde_load_config(_Config) -> + Name1 = <<"1">>, + SerdeName1 = <<"serde1">>, + MessageType1 = <<"mt">>, + Check1A = schema_check(protobuf, SerdeName1, #{<<"message_type">> => MessageType1}), + Validation1A = validation(Name1, [Check1A]), + protobuf_upsert_serde(SerdeName1, MessageType1), + {201, _} = insert(Validation1A), + Name2 = <<"2">>, + SerdeName2 = <<"serde2">>, + Check2A = schema_check(json, SerdeName2), + Validation2A = validation(Name2, [Check2A]), + json_create_serde(SerdeName2), + {201, _} = insert(Validation2A), + + %% Config to load + %% Will replace existing config + MissingMessageType = <<"missing_mt">>, + Check1B = schema_check(protobuf, SerdeName1, #{<<"message_type">> => MissingMessageType}), + Validation1B = validation(Name1, [Check1B]), + + %% Will replace existing config + MissingSerdeName1 = <<"missing1">>, + Check2B = schema_check(json, MissingSerdeName1), + Validation2B = validation(Name2, [Check2B]), + + %% New validation; should be appended + Name3 = <<"3">>, + MissingSerdeName2 = <<"missing2">>, + Check3 = schema_check(avro, MissingSerdeName2), + Validation3 = validation(Name3, [Check3]), + + ConfRootBin = <<"schema_validation">>, + ConfigToLoad1 = #{ + ConfRootBin => #{ + <<"validations">> => [Validation1B, Validation2B, Validation3] + } + }, + ConfigToLoadBin1 = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})), + %% Merge + ResMerge = emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => merge}), + ?assertMatch({error, _}, ResMerge), + {error, ErrorMessage1} = ResMerge, + ?assertEqual(match, re:run(ErrorMessage1, <<"missing_schemas">>, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage1, MissingSerdeName1, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage1, MissingSerdeName2, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage1, MissingMessageType, [{capture, none}])), + + %% Replace + ResReplace = emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => replace}), + ?assertMatch({error, _}, ResReplace), + {error, ErrorMessage2} = ResReplace, + ?assertEqual(match, re:run(ErrorMessage2, <<"missing_schemas">>, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage2, MissingSerdeName1, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage2, MissingSerdeName2, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage2, MissingMessageType, [{capture, none}])), ok. @@ -1232,16 +1313,16 @@ t_schema_check_protobuf(_Config) -> ), %% Bad config: unknown message name - Check2 = schema_check(protobuf, SerdeName, #{<<"message_type">> => <<"idontexist">>}), - Validation2 = validation(Name1, [Check2]), - {200, _} = update(Validation2), + %% Schema updated to use another message type after validation was created + OtherMessageType = <<"NewPersonType">>, + protobuf_upsert_serde(SerdeName, OtherMessageType), lists:foreach( fun(Payload) -> ok = publish(C, <<"t/1">>, {raw, Payload}), ?assertNotReceive({publish, _}) end, - protobuf_valid_payloads(SerdeName, MessageType) + protobuf_valid_payloads(SerdeName, OtherMessageType) ), ok. diff --git a/changes/ee/feat-13210.en.md b/changes/ee/feat-13210.en.md new file mode 100644 index 000000000..ed059a873 --- /dev/null +++ b/changes/ee/feat-13210.en.md @@ -0,0 +1 @@ +Now, when inserting or updating a Schema Validation, EMQX will check if the referenced schemas and message types exist in Schema Registry.