Merge pull request #10337 from thalesmg/schema-registry-v50

feat: implement schema registry for 5.0 (avro)
This commit is contained in:
Thales Macedo Garitezi 2023-04-11 16:46:27 -03:00 committed by GitHub
commit 914184697e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1956 additions and 30 deletions

View File

@ -44,6 +44,7 @@
-type port_number() :: 1..65536.
-type server_parse_option() :: #{default_port => port_number(), no_port => boolean()}.
-type url() :: binary().
-type json_binary() :: binary().
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
@ -58,6 +59,7 @@
-typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}).
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
-typerefl_from_string({url/0, emqx_schema, to_url}).
-typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}).
-export([
validate_heap_size/1,
@ -84,7 +86,8 @@
to_ip_port/1,
to_erl_cipher_suite/1,
to_comma_separated_atoms/1,
to_url/1
to_url/1,
to_json_binary/1
]).
-export([
@ -112,7 +115,8 @@
ip_port/0,
cipher/0,
comma_separated_atoms/0,
url/0
url/0,
json_binary/0
]).
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
@ -2576,6 +2580,14 @@ to_url(Str) ->
Error
end.
to_json_binary(Str) ->
case emqx_json:safe_decode(Str) of
{ok, _} ->
{ok, iolist_to_binary(Str)};
Error ->
Error
end.
to_bar_separated_list(Str) ->
{ok, string:tokens(Str, "| ")}.

View File

