diff --git a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl index b25042c20..11bbb0b72 100644 --- a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl +++ b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl @@ -26,7 +26,7 @@ -type encoded_data() :: iodata(). -type decoded_data() :: map(). --type serde_type() :: avro | protobuf | json. +-type serde_type() :: emqx_schema_registry_serde:serde_type(). -type serde_opts() :: map(). -record(serde, { diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.erl b/apps/emqx_schema_registry/src/emqx_schema_registry.erl index 7ba4ebcf8..5005b0dc8 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.erl @@ -16,6 +16,8 @@ start_link/0, add_schema/2, get_schema/1, + is_existing_type/1, + is_existing_type/2, delete_schema/1, list_schemas/0 ]). @@ -52,6 +54,7 @@ %% API %%------------------------------------------------------------------------------------------------- +-spec start_link() -> gen_server:start_ret(). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -64,6 +67,14 @@ get_serde(SchemaName) -> {ok, Serde} end. +-spec is_existing_type(schema_name()) -> boolean(). +is_existing_type(SchemaName) -> + is_existing_type(SchemaName, []). + +-spec is_existing_type(schema_name(), [binary()]) -> boolean(). +is_existing_type(SchemaName, Path) -> + emqx_schema_registry_serde:is_existing_type(SchemaName, Path). + -spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}. get_schema(SchemaName) -> case diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index e8da35449..1a2f90ee7 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_schema_registry_serde). +-feature(maybe_expr, enable). + -behaviour(emqx_rule_funcs). -include("emqx_schema_registry.hrl"). @@ -14,6 +16,8 @@ make_serde/3, handle_rule_function/2, schema_check/3, + is_existing_type/1, + is_existing_type/2, destroy/1 ]). @@ -27,6 +31,10 @@ eval_encode/2 ]). +%%------------------------------------------------------------------------------ +%% Type definitions +%%------------------------------------------------------------------------------ + -define(BOOL(SerdeName, EXPR), try _ = EXPR, @@ -38,10 +46,28 @@ end ). +-type eval_context() :: term(). + +-export_type([serde_type/0]). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ +-spec is_existing_type(schema_name()) -> boolean(). +is_existing_type(SchemaName) -> + is_existing_type(SchemaName, []). + +-spec is_existing_type(schema_name(), [binary()]) -> boolean(). +is_existing_type(SchemaName, Path) -> + maybe + {ok, #serde{type = SerdeType, eval_context = EvalContext}} ?= + emqx_schema_registry:get_serde(SchemaName), + has_inner_type(SerdeType, EvalContext, Path) + else + _ -> false + end. + -spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}. handle_rule_function(sparkplug_decode, [Data]) -> handle_rule_function( @@ -338,3 +364,22 @@ unload_code(SerdeMod) -> _ = code:purge(SerdeMod), _ = code:delete(SerdeMod), ok. + +-spec has_inner_type(serde_type(), eval_context(), [binary()]) -> + boolean(). +has_inner_type(protobuf, _SerdeMod, [_, _ | _]) -> + %% Protobuf only has one level of message types. + false; +has_inner_type(protobuf, SerdeMod, [MessageTypeBin]) -> + try apply(SerdeMod, get_msg_names, []) of + Names -> + lists:member(MessageTypeBin, [atom_to_binary(N, utf8) || N <- Names]) + catch + _:_ -> + false + end; +has_inner_type(_SerdeType, _EvalContext, []) -> + %% This function is only called if we already found a serde, so the root does exist. + true; +has_inner_type(_SerdeType, _EvalContext, _Path) -> + false. diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl index 0fad015f0..bdc083736 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl @@ -14,7 +14,6 @@ -import(emqx_common_test_helpers, [on_exit/1]). --define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]). -define(INVALID_JSON, #{ reason := #{expected := "emqx_schema:json_binary()"}, kind := validation_error @@ -28,12 +27,20 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema), - emqx_mgmt_api_test_util:init_suite(?APPS), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_schema_registry, + emqx_rule_engine + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), ok. init_per_testcase(_TestCase, Config) -> Config. @@ -240,3 +247,27 @@ t_json_validation(_Config) -> ?assertNot(F(schema_check, <<"{\"bar\": 2}">>)), ?assertNot(F(schema_check, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)), ok. + +t_is_existing_type(_Config) -> + JsonName = <<"myjson">>, + ?assertNot(emqx_schema_registry:is_existing_type(JsonName)), + ok = emqx_schema_registry:add_schema(JsonName, schema_params(json)), + AvroName = <<"myavro">>, + ?assertNot(emqx_schema_registry:is_existing_type(AvroName)), + ok = emqx_schema_registry:add_schema(AvroName, schema_params(avro)), + ProtobufName = <<"myprotobuf">>, + MessageType = <<"Person">>, + ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName)), + ok = emqx_schema_registry:add_schema(ProtobufName, schema_params(protobuf)), + %% JSON Schema: no inner names + ?assert(emqx_schema_registry:is_existing_type(JsonName)), + ?assertNot(emqx_schema_registry:is_existing_type(JsonName, [JsonName])), + %% Avro: no inner names + ?assert(emqx_schema_registry:is_existing_type(AvroName)), + ?assertNot(emqx_schema_registry:is_existing_type(AvroName, [AvroName])), + %% Protobuf: one level of message types + ?assert(emqx_schema_registry:is_existing_type(ProtobufName)), + ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName, [ProtobufName])), + ?assert(emqx_schema_registry:is_existing_type(ProtobufName, [MessageType])), + ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName, [MessageType, MessageType])), + ok.