fix: problems found by @thalesmg in review
This commit is contained in:
parent
714363bd01
commit
6c2185bed7
|
@ -38,11 +38,6 @@
|
||||||
import_config/1
|
import_config/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% For testing
|
|
||||||
-export([
|
|
||||||
ensure_serde_absent/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-type schema() :: #{
|
-type schema() :: #{
|
||||||
type := serde_type(),
|
type := serde_type(),
|
||||||
source := binary(),
|
source := binary(),
|
||||||
|
@ -248,7 +243,7 @@ do_build_serdes(Schemas) ->
|
||||||
maybe_build_sparkplug_b_serde() ->
|
maybe_build_sparkplug_b_serde() ->
|
||||||
case get_schema(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) of
|
case get_schema(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) of
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
do_build_serde_no_check(
|
do_build_serde(
|
||||||
?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
|
?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
|
||||||
#{
|
#{
|
||||||
type => protobuf,
|
type => protobuf,
|
||||||
|
@ -293,15 +288,7 @@ build_serdes([], _Acc) ->
|
||||||
|
|
||||||
do_build_serde(Name, Serde) when not is_binary(Name) ->
|
do_build_serde(Name, Serde) when not is_binary(Name) ->
|
||||||
do_build_serde(to_bin(Name), Serde);
|
do_build_serde(to_bin(Name), Serde);
|
||||||
do_build_serde(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, _Serde) ->
|
do_build_serde(Name, #{type := Type, source := Source}) ->
|
||||||
{error,
|
|
||||||
erlang:iolist_to_binary(
|
|
||||||
io_lib:format("Illigal schema name ~s", [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME])
|
|
||||||
)};
|
|
||||||
do_build_serde(Name, Serde) ->
|
|
||||||
do_build_serde_no_check(Name, Serde).
|
|
||||||
|
|
||||||
do_build_serde_no_check(Name, #{type := Type, source := Source}) ->
|
|
||||||
try
|
try
|
||||||
{Serializer, Deserializer, Destructor} =
|
{Serializer, Deserializer, Destructor} =
|
||||||
emqx_ee_schema_registry_serde:make_serde(Type, Name, Source),
|
emqx_ee_schema_registry_serde:make_serde(Type, Name, Source),
|
||||||
|
@ -331,8 +318,6 @@ do_build_serde_no_check(Name, #{type := Type, source := Source}) ->
|
||||||
|
|
||||||
ensure_serde_absent(Name) when not is_binary(Name) ->
|
ensure_serde_absent(Name) when not is_binary(Name) ->
|
||||||
ensure_serde_absent(to_bin(Name));
|
ensure_serde_absent(to_bin(Name));
|
||||||
% ensure_serde_absent(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) ->
|
|
||||||
% {error, <<"Cannot delete serde for Sparkplug B schema">>};
|
|
||||||
ensure_serde_absent(Name) ->
|
ensure_serde_absent(Name) ->
|
||||||
case get_serde(Name) of
|
case get_serde(Name) of
|
||||||
{ok, #{destructor := Destructor}} ->
|
{ok, #{destructor := Destructor}} ->
|
||||||
|
|
|
@ -42,7 +42,8 @@ fields(?CONF_KEY_ROOT) ->
|
||||||
),
|
),
|
||||||
#{
|
#{
|
||||||
default => #{},
|
default => #{},
|
||||||
desc => ?DESC("schema_registry_schemas")
|
desc => ?DESC("schema_registry_schemas"),
|
||||||
|
validator => fun validate_name/1
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
@ -89,6 +90,15 @@ union_member_selector_get_api(all_union_members) ->
|
||||||
union_member_selector_get_api({value, V}) ->
|
union_member_selector_get_api({value, V}) ->
|
||||||
refs_get_api(V).
|
refs_get_api(V).
|
||||||
|
|
||||||
|
validate_name(NameSchemaMap) ->
|
||||||
|
case maps:is_key(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, NameSchemaMap) of
|
||||||
|
true ->
|
||||||
|
{error,
|
||||||
|
<<"Illegal schema name ", ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME/binary>>};
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% `minirest_trails' "APIs"
|
%% `minirest_trails' "APIs"
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -790,18 +790,11 @@ sparkplug_example_data() ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
wait_for_sparkplug_schema_registered() ->
|
wait_for_sparkplug_schema_registered() ->
|
||||||
wait_for_sparkplug_schema_registered(100).
|
?retry(
|
||||||
|
100,
|
||||||
wait_for_sparkplug_schema_registered(0) ->
|
100,
|
||||||
ct:fail("Timed out waiting for sparkplug schema to be registered");
|
[_] = ets:lookup(?SERDE_TAB, ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME)
|
||||||
wait_for_sparkplug_schema_registered(AttemptsLeft) ->
|
).
|
||||||
case ets:info(?SERDE_TAB, size) of
|
|
||||||
0 ->
|
|
||||||
timer:sleep(100),
|
|
||||||
wait_for_sparkplug_schema_registered(AttemptsLeft - 1);
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
t_sparkplug_decode(_Config) ->
|
t_sparkplug_decode(_Config) ->
|
||||||
SQL =
|
SQL =
|
||||||
|
|
Loading…
Reference in New Issue