@ -676,7 +676,8 @@ start_slave(Name, Opts) when is_map(Opts) ->
]
);
slave ->
slave:start_link(host(), Name, ebin_path())
Env = " -env HOCON_ENV_OVERRIDE_PREFIX EMQX_",
slave:start_link(host(), Name, ebin_path() ++ Env)
end
end,
case DoStart() of
@ -749,6 +750,20 @@ setup_node(Node, Opts) when is_map(Opts) ->
%% `emqx_conf' app and correctly catch up the config.
StartAutocluster = maps:get(start_autocluster, Opts, false),
ct:pal(
"setting up node ~p:\n ~p",
[
Node,
#{
start_autocluster => StartAutocluster,
load_apps => LoadApps,
apps => Apps,
env => Env,
start_apps => StartApps
}
]
),
%% Load env before doing anything to avoid overriding
[ok = erpc:call(Node, ?MODULE, load, [App]) || App <- [gen_rpc, ekka, mria, emqx | LoadApps]],
@ -773,10 +788,7 @@ setup_node(Node, Opts) when is_map(Opts) ->
end,
%% Setting env before starting any applications
[
ok = rpc:call(Node, application, set_env, [Application, Key, Value])
|| {Application, Key, Value} <- Env
],
set_envs(Node, Env),
%% Here we start the apps
EnvHandlerForRpc =
@ -794,8 +806,9 @@ setup_node(Node, Opts) when is_map(Opts) ->
node(),
integer_to_list(erlang:unique_integer())
]),
Cookie = atom_to_list(erlang:get_cookie()),
os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir),
os:putenv("EMQX_NODE__COOKIE", atom_to_list(erlang:get_cookie())),
os:putenv("EMQX_NODE__COOKIE", Cookie),
emqx_config:init_load(SchemaMod),
os:unsetenv("EMQX_NODE__DATA_DIR"),
os:unsetenv("EMQX_NODE__COOKIE"),
@ -826,7 +839,15 @@ setup_node(Node, Opts) when is_map(Opts) ->
ok;
_ ->
StartAutocluster andalso
(ok = rpc:call(Node, emqx_machine_boot, start_autocluster, [])),
begin
%% Note: we need to re-set the env because
%% starting the apps apparently make some of them
%% to be lost... This is particularly useful for
%% setting extra apps to be restarted after
%% joining.
set_envs(Node, Env),
ok = erpc:call(Node, emqx_machine_boot, start_autocluster, [])
end,
case rpc:call(Node, ekka, join, [JoinTo]) of
ok ->
ok;
@ -883,6 +904,14 @@ merge_opts(Opts1, Opts2) ->
Opts2
).
set_envs(Node, Env) ->
lists:foreach(
fun({Application, Key, Value}) ->
ok = rpc:call(Node, application, set_env, [Application, Key, Value])
end,
Env
).
erl_flags() ->
%% One core and redirecting logs to master
"+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path().

View File

@ -755,6 +755,8 @@ typename_to_spec("initial()", _Mod) ->
#{type => string, example => <<"0MB">>};
typename_to_spec("bucket_name()", _Mod) ->
#{type => string, example => <<"retainer">>};
typename_to_spec("json_binary()", _Mod) ->
#{type => string, example => <<"{\"a\": [1,true]}">>};
typename_to_spec(Name, Mod) ->
Spec = range(Name),
Spec1 = remote_module_type(Spec, Name, Mod),

View File

@ -1097,26 +1097,27 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
%% Here the emqx_rule_funcs module acts as a proxy, forwarding
%% the function handling to the worker module.
%% @end
% '$handle_undefined_function'(schema_decode, [SchemaId, Data|MoreArgs]) ->
% emqx_schema_parser:decode(SchemaId, Data, MoreArgs);
% '$handle_undefined_function'(schema_decode, Args) ->
% error({args_count_error, {schema_decode, Args}});
% '$handle_undefined_function'(schema_encode, [SchemaId, Term|MoreArgs]) ->
% emqx_schema_parser:encode(SchemaId, Term, MoreArgs);
% '$handle_undefined_function'(schema_encode, Args) ->
% error({args_count_error, {schema_encode, Args}});
% '$handle_undefined_function'(sprintf, [Format|Args]) ->
% erlang:apply(fun sprintf_s/2, [Format, Args]);
% '$handle_undefined_function'(Fun, Args) ->
% error({sql_function_not_supported, function_literal(Fun, Args)}).
-if(?EMQX_RELEASE_EDITION == ee).
%% EE
'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) ->
emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs);
'$handle_undefined_function'(schema_decode, Args) ->
error({args_count_error, {schema_decode, Args}});
'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs);
'$handle_undefined_function'(schema_encode, Args) ->
error({args_count_error, {schema_encode, Args}});
'$handle_undefined_function'(sprintf, [Format | Args]) ->
erlang:apply(fun sprintf_s/2, [Format, Args]);
'$handle_undefined_function'(Fun, Args) ->
error({sql_function_not_supported, function_literal(Fun, Args)}).
-else.
%% CE
'$handle_undefined_function'(sprintf, [Format | Args]) ->
erlang:apply(fun sprintf_s/2, [Format, Args]);
'$handle_undefined_function'(Fun, Args) ->
error({sql_function_not_supported, function_literal(Fun, Args)}).
-endif.
map_path(Key) ->
{path, [{key, P} || P <- string:split(Key, ".", all)]}.

View File

@ -0,0 +1,3 @@
Add schema registry feature.
With schema registry, one can encode and decode special serialization formats in payloads when transforming messages in Rule Engine. Currently, only [Apache Avro](https://avro.apache.org/) is supported.

View File

@ -1,6 +1,6 @@
{application, emqx_ee_conf, [
{description, "EMQX Enterprise Edition configuration schema"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -8,7 +8,7 @@
-export([namespace/0, roots/0, fields/1, translations/0, translation/1]).
-define(EE_SCHEMA_MODULES, [emqx_license_schema]).
-define(EE_SCHEMA_MODULES, [emqx_license_schema, emqx_ee_schema_registry_schema]).
namespace() ->
emqx_conf_schema:namespace().

View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

View File

@ -0,0 +1,73 @@
# EMQX Schema Registry
EMQX Schema Registry for managing various of schemas for
decoding/encoding messages.
To use schema in rule engine, a schema name should be passed to the
SQL functions that decode/encode data, like:
```sql
SELECT
schema_decode('sensor_notify', payload) as payload
FROM
"message.publish"
WHERE
topic = 't/1'
```
## Using schema registry with rule engine
```
+---------------------------+
| |
Events/Msgs | | Events/Msgs
--------------------> EMQX |------------------>
| |
| |
+-------------|-------------+
|
HOOK |
|
+-------------v-------------+ +----------+
| | Data | |
| Rule Engine ------------- Backends |
| | | |
+------|-------------|------+ +----------+
|^ |^
Decode|| ||Encode
|| ||
+------v|------------v|-----+
| |
| Schema Registry |
| |
+---------------------------+
```
## Architecture
```
| |
Decode | [APIs] | Encode
| |
| | [Registry]
+------v--------------v------+
REGISTER SCHEMA | |
INSTANCE | | +--------+
-------------------> | | |
[Management APIs] | Schema Registry ------ Schema |
| | | |
| | +--------+
| |
+----------------------------+
/ | \
+---/---+ +---|----+ +---\---+
| | | | | |
[Decoders] | Avro | |ProtoBuf| |Others |
| | | | | |
+-------+ +--------+ +-------+
```
- Register schema instance: adds a new instance of a schema of a
certain type. For example, when the user may have several Avro or
Protobuf schemas that they wish to use with different data flows.

View File

@ -0,0 +1,39 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(EMQX_EE_SCHEMA_REGISTRY_HRL).
-define(EMQX_EE_SCHEMA_REGISTRY_HRL, true).
-define(CONF_KEY_ROOT, schema_registry).
-define(CONF_KEY_PATH, [?CONF_KEY_ROOT]).
-define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard).
-define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
-type schema_name() :: binary().
-type schema_source() :: binary().
-type encoded_data() :: iodata().
-type decoded_data() :: map().
-type serializer() :: fun((decoded_data()) -> encoded_data()).
-type deserializer() :: fun((encoded_data()) -> decoded_data()).
-type destructor() :: fun(() -> ok).
-type serde_type() :: avro.
-type serde_opts() :: map().
-record(serde, {
name :: schema_name(),
serializer :: serializer(),
deserializer :: deserializer(),
destructor :: destructor()
}).
-type serde() :: #serde{}.
-type serde_map() :: #{
name := schema_name(),
serializer := serializer(),
deserializer := deserializer(),
destructor := destructor()
}.
-endif.

View File

@ -0,0 +1,12 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../../apps/emqx"}},
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}
]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_ee_schema_registry]}
]}.

View File

@ -0,0 +1,15 @@
{application, emqx_ee_schema_registry, [
{description, "EMQX Schema Registry"},
{vsn, "0.1.0"},
{registered, [emqx_ee_schema_registry_sup]},
{mod, {emqx_ee_schema_registry_app, []}},
{applications, [
kernel,
stdlib,
erlavro
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,253 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry).
-behaviour(gen_server).
-behaviour(emqx_config_handler).
-include("emqx_ee_schema_registry.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
start_link/0,
get_serde/1,
add_schema/2,
get_schema/1,
delete_schema/1,
list_schemas/0
]).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_continue/2,
terminate/2
]).
%% `emqx_config_handler' API
-export([post_config_update/5]).
-type schema() :: #{
type := serde_type(),
source := binary(),
description => binary()
}.
%%-------------------------------------------------------------------------------------------------
%% API
%%-------------------------------------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec get_serde(schema_name()) -> {ok, serde_map()} | {error, not_found}.
get_serde(SchemaName) ->
case ets:lookup(?SERDE_TAB, to_bin(SchemaName)) of
[] ->
{error, not_found};
[Serde] ->
{ok, serde_to_map(Serde)}
end.
-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
get_schema(SchemaName) ->
case emqx_config:get([?CONF_KEY_ROOT, schemas, SchemaName], undefined) of
undefined ->
{error, not_found};
Config ->
{ok, Config}
end.
-spec add_schema(schema_name(), schema()) -> ok | {error, term()}.
add_schema(Name, Schema) ->
RawSchema = emqx_map_lib:binary_key_map(Schema),
Res = emqx_conf:update(
[?CONF_KEY_ROOT, schemas, Name],
RawSchema,
#{override_to => cluster}
),
case Res of
{ok, _} ->
ok;
Error ->
Error
end.
-spec delete_schema(schema_name()) -> ok | {error, term()}.
delete_schema(Name) ->
Res = emqx_conf:remove(
[?CONF_KEY_ROOT, schemas, Name],
#{override_to => cluster}
),
case Res of
{ok, _} ->
ok;
Error ->
Error
end.
-spec list_schemas() -> #{schema_name() => schema()}.
list_schemas() ->
emqx_config:get([?CONF_KEY_ROOT, schemas], #{}).
%%-------------------------------------------------------------------------------------------------
%% `emqx_config_handler' API
%%-------------------------------------------------------------------------------------------------
post_config_update(
[?CONF_KEY_ROOT, schemas] = _Path,
_Cmd,
NewConf = #{schemas := NewSchemas},
OldConf = #{},
_AppEnvs
) ->
OldSchemas = maps:get(schemas, OldConf, #{}),
#{
added := Added,
changed := Changed0,
removed := Removed
} = emqx_map_lib:diff_maps(NewSchemas, OldSchemas),
Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
RemovedNames = maps:keys(Removed),
case RemovedNames of
[] ->
ok;
_ ->
async_delete_serdes(RemovedNames)
end,
SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
case build_serdes(SchemasToBuild) of
ok ->
{ok, NewConf};
{error, Reason, SerdesToRollback} ->
lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
{error, Reason}
end;
post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
{ok, NewConf}.
%%-------------------------------------------------------------------------------------------------
%% `gen_server' API
%%-------------------------------------------------------------------------------------------------
init(_) ->
process_flag(trap_exit, true),
create_tables(),
Schemas = emqx_conf:get([?CONF_KEY_ROOT, schemas], #{}),
State = #{},
{ok, State, {continue, {build_serdes, Schemas}}}.
handle_continue({build_serdes, Schemas}, State) ->
do_build_serdes(Schemas),
{noreply, State}.
handle_call(_Call, _From, State) ->
{reply, {error, unknown_call}, State}.
handle_cast({delete_serdes, Names}, State) ->
lists:foreach(fun ensure_serde_absent/1, Names),
?tp(schema_registry_serdes_deleted, #{}),
{noreply, State};
handle_cast({build_serdes, Schemas}, State) ->
do_build_serdes(Schemas),
{noreply, State};
handle_cast(_Cast, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
create_tables() ->
ok = mria:create_table(?SERDE_TAB, [
{type, ordered_set},
{rlog_shard, ?SCHEMA_REGISTRY_SHARD},
{storage, ram_copies},
{record_name, serde},
{attributes, record_info(fields, serde)}
]),
ok = mria:wait_for_tables([?SERDE_TAB]),
ok.
do_build_serdes(Schemas) ->
%% TODO: use some kind of mutex to make each core build a
%% different serde to avoid duplicate work. Maybe ekka_locker?
maps:foreach(fun do_build_serde/2, Schemas),
?tp(schema_registry_serdes_built, #{}).
build_serdes(Serdes) ->
build_serdes(Serdes, []).
build_serdes([{Name, Params} | Rest], Acc0) ->
Acc = [Name | Acc0],
case do_build_serde(Name, Params) of
ok ->
build_serdes(Rest, Acc);
{error, Error} ->
{error, Error, Acc}
end;
build_serdes([], _Acc) ->
ok.
do_build_serde(Name0, #{type := Type, source := Source}) ->
try
Name = to_bin(Name0),
{Serializer, Deserializer, Destructor} =
emqx_ee_schema_registry_serde:make_serde(Type, Name, Source),
Serde = #serde{
name = Name,
serializer = Serializer,
deserializer = Deserializer,
destructor = Destructor
},
ok = mria:dirty_write(?SERDE_TAB, Serde),
ok
catch
Kind:Error:Stacktrace ->
?SLOG(
error,
#{
msg => "error_building_serde",
name => Name0,
type => Type,
kind => Kind,
error => Error,
stacktrace => Stacktrace
}
),
{error, Error}
end.
ensure_serde_absent(Name) ->
case get_serde(Name) of
{ok, #{destructor := Destructor}} ->
Destructor(),
ok = mria:dirty_delete(?SERDE_TAB, to_bin(Name));
{error, not_found} ->
ok
end.
async_delete_serdes(Names) ->
gen_server:cast(?MODULE, {delete_serdes, Names}).
to_bin(A) when is_atom(A) -> atom_to_binary(A);
to_bin(B) when is_binary(B) -> B.
-spec serde_to_map(serde()) -> serde_map().
serde_to_map(#serde{} = Serde) ->
#{
name => Serde#serde.name,
serializer => Serde#serde.serializer,
deserializer => Serde#serde.deserializer,
destructor => Serde#serde.destructor
}.

View File

@ -0,0 +1,19 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_app).
-behaviour(application).
-include("emqx_ee_schema_registry.hrl").
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry),
emqx_ee_schema_registry_sup:start_link().
stop(_State) ->
emqx_conf:remove_handler(?CONF_KEY_PATH),
ok.

View File

@ -0,0 +1,251 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_http_api).
-behaviour(minirest_api).
-include("emqx_ee_schema_registry.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_api_lib.hrl").
-export([
namespace/0,
api_spec/0,
paths/0,
schema/1
]).
-export([
'/schema_registry'/2,
'/schema_registry/:name'/2
]).
%%-------------------------------------------------------------------------------------------------
%% `minirest' and `minirest_trails' API
%%-------------------------------------------------------------------------------------------------
namespace() -> "schema_registry_http_api".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/schema_registry",
"/schema_registry/:name"
].
schema("/schema_registry") ->
#{
'operationId' => '/schema_registry',
get => #{
tags => [<<"schema_registry">>],
summary => <<"List registered schemas">>,
description => ?DESC("desc_schema_registry_api_list"),
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
hoconsc:array(emqx_ee_schema_registry_schema:api_schema("get")),
#{
sample =>
#{value => sample_list_schemas_response()}
}
)
}
},
post => #{
tags => [<<"schema_registry">>],
summary => <<"Register a new schema">>,
description => ?DESC("desc_schema_registry_api_post"),
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("post"),
post_examples()
),
responses =>
#{
201 =>
emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("post"),
post_examples()
),
400 => error_schema('ALREADY_EXISTS', "Schema already exists")
}
}
};
schema("/schema_registry/:name") ->
#{
'operationId' => '/schema_registry/:name',
get => #{
tags => [<<"schema_registry">>],
summary => <<"Get registered schema">>,
description => ?DESC("desc_schema_registry_api_get"),
parameters => [param_path_schema_name()],
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("get"),
get_examples()
),
404 => error_schema('NOT_FOUND', "Schema not found")
}
},
put => #{
tags => [<<"schema_registry">>],
summary => <<"Update a schema">>,
description => ?DESC("desc_schema_registry_api_put"),
parameters => [param_path_schema_name()],
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("put"),
post_examples()
),
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("put"),
put_examples()
),
404 => error_schema('NOT_FOUND', "Schema not found")
}
},
delete => #{
tags => [<<"schema_registry">>],
summary => <<"Delete registered schema">>,
description => ?DESC("desc_schema_registry_api_delete"),
parameters => [param_path_schema_name()],
responses =>
#{
204 => <<"Schema deleted">>,
404 => error_schema('NOT_FOUND', "Schema not found")
}
}
}.
%%-------------------------------------------------------------------------------------------------
%% API
%%-------------------------------------------------------------------------------------------------
'/schema_registry'(get, _Params) ->
Schemas = emqx_ee_schema_registry:list_schemas(),
Response =
lists:map(
fun({Name, Params}) ->
Params#{name => Name}
end,
maps:to_list(Schemas)
),
?OK(Response);
'/schema_registry'(post, #{body := Params0 = #{<<"name">> := Name}}) ->
Params = maps:without([<<"name">>], Params0),
case emqx_ee_schema_registry:get_schema(Name) of
{error, not_found} ->
case emqx_ee_schema_registry:add_schema(Name, Params) of
ok ->
{ok, Res} = emqx_ee_schema_registry:get_schema(Name),
{201, Res#{name => Name}};
{error, Error} ->
?BAD_REQUEST(Error)
end;
{ok, _} ->
?BAD_REQUEST('ALREADY_EXISTS', <<"Schema already exists">>)
end.
'/schema_registry/:name'(get, #{bindings := #{name := Name}}) ->
case emqx_ee_schema_registry:get_schema(Name) of
{error, not_found} ->
?NOT_FOUND(<<"Schema not found">>);
{ok, Schema} ->
?OK(Schema#{name => Name})
end;
'/schema_registry/:name'(put, #{bindings := #{name := Name}, body := Params}) ->
case emqx_ee_schema_registry:get_schema(Name) of
{error, not_found} ->
?NOT_FOUND(<<"Schema not found">>);
{ok, _} ->
case emqx_ee_schema_registry:add_schema(Name, Params) of
ok ->
{ok, Res} = emqx_ee_schema_registry:get_schema(Name),
?OK(Res#{name => Name});
{error, Error} ->
?BAD_REQUEST(Error)
end
end;
'/schema_registry/:name'(delete, #{bindings := #{name := Name}}) ->
case emqx_ee_schema_registry:get_schema(Name) of
{error, not_found} ->
?NOT_FOUND(<<"Schema not found">>);
{ok, _} ->
case emqx_ee_schema_registry:delete_schema(Name) of
ok ->
?NO_CONTENT;
{error, Error} ->
?BAD_REQUEST(Error)
end
end.
%%-------------------------------------------------------------------------------------------------
%% Examples
%%-------------------------------------------------------------------------------------------------
sample_list_schemas_response() ->
[sample_get_schema_response(avro)].
sample_get_schema_response(avro) ->
#{
type => <<"avro">>,
name => <<"my_avro_schema">>,
description => <<"My Avro Schema">>,
source => <<
"{\"type\":\"record\","
"\"fields\":[{\"type\":\"int\",\"name\":\"i\"},"
"{\"type\":\"string\",\"name\":\"s\"}]}"
>>
}.
put_examples() ->
post_examples().
post_examples() ->
get_examples().
get_examples() ->
#{
<<"avro_schema">> =>
#{
summary => <<"Avro">>,
value => sample_get_schema_response(avro)
}
}.
%%-------------------------------------------------------------------------------------------------
%% Schemas and hocon types
%%-------------------------------------------------------------------------------------------------
param_path_schema_name() ->
{name,
mk(
binary(),
#{
in => path,
required => true,
example => <<"my_schema">>,
desc => ?DESC("desc_param_path_schema_name")
}
)}.
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
error_schema(Code, Message) when is_atom(Code) ->
error_schema([Code], Message);
error_schema(Codes, Message) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message));
error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
emqx_dashboard_swagger:error_codes(Codes, Message).

View File

@ -0,0 +1,127 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_ee_schema_registry.hrl").
%% `hocon_schema' API
-export([
roots/0,
fields/1,
desc/1,
tags/0,
union_member_selector/1
]).
%% `minirest_trails' API
-export([
api_schema/1
]).
%%------------------------------------------------------------------------------
%% `hocon_schema' APIs
%%------------------------------------------------------------------------------
roots() ->
[{?CONF_KEY_ROOT, mk(ref(?CONF_KEY_ROOT), #{required => false})}].
tags() ->
[<<"Schema Registry">>].
fields(?CONF_KEY_ROOT) ->
[
{schemas,
mk(
hoconsc:map(
name,
hoconsc:union(fun union_member_selector/1)
),
#{
default => #{},
desc => ?DESC("schema_registry_schemas")
}
)}
];
fields(avro) ->
[
{type, mk(avro, #{required => true, desc => ?DESC("schema_type")})},
{source,
mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
{description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
];
fields("get_avro") ->
[{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
fields("put_avro") ->
fields(avro);
fields("post_" ++ Type) ->
fields("get_" ++ Type).
desc(?CONF_KEY_ROOT) ->
?DESC("schema_registry_root");
desc(avro) ->
?DESC("avro_type");
desc(_) ->
undefined.
union_member_selector(all_union_members) ->
refs();
union_member_selector({value, V}) ->
refs(V).
union_member_selector_get_api(all_union_members) ->
refs_get_api();
union_member_selector_get_api({value, V}) ->
refs_get_api(V).
%%------------------------------------------------------------------------------
%% `minirest_trails' "APIs"
%%------------------------------------------------------------------------------
api_schema("get") ->
hoconsc:union(fun union_member_selector_get_api/1);
api_schema("post") ->
api_schema("get");
api_schema("put") ->
hoconsc:union(fun union_member_selector/1).
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Name) -> hoconsc:ref(?MODULE, Name).
supported_serde_types() ->
[avro].
refs() ->
[ref(Type) || Type <- supported_serde_types()].
refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
refs(#{<<"type">> := <<"avro">>}) ->
[ref(avro)];
refs(_) ->
Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
throw(#{
field_name => type,
expected => Expected
}).
refs_get_api() ->
[ref("get_avro")].
refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
refs_get_api(#{<<"type">> := <<"avro">>}) ->
[ref("get_avro")];
refs_get_api(_) ->
Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
throw(#{
field_name => type,
expected => Expected
}).

