diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 4679a8d04..e2951c302 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -187,7 +187,7 @@ fields(client_opts) -> )}, {max_retry_time, ?HOCON( - emqx_schema:duration(), + emqx_schema:timeout_duration(), #{ desc => ?DESC(max_retry_time), default => <<"1h">>, diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ab3d7bf71..37d5350a5 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -30,9 +30,19 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("logger.hrl"). +-define(MAX_INT_TIMEOUT_MS, 4294967295). +%% floor(?MAX_INT_TIMEOUT_MS / 1000). +-define(MAX_INT_TIMEOUT_S, 4294967). + -type duration() :: integer(). -type duration_s() :: integer(). -type duration_ms() :: integer(). +%% ?MAX_INT_TIMEOUT is defined loosely in some OTP modules like +%% `erpc', `rpc' `gen' and `peer', despite affecting `receive' blocks +%% as well. It's `2^32 - 1'. +-type timeout_duration() :: 0..?MAX_INT_TIMEOUT_MS. +-type timeout_duration_s() :: 0..?MAX_INT_TIMEOUT_S. +-type timeout_duration_ms() :: 0..?MAX_INT_TIMEOUT_MS. -type bytesize() :: integer(). -type wordsize() :: bytesize(). -type percent() :: float(). @@ -56,6 +66,9 @@ -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). -typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). +-typerefl_from_string({timeout_duration/0, emqx_schema, to_timeout_duration}). +-typerefl_from_string({timeout_duration_s/0, emqx_schema, to_timeout_duration_s}). +-typerefl_from_string({timeout_duration_ms/0, emqx_schema, to_timeout_duration_ms}). -typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}). -typerefl_from_string({wordsize/0, emqx_schema, to_wordsize}). -typerefl_from_string({percent/0, emqx_schema, to_percent}). @@ -91,6 +104,9 @@ to_duration/1, to_duration_s/1, to_duration_ms/1, + to_timeout_duration/1, + to_timeout_duration_s/1, + to_timeout_duration_ms/1, mk_duration/2, to_bytesize/1, to_wordsize/1, @@ -127,6 +143,9 @@ duration/0, duration_s/0, duration_ms/0, + timeout_duration/0, + timeout_duration_s/0, + timeout_duration_ms/0, bytesize/0, wordsize/0, percent/0, @@ -1037,7 +1056,7 @@ fields("mqtt_quic_listener") -> )}, {"idle_timeout", sc( - duration_ms(), + timeout_duration_ms(), #{ default => 0, desc => ?DESC(fields_mqtt_quic_listener_idle_timeout), @@ -1054,7 +1073,7 @@ fields("mqtt_quic_listener") -> )}, {"handshake_idle_timeout", sc( - duration_ms(), + timeout_duration_ms(), #{ default => <<"10s">>, desc => ?DESC(fields_mqtt_quic_listener_handshake_idle_timeout), @@ -1071,7 +1090,7 @@ fields("mqtt_quic_listener") -> )}, {"keep_alive_interval", sc( - duration_ms(), + timeout_duration_ms(), #{ default => 0, desc => ?DESC(fields_mqtt_quic_listener_keep_alive_interval), @@ -2637,6 +2656,37 @@ to_duration_ms(Str) -> _ -> {error, Str} end. +-spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when + Input :: string() | binary(). +to_timeout_duration(Str) -> + do_to_timeout_duration(Str, fun to_duration/1, ?MAX_INT_TIMEOUT_MS, "ms"). + +-spec to_timeout_duration_ms(Input) -> {ok, timeout_duration_ms()} | {error, Input} when + Input :: string() | binary(). +to_timeout_duration_ms(Str) -> + do_to_timeout_duration(Str, fun to_duration_ms/1, ?MAX_INT_TIMEOUT_MS, "ms"). + +-spec to_timeout_duration_s(Input) -> {ok, timeout_duration_s()} | {error, Input} when + Input :: string() | binary(). +to_timeout_duration_s(Str) -> + do_to_timeout_duration(Str, fun to_duration_s/1, ?MAX_INT_TIMEOUT_S, "s"). + +do_to_timeout_duration(Str, Fn, Max, Unit) -> + case Fn(Str) of + {ok, I} -> + case I =< Max of + true -> + {ok, I}; + false -> + Msg = lists:flatten( + io_lib:format("timeout value too large (max: ~b ~s)", [Max, Unit]) + ), + throw(Msg) + end; + Err -> + Err + end. + to_bytesize(Str) -> case hocon_postprocess:bytesize(Str) of I when is_integer(I) -> {ok, I}; diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index e1d95227b..c1b41c292 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -45,7 +45,9 @@ limited_atom/0, limited_latin_atom/0, printable_utf8/0, - printable_codepoint/0 + printable_codepoint/0, + raw_duration/0, + large_raw_duration/0 ]). %% Generic Types @@ -629,6 +631,20 @@ printable_codepoint() -> {1, range(16#E000, 16#FFFD)} ]). +raw_duration() -> + ?LET( + {Value, Unit}, + {pos_integer(), oneof([<<"d">>, <<"h">>, <<"m">>, <<"s">>, <<"ms">>])}, + <<(integer_to_binary(Value))/binary, Unit/binary>> + ). + +large_raw_duration() -> + ?LET( + {Value, Unit}, + {range(1_000_000, inf), oneof([<<"d">>, <<"h">>, <<"m">>])}, + <<(integer_to_binary(Value))/binary, Unit/binary>> + ). + %%-------------------------------------------------------------------- %% Iterators %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index 3dcfa331e..ad2341460 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -809,3 +809,31 @@ set_envs([{_Name, _Value} | _] = Envs) -> unset_envs([{_Name, _Value} | _] = Envs) -> lists:map(fun({Name, _}) -> os:unsetenv(Name) end, Envs). + +timeout_types_test_() -> + [ + ?_assertEqual( + {ok, 4294967295}, + typerefl:from_string(emqx_schema:timeout_duration(), <<"4294967295ms">>) + ), + ?_assertEqual( + {ok, 4294967295}, + typerefl:from_string(emqx_schema:timeout_duration_ms(), <<"4294967295ms">>) + ), + ?_assertEqual( + {ok, 4294967}, + typerefl:from_string(emqx_schema:timeout_duration_s(), <<"4294967000ms">>) + ), + ?_assertThrow( + "timeout value too large (max: 4294967295 ms)", + typerefl:from_string(emqx_schema:timeout_duration(), <<"4294967296ms">>) + ), + ?_assertThrow( + "timeout value too large (max: 4294967295 ms)", + typerefl:from_string(emqx_schema:timeout_duration_ms(), <<"4294967296ms">>) + ), + ?_assertThrow( + "timeout value too large (max: 4294967 s)", + typerefl:from_string(emqx_schema:timeout_duration_s(), <<"4294967001ms">>) + ) + ]. diff --git a/apps/emqx/test/props/prop_emqx_schema.erl b/apps/emqx/test/props/prop_emqx_schema.erl new file mode 100644 index 000000000..5d5e8f017 --- /dev/null +++ b/apps/emqx/test/props/prop_emqx_schema.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_schema). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(MAX_INT_TIMEOUT_MS, 4294967295). + +%%-------------------------------------------------------------------- +%% Helper fns +%%-------------------------------------------------------------------- + +parse(Value, Type) -> + typerefl:from_string(Type, Value). + +timeout_within_bounds(RawDuration) -> + case emqx_schema:to_duration_ms(RawDuration) of + {ok, I} when I =< ?MAX_INT_TIMEOUT_MS -> + true; + _ -> + false + end. + +parses_the_same(Value, Type1, Type2) -> + parse(Value, Type1) =:= parse(Value, Type2). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_timeout_duration_refines_duration() -> + ?FORALL( + RawDuration, + emqx_proper_types:raw_duration(), + ?IMPLIES( + timeout_within_bounds(RawDuration), + parses_the_same(RawDuration, emqx_schema:duration(), emqx_schema:timeout_duration()) + ) + ). + +prop_timeout_duration_ms_refines_duration_ms() -> + ?FORALL( + RawDuration, + emqx_proper_types:raw_duration(), + ?IMPLIES( + timeout_within_bounds(RawDuration), + parses_the_same( + RawDuration, emqx_schema:duration_ms(), emqx_schema:timeout_duration_ms() + ) + ) + ). + +prop_timeout_duration_s_refines_duration_s() -> + ?FORALL( + RawDuration, + emqx_proper_types:raw_duration(), + ?IMPLIES( + timeout_within_bounds(RawDuration), + parses_the_same(RawDuration, emqx_schema:duration_s(), emqx_schema:timeout_duration_s()) + ) + ). + +prop_timeout_duration_is_valid_for_receive_after() -> + ?FORALL( + RawDuration, + emqx_proper_types:large_raw_duration(), + ?IMPLIES( + not timeout_within_bounds(RawDuration), + begin + %% we have to use the the non-strict version, because it's invalid + {ok, Timeout} = parse(RawDuration, emqx_schema:duration()), + Ref = make_ref(), + timer:send_after(20, {Ref, ok}), + ?assertError( + timeout_value, + receive + {Ref, ok} -> error(should_be_invalid) + after Timeout -> error(should_be_invalid) + end + ), + true + end + ) + ). diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index aefd9112f..5d975cca7 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -93,7 +93,7 @@ fields(config) -> )}, {connect_timeout, hoconsc:mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"15s">>, desc => ?DESC("connect_timeout") diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl index 70109a0ea..1bd21ce5c 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl @@ -47,7 +47,7 @@ fields(bridge_config) -> [ {connect_timeout, sc( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"15s">>, desc => ?DESC("connect_timeout") @@ -80,7 +80,7 @@ fields(bridge_config) -> )}, {request_timeout, sc( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ required => false, deprecated => {since, "e5.0.1"}, diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index 14d881399..80b708582 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_influxdb, [ {description, "EMQX Enterprise InfluxDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, influxdb]}, {env, []}, diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 2f65f7902..05e7c11b2 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -39,6 +39,9 @@ -type ts_precision() :: ns | us | ms | s. +%% Allocatable resources +-define(influx_client, influx_client). + -define(INFLUXDB_DEFAULT_PORT, 8086). %% influxdb servers don't need parse @@ -53,10 +56,20 @@ callback_mode() -> async_if_possible. on_start(InstId, Config) -> + %% InstID as pool would be handled by influxdb client + %% so there is no need to allocate pool_name here + %% ehttpc for influxdb-v1/v2, + %% ecpool for influxdb-udp + %% See: influxdb:start_client/1 start_client(InstId, Config). -on_stop(_InstId, #{client := Client}) -> - influxdb:stop_client(Client). +on_stop(InstId, _State) -> + case emqx_resource:get_allocated_resources(InstId) of + #{?influx_client := Client} -> + influxdb:stop_client(Client); + _ -> + ok + end. on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> case data_to_points(Data, SyntaxLines) of @@ -220,8 +233,12 @@ start_client(InstId, Config) -> config => emqx_utils:redact(Config), client_config => emqx_utils:redact(ClientConfig) }), - try - do_start_client(InstId, ClientConfig, Config) + try do_start_client(InstId, ClientConfig, Config) of + Res = {ok, #{client := Client}} -> + ok = emqx_resource:allocate_resource(InstId, ?influx_client, Client), + Res; + {error, Reason} -> + {error, Reason} catch E:R:S -> ?tp(influxdb_connector_start_exception, #{error => {E, R}}), diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 2948bd59c..9a9e95b65 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -144,7 +144,7 @@ request_config() -> )}, {request_timeout, mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"15s">>, desc => ?DESC("config_request_timeout") diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 73a71787e..64f2394c4 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -165,20 +165,20 @@ fields("config") -> } )}, {connect_timeout, - mk(emqx_schema:duration_ms(), #{ + mk(emqx_schema:timeout_duration_ms(), #{ default => <<"5s">>, desc => ?DESC(connect_timeout) })}, {min_metadata_refresh_interval, mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"3s">>, desc => ?DESC(min_metadata_refresh_interval) } )}, {metadata_request_timeout, - mk(emqx_schema:duration_ms(), #{ + mk(emqx_schema:timeout_duration_ms(), #{ default => <<"5s">>, desc => ?DESC(metadata_request_timeout) })}, diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 79e8ae36e..06b0256e2 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -165,19 +165,32 @@ sql_create_table() -> "CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))". sql_drop_table() -> - "DROP TABLE mqtt_test". + "BEGIN\n" + " EXECUTE IMMEDIATE 'DROP TABLE mqtt_test';\n" + " EXCEPTION\n" + " WHEN OTHERS THEN\n" + " IF SQLCODE = -942 THEN\n" + " NULL;\n" + " ELSE\n" + " RAISE;\n" + " END IF;\n" + " END;". + +sql_check_table_exist() -> + "SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'". reset_table(Config) -> ResourceId = resource_id(Config), - _ = emqx_resource:simple_sync_query(ResourceId, {sql, sql_drop_table()}), + drop_table_if_exists(Config), {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query( ResourceId, {sql, sql_create_table()} ), ok. -drop_table(Config) -> +drop_table_if_exists(Config) -> ResourceId = resource_id(Config), - emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), + {ok, [{proc_result, 0, _}]} = + emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), ok. oracle_config(TestCase, _ConnectionType, Config) -> @@ -392,6 +405,12 @@ t_batch_sync_query(Config) -> emqx_bridge:send_message(BridgeId, Params), ok end), + % Wait for reconnection. + ?retry( + _Sleep = 1_000, + _Attempts = 30, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), ?retry( _Sleep = 1_000, _Attempts = 30, @@ -527,3 +546,32 @@ t_no_sid_nor_service_name(Config0) -> create_bridge(Config) ), ok. + +t_table_removed(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + drop_table_if_exists(Config), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => true + }, + Message = {send_message, Params}, + ?assertEqual( + {error, {unrecoverable_error, {942, "ORA-00942: table or view does not exist\n"}}}, + emqx_resource:simple_sync_query(ResourceId, Message) + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index aa1076d33..038da3e61 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -62,7 +62,7 @@ fields(config) -> )}, {connect_timeout, mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"5s">>, desc => ?DESC("connect_timeout") @@ -86,11 +86,12 @@ fields(producer_opts) -> default => <<"1MB">>, desc => ?DESC("producer_send_buffer") })}, {sync_timeout, - mk(emqx_schema:duration_ms(), #{ + mk(emqx_schema:timeout_duration_ms(), #{ default => <<"3s">>, desc => ?DESC("producer_sync_timeout") })}, {retention_period, mk( + %% not used in a `receive ... after' block, just timestamp comparison hoconsc:union([infinity, emqx_schema:duration_ms()]), #{default => infinity, desc => ?DESC("producer_retention_period")} )}, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index cbdcbc845..e8404ffd3 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -84,7 +84,7 @@ fields(config) -> )}, {timeout, hoconsc:mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"5s">>, desc => ?DESC("timeout") @@ -100,7 +100,7 @@ fields(config) -> )}, {publish_confirmation_timeout, hoconsc:mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"30s">>, desc => ?DESC("timeout") @@ -117,7 +117,7 @@ fields(config) -> )}, {heartbeat, hoconsc:mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"30s">>, desc => ?DESC("heartbeat") diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index dfb1f3def..a7d01960e 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -58,12 +58,12 @@ fields(config) -> mk(binary(), #{default => <<>>, desc => ?DESC(security_token), sensitive => true})}, {sync_timeout, mk( - emqx_schema:duration(), + emqx_schema:timeout_duration(), #{default => <<"3s">>, desc => ?DESC(sync_timeout)} )}, {refresh_interval, mk( - emqx_schema:duration(), + emqx_schema:timeout_duration(), #{default => <<"3s">>, desc => ?DESC(refresh_interval)} )}, {send_buffer, @@ -102,22 +102,23 @@ on_start( emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) ), ClientId = client_id(InstanceId), - TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), ClientCfg = #{acl_info => AclInfo}, Templates = parse_template(Config), - ProducersMapPID = create_producers_map(ClientId), + State = #{ client_id => ClientId, topic => Topic, topic_tokens => TopicTks, sync_timeout => SyncTimeout, templates => Templates, - producers_map_pid => ProducersMapPID, producers_opts => ProducerOpts }, + ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId), + create_producers_map(ClientId), + case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of {ok, _Pid} -> {ok, State}; @@ -130,23 +131,22 @@ on_start( {error, Reason} end. -on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) -> +on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_rocketmq_connector", connector => InstanceId }), - Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}), lists:foreach( - fun([Topic, Producer]) -> - ets:delete(ClientId, {RawTopic, Topic}), - _ = rocketmq:stop_and_delete_supervised_producers(Producer) + fun + ({_, client_id, ClientId}) -> + destory_producers_map(ClientId), + ok = rocketmq:stop_and_delete_supervised_client(ClientId); + ({_, _Topic, Producer}) -> + _ = rocketmq:stop_and_delete_supervised_producers(Producer) end, - Producers - ), - - Pid ! ok, - ok = rocketmq:stop_and_delete_supervised_client(ClientId). + emqx_resource:get_allocated_resources_list(InstanceId) + ). on_query(InstanceId, Query, State) -> do_query(InstanceId, Query, send_sync, State). @@ -179,7 +179,6 @@ do_query( #{ templates := Templates, client_id := ClientId, - topic := RawTopic, topic_tokens := TopicTks, producers_opts := ProducerOpts, sync_timeout := RequestTimeout @@ -191,7 +190,7 @@ do_query( #{connector => InstanceId, query => Query, state => State} ), - TopicKey = get_topic_key(Query, RawTopic, TopicTks), + TopicKey = get_topic_key(Query, TopicTks), Data = apply_template(Query, Templates), Result = safe_do_produce( @@ -220,7 +219,7 @@ do_query( safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) -> try - Producers = get_producers(ClientId, TopicKey, ProducerOpts), + Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts), produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout) catch _Type:Reason -> @@ -249,10 +248,10 @@ parse_template([{Key, H} | T], Templates) -> parse_template([], Templates) -> Templates. -get_topic_key({_, Msg}, RawTopic, TopicTks) -> - {RawTopic, emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg)}; -get_topic_key([Query | _], RawTopic, TopicTks) -> - get_topic_key(Query, RawTopic, TopicTks). +get_topic_key({_, Msg}, TopicTks) -> + emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg); +get_topic_key([Query | _], TopicTks) -> + get_topic_key(Query, TopicTks). apply_template({Key, Msg} = _Req, Templates) -> case maps:get(Key, Templates, undefined) of @@ -317,29 +316,29 @@ acl_info(_, _, _) -> #{}. create_producers_map(ClientId) -> - erlang:spawn(fun() -> - case ets:whereis(ClientId) of - undefined -> - _ = ets:new(ClientId, [public, named_table]), - ok; - _ -> - ok - end, - receive - _Msg -> - ok - end - end). + _ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]), + ok. -get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) -> - case ets:lookup(ClientId, TopicKey) of - [{_, Producers0}] -> - Producers0; - _ -> - ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]), - {ok, Producers0} = rocketmq:ensure_supervised_producers( - ClientId, ProducerGroup, Topic1, ProducerOpts - ), - ets:insert(ClientId, {TopicKey, Producers0}), - Producers0 +%% The resource manager will not terminate when restarting a resource, +%% so manually destroying the ets table is necessary. +destory_producers_map(ClientId) -> + case ets:whereis(ClientId) of + undefined -> + ok; + Tid -> + ets:delete(Tid) + end. + +get_producers(InstanceId, ClientId, Topic, ProducerOpts) -> + case ets:lookup(ClientId, Topic) of + [{_, Producers}] -> + Producers; + _ -> + ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic]), + {ok, Producers} = rocketmq:ensure_supervised_producers( + ClientId, ProducerGroup, Topic, ProducerOpts + ), + ok = emqx_resource:allocate_resource(InstanceId, Topic, Producers), + ets:insert(ClientId, {Topic, Producers}), + Producers end. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index b4ba2bf47..0f5bafe9f 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -528,7 +528,7 @@ fields("node") -> )}, {"crash_dump_seconds", sc( - emqx_schema:duration_s(), + emqx_schema:timeout_duration_s(), #{ mapping => "vm_args.-env ERL_CRASH_DUMP_SECONDS", default => <<"30s">>, @@ -550,7 +550,7 @@ fields("node") -> )}, {"dist_net_ticktime", sc( - emqx_schema:duration_s(), + emqx_schema:timeout_duration_s(), #{ mapping => "vm_args.-kernel net_ticktime", default => <<"2m">>, @@ -821,7 +821,7 @@ fields("rpc") -> )}, {"socket_keepalive_idle", sc( - emqx_schema:duration_s(), + emqx_schema:timeout_duration_s(), #{ mapping => "gen_rpc.socket_keepalive_idle", default => <<"15m">>, @@ -830,7 +830,7 @@ fields("rpc") -> )}, {"socket_keepalive_interval", sc( - emqx_schema:duration_s(), + emqx_schema:timeout_duration_s(), #{ mapping => "gen_rpc.socket_keepalive_interval", default => <<"75s">>, @@ -972,7 +972,7 @@ fields("log_overload_kill") -> )}, {"restart_after", sc( - hoconsc:union([emqx_schema:duration_ms(), infinity]), + hoconsc:union([emqx_schema:timeout_duration_ms(), infinity]), #{ default => <<"5s">>, desc => ?DESC("log_overload_kill_restart_after") diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index ef4224592..386414cd4 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -67,7 +67,7 @@ fields(config) -> [ {connect_timeout, sc( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"15s">>, desc => ?DESC("connect_timeout") @@ -80,7 +80,7 @@ fields(config) -> )}, {retry_interval, sc( - emqx_schema:duration(), + emqx_schema:timeout_duration(), #{deprecated => {since, "5.0.4"}} )}, {pool_type, @@ -138,7 +138,7 @@ fields("request") -> )}, {request_timeout, sc( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ required => false, desc => ?DESC("request_timeout") diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 1d969e6f1..6cf717c46 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -201,5 +201,5 @@ port(type) -> integer(); port(default) -> 389; port(_) -> undefined. -duration(type) -> emqx_schema:duration_ms(); +duration(type) -> emqx_schema:timeout_duration_ms(); duration(_) -> undefined. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 5b63daef3..1a64a0132 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -108,7 +108,7 @@ fields(topology) -> {wait_queue_timeout_ms, duration("wait_queue_timeout")}, {heartbeat_frequency_ms, hoconsc:mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"200s">>, desc => ?DESC("heartbeat_period") @@ -422,7 +422,7 @@ r_mode(_) -> undefined. duration(Desc) -> #{ - type => emqx_schema:duration_ms(), + type => emqx_schema:timeout_duration_ms(), required => false, desc => ?DESC(Desc) }. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index e2b02edab..4b8c12054 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -38,7 +38,7 @@ fields("dashboard") -> {default_password, fun default_password/1}, {sample_interval, ?HOCON( - emqx_schema:duration_s(), + emqx_schema:timeout_duration_s(), #{ default => <<"10s">>, desc => ?DESC(sample_interval), diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6e354e1cf..47acee58b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -819,6 +819,12 @@ typename_to_spec("duration_s()", _Mod) -> #{type => string, example => <<"1h">>}; typename_to_spec("duration_ms()", _Mod) -> #{type => string, example => <<"32s">>}; +typename_to_spec("timeout_duration()", _Mod) -> + #{type => string, example => <<"12m">>}; +typename_to_spec("timeout_duration_s()", _Mod) -> + #{type => string, example => <<"1h">>}; +typename_to_spec("timeout_duration_ms()", _Mod) -> + #{type => string, example => <<"32s">>}; typename_to_spec("percent()", _Mod) -> #{type => number, example => <<"12%">>}; typename_to_spec("file()", _Mod) -> diff --git a/apps/emqx_dashboard/test/emqx_swagger_remote_schema.erl b/apps/emqx_dashboard/test/emqx_swagger_remote_schema.erl index c2266ad5b..dc9d54260 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_remote_schema.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_remote_schema.erl @@ -32,7 +32,7 @@ fields("root") -> )}, {default_username, fun default_username/1}, {default_password, fun default_password/1}, - {sample_interval, mk(emqx_schema:duration_s(), #{default => <<"10s">>})}, + {sample_interval, mk(emqx_schema:timeout_duration_s(), #{default => <<"10s">>})}, {token_expired_time, mk(emqx_schema:duration(), #{default => <<"30m">>})} ]; fields("ref1") -> diff --git a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl index e60f7318f..e8c79c57c 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl @@ -751,7 +751,7 @@ schema("/object") -> {per_page, mk(range(1, 100), #{required => true, desc => <<"good per page desc">>})}, {timeout, mk( - hoconsc:union([infinity, emqx_schema:duration_s()]), + hoconsc:union([infinity, emqx_schema:timeout_duration_s()]), #{default => 5, required => true} )}, {inner_ref, mk(hoconsc:ref(?MODULE, good_ref), #{})} @@ -761,7 +761,7 @@ schema("/nest/object") -> {per_page, mk(range(1, 100), #{desc => <<"good per page desc">>})}, {timeout, mk( - hoconsc:union([infinity, emqx_schema:duration_s()]), + hoconsc:union([infinity, emqx_schema:timeout_duration_s()]), #{default => 5, required => true} )}, {nest_object, [ @@ -785,7 +785,7 @@ schema("/ref/array/with/key") -> {per_page, mk(range(1, 100), #{desc => <<"good per page desc">>})}, {timeout, mk( - hoconsc:union([infinity, emqx_schema:duration_s()]), + hoconsc:union([infinity, emqx_schema:timeout_duration_s()]), #{default => 5, required => true} )}, {array_refs, mk(hoconsc:array(hoconsc:ref(?MODULE, good_ref)), #{})} diff --git a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl index f23752653..c0771f973 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl @@ -573,7 +573,7 @@ schema("/object") -> {per_page, mk(range(1, 100), #{required => true, desc => <<"good per page desc">>})}, {timeout, mk( - hoconsc:union([infinity, emqx_schema:duration_s()]), + hoconsc:union([infinity, emqx_schema:timeout_duration_s()]), #{default => 5, required => true} )}, {inner_ref, mk(hoconsc:ref(?MODULE, good_ref), #{})} @@ -584,7 +584,7 @@ schema("/nest/object") -> {per_page, mk(range(1, 100), #{desc => <<"good per page desc">>})}, {timeout, mk( - hoconsc:union([infinity, emqx_schema:duration_s()]), + hoconsc:union([infinity, emqx_schema:timeout_duration_s()]), #{default => 5, required => true} )}, {nest_object, [ @@ -613,13 +613,14 @@ schema("/ref/array/with/key") -> {per_page, mk(range(1, 100), #{desc => <<"good per page desc">>})}, {timeout, mk( - hoconsc:union([infinity, emqx_schema:duration_s()]), + hoconsc:union([infinity, emqx_schema:timeout_duration_s()]), #{default => 5, required => true} )}, {assert, mk(float(), #{desc => <<"money">>})}, {number_ex, mk(number(), #{desc => <<"number example">>})}, {percent_ex, mk(emqx_schema:percent(), #{desc => <<"percent example">>})}, - {duration_ms_ex, mk(emqx_schema:duration_ms(), #{desc => <<"duration ms example">>})}, + {duration_ms_ex, + mk(emqx_schema:timeout_duration_ms(), #{desc => <<"duration ms example">>})}, {atom_ex, mk(atom(), #{desc => <<"atom ex">>})}, {array_refs, mk(hoconsc:array(hoconsc:ref(?MODULE, good_ref)), #{})} ]); diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index 194c91206..92a70cf37 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [ {description, "EMQX Extension for Hook"}, - {vsn, "5.0.12"}, + {vsn, "5.0.13"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl index f6cc896f3..8a2139495 100644 --- a/apps/emqx_exhook/src/emqx_exhook_schema.erl +++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl @@ -63,7 +63,7 @@ fields(server) -> example => <<"http://127.0.0.1:9000">> })}, {request_timeout, - ?HOCON(emqx_schema:duration(), #{ + ?HOCON(emqx_schema:timeout_duration(), #{ default => <<"5s">>, desc => ?DESC(request_timeout) })}, @@ -74,7 +74,7 @@ fields(server) -> default => #{<<"keepalive">> => true, <<"nodelay">> => true} })}, {auto_reconnect, - ?HOCON(hoconsc:union([false, emqx_schema:duration()]), #{ + ?HOCON(hoconsc:union([false, emqx_schema:timeout_duration()]), #{ default => <<"60s">>, desc => ?DESC(auto_reconnect) })}, diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index 09e9ab0a5..37508fe3e 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -66,7 +66,7 @@ fields(file_transfer) -> )}, {init_timeout, mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ desc => ?DESC("init_timeout"), required => false, @@ -75,7 +75,7 @@ fields(file_transfer) -> )}, {store_segment_timeout, mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ desc => ?DESC("store_segment_timeout"), required => false, @@ -84,7 +84,7 @@ fields(file_transfer) -> )}, {assemble_timeout, mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ desc => ?DESC("assemble_timeout"), required => false, @@ -195,7 +195,7 @@ fields(local_storage_segments_gc) -> [ {interval, mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ desc => ?DESC("storage_gc_interval"), required => false, @@ -204,6 +204,7 @@ fields(local_storage_segments_gc) -> )}, {maximum_segments_ttl, mk( + %% not used in a `receive ... after' block, just timestamp comparison emqx_schema:duration_s(), #{ desc => ?DESC("storage_gc_max_segments_ttl"), @@ -213,6 +214,7 @@ fields(local_storage_segments_gc) -> )}, {minimum_segments_ttl, mk( + %% not used in a `receive ... after' block, just timestamp comparison emqx_schema:duration_s(), #{ desc => ?DESC("storage_gc_min_segments_ttl"), diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index bc44daca8..61f29059f 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -63,11 +63,11 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> - emqx_gateway_utils:make_deprecated_paths([ + [ "/gateways", "/gateways/:name", "/gateways/:name/enable/:enable" - ]). + ]. %%-------------------------------------------------------------------- %% http handlers @@ -240,9 +240,7 @@ schema("/gateways/:name/enable/:enable") -> ) } } - }; -schema(Path) -> - emqx_gateway_utils:make_compatible_schema(Path, fun schema/1). + }. %%-------------------------------------------------------------------- %% params defines diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl index 41b1b11d5..01bfcdb53 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl @@ -61,11 +61,11 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> - emqx_gateway_utils:make_deprecated_paths([ + [ "/gateways/:name/authentication", "/gateways/:name/authentication/users", "/gateways/:name/authentication/users/:uid" - ]). + ]. %%-------------------------------------------------------------------- %% http handlers @@ -318,9 +318,8 @@ schema("/gateways/:name/authentication/users/:uid") -> responses => ?STANDARD_RESP(#{204 => <<"User Deleted">>}) } - }; -schema(Path) -> - emqx_gateway_utils:make_compatible_schema(Path, fun schema/1). + }. + %%-------------------------------------------------------------------- %% params defines diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl index 68f392923..5848e3d55 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl @@ -55,10 +55,10 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). paths() -> - emqx_gateway_utils:make_deprecated_paths([ + [ "/gateways/:name/authentication/import_users", "/gateways/:name/listeners/:id/authentication/import_users" - ]). + ]. %%-------------------------------------------------------------------- %% http handlers @@ -147,9 +147,7 @@ schema("/gateways/:name/listeners/:id/authentication/import_users") -> responses => ?STANDARD_RESP(#{204 => <<"Imported">>}) } - }; -schema(Path) -> - emqx_gateway_utils:make_compatible_schema(Path, fun schema/1). + }. %%-------------------------------------------------------------------- %% params defines diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index e052647fe..8037f4197 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -70,12 +70,12 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). paths() -> - emqx_gateway_utils:make_deprecated_paths([ + [ "/gateways/:name/clients", "/gateways/:name/clients/:clientid", "/gateways/:name/clients/:clientid/subscriptions", "/gateways/:name/clients/:clientid/subscriptions/:topic" - ]). + ]. -define(CLIENT_QSCHEMA, [ {<<"node">>, atom}, @@ -541,9 +541,7 @@ schema("/gateways/:name/clients/:clientid/subscriptions/:topic") -> responses => ?STANDARD_RESP(#{204 => <<"Unsubscribed">>}) } - }; -schema(Path) -> - emqx_gateway_utils:make_compatible_schema(Path, fun schema/1). + }. params_client_query() -> params_gateway_name_in_path() ++ diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 2a6d59e35..8eea3c522 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -70,13 +70,13 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> - emqx_gateway_utils:make_deprecated_paths([ + [ "/gateways/:name/listeners", "/gateways/:name/listeners/:id", "/gateways/:name/listeners/:id/authentication", "/gateways/:name/listeners/:id/authentication/users", "/gateways/:name/listeners/:id/authentication/users/:uid" - ]). + ]. %%-------------------------------------------------------------------- %% http handlers @@ -590,9 +590,7 @@ schema("/gateways/:name/listeners/:id/authentication/users/:uid") -> responses => ?STANDARD_RESP(#{204 => <<"Deleted">>}) } - }; -schema(Path) -> - emqx_gateway_utils:make_compatible_schema(Path, fun schema/1). + }. %%-------------------------------------------------------------------- %% params defines diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index ced9eff48..d41b3c93b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -45,8 +45,6 @@ is_running/2, global_chain/1, listener_chain/3, - make_deprecated_paths/1, - make_compatible_schema/2, find_gateway_definitions/0 ]). @@ -531,39 +529,6 @@ default_subopts() -> is_new => true }. -%% Since 5.0.8, the API path of the gateway has been changed from "gateway" to "gateways" -%% and we need to be compatible with the old path -get_compatible_path("/gateway") -> - "/gateways"; -get_compatible_path("/gateway/" ++ Rest) -> - "/gateways/" ++ Rest. - -get_deprecated_path("/gateways") -> - "/gateway"; -get_deprecated_path("/gateways/" ++ Rest) -> - "/gateway/" ++ Rest. - -make_deprecated_paths(Paths) -> - Paths ++ [get_deprecated_path(Path) || Path <- Paths]. - -make_compatible_schema(Path, SchemaFun) -> - OldPath = get_compatible_path(Path), - make_compatible_schema2(OldPath, SchemaFun). - -make_compatible_schema2(Path, SchemaFun) -> - Schema = SchemaFun(Path), - maps:map( - fun(Key, Value) -> - case lists:member(Key, [get, delete, put, post]) of - true -> - Value#{deprecated => true}; - _ -> - Value - end - end, - Schema - ). - -spec find_gateway_definitions() -> list(gateway_def()). find_gateway_definitions() -> lists:flatten( diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl index fb648062a..b2e5861af 100644 --- a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl @@ -107,9 +107,9 @@ t_gateway(_) -> StompGw ), {204, _} = request(put, "/gateways/stomp", #{enable => true}), - {200, #{enable := true}} = request(get, "/gateway/stomp"), + {200, #{enable := true}} = request(get, "/gateways/stomp"), {204, _} = request(put, "/gateways/stomp", #{enable => false}), - {200, #{enable := false}} = request(get, "/gateway/stomp"), + {200, #{enable := false}} = request(get, "/gateways/stomp"), {404, _} = request(put, "/gateways/undefined", #{}), {400, _} = request(put, "/gateways/stomp", #{bad_key => "foo"}), ok. @@ -121,27 +121,14 @@ t_gateway_fail(_) -> {400, _} = request(put, "/gateways/coap", #{}), ok. -t_deprecated_gateway(_) -> - {200, Gateways} = request(get, "/gateway"), - lists:foreach(fun assert_gw_unloaded/1, Gateways), - {404, NotFoundReq} = request(get, "/gateway/uname_gateway"), - assert_not_found(NotFoundReq), - {204, _} = request(put, "/gateway/stomp", #{}), - {200, StompGw} = request(get, "/gateway/stomp"), - assert_fields_exist( - [name, status, enable, created_at, started_at], - StompGw - ), - ok. - t_gateway_enable(_) -> {204, _} = request(put, "/gateways/stomp", #{}), - {200, #{enable := Enable}} = request(get, "/gateway/stomp"), + {200, #{enable := Enable}} = request(get, "/gateways/stomp"), NotEnable = not Enable, {204, _} = request(put, "/gateways/stomp/enable/" ++ atom_to_list(NotEnable), undefined), - {200, #{enable := NotEnable}} = request(get, "/gateway/stomp"), + {200, #{enable := NotEnable}} = request(get, "/gateways/stomp"), {204, _} = request(put, "/gateways/stomp/enable/" ++ atom_to_list(Enable), undefined), - {200, #{enable := Enable}} = request(get, "/gateway/stomp"), + {200, #{enable := Enable}} = request(get, "/gateways/stomp"), {404, _} = request(put, "/gateways/undefined/enable/true", undefined), {404, _} = request(put, "/gateways/not_a_known_atom/enable/true", undefined), {404, _} = request(put, "/gateways/coap/enable/true", undefined), diff --git a/apps/emqx_gateway_coap/src/emqx_coap_api.erl b/apps/emqx_gateway_coap/src/emqx_coap_api.erl index b4fce5473..f6872656d 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_api.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_api.erl @@ -46,7 +46,7 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). paths() -> - emqx_gateway_utils:make_deprecated_paths([?PREFIX ++ "/request"]). + [?PREFIX ++ "/request"]. schema(?PREFIX ++ "/request") -> #{ @@ -65,9 +65,7 @@ schema(?PREFIX ++ "/request") -> ) } } - }; -schema(Path) -> - emqx_gateway_utils:make_compatible_schema(Path, fun schema/1). + }. request(post, #{body := Body, bindings := Bindings}) -> ClientId = maps:get(clientid, Bindings, undefined), @@ -107,7 +105,7 @@ request_body() -> [ {token, mk(binary(), #{desc => ?DESC(token)})}, {method, mk(enum([get, put, post, delete]), #{desc => ?DESC(method)})}, - {timeout, mk(emqx_schema:duration_ms(), #{desc => ?DESC(timeout)})}, + {timeout, mk(emqx_schema:timeout_duration_ms(), #{desc => ?DESC(timeout)})}, {content_type, mk( enum(['text/plain', 'application/json', 'application/octet-stream']), diff --git a/apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl index cec09a016..6b7038eb8 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl @@ -97,8 +97,6 @@ t_send_request_api(_) -> ?assertEqual(Payload, RPayload) end, Test("gateways/coap/clients/client1/request"), - timer:sleep(100), - Test("gateway/coap/clients/client1/request"), erlang:exit(ClientId, kill), ok. diff --git a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src index 83a707395..3a1e3fc62 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src +++ b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_lwm2m, [ {description, "LwM2M Gateway"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]}, {env, []}, diff --git a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_api.erl b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_api.erl index ca32d03db..ab780537f 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_api.erl +++ b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_api.erl @@ -40,9 +40,9 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE). paths() -> - emqx_gateway_utils:make_deprecated_paths([ + [ ?PATH("/lookup"), ?PATH("/observe"), ?PATH("/read"), ?PATH("/write") - ]). + ]. schema(?PATH("/lookup")) -> #{ @@ -127,9 +127,7 @@ schema(?PATH("/write")) -> 404 => error_codes(['CLIENT_NOT_FOUND'], <<"Clientid not found">>) } } - }; -schema(Path) -> - emqx_gateway_utils:make_compatible_schema(Path, fun schema/1). + }. fields(resource) -> [ diff --git a/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_api_SUITE.erl b/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_api_SUITE.erl index 6fa46ebbc..14fe7f0ae 100644 --- a/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_api_SUITE.erl +++ b/apps/emqx_gateway_lwm2m/test/emqx_lwm2m_api_SUITE.erl @@ -306,7 +306,7 @@ t_observe(Config) -> test_recv_mqtt_response(RespTopic), %% step2, call observe API - ?assertMatch({204, []}, call_deprecated_send_api(Epn, "observe", "path=/3/0/1&enable=false")), + ?assertMatch({204, []}, call_send_api(Epn, "observe", "path=/3/0/1&enable=false")), timer:sleep(100), #coap_message{type = Type, method = Method, options = Opts} = test_recv_coap_request(UdpSock), ?assertEqual(con, Type), @@ -328,9 +328,6 @@ call_lookup_api(ClientId, Path, Action) -> call_send_api(ClientId, Cmd, Query) -> call_send_api(ClientId, Cmd, Query, "gateways/lwm2m/clients"). -call_deprecated_send_api(ClientId, Cmd, Query) -> - call_send_api(ClientId, Cmd, Query, "gateway/lwm2m/clients"). - call_send_api(ClientId, Cmd, Query, API) -> ApiPath = emqx_mgmt_api_test_util:api_path([API, ClientId, Cmd]), Auth = emqx_mgmt_api_test_util:auth_header_(), diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src index 69cf91f4c..453afb5cb 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src @@ -1,6 +1,6 @@ {application, emqx_node_rebalance, [ {description, "EMQX Node Rebalance"}, - {vsn, "5.0.1"}, + {vsn, "5.0.2"}, {registered, [ emqx_node_rebalance_sup, emqx_node_rebalance, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index 1f6328a63..d0526f5d5 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -356,7 +356,7 @@ fields(rebalance_start) -> [ {"wait_health_check", mk( - emqx_schema:duration_s(), + emqx_schema:timeout_duration_s(), #{ desc => ?DESC(wait_health_check), required => false @@ -414,7 +414,7 @@ fields(rebalance_start) -> )}, {"wait_takeover", mk( - emqx_schema:duration_s(), + emqx_schema:timeout_duration_s(), #{ desc => ?DESC(wait_takeover), required => false diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index ae2128a7e..0f543badd 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -351,6 +351,10 @@ to_bin(Bin) when is_binary(Bin) -> to_bin(Atom) when is_atom(Atom) -> erlang:atom_to_binary(Atom). +handle_result({error, {recoverable_error, _Error}} = Res) -> + Res; +handle_result({error, {unrecoverable_error, _Error}} = Res) -> + Res; handle_result({error, disconnected}) -> {error, {recoverable_error, disconnected}}; handle_result({error, Error}) -> @@ -359,6 +363,8 @@ handle_result({error, socket, closed} = Error) -> {error, {recoverable_error, Error}}; handle_result({error, Type, Reason}) -> {error, {unrecoverable_error, {Type, Reason}}}; +handle_result({ok, [{proc_result, RetCode, Reason}]}) when RetCode =/= 0 -> + {error, {unrecoverable_error, {RetCode, Reason}}}; handle_result(Res) -> Res. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl index 370582304..a33eaa5e7 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl @@ -48,7 +48,7 @@ fields("prometheus") -> )}, {interval, ?HOCON( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"15s">>, required => true, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 37d7b1696..0dbc3067f 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -85,6 +85,7 @@ allocate_resource/3, has_allocated_resources/1, get_allocated_resources/1, + get_allocated_resources_list/1, forget_allocated_resources/1 ]). @@ -519,6 +520,10 @@ get_allocated_resources(InstanceId) -> Objects = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId), maps:from_list([{K, V} || {_InstanceId, K, V} <- Objects]). +-spec get_allocated_resources_list(resource_id()) -> list(tuple()). +get_allocated_resources_list(InstanceId) -> + ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId). + -spec forget_allocated_resources(resource_id()) -> ok. forget_allocated_resources(InstanceId) -> true = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId), diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 7f9886a5d..4b36f5b89 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -76,18 +76,18 @@ worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(required) -> false; worker_pool_size(_) -> undefined. -resume_interval(type) -> emqx_schema:duration_ms(); +resume_interval(type) -> emqx_schema:timeout_duration_ms(); resume_interval(importance) -> ?IMPORTANCE_HIDDEN; resume_interval(desc) -> ?DESC("resume_interval"); resume_interval(required) -> false; resume_interval(_) -> undefined. -metrics_flush_interval(type) -> emqx_schema:duration_ms(); +metrics_flush_interval(type) -> emqx_schema:timeout_duration_ms(); metrics_flush_interval(importance) -> ?IMPORTANCE_HIDDEN; metrics_flush_interval(required) -> false; metrics_flush_interval(_) -> undefined. -health_check_interval(type) -> emqx_schema:duration_ms(); +health_check_interval(type) -> emqx_schema:timeout_duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; health_check_interval(required) -> false; @@ -115,7 +115,7 @@ start_after_created(default) -> ?START_AFTER_CREATED_RAW; start_after_created(required) -> false; start_after_created(_) -> undefined. -start_timeout(type) -> emqx_schema:duration_ms(); +start_timeout(type) -> emqx_schema:timeout_duration_ms(); start_timeout(desc) -> ?DESC("start_timeout"); start_timeout(default) -> ?START_TIMEOUT_RAW; start_timeout(required) -> false; @@ -133,7 +133,7 @@ query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. -request_ttl(type) -> hoconsc:union([emqx_schema:duration_ms(), infinity]); +request_ttl(type) -> hoconsc:union([emqx_schema:timeout_duration_ms(), infinity]); request_ttl(aliases) -> [request_timeout]; request_ttl(desc) -> ?DESC("request_ttl"); request_ttl(default) -> ?DEFAULT_REQUEST_TTL_RAW; @@ -166,7 +166,7 @@ batch_size(default) -> ?DEFAULT_BATCH_SIZE; batch_size(required) -> false; batch_size(_) -> undefined. -batch_time(type) -> emqx_schema:duration_ms(); +batch_time(type) -> emqx_schema:timeout_duration_ms(); batch_time(desc) -> ?DESC("batch_time"); batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW; batch_time(required) -> false; @@ -196,6 +196,10 @@ desc("creation_opts") -> ?DESC("creation_opts"). get_value_with_unit(Value) when is_integer(Value) -> <<(erlang:integer_to_binary(Value))/binary, "ms">>; +get_value_with_unit(Value) when is_list(Value) -> + %% Must ensure it's a binary, otherwise formatting the error + %% message will fail. + list_to_binary(Value); get_value_with_unit(Value) -> Value. diff --git a/apps/emqx_resource/test/emqx_resource_schema_tests.erl b/apps/emqx_resource/test/emqx_resource_schema_tests.erl new file mode 100644 index 000000000..676a42510 --- /dev/null +++ b/apps/emqx_resource/test/emqx_resource_schema_tests.erl @@ -0,0 +1,108 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource_schema_tests). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%%=========================================================================== +%% Test cases +%%=========================================================================== + +health_check_interval_validator_test_() -> + [ + ?_assertMatch( + #{<<"resource_opts">> := #{<<"health_check_interval">> := 150_000}}, + parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"150s">>)) + ), + ?_assertMatch( + #{<<"resource_opts">> := #{<<"health_check_interval">> := 3_600_000}}, + parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"3600000ms">>)) + ), + ?_assertThrow( + {_, [ + #{ + kind := validation_error, + reason := <<"Health Check Interval (3600001ms) is out of range", _/binary>>, + value := 3600001 + } + ]}, + parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"3600001ms">>)) + ), + {"bad parse: negative number", + ?_assertThrow( + {_, [ + #{ + kind := validation_error, + reason := <<"Health Check Interval (-10ms) is out of range", _/binary>>, + value := "-10ms" + } + ]}, + parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"-10ms">>)) + )}, + {"bad parse: underscores", + ?_assertThrow( + {_, [ + #{ + kind := validation_error, + reason := + <<"Health Check Interval (3_600_000ms) is out of range", _/binary>>, + value := "3_600_000ms" + } + ]}, + parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"3_600_000ms">>)) + )}, + ?_assertThrow( + #{exception := "timeout value too large" ++ _}, + parse_and_check_webhook_bridge( + webhook_bridge_health_check_hocon(<<"150000000000000s">>) + ) + ) + ]. + +%%=========================================================================== +%% Helper functions +%%=========================================================================== + +parse_and_check_webhook_bridge(Hocon) -> + #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)), + Conf. + +parse(Hocon) -> + {ok, Conf} = hocon:binary(Hocon), + Conf. + +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf). + +%%=========================================================================== +%% Data section +%%=========================================================================== + +%% erlfmt-ignore +webhook_bridge_health_check_hocon(HealthCheckInterval) -> +io_lib:format( +""" +bridges.webhook.simple { + url = \"http://localhost:4000\" + body = \"body\" + resource_opts { + health_check_interval = \"~s\" + } +} +""", +[HealthCheckInterval]). diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 823183cc3..c2cbf5135 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -45,13 +45,14 @@ fields("retainer") -> {enable, sc(boolean(), enable, true)}, {msg_expiry_interval, sc( + %% not used in a `receive ... after' block, just timestamp comparison emqx_schema:duration_ms(), msg_expiry_interval, <<"0s">> )}, {msg_clear_interval, sc( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), msg_clear_interval, <<"0s">> )}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index bc8cae07a..4831ccf0e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -216,7 +216,7 @@ rule_engine_settings() -> ?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})}, {jq_function_default_timeout, ?HOCON( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"10s">>, desc => ?DESC("rule_engine_jq_function_default_timeout") diff --git a/apps/emqx_s3/src/emqx_s3.app.src b/apps/emqx_s3/src/emqx_s3.app.src index 6d0518769..0599d7923 100644 --- a/apps/emqx_s3/src/emqx_s3.app.src +++ b/apps/emqx_s3/src/emqx_s3.app.src @@ -1,6 +1,6 @@ {application, emqx_s3, [ {description, "EMQX S3"}, - {vsn, "5.0.7"}, + {vsn, "5.0.8"}, {modules, []}, {registered, [emqx_s3_sup]}, {applications, [ diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index f02364969..c2460e20d 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -68,6 +68,7 @@ fields(s3) -> )}, {url_expire_time, mk( + %% not used in a `receive ... after' block, just timestamp comparison emqx_schema:duration_s(), #{ default => "1h", diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.app.src b/apps/emqx_slow_subs/src/emqx_slow_subs.app.src index a06ff2595..922eed668 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.app.src +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.app.src @@ -1,7 +1,7 @@ {application, emqx_slow_subs, [ {description, "EMQX Slow Subscribers Statistics"}, % strict semver, bump manually! - {vsn, "1.0.5"}, + {vsn, "1.0.6"}, {modules, []}, {registered, [emqx_slow_subs_sup]}, {applications, [kernel, stdlib, emqx]}, diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl index 47ea18c3c..9edf0c799 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -30,12 +30,14 @@ fields("slow_subs") -> {enable, sc(boolean(), false, enable)}, {threshold, sc( + %% not used in a `receive ... after' block, just timestamp comparison emqx_schema:duration_ms(), <<"500ms">>, threshold )}, {expire_interval, sc( + %% not used in a `receive ... after' block, just timestamp comparison emqx_schema:duration_ms(), <<"300s">>, expire_interval diff --git a/changes/ce/feat-10909.en.md b/changes/ce/feat-10909.en.md new file mode 100644 index 000000000..5cce28c82 --- /dev/null +++ b/changes/ce/feat-10909.en.md @@ -0,0 +1 @@ +Remove the deprecated HTTP APIs for gateways diff --git a/changes/ce/fix-10930.en.md b/changes/ce/fix-10930.en.md new file mode 100644 index 000000000..a0f70fa56 --- /dev/null +++ b/changes/ce/fix-10930.en.md @@ -0,0 +1,3 @@ +Added a schema validation for values that might be used in timeouts to avoid invalid values. + +Before this fix, it was possible to use absurd values in the schema that would exceed the system limit, causing a crash. diff --git a/changes/ee/feat-10908.en.md b/changes/ee/feat-10908.en.md new file mode 100644 index 000000000..ee350226c --- /dev/null +++ b/changes/ee/feat-10908.en.md @@ -0,0 +1 @@ +Refactored the RocketMQ bridge to avoid leaking resources during crashes at creation. diff --git a/changes/ee/feat-10924.en.md b/changes/ee/feat-10924.en.md new file mode 100644 index 000000000..0fb8a330d --- /dev/null +++ b/changes/ee/feat-10924.en.md @@ -0,0 +1 @@ +Refactored influxdb bridge connector to avoid resource leaks during crashes at creation.