diff --git a/apps/emqx_gateway_ocpp/rebar.config b/apps/emqx_gateway_ocpp/rebar.config index 16bbcd109..f5095dd30 100644 --- a/apps/emqx_gateway_ocpp/rebar.config +++ b/apps/emqx_gateway_ocpp/rebar.config @@ -1,7 +1,7 @@ %% -*- mode: erlang; -*- {deps, [ - {jesse, "1.7.0"}, + {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}}, {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../emqx_utils"}}, {emqx_gateway, {path, "../../apps/emqx_gateway"}} diff --git a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl index 4e5fb6ce5..b25042c20 100644 --- a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl +++ b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl @@ -26,7 +26,7 @@ -type encoded_data() :: iodata(). -type decoded_data() :: map(). --type serde_type() :: avro | protobuf. +-type serde_type() :: avro | protobuf | json. -type serde_opts() :: map(). -record(serde, { diff --git a/apps/emqx_schema_registry/rebar.config b/apps/emqx_schema_registry/rebar.config index 44082865b..e49e69fbd 100644 --- a/apps/emqx_schema_registry/rebar.config +++ b/apps/emqx_schema_registry/rebar.config @@ -6,6 +6,7 @@ {emqx_utils, {path, "../emqx_utils"}}, {emqx_rule_engine, {path, "../emqx_rule_engine"}}, {erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}}, + {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}}, {gpb, "4.19.9"} ]}. diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src index 21430c5ff..01b082271 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src @@ -10,7 +10,8 @@ kernel, stdlib, erlavro, - gpb + gpb, + jesse ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.erl b/apps/emqx_schema_registry/src/emqx_schema_registry.erl index 2e6edab74..7ba4ebcf8 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.erl @@ -218,7 +218,10 @@ terminate(_Reason, _State) -> %%------------------------------------------------------------------------------------------------- create_tables() -> - ok = emqx_utils_ets:new(?SERDE_TAB, [public, {keypos, #serde.name}]), + ok = emqx_utils_ets:new(?SERDE_TAB, [public, ordered_set, {keypos, #serde.name}]), + %% have to create the table for jesse_database otherwise the on-demand table will disappear + %% when the caller process dies + ok = emqx_utils_ets:new(jesse_ets, [public, ordered_set]), ok = mria:create_table(?PROTOBUF_CACHE_TAB, [ {type, set}, {rlog_shard, ?SCHEMA_REGISTRY_SHARD}, diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_schema.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_schema.erl index ea0bffc04..7a7780a71 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_schema.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_schema.erl @@ -52,34 +52,48 @@ fields(?CONF_KEY_ROOT) -> ]; fields(avro) -> [ - {type, mk(avro, #{required => true, desc => ?DESC("schema_type")})}, - {source, - mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})}, - {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})} + {type, mk(avro, #{required => true, desc => ?DESC("schema_type_avro")})} + | common_fields(emqx_schema:json_binary()) ]; fields(protobuf) -> [ - {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type")})}, - {source, mk(binary(), #{required => true, desc => ?DESC("schema_source")})}, - {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})} + {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type_protobuf")})} + | common_fields(binary()) + ]; +fields(json) -> + [ + {type, mk(json, #{required => true, desc => ?DESC("schema_type_json")})} + | common_fields(emqx_schema:json_binary()) ]; fields("get_avro") -> [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)]; fields("get_protobuf") -> [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)]; +fields("get_json") -> + [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(json)]; fields("put_avro") -> fields(avro); fields("put_protobuf") -> fields(protobuf); +fields("put_json") -> + fields(json); fields("post_" ++ Type) -> fields("get_" ++ Type). +common_fields(SourceType) -> + [ + {source, mk(SourceType, #{required => true, desc => ?DESC("schema_source")})}, + {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})} + ]. + desc(?CONF_KEY_ROOT) -> ?DESC("schema_registry_root"); desc(avro) -> ?DESC("avro_type"); desc(protobuf) -> ?DESC("protobuf_type"); +desc(json) -> + ?DESC("json_type"); desc(_) -> undefined. @@ -121,7 +135,7 @@ mk(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Name) -> hoconsc:ref(?MODULE, Name). supported_serde_types() -> - [avro, protobuf]. + [avro, protobuf, json]. refs() -> [ref(Type) || Type <- supported_serde_types()]. @@ -132,6 +146,8 @@ refs(#{<<"type">> := <<"avro">>}) -> [ref(avro)]; refs(#{<<"type">> := <<"protobuf">>}) -> [ref(protobuf)]; +refs(#{<<"type">> := <<"json">>}) -> + [ref(json)]; refs(_) -> Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]), throw(#{ @@ -140,7 +156,7 @@ refs(_) -> }). refs_get_api() -> - [ref("get_avro"), ref("get_protobuf")]. + [ref("get_avro"), ref("get_protobuf"), ref("get_json")]. refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) -> refs(Value#{<<"type">> := atom_to_binary(TypeAtom)}); @@ -148,6 +164,8 @@ refs_get_api(#{<<"type">> := <<"avro">>}) -> [ref("get_avro")]; refs_get_api(#{<<"type">> := <<"protobuf">>}) -> [ref("get_protobuf")]; +refs_get_api(#{<<"type">> := <<"json">>}) -> + [ref("get_json")]; refs_get_api(_) -> Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]), throw(#{ diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index 2be00a0b9..e3a365386 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -11,20 +11,32 @@ %% API -export([ - decode/2, - decode/3, - encode/2, - encode/3, make_serde/3, handle_rule_function/2, destroy/1 ]). +%% Tests -export([ + decode/2, + decode/3, + encode/2, + encode/3, eval_decode/2, eval_encode/2 ]). +-define(BOOL(SerdeName, EXPR), + try + _ = EXPR, + true + catch + error:Reason -> + ?SLOG(debug, #{msg => "schema_check_failed", schema => SerdeName, reason => Reason}), + false + end +). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -40,10 +52,6 @@ handle_rule_function(sparkplug_decode, [Data | MoreArgs]) -> schema_decode, [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs] ); -handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) -> - decode(SchemaId, Data, MoreArgs); -handle_rule_function(schema_decode, Args) -> - error({args_count_error, {schema_decode, Args}}); handle_rule_function(sparkplug_encode, [Term]) -> handle_rule_function( schema_encode, @@ -54,6 +62,10 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) -> schema_encode, [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs] ); +handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) -> + decode(SchemaId, Data, MoreArgs); +handle_rule_function(schema_decode, Args) -> + error({args_count_error, {schema_decode, Args}}); handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) -> %% encode outputs iolists, but when the rule actions process those %% it might wrongly encode them as JSON lists, so we force them to @@ -62,33 +74,60 @@ handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) -> iolist_to_binary(IOList); handle_rule_function(schema_encode, Args) -> error({args_count_error, {schema_encode, Args}}); +handle_rule_function(schema_check_decode, [SchemaId, Data | MoreArgs]) -> + check_decode(SchemaId, Data, MoreArgs); +handle_rule_function(schema_check_encode, [SchemaId, Term | MoreArgs]) -> + check_encode(SchemaId, Term, MoreArgs); handle_rule_function(_, _) -> {error, no_match_for_function}. +-spec check_decode(schema_name(), encoded_data(), [term()]) -> decoded_data(). +check_decode(SerdeName, Data, VarArgs) -> + with_serde( + SerdeName, + fun(Serde) -> + ?BOOL(SerdeName, eval_decode(Serde, [Data | VarArgs])) + end + ). + +-spec check_encode(schema_name(), decoded_data(), [term()]) -> encoded_data(). +check_encode(SerdeName, Data, VarArgs) when is_list(VarArgs) -> + with_serde( + SerdeName, + fun(Serde) -> + ?BOOL(SerdeName, eval_encode(Serde, [Data | VarArgs])) + end + ). + -spec decode(schema_name(), encoded_data()) -> decoded_data(). decode(SerdeName, RawData) -> decode(SerdeName, RawData, []). -spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data(). decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) -> - case emqx_schema_registry:get_serde(SerdeName) of - {error, not_found} -> - error({serde_not_found, SerdeName}); - {ok, Serde} -> - eval_decode(Serde, [RawData | VarArgs]) - end. + with_serde(SerdeName, fun(Serde) -> + eval_decode(Serde, [RawData | VarArgs]) + end). -spec encode(schema_name(), decoded_data()) -> encoded_data(). encode(SerdeName, RawData) -> encode(SerdeName, RawData, []). -spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data(). -encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) -> - case emqx_schema_registry:get_serde(SerdeName) of - {error, not_found} -> - error({serde_not_found, SerdeName}); +encode(SerdeName, Data, VarArgs) when is_list(VarArgs) -> + with_serde( + SerdeName, + fun(Serde) -> + eval_encode(Serde, [Data | VarArgs]) + end + ). + +with_serde(Name, F) -> + case emqx_schema_registry:get_serde(Name) of {ok, Serde} -> - eval_encode(Serde, [EncodedData | VarArgs]) + F(Serde); + {error, not_found} -> + error({serde_not_found, Name}) end. -spec make_serde(serde_type(), schema_name(), schema_source()) -> serde(). @@ -108,7 +147,17 @@ make_serde(protobuf, Name, Source) -> name = Name, type = protobuf, eval_context = SerdeMod - }. + }; +make_serde(json, Name, Source) -> + case json_decode(Source) of + SchemaObj when is_map(SchemaObj) -> + %% jesse:add_schema adds any map() without further validation + %% if it's not a map, then case_clause + ok = jesse_add_schema(Name, SchemaObj), + #serde{name = Name, type = json}; + _NotMap -> + error({invalid_json_schema, bad_schema_object}) + end. eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) -> Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]), @@ -116,14 +165,29 @@ eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) -> eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) -> MessageName = binary_to_existing_atom(MessageName0, utf8), Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]), - emqx_utils_maps:binary_key_map(Decoded). + emqx_utils_maps:binary_key_map(Decoded); +eval_decode(#serde{type = json, name = Name}, [Data]) -> + true = is_binary(Data), + Term = json_decode(Data), + {ok, NewTerm} = jesse_validate(Name, Term), + NewTerm. eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) -> avro_binary_encoder:encode(Store, Name, Data); eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) -> DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0), MessageName = binary_to_existing_atom(MessageName0, utf8), - apply(SerdeMod, encode_msg, [DecodedData, MessageName]). + apply(SerdeMod, encode_msg, [DecodedData, MessageName]); +eval_encode(#serde{type = json, name = Name}, [Map]) -> + %% The input Map may not be a valid JSON term for jesse + Data = iolist_to_binary(emqx_utils_json:encode(Map)), + NewMap = json_decode(Data), + case jesse_validate(Name, NewMap) of + {ok, _} -> + Data; + {error, Reason} -> + error(Reason) + end. destroy(#serde{type = avro, name = _Name}) -> ?tp(serde_destroyed, #{type => avro, name => _Name}), @@ -131,12 +195,31 @@ destroy(#serde{type = avro, name = _Name}) -> destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) -> unload_code(SerdeMod), ?tp(serde_destroyed, #{type => protobuf, name => _Name}), + ok; +destroy(#serde{type = json, name = Name}) -> + ok = jesse_del_schema(Name), + ?tp(serde_destroyed, #{type => json, name => Name}), ok. %%------------------------------------------------------------------------------ %% Internal fns %%------------------------------------------------------------------------------ +json_decode(Data) -> + emqx_utils_json:decode(Data, [return_maps]). + +jesse_add_schema(Name, Obj) -> + jesse:add_schema(jesse_name(Name), Obj). + +jesse_del_schema(Name) -> + jesse:del_schema(jesse_name(Name)). + +jesse_validate(Name, Map) -> + jesse:validate(jesse_name(Name), Map, []). + +jesse_name(Str) -> + unicode:characters_to_list(Str). + -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module(). make_protobuf_serde_mod(Name, Source) -> {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name), diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl index ab311c578..22252b7c3 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl @@ -23,14 +23,15 @@ all() -> [ {group, avro}, - {group, protobuf} + {group, protobuf}, + {group, json} ] ++ sparkplug_tests(). groups() -> AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(), ProtobufOnlyTCs = protobuf_only_tcs(), TCs = AllTCsExceptSP -- ProtobufOnlyTCs, - [{avro, TCs}, {protobuf, AllTCsExceptSP}]. + [{avro, TCs}, {json, TCs}, {protobuf, AllTCsExceptSP}]. protobuf_only_tcs() -> [ @@ -57,6 +58,8 @@ end_per_suite(_Config) -> init_per_group(avro, Config) -> [{serde_type, avro} | Config]; +init_per_group(json, Config) -> + [{serde_type, json} | Config]; init_per_group(protobuf, Config) -> [{serde_type, protobuf} | Config]; init_per_group(_Group, Config) -> @@ -140,6 +143,18 @@ schema_params(avro) -> }, SourceBin = emqx_utils_json:encode(Source), #{type => avro, source => SourceBin}; +schema_params(json) -> + Source = + #{ + type => object, + properties => #{ + i => #{type => integer}, + s => #{type => string} + }, + required => [<<"i">>, <<"s">>] + }, + SourceBin = emqx_utils_json:encode(Source), + #{type => json, source => SourceBin}; schema_params(protobuf) -> SourceBin = << @@ -162,7 +177,7 @@ create_serde(SerdeType, SerdeName) -> ok = emqx_schema_registry:add_schema(SerdeName, Schema), ok. -test_params_for(avro, encode_decode1) -> +test_params_for(Type, encode_decode1) when Type =:= avro; Type =:= json -> SQL = << "select\n" @@ -186,7 +201,7 @@ test_params_for(avro, encode_decode1) -> expected_rule_output => ExpectedRuleOutput, extra_args => ExtraArgs }; -test_params_for(avro, encode1) -> +test_params_for(Type, encode1) when Type =:= avro; Type =:= json -> SQL = << "select\n" @@ -202,7 +217,7 @@ test_params_for(avro, encode1) -> payload_template => PayloadTemplate, extra_args => ExtraArgs }; -test_params_for(avro, decode1) -> +test_params_for(Type, decode1) when Type =:= avro; Type =:= json -> SQL = << "select\n" @@ -503,13 +518,18 @@ t_encode(Config) -> PayloadBin = emqx_utils_json:encode(Payload), emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), Published = receive_published(?LINE), - ?assertMatch( - #{payload := P} when is_binary(P), - Published - ), - #{payload := Encoded} = Published, - {ok, Serde} = emqx_schema_registry:get_serde(SerdeName), - ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs])), + case SerdeType of + json -> + %% should have received binary + %% but since it's valid json, so it got + %% 'safe_decode' decoded in receive_published + ?assertMatch(#{payload := #{<<"i">> := _, <<"s">> := _}}, Published); + _ -> + ?assertMatch(#{payload := B} when is_binary(B), Published), + #{payload := Encoded} = Published, + {ok, Serde} = emqx_schema_registry:get_serde(SerdeName), + ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs])) + end, ok. t_decode(Config) -> @@ -607,8 +627,13 @@ t_protobuf_union_decode(Config) -> t_fail_rollback(Config) -> SerdeType = ?config(serde_type, Config), OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)), - BrokenSchema = OkSchema#{<<"source">> := <<"{}">>}, - + BrokenSchema = + case SerdeType of + json -> + OkSchema#{<<"source">> := <<"not a json value">>}; + _ -> + OkSchema#{<<"source">> := <<"{}">>} + end, ?assertMatch( {ok, _}, emqx_conf:update( diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl index 77d940191..7aede9b4c 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl @@ -23,14 +23,16 @@ all() -> [ {group, avro}, - {group, protobuf} + {group, protobuf}, + {group, json} ]. groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), [ {avro, AllTCs}, - {protobuf, AllTCs} + {protobuf, AllTCs}, + {json, AllTCs} ]. init_per_suite(Config) -> @@ -80,6 +82,23 @@ init_per_group(protobuf, Config) -> {schema_source, SourceBin}, {invalid_schema_source, InvalidSourceBin} | Config + ]; +init_per_group(json, Config) -> + Source = + #{ + properties => #{ + foo => #{}, + bar => #{} + }, + required => [<<"foo">>] + }, + SourceBin = emqx_utils_json:encode(Source), + InvalidSourceBin = <<"\"not an object\"">>, + [ + {serde_type, json}, + {schema_source, SourceBin}, + {invalid_schema_source, InvalidSourceBin} + | Config ]. end_per_group(_Group, _Config) -> @@ -279,7 +298,7 @@ t_crud(Config) -> <<"code">> := <<"BAD_REQUEST">>, <<"message">> := #{ - <<"expected">> := <<"avro | protobuf">>, + <<"expected">> := <<"avro | protobuf | json">>, <<"field_name">> := <<"type">> } }}, @@ -302,7 +321,7 @@ t_crud(Config) -> <<"code">> := <<"BAD_REQUEST">>, <<"message">> := #{ - <<"expected">> := <<"avro | protobuf">>, + <<"expected">> := <<"avro | protobuf | json">>, <<"field_name">> := <<"type">> } }}, diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl index 1fa7124ac..3c6ecd14e 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl @@ -15,6 +15,10 @@ -import(emqx_common_test_helpers, [on_exit/1]). -define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]). +-define(INVALID_JSON, #{ + reason := #{expected := "emqx_schema:json_binary()"}, + kind := validation_error +}). %%------------------------------------------------------------------------------ %% CT boilerplate @@ -79,7 +83,21 @@ schema_params(protobuf) -> " }\n" " " >>, - #{type => protobuf, source => SourceBin}. + #{type => protobuf, source => SourceBin}; +schema_params(json) -> + Source = + #{ + <<"$schema">> => <<"http://json-schema.org/draft-06/schema#">>, + <<"$id">> => <<"http://json-schema.org/draft-06/schema#">>, + type => object, + properties => #{ + foo => #{type => integer}, + bar => #{type => integer} + }, + required => [<<"foo">>] + }, + SourceBin = emqx_utils_json:encode(Source), + #{type => json, source => SourceBin}. assert_roundtrip(SerdeName, Original) -> Encoded = emqx_schema_registry_serde:encode(SerdeName, Original), @@ -109,10 +127,7 @@ t_avro_invalid_json_schema(_Config) -> SerdeName = my_serde, Params = schema_params(avro), WrongParams = Params#{source := <<"{">>}, - ?assertMatch( - {error, #{reason := #{expected := _}}}, - emqx_schema_registry:add_schema(SerdeName, WrongParams) - ), + ?assertMatch({error, ?INVALID_JSON}, emqx_schema_registry:add_schema(SerdeName, WrongParams)), ok. t_avro_invalid_schema(_Config) -> @@ -128,14 +143,25 @@ t_avro_invalid_schema(_Config) -> t_serde_not_found(_Config) -> %% for coverage NonexistentSerde = <<"nonexistent">>, - Original = #{}, ?assertError( {serde_not_found, NonexistentSerde}, - emqx_schema_registry_serde:encode(NonexistentSerde, Original) + emqx_schema_registry_serde:encode(NonexistentSerde, data) ), ?assertError( {serde_not_found, NonexistentSerde}, - emqx_schema_registry_serde:decode(NonexistentSerde, Original) + emqx_schema_registry_serde:decode(NonexistentSerde, data) + ), + ?assertError( + {serde_not_found, NonexistentSerde}, + emqx_schema_registry_serde:handle_rule_function(schema_check_decode, [ + NonexistentSerde, data + ]) + ), + ?assertError( + {serde_not_found, NonexistentSerde}, + emqx_schema_registry_serde:handle_rule_function(schema_check_encode, [ + NonexistentSerde, data + ]) ), ok. @@ -171,3 +197,44 @@ t_protobuf_invalid_schema(_Config) -> emqx_schema_registry:add_schema(SerdeName, WrongParams) ), ok. + +t_json_invalid_schema(_Config) -> + SerdeName = invalid_json, + Params = schema_params(json), + BadParams1 = Params#{source := <<"not valid json value">>}, + BadParams2 = Params#{source := <<"\"not an object\"">>}, + BadParams3 = Params#{source := <<"{\"foo\": 1}">>}, + ?assertMatch({error, ?INVALID_JSON}, emqx_schema_registry:add_schema(SerdeName, BadParams1)), + ?assertMatch( + {error, {post_config_update, _, {invalid_json_schema, bad_schema_object}}}, + emqx_schema_registry:add_schema(SerdeName, BadParams2) + ), + ?assertMatch( + ok, + emqx_schema_registry:add_schema(SerdeName, BadParams3) + ), + ok. + +t_roundtrip_json(_Config) -> + SerdeName = my_json_schema, + Params = schema_params(json), + ok = emqx_schema_registry:add_schema(SerdeName, Params), + Original = #{<<"foo">> => 1, <<"bar">> => 2}, + assert_roundtrip(SerdeName, Original), + ok. + +t_json_validation(_Config) -> + SerdeName = my_json_schema, + Params = schema_params(json), + ok = emqx_schema_registry:add_schema(SerdeName, Params), + F = fun(Fn, Data) -> + emqx_schema_registry_serde:handle_rule_function(Fn, [SerdeName, Data]) + end, + OK = #{<<"foo">> => 1, <<"bar">> => 2}, + NotOk = #{<<"bar">> => 2}, + ?assert(F(schema_check_encode, OK)), + ?assert(F(schema_check_decode, <<"{\"foo\": 1, \"bar\": 2}">>)), + ?assertNot(F(schema_check_encode, NotOk)), + ?assertNot(F(schema_check_decode, <<"{\"bar\": 2}">>)), + ?assertNot(F(schema_check_decode, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)), + ok. diff --git a/rebar.config.erl b/rebar.config.erl index b340e7eea..1ca2dee09 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -50,8 +50,7 @@ deps(Config) -> overrides() -> [ - {add, [{extra_src_dirs, [{"etc", [{recursive, true}]}]}]}, - {add, jesse, [{erl_opts, [nowarn_match_float_zero]}]} + {add, [{extra_src_dirs, [{"etc", [{recursive, true}]}]}]} ] ++ snabbkaffe_overrides(). %% Temporary workaround for a rebar3 erl_opts duplication diff --git a/rel/i18n/emqx_schema_registry_schema.hocon b/rel/i18n/emqx_schema_registry_schema.hocon index bd1e53bce..695b106f0 100644 --- a/rel/i18n/emqx_schema_registry_schema.hocon +++ b/rel/i18n/emqx_schema_registry_schema.hocon @@ -12,6 +12,14 @@ protobuf_type.desc: protobuf_type.label: """Protocol Buffers""" +json_type.desc: """~ + Supports JSON Schema + [Draft 03](http://tools.ietf.org/html/draft-zyp-json-schema-03) + [Draft 04](http://tools.ietf.org/html/draft-zyp-json-schema-04) and + [Draft 06](https://datatracker.ietf.org/doc/html/draft-wright-json-schema-00).""" + +json_type.label: "JSON Schema" + schema_description.desc: """A description for this schema.""" @@ -42,10 +50,22 @@ schema_source.desc: schema_source.label: """Schema source""" -schema_type.desc: -"""Schema type.""" +schema_type_avro.desc: +"""Must be `avro` for Avro schema.""" -schema_type.label: -"""Schema type""" +schema_type_avro.label: +"""Avro Schema""" + +schema_type_protobuf.desc: +"""Must be `protobuf` for protobuf schema.""" + +schema_type_protobuf.label: +"""Protobuf Schema""" + +schema_type_json.desc: +"""Must be `json` for JSON schema.""" + +schema_type_json.label: +"""JSON Schema""" } diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index bfa63e349..8adec3506 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -198,6 +198,7 @@ procs progname prometheus proto +protobuf ps psk pubsub