View File

@ -0,0 +1,70 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_serde).
-include("emqx_ee_schema_registry.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
decode/2,
decode/3,
encode/2,
encode/3,
make_serde/3
]).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec decode(schema_name(), encoded_data()) -> decoded_data().
decode(SerdeName, RawData) ->
decode(SerdeName, RawData, []).
-spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
case emqx_ee_schema_registry:get_serde(SerdeName) of
{error, not_found} ->
error({serde_not_found, SerdeName});
{ok, #{deserializer := Deserializer}} ->
apply(Deserializer, [RawData | VarArgs])
end.
-spec encode(schema_name(), decoded_data()) -> encoded_data().
encode(SerdeName, RawData) ->
encode(SerdeName, RawData, []).
-spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) ->
case emqx_ee_schema_registry:get_serde(SerdeName) of
{error, not_found} ->
error({serde_not_found, SerdeName});
{ok, #{serializer := Serializer}} ->
apply(Serializer, [EncodedData | VarArgs])
end.
-spec make_serde(serde_type(), schema_name(), schema_source()) ->
{serializer(), deserializer(), destructor()}.
make_serde(avro, Name, Source0) ->
Source = inject_avro_name(Name, Source0),
Serializer = avro:make_simple_encoder(Source, _Opts = []),
Deserializer = avro:make_simple_decoder(Source, [{map_type, map}, {record_type, map}]),
Destructor = fun() ->
?tp(serde_destroyed, #{type => avro, name => Name}),
ok
end,
{Serializer, Deserializer, Destructor}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
-spec inject_avro_name(schema_name(), schema_source()) -> schema_source().
inject_avro_name(Name, Source0) ->
%% The schema checks that the source is a valid JSON when
%% typechecking, so we shouldn't need to validate here.
Schema0 = emqx_json:decode(Source0, [return_maps]),
Schema = Schema0#{<<"name">> => Name},
emqx_json:encode(Schema).

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
%% period => pos_integer()} % optional
%% child_spec() = #{id => child_id(), % mandatory
%% start => mfargs(), % mandatory
%% restart => restart(), % optional
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
init([]) ->
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 100
},
ChildSpecs = [child_spec(emqx_ee_schema_registry)],
{ok, {SupFlags, ChildSpecs}}.
child_spec(Mod) ->
#{
id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5_000,
type => worker,
modules => [Mod]
}.

