diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d9fd12ab5..804a3a04c 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -169,7 +169,11 @@ -export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([ - server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, tls_versions_schema/1 + server_ssl_opts_schema/2, + client_ssl_opts_schema/1, + ciphers_schema/1, + tls_versions_schema/1, + description_schema/0 ]). -export([password_converter/2, bin_str_converter/2]). -export([authz_fields/0]). @@ -3649,3 +3653,14 @@ default_mem_check_interval() -> true -> <<"60s">>; false -> disabled end. + +description_schema() -> + sc( + string(), + #{ + default => <<"">>, + desc => ?DESC(description), + required => false, + importance => ?IMPORTANCE_LOW + } + ). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 0badc82db..93ec4c8d4 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -18,6 +18,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("eunit/include/eunit.hrl"). -import(hoconsc, [mk/2, ref/2]). @@ -127,3 +128,45 @@ desc(bridges_v2) -> ?DESC("desc_bridges_v2"); desc(_) -> undefined. + +-ifdef(TEST). +-include_lib("hocon/include/hocon_types.hrl"). +schema_homogeneous_test() -> + case + lists:filtermap( + fun({_Name, Schema}) -> + is_bad_schema(Schema) + end, + fields(bridges_v2) + ) + of + [] -> + ok; + List -> + throw(List) + end. + +is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> + Fields = Module:fields(TypeName), + ExpectedFieldNames = common_field_names(), + MissingFileds = lists:filter( + fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames + ), + case MissingFileds of + [] -> + false; + _ -> + {true, #{ + schema_modle => Module, + type_name => TypeName, + missing_fields => MissingFileds + }} + end. + +common_field_names() -> + %% TODO: add 'config' to the list + [ + enable, description, local_topic, connector, resource_opts + ]. + +-endif. diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 333b66434..05459ac47 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -125,7 +125,8 @@ fields(bridge_v2) -> {connector, mk(binary(), #{ desc => ?DESC(emqx_connector_schema, "connector_field"), required => true - })} + })}, + {description, emqx_schema:description_schema()} ], override_documentations(Fields); fields(Method) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 470f63add..68e84dfc1 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -266,13 +266,15 @@ fields(kafka_producer_action) -> {connector, mk(binary(), #{ desc => ?DESC(emqx_connector_schema, "connector_field"), required => true - })} + })}, + {description, emqx_schema:description_schema()} ] ++ fields(producer_opts); fields(kafka_consumer) -> fields("config") ++ fields(consumer_opts); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()}, {bootstrap_hosts, mk( binary(), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 48ff89dd5..af64ddf37 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -698,6 +698,20 @@ create_bridge(Config, Overrides) -> KafkaConfig = emqx_utils_maps:deep_merge(KafkaConfig0, Overrides), emqx_bridge:create(Type, Name, KafkaConfig). +create_bridge_wait_for_balance(Config) -> + setup_group_subscriber_spy(self()), + try + Res = create_bridge(Config), + receive + {kafka_assignment, _, _} -> + Res + after 20_000 -> + ct:fail("timed out waiting for kafka assignment") + end + after + kill_group_subscriber_spy() + end. + delete_bridge(Config) -> Type = ?BRIDGE_TYPE_BIN, Name = ?config(kafka_name, Config), @@ -1020,31 +1034,37 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) -> setup_group_subscriber_spy_fn() -> TestPid = self(), fun() -> - ok = meck:new(brod_group_subscriber_v2, [ - passthrough, no_link, no_history, non_strict - ]), - ok = meck:expect( - brod_group_subscriber_v2, - assignments_received, - fun(Pid, MemberId, GenerationId, TopicAssignments) -> - ?tp( - kafka_assignment, - #{ - node => node(), - pid => Pid, - member_id => MemberId, - generation_id => GenerationId, - topic_assignments => TopicAssignments - } - ), - TestPid ! - {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, - meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) - end - ), - ok + setup_group_subscriber_spy(TestPid) end. +setup_group_subscriber_spy(TestPid) -> + ok = meck:new(brod_group_subscriber_v2, [ + passthrough, no_link, no_history, non_strict + ]), + ok = meck:expect( + brod_group_subscriber_v2, + assignments_received, + fun(Pid, MemberId, GenerationId, TopicAssignments) -> + ?tp( + kafka_assignment, + #{ + node => node(), + pid => Pid, + member_id => MemberId, + generation_id => GenerationId, + topic_assignments => TopicAssignments + } + ), + TestPid ! + {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, + meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) + end + ), + ok. + +kill_group_subscriber_spy() -> + meck:unload(brod_group_subscriber_v2). + wait_for_cluster_rpc(Node) -> %% need to wait until the config handler is ready after %% restarting during the cluster join. @@ -1702,10 +1722,7 @@ t_dynamic_mqtt_topic(Config) -> MQTTTopic = emqx_topic:join([KafkaTopic, '#']), ?check_trace( begin - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), + ?assertMatch({ok, _}, create_bridge_wait_for_balance(Config)), wait_until_subscribers_are_ready(NPartitions, 40_000), {ok, C} = emqtt:start_link(), on_exit(fun() -> emqtt:stop(C) end), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 4d803fd79..4a3d57aa6 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -18,6 +18,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("eunit/include/eunit.hrl"). -import(hoconsc, [mk/2, ref/2]). @@ -305,3 +306,44 @@ to_bin(Bin) when is_binary(Bin) -> Bin; to_bin(Something) -> Something. + +-ifdef(TEST). +-include_lib("hocon/include/hocon_types.hrl"). +schema_homogeneous_test() -> + case + lists:filtermap( + fun({_Name, Schema}) -> + is_bad_schema(Schema) + end, + fields(connectors) + ) + of + [] -> + ok; + List -> + throw(List) + end. + +is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> + Fields = Module:fields(TypeName), + ExpectedFieldNames = common_field_names(), + MissingFileds = lists:filter( + fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames + ), + case MissingFileds of + [] -> + false; + _ -> + {true, #{ + schema_modle => Module, + type_name => TypeName, + missing_fields => MissingFileds + }} + end. + +common_field_names() -> + [ + enable, description + ]. + +-endif. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 634b93c24..9ed579994 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1571,4 +1571,9 @@ the system topic $SYS/sysmon/large_heap.""" sysmon_vm_large_heap.label: """Enable Large Heap monitoring.""" +description.label: +"""Description""" +description.desc: +"""Descriptive text.""" + }