Merge remote-tracking branch 'origin/master' into 0529-donot-copy-cluster-conf-from-newer-version

This commit is contained in:
Zaiming (Stone) Shi 2023-06-05 22:56:58 +02:00
commit bb1cf9beaa
56 changed files with 542 additions and 208 deletions

View File

@ -187,7 +187,7 @@ fields(client_opts) ->
)},
{max_retry_time,
?HOCON(
emqx_schema:duration(),
emqx_schema:timeout_duration(),
#{
desc => ?DESC(max_retry_time),
default => <<"1h">>,

View File

@ -30,9 +30,19 @@
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("logger.hrl").
-define(MAX_INT_TIMEOUT_MS, 4294967295).
%% floor(?MAX_INT_TIMEOUT_MS / 1000).
-define(MAX_INT_TIMEOUT_S, 4294967).
-type duration() :: integer().
-type duration_s() :: integer().
-type duration_ms() :: integer().
%% ?MAX_INT_TIMEOUT is defined loosely in some OTP modules like
%% `erpc', `rpc' `gen' and `peer', despite affecting `receive' blocks
%% as well. It's `2^32 - 1'.
-type timeout_duration() :: 0..?MAX_INT_TIMEOUT_MS.
-type timeout_duration_s() :: 0..?MAX_INT_TIMEOUT_S.
-type timeout_duration_ms() :: 0..?MAX_INT_TIMEOUT_MS.
-type bytesize() :: integer().
-type wordsize() :: bytesize().
-type percent() :: float().
@ -56,6 +66,9 @@
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}).
-typerefl_from_string({timeout_duration/0, emqx_schema, to_timeout_duration}).
-typerefl_from_string({timeout_duration_s/0, emqx_schema, to_timeout_duration_s}).
-typerefl_from_string({timeout_duration_ms/0, emqx_schema, to_timeout_duration_ms}).
-typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}).
-typerefl_from_string({wordsize/0, emqx_schema, to_wordsize}).
-typerefl_from_string({percent/0, emqx_schema, to_percent}).
@ -91,6 +104,9 @@
to_duration/1,
to_duration_s/1,
to_duration_ms/1,
to_timeout_duration/1,
to_timeout_duration_s/1,
to_timeout_duration_ms/1,
mk_duration/2,
to_bytesize/1,
to_wordsize/1,
@ -127,6 +143,9 @@
duration/0,
duration_s/0,
duration_ms/0,
timeout_duration/0,
timeout_duration_s/0,
timeout_duration_ms/0,
bytesize/0,
wordsize/0,
percent/0,
@ -1037,7 +1056,7 @@ fields("mqtt_quic_listener") ->
)},
{"idle_timeout",
sc(
duration_ms(),
timeout_duration_ms(),
#{
default => 0,
desc => ?DESC(fields_mqtt_quic_listener_idle_timeout),
@ -1054,7 +1073,7 @@ fields("mqtt_quic_listener") ->
)},
{"handshake_idle_timeout",
sc(
duration_ms(),
timeout_duration_ms(),
#{
default => <<"10s">>,
desc => ?DESC(fields_mqtt_quic_listener_handshake_idle_timeout),
@ -1071,7 +1090,7 @@ fields("mqtt_quic_listener") ->
)},
{"keep_alive_interval",
sc(
duration_ms(),
timeout_duration_ms(),
#{
default => 0,
desc => ?DESC(fields_mqtt_quic_listener_keep_alive_interval),
@ -2637,6 +2656,37 @@ to_duration_ms(Str) ->
_ -> {error, Str}
end.
-spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when
Input :: string() | binary().
to_timeout_duration(Str) ->
do_to_timeout_duration(Str, fun to_duration/1, ?MAX_INT_TIMEOUT_MS, "ms").
-spec to_timeout_duration_ms(Input) -> {ok, timeout_duration_ms()} | {error, Input} when
Input :: string() | binary().
to_timeout_duration_ms(Str) ->
do_to_timeout_duration(Str, fun to_duration_ms/1, ?MAX_INT_TIMEOUT_MS, "ms").
-spec to_timeout_duration_s(Input) -> {ok, timeout_duration_s()} | {error, Input} when
Input :: string() | binary().
to_timeout_duration_s(Str) ->
do_to_timeout_duration(Str, fun to_duration_s/1, ?MAX_INT_TIMEOUT_S, "s").
do_to_timeout_duration(Str, Fn, Max, Unit) ->
case Fn(Str) of
{ok, I} ->
case I =< Max of
true ->
{ok, I};
false ->
Msg = lists:flatten(
io_lib:format("timeout value too large (max: ~b ~s)", [Max, Unit])
),
throw(Msg)
end;
Err ->
Err
end.
to_bytesize(Str) ->
case hocon_postprocess:bytesize(Str) of
I when is_integer(I) -> {ok, I};

View File

@ -45,7 +45,9 @@
limited_atom/0,
limited_latin_atom/0,
printable_utf8/0,
printable_codepoint/0
printable_codepoint/0,
raw_duration/0,
large_raw_duration/0
]).
%% Generic Types
@ -629,6 +631,20 @@ printable_codepoint() ->
{1, range(16#E000, 16#FFFD)}
]).
raw_duration() ->
?LET(
{Value, Unit},
{pos_integer(), oneof([<<"d">>, <<"h">>, <<"m">>, <<"s">>, <<"ms">>])},
<<(integer_to_binary(Value))/binary, Unit/binary>>
).
large_raw_duration() ->
?LET(
{Value, Unit},
{range(1_000_000, inf), oneof([<<"d">>, <<"h">>, <<"m">>])},
<<(integer_to_binary(Value))/binary, Unit/binary>>
).
%%--------------------------------------------------------------------
%% Iterators
%%--------------------------------------------------------------------

View File