View File

@ -0,0 +1,433 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_ee_schema_registry.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
[{group, avro}].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[{avro, TCs}].
init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
emqx_mgmt_api_test_util:init_suite(?APPS),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
ok.
init_per_group(avro, Config) ->
[{serde_type, avro} | Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
emqx_common_test_helpers:call_janitor(),
clear_schemas(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
trace_rule(Data, Envs, _Args) ->
Now = erlang:monotonic_time(),
ets:insert(recorded_actions, {Now, #{data => Data, envs => Envs}}),
TestPid = persistent_term:get({?MODULE, test_pid}),
TestPid ! {action, #{data => Data, envs => Envs}},
ok.
make_trace_fn_action() ->
persistent_term:put({?MODULE, test_pid}, self()),
Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>,
emqx_tables:new(recorded_actions, [named_table, public, ordered_set]),
#{function => Fn, args => #{}}.
create_rule_http(RuleParams) ->
RepublishTopic = <<"republish/schema_registry">>,
emqx:subscribe(RepublishTopic),
DefaultParams = #{
enable => true,
actions => [
make_trace_fn_action(),
#{
<<"function">> => <<"republish">>,
<<"args">> =>
#{
<<"topic">> => RepublishTopic,
<<"payload">> => <<>>,
<<"qos">> => 0,
<<"retain">> => false,
<<"user_properties">> => <<>>
}
}
]
},
Params = maps:merge(DefaultParams, RuleParams),
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
Error -> Error
end.
schema_params(avro) ->
Source = #{
type => record,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}
]
},
SourceBin = emqx_json:encode(Source),
#{type => avro, source => SourceBin}.
create_serde(SerdeType, SerdeName) ->
Schema = schema_params(SerdeType),
ok = emqx_ee_schema_registry:add_schema(SerdeName, Schema),
ok.
sql_for(avro, encode_decode1) ->
<<
"select\n"
" schema_decode('my_serde',\n"
" schema_encode('my_serde', json_decode(payload))) as decoded,\n"
" decoded.i as decoded_int,\n"
" decoded.s as decoded_string\n"
" from t"
>>;
sql_for(avro, encode1) ->
<<
"select\n"
" schema_encode('my_serde', json_decode(payload)) as encoded\n"
" from t"
>>;
sql_for(avro, decode1) ->
<<
"select\n"
" schema_decode('my_serde', payload) as decoded\n"
" from t"
>>;
sql_for(Type, Name) ->
ct:fail("unimplemented: ~p", [{Type, Name}]).
clear_schemas() ->
maps:foreach(
fun(Name, _Schema) ->
ok = emqx_ee_schema_registry:delete_schema(Name)
end,
emqx_ee_schema_registry:list_schemas()
).
receive_action_results() ->
receive
{action, #{data := _} = Res} ->
Res
after 1_000 ->
ct:fail("action didn't run")
end.
receive_published(Line) ->
receive
{deliver, _Topic, Msg} ->
MsgMap = emqx_message:to_map(Msg),
maps:update_with(
payload,
fun(Raw) ->
case emqx_json:safe_decode(Raw, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> Raw
end
end,
MsgMap
)
after 1_000 ->
ct:pal("mailbox: ~p", [process_info(self(), messages)]),
ct:fail("publish not received, line ~b", [Line])
end.
cluster(Config) ->
PrivDataDir = ?config(priv_dir, Config),
PeerModule =
case os:getenv("IS_CI") of
false ->
slave;
_ ->
ct_slave
end,
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
[
{apps, ?APPS},
{listener_ports, []},
{peer_mod, PeerModule},
{priv_data_dir, PrivDataDir},
{load_schema, true},
{start_autocluster, true},
{schema_mod, emqx_ee_conf_schema},
%% need to restart schema registry app in the tests so
%% that it re-registers the config handler that is lost
%% when emqx_conf restarts during join.
{env, [{emqx_machine, applications, [emqx_ee_schema_registry]}]},
{load_apps, [emqx_machine | ?APPS]},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker, router]),
ok;
(emqx_conf) ->
ok;
(_) ->
ok
end}
]
),
ct:pal("cluster:\n ~p", [Cluster]),
Cluster.
start_cluster(Cluster) ->
Nodes = [
emqx_common_test_helpers:start_slave(Name, Opts)
|| {Name, Opts} <- Cluster
],
on_exit(fun() ->
emqx_misc:pmap(
fun(N) ->
ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N)
end,
Nodes
)
end),
erpc:multicall(Nodes, mria_rlog, wait_for_shards, [[?SCHEMA_REGISTRY_SHARD], 30_000]),
Nodes.
wait_for_cluster_rpc(Node) ->
%% need to wait until the config handler is ready after
%% restarting during the cluster join.
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_unknown_calls(_Config) ->
Ref = monitor(process, emqx_ee_schema_registry),
%% for coverage
emqx_ee_schema_registry ! unknown,
gen_server:cast(emqx_ee_schema_registry, unknown),
?assertEqual({error, unknown_call}, gen_server:call(emqx_ee_schema_registry, unknown)),
receive
{'DOWN', Ref, process, _, _} ->
ct:fail("registry shouldn't have died")
after 500 ->
ok
end.
t_encode_decode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode_decode1)}),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
PayloadBin = emqx_json:encode(Payload),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Res = receive_action_results(),
?assertMatch(
#{
data :=
#{
<<"decoded">> :=
#{
<<"i">> := 10,
<<"s">> := <<"text">>
},
<<"decoded_int">> := 10,
<<"decoded_string">> := <<"text">>
}
},
Res
),
ok.
t_delete_serde(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
?check_trace(
begin
ok = create_serde(SerdeType, SerdeName),
{ok, {ok, _}} =
?wait_async_action(
emqx_ee_schema_registry:delete_schema(SerdeName),
#{?snk_kind := schema_registry_serdes_deleted},
1_000
),
ok
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(schema_registry_serdes_deleted, Trace)),
?assertMatch([#{type := SerdeType}], ?of_kind(serde_destroyed, Trace)),
ok
end
),
ok.
t_encode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode1)}),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
PayloadBin = emqx_json:encode(Payload),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Published = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"encoded">> := _}},
Published
),
#{payload := #{<<"encoded">> := Encoded}} = Published,
{ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
?assertEqual(Payload, Deserializer(Encoded)),
ok.
t_decode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, decode1)}),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
{ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
EncodedBin = Serializer(Payload),
emqx:publish(emqx_message:make(<<"t">>, EncodedBin)),
Published = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"decoded">> := _}},
Published
),
#{payload := #{<<"decoded">> := Decoded}} = Published,
?assertEqual(Payload, Decoded),
ok.
t_fail_rollback(Config) ->
SerdeType = ?config(serde_type, Config),
OkSchema = emqx_map_lib:binary_key_map(schema_params(SerdeType)),
BrokenSchema = OkSchema#{<<"source">> := <<"{}">>},
%% hopefully, for this small map, the key order is used.
Serdes = #{
<<"a">> => OkSchema,
<<"z">> => BrokenSchema
},
?assertMatch(
{error, _},
emqx_conf:update(
[?CONF_KEY_ROOT, schemas],
Serdes,
#{}
)
),
%% no serdes should be in the table
?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)),
?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)),
ok.
t_cluster_serde_build(Config) ->
SerdeType = ?config(serde_type, Config),
Cluster = cluster(Config),
SerdeName = my_serde,
Schema = schema_params(SerdeType),
?check_trace(
begin
Nodes = [N1, N2 | _] = start_cluster(Cluster),
NumNodes = length(Nodes),
wait_for_cluster_rpc(N2),
?assertEqual(
ok,
erpc:call(N2, emqx_ee_schema_registry, add_schema, [SerdeName, Schema])
),
%% check that we can serialize/deserialize in all nodes
lists:foreach(
fun(N) ->
erpc:call(N, fun() ->
Res0 = emqx_ee_schema_registry:get_serde(SerdeName),
?assertMatch({ok, #{}}, Res0, #{node => N}),
{ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0,
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
?assertEqual(Payload, Deserializer(Serializer(Payload)), #{node => N}),
ok
end)
end,
Nodes
),
%% now we delete and check it's removed from the table
?tp(will_delete_schema, #{}),
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := schema_registry_serdes_deleted}),
NumNodes,
5_000
),
?assertEqual(
ok,
erpc:call(N1, emqx_ee_schema_registry, delete_schema, [SerdeName])
),
{ok, _} = snabbkaffe:receive_events(SRef1),
lists:foreach(
fun(N) ->
erpc:call(N, fun() ->
?assertMatch(
{error, not_found},
emqx_ee_schema_registry:get_serde(SerdeName),
#{node => N}
),
ok
end)
end,
Nodes
),
ok
end,
fun(Trace) ->
?assert(
?strict_causality(
#{?snk_kind := will_delete_schema},
#{?snk_kind := serde_destroyed, type := SerdeType},
Trace
)
),
ok
end
),
ok.

View File

@ -0,0 +1,250 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_http_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_mgmt_api_test_util, [uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(APPS, [emqx_conf, emqx_ee_schema_registry]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
emqx_mgmt_api_test_util:init_suite(?APPS),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
ok.
init_per_testcase(_TestCase, Config) ->
clear_schemas(),
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
clear_schemas(),
ok = snabbkaffe:stop(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
request(get) ->
do_request(get, _Parts = [], _Body = []);
request({get, Name}) ->
do_request(get, _Parts = [Name], _Body = []);
request({delete, Name}) ->
do_request(delete, _Parts = [Name], _Body = []);
request({put, Name, Params}) ->
do_request(put, _Parts = [Name], Params);
request({post, Params}) ->
do_request(post, _Parts = [], Params).
do_request(Method, PathParts, Body) ->
Header = emqx_common_test_http:default_auth_header(),
URI = uri(["schema_registry" | PathParts]),
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
Res0 = emqx_mgmt_api_test_util:request_api(Method, URI, [], Header, Body, Opts),
case Res0 of
{ok, Code, <<>>} ->
{ok, Code, <<>>};
{ok, Code, Res1} ->
Res2 = emqx_json:decode(Res1, [return_maps]),
Res3 = try_decode_error_message(Res2),
{ok, Code, Res3};
Error ->
Error
end.
try_decode_error_message(#{<<"message">> := Msg0} = Res0) ->
case emqx_json:safe_decode(Msg0, [return_maps]) of
{ok, Msg} ->
Res0#{<<"message">> := Msg};
{error, _} ->
Res0
end;
try_decode_error_message(Res) ->
Res.
clear_schemas() ->
maps:foreach(
fun(Name, _Schema) ->
ok = emqx_ee_schema_registry:delete_schema(Name)
end,
emqx_ee_schema_registry:list_schemas()
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_crud(_Config) ->
SchemaName = <<"my_avro_schema">>,
Source = #{
type => record,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}
]
},
SourceBin = emqx_json:encode(Source),
Params = #{
<<"type">> => <<"avro">>,
<<"source">> => SourceBin,
<<"name">> => SchemaName,
<<"description">> => <<"My schema">>
},
UpdateParams = maps:without([<<"name">>], Params),
%% no schemas at first
?assertMatch({ok, 200, []}, request(get)),
?assertMatch(
{ok, 404, #{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Schema not found">>
}},
request({get, SchemaName})
),
?assertMatch(
{ok, 404, #{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Schema not found">>
}},
request({put, SchemaName, UpdateParams})
),
?assertMatch(
{ok, 404, #{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Schema not found">>
}},
request({delete, SchemaName})
),
%% create a schema
?assertMatch(
{ok, 201, #{
<<"type">> := <<"avro">>,
<<"source">> := SourceBin,
<<"name">> := SchemaName,
<<"description">> := <<"My schema">>
}},
request({post, Params})
),
?assertMatch(
{ok, 200, #{
<<"type">> := <<"avro">>,
<<"source">> := SourceBin,
<<"name">> := SchemaName,
<<"description">> := <<"My schema">>
}},
request({get, SchemaName})
),
?assertMatch(
{ok, 200, [
#{
<<"type">> := <<"avro">>,
<<"source">> := SourceBin,
<<"name">> := SchemaName,
<<"description">> := <<"My schema">>
}
]},
request(get)
),
UpdateParams1 = UpdateParams#{<<"description">> := <<"My new schema">>},
?assertMatch(
{ok, 200, #{
<<"type">> := <<"avro">>,
<<"source">> := SourceBin,
<<"name">> := SchemaName,
<<"description">> := <<"My new schema">>
}},
request({put, SchemaName, UpdateParams1})
),
?assertMatch(
{ok, 400, #{
<<"code">> := <<"ALREADY_EXISTS">>,
<<"message">> := <<"Schema already exists">>
}},
request({post, Params})
),
%% typechecks, but is invalid
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
<<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">>
}},
request({put, SchemaName, UpdateParams#{<<"source">> := <<"{}">>}})
),
?assertMatch(
{ok, 204, <<>>},
request({delete, SchemaName})
),
%% doesn't typecheck
lists:foreach(
fun(Field) ->
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> := #{<<"reason">> := <<"required_field">>}
}},
request({post, maps:without([Field], Params)}),
#{field => Field}
)
end,
[<<"name">>, <<"source">>]
),
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
#{
<<"expected">> := [_ | _],
<<"field_name">> := <<"type">>
}
}},
request({post, maps:without([<<"type">>], Params)}),
#{field => <<"type">>}
),
%% typechecks, but is invalid
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
<<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">>
}},
request({post, Params#{<<"source">> := <<"{}">>}})
),
%% unknown serde type
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
#{
<<"expected">> := [_ | _],
<<"field_name">> := <<"type">>
}
}},
request({post, Params#{<<"type">> := <<"foo">>}})
),
ok.

View File

@ -0,0 +1,121 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_serde_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_ee_schema_registry.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
emqx_mgmt_api_test_util:init_suite(?APPS),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(),
clear_schemas(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
clear_schemas() ->
maps:foreach(
fun(Name, _Schema) ->
ok = emqx_ee_schema_registry:delete_schema(Name)
end,
emqx_ee_schema_registry:list_schemas()
).
schema_params(avro) ->
Source = #{
type => record,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}
]
},
SourceBin = emqx_json:encode(Source),
#{type => avro, source => SourceBin}.
assert_roundtrip(SerdeName, Original) ->
Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original),
Decoded = emqx_ee_schema_registry_serde:decode(SerdeName, Encoded),
?assertEqual(Original, Decoded, #{original => Original}).
assert_roundtrip(SerdeName, Original, ArgsSerialize, ArgsDeserialize) ->
Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original, ArgsSerialize),
Decoded = emqx_ee_schema_registry_serde:decode(SerdeName, Encoded, ArgsDeserialize),
?assertEqual(Original, Decoded, #{original => Original}).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_roundtrip_avro(_Config) ->
SerdeName = my_serde,
Params = schema_params(avro),
ok = emqx_ee_schema_registry:add_schema(SerdeName, Params),
Original = #{<<"i">> => 10, <<"s">> => <<"hi">>},
%% for coverage
assert_roundtrip(SerdeName, Original, _ArgsSerialize = [], _ArgsDeserialize = []),
assert_roundtrip(SerdeName, Original),
ok.
t_avro_invalid_json_schema(_Config) ->
SerdeName = my_serde,
Params = schema_params(avro),
WrongParams = Params#{source := <<"{">>},
?assertMatch(
{error, #{reason := #{expected_type := _}}},
emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
),
ok.
t_avro_invalid_schema(_Config) ->
SerdeName = my_serde,
Params = schema_params(avro),
WrongParams = Params#{source := <<"{}">>},
?assertMatch(
{error, {post_config_update, _, {not_found, <<"type">>}}},
emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
),
ok.
t_serde_not_found(_Config) ->
%% for coverage
NonexistentSerde = <<"nonexistent">>,
Original = #{},
?assertError(
{serde_not_found, NonexistentSerde},
emqx_ee_schema_registry_serde:encode(NonexistentSerde, Original)
),
?assertError(
{serde_not_found, NonexistentSerde},
emqx_ee_schema_registry_serde:decode(NonexistentSerde, Original)
),
ok.

View File

@ -83,6 +83,8 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqx and observer_cli
{:recon, github: "ferd/recon", tag: "2.5.1", override: true},
{:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true},
# in conflict by erlavro and rocketmq
{:jsone, github: "emqx/jsone", tag: "1.7.1", override: true},
# dependencies of dependencies; we choose specific refs to match
# what rebar3 chooses.
# in conflict by gun and emqtt
@ -317,7 +319,8 @@ defmodule EMQXUmbrella.MixProject do
emqx_license: :permanent,
emqx_ee_conf: :load,
emqx_ee_connector: :permanent,
emqx_ee_bridge: :permanent
emqx_ee_bridge: :permanent,
emqx_ee_schema_registry: :permanent
],
else: []
)

