Merge pull request #13210 from thalesmg/sv-check-referenced-types-r57-20240607

feat(schema validation): check references schema names and types before changing config
This commit is contained in:
Thales Macedo Garitezi 2024-06-11 15:34:14 -03:00 committed by GitHub
commit 7cb57606bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 346 additions and 76 deletions

View File

@ -11,7 +11,8 @@
stdlib, stdlib,
erlavro, erlavro,
gpb, gpb,
jesse jesse,
emqx
]}, ]},
{env, []}, {env, []},
{modules, []}, {modules, []},

View File

@ -16,6 +16,8 @@
start_link/0, start_link/0,
add_schema/2, add_schema/2,
get_schema/1, get_schema/1,
is_existing_type/1,
is_existing_type/2,
delete_schema/1, delete_schema/1,
list_schemas/0 list_schemas/0
]). ]).
@ -52,6 +54,7 @@
%% API %% API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
-spec start_link() -> gen_server:start_ret().
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -64,6 +67,14 @@ get_serde(SchemaName) ->
{ok, Serde} {ok, Serde}
end. end.
-spec is_existing_type(schema_name()) -> boolean().
is_existing_type(SchemaName) ->
is_existing_type(SchemaName, []).
-spec is_existing_type(schema_name(), [binary()]) -> boolean().
is_existing_type(SchemaName, Path) ->
emqx_schema_registry_serde:is_existing_type(SchemaName, Path).
-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}. -spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
get_schema(SchemaName) -> get_schema(SchemaName) ->
case case

View File