@ -809,3 +809,31 @@ set_envs([{_Name, _Value} | _] = Envs) ->
unset_envs([{_Name, _Value} | _] = Envs) ->
lists:map(fun({Name, _}) -> os:unsetenv(Name) end, Envs).
timeout_types_test_() ->
[
?_assertEqual(
{ok, 4294967295},
typerefl:from_string(emqx_schema:timeout_duration(), <<"4294967295ms">>)
),
?_assertEqual(
{ok, 4294967295},
typerefl:from_string(emqx_schema:timeout_duration_ms(), <<"4294967295ms">>)
),
?_assertEqual(
{ok, 4294967},
typerefl:from_string(emqx_schema:timeout_duration_s(), <<"4294967000ms">>)
),
?_assertThrow(
"timeout value too large (max: 4294967295 ms)",
typerefl:from_string(emqx_schema:timeout_duration(), <<"4294967296ms">>)
),
?_assertThrow(
"timeout value too large (max: 4294967295 ms)",
typerefl:from_string(emqx_schema:timeout_duration_ms(), <<"4294967296ms">>)
),
?_assertThrow(
"timeout value too large (max: 4294967 s)",
typerefl:from_string(emqx_schema:timeout_duration_s(), <<"4294967001ms">>)
)
].

View File

@ -0,0 +1,99 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(prop_emqx_schema).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(MAX_INT_TIMEOUT_MS, 4294967295).
%%--------------------------------------------------------------------
%% Helper fns
%%--------------------------------------------------------------------
parse(Value, Type) ->
typerefl:from_string(Type, Value).
timeout_within_bounds(RawDuration) ->
case emqx_schema:to_duration_ms(RawDuration) of
{ok, I} when I =< ?MAX_INT_TIMEOUT_MS ->
true;
_ ->
false
end.
parses_the_same(Value, Type1, Type2) ->
parse(Value, Type1) =:= parse(Value, Type2).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_timeout_duration_refines_duration() ->
?FORALL(
RawDuration,
emqx_proper_types:raw_duration(),
?IMPLIES(
timeout_within_bounds(RawDuration),
parses_the_same(RawDuration, emqx_schema:duration(), emqx_schema:timeout_duration())
)
).
prop_timeout_duration_ms_refines_duration_ms() ->
?FORALL(
RawDuration,
emqx_proper_types:raw_duration(),
?IMPLIES(
timeout_within_bounds(RawDuration),
parses_the_same(
RawDuration, emqx_schema:duration_ms(), emqx_schema:timeout_duration_ms()
)
)
).
prop_timeout_duration_s_refines_duration_s() ->
?FORALL(
RawDuration,
emqx_proper_types:raw_duration(),
?IMPLIES(
timeout_within_bounds(RawDuration),
parses_the_same(RawDuration, emqx_schema:duration_s(), emqx_schema:timeout_duration_s())
)
).
prop_timeout_duration_is_valid_for_receive_after() ->
?FORALL(
RawDuration,
emqx_proper_types:large_raw_duration(),
?IMPLIES(
not timeout_within_bounds(RawDuration),
begin
%% we have to use the the non-strict version, because it's invalid
{ok, Timeout} = parse(RawDuration, emqx_schema:duration()),
Ref = make_ref(),
timer:send_after(20, {Ref, ok}),
?assertError(
timeout_value,
receive
{Ref, ok} -> error(should_be_invalid)
after Timeout -> error(should_be_invalid)
end
),
true
end
)
).

View File

@ -93,7 +93,7 @@ fields(config) ->
)},
{connect_timeout,
hoconsc:mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"15s">>,
desc => ?DESC("connect_timeout")

View File

@ -47,7 +47,7 @@ fields(bridge_config) ->
[
{connect_timeout,
sc(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"15s">>,
desc => ?DESC("connect_timeout")
@ -80,7 +80,7 @@ fields(bridge_config) ->
)},
{request_timeout,
sc(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
required => false,
deprecated => {since, "e5.0.1"},

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, influxdb]},
{env, []},

View File

@ -39,6 +39,9 @@
-type ts_precision() :: ns | us | ms | s.
%% Allocatable resources
-define(influx_client, influx_client).
-define(INFLUXDB_DEFAULT_PORT, 8086).
%% influxdb servers don't need parse
@ -53,10 +56,20 @@
callback_mode() -> async_if_possible.
on_start(InstId, Config) ->
%% InstID as pool would be handled by influxdb client
%% so there is no need to allocate pool_name here
%% ehttpc for influxdb-v1/v2,
%% ecpool for influxdb-udp
%% See: influxdb:start_client/1
start_client(InstId, Config).
on_stop(_InstId, #{client := Client}) ->
influxdb:stop_client(Client).
on_stop(InstId, _State) ->
case emqx_resource:get_allocated_resources(InstId) of
#{?influx_client := Client} ->
influxdb:stop_client(Client);
_ ->
ok
end.
on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
case data_to_points(Data, SyntaxLines) of
@ -220,8 +233,12 @@ start_client(InstId, Config) ->
config => emqx_utils:redact(Config),
client_config => emqx_utils:redact(ClientConfig)
}),
try
do_start_client(InstId, ClientConfig, Config)
try do_start_client(InstId, ClientConfig, Config) of
Res = {ok, #{client := Client}} ->
ok = emqx_resource:allocate_resource(InstId, ?influx_client, Client),
Res;
{error, Reason} ->
{error, Reason}
catch
E:R:S ->
?tp(influxdb_connector_start_exception, #{error => {E, R}}),

View File

@ -144,7 +144,7 @@ request_config() ->
)},
{request_timeout,
mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"15s">>,
desc => ?DESC("config_request_timeout")

View File

@ -165,20 +165,20 @@ fields("config") ->
}
)},
{connect_timeout,
mk(emqx_schema:duration_ms(), #{
mk(emqx_schema:timeout_duration_ms(), #{
default => <<"5s">>,
desc => ?DESC(connect_timeout)
})},
{min_metadata_refresh_interval,
mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"3s">>,
desc => ?DESC(min_metadata_refresh_interval)
}
)},
{metadata_request_timeout,
mk(emqx_schema:duration_ms(), #{
mk(emqx_schema:timeout_duration_ms(), #{
default => <<"5s">>,
desc => ?DESC(metadata_request_timeout)
})},

View File

