feat: implement schema registry for 5.0 (avro)
Part of https://emqx.atlassian.net/browse/EMQX-9251 This ports part of the Schema Registry app from 4.x to 5.0. Here, only support for Avro is added. Subsequent PRs will follow to add support for other formats.
This commit is contained in:
parent
33100ecca6
commit
97b94b8882
|
@ -52,12 +52,12 @@ WHERE
|
|||
| | [Registry]
|
||||
+------v--------------v------+
|
||||
REGISTER SCHEMA | |
|
||||
-------------------> | +--------+
|
||||
| | | |
|
||||
INSTANCE | | +--------+
|
||||
-------------------> | | |
|
||||
[Management APIs] | Schema Registry ------ Schema |
|
||||
| | | |
|
||||
-------------------> | +--------+
|
||||
LOAD PARSERS | |
|
||||
| | +--------+
|
||||
| |
|
||||
+----------------------------+
|
||||
/ | \
|
||||
+---/---+ +---|----+ +---\---+
|
||||
|
@ -67,3 +67,7 @@ WHERE
|
|||
+-------+ +--------+ +-------+
|
||||
|
||||
```
|
||||
|
||||
- Register schema instance: adds a new instance of a schema of a
|
||||
certain type. For example, when the user may have several Avro or
|
||||
Protobuf schemas that they wish to use with different data flows.
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
get_serde/1,
|
||||
|
||||
add_schema/2,
|
||||
get_schema/1,
|
||||
delete_schema/1,
|
||||
list_schemas/0
|
||||
]).
|
||||
|
@ -26,6 +27,7 @@
|
|||
init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_continue/2,
|
||||
terminate/2
|
||||
]).
|
||||
|
||||
|
@ -54,6 +56,15 @@ get_serde(SchemaName) ->
|
|||
{ok, serde_to_map(Serde)}
|
||||
end.
|
||||
|
||||
-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
|
||||
get_schema(SchemaName) ->
|
||||
case emqx_config:get([?CONF_KEY_ROOT, schemas, SchemaName], undefined) of
|
||||
undefined ->
|
||||
{error, not_found};
|
||||
Config ->
|
||||
{ok, Config}
|
||||
end.
|
||||
|
||||
-spec add_schema(schema_name(), schema()) -> ok | {error, term()}.
|
||||
add_schema(Name, Schema) ->
|
||||
RawSchema = emqx_map_lib:binary_key_map(Schema),
|
||||
|
@ -130,9 +141,12 @@ init(_) ->
|
|||
process_flag(trap_exit, true),
|
||||
create_tables(),
|
||||
Schemas = emqx_conf:get([?CONF_KEY_ROOT, schemas], #{}),
|
||||
async_build_serdes(Schemas),
|
||||
State = #{},
|
||||
{ok, State}.
|
||||
{ok, State, {continue, {build_serdes, Schemas}}}.
|
||||
|
||||
handle_continue({build_serdes, Schemas}, State) ->
|
||||
do_build_serdes(Schemas),
|
||||
{noreply, State}.
|
||||
|
||||
handle_call(_Call, _From, State) ->
|
||||
{reply, {error, unknown_call}, State}.
|
||||
|
@ -223,9 +237,6 @@ ensure_serde_absent(Name) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
async_build_serdes(Schemas) ->
|
||||
gen_server:cast(?MODULE, {build_serdes, Schemas}).
|
||||
|
||||
async_delete_serdes(Names) ->
|
||||
gen_server:cast(?MODULE, {delete_serdes, Names}).
|
||||
|
||||
|
|
|
@ -141,44 +141,44 @@ schema("/schema_registry/:name") ->
|
|||
?OK(Response);
|
||||
'/schema_registry'(post, #{body := Params0 = #{<<"name">> := Name}}) ->
|
||||
Params = maps:without([<<"name">>], Params0),
|
||||
case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of
|
||||
undefined ->
|
||||
case emqx_ee_schema_registry:get_schema(Name) of
|
||||
{error, not_found} ->
|
||||
case emqx_ee_schema_registry:add_schema(Name, Params) of
|
||||
ok ->
|
||||
Res = emqx_config:get([?CONF_KEY_ROOT, schemas, Name]),
|
||||
{ok, Res} = emqx_ee_schema_registry:get_schema(Name),
|
||||
{201, Res#{name => Name}};
|
||||
{error, Error} ->
|
||||
?BAD_REQUEST(Error)
|
||||
end;
|
||||
_ ->
|
||||
{ok, _} ->
|
||||
?BAD_REQUEST('ALREADY_EXISTS', <<"Schema already exists">>)
|
||||
end.
|
||||
|
||||
'/schema_registry/:name'(get, #{bindings := #{name := Name}}) ->
|
||||
case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of
|
||||
undefined ->
|
||||
case emqx_ee_schema_registry:get_schema(Name) of
|
||||
{error, not_found} ->
|
||||
?NOT_FOUND(<<"Schema not found">>);
|
||||
Res ->
|
||||
?OK(Res#{name => Name})
|
||||
{ok, Schema} ->
|
||||
?OK(Schema#{name => Name})
|
||||
end;
|
||||
'/schema_registry/:name'(put, #{bindings := #{name := Name}, body := Params}) ->
|
||||
case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of
|
||||
undefined ->
|
||||
case emqx_ee_schema_registry:get_schema(Name) of
|
||||
{error, not_found} ->
|
||||
?NOT_FOUND(<<"Schema not found">>);
|
||||
_ ->
|
||||
{ok, _} ->
|
||||
case emqx_ee_schema_registry:add_schema(Name, Params) of
|
||||
ok ->
|
||||
Res = emqx_config:get([?CONF_KEY_ROOT, schemas, Name]),
|
||||
{ok, Res} = emqx_ee_schema_registry:get_schema(Name),
|
||||
?OK(Res#{name => Name});
|
||||
{error, Error} ->
|
||||
?BAD_REQUEST(Error)
|
||||
end
|
||||
end;
|
||||
'/schema_registry/:name'(delete, #{bindings := #{name := Name}}) ->
|
||||
case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of
|
||||
undefined ->
|
||||
case emqx_ee_schema_registry:get_schema(Name) of
|
||||
{error, not_found} ->
|
||||
?NOT_FOUND(<<"Schema not found">>);
|
||||
_ ->
|
||||
{ok, _} ->
|
||||
case emqx_ee_schema_registry:delete_schema(Name) of
|
||||
ok ->
|
||||
?NO_CONTENT;
|
||||
|
|
|
@ -48,7 +48,7 @@ fields(?CONF_KEY_ROOT) ->
|
|||
];
|
||||
fields(avro) ->
|
||||
[
|
||||
{type, mk(hoconsc:enum([avro]), #{required => true, desc => ?DESC("schema_type")})},
|
||||
{type, mk(avro, #{required => true, desc => ?DESC("schema_type")})},
|
||||
{source,
|
||||
mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
|
||||
{description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
|
||||
|
|
Loading…
Reference in New Issue