Merge pull request #11870 from zmstone/1102-add-description-field-to-bridge-and-connector

feat(bridge): add description field to bridge and connector
This commit is contained in:
Zaiming (Stone) Shi 2023-11-03 12:50:42 +01:00 committed by GitHub
commit 36837e5624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 155 additions and 30 deletions

View File

@ -169,7 +169,11 @@
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]). -export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([ -export([
server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, tls_versions_schema/1 server_ssl_opts_schema/2,
client_ssl_opts_schema/1,
ciphers_schema/1,
tls_versions_schema/1,
description_schema/0
]). ]).
-export([password_converter/2, bin_str_converter/2]). -export([password_converter/2, bin_str_converter/2]).
-export([authz_fields/0]). -export([authz_fields/0]).
@ -3649,3 +3653,14 @@ default_mem_check_interval() ->
true -> <<"60s">>; true -> <<"60s">>;
false -> disabled false -> disabled
end. end.
description_schema() ->
sc(
string(),
#{
default => <<"">>,
desc => ?DESC(description),
required => false,
importance => ?IMPORTANCE_LOW
}
).

View File

@ -18,6 +18,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(hoconsc, [mk/2, ref/2]). -import(hoconsc, [mk/2, ref/2]).
@ -127,3 +128,45 @@ desc(bridges_v2) ->
?DESC("desc_bridges_v2"); ?DESC("desc_bridges_v2");
desc(_) -> desc(_) ->
undefined. undefined.
-ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl").
schema_homogeneous_test() ->
case
lists:filtermap(
fun({_Name, Schema}) ->
is_bad_schema(Schema)
end,
fields(bridges_v2)
)
of
[] ->
ok;
List ->
throw(List)
end.
is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
Fields = Module:fields(TypeName),
ExpectedFieldNames = common_field_names(),
MissingFileds = lists:filter(
fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
),
case MissingFileds of
[] ->
false;
_ ->
{true, #{
schema_modle => Module,
type_name => TypeName,
missing_fields => MissingFileds
}}
end.
common_field_names() ->
%% TODO: add 'config' to the list
[
enable, description, local_topic, connector, resource_opts
].
-endif.

View File

@ -125,7 +125,8 @@ fields(bridge_v2) ->
{connector, {connector,
mk(binary(), #{ mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})} })},
{description, emqx_schema:description_schema()}
], ],
override_documentations(Fields); override_documentations(Fields);
fields(Method) -> fields(Method) ->

View File

@ -266,13 +266,15 @@ fields(kafka_producer_action) ->
{connector, {connector,
mk(binary(), #{ mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})} })},
{description, emqx_schema:description_schema()}
] ++ fields(producer_opts); ] ++ fields(producer_opts);
fields(kafka_consumer) -> fields(kafka_consumer) ->
fields("config") ++ fields(consumer_opts); fields("config") ++ fields(consumer_opts);
fields("config") -> fields("config") ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{description, emqx_schema:description_schema()},
{bootstrap_hosts, {bootstrap_hosts,
mk( mk(
binary(), binary(),

View File

@ -698,6 +698,20 @@ create_bridge(Config, Overrides) ->
KafkaConfig = emqx_utils_maps:deep_merge(KafkaConfig0, Overrides), KafkaConfig = emqx_utils_maps:deep_merge(KafkaConfig0, Overrides),
emqx_bridge:create(Type, Name, KafkaConfig). emqx_bridge:create(Type, Name, KafkaConfig).
create_bridge_wait_for_balance(Config) ->
setup_group_subscriber_spy(self()),
try
Res = create_bridge(Config),
receive
{kafka_assignment, _, _} ->
Res
after 20_000 ->
ct:fail("timed out waiting for kafka assignment")
end
after
kill_group_subscriber_spy()
end.
delete_bridge(Config) -> delete_bridge(Config) ->
Type = ?BRIDGE_TYPE_BIN, Type = ?BRIDGE_TYPE_BIN,
Name = ?config(kafka_name, Config), Name = ?config(kafka_name, Config),
@ -1020,31 +1034,37 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) ->
setup_group_subscriber_spy_fn() -> setup_group_subscriber_spy_fn() ->
TestPid = self(), TestPid = self(),
fun() -> fun() ->
ok = meck:new(brod_group_subscriber_v2, [ setup_group_subscriber_spy(TestPid)
passthrough, no_link, no_history, non_strict
]),
ok = meck:expect(
brod_group_subscriber_v2,
assignments_received,
fun(Pid, MemberId, GenerationId, TopicAssignments) ->
?tp(
kafka_assignment,
#{
node => node(),
pid => Pid,
member_id => MemberId,
generation_id => GenerationId,
topic_assignments => TopicAssignments
}
),
TestPid !
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
end
),
ok
end. end.
setup_group_subscriber_spy(TestPid) ->
ok = meck:new(brod_group_subscriber_v2, [
passthrough, no_link, no_history, non_strict
]),
ok = meck:expect(
brod_group_subscriber_v2,
assignments_received,
fun(Pid, MemberId, GenerationId, TopicAssignments) ->
?tp(
kafka_assignment,
#{
node => node(),
pid => Pid,
member_id => MemberId,
generation_id => GenerationId,
topic_assignments => TopicAssignments
}
),
TestPid !
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
end
),
ok.
kill_group_subscriber_spy() ->
meck:unload(brod_group_subscriber_v2).
wait_for_cluster_rpc(Node) -> wait_for_cluster_rpc(Node) ->
%% need to wait until the config handler is ready after %% need to wait until the config handler is ready after
%% restarting during the cluster join. %% restarting during the cluster join.
@ -1702,10 +1722,7 @@ t_dynamic_mqtt_topic(Config) ->
MQTTTopic = emqx_topic:join([KafkaTopic, '#']), MQTTTopic = emqx_topic:join([KafkaTopic, '#']),
?check_trace( ?check_trace(
begin begin
?assertMatch( ?assertMatch({ok, _}, create_bridge_wait_for_balance(Config)),
{ok, _},
create_bridge(Config)
),
wait_until_subscribers_are_ready(NPartitions, 40_000), wait_until_subscribers_are_ready(NPartitions, 40_000),
{ok, C} = emqtt:start_link(), {ok, C} = emqtt:start_link(),
on_exit(fun() -> emqtt:stop(C) end), on_exit(fun() -> emqtt:stop(C) end),

View File

@ -18,6 +18,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(hoconsc, [mk/2, ref/2]). -import(hoconsc, [mk/2, ref/2]).
@ -305,3 +306,44 @@ to_bin(Bin) when is_binary(Bin) ->
Bin; Bin;
to_bin(Something) -> to_bin(Something) ->
Something. Something.
-ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl").
schema_homogeneous_test() ->
case
lists:filtermap(
fun({_Name, Schema}) ->
is_bad_schema(Schema)
end,
fields(connectors)
)
of
[] ->
ok;
List ->
throw(List)
end.
is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
Fields = Module:fields(TypeName),
ExpectedFieldNames = common_field_names(),
MissingFileds = lists:filter(
fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
),
case MissingFileds of
[] ->
false;
_ ->
{true, #{
schema_modle => Module,
type_name => TypeName,
missing_fields => MissingFileds
}}
end.
common_field_names() ->
[
enable, description
].
-endif.

View File

@ -1571,4 +1571,9 @@ the system topic <code>$SYS/sysmon/large_heap</code>."""
sysmon_vm_large_heap.label: sysmon_vm_large_heap.label:
"""Enable Large Heap monitoring.""" """Enable Large Heap monitoring."""
description.label:
"""Description"""
description.desc:
"""Descriptive text."""
} }