feat(schema_registry): add JSON schema

Zaiming (Stone) Shi 2024-02-24 09:15:07 +01:00
parent 21cf600242
commit b7e5ea2941
13 changed files with 304 additions and 67 deletions

View File

@@ -1,7 +1,7 @@
 %% -*- mode: erlang; -*-
 {deps, [
-    {jesse, "1.7.0"},
+    {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}},
     {emqx, {path, "../../apps/emqx"}},
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_gateway, {path, "../../apps/emqx_gateway"}}

View File

@@ -26,7 +26,7 @@
 -type encoded_data() :: iodata().
 -type decoded_data() :: map().
--type serde_type() :: avro | protobuf.
+-type serde_type() :: avro | protobuf | json.
 -type serde_opts() :: map().
 -record(serde, {

View File

@@ -6,6 +6,7 @@
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_rule_engine, {path, "../emqx_rule_engine"}},
     {erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}},
+    {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}},
     {gpb, "4.19.9"}
 ]}.

View File

@@ -10,7 +10,8 @@
         kernel,
         stdlib,
         erlavro,
-        gpb
+        gpb,
+        jesse
     ]},
     {env, []},
     {modules, []},

View File

@@ -218,7 +218,10 @@ terminate(_Reason, _State) ->
 %%-------------------------------------------------------------------------------------------------
 create_tables() ->
-    ok = emqx_utils_ets:new(?SERDE_TAB, [public, {keypos, #serde.name}]),
+    ok = emqx_utils_ets:new(?SERDE_TAB, [public, ordered_set, {keypos, #serde.name}]),
+    %% have to create the table for jesse_database otherwise the on-demand table will disappear
+    %% when the caller process dies
+    ok = emqx_utils_ets:new(jesse_ets, [public, ordered_set]),
     ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
         {type, set},
         {rlog_shard, ?SCHEMA_REGISTRY_SHARD},
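The pre-created jesse_ets table works around plain ETS ownership semantics: a table is deleted as soon as the process that created it terminates, so letting jesse create it on demand from a short-lived caller would silently drop all registered JSON schemas. A minimal illustration of that ownership rule, not part of this change (demo_tab and the function name are made up):

    demo_ets_ownership() ->
        Owner = spawn(fun() ->
            _ = ets:new(demo_tab, [named_table, public]),
            receive stop -> ok end
        end),
        timer:sleep(50),
        %% table exists while its owner process is alive
        true = is_list(ets:info(demo_tab)),
        Owner ! stop,
        timer:sleep(50),
        %% the table vanished together with its owner
        undefined = ets:info(demo_tab).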

View File

@@ -52,34 +52,48 @@ fields(?CONF_KEY_ROOT) ->
     ];
 fields(avro) ->
     [
-        {type, mk(avro, #{required => true, desc => ?DESC("schema_type")})},
-        {source,
-            mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
-        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+        {type, mk(avro, #{required => true, desc => ?DESC("schema_type_avro")})}
+        | common_fields(emqx_schema:json_binary())
     ];
 fields(protobuf) ->
     [
-        {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type")})},
-        {source, mk(binary(), #{required => true, desc => ?DESC("schema_source")})},
-        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+        {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type_protobuf")})}
+        | common_fields(binary())
+    ];
+fields(json) ->
+    [
+        {type, mk(json, #{required => true, desc => ?DESC("schema_type_json")})}
+        | common_fields(emqx_schema:json_binary())
     ];
 fields("get_avro") ->
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
 fields("get_protobuf") ->
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)];
+fields("get_json") ->
+    [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(json)];
 fields("put_avro") ->
     fields(avro);
 fields("put_protobuf") ->
     fields(protobuf);
+fields("put_json") ->
+    fields(json);
 fields("post_" ++ Type) ->
     fields("get_" ++ Type).
 
+common_fields(SourceType) ->
+    [
+        {source, mk(SourceType, #{required => true, desc => ?DESC("schema_source")})},
+        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+    ].
+
 desc(?CONF_KEY_ROOT) ->
     ?DESC("schema_registry_root");
 desc(avro) ->
     ?DESC("avro_type");
 desc(protobuf) ->
     ?DESC("protobuf_type");
+desc(json) ->
+    ?DESC("json_type");
 desc(_) ->
     undefined.
@@ -121,7 +135,7 @@ mk(Type, Meta) -> hoconsc:mk(Type, Meta).
 ref(Name) -> hoconsc:ref(?MODULE, Name).
 
 supported_serde_types() ->
-    [avro, protobuf].
+    [avro, protobuf, json].
 
 refs() ->
     [ref(Type) || Type <- supported_serde_types()].
@@ -132,6 +146,8 @@ refs(#{<<"type">> := <<"avro">>}) ->
     [ref(avro)];
 refs(#{<<"type">> := <<"protobuf">>}) ->
     [ref(protobuf)];
+refs(#{<<"type">> := <<"json">>}) ->
+    [ref(json)];
 refs(_) ->
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     throw(#{
@@ -140,7 +156,7 @@ refs(_) ->
     }).
 
 refs_get_api() ->
-    [ref("get_avro"), ref("get_protobuf")].
+    [ref("get_avro"), ref("get_protobuf"), ref("get_json")].
 
 refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
     refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
@@ -148,6 +164,8 @@ refs_get_api(#{<<"type">> := <<"avro">>}) ->
     [ref("get_avro")];
 refs_get_api(#{<<"type">> := <<"protobuf">>}) ->
     [ref("get_protobuf")];
+refs_get_api(#{<<"type">> := <<"json">>}) ->
+    [ref("get_json")];
 refs_get_api(_) ->
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     throw(#{
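With json in the union, a JSON schema is declared with the same type/source/description shape as the existing avro and protobuf entries. A minimal sketch of registering one programmatically, mirroring what the test suites further down in this commit do (the schema name and source are illustrative):

    Source = emqx_utils_json:encode(#{
        type => object,
        properties => #{i => #{type => integer}, s => #{type => string}},
        required => [<<"i">>]
    }),
    ok = emqx_schema_registry:add_schema(my_json_schema, #{type => json, source => Source}).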

View File

@@ -11,20 +11,32 @@
 %% API
 -export([
-    decode/2,
-    decode/3,
-    encode/2,
-    encode/3,
     make_serde/3,
     handle_rule_function/2,
     destroy/1
 ]).
 
+%% Tests
 -export([
+    decode/2,
+    decode/3,
+    encode/2,
+    encode/3,
     eval_decode/2,
     eval_encode/2
 ]).
 
+-define(BOOL(SerdeName, EXPR),
+    try
+        _ = EXPR,
+        true
+    catch
+        error:Reason ->
+            ?SLOG(debug, #{msg => "schema_check_failed", schema => SerdeName, reason => Reason}),
+            false
+    end
+).
+
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
@@ -40,10 +52,6 @@ handle_rule_function(sparkplug_decode, [Data | MoreArgs]) ->
         schema_decode,
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
     );
-handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
-    decode(SchemaId, Data, MoreArgs);
-handle_rule_function(schema_decode, Args) ->
-    error({args_count_error, {schema_decode, Args}});
 handle_rule_function(sparkplug_encode, [Term]) ->
     handle_rule_function(
         schema_encode,
@@ -54,6 +62,10 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
         schema_encode,
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
     );
+handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
+    decode(SchemaId, Data, MoreArgs);
+handle_rule_function(schema_decode, Args) ->
+    error({args_count_error, {schema_decode, Args}});
 handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
     %% encode outputs iolists, but when the rule actions process those
     %% it might wrongly encode them as JSON lists, so we force them to
@@ -62,33 +74,60 @@ handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
     iolist_to_binary(IOList);
 handle_rule_function(schema_encode, Args) ->
     error({args_count_error, {schema_encode, Args}});
+handle_rule_function(schema_check_decode, [SchemaId, Data | MoreArgs]) ->
+    check_decode(SchemaId, Data, MoreArgs);
+handle_rule_function(schema_check_encode, [SchemaId, Term | MoreArgs]) ->
+    check_encode(SchemaId, Term, MoreArgs);
 handle_rule_function(_, _) ->
     {error, no_match_for_function}.
 
+-spec check_decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
+check_decode(SerdeName, Data, VarArgs) ->
+    with_serde(
+        SerdeName,
+        fun(Serde) ->
+            ?BOOL(SerdeName, eval_decode(Serde, [Data | VarArgs]))
+        end
+    ).
+
+-spec check_encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
+check_encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
+    with_serde(
+        SerdeName,
+        fun(Serde) ->
+            ?BOOL(SerdeName, eval_encode(Serde, [Data | VarArgs]))
+        end
+    ).
+
 -spec decode(schema_name(), encoded_data()) -> decoded_data().
 decode(SerdeName, RawData) ->
     decode(SerdeName, RawData, []).
 
 -spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
 decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
-    case emqx_schema_registry:get_serde(SerdeName) of
-        {error, not_found} ->
-            error({serde_not_found, SerdeName});
-        {ok, Serde} ->
-            eval_decode(Serde, [RawData | VarArgs])
-    end.
+    with_serde(SerdeName, fun(Serde) ->
+        eval_decode(Serde, [RawData | VarArgs])
+    end).
 
 -spec encode(schema_name(), decoded_data()) -> encoded_data().
 encode(SerdeName, RawData) ->
     encode(SerdeName, RawData, []).
 
 -spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
-encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) ->
-    case emqx_schema_registry:get_serde(SerdeName) of
-        {error, not_found} ->
-            error({serde_not_found, SerdeName});
-        {ok, Serde} ->
-            eval_encode(Serde, [EncodedData | VarArgs])
-    end.
+encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
+    with_serde(
+        SerdeName,
+        fun(Serde) ->
+            eval_encode(Serde, [Data | VarArgs])
+        end
+    ).
+
+with_serde(Name, F) ->
+    case emqx_schema_registry:get_serde(Name) of
+        {ok, Serde} ->
+            F(Serde);
+        {error, not_found} ->
+            error({serde_not_found, Name})
+    end.
 
 -spec make_serde(serde_type(), schema_name(), schema_source()) -> serde().
@@ -108,7 +147,17 @@ make_serde(protobuf, Name, Source) ->
         name = Name,
         type = protobuf,
         eval_context = SerdeMod
-    }.
+    };
+make_serde(json, Name, Source) ->
+    case json_decode(Source) of
+        SchemaObj when is_map(SchemaObj) ->
+            %% jesse:add_schema adds any map() without further validation
+            %% if it's not a map, then case_clause
+            ok = jesse_add_schema(Name, SchemaObj),
+            #serde{name = Name, type = json};
+        _NotMap ->
+            error({invalid_json_schema, bad_schema_object})
+    end.
 
 eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
     Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
@@ -116,14 +165,29 @@ eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
 eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
     MessageName = binary_to_existing_atom(MessageName0, utf8),
     Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
-    emqx_utils_maps:binary_key_map(Decoded).
+    emqx_utils_maps:binary_key_map(Decoded);
+eval_decode(#serde{type = json, name = Name}, [Data]) ->
+    true = is_binary(Data),
+    Term = json_decode(Data),
+    {ok, NewTerm} = jesse_validate(Name, Term),
+    NewTerm.
 
 eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
     avro_binary_encoder:encode(Store, Name, Data);
 eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) ->
     DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
     MessageName = binary_to_existing_atom(MessageName0, utf8),
-    apply(SerdeMod, encode_msg, [DecodedData, MessageName]).
+    apply(SerdeMod, encode_msg, [DecodedData, MessageName]);
+eval_encode(#serde{type = json, name = Name}, [Map]) ->
+    %% The input Map may not be a valid JSON term for jesse
+    Data = iolist_to_binary(emqx_utils_json:encode(Map)),
+    NewMap = json_decode(Data),
+    case jesse_validate(Name, NewMap) of
+        {ok, _} ->
+            Data;
+        {error, Reason} ->
+            error(Reason)
+    end.
 
 destroy(#serde{type = avro, name = _Name}) ->
     ?tp(serde_destroyed, #{type => avro, name => _Name}),
@@ -131,12 +195,31 @@ destroy(#serde{type = avro, name = _Name}) ->
 destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) ->
     unload_code(SerdeMod),
     ?tp(serde_destroyed, #{type => protobuf, name => _Name}),
+    ok;
+destroy(#serde{type = json, name = Name}) ->
+    ok = jesse_del_schema(Name),
+    ?tp(serde_destroyed, #{type => json, name => Name}),
     ok.
 
 %%------------------------------------------------------------------------------
 %% Internal fns
 %%------------------------------------------------------------------------------
 
+json_decode(Data) ->
+    emqx_utils_json:decode(Data, [return_maps]).
+
+jesse_add_schema(Name, Obj) ->
+    jesse:add_schema(jesse_name(Name), Obj).
+
+jesse_del_schema(Name) ->
+    jesse:del_schema(jesse_name(Name)).
+
+jesse_validate(Name, Map) ->
+    jesse:validate(jesse_name(Name), Map, []).
+
+jesse_name(Str) ->
+    unicode:characters_to_list(Str).
+
 -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
 make_protobuf_serde_mod(Name, Source) ->
     {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
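A json serde carries no eval_context: make_serde/3 stores the decoded schema object in jesse's (now pre-created) table under the serde name, and eval_decode/eval_encode reduce to a jesse validation of the decoded term. A shell-style sketch of that round trip against jesse directly, using an illustrative name and schema rather than anything from this commit:

    Schema = #{
        <<"type">> => <<"object">>,
        <<"properties">> => #{<<"i">> => #{<<"type">> => <<"integer">>}},
        <<"required">> => [<<"i">>]
    },
    ok = jesse:add_schema("my_json_schema", Schema),
    {ok, _} = jesse:validate("my_json_schema", #{<<"i">> => 1}, []),
    {error, _} = jesse:validate("my_json_schema", #{<<"i">> => <<"one">>}, []),
    ok = jesse:del_schema("my_json_schema").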

View File

@@ -23,14 +23,15 @@
 all() ->
     [
         {group, avro},
-        {group, protobuf}
+        {group, protobuf},
+        {group, json}
     ] ++ sparkplug_tests().
 
 groups() ->
     AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
     ProtobufOnlyTCs = protobuf_only_tcs(),
     TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
-    [{avro, TCs}, {protobuf, AllTCsExceptSP}].
+    [{avro, TCs}, {json, TCs}, {protobuf, AllTCsExceptSP}].
 
 protobuf_only_tcs() ->
     [
@@ -57,6 +58,8 @@ end_per_suite(_Config) ->
 init_per_group(avro, Config) ->
     [{serde_type, avro} | Config];
+init_per_group(json, Config) ->
+    [{serde_type, json} | Config];
 init_per_group(protobuf, Config) ->
     [{serde_type, protobuf} | Config];
 init_per_group(_Group, Config) ->
@@ -140,6 +143,18 @@ schema_params(avro) ->
         },
     SourceBin = emqx_utils_json:encode(Source),
     #{type => avro, source => SourceBin};
+schema_params(json) ->
+    Source =
+        #{
+            type => object,
+            properties => #{
+                i => #{type => integer},
+                s => #{type => string}
+            },
+            required => [<<"i">>, <<"s">>]
+        },
+    SourceBin = emqx_utils_json:encode(Source),
+    #{type => json, source => SourceBin};
 schema_params(protobuf) ->
     SourceBin =
         <<
@@ -162,7 +177,7 @@ create_serde(SerdeType, SerdeName) ->
     ok = emqx_schema_registry:add_schema(SerdeName, Schema),
     ok.
 
-test_params_for(avro, encode_decode1) ->
+test_params_for(Type, encode_decode1) when Type =:= avro; Type =:= json ->
     SQL =
         <<
             "select\n"
@@ -186,7 +201,7 @@ test_params_for(avro, encode_decode1) ->
         expected_rule_output => ExpectedRuleOutput,
         extra_args => ExtraArgs
     };
-test_params_for(avro, encode1) ->
+test_params_for(Type, encode1) when Type =:= avro; Type =:= json ->
     SQL =
         <<
             "select\n"
@@ -202,7 +217,7 @@ test_params_for(avro, encode1) ->
         payload_template => PayloadTemplate,
         extra_args => ExtraArgs
     };
-test_params_for(avro, decode1) ->
+test_params_for(Type, decode1) when Type =:= avro; Type =:= json ->
     SQL =
         <<
             "select\n"
@@ -503,13 +518,18 @@ t_encode(Config) ->
     PayloadBin = emqx_utils_json:encode(Payload),
     emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
     Published = receive_published(?LINE),
-    ?assertMatch(
-        #{payload := P} when is_binary(P),
-        Published
-    ),
-    #{payload := Encoded} = Published,
-    {ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
-    ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs])),
+    case SerdeType of
+        json ->
+            %% should have received a binary, but since it's valid
+            %% JSON it got 'safe_decode'-decoded in receive_published
+            ?assertMatch(#{payload := #{<<"i">> := _, <<"s">> := _}}, Published);
+        _ ->
+            ?assertMatch(#{payload := B} when is_binary(B), Published),
+            #{payload := Encoded} = Published,
+            {ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
+            ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs]))
+    end,
     ok.
 
 t_decode(Config) ->
@@ -607,8 +627,13 @@ t_protobuf_union_decode(Config) ->
 t_fail_rollback(Config) ->
     SerdeType = ?config(serde_type, Config),
     OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
-    BrokenSchema = OkSchema#{<<"source">> := <<"{}">>},
+    BrokenSchema =
+        case SerdeType of
+            json ->
+                OkSchema#{<<"source">> := <<"not a json value">>};
+            _ ->
+                OkSchema#{<<"source">> := <<"{}">>}
+        end,
     ?assertMatch(
         {ok, _},
         emqx_conf:update(

View File

@@ -23,14 +23,16 @@
 all() ->
     [
         {group, avro},
-        {group, protobuf}
+        {group, protobuf},
+        {group, json}
     ].
 
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
         {avro, AllTCs},
-        {protobuf, AllTCs}
+        {protobuf, AllTCs},
+        {json, AllTCs}
     ].
 
 init_per_suite(Config) ->
@@ -80,6 +82,23 @@ init_per_group(protobuf, Config) ->
         {schema_source, SourceBin},
         {invalid_schema_source, InvalidSourceBin}
         | Config
+    ];
+init_per_group(json, Config) ->
+    Source =
+        #{
+            properties => #{
+                foo => #{},
+                bar => #{}
+            },
+            required => [<<"foo">>]
+        },
+    SourceBin = emqx_utils_json:encode(Source),
+    InvalidSourceBin = <<"\"not an object\"">>,
+    [
+        {serde_type, json},
+        {schema_source, SourceBin},
+        {invalid_schema_source, InvalidSourceBin}
+        | Config
     ].
 
 end_per_group(_Group, _Config) ->
@@ -279,7 +298,7 @@ t_crud(Config) ->
                 <<"code">> := <<"BAD_REQUEST">>,
                 <<"message">> :=
                     #{
-                        <<"expected">> := <<"avro | protobuf">>,
+                        <<"expected">> := <<"avro | protobuf | json">>,
                         <<"field_name">> := <<"type">>
                     }
             }},
@@ -302,7 +321,7 @@ t_crud(Config) ->
                 <<"code">> := <<"BAD_REQUEST">>,
                 <<"message">> :=
                     #{
-                        <<"expected">> := <<"avro | protobuf">>,
+                        <<"expected">> := <<"avro | protobuf | json">>,
                         <<"field_name">> := <<"type">>
                     }
             }},
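The expected string asserted here is not hand-written in the API module; it comes from the throw in refs/1 shown earlier, which joins supported_serde_types(). A quick sketch of that expression and its result (evaluated by hand, for illustration):

    Expected = lists:join(" | ", [atom_to_list(T) || T <- [avro, protobuf, json]]),
    %% the iolist flattens to the string the API returns in the error body
    <<"avro | protobuf | json">> = iolist_to_binary(Expected).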

View File

@@ -15,6 +15,10 @@
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 -define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
+-define(INVALID_JSON, #{
+    reason := #{expected := "emqx_schema:json_binary()"},
+    kind := validation_error
+}).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -79,7 +83,21 @@ schema_params(protobuf) ->
             " }\n"
             " "
         >>,
-    #{type => protobuf, source => SourceBin}.
+    #{type => protobuf, source => SourceBin};
+schema_params(json) ->
+    Source =
+        #{
+            <<"$schema">> => <<"http://json-schema.org/draft-06/schema#">>,
+            <<"$id">> => <<"http://json-schema.org/draft-06/schema#">>,
+            type => object,
+            properties => #{
+                foo => #{type => integer},
+                bar => #{type => integer}
+            },
+            required => [<<"foo">>]
+        },
+    SourceBin = emqx_utils_json:encode(Source),
+    #{type => json, source => SourceBin}.
 
 assert_roundtrip(SerdeName, Original) ->
     Encoded = emqx_schema_registry_serde:encode(SerdeName, Original),
@@ -109,10 +127,7 @@ t_avro_invalid_json_schema(_Config) ->
     SerdeName = my_serde,
     Params = schema_params(avro),
     WrongParams = Params#{source := <<"{">>},
-    ?assertMatch(
-        {error, #{reason := #{expected := _}}},
-        emqx_schema_registry:add_schema(SerdeName, WrongParams)
-    ),
+    ?assertMatch({error, ?INVALID_JSON}, emqx_schema_registry:add_schema(SerdeName, WrongParams)),
     ok.
 
 t_avro_invalid_schema(_Config) ->
@@ -128,14 +143,25 @@ t_avro_invalid_schema(_Config) ->
 t_serde_not_found(_Config) ->
     %% for coverage
     NonexistentSerde = <<"nonexistent">>,
-    Original = #{},
     ?assertError(
         {serde_not_found, NonexistentSerde},
-        emqx_schema_registry_serde:encode(NonexistentSerde, Original)
+        emqx_schema_registry_serde:encode(NonexistentSerde, data)
     ),
     ?assertError(
         {serde_not_found, NonexistentSerde},
-        emqx_schema_registry_serde:decode(NonexistentSerde, Original)
+        emqx_schema_registry_serde:decode(NonexistentSerde, data)
+    ),
+    ?assertError(
+        {serde_not_found, NonexistentSerde},
+        emqx_schema_registry_serde:handle_rule_function(schema_check_decode, [
+            NonexistentSerde, data
+        ])
+    ),
+    ?assertError(
+        {serde_not_found, NonexistentSerde},
+        emqx_schema_registry_serde:handle_rule_function(schema_check_encode, [
+            NonexistentSerde, data
+        ])
     ),
     ok.
@@ -171,3 +197,44 @@ t_protobuf_invalid_schema(_Config) ->
         emqx_schema_registry:add_schema(SerdeName, WrongParams)
     ),
     ok.
+
+t_json_invalid_schema(_Config) ->
+    SerdeName = invalid_json,
+    Params = schema_params(json),
+    BadParams1 = Params#{source := <<"not valid json value">>},
+    BadParams2 = Params#{source := <<"\"not an object\"">>},
+    BadParams3 = Params#{source := <<"{\"foo\": 1}">>},
+    ?assertMatch({error, ?INVALID_JSON}, emqx_schema_registry:add_schema(SerdeName, BadParams1)),
+    ?assertMatch(
+        {error, {post_config_update, _, {invalid_json_schema, bad_schema_object}}},
+        emqx_schema_registry:add_schema(SerdeName, BadParams2)
+    ),
+    ?assertMatch(
+        ok,
+        emqx_schema_registry:add_schema(SerdeName, BadParams3)
+    ),
+    ok.
+
+t_roundtrip_json(_Config) ->
+    SerdeName = my_json_schema,
+    Params = schema_params(json),
+    ok = emqx_schema_registry:add_schema(SerdeName, Params),
+    Original = #{<<"foo">> => 1, <<"bar">> => 2},
+    assert_roundtrip(SerdeName, Original),
+    ok.
+
+t_json_validation(_Config) ->
+    SerdeName = my_json_schema,
+    Params = schema_params(json),
+    ok = emqx_schema_registry:add_schema(SerdeName, Params),
+    F = fun(Fn, Data) ->
+        emqx_schema_registry_serde:handle_rule_function(Fn, [SerdeName, Data])
+    end,
+    OK = #{<<"foo">> => 1, <<"bar">> => 2},
+    NotOk = #{<<"bar">> => 2},
+    ?assert(F(schema_check_encode, OK)),
+    ?assert(F(schema_check_decode, <<"{\"foo\": 1, \"bar\": 2}">>)),
+    ?assertNot(F(schema_check_encode, NotOk)),
+    ?assertNot(F(schema_check_decode, <<"{\"bar\": 2}">>)),
+    ?assertNot(F(schema_check_decode, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)),
+    ok.
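The last test doubles as documentation for the new rule functions: unlike schema_decode/schema_encode, they never raise on invalid input, because the ?BOOL macro turns the exception into false. A shell-style sketch of the observable behaviour once the my_json_schema fixture above (which requires the field "foo") is registered; the values are illustrative:

    true = emqx_schema_registry_serde:handle_rule_function(
        schema_check_decode, [my_json_schema, <<"{\"foo\": 1, \"bar\": 2}">>]
    ),
    false = emqx_schema_registry_serde:handle_rule_function(
        schema_check_encode, [my_json_schema, #{<<"bar">> => 2}]
    ).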

View File

@@ -50,8 +50,7 @@ deps(Config) ->
 overrides() ->
     [
-        {add, [{extra_src_dirs, [{"etc", [{recursive, true}]}]}]},
-        {add, jesse, [{erl_opts, [nowarn_match_float_zero]}]}
+        {add, [{extra_src_dirs, [{"etc", [{recursive, true}]}]}]}
     ] ++ snabbkaffe_overrides().
 
 %% Temporary workaround for a rebar3 erl_opts duplication

View File

@@ -12,6 +12,14 @@ protobuf_type.desc:
 protobuf_type.label:
 """Protocol Buffers"""
 
+json_type.desc: """~
+    Supports JSON Schema
+    [Draft 03](http://tools.ietf.org/html/draft-zyp-json-schema-03)
+    [Draft 04](http://tools.ietf.org/html/draft-zyp-json-schema-04) and
+    [Draft 06](https://datatracker.ietf.org/doc/html/draft-wright-json-schema-00)."""
+
+json_type.label: "JSON Schema"
+
 schema_description.desc:
 """A description for this schema."""
@@ -42,10 +50,22 @@ schema_source.desc:
 schema_source.label:
 """Schema source"""
 
-schema_type.desc:
-"""Schema type."""
+schema_type_avro.desc:
+"""Must be `avro` for Avro schema."""
 
-schema_type.label:
-"""Schema type"""
+schema_type_avro.label:
+"""Avro Schema"""
+
+schema_type_protobuf.desc:
+"""Must be `protobuf` for protobuf schema."""
+
+schema_type_protobuf.label:
+"""Protobuf Schema"""
+
+schema_type_json.desc:
+"""Must be `json` for JSON schema."""
+
+schema_type_json.label:
+"""JSON Schema"""
 
 }

View File

@@ -198,6 +198,7 @@ procs
 progname
 prometheus
 proto
+protobuf
 ps
 psk
 pubsub