@ -165,19 +165,32 @@ sql_create_table() ->
"CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
sql_drop_table() ->
"DROP TABLE mqtt_test".
"BEGIN\n"
" EXECUTE IMMEDIATE 'DROP TABLE mqtt_test';\n"
" EXCEPTION\n"
" WHEN OTHERS THEN\n"
" IF SQLCODE = -942 THEN\n"
" NULL;\n"
" ELSE\n"
" RAISE;\n"
" END IF;\n"
" END;".
sql_check_table_exist() ->
"SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'".
reset_table(Config) ->
ResourceId = resource_id(Config),
_ = emqx_resource:simple_sync_query(ResourceId, {sql, sql_drop_table()}),
drop_table_if_exists(Config),
{ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
ResourceId, {sql, sql_create_table()}
),
ok.
drop_table(Config) ->
drop_table_if_exists(Config) ->
ResourceId = resource_id(Config),
emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
{ok, [{proc_result, 0, _}]} =
emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
ok.
oracle_config(TestCase, _ConnectionType, Config) ->
@ -392,6 +405,12 @@ t_batch_sync_query(Config) ->
emqx_bridge:send_message(BridgeId, Params),
ok
end),
% Wait for reconnection.
?retry(
_Sleep = 1_000,
_Attempts = 30,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
?retry(
_Sleep = 1_000,
_Attempts = 30,
@ -527,3 +546,32 @@ t_no_sid_nor_service_name(Config0) ->
create_bridge(Config)
),
ok.
t_table_removed(Config) ->
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
drop_table_if_exists(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
id => MsgId,
payload => ?config(oracle_name, Config),
retain => true
},
Message = {send_message, Params},
?assertEqual(
{error, {unrecoverable_error, {942, "ORA-00942: table or view does not exist\n"}}},
emqx_resource:simple_sync_query(ResourceId, Message)
),
ok
end,
[]
),
ok.

View File

