Merge pull request #11077 from zhongwencool/fix-crash-ip-port-type
This commit is contained in:
commit
f50d7334d6
|
@ -29,7 +29,7 @@
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
|
||||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}},
|
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}},
|
||||||
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
|
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
|
||||||
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
||||||
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
|
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
|
||||||
|
|
|
@ -1758,7 +1758,7 @@ base_listener(Bind) ->
|
||||||
)},
|
)},
|
||||||
{"bind",
|
{"bind",
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([ip_port(), integer()]),
|
ip_port(),
|
||||||
#{
|
#{
|
||||||
default => Bind,
|
default => Bind,
|
||||||
required => true,
|
required => true,
|
||||||
|
@ -2418,13 +2418,13 @@ mk_duration(Desc, OverrideMeta) ->
|
||||||
to_duration(Str) ->
|
to_duration(Str) ->
|
||||||
case hocon_postprocess:duration(Str) of
|
case hocon_postprocess:duration(Str) of
|
||||||
I when is_integer(I) -> {ok, I};
|
I when is_integer(I) -> {ok, I};
|
||||||
_ -> {error, Str}
|
_ -> to_integer(Str)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
to_duration_s(Str) ->
|
to_duration_s(Str) ->
|
||||||
case hocon_postprocess:duration(Str) of
|
case hocon_postprocess:duration(Str) of
|
||||||
I when is_number(I) -> {ok, ceiling(I / 1000)};
|
I when is_number(I) -> {ok, ceiling(I / 1000)};
|
||||||
_ -> {error, Str}
|
_ -> to_integer(Str)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when
|
-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when
|
||||||
|
@ -2432,7 +2432,7 @@ to_duration_s(Str) ->
|
||||||
to_duration_ms(Str) ->
|
to_duration_ms(Str) ->
|
||||||
case hocon_postprocess:duration(Str) of
|
case hocon_postprocess:duration(Str) of
|
||||||
I when is_number(I) -> {ok, ceiling(I)};
|
I when is_number(I) -> {ok, ceiling(I)};
|
||||||
_ -> {error, Str}
|
_ -> to_integer(Str)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when
|
-spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when
|
||||||
|
@ -2473,7 +2473,7 @@ do_to_timeout_duration(Str, Fn, Max, Unit) ->
|
||||||
to_bytesize(Str) ->
|
to_bytesize(Str) ->
|
||||||
case hocon_postprocess:bytesize(Str) of
|
case hocon_postprocess:bytesize(Str) of
|
||||||
I when is_integer(I) -> {ok, I};
|
I when is_integer(I) -> {ok, I};
|
||||||
_ -> {error, Str}
|
_ -> to_integer(Str)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
to_wordsize(Str) ->
|
to_wordsize(Str) ->
|
||||||
|
@ -2483,6 +2483,13 @@ to_wordsize(Str) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
to_integer(Str) ->
|
||||||
|
case string:to_integer(Str) of
|
||||||
|
{Int, []} -> {ok, Int};
|
||||||
|
{Int, <<>>} -> {ok, Int};
|
||||||
|
_ -> {error, Str}
|
||||||
|
end.
|
||||||
|
|
||||||
to_percent(Str) ->
|
to_percent(Str) ->
|
||||||
{ok, hocon_postprocess:percent(Str)}.
|
{ok, hocon_postprocess:percent(Str)}.
|
||||||
|
|
||||||
|
@ -2525,9 +2532,9 @@ to_ip_port(Str) ->
|
||||||
case split_ip_port(Str) of
|
case split_ip_port(Str) of
|
||||||
{"", Port} ->
|
{"", Port} ->
|
||||||
%% this is a local address
|
%% this is a local address
|
||||||
{ok, list_to_integer(Port)};
|
{ok, parse_port(Port)};
|
||||||
{MaybeIp, Port} ->
|
{MaybeIp, Port} ->
|
||||||
PortVal = list_to_integer(Port),
|
PortVal = parse_port(Port),
|
||||||
case inet:parse_address(MaybeIp) of
|
case inet:parse_address(MaybeIp) of
|
||||||
{ok, IpTuple} ->
|
{ok, IpTuple} ->
|
||||||
{ok, {IpTuple, PortVal}};
|
{ok, {IpTuple, PortVal}};
|
||||||
|
@ -2543,18 +2550,11 @@ split_ip_port(Str0) ->
|
||||||
case lists:split(string:rchr(Str, $:), Str) of
|
case lists:split(string:rchr(Str, $:), Str) of
|
||||||
%% no colon
|
%% no colon
|
||||||
{[], Str} ->
|
{[], Str} ->
|
||||||
try
|
{"", Str};
|
||||||
%% if it's just a port number, then return as-is
|
|
||||||
_ = list_to_integer(Str),
|
|
||||||
{"", Str}
|
|
||||||
catch
|
|
||||||
_:_ ->
|
|
||||||
error
|
|
||||||
end;
|
|
||||||
{IpPlusColon, PortString} ->
|
{IpPlusColon, PortString} ->
|
||||||
IpStr0 = lists:droplast(IpPlusColon),
|
IpStr0 = lists:droplast(IpPlusColon),
|
||||||
case IpStr0 of
|
case IpStr0 of
|
||||||
%% dropp head/tail brackets
|
%% drop head/tail brackets
|
||||||
[$[ | S] ->
|
[$[ | S] ->
|
||||||
case lists:last(S) of
|
case lists:last(S) of
|
||||||
$] -> {lists:droplast(S), PortString};
|
$] -> {lists:droplast(S), PortString};
|
||||||
|
|
|
@ -497,17 +497,24 @@ t_update_config(_Config) ->
|
||||||
emqx_config_handler:start_link(),
|
emqx_config_handler:start_link(),
|
||||||
{ok, Pid} = emqx_crl_cache:start_link(),
|
{ok, Pid} = emqx_crl_cache:start_link(),
|
||||||
Conf = #{
|
Conf = #{
|
||||||
refresh_interval => timer:minutes(5),
|
refresh_interval => <<"5m">>,
|
||||||
http_timeout => timer:minutes(10),
|
http_timeout => <<"10m">>,
|
||||||
capacity => 123
|
capacity => 123
|
||||||
},
|
},
|
||||||
?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)),
|
?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)),
|
||||||
State = sys:get_state(Pid),
|
State = sys:get_state(Pid),
|
||||||
?assertEqual(Conf, #{
|
?assertEqual(
|
||||||
|
#{
|
||||||
|
refresh_interval => timer:minutes(5),
|
||||||
|
http_timeout => timer:minutes(10),
|
||||||
|
capacity => 123
|
||||||
|
},
|
||||||
|
#{
|
||||||
refresh_interval => element(3, State),
|
refresh_interval => element(3, State),
|
||||||
http_timeout => element(4, State),
|
http_timeout => element(4, State),
|
||||||
capacity => element(7, State)
|
capacity => element(7, State)
|
||||||
}),
|
}
|
||||||
|
),
|
||||||
emqx_config:erase(<<"crl_cache">>),
|
emqx_config:erase(<<"crl_cache">>),
|
||||||
emqx_config_handler:stop(),
|
emqx_config_handler:stop(),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -33,9 +33,9 @@ init_per_suite(Config) ->
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"max_count">> => 3,
|
<<"max_count">> => 3,
|
||||||
% 0.1s
|
% 0.1s
|
||||||
<<"window_time">> => 100,
|
<<"window_time">> => <<"100ms">>,
|
||||||
%% 2s
|
%% 2s
|
||||||
<<"ban_time">> => "2s"
|
<<"ban_time">> => <<"2s">>
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
Config.
|
Config.
|
||||||
|
@ -119,16 +119,16 @@ t_conf_update(_) ->
|
||||||
?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)),
|
?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)),
|
||||||
|
|
||||||
Zones = #{
|
Zones = #{
|
||||||
<<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => 123}},
|
<<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"123s">>}},
|
||||||
<<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => 456}}
|
<<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"456s">>}}
|
||||||
},
|
},
|
||||||
?assertMatch({ok, _}, emqx:update_config([zones], Zones)),
|
?assertMatch({ok, _}, emqx:update_config([zones], Zones)),
|
||||||
%% new_zone is already deleted
|
%% new_zone is already deleted
|
||||||
?assertError({config_not_found, _}, get_policy(new_zone)),
|
?assertError({config_not_found, _}, get_policy(new_zone)),
|
||||||
%% update zone(zone_1) has default.
|
%% update zone(zone_1) has default.
|
||||||
?assertEqual(Global#{window_time := 123}, emqx_flapping:get_policy(zone_1)),
|
?assertEqual(Global#{window_time := 123000}, emqx_flapping:get_policy(zone_1)),
|
||||||
%% create zone(zone_2) has default
|
%% create zone(zone_2) has default
|
||||||
?assertEqual(Global#{window_time := 456}, emqx_flapping:get_policy(zone_2)),
|
?assertEqual(Global#{window_time := 456000}, emqx_flapping:get_policy(zone_2)),
|
||||||
%% reset to default(empty) andalso get default from global
|
%% reset to default(empty) andalso get default from global
|
||||||
?assertMatch({ok, _}, emqx:update_config([zones], #{})),
|
?assertMatch({ok, _}, emqx:update_config([zones], #{})),
|
||||||
?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])),
|
?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])),
|
||||||
|
@ -172,13 +172,13 @@ validate_timer(Lists) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_window_compatibility_check(_Conf) ->
|
t_window_compatibility_check(_Conf) ->
|
||||||
Flapping = emqx:get_config([flapping_detect]),
|
Flapping = emqx:get_raw_config([flapping_detect]),
|
||||||
ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>),
|
ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>),
|
||||||
?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])),
|
?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])),
|
||||||
%% reset
|
%% reset
|
||||||
FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]),
|
FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]),
|
||||||
ok = emqx_config:init_load(emqx_schema, FlappingBin),
|
ok = emqx_config:init_load(emqx_schema, FlappingBin),
|
||||||
?assertEqual(Flapping, emqx:get_config([flapping_detect])),
|
?assertEqual(Flapping, emqx:get_raw_config([flapping_detect])),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
get_policy(Zone) ->
|
get_policy(Zone) ->
|
||||||
|
|
|
@ -137,15 +137,19 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) ->
|
||||||
enable_ocsp_stapling => true,
|
enable_ocsp_stapling => true,
|
||||||
responder_url => <<"http://localhost:9877/">>,
|
responder_url => <<"http://localhost:9877/">>,
|
||||||
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
|
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
|
||||||
refresh_http_timeout => 15_000,
|
refresh_http_timeout => <<"15s">>,
|
||||||
refresh_interval => 1_000
|
refresh_interval => <<"1s">>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
|
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
|
||||||
ConfBin = emqx_utils_maps:binary_key_map(Conf),
|
ConfBin = emqx_utils_maps:binary_key_map(Conf),
|
||||||
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}),
|
CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
|
||||||
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts),
|
required => false, atom_keys => false
|
||||||
|
}),
|
||||||
|
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
|
||||||
|
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
|
||||||
|
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
|
||||||
snabbkaffe:start_trace(),
|
snabbkaffe:start_trace(),
|
||||||
_Heir = spawn_dummy_heir(),
|
_Heir = spawn_dummy_heir(),
|
||||||
{ok, CachePid} = emqx_ocsp_cache:start_link(),
|
{ok, CachePid} = emqx_ocsp_cache:start_link(),
|
||||||
|
@ -179,15 +183,19 @@ init_per_testcase(_TestCase, Config) ->
|
||||||
enable_ocsp_stapling => true,
|
enable_ocsp_stapling => true,
|
||||||
responder_url => <<"http://localhost:9877/">>,
|
responder_url => <<"http://localhost:9877/">>,
|
||||||
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
|
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
|
||||||
refresh_http_timeout => 15_000,
|
refresh_http_timeout => <<"15s">>,
|
||||||
refresh_interval => 1_000
|
refresh_interval => <<"1s">>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
|
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
|
||||||
ConfBin = emqx_utils_maps:binary_key_map(Conf),
|
ConfBin = emqx_utils_maps:binary_key_map(Conf),
|
||||||
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}),
|
CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
|
||||||
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts),
|
required => false, atom_keys => false
|
||||||
|
}),
|
||||||
|
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
|
||||||
|
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
|
||||||
|
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
|
||||||
[
|
[
|
||||||
{cache_pid, CachePid}
|
{cache_pid, CachePid}
|
||||||
| Config
|
| Config
|
||||||
|
|
|
@ -1316,8 +1316,8 @@ authenticator_examples() ->
|
||||||
<<"password">> => ?PH_PASSWORD
|
<<"password">> => ?PH_PASSWORD
|
||||||
},
|
},
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
connect_timeout => 5000,
|
connect_timeout => <<"5s">>,
|
||||||
request_timeout => 5000,
|
request_timeout => <<"5s">>,
|
||||||
enable_pipelining => 100,
|
enable_pipelining => 100,
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ listener_mqtt_tcp_conf(Port, EnableAuthn) ->
|
||||||
<<"max_connections">> => 1024000,
|
<<"max_connections">> => 1024000,
|
||||||
<<"mountpoint">> => <<>>,
|
<<"mountpoint">> => <<>>,
|
||||||
<<"proxy_protocol">> => false,
|
<<"proxy_protocol">> => false,
|
||||||
<<"proxy_protocol_timeout">> => 3000,
|
<<"proxy_protocol_timeout">> => <<"3s">>,
|
||||||
<<"enable_authn">> => EnableAuthn
|
<<"enable_authn">> => EnableAuthn
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -105,7 +105,7 @@ set_special_configs(_App) ->
|
||||||
<<"headers">> => #{},
|
<<"headers">> => #{},
|
||||||
<<"ssl">> => #{<<"enable">> => true},
|
<<"ssl">> => #{<<"enable">> => true},
|
||||||
<<"method">> => <<"get">>,
|
<<"method">> => <<"get">>,
|
||||||
<<"request_timeout">> => 5000
|
<<"request_timeout">> => <<"5s">>
|
||||||
}).
|
}).
|
||||||
-define(SOURCE2, #{
|
-define(SOURCE2, #{
|
||||||
<<"type">> => <<"mongodb">>,
|
<<"type">> => <<"mongodb">>,
|
||||||
|
|
|
@ -70,7 +70,7 @@ t_api(_) ->
|
||||||
<<"cache">> => #{
|
<<"cache">> => #{
|
||||||
<<"enable">> => false,
|
<<"enable">> => false,
|
||||||
<<"max_size">> => 32,
|
<<"max_size">> => 32,
|
||||||
<<"ttl">> => 60000
|
<<"ttl">> => <<"60s">>
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -84,7 +84,7 @@ t_api(_) ->
|
||||||
<<"cache">> => #{
|
<<"cache">> => #{
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"max_size">> => 32,
|
<<"max_size">> => 32,
|
||||||
<<"ttl">> => 60000
|
<<"ttl">> => <<"60s">>
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
|
@ -178,21 +178,21 @@ bridge_async_config(#{port := Port} = Config) ->
|
||||||
Name = maps:get(name, Config, ?BRIDGE_NAME),
|
Name = maps:get(name, Config, ?BRIDGE_NAME),
|
||||||
PoolSize = maps:get(pool_size, Config, 1),
|
PoolSize = maps:get(pool_size, Config, 1),
|
||||||
QueryMode = maps:get(query_mode, Config, "async"),
|
QueryMode = maps:get(query_mode, Config, "async"),
|
||||||
ConnectTimeout = maps:get(connect_timeout, Config, 1),
|
ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
|
||||||
RequestTimeout = maps:get(request_timeout, Config, 10000),
|
RequestTimeout = maps:get(request_timeout, Config, "10s"),
|
||||||
ResumeInterval = maps:get(resume_interval, Config, "1s"),
|
ResumeInterval = maps:get(resume_interval, Config, "1s"),
|
||||||
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
|
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
|
||||||
ConfigString = io_lib:format(
|
ConfigString = io_lib:format(
|
||||||
"bridges.~s.~s {\n"
|
"bridges.~s.~s {\n"
|
||||||
" url = \"http://localhost:~p\"\n"
|
" url = \"http://localhost:~p\"\n"
|
||||||
" connect_timeout = \"~ps\"\n"
|
" connect_timeout = \"~p\"\n"
|
||||||
" enable = true\n"
|
" enable = true\n"
|
||||||
" enable_pipelining = 100\n"
|
" enable_pipelining = 100\n"
|
||||||
" max_retries = 2\n"
|
" max_retries = 2\n"
|
||||||
" method = \"post\"\n"
|
" method = \"post\"\n"
|
||||||
" pool_size = ~p\n"
|
" pool_size = ~p\n"
|
||||||
" pool_type = \"random\"\n"
|
" pool_type = \"random\"\n"
|
||||||
" request_timeout = \"~ps\"\n"
|
" request_timeout = \"~s\"\n"
|
||||||
" body = \"${id}\""
|
" body = \"${id}\""
|
||||||
" resource_opts {\n"
|
" resource_opts {\n"
|
||||||
" inflight_window = 100\n"
|
" inflight_window = 100\n"
|
||||||
|
@ -257,8 +257,8 @@ t_send_async_connection_timeout(Config) ->
|
||||||
port => Port,
|
port => Port,
|
||||||
pool_size => 1,
|
pool_size => 1,
|
||||||
query_mode => "async",
|
query_mode => "async",
|
||||||
connect_timeout => 10_000,
|
connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "s",
|
||||||
request_timeout => ResponseDelayMS * 2,
|
request_timeout => "10s",
|
||||||
resource_request_ttl => "infinity"
|
resource_request_ttl => "infinity"
|
||||||
}),
|
}),
|
||||||
NumberOfMessagesToSend = 10,
|
NumberOfMessagesToSend = 10,
|
||||||
|
@ -278,8 +278,8 @@ t_async_free_retries(Config) ->
|
||||||
port => Port,
|
port => Port,
|
||||||
pool_size => 1,
|
pool_size => 1,
|
||||||
query_mode => "sync",
|
query_mode => "sync",
|
||||||
connect_timeout => 1_000,
|
connect_timeout => "1s",
|
||||||
request_timeout => 10_000,
|
request_timeout => "10s",
|
||||||
resource_request_ttl => "10000s"
|
resource_request_ttl => "10000s"
|
||||||
}),
|
}),
|
||||||
%% Fail 5 times then succeed.
|
%% Fail 5 times then succeed.
|
||||||
|
@ -304,8 +304,8 @@ t_async_common_retries(Config) ->
|
||||||
pool_size => 1,
|
pool_size => 1,
|
||||||
query_mode => "sync",
|
query_mode => "sync",
|
||||||
resume_interval => "100ms",
|
resume_interval => "100ms",
|
||||||
connect_timeout => 1_000,
|
connect_timeout => "1s",
|
||||||
request_timeout => 10_000,
|
request_timeout => "10s",
|
||||||
resource_request_ttl => "10000s"
|
resource_request_ttl => "10000s"
|
||||||
}),
|
}),
|
||||||
%% Keeps failing until connector gives up.
|
%% Keeps failing until connector gives up.
|
||||||
|
|
|
@ -635,9 +635,9 @@ t_bad_sql_parameter(Config) ->
|
||||||
Config,
|
Config,
|
||||||
#{
|
#{
|
||||||
<<"resource_opts">> => #{
|
<<"resource_opts">> => #{
|
||||||
<<"request_ttl">> => 500,
|
<<"request_ttl">> => <<"500ms">>,
|
||||||
<<"resume_interval">> => 100,
|
<<"resume_interval">> => <<"100ms">>,
|
||||||
<<"health_check_interval">> => 100
|
<<"health_check_interval">> => <<"100ms">>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
|
@ -184,7 +184,7 @@ clickhouse_config() ->
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
connect_timeout => 10000
|
connect_timeout => <<"10s">>
|
||||||
},
|
},
|
||||||
#{<<"config">> => Config}.
|
#{<<"config">> => Config}.
|
||||||
|
|
||||||
|
|
|
@ -135,8 +135,8 @@ bridge_config(TestCase, _TestGroup, Config) ->
|
||||||
" iotdb_version = \"~s\"\n"
|
" iotdb_version = \"~s\"\n"
|
||||||
" pool_size = 1\n"
|
" pool_size = 1\n"
|
||||||
" resource_opts = {\n"
|
" resource_opts = {\n"
|
||||||
" health_check_interval = 5000\n"
|
" health_check_interval = \"5s\"\n"
|
||||||
" request_ttl = 30000\n"
|
" request_ttl = 30s\n"
|
||||||
" query_mode = \"async\"\n"
|
" query_mode = \"async\"\n"
|
||||||
" worker_pool_size = 1\n"
|
" worker_pool_size = 1\n"
|
||||||
" }\n"
|
" }\n"
|
||||||
|
|
|
@ -590,7 +590,7 @@ kafka_config(TestCase, _KafkaType, Config) ->
|
||||||
" kafka {\n"
|
" kafka {\n"
|
||||||
" max_batch_bytes = 896KB\n"
|
" max_batch_bytes = 896KB\n"
|
||||||
" max_rejoin_attempts = 5\n"
|
" max_rejoin_attempts = 5\n"
|
||||||
" offset_commit_interval_seconds = 3\n"
|
" offset_commit_interval_seconds = 3s\n"
|
||||||
%% todo: matrix this
|
%% todo: matrix this
|
||||||
" offset_reset_policy = latest\n"
|
" offset_reset_policy = latest\n"
|
||||||
" }\n"
|
" }\n"
|
||||||
|
|
|
@ -307,7 +307,7 @@ bridges.kafka_consumer.my_consumer {
|
||||||
kafka {
|
kafka {
|
||||||
max_batch_bytes = 896KB
|
max_batch_bytes = 896KB
|
||||||
max_rejoin_attempts = 5
|
max_rejoin_attempts = 5
|
||||||
offset_commit_interval_seconds = 3
|
offset_commit_interval_seconds = 3s
|
||||||
offset_reset_policy = latest
|
offset_reset_policy = latest
|
||||||
}
|
}
|
||||||
topic_mapping = [
|
topic_mapping = [
|
||||||
|
|
|
@ -298,9 +298,9 @@ t_write_timeout(Config) ->
|
||||||
Config,
|
Config,
|
||||||
#{
|
#{
|
||||||
<<"resource_opts">> => #{
|
<<"resource_opts">> => #{
|
||||||
<<"request_ttl">> => 500,
|
<<"request_ttl">> => <<"500ms">>,
|
||||||
<<"resume_interval">> => 100,
|
<<"resume_interval">> => <<"100ms">>,
|
||||||
<<"health_check_interval">> => 100
|
<<"health_check_interval">> => <<"100ms">>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
|
|
@ -456,9 +456,9 @@ t_write_timeout(Config) ->
|
||||||
Config,
|
Config,
|
||||||
#{
|
#{
|
||||||
<<"resource_opts">> => #{
|
<<"resource_opts">> => #{
|
||||||
<<"request_ttl">> => 500,
|
<<"request_ttl">> => <<"500ms">>,
|
||||||
<<"resume_interval">> => 100,
|
<<"resume_interval">> => <<"100ms">>,
|
||||||
<<"health_check_interval">> => 100
|
<<"health_check_interval">> => <<"100ms">>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
|
|
@ -55,7 +55,7 @@ log.console_handler {
|
||||||
burst_limit {
|
burst_limit {
|
||||||
enable = true
|
enable = true
|
||||||
max_count = 10000
|
max_count = 10000
|
||||||
window_time = 1000
|
window_time = 1s
|
||||||
}
|
}
|
||||||
chars_limit = unlimited
|
chars_limit = unlimited
|
||||||
drop_mode_qlen = 3000
|
drop_mode_qlen = 3000
|
||||||
|
@ -66,9 +66,9 @@ log.console_handler {
|
||||||
max_depth = 100
|
max_depth = 100
|
||||||
overload_kill {
|
overload_kill {
|
||||||
enable = true
|
enable = true
|
||||||
mem_size = 31457280
|
mem_size = \"30MB\"
|
||||||
qlen = 20000
|
qlen = 20000
|
||||||
restart_after = 5000
|
restart_after = \"5s\"
|
||||||
}
|
}
|
||||||
single_line = true
|
single_line = true
|
||||||
supervisor_reports = error
|
supervisor_reports = error
|
||||||
|
@ -80,7 +80,7 @@ log.file_handlers {
|
||||||
burst_limit {
|
burst_limit {
|
||||||
enable = true
|
enable = true
|
||||||
max_count = 10000
|
max_count = 10000
|
||||||
window_time = 1000
|
window_time = 1s
|
||||||
}
|
}
|
||||||
chars_limit = unlimited
|
chars_limit = unlimited
|
||||||
drop_mode_qlen = 3000
|
drop_mode_qlen = 3000
|
||||||
|
@ -93,9 +93,9 @@ log.file_handlers {
|
||||||
max_size = \"1024MB\"
|
max_size = \"1024MB\"
|
||||||
overload_kill {
|
overload_kill {
|
||||||
enable = true
|
enable = true
|
||||||
mem_size = 31457280
|
mem_size = \"30MB\"
|
||||||
qlen = 20000
|
qlen = 20000
|
||||||
restart_after = 5000
|
restart_after = \"5s\"
|
||||||
}
|
}
|
||||||
rotation {count = 20, enable = true}
|
rotation {count = 20, enable = true}
|
||||||
single_line = true
|
single_line = true
|
||||||
|
|
|
@ -35,7 +35,7 @@ t_run_gc(_) ->
|
||||||
node => #{
|
node => #{
|
||||||
cookie => <<"cookie">>,
|
cookie => <<"cookie">>,
|
||||||
data_dir => <<"data">>,
|
data_dir => <<"data">>,
|
||||||
global_gc_interval => 1000
|
global_gc_interval => <<"1s">>
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
emqx_common_test_helpers:load_config(emqx_conf_schema, Conf0),
|
emqx_common_test_helpers:load_config(emqx_conf_schema, Conf0),
|
||||||
|
|
|
@ -195,7 +195,7 @@ enable(Bool) ->
|
||||||
bind(Port) ->
|
bind(Port) ->
|
||||||
{"bind",
|
{"bind",
|
||||||
?HOCON(
|
?HOCON(
|
||||||
?UNION([non_neg_integer(), emqx_schema:ip_port()]),
|
emqx_schema:ip_port(),
|
||||||
#{
|
#{
|
||||||
default => 0,
|
default => 0,
|
||||||
required => false,
|
required => false,
|
||||||
|
|
|
@ -225,7 +225,7 @@ t_update_conf(_Config) ->
|
||||||
DeletedConf = Conf#{<<"servers">> => Servers2},
|
DeletedConf = Conf#{<<"servers">> => Servers2},
|
||||||
validate_servers(Path, DeletedConf, Servers2),
|
validate_servers(Path, DeletedConf, Servers2),
|
||||||
[L1, L2 | Servers3] = Servers,
|
[L1, L2 | Servers3] = Servers,
|
||||||
UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => 1000},
|
UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => <<"1s">>},
|
||||||
UpdatedServers = [L1, UpdateL2 | Servers3],
|
UpdatedServers = [L1, UpdateL2 | Servers3],
|
||||||
UpdatedConf = Conf#{<<"servers">> => UpdatedServers},
|
UpdatedConf = Conf#{<<"servers">> => UpdatedServers},
|
||||||
validate_servers(Path, UpdatedConf, UpdatedServers),
|
validate_servers(Path, UpdatedConf, UpdatedServers),
|
||||||
|
|
|
@ -67,7 +67,7 @@ init_per_suite(Config) ->
|
||||||
_ = emqx_exhook_demo_svr:start(),
|
_ = emqx_exhook_demo_svr:start(),
|
||||||
load_cfg(?CONF_DEFAULT),
|
load_cfg(?CONF_DEFAULT),
|
||||||
emqx_mgmt_api_test_util:init_suite([emqx_exhook]),
|
emqx_mgmt_api_test_util:init_suite([emqx_exhook]),
|
||||||
[Conf] = emqx:get_config([exhook, servers]),
|
[Conf] = emqx:get_raw_config([exhook, servers]),
|
||||||
[{template, Conf} | Config].
|
[{template, Conf} | Config].
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
end_per_suite(Config) ->
|
||||||
|
@ -157,8 +157,8 @@ t_get(_) ->
|
||||||
t_add(Cfg) ->
|
t_add(Cfg) ->
|
||||||
Template = proplists:get_value(template, Cfg),
|
Template = proplists:get_value(template, Cfg),
|
||||||
Instance = Template#{
|
Instance = Template#{
|
||||||
name => <<"test1">>,
|
<<"name">> => <<"test1">>,
|
||||||
url => "http://127.0.0.1:9001"
|
<<"url">> => "http://127.0.0.1:9001"
|
||||||
},
|
},
|
||||||
{ok, Data} = request_api(
|
{ok, Data} = request_api(
|
||||||
post,
|
post,
|
||||||
|
@ -186,8 +186,8 @@ t_add(Cfg) ->
|
||||||
t_add_duplicate(Cfg) ->
|
t_add_duplicate(Cfg) ->
|
||||||
Template = proplists:get_value(template, Cfg),
|
Template = proplists:get_value(template, Cfg),
|
||||||
Instance = Template#{
|
Instance = Template#{
|
||||||
name => <<"test1">>,
|
<<"name">> => <<"test1">>,
|
||||||
url => "http://127.0.0.1:9001"
|
<<"url">> => "http://127.0.0.1:9001"
|
||||||
},
|
},
|
||||||
|
|
||||||
{error, _Reason} = request_api(
|
{error, _Reason} = request_api(
|
||||||
|
@ -203,8 +203,8 @@ t_add_duplicate(Cfg) ->
|
||||||
t_add_with_bad_name(Cfg) ->
|
t_add_with_bad_name(Cfg) ->
|
||||||
Template = proplists:get_value(template, Cfg),
|
Template = proplists:get_value(template, Cfg),
|
||||||
Instance = Template#{
|
Instance = Template#{
|
||||||
name => <<"🤔">>,
|
<<"name">> => <<"🤔">>,
|
||||||
url => "http://127.0.0.1:9001"
|
<<"url">> => "http://127.0.0.1:9001"
|
||||||
},
|
},
|
||||||
|
|
||||||
{error, _Reason} = request_api(
|
{error, _Reason} = request_api(
|
||||||
|
@ -298,7 +298,7 @@ t_hooks(_Cfg) ->
|
||||||
|
|
||||||
t_update(Cfg) ->
|
t_update(Cfg) ->
|
||||||
Template = proplists:get_value(template, Cfg),
|
Template = proplists:get_value(template, Cfg),
|
||||||
Instance = Template#{enable => false},
|
Instance = Template#{<<"enable">> => false},
|
||||||
{ok, <<"{\"", _/binary>>} = request_api(
|
{ok, <<"{\"", _/binary>>} = request_api(
|
||||||
put,
|
put,
|
||||||
api_path(["exhooks", "default"]),
|
api_path(["exhooks", "default"]),
|
||||||
|
|
|
@ -77,7 +77,7 @@ set_special_configs(Config) ->
|
||||||
% complete transfers.
|
% complete transfers.
|
||||||
Storage = emqx_utils_maps:deep_merge(
|
Storage = emqx_utils_maps:deep_merge(
|
||||||
emqx_ft_test_helpers:local_storage(Config),
|
emqx_ft_test_helpers:local_storage(Config),
|
||||||
#{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}}
|
#{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => <<"0s">>}}}}
|
||||||
),
|
),
|
||||||
emqx_ft_test_helpers:load_config(#{
|
emqx_ft_test_helpers:load_config(#{
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
|
|
|
@ -261,7 +261,7 @@ common_listener_opts() ->
|
||||||
)},
|
)},
|
||||||
{bind,
|
{bind,
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([ip_port(), integer()]),
|
ip_port(),
|
||||||
#{desc => ?DESC(gateway_common_listener_bind)}
|
#{desc => ?DESC(gateway_common_listener_bind)}
|
||||||
)},
|
)},
|
||||||
{max_connections,
|
{max_connections,
|
||||||
|
|
|
@ -56,7 +56,7 @@ fields(exproto_grpc_server) ->
|
||||||
[
|
[
|
||||||
{bind,
|
{bind,
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([ip_port(), integer()]),
|
ip_port(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC(exproto_grpc_server_bind)
|
desc => ?DESC(exproto_grpc_server_bind)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
%%--------------------------------------------------------------------
|
%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
@ -192,7 +192,7 @@ default_config(Overrides) ->
|
||||||
" xml_dir = \"~s\"\n"
|
" xml_dir = \"~s\"\n"
|
||||||
" lifetime_min = 1s\n"
|
" lifetime_min = 1s\n"
|
||||||
" lifetime_max = 86400s\n"
|
" lifetime_max = 86400s\n"
|
||||||
" qmode_time_window = 22\n"
|
" qmode_time_window = 22s\n"
|
||||||
" auto_observe = ~w\n"
|
" auto_observe = ~w\n"
|
||||||
" mountpoint = \"lwm2m/${username}\"\n"
|
" mountpoint = \"lwm2m/${username}\"\n"
|
||||||
" update_msg_publish_condition = contains_object_list\n"
|
" update_msg_publish_condition = contains_object_list\n"
|
||||||
|
|
|
@ -468,7 +468,7 @@ fields(rebalance_evacuation_start) ->
|
||||||
)},
|
)},
|
||||||
{"wait_takeover",
|
{"wait_takeover",
|
||||||
mk(
|
mk(
|
||||||
pos_integer(),
|
emqx_schema:timeout_duration_s(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(wait_takeover),
|
desc => ?DESC(wait_takeover),
|
||||||
required => false
|
required => false
|
||||||
|
@ -709,24 +709,24 @@ fields(global_status) ->
|
||||||
|
|
||||||
rebalance_example() ->
|
rebalance_example() ->
|
||||||
#{
|
#{
|
||||||
wait_health_check => 10,
|
wait_health_check => <<"10s">>,
|
||||||
conn_evict_rate => 10,
|
conn_evict_rate => 10,
|
||||||
sess_evict_rate => 20,
|
sess_evict_rate => 20,
|
||||||
abs_conn_threshold => 10,
|
abs_conn_threshold => 10,
|
||||||
rel_conn_threshold => 1.5,
|
rel_conn_threshold => 1.5,
|
||||||
abs_sess_threshold => 10,
|
abs_sess_threshold => 10,
|
||||||
rel_sess_threshold => 1.5,
|
rel_sess_threshold => 1.5,
|
||||||
wait_takeover => 10,
|
wait_takeover => <<"10s">>,
|
||||||
nodes => [<<"othernode@127.0.0.1">>]
|
nodes => [<<"othernode@127.0.0.1">>]
|
||||||
}.
|
}.
|
||||||
|
|
||||||
rebalance_evacuation_example() ->
|
rebalance_evacuation_example() ->
|
||||||
#{
|
#{
|
||||||
wait_health_check => 10,
|
wait_health_check => <<"10s">>,
|
||||||
conn_evict_rate => 100,
|
conn_evict_rate => 100,
|
||||||
sess_evict_rate => 100,
|
sess_evict_rate => 100,
|
||||||
redirect_to => <<"othernode:1883">>,
|
redirect_to => <<"othernode:1883">>,
|
||||||
wait_takeover => 10,
|
wait_takeover => <<"10s">>,
|
||||||
migrate_to => [<<"othernode@127.0.0.1">>]
|
migrate_to => [<<"othernode@127.0.0.1">>]
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -67,7 +67,6 @@ t_start_evacuation_validation(Config) ->
|
||||||
BadOpts = [
|
BadOpts = [
|
||||||
#{conn_evict_rate => <<"conn">>},
|
#{conn_evict_rate => <<"conn">>},
|
||||||
#{sess_evict_rate => <<"sess">>},
|
#{sess_evict_rate => <<"sess">>},
|
||||||
#{redirect_to => 123},
|
|
||||||
#{wait_takeover => <<"wait">>},
|
#{wait_takeover => <<"wait">>},
|
||||||
#{wait_health_check => <<"wait">>},
|
#{wait_health_check => <<"wait">>},
|
||||||
#{migrate_to => []},
|
#{migrate_to => []},
|
||||||
|
@ -83,7 +82,8 @@ t_start_evacuation_validation(Config) ->
|
||||||
api_post(
|
api_post(
|
||||||
["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
|
["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
|
||||||
Opts
|
Opts
|
||||||
)
|
),
|
||||||
|
Opts
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
BadOpts
|
BadOpts
|
||||||
|
@ -103,8 +103,8 @@ t_start_evacuation_validation(Config) ->
|
||||||
#{
|
#{
|
||||||
conn_evict_rate => 10,
|
conn_evict_rate => 10,
|
||||||
sess_evict_rate => 10,
|
sess_evict_rate => 10,
|
||||||
wait_takeover => 10,
|
wait_takeover => <<"10s">>,
|
||||||
wait_health_check => 10,
|
wait_health_check => <<"10s">>,
|
||||||
redirect_to => <<"srv">>,
|
redirect_to => <<"srv">>,
|
||||||
migrate_to => [atom_to_binary(RecipientNode)]
|
migrate_to => [atom_to_binary(RecipientNode)]
|
||||||
}
|
}
|
||||||
|
@ -166,8 +166,8 @@ t_start_rebalance_validation(Config) ->
|
||||||
#{
|
#{
|
||||||
conn_evict_rate => 10,
|
conn_evict_rate => 10,
|
||||||
sess_evict_rate => 10,
|
sess_evict_rate => 10,
|
||||||
wait_takeover => 10,
|
wait_takeover => <<"10s">>,
|
||||||
wait_health_check => 10,
|
wait_health_check => <<"10s">>,
|
||||||
abs_conn_threshold => 10,
|
abs_conn_threshold => 10,
|
||||||
rel_conn_threshold => 1.001,
|
rel_conn_threshold => 1.001,
|
||||||
abs_sess_threshold => 10,
|
abs_sess_threshold => 10,
|
||||||
|
|
|
@ -483,26 +483,25 @@ t_clear_expired(_) ->
|
||||||
with_conf(ConfMod, Case).
|
with_conf(ConfMod, Case).
|
||||||
|
|
||||||
t_max_payload_size(_) ->
|
t_max_payload_size(_) ->
|
||||||
ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := 6} end,
|
ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end,
|
||||||
Case = fun() ->
|
Case = fun() ->
|
||||||
emqx_retainer:clean(),
|
emqx_retainer:clean(),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
Payload = iolist_to_binary(lists:duplicate(1024, <<"0">>)),
|
||||||
emqtt:publish(
|
emqtt:publish(
|
||||||
C1,
|
C1,
|
||||||
<<"retained/1">>,
|
<<"retained/1">>,
|
||||||
#{},
|
#{},
|
||||||
<<"1234">>,
|
Payload,
|
||||||
[{qos, 0}, {retain, true}]
|
[{qos, 0}, {retain, true}]
|
||||||
),
|
),
|
||||||
|
|
||||||
emqtt:publish(
|
emqtt:publish(
|
||||||
C1,
|
C1,
|
||||||
<<"retained/2">>,
|
<<"retained/2">>,
|
||||||
#{},
|
#{},
|
||||||
<<"1234567">>,
|
<<"1", Payload/binary>>,
|
||||||
[{qos, 0}, {retain, true}]
|
[{qos, 0}, {retain, true}]
|
||||||
),
|
),
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,7 @@ t_full_config(_Config) ->
|
||||||
<<"min_part_size">> => <<"10mb">>,
|
<<"min_part_size">> => <<"10mb">>,
|
||||||
<<"acl">> => <<"public_read">>,
|
<<"acl">> => <<"public_read">>,
|
||||||
<<"transport_options">> => #{
|
<<"transport_options">> => #{
|
||||||
<<"connect_timeout">> => 30000,
|
<<"connect_timeout">> => <<"30s">>,
|
||||||
<<"enable_pipelining">> => 200,
|
<<"enable_pipelining">> => 200,
|
||||||
<<"pool_size">> => 10,
|
<<"pool_size">> => 10,
|
||||||
<<"pool_type">> => <<"random">>,
|
<<"pool_type">> => <<"random">>,
|
||||||
|
|
|
@ -64,10 +64,10 @@ base_raw_config(tcp) ->
|
||||||
<<"secret_access_key">> => bin(?SECRET_ACCESS_KEY),
|
<<"secret_access_key">> => bin(?SECRET_ACCESS_KEY),
|
||||||
<<"host">> => ?TCP_HOST,
|
<<"host">> => ?TCP_HOST,
|
||||||
<<"port">> => ?TCP_PORT,
|
<<"port">> => ?TCP_PORT,
|
||||||
<<"max_part_size">> => 10 * 1024 * 1024,
|
<<"max_part_size">> => <<"10MB">>,
|
||||||
<<"transport_options">> =>
|
<<"transport_options">> =>
|
||||||
#{
|
#{
|
||||||
<<"request_timeout">> => 2000
|
<<"request_timeout">> => <<"2s">>
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
base_raw_config(tls) ->
|
base_raw_config(tls) ->
|
||||||
|
@ -77,10 +77,10 @@ base_raw_config(tls) ->
|
||||||
<<"secret_access_key">> => bin(?SECRET_ACCESS_KEY),
|
<<"secret_access_key">> => bin(?SECRET_ACCESS_KEY),
|
||||||
<<"host">> => ?TLS_HOST,
|
<<"host">> => ?TLS_HOST,
|
||||||
<<"port">> => ?TLS_PORT,
|
<<"port">> => ?TLS_PORT,
|
||||||
<<"max_part_size">> => 10 * 1024 * 1024,
|
<<"max_part_size">> => <<"10MB">>,
|
||||||
<<"transport_options">> =>
|
<<"transport_options">> =>
|
||||||
#{
|
#{
|
||||||
<<"request_timeout">> => 2000,
|
<<"request_timeout">> => <<"2s">>,
|
||||||
<<"ssl">> => #{
|
<<"ssl">> => #{
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"cacertfile">> => bin(cert_path("ca.crt")),
|
<<"cacertfile">> => bin(cert_path("ca.crt")),
|
||||||
|
|
|
@ -139,7 +139,7 @@ settings(get, _) ->
|
||||||
{200, emqx:get_raw_config([slow_subs], #{})};
|
{200, emqx:get_raw_config([slow_subs], #{})};
|
||||||
settings(put, #{body := Body}) ->
|
settings(put, #{body := Body}) ->
|
||||||
case emqx_slow_subs:update_settings(Body) of
|
case emqx_slow_subs:update_settings(Body) of
|
||||||
{ok, #{config := NewConf}} ->
|
{ok, #{raw_config := NewConf}} ->
|
||||||
{200, NewConf};
|
{200, NewConf};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
Message = list_to_binary(io_lib:format("Update slow subs config failed ~p", [Reason])),
|
Message = list_to_binary(io_lib:format("Update slow subs config failed ~p", [Reason])),
|
||||||
|
|
|
@ -41,7 +41,7 @@
|
||||||
"{\n"
|
"{\n"
|
||||||
" enable = true\n"
|
" enable = true\n"
|
||||||
" top_k_num = 5,\n"
|
" top_k_num = 5,\n"
|
||||||
" expire_interval = 60000\n"
|
" expire_interval = 60s\n"
|
||||||
" stats_type = whole\n"
|
" stats_type = whole\n"
|
||||||
"}"
|
"}"
|
||||||
""
|
""
|
||||||
|
@ -137,36 +137,33 @@ t_clear(_) ->
|
||||||
?assertEqual(0, ets:info(?TOPK_TAB, size)).
|
?assertEqual(0, ets:info(?TOPK_TAB, size)).
|
||||||
|
|
||||||
t_settting(_) ->
|
t_settting(_) ->
|
||||||
Conf = emqx:get_config([slow_subs]),
|
RawConf = emqx:get_raw_config([slow_subs]),
|
||||||
Conf2 = Conf#{stats_type => internal},
|
RawConf2 = RawConf#{<<"stats_type">> => <<"internal">>},
|
||||||
{ok, Data} = request_api(
|
{ok, Data} = request_api(
|
||||||
put,
|
put,
|
||||||
api_path(["slow_subscriptions", "settings"]),
|
api_path(["slow_subscriptions", "settings"]),
|
||||||
[],
|
[],
|
||||||
auth_header_(),
|
auth_header_(),
|
||||||
Conf2
|
RawConf2
|
||||||
),
|
),
|
||||||
|
|
||||||
Return = decode_json(Data),
|
Return = decode_json(Data),
|
||||||
|
Expect = emqx_config:fill_defaults(RawConf2),
|
||||||
|
|
||||||
?assertEqual(Conf2#{stats_type := <<"internal">>}, Return),
|
?assertEqual(Expect, Return),
|
||||||
|
|
||||||
|
timer:sleep(800),
|
||||||
{ok, GetData} = request_api(
|
{ok, GetData} = request_api(
|
||||||
get,
|
get,
|
||||||
api_path(["slow_subscriptions", "settings"]),
|
api_path(["slow_subscriptions", "settings"]),
|
||||||
[],
|
[],
|
||||||
auth_header_()
|
auth_header_()
|
||||||
),
|
),
|
||||||
|
|
||||||
timer:sleep(1000),
|
|
||||||
|
|
||||||
GetReturn = decode_json(GetData),
|
GetReturn = decode_json(GetData),
|
||||||
|
?assertEqual(Expect, GetReturn).
|
||||||
?assertEqual(Conf2#{stats_type := <<"internal">>}, GetReturn).
|
|
||||||
|
|
||||||
decode_json(Data) ->
|
decode_json(Data) ->
|
||||||
BinJosn = emqx_utils_json:decode(Data, [return_maps]),
|
emqx_utils_json:decode(Data, [return_maps]).
|
||||||
emqx_utils_maps:unsafe_atom_key_map(BinJosn).
|
|
||||||
|
|
||||||
request_api(Method, Url, Auth) ->
|
request_api(Method, Url, Auth) ->
|
||||||
request_api(Method, Url, [], Auth, []).
|
request_api(Method, Url, [], Auth, []).
|
||||||
|
|
2
mix.exs
2
mix.exs
|
@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
# in conflict by emqtt and hocon
|
# in conflict by emqtt and hocon
|
||||||
{:getopt, "1.0.2", override: true},
|
{:getopt, "1.0.2", override: true},
|
||||||
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
|
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
|
||||||
{:hocon, github: "emqx/hocon", tag: "0.39.8", override: true},
|
{:hocon, github: "emqx/hocon", tag: "0.39.9", override: true},
|
||||||
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
|
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
|
||||||
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
|
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
|
||||||
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
|
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
|
||||||
|
|
|
@ -75,7 +75,7 @@
|
||||||
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
|
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
|
||||||
, {getopt, "1.0.2"}
|
, {getopt, "1.0.2"}
|
||||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
|
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
|
||||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}}
|
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}}
|
||||||
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
|
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
|
||||||
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
||||||
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
|
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
|
||||||
|
|
Loading…
Reference in New Issue