@ -3,6 +3,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_schema_registry_serde). -module(emqx_schema_registry_serde).
-feature(maybe_expr, enable).
-behaviour(emqx_rule_funcs). -behaviour(emqx_rule_funcs).
-include("emqx_schema_registry.hrl"). -include("emqx_schema_registry.hrl").
@ -14,6 +16,8 @@
make_serde/3, make_serde/3,
handle_rule_function/2, handle_rule_function/2,
schema_check/3, schema_check/3,
is_existing_type/1,
is_existing_type/2,
destroy/1 destroy/1
]). ]).
@ -27,6 +31,10 @@
eval_encode/2 eval_encode/2
]). ]).
%%------------------------------------------------------------------------------
%% Type definitions
%%------------------------------------------------------------------------------
-define(BOOL(SerdeName, EXPR), -define(BOOL(SerdeName, EXPR),
try try
_ = EXPR, _ = EXPR,
@ -38,10 +46,28 @@
end end
). ).
-type eval_context() :: term().
-export_type([serde_type/0]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec is_existing_type(schema_name()) -> boolean().
is_existing_type(SchemaName) ->
is_existing_type(SchemaName, []).
-spec is_existing_type(schema_name(), [binary()]) -> boolean().
is_existing_type(SchemaName, Path) ->
maybe
{ok, #serde{type = SerdeType, eval_context = EvalContext}} ?=
emqx_schema_registry:get_serde(SchemaName),
has_inner_type(SerdeType, EvalContext, Path)
else
_ -> false
end.
-spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}. -spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
handle_rule_function(sparkplug_decode, [Data]) -> handle_rule_function(sparkplug_decode, [Data]) ->
handle_rule_function( handle_rule_function(
@ -338,3 +364,22 @@ unload_code(SerdeMod) ->
_ = code:purge(SerdeMod), _ = code:purge(SerdeMod),
_ = code:delete(SerdeMod), _ = code:delete(SerdeMod),
ok. ok.
-spec has_inner_type(serde_type(), eval_context(), [binary()]) ->
boolean().
has_inner_type(protobuf, _SerdeMod, [_, _ | _]) ->
%% Protobuf only has one level of message types.
false;
has_inner_type(protobuf, SerdeMod, [MessageTypeBin]) ->
try apply(SerdeMod, get_msg_names, []) of
Names ->
lists:member(MessageTypeBin, [atom_to_binary(N, utf8) || N <- Names])
catch
_:_ ->
false
end;
has_inner_type(_SerdeType, _EvalContext, []) ->
%% This function is only called if we already found a serde, so the root does exist.
true;
has_inner_type(_SerdeType, _EvalContext, _Path) ->
false.

View File

@ -14,7 +14,6 @@
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
-define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
-define(INVALID_JSON, #{ -define(INVALID_JSON, #{
reason := #{expected := "emqx_schema:json_binary()"}, reason := #{expected := "emqx_schema:json_binary()"},
kind := validation_error kind := validation_error
@ -28,12 +27,20 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema), Apps = emqx_cth_suite:start(
emqx_mgmt_api_test_util:init_suite(?APPS), [
Config. emqx,
emqx_conf,
emqx_schema_registry,
emqx_rule_engine
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok. ok.
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Config.
@ -240,3 +247,27 @@ t_json_validation(_Config) ->
?assertNot(F(schema_check, <<"{\"bar\": 2}">>)), ?assertNot(F(schema_check, <<"{\"bar\": 2}">>)),
?assertNot(F(schema_check, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)), ?assertNot(F(schema_check, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)),
ok. ok.
t_is_existing_type(_Config) ->
JsonName = <<"myjson">>,
?assertNot(emqx_schema_registry:is_existing_type(JsonName)),
ok = emqx_schema_registry:add_schema(JsonName, schema_params(json)),
AvroName = <<"myavro">>,
?assertNot(emqx_schema_registry:is_existing_type(AvroName)),
ok = emqx_schema_registry:add_schema(AvroName, schema_params(avro)),
ProtobufName = <<"myprotobuf">>,
MessageType = <<"Person">>,
?assertNot(emqx_schema_registry:is_existing_type(ProtobufName)),
ok = emqx_schema_registry:add_schema(ProtobufName, schema_params(protobuf)),
%% JSON Schema: no inner names
?assert(emqx_schema_registry:is_existing_type(JsonName)),
?assertNot(emqx_schema_registry:is_existing_type(JsonName, [JsonName])),
%% Avro: no inner names
?assert(emqx_schema_registry:is_existing_type(AvroName)),
?assertNot(emqx_schema_registry:is_existing_type(AvroName, [AvroName])),
%% Protobuf: one level of message types
?assert(emqx_schema_registry:is_existing_type(ProtobufName)),
?assertNot(emqx_schema_registry:is_existing_type(ProtobufName, [ProtobufName])),
?assert(emqx_schema_registry:is_existing_type(ProtobufName, [MessageType])),
?assertNot(emqx_schema_registry:is_existing_type(ProtobufName, [MessageType, MessageType])),
ok.

View File

@ -5,7 +5,8 @@
{mod, {emqx_schema_validation_app, []}}, {mod, {emqx_schema_validation_app, []}},
{applications, [ {applications, [
kernel, kernel,
stdlib stdlib,
emqx_schema_registry
]}, ]},
{env, []}, {env, []},
{modules, []}, {modules, []},

View File

@ -3,6 +3,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_schema_validation). -module(emqx_schema_validation).
-feature(maybe_expr, enable).
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
@ -41,7 +43,13 @@
-define(TRACE_TAG, "SCHEMA_VALIDATION"). -define(TRACE_TAG, "SCHEMA_VALIDATION").
-type validation_name() :: binary(). -type validation_name() :: binary().
-type validation() :: _TODO. -type raw_validation() :: #{binary() => _}.
-type validation() :: #{
name := validation_name(),
strategy := all_pass | any_pass,
failure_action := drop | disconnect | ignore,
log_failure := #{level := error | warning | notice | info | debug | none}
}.
-export_type([ -export_type([
validation/0, validation/0,
@ -65,12 +73,12 @@ reorder(Order) ->
lookup(Name) -> lookup(Name) ->
emqx_schema_validation_config:lookup(Name). emqx_schema_validation_config:lookup(Name).
-spec insert(validation()) -> -spec insert(raw_validation()) ->
{ok, _} | {error, _}. {ok, _} | {error, _}.
insert(Validation) -> insert(Validation) ->
emqx_schema_validation_config:insert(Validation). emqx_schema_validation_config:insert(Validation).
-spec update(validation()) -> -spec update(raw_validation()) ->
{ok, _} | {error, _}. {ok, _} | {error, _}.
update(Validation) -> update(Validation) ->
emqx_schema_validation_config:update(Validation). emqx_schema_validation_config:update(Validation).

View File

@ -3,6 +3,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_schema_validation_config). -module(emqx_schema_validation_config).
-feature(maybe_expr, enable).
%% API %% API
-export([ -export([
add_handler/0, add_handler/0,
@ -136,15 +138,25 @@ pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) ->
pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) -> pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) ->
{ok, NewConfig}. {ok, NewConfig}.
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) -> post_config_update(
{Pos, Validation} = fetch_with_index(New, Name), ?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name} = RawValidation}, New, _Old, _AppEnvs
ok = emqx_schema_validation_registry:insert(Pos, Validation), ) ->
ok; maybe
post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) -> ok ?= assert_referenced_schemas_exist(RawValidation),
{_Pos, OldValidation} = fetch_with_index(Old, Name), {Pos, Validation} = fetch_with_index(New, Name),
{Pos, NewValidation} = fetch_with_index(New, Name), ok = emqx_schema_validation_registry:insert(Pos, Validation),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), ok
ok; end;
post_config_update(
?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name} = RawValidation}, New, Old, _AppEnvs
) ->
maybe
ok ?= assert_referenced_schemas_exist(RawValidation),
{_Pos, OldValidation} = fetch_with_index(Old, Name),
{Pos, NewValidation} = fetch_with_index(New, Name),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
ok
end;
post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
{Pos, Validation} = fetch_with_index(Old, Name), {Pos, Validation} = fetch_with_index(Old, Name),
ok = emqx_schema_validation_registry:delete(Validation, Pos), ok = emqx_schema_validation_registry:delete(Validation, Pos),
@ -161,16 +173,19 @@ post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
OldValidations, OldValidations,
fun(#{name := N}) -> N end fun(#{name := N}) -> N end
), ),
NewValidations = maybe
lists:map( ok ?= multi_assert_referenced_schemas_exist(NewValidations0),
fun(#{name := Name}) -> NewValidations =
{Pos, Validation} = fetch_with_index(ResultingValidations, Name), lists:map(
ok = emqx_schema_validation_registry:insert(Pos, Validation), fun(#{name := Name}) ->
#{name => Name, pos => Pos} {Pos, Validation} = fetch_with_index(ResultingValidations, Name),
end, ok = emqx_schema_validation_registry:insert(Pos, Validation),
NewValidations0 #{name => Name, pos => Pos}
), end,
{ok, #{new_validations => NewValidations}}; NewValidations0
),
{ok, #{new_validations => NewValidations}}
end;
post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) -> post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) ->
#{ #{
new_validations := NewValidations, new_validations := NewValidations,
@ -179,32 +194,46 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv
} = prepare_config_replace(Input, Old), } = prepare_config_replace(Input, Old),
#{validations := ResultingValidations} = ResultingConfig, #{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old, #{validations := OldValidations} = Old,
lists:foreach( NewOrChangedValidationNames = NewValidations ++ ChangedValidations0,
fun(Name) -> maybe
{Pos, Validation} = fetch_with_index(OldValidations, Name), ok ?=
ok = emqx_schema_validation_registry:delete(Validation, Pos) multi_assert_referenced_schemas_exist(
end, lists:filter(
DeletedValidations fun(#{name := N}) ->
), lists:member(N, NewOrChangedValidationNames)
lists:foreach( end,
fun(Name) -> ResultingValidations
{Pos, Validation} = fetch_with_index(ResultingValidations, Name), )
ok = emqx_schema_validation_registry:insert(Pos, Validation) ),
end, lists:foreach(
NewValidations
),
ChangedValidations =
lists:map(
fun(Name) -> fun(Name) ->
{_Pos, OldValidation} = fetch_with_index(OldValidations, Name), {Pos, Validation} = fetch_with_index(OldValidations, Name),
{Pos, NewValidation} = fetch_with_index(ResultingValidations, Name), ok = emqx_schema_validation_registry:delete(Validation, Pos)
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
#{name => Name, pos => Pos}
end, end,
ChangedValidations0 DeletedValidations
), ),
ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations, OldValidations), lists:foreach(
{ok, #{changed_validations => ChangedValidations}}. fun(Name) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation)
end,
NewValidations
),
ChangedValidations =
lists:map(
fun(Name) ->
{_Pos, OldValidation} = fetch_with_index(OldValidations, Name),
{Pos, NewValidation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
#{name => Name, pos => Pos}
end,
ChangedValidations0
),
ok = emqx_schema_validation_registry:reindex_positions(
ResultingValidations, OldValidations
),
{ok, #{changed_validations => ChangedValidations}}
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% `emqx_config_backup' API %% `emqx_config_backup' API
@ -388,3 +417,65 @@ prepare_config_replace(NewConfig, OldConfig) ->
changed_validations => ChangedValidations0 ++ ChangedValidations1, changed_validations => ChangedValidations0 ++ ChangedValidations1,
deleted_validations => DeletedValidations deleted_validations => DeletedValidations
}. }.
-spec assert_referenced_schemas_exist(raw_validation()) -> ok | {error, map()}.
assert_referenced_schemas_exist(RawValidation) ->
#{<<"checks">> := RawChecks} = RawValidation,
SchemasToCheck =
lists:filtermap(
fun
(#{<<"schema">> := SchemaName} = Check) ->
%% so far, only protobuf has inner types
InnerPath =
case maps:find(<<"message_type">>, Check) of
{ok, MessageType} -> [MessageType];
error -> []
end,
{true, {SchemaName, InnerPath}};
(_Check) ->
false
end,
RawChecks
),
do_assert_referenced_schemas_exist(SchemasToCheck).
do_assert_referenced_schemas_exist(SchemasToCheck) ->
MissingSchemas =
lists:foldl(
fun({SchemaName, InnerPath}, Acc) ->
case emqx_schema_registry:is_existing_type(SchemaName, InnerPath) of
true ->
Acc;
false ->
[[SchemaName | InnerPath] | Acc]
end
end,
[],
SchemasToCheck
),
case MissingSchemas of
[] ->
ok;
[_ | _] ->
{error, #{missing_schemas => MissingSchemas}}
end.
-spec multi_assert_referenced_schemas_exist([validation()]) -> ok | {error, map()}.
multi_assert_referenced_schemas_exist(Validations) ->
SchemasToCheck =
lists:filtermap(
fun
(#{schema := SchemaName} = Check) ->
%% so far, only protobuf has inner types
InnerPath =
case maps:find(message_type, Check) of
{ok, MessageType} -> [MessageType];
error -> []
end,
{true, {SchemaName, InnerPath}};
(_Check) ->
false
end,
[Check || #{checks := Checks} <- Validations, Check <- Checks]
),
do_assert_referenced_schemas_exist(SchemasToCheck).

View File

@ -356,25 +356,36 @@ protobuf_invalid_payloads() ->
]. ].
protobuf_create_serde(SerdeName) -> protobuf_create_serde(SerdeName) ->
Source = protobuf_upsert_serde(SerdeName, <<"Person">>).
<<
"message Person {\n" protobuf_upsert_serde(SerdeName, MessageType) ->
" required string name = 1;\n" Source = protobuf_source(MessageType),
" required int32 id = 2;\n"
" optional string email = 3;\n"
" }\n"
"message UnionValue {\n"
" oneof u {\n"
" int32 a = 1;\n"
" string b = 2;\n"
" }\n"
"}"
>>,
Schema = #{type => protobuf, source => Source}, Schema = #{type => protobuf, source => Source},
ok = emqx_schema_registry:add_schema(SerdeName, Schema), ok = emqx_schema_registry:add_schema(SerdeName, Schema),
on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end), on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
ok. ok.
protobuf_source(MessageType) ->
iolist_to_binary(
[
<<"message ">>,
MessageType,
<<" {\n">>,
<<
" required string name = 1;\n"
" required int32 id = 2;\n"
" optional string email = 3;\n"
" }\n"
"message UnionValue {\n"
" oneof u {\n"
" int32 a = 1;\n"
" string b = 2;\n"
" }\n"
"}"
>>
]
).
%% Checks that the internal order in the registry/index matches expectation. %% Checks that the internal order in the registry/index matches expectation.
assert_index_order(ExpectedOrder, Topic, Comment) -> assert_index_order(ExpectedOrder, Topic, Comment) ->
?assertEqual( ?assertEqual(
@ -1041,6 +1052,7 @@ t_duplicated_schema_checks(_Config) ->
Name1 = <<"foo">>, Name1 = <<"foo">>,
SerdeName = <<"myserde">>, SerdeName = <<"myserde">>,
Check = schema_check(json, SerdeName), Check = schema_check(json, SerdeName),
json_create_serde(SerdeName),
Validation1 = validation(Name1, [Check, sql_check(), Check]), Validation1 = validation(Name1, [Check, sql_check(), Check]),
?assertMatch({400, _}, insert(Validation1)), ?assertMatch({400, _}, insert(Validation1)),
@ -1130,18 +1142,87 @@ t_multiple_validations(_Config) ->
ok. ok.
%% Test that we validate schema registry serde existency when using the HTTP API.
t_schema_check_non_existent_serde(_Config) -> t_schema_check_non_existent_serde(_Config) ->
SerdeName = <<"idontexist">>, SerdeName = <<"idontexist">>,
Name1 = <<"foo">>, Name1 = <<"foo">>,
Check1 = schema_check(json, SerdeName), Check1 = schema_check(json, SerdeName),
Validation1 = validation(Name1, [Check1]), Validation1 = validation(Name1, [Check1]),
{201, _} = insert(Validation1), ?assertMatch({400, _}, insert(Validation1)),
C = connect(<<"c1">>), Check2 = schema_check(avro, SerdeName),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), Validation2 = validation(Name1, [Check2]),
?assertMatch({400, _}, insert(Validation2)),
ok = publish(C, <<"t/1">>, #{i => 10, s => <<"s">>}), MessageType = <<"idontexisteither">>,
?assertNotReceive({publish, _}), Check3 = schema_check(protobuf, SerdeName, #{<<"message_type">> => MessageType}),
Validation3 = validation(Name1, [Check3]),
?assertMatch({400, _}, insert(Validation3)),
protobuf_create_serde(SerdeName),
%% Still fails because reference message type doesn't exist.
?assertMatch({400, _}, insert(Validation3)),
ok.
%% Test that we validate schema registry serde existency when loading configs.
t_schema_check_non_existent_serde_load_config(_Config) ->
Name1 = <<"1">>,
SerdeName1 = <<"serde1">>,
MessageType1 = <<"mt">>,
Check1A = schema_check(protobuf, SerdeName1, #{<<"message_type">> => MessageType1}),
Validation1A = validation(Name1, [Check1A]),
protobuf_upsert_serde(SerdeName1, MessageType1),
{201, _} = insert(Validation1A),
Name2 = <<"2">>,
SerdeName2 = <<"serde2">>,
Check2A = schema_check(json, SerdeName2),
Validation2A = validation(Name2, [Check2A]),
json_create_serde(SerdeName2),
{201, _} = insert(Validation2A),
%% Config to load
%% Will replace existing config
MissingMessageType = <<"missing_mt">>,
Check1B = schema_check(protobuf, SerdeName1, #{<<"message_type">> => MissingMessageType}),
Validation1B = validation(Name1, [Check1B]),
%% Will replace existing config
MissingSerdeName1 = <<"missing1">>,
Check2B = schema_check(json, MissingSerdeName1),
Validation2B = validation(Name2, [Check2B]),
%% New validation; should be appended
Name3 = <<"3">>,
MissingSerdeName2 = <<"missing2">>,
Check3 = schema_check(avro, MissingSerdeName2),
Validation3 = validation(Name3, [Check3]),
ConfRootBin = <<"schema_validation">>,
ConfigToLoad1 = #{
ConfRootBin => #{
<<"validations">> => [Validation1B, Validation2B, Validation3]
}
},
ConfigToLoadBin1 = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})),
%% Merge
ResMerge = emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => merge}),
?assertMatch({error, _}, ResMerge),
{error, ErrorMessage1} = ResMerge,
?assertEqual(match, re:run(ErrorMessage1, <<"missing_schemas">>, [{capture, none}])),
?assertEqual(match, re:run(ErrorMessage1, MissingSerdeName1, [{capture, none}])),
?assertEqual(match, re:run(ErrorMessage1, MissingSerdeName2, [{capture, none}])),
?assertEqual(match, re:run(ErrorMessage1, MissingMessageType, [{capture, none}])),
%% Replace
ResReplace = emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => replace}),
?assertMatch({error, _}, ResReplace),
{error, ErrorMessage2} = ResReplace,
?assertEqual(match, re:run(ErrorMessage2, <<"missing_schemas">>, [{capture, none}])),
?assertEqual(match, re:run(ErrorMessage2, MissingSerdeName1, [{capture, none}])),
?assertEqual(match, re:run(ErrorMessage2, MissingSerdeName2, [{capture, none}])),
?assertEqual(match, re:run(ErrorMessage2, MissingMessageType, [{capture, none}])),
ok. ok.
@ -1232,16 +1313,16 @@ t_schema_check_protobuf(_Config) ->
), ),
%% Bad config: unknown message name %% Bad config: unknown message name
Check2 = schema_check(protobuf, SerdeName, #{<<"message_type">> => <<"idontexist">>}), %% Schema updated to use another message type after validation was created
Validation2 = validation(Name1, [Check2]), OtherMessageType = <<"NewPersonType">>,
{200, _} = update(Validation2), protobuf_upsert_serde(SerdeName, OtherMessageType),
lists:foreach( lists:foreach(
fun(Payload) -> fun(Payload) ->
ok = publish(C, <<"t/1">>, {raw, Payload}), ok = publish(C, <<"t/1">>, {raw, Payload}),
?assertNotReceive({publish, _}) ?assertNotReceive({publish, _})
end, end,
protobuf_valid_payloads(SerdeName, MessageType) protobuf_valid_payloads(SerdeName, OtherMessageType)
), ),
ok. ok.

View File

@ -0,0 +1 @@
Now, when inserting or updating a Schema Validation, EMQX will check if the referenced schemas and message types exist in Schema Registry.