@ -62,7 +62,7 @@ fields(config) ->
)},
{connect_timeout,
mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"5s">>,
desc => ?DESC("connect_timeout")
@ -86,11 +86,12 @@ fields(producer_opts) ->
default => <<"1MB">>, desc => ?DESC("producer_send_buffer")
})},
{sync_timeout,
mk(emqx_schema:duration_ms(), #{
mk(emqx_schema:timeout_duration_ms(), #{
default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
})},
{retention_period,
mk(
%% not used in a `receive ... after' block, just timestamp comparison
hoconsc:union([infinity, emqx_schema:duration_ms()]),
#{default => infinity, desc => ?DESC("producer_retention_period")}
)},

View File

@ -84,7 +84,7 @@ fields(config) ->
)},
{timeout,
hoconsc:mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"5s">>,
desc => ?DESC("timeout")
@ -100,7 +100,7 @@ fields(config) ->
)},
{publish_confirmation_timeout,
hoconsc:mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"30s">>,
desc => ?DESC("timeout")
@ -117,7 +117,7 @@ fields(config) ->
)},
{heartbeat,
hoconsc:mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"30s">>,
desc => ?DESC("heartbeat")

View File

@ -58,12 +58,12 @@ fields(config) ->
mk(binary(), #{default => <<>>, desc => ?DESC(security_token), sensitive => true})},
{sync_timeout,
mk(
emqx_schema:duration(),
emqx_schema:timeout_duration(),
#{default => <<"3s">>, desc => ?DESC(sync_timeout)}
)},
{refresh_interval,
mk(
emqx_schema:duration(),
emqx_schema:timeout_duration(),
#{default => <<"3s">>, desc => ?DESC(refresh_interval)}
)},
{send_buffer,
@ -102,22 +102,23 @@ on_start(
emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS)
),
ClientId = client_id(InstanceId),
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic),
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config),
ProducersMapPID = create_producers_map(ClientId),
State = #{
client_id => ClientId,
topic => Topic,
topic_tokens => TopicTks,
sync_timeout => SyncTimeout,
templates => Templates,
producers_map_pid => ProducersMapPID,
producers_opts => ProducerOpts
},
ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),
create_producers_map(ClientId),
case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of
{ok, _Pid} ->
{ok, State};
@ -130,23 +131,22 @@ on_start(
{error, Reason}
end.
on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) ->
on_stop(InstanceId, _State) ->
?SLOG(info, #{
msg => "stopping_rocketmq_connector",
connector => InstanceId
}),
Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}),
lists:foreach(
fun([Topic, Producer]) ->
ets:delete(ClientId, {RawTopic, Topic}),
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
fun
({_, client_id, ClientId}) ->
destory_producers_map(ClientId),
ok = rocketmq:stop_and_delete_supervised_client(ClientId);
({_, _Topic, Producer}) ->
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
end,
Producers
),
Pid ! ok,
ok = rocketmq:stop_and_delete_supervised_client(ClientId).
emqx_resource:get_allocated_resources_list(InstanceId)
).
on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, send_sync, State).
@ -179,7 +179,6 @@ do_query(
#{
templates := Templates,
client_id := ClientId,
topic := RawTopic,
topic_tokens := TopicTks,
producers_opts := ProducerOpts,
sync_timeout := RequestTimeout
@ -191,7 +190,7 @@ do_query(
#{connector => InstanceId, query => Query, state => State}
),
TopicKey = get_topic_key(Query, RawTopic, TopicTks),
TopicKey = get_topic_key(Query, TopicTks),
Data = apply_template(Query, Templates),
Result = safe_do_produce(
@ -220,7 +219,7 @@ do_query(
safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) ->
try
Producers = get_producers(ClientId, TopicKey, ProducerOpts),
Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts),
produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout)
catch
_Type:Reason ->
@ -249,10 +248,10 @@ parse_template([{Key, H} | T], Templates) ->
parse_template([], Templates) ->
Templates.
get_topic_key({_, Msg}, RawTopic, TopicTks) ->
{RawTopic, emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg)};
get_topic_key([Query | _], RawTopic, TopicTks) ->
get_topic_key(Query, RawTopic, TopicTks).
get_topic_key({_, Msg}, TopicTks) ->
emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg);
get_topic_key([Query | _], TopicTks) ->
get_topic_key(Query, TopicTks).
apply_template({Key, Msg} = _Req, Templates) ->
case maps:get(Key, Templates, undefined) of
@ -317,29 +316,29 @@ acl_info(_, _, _) ->
#{}.
create_producers_map(ClientId) ->
erlang:spawn(fun() ->
case ets:whereis(ClientId) of
undefined ->
_ = ets:new(ClientId, [public, named_table]),
ok;
_ ->
ok
end,
receive
_Msg ->
ok
end
end).
_ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]),
ok.
get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) ->
case ets:lookup(ClientId, TopicKey) of
[{_, Producers0}] ->
Producers0;
_ ->
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]),
{ok, Producers0} = rocketmq:ensure_supervised_producers(
ClientId, ProducerGroup, Topic1, ProducerOpts
),
ets:insert(ClientId, {TopicKey, Producers0}),
Producers0
%% The resource manager will not terminate when restarting a resource,
%% so manually destroying the ets table is necessary.
destory_producers_map(ClientId) ->
case ets:whereis(ClientId) of
undefined ->
ok;
Tid ->
ets:delete(Tid)
end.
get_producers(InstanceId, ClientId, Topic, ProducerOpts) ->
case ets:lookup(ClientId, Topic) of
[{_, Producers}] ->
Producers;
_ ->
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic]),
{ok, Producers} = rocketmq:ensure_supervised_producers(
ClientId, ProducerGroup, Topic, ProducerOpts
),
ok = emqx_resource:allocate_resource(InstanceId, Topic, Producers),
ets:insert(ClientId, {Topic, Producers}),
Producers
end.

View File

@ -528,7 +528,7 @@ fields("node") ->
)},
{"crash_dump_seconds",
sc(
emqx_schema:duration_s(),
emqx_schema:timeout_duration_s(),
#{
mapping => "vm_args.-env ERL_CRASH_DUMP_SECONDS",
default => <<"30s">>,
@ -550,7 +550,7 @@ fields("node") ->
)},
{"dist_net_ticktime",
sc(
emqx_schema:duration_s(),
emqx_schema:timeout_duration_s(),
#{
mapping => "vm_args.-kernel net_ticktime",
default => <<"2m">>,
@ -821,7 +821,7 @@ fields("rpc") ->
)},
{"socket_keepalive_idle",
sc(
emqx_schema:duration_s(),
emqx_schema:timeout_duration_s(),
#{
mapping => "gen_rpc.socket_keepalive_idle",
default => <<"15m">>,
@ -830,7 +830,7 @@ fields("rpc") ->
)},
{"socket_keepalive_interval",
sc(
emqx_schema:duration_s(),
emqx_schema:timeout_duration_s(),
#{
mapping => "gen_rpc.socket_keepalive_interval",
default => <<"75s">>,
@ -972,7 +972,7 @@ fields("log_overload_kill") ->
)},
{"restart_after",
sc(
hoconsc:union([emqx_schema:duration_ms(), infinity]),
hoconsc:union([emqx_schema:timeout_duration_ms(), infinity]),
#{
default => <<"5s">>,
desc => ?DESC("log_overload_kill_restart_after")

View File

@ -67,7 +67,7 @@ fields(config) ->
[
{connect_timeout,
sc(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"15s">>,
desc => ?DESC("connect_timeout")
@ -80,7 +80,7 @@ fields(config) ->
)},
{retry_interval,
sc(
emqx_schema:duration(),
emqx_schema:timeout_duration(),
#{deprecated => {since, "5.0.4"}}
)},
{pool_type,
@ -138,7 +138,7 @@ fields("request") ->
)},
{request_timeout,
sc(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
required => false,
desc => ?DESC("request_timeout")

View File

@ -201,5 +201,5 @@ port(type) -> integer();
port(default) -> 389;
port(_) -> undefined.
duration(type) -> emqx_schema:duration_ms();
duration(type) -> emqx_schema:timeout_duration_ms();
duration(_) -> undefined.

View File

@ -108,7 +108,7 @@ fields(topology) ->
{wait_queue_timeout_ms, duration("wait_queue_timeout")},
{heartbeat_frequency_ms,
hoconsc:mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"200s">>,
desc => ?DESC("heartbeat_period")
@ -422,7 +422,7 @@ r_mode(_) -> undefined.
duration(Desc) ->
#{
type => emqx_schema:duration_ms(),
type => emqx_schema:timeout_duration_ms(),
required => false,
desc => ?DESC(Desc)
}.

View File

@ -38,7 +38,7 @@ fields("dashboard") ->
{default_password, fun default_password/1},
{sample_interval,
?HOCON(
emqx_schema:duration_s(),
emqx_schema:timeout_duration_s(),
#{
default => <<"10s">>,
desc => ?DESC(sample_interval),

View File

@ -819,6 +819,12 @@ typename_to_spec("duration_s()", _Mod) ->
#{type => string, example => <<"1h">>};
typename_to_spec("duration_ms()", _Mod) ->
#{type => string, example => <<"32s">>};
typename_to_spec("timeout_duration()", _Mod) ->
#{type => string, example => <<"12m">>};
typename_to_spec("timeout_duration_s()", _Mod) ->
#{type => string, example => <<"1h">>};
typename_to_spec("timeout_duration_ms()", _Mod) ->
#{type => string, example => <<"32s">>};
typename_to_spec("percent()", _Mod) ->
#{type => number, example => <<"12%">>};
typename_to_spec("file()", _Mod) ->

View File

@ -32,7 +32,7 @@ fields("root") ->
)},
{default_username, fun default_username/1},
{default_password, fun default_password/1},
{sample_interval, mk(emqx_schema:duration_s(), #{default => <<"10s">>})},
{sample_interval, mk(emqx_schema:timeout_duration_s(), #{default => <<"10s">>})},
{token_expired_time, mk(emqx_schema:duration(), #{default => <<"30m">>})}
];
fields("ref1") ->

View File

@ -751,7 +751,7 @@ schema("/object") ->
{per_page, mk(range(1, 100), #{required => true, desc => <<"good per page desc">>})},
{timeout,
mk(
hoconsc:union([infinity, emqx_schema:duration_s()]),
hoconsc:union([infinity, emqx_schema:timeout_duration_s()]),
#{default => 5, required => true}
)},
{inner_ref, mk(hoconsc:ref(?MODULE, good_ref), #{})}
@ -761,7 +761,7 @@ schema("/nest/object") ->
{per_page, mk(range(1, 100), #{desc => <<"good per page desc">>})},
{timeout,
mk(
hoconsc:union([infinity, emqx_schema:duration_s()]),
hoconsc:union([infinity, emqx_schema:timeout_duration_s()]),
#{default => 5, required => true}
)},
{nest_object, [
@ -785,7 +785,7 @@ schema("/ref/array/with/key") ->
{per_page, mk(range(1, 100), #{desc => <<"good per page desc">>})},
{timeout,
mk(
hoconsc:union([infinity, emqx_schema:duration_s()]),
hoconsc:union([infinity, emqx_schema:timeout_duration_s()]),
#{default => 5, required => true}
)},
{array_refs, mk(hoconsc:array(hoconsc:ref(?MODULE, good_ref)), #{})}

View File

@ -573,7 +573,7 @@ schema("/object") ->
{per_page, mk(range(1, 100), #{required => true, desc => <<"good per page desc">>})},
{timeout,
mk(
hoconsc:union([infinity, emqx_schema:duration_s()]),
hoconsc:union([infinity, emqx_schema:timeout_duration_s()]),
#{default => 5, required => true}
)},
{inner_ref, mk(hoconsc:ref(?MODULE, good_ref), #{})}
@ -584,7 +584,7 @@ schema("/nest/object") ->
{per_page, mk(range(1, 100), #{desc => <<"good per page desc">>})},
{timeout,
mk(
hoconsc:union([infinity, emqx_schema:duration_s()]),
hoconsc:union([infinity, emqx_schema:timeout_duration_s()]),
#{default => 5, required => true}
)},
{nest_object, [
@ -613,13 +613,14 @@ schema("/ref/array/with/key") ->
{per_page, mk(range(1, 100), #{desc => <<"good per page desc">>})},
{timeout,
mk(
hoconsc:union([infinity, emqx_schema:duration_s()]),
hoconsc:union([infinity, emqx_schema:timeout_duration_s()]),
#{default => 5, required => true}
)},
{assert, mk(float(), #{desc => <<"money">>})},
{number_ex, mk(number(), #{desc => <<"number example">>})},
{percent_ex, mk(emqx_schema:percent(), #{desc => <<"percent example">>})},
{duration_ms_ex, mk(emqx_schema:duration_ms(), #{desc => <<"duration ms example">>})},
{duration_ms_ex,
mk(emqx_schema:timeout_duration_ms(), #{desc => <<"duration ms example">>})},
{atom_ex, mk(atom(), #{desc => <<"atom ex">>})},
{array_refs, mk(hoconsc:array(hoconsc:ref(?MODULE, good_ref)), #{})}
]);

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_exhook, [
{description, "EMQX Extension for Hook"},
{vsn, "5.0.12"},
{vsn, "5.0.13"},
{modules, []},
{registered, []},
{mod, {emqx_exhook_app, []}},

View File

@ -63,7 +63,7 @@ fields(server) ->
example => <<"http://127.0.0.1:9000">>
})},
{request_timeout,
?HOCON(emqx_schema:duration(), #{
?HOCON(emqx_schema:timeout_duration(), #{
default => <<"5s">>,
desc => ?DESC(request_timeout)
})},
@ -74,7 +74,7 @@ fields(server) ->
default => #{<<"keepalive">> => true, <<"nodelay">> => true}
})},
{auto_reconnect,
?HOCON(hoconsc:union([false, emqx_schema:duration()]), #{
?HOCON(hoconsc:union([false, emqx_schema:timeout_duration()]), #{
default => <<"60s">>,
desc => ?DESC(auto_reconnect)
})},

View File

@ -66,7 +66,7 @@ fields(file_transfer) ->
)},
{init_timeout,
mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
desc => ?DESC("init_timeout"),
required => false,
@ -75,7 +75,7 @@ fields(file_transfer) ->
)},
{store_segment_timeout,
mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
desc => ?DESC("store_segment_timeout"),
required => false,
@ -84,7 +84,7 @@ fields(file_transfer) ->
)},
{assemble_timeout,
mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
desc => ?DESC("assemble_timeout"),
required => false,
@ -195,7 +195,7 @@ fields(local_storage_segments_gc) ->
[
{interval,
mk(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
desc => ?DESC("storage_gc_interval"),
required => false,
@ -204,6 +204,7 @@ fields(local_storage_segments_gc) ->
)},
{maximum_segments_ttl,
mk(
%% not used in a `receive ... after' block, just timestamp comparison
emqx_schema:duration_s(),
#{
desc => ?DESC("storage_gc_max_segments_ttl"),
@ -213,6 +214,7 @@ fields(local_storage_segments_gc) ->
)},
{minimum_segments_ttl,
mk(
%% not used in a `receive ... after' block, just timestamp comparison
emqx_schema:duration_s(),
#{
desc => ?DESC("storage_gc_min_segments_ttl"),

View File

@ -63,11 +63,11 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
emqx_gateway_utils:make_deprecated_paths([
[
"/gateways",
"/gateways/:name",
"/gateways/:name/enable/:enable"
]).
].
%%--------------------------------------------------------------------
%% http handlers
@ -240,9 +240,7 @@ schema("/gateways/:name/enable/:enable") ->
)
}
}
};
schema(Path) ->
emqx_gateway_utils:make_compatible_schema(Path, fun schema/1).
}.
%%--------------------------------------------------------------------
%% params defines

View File

@ -61,11 +61,11 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
emqx_gateway_utils:make_deprecated_paths([
[
"/gateways/:name/authentication",
"/gateways/:name/authentication/users",
"/gateways/:name/authentication/users/:uid"
]).
].
%%--------------------------------------------------------------------
%% http handlers
@ -318,9 +318,8 @@ schema("/gateways/:name/authentication/users/:uid") ->
responses =>
?STANDARD_RESP(#{204 => <<"User Deleted">>})
}
};
schema(Path) ->
emqx_gateway_utils:make_compatible_schema(Path, fun schema/1).
}.
%%--------------------------------------------------------------------
%% params defines

View File

@ -55,10 +55,10 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
paths() ->
emqx_gateway_utils:make_deprecated_paths([
[
"/gateways/:name/authentication/import_users",
"/gateways/:name/listeners/:id/authentication/import_users"
]).
].
%%--------------------------------------------------------------------
%% http handlers
@ -147,9 +147,7 @@ schema("/gateways/:name/listeners/:id/authentication/import_users") ->
responses =>
?STANDARD_RESP(#{204 => <<"Imported">>})
}
};
schema(Path) ->
emqx_gateway_utils:make_compatible_schema(Path, fun schema/1).
}.
%%--------------------------------------------------------------------
%% params defines

View File

@ -70,12 +70,12 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
paths() ->
emqx_gateway_utils:make_deprecated_paths([
[
"/gateways/:name/clients",
"/gateways/:name/clients/:clientid",
"/gateways/:name/clients/:clientid/subscriptions",
"/gateways/:name/clients/:clientid/subscriptions/:topic"
]).
].
-define(CLIENT_QSCHEMA, [
{<<"node">>, atom},
@ -541,9 +541,7 @@ schema("/gateways/:name/clients/:clientid/subscriptions/:topic") ->
responses =>
?STANDARD_RESP(#{204 => <<"Unsubscribed">>})
}
};
schema(Path) ->
emqx_gateway_utils:make_compatible_schema(Path, fun schema/1).
}.
params_client_query() ->
params_gateway_name_in_path() ++

View File

@ -70,13 +70,13 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
emqx_gateway_utils:make_deprecated_paths([
[
"/gateways/:name/listeners",
"/gateways/:name/listeners/:id",
"/gateways/:name/listeners/:id/authentication",
"/gateways/:name/listeners/:id/authentication/users",
"/gateways/:name/listeners/:id/authentication/users/:uid"
]).
].
%%--------------------------------------------------------------------
%% http handlers
@ -590,9 +590,7 @@ schema("/gateways/:name/listeners/:id/authentication/users/:uid") ->
responses =>
?STANDARD_RESP(#{204 => <<"Deleted">>})
}
};
schema(Path) ->
emqx_gateway_utils:make_compatible_schema(Path, fun schema/1).
}.
%%--------------------------------------------------------------------
%% params defines

View File

@ -45,8 +45,6 @@
is_running/2,
global_chain/1,
listener_chain/3,
make_deprecated_paths/1,
make_compatible_schema/2,
find_gateway_definitions/0
]).
@ -531,39 +529,6 @@ default_subopts() ->
is_new => true
}.
%% Since 5.0.8, the API path of the gateway has been changed from "gateway" to "gateways"
%% and we need to be compatible with the old path
get_compatible_path("/gateway") ->
"/gateways";
get_compatible_path("/gateway/" ++ Rest) ->
"/gateways/" ++ Rest.
get_deprecated_path("/gateways") ->
"/gateway";
get_deprecated_path("/gateways/" ++ Rest) ->
"/gateway/" ++ Rest.
make_deprecated_paths(Paths) ->
Paths ++ [get_deprecated_path(Path) || Path <- Paths].
make_compatible_schema(Path, SchemaFun) ->
OldPath = get_compatible_path(Path),
make_compatible_schema2(OldPath, SchemaFun).
make_compatible_schema2(Path, SchemaFun) ->
Schema = SchemaFun(Path),
maps:map(
fun(Key, Value) ->
case lists:member(Key, [get, delete, put, post]) of
true ->
Value#{deprecated => true};
_ ->
Value
end
end,
Schema
).
-spec find_gateway_definitions() -> list(gateway_def()).
find_gateway_definitions() ->
lists:flatten(

View File

@ -107,9 +107,9 @@ t_gateway(_) ->
StompGw
),
{204, _} = request(put, "/gateways/stomp", #{enable => true}),
{200, #{enable := true}} = request(get, "/gateway/stomp"),
{200, #{enable := true}} = request(get, "/gateways/stomp"),
{204, _} = request(put, "/gateways/stomp", #{enable => false}),
{200, #{enable := false}} = request(get, "/gateway/stomp"),
{200, #{enable := false}} = request(get, "/gateways/stomp"),
{404, _} = request(put, "/gateways/undefined", #{}),
{400, _} = request(put, "/gateways/stomp", #{bad_key => "foo"}),
ok.
@ -121,27 +121,14 @@ t_gateway_fail(_) ->
{400, _} = request(put, "/gateways/coap", #{}),
ok.
t_deprecated_gateway(_) ->
{200, Gateways} = request(get, "/gateway"),
lists:foreach(fun assert_gw_unloaded/1, Gateways),
{404, NotFoundReq} = request(get, "/gateway/uname_gateway"),
assert_not_found(NotFoundReq),
{204, _} = request(put, "/gateway/stomp", #{}),
{200, StompGw} = request(get, "/gateway/stomp"),
assert_fields_exist(
[name, status, enable, created_at, started_at],
StompGw
),
ok.
t_gateway_enable(_) ->
{204, _} = request(put, "/gateways/stomp", #{}),
{200, #{enable := Enable}} = request(get, "/gateway/stomp"),
{200, #{enable := Enable}} = request(get, "/gateways/stomp"),
NotEnable = not Enable,
{204, _} = request(put, "/gateways/stomp/enable/" ++ atom_to_list(NotEnable), undefined),
{200, #{enable := NotEnable}} = request(get, "/gateway/stomp"),
{200, #{enable := NotEnable}} = request(get, "/gateways/stomp"),
{204, _} = request(put, "/gateways/stomp/enable/" ++ atom_to_list(Enable), undefined),
{200, #{enable := Enable}} = request(get, "/gateway/stomp"),
{200, #{enable := Enable}} = request(get, "/gateways/stomp"),
{404, _} = request(put, "/gateways/undefined/enable/true", undefined),
{404, _} = request(put, "/gateways/not_a_known_atom/enable/true", undefined),
{404, _} = request(put, "/gateways/coap/enable/true", undefined),

View File

@ -46,7 +46,7 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
paths() ->
emqx_gateway_utils:make_deprecated_paths([?PREFIX ++ "/request"]).
[?PREFIX ++ "/request"].
schema(?PREFIX ++ "/request") ->
#{
@ -65,9 +65,7 @@ schema(?PREFIX ++ "/request") ->
)
}
}
};
schema(Path) ->
emqx_gateway_utils:make_compatible_schema(Path, fun schema/1).
}.
request(post, #{body := Body, bindings := Bindings}) ->
ClientId = maps:get(clientid, Bindings, undefined),
@ -107,7 +105,7 @@ request_body() ->
[
{token, mk(binary(), #{desc => ?DESC(token)})},
{method, mk(enum([get, put, post, delete]), #{desc => ?DESC(method)})},
{timeout, mk(emqx_schema:duration_ms(), #{desc => ?DESC(timeout)})},
{timeout, mk(emqx_schema:timeout_duration_ms(), #{desc => ?DESC(timeout)})},
{content_type,
mk(
enum(['text/plain', 'application/json', 'application/octet-stream']),

View File

@ -97,8 +97,6 @@ t_send_request_api(_) ->
?assertEqual(Payload, RPayload)
end,
Test("gateways/coap/clients/client1/request"),
timer:sleep(100),
Test("gateway/coap/clients/client1/request"),
erlang:exit(ClientId, kill),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_gateway_lwm2m, [
{description, "LwM2M Gateway"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]},
{env, []},

View File

@ -40,9 +40,9 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE).
paths() ->
emqx_gateway_utils:make_deprecated_paths([
[
?PATH("/lookup"), ?PATH("/observe"), ?PATH("/read"), ?PATH("/write")
]).
].
schema(?PATH("/lookup")) ->
#{
@ -127,9 +127,7 @@ schema(?PATH("/write")) ->
404 => error_codes(['CLIENT_NOT_FOUND'], <<"Clientid not found">>)
}
}
};
schema(Path) ->
emqx_gateway_utils:make_compatible_schema(Path, fun schema/1).
}.
fields(resource) ->
[

View File

@ -306,7 +306,7 @@ t_observe(Config) ->
test_recv_mqtt_response(RespTopic),
%% step2, call observe API
?assertMatch({204, []}, call_deprecated_send_api(Epn, "observe", "path=/3/0/1&enable=false")),
?assertMatch({204, []}, call_send_api(Epn, "observe", "path=/3/0/1&enable=false")),
timer:sleep(100),
#coap_message{type = Type, method = Method, options = Opts} = test_recv_coap_request(UdpSock),
?assertEqual(con, Type),
@ -328,9 +328,6 @@ call_lookup_api(ClientId, Path, Action) ->
call_send_api(ClientId, Cmd, Query) ->
call_send_api(ClientId, Cmd, Query, "gateways/lwm2m/clients").
call_deprecated_send_api(ClientId, Cmd, Query) ->
call_send_api(ClientId, Cmd, Query, "gateway/lwm2m/clients").
call_send_api(ClientId, Cmd, Query, API) ->
ApiPath = emqx_mgmt_api_test_util:api_path([API, ClientId, Cmd]),
Auth = emqx_mgmt_api_test_util:auth_header_(),

View File

@ -1,6 +1,6 @@
{application, emqx_node_rebalance, [
{description, "EMQX Node Rebalance"},
{vsn, "5.0.1"},
{vsn, "5.0.2"},
{registered, [
emqx_node_rebalance_sup,
emqx_node_rebalance,

View File

@ -356,7 +356,7 @@ fields(rebalance_start) ->
[
{"wait_health_check",
mk(
emqx_schema:duration_s(),
emqx_schema:timeout_duration_s(),
#{
desc => ?DESC(wait_health_check),
required => false
@ -414,7 +414,7 @@ fields(rebalance_start) ->
)},
{"wait_takeover",
mk(
emqx_schema:duration_s(),
emqx_schema:timeout_duration_s(),
#{
desc => ?DESC(wait_takeover),
required => false

View File

@ -351,6 +351,10 @@ to_bin(Bin) when is_binary(Bin) ->
to_bin(Atom) when is_atom(Atom) ->
erlang:atom_to_binary(Atom).
handle_result({error, {recoverable_error, _Error}} = Res) ->
Res;
handle_result({error, {unrecoverable_error, _Error}} = Res) ->
Res;
handle_result({error, disconnected}) ->
{error, {recoverable_error, disconnected}};
handle_result({error, Error}) ->
@ -359,6 +363,8 @@ handle_result({error, socket, closed} = Error) ->
{error, {recoverable_error, Error}};
handle_result({error, Type, Reason}) ->
{error, {unrecoverable_error, {Type, Reason}}};
handle_result({ok, [{proc_result, RetCode, Reason}]}) when RetCode =/= 0 ->
{error, {unrecoverable_error, {RetCode, Reason}}};
handle_result(Res) ->
Res.

View File

@ -48,7 +48,7 @@ fields("prometheus") ->
)},
{interval,
?HOCON(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"15s">>,
required => true,

View File

@ -85,6 +85,7 @@
allocate_resource/3,
has_allocated_resources/1,
get_allocated_resources/1,
get_allocated_resources_list/1,
forget_allocated_resources/1
]).
@ -519,6 +520,10 @@ get_allocated_resources(InstanceId) ->
Objects = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId),
maps:from_list([{K, V} || {_InstanceId, K, V} <- Objects]).
-spec get_allocated_resources_list(resource_id()) -> list(tuple()).
get_allocated_resources_list(InstanceId) ->
ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId).
-spec forget_allocated_resources(resource_id()) -> ok.
forget_allocated_resources(InstanceId) ->
true = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId),

View File

@ -76,18 +76,18 @@ worker_pool_size(default) -> ?WORKER_POOL_SIZE;
worker_pool_size(required) -> false;
worker_pool_size(_) -> undefined.
resume_interval(type) -> emqx_schema:duration_ms();
resume_interval(type) -> emqx_schema:timeout_duration_ms();
resume_interval(importance) -> ?IMPORTANCE_HIDDEN;
resume_interval(desc) -> ?DESC("resume_interval");
resume_interval(required) -> false;
resume_interval(_) -> undefined.
metrics_flush_interval(type) -> emqx_schema:duration_ms();
metrics_flush_interval(type) -> emqx_schema:timeout_duration_ms();
metrics_flush_interval(importance) -> ?IMPORTANCE_HIDDEN;
metrics_flush_interval(required) -> false;
metrics_flush_interval(_) -> undefined.
health_check_interval(type) -> emqx_schema:duration_ms();
health_check_interval(type) -> emqx_schema:timeout_duration_ms();
health_check_interval(desc) -> ?DESC("health_check_interval");
health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;
health_check_interval(required) -> false;
@ -115,7 +115,7 @@ start_after_created(default) -> ?START_AFTER_CREATED_RAW;
start_after_created(required) -> false;
start_after_created(_) -> undefined.
start_timeout(type) -> emqx_schema:duration_ms();
start_timeout(type) -> emqx_schema:timeout_duration_ms();
start_timeout(desc) -> ?DESC("start_timeout");
start_timeout(default) -> ?START_TIMEOUT_RAW;
start_timeout(required) -> false;
@ -133,7 +133,7 @@ query_mode(default) -> async;
query_mode(required) -> false;
query_mode(_) -> undefined.
request_ttl(type) -> hoconsc:union([emqx_schema:duration_ms(), infinity]);
request_ttl(type) -> hoconsc:union([emqx_schema:timeout_duration_ms(), infinity]);
request_ttl(aliases) -> [request_timeout];
request_ttl(desc) -> ?DESC("request_ttl");
request_ttl(default) -> ?DEFAULT_REQUEST_TTL_RAW;
@ -166,7 +166,7 @@ batch_size(default) -> ?DEFAULT_BATCH_SIZE;
batch_size(required) -> false;
batch_size(_) -> undefined.
batch_time(type) -> emqx_schema:duration_ms();
batch_time(type) -> emqx_schema:timeout_duration_ms();
batch_time(desc) -> ?DESC("batch_time");
batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW;
batch_time(required) -> false;
@ -196,6 +196,10 @@ desc("creation_opts") -> ?DESC("creation_opts").
get_value_with_unit(Value) when is_integer(Value) ->
<<(erlang:integer_to_binary(Value))/binary, "ms">>;
get_value_with_unit(Value) when is_list(Value) ->
%% Must ensure it's a binary, otherwise formatting the error
%% message will fail.
list_to_binary(Value);
get_value_with_unit(Value) ->
Value.

View File

@ -0,0 +1,108 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_schema_tests).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%%===========================================================================
%% Test cases
%%===========================================================================
health_check_interval_validator_test_() ->
[
?_assertMatch(
#{<<"resource_opts">> := #{<<"health_check_interval">> := 150_000}},
parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"150s">>))
),
?_assertMatch(
#{<<"resource_opts">> := #{<<"health_check_interval">> := 3_600_000}},
parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"3600000ms">>))
),
?_assertThrow(
{_, [
#{
kind := validation_error,
reason := <<"Health Check Interval (3600001ms) is out of range", _/binary>>,
value := 3600001
}
]},
parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"3600001ms">>))
),
{"bad parse: negative number",
?_assertThrow(
{_, [
#{
kind := validation_error,
reason := <<"Health Check Interval (-10ms) is out of range", _/binary>>,
value := "-10ms"
}
]},
parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"-10ms">>))
)},
{"bad parse: underscores",
?_assertThrow(
{_, [
#{
kind := validation_error,
reason :=
<<"Health Check Interval (3_600_000ms) is out of range", _/binary>>,
value := "3_600_000ms"
}
]},
parse_and_check_webhook_bridge(webhook_bridge_health_check_hocon(<<"3_600_000ms">>))
)},
?_assertThrow(
#{exception := "timeout value too large" ++ _},
parse_and_check_webhook_bridge(
webhook_bridge_health_check_hocon(<<"150000000000000s">>)
)
)
].
%%===========================================================================
%% Helper functions
%%===========================================================================
parse_and_check_webhook_bridge(Hocon) ->
#{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
Conf.
parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon),
Conf.
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
%%===========================================================================
%% Data section
%%===========================================================================
%% erlfmt-ignore
webhook_bridge_health_check_hocon(HealthCheckInterval) ->
io_lib:format(
"""
bridges.webhook.simple {
url = \"http://localhost:4000\"
body = \"body\"
resource_opts {
health_check_interval = \"~s\"
}
}
""",
[HealthCheckInterval]).

View File

@ -45,13 +45,14 @@ fields("retainer") ->
{enable, sc(boolean(), enable, true)},
{msg_expiry_interval,
sc(
%% not used in a `receive ... after' block, just timestamp comparison
emqx_schema:duration_ms(),
msg_expiry_interval,
<<"0s">>
)},
{msg_clear_interval,
sc(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
msg_clear_interval,
<<"0s">>
)},

