From 03b226248a1fbc2b495c9bf20020592451c8e9ba Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 7 Jun 2024 14:42:02 -0300 Subject: [PATCH 1/3] feat(schema registry): add check for inner types Currently, only `protobuf` has any. --- .../include/emqx_schema_registry.hrl | 2 +- .../src/emqx_schema_registry.erl | 11 +++++ .../src/emqx_schema_registry_serde.erl | 45 +++++++++++++++++++ .../test/emqx_schema_registry_serde_SUITE.erl | 43 +++++++++++++++--- 4 files changed, 94 insertions(+), 7 deletions(-) diff --git a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl index b25042c20..11bbb0b72 100644 --- a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl +++ b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl @@ -26,7 +26,7 @@ -type encoded_data() :: iodata(). -type decoded_data() :: map(). --type serde_type() :: avro | protobuf | json. +-type serde_type() :: emqx_schema_registry_serde:serde_type(). -type serde_opts() :: map(). -record(serde, { diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.erl b/apps/emqx_schema_registry/src/emqx_schema_registry.erl index 7ba4ebcf8..5005b0dc8 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.erl @@ -16,6 +16,8 @@ start_link/0, add_schema/2, get_schema/1, + is_existing_type/1, + is_existing_type/2, delete_schema/1, list_schemas/0 ]). @@ -52,6 +54,7 @@ %% API %%------------------------------------------------------------------------------------------------- +-spec start_link() -> gen_server:start_ret(). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -64,6 +67,14 @@ get_serde(SchemaName) -> {ok, Serde} end. +-spec is_existing_type(schema_name()) -> boolean(). +is_existing_type(SchemaName) -> + is_existing_type(SchemaName, []). + +-spec is_existing_type(schema_name(), [binary()]) -> boolean(). +is_existing_type(SchemaName, Path) -> + emqx_schema_registry_serde:is_existing_type(SchemaName, Path). + -spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}. get_schema(SchemaName) -> case diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index e8da35449..1a2f90ee7 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_schema_registry_serde). +-feature(maybe_expr, enable). + -behaviour(emqx_rule_funcs). -include("emqx_schema_registry.hrl"). @@ -14,6 +16,8 @@ make_serde/3, handle_rule_function/2, schema_check/3, + is_existing_type/1, + is_existing_type/2, destroy/1 ]). @@ -27,6 +31,10 @@ eval_encode/2 ]). +%%------------------------------------------------------------------------------ +%% Type definitions +%%------------------------------------------------------------------------------ + -define(BOOL(SerdeName, EXPR), try _ = EXPR, @@ -38,10 +46,28 @@ end ). +-type eval_context() :: term(). + +-export_type([serde_type/0]). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ +-spec is_existing_type(schema_name()) -> boolean(). +is_existing_type(SchemaName) -> + is_existing_type(SchemaName, []). + +-spec is_existing_type(schema_name(), [binary()]) -> boolean(). +is_existing_type(SchemaName, Path) -> + maybe + {ok, #serde{type = SerdeType, eval_context = EvalContext}} ?= + emqx_schema_registry:get_serde(SchemaName), + has_inner_type(SerdeType, EvalContext, Path) + else + _ -> false + end. + -spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}. handle_rule_function(sparkplug_decode, [Data]) -> handle_rule_function( @@ -338,3 +364,22 @@ unload_code(SerdeMod) -> _ = code:purge(SerdeMod), _ = code:delete(SerdeMod), ok. + +-spec has_inner_type(serde_type(), eval_context(), [binary()]) -> + boolean(). +has_inner_type(protobuf, _SerdeMod, [_, _ | _]) -> + %% Protobuf only has one level of message types. + false; +has_inner_type(protobuf, SerdeMod, [MessageTypeBin]) -> + try apply(SerdeMod, get_msg_names, []) of + Names -> + lists:member(MessageTypeBin, [atom_to_binary(N, utf8) || N <- Names]) + catch + _:_ -> + false + end; +has_inner_type(_SerdeType, _EvalContext, []) -> + %% This function is only called if we already found a serde, so the root does exist. + true; +has_inner_type(_SerdeType, _EvalContext, _Path) -> + false. diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl index 0fad015f0..bdc083736 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl @@ -14,7 +14,6 @@ -import(emqx_common_test_helpers, [on_exit/1]). --define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]). -define(INVALID_JSON, #{ reason := #{expected := "emqx_schema:json_binary()"}, kind := validation_error @@ -28,12 +27,20 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema), - emqx_mgmt_api_test_util:init_suite(?APPS), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_schema_registry, + emqx_rule_engine + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), ok. init_per_testcase(_TestCase, Config) -> Config. @@ -240,3 +247,27 @@ t_json_validation(_Config) -> ?assertNot(F(schema_check, <<"{\"bar\": 2}">>)), ?assertNot(F(schema_check, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)), ok. + +t_is_existing_type(_Config) -> + JsonName = <<"myjson">>, + ?assertNot(emqx_schema_registry:is_existing_type(JsonName)), + ok = emqx_schema_registry:add_schema(JsonName, schema_params(json)), + AvroName = <<"myavro">>, + ?assertNot(emqx_schema_registry:is_existing_type(AvroName)), + ok = emqx_schema_registry:add_schema(AvroName, schema_params(avro)), + ProtobufName = <<"myprotobuf">>, + MessageType = <<"Person">>, + ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName)), + ok = emqx_schema_registry:add_schema(ProtobufName, schema_params(protobuf)), + %% JSON Schema: no inner names + ?assert(emqx_schema_registry:is_existing_type(JsonName)), + ?assertNot(emqx_schema_registry:is_existing_type(JsonName, [JsonName])), + %% Avro: no inner names + ?assert(emqx_schema_registry:is_existing_type(AvroName)), + ?assertNot(emqx_schema_registry:is_existing_type(AvroName, [AvroName])), + %% Protobuf: one level of message types + ?assert(emqx_schema_registry:is_existing_type(ProtobufName)), + ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName, [ProtobufName])), + ?assert(emqx_schema_registry:is_existing_type(ProtobufName, [MessageType])), + ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName, [MessageType, MessageType])), + ok. From 0f9c3b4cea5aea11277f665fb12fc58f6b622297 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 7 Jun 2024 15:27:50 -0300 Subject: [PATCH 2/3] feat(schema validation): check references schema names and types before changing config Fixes https://emqx.atlassian.net/browse/EMQX-12368 --- .../include/emqx_schema_registry.hrl | 2 +- .../src/emqx_schema_validation.erl | 14 +- .../src/emqx_schema_validation_config.erl | 175 +++++++++++++----- .../emqx_schema_validation_http_api_SUITE.erl | 127 ++++++++++--- changes/ee/feat-13210.en.md | 1 + 5 files changed, 250 insertions(+), 69 deletions(-) create mode 100644 changes/ee/feat-13210.en.md diff --git a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl index 11bbb0b72..b25042c20 100644 --- a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl +++ b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl @@ -26,7 +26,7 @@ -type encoded_data() :: iodata(). -type decoded_data() :: map(). --type serde_type() :: emqx_schema_registry_serde:serde_type(). +-type serde_type() :: avro | protobuf | json. -type serde_opts() :: map(). -record(serde, { diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation.erl b/apps/emqx_schema_validation/src/emqx_schema_validation.erl index 6a5e76573..755773f13 100644 --- a/apps/emqx_schema_validation/src/emqx_schema_validation.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_schema_validation). +-feature(maybe_expr, enable). + -include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -41,7 +43,13 @@ -define(TRACE_TAG, "SCHEMA_VALIDATION"). -type validation_name() :: binary(). --type validation() :: _TODO. +-type raw_validation() :: #{binary() => _}. +-type validation() :: #{ + name := validation_name(), + strategy := all_pass | any_pass, + failure_action := drop | disconnect | ignore, + log_failure := #{level := error | warning | notice | info | debug | none} +}. -export_type([ validation/0, @@ -65,12 +73,12 @@ reorder(Order) -> lookup(Name) -> emqx_schema_validation_config:lookup(Name). --spec insert(validation()) -> +-spec insert(raw_validation()) -> {ok, _} | {error, _}. insert(Validation) -> emqx_schema_validation_config:insert(Validation). --spec update(validation()) -> +-spec update(raw_validation()) -> {ok, _} | {error, _}. update(Validation) -> emqx_schema_validation_config:update(Validation). diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation_config.erl b/apps/emqx_schema_validation/src/emqx_schema_validation_config.erl index 2bfd11c6d..aef662887 100644 --- a/apps/emqx_schema_validation/src/emqx_schema_validation_config.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation_config.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_schema_validation_config). +-feature(maybe_expr, enable). + %% API -export([ add_handler/0, @@ -136,15 +138,25 @@ pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) -> pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) -> {ok, NewConfig}. -post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) -> - {Pos, Validation} = fetch_with_index(New, Name), - ok = emqx_schema_validation_registry:insert(Pos, Validation), - ok; -post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) -> - {_Pos, OldValidation} = fetch_with_index(Old, Name), - {Pos, NewValidation} = fetch_with_index(New, Name), - ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), - ok; +post_config_update( + ?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name} = RawValidation}, New, _Old, _AppEnvs +) -> + maybe + ok ?= assert_referenced_schemas_exist(RawValidation), + {Pos, Validation} = fetch_with_index(New, Name), + ok = emqx_schema_validation_registry:insert(Pos, Validation), + ok + end; +post_config_update( + ?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name} = RawValidation}, New, Old, _AppEnvs +) -> + maybe + ok ?= assert_referenced_schemas_exist(RawValidation), + {_Pos, OldValidation} = fetch_with_index(Old, Name), + {Pos, NewValidation} = fetch_with_index(New, Name), + ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), + ok + end; post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> {Pos, Validation} = fetch_with_index(Old, Name), ok = emqx_schema_validation_registry:delete(Validation, Pos), @@ -161,16 +173,19 @@ post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) -> OldValidations, fun(#{name := N}) -> N end ), - NewValidations = - lists:map( - fun(#{name := Name}) -> - {Pos, Validation} = fetch_with_index(ResultingValidations, Name), - ok = emqx_schema_validation_registry:insert(Pos, Validation), - #{name => Name, pos => Pos} - end, - NewValidations0 - ), - {ok, #{new_validations => NewValidations}}; + maybe + ok ?= multi_assert_referenced_schemas_exist(NewValidations0), + NewValidations = + lists:map( + fun(#{name := Name}) -> + {Pos, Validation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:insert(Pos, Validation), + #{name => Name, pos => Pos} + end, + NewValidations0 + ), + {ok, #{new_validations => NewValidations}} + end; post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) -> #{ new_validations := NewValidations, @@ -179,32 +194,46 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv } = prepare_config_replace(Input, Old), #{validations := ResultingValidations} = ResultingConfig, #{validations := OldValidations} = Old, - lists:foreach( - fun(Name) -> - {Pos, Validation} = fetch_with_index(OldValidations, Name), - ok = emqx_schema_validation_registry:delete(Validation, Pos) - end, - DeletedValidations - ), - lists:foreach( - fun(Name) -> - {Pos, Validation} = fetch_with_index(ResultingValidations, Name), - ok = emqx_schema_validation_registry:insert(Pos, Validation) - end, - NewValidations - ), - ChangedValidations = - lists:map( + NewOrChangedValidationNames = NewValidations ++ ChangedValidations0, + maybe + ok ?= + multi_assert_referenced_schemas_exist( + lists:filter( + fun(#{name := N}) -> + lists:member(N, NewOrChangedValidationNames) + end, + ResultingValidations + ) + ), + lists:foreach( fun(Name) -> - {_Pos, OldValidation} = fetch_with_index(OldValidations, Name), - {Pos, NewValidation} = fetch_with_index(ResultingValidations, Name), - ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), - #{name => Name, pos => Pos} + {Pos, Validation} = fetch_with_index(OldValidations, Name), + ok = emqx_schema_validation_registry:delete(Validation, Pos) end, - ChangedValidations0 + DeletedValidations ), - ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations, OldValidations), - {ok, #{changed_validations => ChangedValidations}}. + lists:foreach( + fun(Name) -> + {Pos, Validation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:insert(Pos, Validation) + end, + NewValidations + ), + ChangedValidations = + lists:map( + fun(Name) -> + {_Pos, OldValidation} = fetch_with_index(OldValidations, Name), + {Pos, NewValidation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), + #{name => Name, pos => Pos} + end, + ChangedValidations0 + ), + ok = emqx_schema_validation_registry:reindex_positions( + ResultingValidations, OldValidations + ), + {ok, #{changed_validations => ChangedValidations}} + end. %%------------------------------------------------------------------------------ %% `emqx_config_backup' API @@ -388,3 +417,65 @@ prepare_config_replace(NewConfig, OldConfig) -> changed_validations => ChangedValidations0 ++ ChangedValidations1, deleted_validations => DeletedValidations }. + +-spec assert_referenced_schemas_exist(raw_validation()) -> ok | {error, map()}. +assert_referenced_schemas_exist(RawValidation) -> + #{<<"checks">> := RawChecks} = RawValidation, + SchemasToCheck = + lists:filtermap( + fun + (#{<<"schema">> := SchemaName} = Check) -> + %% so far, only protobuf has inner types + InnerPath = + case maps:find(<<"message_type">>, Check) of + {ok, MessageType} -> [MessageType]; + error -> [] + end, + {true, {SchemaName, InnerPath}}; + (_Check) -> + false + end, + RawChecks + ), + do_assert_referenced_schemas_exist(SchemasToCheck). + +do_assert_referenced_schemas_exist(SchemasToCheck) -> + MissingSchemas = + lists:foldl( + fun({SchemaName, InnerPath}, Acc) -> + case emqx_schema_registry:is_existing_type(SchemaName, InnerPath) of + true -> + Acc; + false -> + [[SchemaName | InnerPath] | Acc] + end + end, + [], + SchemasToCheck + ), + case MissingSchemas of + [] -> + ok; + [_ | _] -> + {error, #{missing_schemas => MissingSchemas}} + end. + +-spec multi_assert_referenced_schemas_exist([validation()]) -> ok | {error, map()}. +multi_assert_referenced_schemas_exist(Validations) -> + SchemasToCheck = + lists:filtermap( + fun + (#{schema := SchemaName} = Check) -> + %% so far, only protobuf has inner types + InnerPath = + case maps:find(message_type, Check) of + {ok, MessageType} -> [MessageType]; + error -> [] + end, + {true, {SchemaName, InnerPath}}; + (_Check) -> + false + end, + [Check || #{checks := Checks} <- Validations, Check <- Checks] + ), + do_assert_referenced_schemas_exist(SchemasToCheck). diff --git a/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl index 41731fa1b..76a54ebcf 100644 --- a/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl +++ b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl @@ -356,25 +356,36 @@ protobuf_invalid_payloads() -> ]. protobuf_create_serde(SerdeName) -> - Source = - << - "message Person {\n" - " required string name = 1;\n" - " required int32 id = 2;\n" - " optional string email = 3;\n" - " }\n" - "message UnionValue {\n" - " oneof u {\n" - " int32 a = 1;\n" - " string b = 2;\n" - " }\n" - "}" - >>, + protobuf_upsert_serde(SerdeName, <<"Person">>). + +protobuf_upsert_serde(SerdeName, MessageType) -> + Source = protobuf_source(MessageType), Schema = #{type => protobuf, source => Source}, ok = emqx_schema_registry:add_schema(SerdeName, Schema), on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end), ok. +protobuf_source(MessageType) -> + iolist_to_binary( + [ + <<"message ">>, + MessageType, + <<" {\n">>, + << + " required string name = 1;\n" + " required int32 id = 2;\n" + " optional string email = 3;\n" + " }\n" + "message UnionValue {\n" + " oneof u {\n" + " int32 a = 1;\n" + " string b = 2;\n" + " }\n" + "}" + >> + ] + ). + %% Checks that the internal order in the registry/index matches expectation. assert_index_order(ExpectedOrder, Topic, Comment) -> ?assertEqual( @@ -1041,6 +1052,7 @@ t_duplicated_schema_checks(_Config) -> Name1 = <<"foo">>, SerdeName = <<"myserde">>, Check = schema_check(json, SerdeName), + json_create_serde(SerdeName), Validation1 = validation(Name1, [Check, sql_check(), Check]), ?assertMatch({400, _}, insert(Validation1)), @@ -1130,18 +1142,87 @@ t_multiple_validations(_Config) -> ok. +%% Test that we validate schema registry serde existency when using the HTTP API. t_schema_check_non_existent_serde(_Config) -> SerdeName = <<"idontexist">>, Name1 = <<"foo">>, + Check1 = schema_check(json, SerdeName), Validation1 = validation(Name1, [Check1]), - {201, _} = insert(Validation1), + ?assertMatch({400, _}, insert(Validation1)), - C = connect(<<"c1">>), - {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + Check2 = schema_check(avro, SerdeName), + Validation2 = validation(Name1, [Check2]), + ?assertMatch({400, _}, insert(Validation2)), - ok = publish(C, <<"t/1">>, #{i => 10, s => <<"s">>}), - ?assertNotReceive({publish, _}), + MessageType = <<"idontexisteither">>, + Check3 = schema_check(protobuf, SerdeName, #{<<"message_type">> => MessageType}), + Validation3 = validation(Name1, [Check3]), + ?assertMatch({400, _}, insert(Validation3)), + + protobuf_create_serde(SerdeName), + %% Still fails because reference message type doesn't exist. + ?assertMatch({400, _}, insert(Validation3)), + + ok. + +%% Test that we validate schema registry serde existency when loading configs. +t_schema_check_non_existent_serde_load_config(_Config) -> + Name1 = <<"1">>, + SerdeName1 = <<"serde1">>, + MessageType1 = <<"mt">>, + Check1A = schema_check(protobuf, SerdeName1, #{<<"message_type">> => MessageType1}), + Validation1A = validation(Name1, [Check1A]), + protobuf_upsert_serde(SerdeName1, MessageType1), + {201, _} = insert(Validation1A), + Name2 = <<"2">>, + SerdeName2 = <<"serde2">>, + Check2A = schema_check(json, SerdeName2), + Validation2A = validation(Name2, [Check2A]), + json_create_serde(SerdeName2), + {201, _} = insert(Validation2A), + + %% Config to load + %% Will replace existing config + MissingMessageType = <<"missing_mt">>, + Check1B = schema_check(protobuf, SerdeName1, #{<<"message_type">> => MissingMessageType}), + Validation1B = validation(Name1, [Check1B]), + + %% Will replace existing config + MissingSerdeName1 = <<"missing1">>, + Check2B = schema_check(json, MissingSerdeName1), + Validation2B = validation(Name2, [Check2B]), + + %% New validation; should be appended + Name3 = <<"3">>, + MissingSerdeName2 = <<"missing2">>, + Check3 = schema_check(avro, MissingSerdeName2), + Validation3 = validation(Name3, [Check3]), + + ConfRootBin = <<"schema_validation">>, + ConfigToLoad1 = #{ + ConfRootBin => #{ + <<"validations">> => [Validation1B, Validation2B, Validation3] + } + }, + ConfigToLoadBin1 = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})), + %% Merge + ResMerge = emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => merge}), + ?assertMatch({error, _}, ResMerge), + {error, ErrorMessage1} = ResMerge, + ?assertEqual(match, re:run(ErrorMessage1, <<"missing_schemas">>, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage1, MissingSerdeName1, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage1, MissingSerdeName2, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage1, MissingMessageType, [{capture, none}])), + + %% Replace + ResReplace = emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => replace}), + ?assertMatch({error, _}, ResReplace), + {error, ErrorMessage2} = ResReplace, + ?assertEqual(match, re:run(ErrorMessage2, <<"missing_schemas">>, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage2, MissingSerdeName1, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage2, MissingSerdeName2, [{capture, none}])), + ?assertEqual(match, re:run(ErrorMessage2, MissingMessageType, [{capture, none}])), ok. @@ -1232,16 +1313,16 @@ t_schema_check_protobuf(_Config) -> ), %% Bad config: unknown message name - Check2 = schema_check(protobuf, SerdeName, #{<<"message_type">> => <<"idontexist">>}), - Validation2 = validation(Name1, [Check2]), - {200, _} = update(Validation2), + %% Schema updated to use another message type after validation was created + OtherMessageType = <<"NewPersonType">>, + protobuf_upsert_serde(SerdeName, OtherMessageType), lists:foreach( fun(Payload) -> ok = publish(C, <<"t/1">>, {raw, Payload}), ?assertNotReceive({publish, _}) end, - protobuf_valid_payloads(SerdeName, MessageType) + protobuf_valid_payloads(SerdeName, OtherMessageType) ), ok. diff --git a/changes/ee/feat-13210.en.md b/changes/ee/feat-13210.en.md new file mode 100644 index 000000000..ed059a873 --- /dev/null +++ b/changes/ee/feat-13210.en.md @@ -0,0 +1 @@ +Now, when inserting or updating a Schema Validation, EMQX will check if the referenced schemas and message types exist in Schema Registry. From 337009c3a00be8f1bfeda0fa7deb1d8ed37a855b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 10 Jun 2024 13:45:29 -0300 Subject: [PATCH 3/3] fix: declare `emqx_schema_registry` as a dependency of `emqx_schema_validation` --- apps/emqx_schema_registry/src/emqx_schema_registry.app.src | 3 ++- apps/emqx_schema_validation/src/emqx_schema_validation.app.src | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src index efd9f1162..7577f8aeb 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src @@ -11,7 +11,8 @@ stdlib, erlavro, gpb, - jesse + jesse, + emqx ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation.app.src b/apps/emqx_schema_validation/src/emqx_schema_validation.app.src index 773e0fff0..2dfe710db 100644 --- a/apps/emqx_schema_validation/src/emqx_schema_validation.app.src +++ b/apps/emqx_schema_validation/src/emqx_schema_validation.app.src @@ -5,7 +5,8 @@ {mod, {emqx_schema_validation_app, []}}, {applications, [ kernel, - stdlib + stdlib, + emqx_schema_registry ]}, {env, []}, {modules, []},