View File

@ -81,6 +81,8 @@
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
, {telemetry, "1.1.0"}
, {hackney, {git, "https://github.com/emqx/hackney.git", {tag, "1.18.1-1"}}}
%% in conflict by erlavro and rocketmq
, {jsone, {git, "https://github.com/emqx/jsone.git", {tag, "1.7.1"}}}
]}.
{xref_ignores,

View File

@ -427,7 +427,8 @@ relx_apps_per_edition(ee) ->
emqx_license,
{emqx_ee_conf, load},
emqx_ee_connector,
emqx_ee_bridge
emqx_ee_bridge,
emqx_ee_schema_registry
];
relx_apps_per_edition(ce) ->
[].

View File

@ -57,7 +57,7 @@ emqx_bridge_api {
desc_api1 {
desc {
en: """List all created bridges"""
zh: """列出所有 Birdge"""
zh: """列出所有 Bridge"""
}
label: {
en: "List All Bridges"

View File

@ -0,0 +1,69 @@
emqx_ee_schema_registry_http_api {
# apis
desc_schema_registry_api_list {
desc {
en: "List all registered schemas"
zh: "列出所有注册的模式"
}
label {
en: "List schemas"
zh: "列表模式"
}
}
desc_schema_registry_api_get {
desc {
en: "Get a schema by its name"
zh: "通过名称获取模式"
}
label {
en: "Get schema"
zh: "获取模式"
}
}
desc_schema_registry_api_post {
desc {
en: "Register a new schema"
zh: "注册一个新的模式"
}
label {
en: "Register schema"
zh: "注册模式"
}
}
desc_schema_registry_api_put {
desc {
en: "Update an existing schema"
zh: "更新一个现有的模式"
}
label {
en: "Update schema"
zh: "更新模式"
}
}
desc_schema_registry_api_delete {
desc {
en: "Delete a schema"
zh: "删除一个模式"
}
label {
en: "Delete schema"
zh: "删除模式"
}
}
# params
desc_param_path_schema_name {
desc {
en: "The schema name"
zh: "模式名称"
}
label {
en: "Schema name"
zh: "模式名称"
}
}
}

View File

@ -0,0 +1,78 @@
emqx_ee_schema_registry_schema {
schema_registry_root {
desc {
en: "Schema registry configurations."
zh: "模式注册表的配置。"
}
label {
en: "Schema registry"
zh: "模式注册表"
}
}
schema_registry_schemas {
desc {
en: "Registered schemas."
zh: "注册的模式。"
}
label {
en: "Registered schemas"
zh: "注册的模式"
}
}
schema_name {
desc {
en: "A name for the schema that will serve as its identifier."
zh: "模式的一个名称,将作为其标识符。"
}
label {
en: "Schema name"
zh: "模式名称"
}
}
schema_type {
desc {
en: "Schema type."
zh: "模式类型。"
}
label {
en: "Schema type"
zh: "模式类型"
}
}
schema_source {
desc {
en: "Source text for the schema."
zh: "模式的源文本。"
}
label {
en: "Schema source"
zh: "模式来源"
}
}
schema_description {
desc {
en: "A description for this schema."
zh: "对该模式的描述。"
}
label {
en: "Schema description"
zh: "模式描述"
}
}
avro_type {
desc {
en: "[Apache Avro](https://avro.apache.org/) serialization format."
zh: "[阿帕奇-阿夫罗](https://avro.apache.org/) 序列化格式。"
}
label {
en: "Apache Avro"
zh: "阿帕奇-阿夫罗"
}
}
}

View File

@ -1,6 +1,7 @@
ACL
AES
APIs
Avro
BPAPI
BSON
Backplane