View File

@ -216,7 +216,7 @@ rule_engine_settings() ->
?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
{jq_function_default_timeout,
?HOCON(
emqx_schema:duration_ms(),
emqx_schema:timeout_duration_ms(),
#{
default => <<"10s">>,
desc => ?DESC("rule_engine_jq_function_default_timeout")

View File

@ -1,6 +1,6 @@
{application, emqx_s3, [
{description, "EMQX S3"},
{vsn, "5.0.7"},
{vsn, "5.0.8"},
{modules, []},
{registered, [emqx_s3_sup]},
{applications, [

View File

@ -68,6 +68,7 @@ fields(s3) ->
)},
{url_expire_time,
mk(
%% not used in a `receive ... after' block, just timestamp comparison
emqx_schema:duration_s(),
#{
default => "1h",

View File

@ -1,7 +1,7 @@
{application, emqx_slow_subs, [
{description, "EMQX Slow Subscribers Statistics"},
% strict semver, bump manually!
{vsn, "1.0.5"},
{vsn, "1.0.6"},
{modules, []},
{registered, [emqx_slow_subs_sup]},
{applications, [kernel, stdlib, emqx]},

View File

@ -30,12 +30,14 @@ fields("slow_subs") ->
{enable, sc(boolean(), false, enable)},
{threshold,
sc(
%% not used in a `receive ... after' block, just timestamp comparison
emqx_schema:duration_ms(),
<<"500ms">>,
threshold
)},
{expire_interval,
sc(
%% not used in a `receive ... after' block, just timestamp comparison
emqx_schema:duration_ms(),
<<"300s">>,
expire_interval

View File

@ -0,0 +1 @@
Remove the deprecated HTTP APIs for gateways

View File

@ -0,0 +1,3 @@
Added a schema validation for values that might be used in timeouts to avoid invalid values.
Before this fix, it was possible to use absurd values in the schema that would exceed the system limit, causing a crash.

View File

@ -0,0 +1 @@
Refactored the RocketMQ bridge to avoid leaking resources during crashes at creation.

View File

@ -0,0 +1 @@
Refactored influxdb bridge connector to avoid resource leaks during crashes at creation.