From 2bd72aab44efb51c46a8df5364c92f7c9a8c55f1 Mon Sep 17 00:00:00 2001 From: zmstone Date: Tue, 16 Apr 2024 16:58:45 +0200 Subject: [PATCH 01/14] chore: bump dashboard schema version to 0.2.0 --- .../src/emqx_dashboard_schema_api.erl | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl index 4a708cd78..2bc1c5b39 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl @@ -44,6 +44,8 @@ ]) ). +-define(SCHEMA_VERSION, <<"0.2.0">>). + %%-------------------------------------------------------------------- %% minirest API and schema %%-------------------------------------------------------------------- @@ -97,20 +99,31 @@ gen_schema(connectors) -> connectors_schema_json(). hotconf_schema_json() -> - SchemaInfo = #{title => <<"EMQX Hot Conf API Schema">>, version => <<"0.1.0">>}, + SchemaInfo = #{ + title => <<"EMQX Hot Conf Schema">>, + version => ?SCHEMA_VERSION + }, gen_api_schema_json_iodata(emqx_mgmt_api_configs, SchemaInfo). bridge_schema_json() -> - SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => <<"0.1.0">>}, + SchemaInfo = #{ + title => <<"EMQX Data Bridge Schema">>, + version => ?SCHEMA_VERSION + }, gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo). actions_schema_json() -> - SchemaInfo = #{title => <<"EMQX Data Actions API Schema">>, version => <<"0.1.0">>}, - %% Note: this will be moved to `emqx_actions' application in the future. + SchemaInfo = #{ + title => <<"EMQX Data Actions and Sources Schema">>, + version => ?SCHEMA_VERSION + }, gen_api_schema_json_iodata(emqx_bridge_v2_api, SchemaInfo). connectors_schema_json() -> - SchemaInfo = #{title => <<"EMQX Connectors Schema">>, version => <<"0.1.0">>}, + SchemaInfo = #{ + title => <<"EMQX Connectors Schema">>, + version => ?SCHEMA_VERSION + }, gen_api_schema_json_iodata(emqx_connector_api, SchemaInfo). gen_api_schema_json_iodata(SchemaMod, SchemaInfo) -> From df458b98d77f8c76c17842bdceec2ff765ea0318 Mon Sep 17 00:00:00 2001 From: zmstone Date: Tue, 16 Apr 2024 17:28:34 +0200 Subject: [PATCH 02/14] refactor(dashboard_schema): no need to translate labels the trans_label implementation was ugly, it compares an anonymous function to check if the label should be translated. since we have stopped generating i18n message ids for dashboard schema, this entire function is now stale, so this function is deleted. --- .../src/emqx_dashboard_swagger.erl | 36 +++---------------- 1 file changed, 5 insertions(+), 31 deletions(-) diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index dc188426e..7a5ea1939 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -652,19 +652,6 @@ trans_required(Spec, true, _) -> Spec#{required => true}; trans_required(Spec, _, path) -> Spec#{required => true}; trans_required(Spec, _, _) -> Spec. -trans_desc(Init, Hocon, Func, Name, Options) -> - Spec0 = trans_description(Init, Hocon, Options), - case Func =:= fun hocon_schema_to_spec/2 of - true -> - Spec0; - false -> - Spec1 = trans_label(Spec0, Hocon, Name, Options), - case Spec1 of - #{description := _} -> Spec1; - _ -> Spec1 - end - end. - trans_description(Spec, Hocon, Options) -> Desc = case desc_struct(Hocon) of @@ -702,19 +689,6 @@ get_i18n_text(Lang, Namespace, Id, Tag, Default) -> get_lang(#{i18n_lang := Lang}) -> Lang; get_lang(_) -> emqx:get_config([dashboard, i18n_lang]). -trans_label(Spec, Hocon, Default, Options) -> - Label = - case desc_struct(Hocon) of - ?DESC(_, _) = Struct -> get_i18n(<<"label">>, Struct, Default, Options); - _ -> Default - end, - case Label =:= undefined of - true -> - Spec; - false -> - Spec#{label => Label} - end. - desc_struct(Hocon) -> R = case hocon_schema:field_schema(Hocon, desc) of @@ -772,7 +746,7 @@ response(Status, #{content := _} = Content, {Acc, RefsAcc, Module, Options}) -> response(Status, ?REF(StructName), {Acc, RefsAcc, Module, Options}) -> response(Status, ?R_REF(Module, StructName), {Acc, RefsAcc, Module, Options}); response(Status, ?R_REF(_Mod, _Name) = RRef, {Acc, RefsAcc, Module, Options}) -> - SchemaToSpec = schema_converter(Options), + SchemaToSpec = get_schema_converter(Options), {Spec, Refs} = SchemaToSpec(RRef, Module), Content = content(Spec), { @@ -910,7 +884,7 @@ parse_object(PropList = [_ | _], Module, Options) when is_list(PropList) -> parse_object(Other, Module, Options) -> erlang:throw( {error, #{ - msg => <<"Object only supports not empty proplists">>, + msg => <<"Object only supports non-empty fields list">>, args => Other, module => Module, options => Options @@ -950,10 +924,10 @@ parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs true -> HoconType = hocon_schema:field_schema(Hocon, type), Init0 = init_prop([default | ?DEFAULT_FIELDS], #{}, Hocon), - SchemaToSpec = schema_converter(Options), + SchemaToSpec = get_schema_converter(Options), Init = maps:remove( summary, - trans_desc(Init0, Hocon, SchemaToSpec, NameBin, Options) + trans_description(Init0, Hocon, Options) ), {Prop, Refs1} = SchemaToSpec(HoconType, Module), NewRequiredAcc = @@ -1002,7 +976,7 @@ to_ref(Mod, StructName, Acc, RefsAcc) -> Ref = #{<<"$ref">> => ?TO_COMPONENTS_PARAM(Mod, StructName)}, {[Ref | Acc], [{Mod, StructName, parameter} | RefsAcc]}. -schema_converter(Options) -> +get_schema_converter(Options) -> maps:get(schema_converter, Options, fun hocon_schema_to_spec/2). hocon_error_msg(Reason) -> From 51c8173174e26efc0a0c9e4f8942d8f9005d8d08 Mon Sep 17 00:00:00 2001 From: zmstone Date: Tue, 16 Apr 2024 19:00:32 +0200 Subject: [PATCH 03/14] feat(bridge): add is_template flag to bridge config fields --- apps/emqx/src/emqx_schema.erl | 11 +++++++-- .../src/emqx_bridge_cassandra.erl | 2 +- .../src/emqx_bridge_clickhouse.erl | 8 +++++-- .../src/emqx_bridge_dynamo.erl | 23 +++++++++--------- apps/emqx_bridge_es/src/emqx_bridge_es.erl | 8 +++---- .../src/emqx_bridge_gcp_pubsub.erl | 19 ++++++++++----- .../src/emqx_bridge_hstreamdb.erl | 17 ++++++++----- .../src/emqx_bridge_http_schema.erl | 2 +- .../src/emqx_bridge_influxdb.erl | 2 +- .../src/emqx_bridge_iotdb.erl | 10 ++++---- .../src/emqx_bridge_kafka.erl | 24 ++++++++++++++----- .../src/emqx_bridge_kinesis.erl | 2 +- .../src/emqx_bridge_mongodb.erl | 6 +++-- .../src/emqx_bridge_mqtt_connector_schema.erl | 14 +++++------ .../src/emqx_bridge_mysql.erl | 2 +- .../src/emqx_bridge_opents.erl | 8 +++---- .../src/emqx_bridge_oracle.erl | 4 ++-- .../src/emqx_bridge_pgsql.erl | 2 +- .../src/emqx_bridge_pulsar_pubsub_schema.erl | 4 ++-- .../emqx_bridge_rabbitmq_pubsub_schema.erl | 2 +- .../src/emqx_bridge_redis.erl | 2 +- .../src/emqx_bridge_rocketmq.erl | 12 ++++++---- .../src/emqx_bridge_rocketmq_connector.erl | 2 +- apps/emqx_bridge_s3/src/emqx_bridge_s3.erl | 2 +- .../src/emqx_bridge_sqlserver.erl | 2 +- .../src/emqx_bridge_syskeeper.erl | 4 ++-- .../src/emqx_bridge_tdengine.erl | 4 ++-- apps/emqx_conf/src/emqx_conf_schema_types.erl | 6 +++++ apps/emqx_s3/src/emqx_s3.app.src | 2 +- apps/emqx_s3/src/emqx_s3_client.erl | 3 ++- apps/emqx_s3/src/emqx_s3_schema.erl | 4 ++-- rel/i18n/emqx_conf_schema_types.hocon | 3 +++ 32 files changed, 133 insertions(+), 83 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 1dab4f42f..57ad00e99 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -61,6 +61,7 @@ }. -type url() :: binary(). -type json_binary() :: binary(). +-type template() :: binary(). -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). @@ -78,6 +79,7 @@ -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). -typerefl_from_string({url/0, emqx_schema, to_url}). -typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}). +-typerefl_from_string({template/0, emqx_schema, to_template}). -type parsed_server() :: #{ hostname := string(), @@ -120,7 +122,8 @@ to_erl_cipher_suite/1, to_comma_separated_atoms/1, to_url/1, - to_json_binary/1 + to_json_binary/1, + to_template/1 ]). -export([ @@ -160,7 +163,8 @@ comma_separated_atoms/0, url/0, json_binary/0, - port_number/0 + port_number/0, + template/0 ]). -export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]). @@ -2594,6 +2598,9 @@ to_json_binary(Str) -> Error end. +to_template(Str) -> + {ok, iolist_to_binary(Str)}. + %% @doc support the following format: %% - 127.0.0.1:1883 %% - ::1:1883 diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl index d34cb1950..80fbc80d2 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl @@ -181,7 +181,7 @@ fields("post", Type) -> cql_field() -> {cql, mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>} )}. diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl index 833c2570d..1e07f2340 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl @@ -184,8 +184,12 @@ fields("post", Type) -> sql_field() -> {sql, mk( - binary(), - #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + emqx_schema:template(), + #{ + desc => ?DESC("sql_template"), + default => ?DEFAULT_SQL, + format => <<"sql">> + } )}. batch_value_separator_field() -> diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl index 13828c0f7..d568fee25 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl @@ -160,13 +160,7 @@ fields(dynamo_action) -> ); fields(action_parameters) -> Parameters = - [ - {template, - mk( - binary(), - #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} - )} - ] ++ emqx_bridge_dynamo_connector:fields(config), + [{template, template_field_schema()}] ++ emqx_bridge_dynamo_connector:fields(config), lists:foldl( fun(Key, Acc) -> proplists:delete(Key, Acc) @@ -199,11 +193,7 @@ fields(connector_resource_opts) -> fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {template, - mk( - binary(), - #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} - )}, + {template, template_field_schema()}, {local_topic, mk( binary(), @@ -230,6 +220,15 @@ fields("put") -> fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"). +template_field_schema() -> + mk( + emqx_schema:template(), + #{ + desc => ?DESC("template"), + default => ?DEFAULT_TEMPLATE + } + ). + desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index 97f3986e4..def0b76f7 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -135,7 +135,7 @@ overwrite() -> index() -> {index, ?HOCON( - binary(), + emqx_schema:template(), #{ required => true, example => <<"${payload.index}">>, @@ -146,7 +146,7 @@ index() -> id(Required) -> {id, ?HOCON( - binary(), + emqx_schema:template(), #{ required => Required, example => <<"${payload.id}">>, @@ -157,7 +157,7 @@ id(Required) -> doc() -> {doc, ?HOCON( - binary(), + emqx_schema:template(), #{ required => false, example => <<"${payload.doc}">>, @@ -187,7 +187,7 @@ doc_as_upsert() -> routing() -> {routing, ?HOCON( - binary(), + emqx_schema:template(), #{ required => false, example => <<"${payload.routing}">>, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl index 007bbc1a0..a5991af81 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl @@ -122,7 +122,7 @@ fields(producer) -> )}, {ordering_key_template, sc( - binary(), + emqx_schema:template(), #{ default => <<>>, desc => ?DESC("ordering_key_template") @@ -130,7 +130,7 @@ fields(producer) -> )}, {payload_template, sc( - binary(), + emqx_schema:template(), #{ default => <<>>, desc => ?DESC("payload_template") @@ -201,8 +201,11 @@ fields(consumer_topic_mapping) -> {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})}, {payload_template, mk( - string(), - #{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)} + emqx_schema:template(), + #{ + default => <<"${.}">>, + desc => ?DESC(consumer_mqtt_payload) + } )} ]; fields("consumer_resource_opts") -> @@ -221,14 +224,18 @@ fields("consumer_resource_opts") -> fields(key_value_pair) -> [ {key, - mk(binary(), #{ + mk(emqx_schema:template(), #{ required => true, validator => [ emqx_resource_validator:not_empty("Key templates must not be empty") ], desc => ?DESC(kv_pair_key) })}, - {value, mk(binary(), #{required => true, desc => ?DESC(kv_pair_value)})} + {value, + mk(emqx_schema:template(), #{ + required => true, + desc => ?DESC(kv_pair_value) + })} ]; fields("get_producer") -> emqx_bridge_schema:status_fields() ++ fields("post_producer"); diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl index 7fa19c9a4..7024a2e07 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -167,13 +167,13 @@ fields(action_parameters) -> })}, {partition_key, - mk(binary(), #{ - required => false, desc => ?DESC(emqx_bridge_hstreamdb_connector, "partition_key") + mk(emqx_schema:template(), #{ + required => false, + desc => ?DESC(emqx_bridge_hstreamdb_connector, "partition_key") })}, {grpc_flush_timeout, fun grpc_flush_timeout/1}, - {record_template, - mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})}, + {record_template, record_template_schema()}, {aggregation_pool_size, mk(pos_integer(), #{ default => ?DEFAULT_AGG_POOL_SIZE, desc => ?DESC("aggregation_pool_size") @@ -222,6 +222,12 @@ fields("put") -> hstream_bridge_common_fields() ++ connector_fields(). +record_template_schema() -> + mk(emqx_schema:template(), #{ + default => <<"${payload}">>, + desc => ?DESC("record_template") + }). + grpc_timeout(type) -> emqx_schema:timeout_duration_ms(); grpc_timeout(desc) -> ?DESC(emqx_bridge_hstreamdb_connector, "grpc_timeout"); grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW; @@ -239,8 +245,7 @@ hstream_bridge_common_fields() -> [ {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {record_template, - mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})} + {record_template, record_template_schema()} ] ++ emqx_resource_schema:fields("resource_opts"). diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index 43f3d1748..ef150adfc 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -287,7 +287,7 @@ method_field() -> body_field() -> {body, mk( - binary(), + emqx_schema:template(), #{ default => undefined, desc => ?DESC("config_body") diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index a62effe51..59d36cd5f 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -42,7 +42,7 @@ %% api write_syntax_type() -> - typerefl:alias("string", write_syntax()). + typerefl:alias("template", write_syntax()). %% Examples conn_bridge_examples(Method) -> diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 134868978..599be842a 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -84,7 +84,7 @@ fields(action_parameters) -> )}, {device_id, mk( - binary(), + emqx_schema:template(), #{ desc => ?DESC("config_device_id") } @@ -114,7 +114,7 @@ fields(action_parameters_data) -> )}, {measurement, mk( - binary(), + emqx_schema:template(), #{ required => true, desc => ?DESC("config_parameters_measurement") @@ -122,7 +122,9 @@ fields(action_parameters_data) -> )}, {data_type, mk( - hoconsc:union([enum([text, boolean, int32, int64, float, double]), binary()]), + hoconsc:union([ + enum([text, boolean, int32, int64, float, double]), emqx_schema:template() + ]), #{ required => true, desc => ?DESC("config_parameters_data_type") @@ -130,7 +132,7 @@ fields(action_parameters_data) -> )}, {value, mk( - binary(), + emqx_schema:template(), #{ required => true, desc => ?DESC("config_parameters_value") diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index ff9d19c0d..b0b0c3a03 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -477,11 +477,20 @@ fields(producer_kafka_ext_headers) -> ]; fields(kafka_message) -> [ - {key, mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC(kafka_message_key)})}, - {value, mk(string(), #{default => <<"${.}">>, desc => ?DESC(kafka_message_value)})}, + {key, + mk(emqx_schema:template(), #{ + default => <<"${.clientid}">>, + desc => ?DESC(kafka_message_key) + })}, + {value, + mk(emqx_schema:template(), #{ + default => <<"${.}">>, + desc => ?DESC(kafka_message_value) + })}, {timestamp, - mk(string(), #{ - default => <<"${.timestamp}">>, desc => ?DESC(kafka_message_timestamp) + mk(emqx_schema:template(), #{ + default => <<"${.timestamp}">>, + desc => ?DESC(kafka_message_timestamp) })} ]; fields(producer_buffer) -> @@ -536,8 +545,11 @@ fields(consumer_topic_mapping) -> {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})}, {payload_template, mk( - string(), - #{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)} + emqx_schema:template(), + #{ + default => <<"${.}">>, + desc => ?DESC(consumer_mqtt_payload) + } )} ]; fields(consumer_kafka_opts) -> diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl index 3c22e41e2..40849a29d 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl @@ -150,7 +150,7 @@ fields(producer) -> [ {payload_template, sc( - binary(), + emqx_schema:template(), #{ default => <<"${.}">>, desc => ?DESC("payload_template") diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index c81df1334..593bf6ff8 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -44,8 +44,10 @@ roots() -> []. fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})}, - {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}, - {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}, + {collection, + mk(emqx_schema:template(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}, + {payload_template, + mk(emqx_schema:template(), #{required => false, desc => ?DESC("payload_template")})}, {resource_opts, mk( ref(?MODULE, "creation_opts"), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl index 7103e53ee..bc2939c24 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl @@ -200,7 +200,7 @@ fields("ingress_local") -> [ {topic, mk( - binary(), + emqx_schema:template(), #{ validator => fun emqx_schema:non_empty_string/1, desc => ?DESC("ingress_local_topic"), @@ -217,7 +217,7 @@ fields("ingress_local") -> )}, {retain, mk( - hoconsc:union([boolean(), binary()]), + hoconsc:union([boolean(), emqx_schema:template()]), #{ default => <<"${retain}">>, desc => ?DESC("retain") @@ -225,7 +225,7 @@ fields("ingress_local") -> )}, {payload, mk( - binary(), + emqx_schema:template(), #{ default => undefined, desc => ?DESC("payload") @@ -268,7 +268,7 @@ fields("egress_remote") -> [ {topic, mk( - binary(), + emqx_schema:template(), #{ required => true, validator => fun emqx_schema:non_empty_string/1, @@ -286,7 +286,7 @@ fields("egress_remote") -> )}, {retain, mk( - hoconsc:union([boolean(), binary()]), + hoconsc:union([boolean(), emqx_schema:template()]), #{ required => false, default => false, @@ -295,7 +295,7 @@ fields("egress_remote") -> )}, {payload, mk( - binary(), + emqx_schema:template(), #{ default => undefined, desc => ?DESC("payload") @@ -344,7 +344,7 @@ desc(_) -> undefined. qos() -> - hoconsc:union([emqx_schema:qos(), binary()]). + hoconsc:union([emqx_schema:qos(), emqx_schema:template()]). parse_server(Str) -> #{hostname := Host, port := Port} = emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS), diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl index ee7487760..24b11b930 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl @@ -117,7 +117,7 @@ fields("config") -> {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {sql, mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} )}, {local_topic, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl index d38ed8eb4..25c0ce88d 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -146,7 +146,7 @@ fields(action_parameters_data) -> [ {timestamp, mk( - binary(), + emqx_schema:template(), #{ desc => ?DESC("config_parameters_timestamp"), required => false @@ -154,7 +154,7 @@ fields(action_parameters_data) -> )}, {metric, mk( - binary(), + emqx_schema:template(), #{ required => true, desc => ?DESC("config_parameters_metric") @@ -162,7 +162,7 @@ fields(action_parameters_data) -> )}, {tags, mk( - hoconsc:union([map(), binary()]), + hoconsc:union([map(), emqx_schema:template()]), #{ required => true, desc => ?DESC("config_parameters_tags"), @@ -188,7 +188,7 @@ fields(action_parameters_data) -> )}, {value, mk( - hoconsc:union([integer(), float(), binary()]), + hoconsc:union([integer(), float(), emqx_schema:template()]), #{ required => true, desc => ?DESC("config_parameters_value") diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl index fb485c16b..c3b4160ab 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl @@ -158,7 +158,7 @@ fields(action_parameters) -> [ {sql, hoconsc:mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} )} ]; @@ -177,7 +177,7 @@ fields("config") -> )}, {sql, hoconsc:mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} )}, {local_topic, diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 7d02e8cca..5a0b9eb5b 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -61,7 +61,7 @@ fields(action_parameters) -> [ {sql, hoconsc:mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>} )} ]; diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl index ccf985ba8..dff62843e 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -51,12 +51,12 @@ fields(action_parameters) -> fields(producer_pulsar_message) -> [ {key, - ?HOCON(string(), #{ + ?HOCON(emqx_schema:template(), #{ default => <<"${.clientid}">>, desc => ?DESC("producer_key_template") })}, {value, - ?HOCON(string(), #{ + ?HOCON(emqx_schema:template(), #{ default => <<"${.}">>, desc => ?DESC("producer_value_template") })} diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl index 9a9741226..b0c254fc4 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl @@ -99,7 +99,7 @@ fields(action_parameters) -> )}, {payload_template, hoconsc:mk( - binary(), + emqx_schema:template(), #{ default => <<"">>, desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template") diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl index c80f9ead1..c9b2a35b9 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl @@ -211,7 +211,7 @@ desc(_) -> undefined. command_template(type) -> - list(binary()); + hoconsc:array(emqx_schema:template()); command_template(required) -> true; command_template(validator) -> diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl index 589719486..750993e9a 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl @@ -162,8 +162,11 @@ fields(action_parameters) -> [ {template, mk( - binary(), - #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} + emqx_schema:template(), + #{ + desc => ?DESC("template"), + default => ?DEFAULT_TEMPLATE + } )} ] ++ emqx_bridge_rocketmq_connector:fields(config), lists:foldl( @@ -205,7 +208,7 @@ fields("config") -> {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {template, mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} )}, {local_topic, @@ -214,8 +217,7 @@ fields("config") -> #{desc => ?DESC("local_topic"), required => false} )} ] ++ emqx_resource_schema:fields("resource_opts") ++ - (emqx_bridge_rocketmq_connector:fields(config) -- - emqx_connector_schema_lib:prepare_statement_fields()); + emqx_bridge_rocketmq_connector:fields(config); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 1af520a93..0bea5a8ff 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -47,7 +47,7 @@ fields(config) -> {servers, servers()}, {topic, mk( - binary(), + emqx_schema:template(), #{default => <<"TopicTest">>, desc => ?DESC(topic)} )}, {access_key, diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl index 5d7e176e3..79cc560d2 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl @@ -77,7 +77,7 @@ fields(s3_upload_parameters) -> [ {content, hoconsc:mk( - string(), + emqx_schema:template(), #{ required => false, default => <<"${.}">>, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl index e9df1fdb6..af66b8a88 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl @@ -192,7 +192,7 @@ fields(action_parameters) -> [ {sql, mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} )} ]; diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl index 9ac0efe8a..547562f26 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl @@ -112,7 +112,7 @@ fields("parameters") -> [ {target_topic, mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("target_topic"), default => <<"${topic}">>} )}, {target_qos, @@ -122,7 +122,7 @@ fields("parameters") -> )}, {template, mk( - binary(), + emqx_schema:template(), #{desc => ?DESC("template"), default => <<"${payload}">>} )} ]; diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl index 6e71da87e..f086f00dc 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl @@ -83,7 +83,7 @@ fields("config") -> {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {sql, mk( - binary(), + emqx_schema:template(), #{ desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, @@ -125,7 +125,7 @@ fields(action_parameters) -> {database, fun emqx_connector_schema_lib:database/1}, {sql, mk( - binary(), + emqx_schema:template(), #{ desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, diff --git a/apps/emqx_conf/src/emqx_conf_schema_types.erl b/apps/emqx_conf/src/emqx_conf_schema_types.erl index f530ee872..49ee8fbaf 100644 --- a/apps/emqx_conf/src/emqx_conf_schema_types.erl +++ b/apps/emqx_conf/src/emqx_conf_schema_types.erl @@ -65,6 +65,12 @@ readable("boolean()") -> dashboard => #{type => boolean}, docgen => #{type => "Boolean"} }; +readable("template()") -> + #{ + swagger => #{type => string}, + dashboard => #{type => string, is_template => true}, + docgen => #{type => "String", desc => ?DESC(template)} + }; readable("binary()") -> #{ swagger => #{type => string}, diff --git a/apps/emqx_s3/src/emqx_s3.app.src b/apps/emqx_s3/src/emqx_s3.app.src index 965cb099d..c307f2c9c 100644 --- a/apps/emqx_s3/src/emqx_s3.app.src +++ b/apps/emqx_s3/src/emqx_s3.app.src @@ -1,6 +1,6 @@ {application, emqx_s3, [ {description, "EMQX S3"}, - {vsn, "5.0.14"}, + {vsn, "5.1.0"}, {modules, []}, {registered, [emqx_s3_sup]}, {applications, [ diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index a415cf8d4..b7bd85833 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -103,7 +103,7 @@ put_object(Client, Key, Value) -> -spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()). put_object( - #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}}, + #{bucket := Bucket0, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}}, Key, UploadOpts, Content @@ -111,6 +111,7 @@ put_object( ECKey = erlcloud_key(Key), ECOpts = erlcloud_upload_options(UploadOpts), Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)), + Bucket = to_list_string(Bucket0), try erlcloud_s3:put_object(Bucket, ECKey, Content, ECOpts, Headers, AwsConfig) of Props when is_list(Props) -> ok diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index ff8c632bd..de5e4f53e 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -74,7 +74,7 @@ fields(s3_upload) -> [ {bucket, mk( - string(), + emqx_schema:template(), #{ desc => ?DESC("bucket"), required => true @@ -82,7 +82,7 @@ fields(s3_upload) -> )}, {key, mk( - string(), + emqx_schema:template(), #{ desc => ?DESC("key"), required => true diff --git a/rel/i18n/emqx_conf_schema_types.hocon b/rel/i18n/emqx_conf_schema_types.hocon index 6b9dac9ea..f9eefbe1d 100644 --- a/rel/i18n/emqx_conf_schema_types.hocon +++ b/rel/i18n/emqx_conf_schema_types.hocon @@ -9,4 +9,7 @@ emqx_conf_schema_types { secret.desc: """A string holding some sensitive information, such as a password. When secret starts with file://, the rest of the string is interpreted as a path to a file containing the secret itself: whole content of the file except any trailing whitespace characters is considered a secret value. Note: when clustered, all EMQX nodes should have the same file present before using file:// secrets.""" + template.desc: """~ + A string for `${.path.to.var}` style value interpolation, + where the leading dot is optional, and `${.}` represents all values as an object.""" } From f9f14f9758697cebf09f7a1f5647b83b9d158ec6 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 17 Apr 2024 13:08:08 +0200 Subject: [PATCH 04/14] refactor(emqx_conf): raise exception at higher level for more context --- apps/emqx_conf/src/emqx_conf.erl | 22 +++++++++++++----- apps/emqx_conf/src/emqx_conf_schema_types.erl | 23 +++++++++++-------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 0bd319503..23dda6b02 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -304,12 +304,22 @@ gen_flat_doc(RootNames, #{full_name := FullName, fields := Fields} = S, DescReso false -> ok end, - #{ - text => short_name(FullName), - hash => format_hash(FullName), - doc => maps:get(desc, S, <<"">>), - fields => format_fields(Fields, DescResolver) - }. + try + #{ + text => short_name(FullName), + hash => format_hash(FullName), + doc => maps:get(desc, S, <<"">>), + fields => format_fields(Fields, DescResolver) + } + catch + throw:Reason -> + io:format( + standard_error, + "failed_to_build_doc for ~s:~n~p~n", + [FullName, Reason] + ), + error(failed_to_build_doc) + end. format_fields(Fields, DescResolver) -> [format_field(F, DescResolver) || F <- Fields]. diff --git a/apps/emqx_conf/src/emqx_conf_schema_types.erl b/apps/emqx_conf/src/emqx_conf_schema_types.erl index 49ee8fbaf..dbfbe74bc 100644 --- a/apps/emqx_conf/src/emqx_conf_schema_types.erl +++ b/apps/emqx_conf/src/emqx_conf_schema_types.erl @@ -33,8 +33,19 @@ readable(Module, TypeStr) when is_list(TypeStr) -> %% Module is ignored so far as all types are distinguished by their names readable(TypeStr) catch - throw:unknown_type -> - fail(#{reason => unknown_type, type => TypeStr, module => Module}) + throw:Reason -> + throw(#{ + reason => Reason, + type => TypeStr, + module => Module + }); + error:Reason:Stacktrace -> + throw(#{ + reason => Reason, + stacktrace => Stacktrace, + type => TypeStr, + module => Module + }) end. readable_swagger(Module, TypeStr) -> @@ -49,16 +60,10 @@ readable_docgen(Module, TypeStr) -> get_readable(Module, TypeStr, Flavor) -> Map = readable(Module, TypeStr), case maps:get(Flavor, Map, undefined) of - undefined -> fail(#{reason => unknown_type, module => Module, type => TypeStr}); + undefined -> throw(#{reason => unknown_type, module => Module, type => TypeStr}); Value -> Value end. -%% Fail the build or test. Production code should never get here. --spec fail(_) -> no_return(). -fail(Reason) -> - io:format(standard_error, "ERROR: ~p~n", [Reason]), - error(Reason). - readable("boolean()") -> #{ swagger => #{type => boolean}, From d49e98bc4b6defce0500a45ef9db1c084f1f885e Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 17 Apr 2024 13:53:43 +0200 Subject: [PATCH 05/14] test: fix dashboard schema validation --- apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl | 8 ++++---- scripts/test/emqx-smoke-test.sh | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl index 2bc1c5b39..632c8b8d4 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema_api.erl @@ -100,28 +100,28 @@ gen_schema(connectors) -> hotconf_schema_json() -> SchemaInfo = #{ - title => <<"EMQX Hot Conf Schema">>, + title => <<"Hot Conf Schema">>, version => ?SCHEMA_VERSION }, gen_api_schema_json_iodata(emqx_mgmt_api_configs, SchemaInfo). bridge_schema_json() -> SchemaInfo = #{ - title => <<"EMQX Data Bridge Schema">>, + title => <<"Data Bridge Schema">>, version => ?SCHEMA_VERSION }, gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo). actions_schema_json() -> SchemaInfo = #{ - title => <<"EMQX Data Actions and Sources Schema">>, + title => <<"Actions and Sources Schema">>, version => ?SCHEMA_VERSION }, gen_api_schema_json_iodata(emqx_bridge_v2_api, SchemaInfo). connectors_schema_json() -> SchemaInfo = #{ - title => <<"EMQX Connectors Schema">>, + title => <<"Connectors Schema">>, version => ?SCHEMA_VERSION }, gen_api_schema_json_iodata(emqx_connector_api, SchemaInfo). diff --git a/scripts/test/emqx-smoke-test.sh b/scripts/test/emqx-smoke-test.sh index 4430a313a..8177d7b85 100755 --- a/scripts/test/emqx-smoke-test.sh +++ b/scripts/test/emqx-smoke-test.sh @@ -82,8 +82,10 @@ main() { ## The json status feature was added after hotconf and bridges schema API if [ "$JSON_STATUS" != 'NOT_JSON' ]; then check_swagger_json - check_schema_json hotconf "EMQX Hot Conf API Schema" - check_schema_json bridges "EMQX Data Bridge API Schema" + check_schema_json hotconf "Hot Conf Schema" + check_schema_json bridges "Data Bridge Schema" + check_schema_json actions "Actions and Sources Schema" + check_schema_json connectors "Connectors Schema" fi } From 5c014f4c299d2c34fd5b5dbd66565f78e93319a0 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 17 Apr 2024 15:04:03 +0200 Subject: [PATCH 06/14] test: fix test cases --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 4 ++-- apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl | 2 +- apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl | 4 ++-- apps/emqx_s3/test/emqx_s3_schema_SUITE.erl | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index b0b0c3a03..cf96ce6cb 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -756,8 +756,8 @@ producer_strategy_key_validator( producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf)); producer_strategy_key_validator(#{ <<"partition_strategy">> := key_dispatch, - <<"message">> := #{<<"key">> := ""} -}) -> + <<"message">> := #{<<"key">> := Key} +}) when Key =:= "" orelse Key =:= <<>> -> {error, "Message key cannot be empty when `key_dispatch` strategy is used"}; producer_strategy_key_validator(_) -> ok. diff --git a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl index 917c24ffa..0f2448480 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl @@ -359,7 +359,7 @@ t_bad_ref(_Config) -> Refs = [{?MODULE, bad_ref}], Fields = fields(bad_ref), ?assertThrow( - {error, #{msg := <<"Object only supports not empty proplists">>, args := Fields}}, + {error, #{msg := <<"Object only supports non-empty fields list">>, args := Fields}}, validate(Path, Spec, Refs) ), ok. diff --git a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl index 6fa3dbd3d..e9397f643 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl @@ -189,7 +189,7 @@ t_nest_object(_Config) -> t_empty(_Config) -> ?assertThrow( {error, #{ - msg := <<"Object only supports not empty proplists">>, + msg := <<"Object only supports non-empty fields list">>, args := [], module := ?MODULE }}, @@ -273,7 +273,7 @@ t_bad_ref(_Config) -> ?assertThrow( {error, #{ module := ?MODULE, - msg := <<"Object only supports not empty proplists">> + msg := <<"Object only supports non-empty fields list">> }}, validate(Path, Object, ExpectRefs) ), diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl index ec02aeac4..170f8a065 100644 --- a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl @@ -20,7 +20,7 @@ all() -> t_minimal_config(_Config) -> ?assertMatch( #{ - bucket := "bucket", + bucket := <<"bucket">>, host := "s3.us-east-1.endpoint.com", port := 443, min_part_size := 5242880, @@ -45,7 +45,7 @@ t_full_config(_Config) -> #{ access_key_id := "access_key_id", acl := public_read, - bucket := "bucket", + bucket := <<"bucket">>, host := "s3.us-east-1.endpoint.com", min_part_size := 10485760, port := 443, From 5a4bfff9e55486d36d911a6c5747aaebb9dbbbb6 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 17 Apr 2024 15:10:26 +0200 Subject: [PATCH 07/14] refactor: add template_str type --- apps/emqx/src/emqx_schema.erl | 11 +++++++++-- apps/emqx_conf/src/emqx_conf_schema_types.erl | 6 ++++++ apps/emqx_s3/src/emqx_s3_client.erl | 3 +-- apps/emqx_s3/src/emqx_s3_schema.erl | 4 ++-- apps/emqx_s3/test/emqx_s3_schema_SUITE.erl | 4 ++-- 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 57ad00e99..02e31387e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -62,6 +62,7 @@ -type url() :: binary(). -type json_binary() :: binary(). -type template() :: binary(). +-type template_str() :: string(). -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). @@ -80,6 +81,7 @@ -typerefl_from_string({url/0, emqx_schema, to_url}). -typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}). -typerefl_from_string({template/0, emqx_schema, to_template}). +-typerefl_from_string({template_str/0, emqx_schema, to_template_str}). -type parsed_server() :: #{ hostname := string(), @@ -123,7 +125,8 @@ to_comma_separated_atoms/1, to_url/1, to_json_binary/1, - to_template/1 + to_template/1, + to_template_str/1 ]). -export([ @@ -164,7 +167,8 @@ url/0, json_binary/0, port_number/0, - template/0 + template/0, + template_str/0 ]). -export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]). @@ -2601,6 +2605,9 @@ to_json_binary(Str) -> to_template(Str) -> {ok, iolist_to_binary(Str)}. +to_template_str(Str) -> + {ok, unicode:characters_to_list(Str, utf8)}. + %% @doc support the following format: %% - 127.0.0.1:1883 %% - ::1:1883 diff --git a/apps/emqx_conf/src/emqx_conf_schema_types.erl b/apps/emqx_conf/src/emqx_conf_schema_types.erl index dbfbe74bc..bcc9c1469 100644 --- a/apps/emqx_conf/src/emqx_conf_schema_types.erl +++ b/apps/emqx_conf/src/emqx_conf_schema_types.erl @@ -76,6 +76,12 @@ readable("template()") -> dashboard => #{type => string, is_template => true}, docgen => #{type => "String", desc => ?DESC(template)} }; +readable("template_str()") -> + #{ + swagger => #{type => string}, + dashboard => #{type => string, is_template => true}, + docgen => #{type => "String", desc => ?DESC(template)} + }; readable("binary()") -> #{ swagger => #{type => string}, diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index b7bd85833..a415cf8d4 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -103,7 +103,7 @@ put_object(Client, Key, Value) -> -spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()). put_object( - #{bucket := Bucket0, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}}, + #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}}, Key, UploadOpts, Content @@ -111,7 +111,6 @@ put_object( ECKey = erlcloud_key(Key), ECOpts = erlcloud_upload_options(UploadOpts), Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)), - Bucket = to_list_string(Bucket0), try erlcloud_s3:put_object(Bucket, ECKey, Content, ECOpts, Headers, AwsConfig) of Props when is_list(Props) -> ok diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index de5e4f53e..1199948d0 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -74,7 +74,7 @@ fields(s3_upload) -> [ {bucket, mk( - emqx_schema:template(), + emqx_schema:template_str(), #{ desc => ?DESC("bucket"), required => true @@ -82,7 +82,7 @@ fields(s3_upload) -> )}, {key, mk( - emqx_schema:template(), + emqx_schema:template_str(), #{ desc => ?DESC("key"), required => true diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl index 170f8a065..ec02aeac4 100644 --- a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl @@ -20,7 +20,7 @@ all() -> t_minimal_config(_Config) -> ?assertMatch( #{ - bucket := <<"bucket">>, + bucket := "bucket", host := "s3.us-east-1.endpoint.com", port := 443, min_part_size := 5242880, @@ -45,7 +45,7 @@ t_full_config(_Config) -> #{ access_key_id := "access_key_id", acl := public_read, - bucket := <<"bucket">>, + bucket := "bucket", host := "s3.us-east-1.endpoint.com", min_part_size := 10485760, port := 443, From c96ae8dd2375666a63aebc3431efc8d092347bed Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 17 Apr 2024 16:11:26 +0200 Subject: [PATCH 08/14] fix: return 503 if bridge bpapi call timeout --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index a7bef1952..e33e1ca07 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -1007,7 +1007,13 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, _ConfRootKey, BridgeType, Br {error, not_implemented} -> ?NOT_IMPLEMENTED; {error, timeout} -> - ?BAD_REQUEST(<<"Request timeout">>); + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + ?SLOG(warning, #{ + msg => "bridge_bpapi_call_timeout", + bridge => BridgeId, + call => OperFunc + }), + ?SERVICE_UNAVAILABLE(<<"Request timeout">>); {error, {start_pool_failed, Name, Reason}} -> Msg = bin( io_lib:format("Failed to start ~p pool for reason ~p", [Name, redact(Reason)]) @@ -1018,9 +1024,8 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, _ConfRootKey, BridgeType, Br ?SLOG(warning, #{ msg => "bridge_inconsistent_in_cluster_for_call_operation", reason => not_found, - type => BridgeType, - name => BridgeName, - bridge => BridgeId + bridge => BridgeId, + call => OperFunc }), ?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>); {error, {node_not_found, Node}} -> From 6ab2b004ed24c2c583336df14140a1d4b4ea5b80 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 17 Apr 2024 16:40:25 +0200 Subject: [PATCH 09/14] fix(resource_manager): update cache after channel add --- apps/emqx_resource/src/emqx_resource_manager.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index fcdf56202..6d9ad50e4 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -1058,7 +1058,8 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) -> get_config_for_channels(Data0, ChannelsNotAdded), Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0), %% Now that we have done the adding, we can get the status of all channels - trigger_health_check_for_added_channels(Data1); + Data2 = trigger_health_check_for_added_channels(Data1), + update_state(Data2, Data0); channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> %% Whenever the resource is connecting: %% 1. Change the status of all added channels to connecting From 55941000c02dab8168608e6ddad571e9f470d9e1 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 17 Apr 2024 20:42:03 +0200 Subject: [PATCH 10/14] test: make test case more stable, less flaky --- .../test/emqx_bridge_pulsar_connector_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl index b3c351da0..cd54e2194 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl @@ -1235,7 +1235,7 @@ t_resilience(Config) -> after 1_000 -> ct:fail("producer didn't stop!") end, Consumed = lists:flatmap( - fun(_) -> receive_consumed(5_000) end, lists:seq(1, NumProduced) + fun(_) -> receive_consumed(10_000) end, lists:seq(1, NumProduced) ), ?assertEqual(NumProduced, length(Consumed)), ExpectedPayloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumProduced)), From ab763fe6656c42df5c59dd2a31feebdfe05bcf7d Mon Sep 17 00:00:00 2001 From: zmstone Date: Thu, 18 Apr 2024 09:32:05 +0200 Subject: [PATCH 11/14] test: fix test case flakyness --- .../src/emqx_authn/emqx_authn_chains.erl | 19 ++++++++++--------- .../test/emqx_authn/emqx_authn_init_SUITE.erl | 3 ++- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl index 62163dda3..0d21058e3 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl @@ -353,13 +353,13 @@ init(_Opts) -> ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], Module), ok = hook_deny(), {ok, #{hooked => false, providers => #{}, init_done => false}, - {continue, initialize_authentication}}. + {continue, {initialize_authentication, init}}}. handle_call(get_providers, _From, #{providers := Providers} = State) -> reply(Providers, State); handle_call( {register_providers, Providers}, - _From, + From, #{providers := Reg0} = State ) -> case lists:filter(fun({T, _}) -> maps:is_key(T, Reg0) end, Providers) of @@ -371,7 +371,7 @@ handle_call( Reg0, Providers ), - reply(ok, State#{providers := Reg}, initialize_authentication); + reply(ok, State#{providers := Reg}, {initialize_authentication, From}); Clashes -> reply({error, {authentication_type_clash, Clashes}}, State) end; @@ -447,10 +447,10 @@ handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. -handle_continue(initialize_authentication, #{init_done := true} = State) -> +handle_continue({initialize_authentication, _From}, #{init_done := true} = State) -> {noreply, State}; -handle_continue(initialize_authentication, #{providers := Providers} = State) -> - InitDone = initialize_authentication(Providers), +handle_continue({initialize_authentication, From}, #{providers := Providers} = State) -> + InitDone = initialize_authentication(Providers, From), {noreply, maybe_hook(State#{init_done := InitDone})}. handle_cast(Req, State) -> @@ -484,11 +484,13 @@ code_change(_OldVsn, State, _Extra) -> %% Private functions %%------------------------------------------------------------------------------ -initialize_authentication(Providers) -> +initialize_authentication(Providers, From) -> ProviderTypes = maps:keys(Providers), Chains = chain_configs(), HasProviders = has_providers_for_configs(Chains, ProviderTypes), - do_initialize_authentication(Providers, Chains, HasProviders). + Result = do_initialize_authentication(Providers, Chains, HasProviders), + ?tp(info, authn_chains_initialization_done, #{from => From, result => Result}), + Result. do_initialize_authentication(_Providers, _Chains, _HasProviders = false) -> false; @@ -500,7 +502,6 @@ do_initialize_authentication(Providers, Chains, _HasProviders = true) -> Chains ), ok = unhook_deny(), - ?tp(info, authn_chains_initialization_done, #{}), true. initialize_chain_authentication(_Providers, _ChainName, []) -> diff --git a/apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl b/apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl index fec1f3fa4..78e179ccb 100644 --- a/apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl +++ b/apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl @@ -69,9 +69,10 @@ t_initialize(_Config) -> emqx_access_control:authenticate(?CLIENTINFO) ), + Self = self(), ?assertWaitEvent( ok = emqx_authn_test_lib:register_fake_providers([{password_based, built_in_database}]), - #{?snk_kind := authn_chains_initialization_done}, + #{?snk_kind := authn_chains_initialization_done, from := {Self, _}}, 100 ), From ca56e7e8d74ea540a8e832daa99ccfa9f9114bf3 Mon Sep 17 00:00:00 2001 From: zmstone Date: Thu, 18 Apr 2024 13:04:36 +0200 Subject: [PATCH 12/14] fix(kafka): headers are template fields --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index cf96ce6cb..83bc33266 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -389,7 +389,7 @@ fields(producer_kafka_opts) -> )}, {kafka_headers, mk( - binary(), + emqx_schema:template(), #{ required => false, validator => fun kafka_header_validator/1, @@ -462,12 +462,12 @@ fields(producer_kafka_ext_headers) -> [ {kafka_ext_header_key, mk( - binary(), + emqx_schema:template(), #{required => true, desc => ?DESC(producer_kafka_ext_header_key)} )}, {kafka_ext_header_value, mk( - binary(), + emqx_schema:template(), #{ required => true, validator => fun kafka_ext_header_value_validator/1, From ede4eeae9fdbdd0d13df8841154fb1585f1fba11 Mon Sep 17 00:00:00 2001 From: zmstone Date: Thu, 18 Apr 2024 13:04:56 +0200 Subject: [PATCH 13/14] fix(http_bridge): path is template field --- apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl | 4 ++-- apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index ae1e727ca..4eef6968b 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -128,8 +128,8 @@ fields("request") -> desc => ?DESC("method"), validator => fun ?MODULE:validate_method/1 })}, - {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})}, - {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})}, + {path, hoconsc:mk(emqx_schema:template(), #{required => false, desc => ?DESC("path")})}, + {body, hoconsc:mk(emqx_schema:template(), #{required => false, desc => ?DESC("body")})}, {headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})}, {max_retries, sc( diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index ef150adfc..8b33b1523 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -114,7 +114,7 @@ fields("parameters_opts") -> [ {path, mk( - binary(), + emqx_schema:template(), #{ desc => ?DESC("config_path"), required => false From 5b38d592f02cdcbf67744e0577aed0ab08823a13 Mon Sep 17 00:00:00 2001 From: zmstone Date: Thu, 18 Apr 2024 13:16:29 +0200 Subject: [PATCH 14/14] feat(http): add `is_template` as HTTP headers field property is_template was designed to be type property. however for HTTP headers, it's a map() type, instead of creating a new type for it, it's easier to just add it as a field property. --- apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl | 3 ++- apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl | 3 ++- apps/emqx_dashboard/src/emqx_dashboard_swagger.erl | 6 +++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 4eef6968b..e8143d87f 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -130,7 +130,8 @@ fields("request") -> })}, {path, hoconsc:mk(emqx_schema:template(), #{required => false, desc => ?DESC("path")})}, {body, hoconsc:mk(emqx_schema:template(), #{required => false, desc => ?DESC("body")})}, - {headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})}, + {headers, + hoconsc:mk(map(), #{required => false, desc => ?DESC("headers"), is_template => true})}, {max_retries, sc( non_neg_integer(), diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index 8b33b1523..cadbcf0d2 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -270,7 +270,8 @@ headers_field() -> <<"content-type">> => <<"application/json">>, <<"keep-alive">> => <<"timeout=5">> }, - desc => ?DESC("config_headers") + desc => ?DESC("config_headers"), + is_template => true } )}. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 7a5ea1939..4ada5994c 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -57,7 +57,11 @@ allowEmptyValue, deprecated, minimum, - maximum + maximum, + %% is_template is a type property, + %% but some exceptions are made for them to be field property + %% for example, HTTP headers (which is a map type) + is_template ]). -define(INIT_SCHEMA, #{