Merge pull request #9628 from savonarola/fix-flaky-redis-bridge-test
chore(ee bridge): fix Redis bridge test flakyness
This commit is contained in:
commit
f9843de7ae
|
@ -52,7 +52,7 @@ down:
|
|||
down --remove-orphans
|
||||
|
||||
ct:
|
||||
docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' -v --suite $(SUITE)"
|
||||
docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' --readable true -v --suite $(SUITE)"
|
||||
|
||||
ct-all:
|
||||
docker exec -i "$(CONTAINER)" bash -c "make ct"
|
||||
|
|
|
@ -101,6 +101,14 @@
|
|||
-define(HEALTHCHECK_INTERVAL, 15000).
|
||||
-define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
|
||||
|
||||
%% milliseconds
|
||||
-define(START_TIMEOUT, 5000).
|
||||
-define(START_TIMEOUT_RAW, <<"5s">>).
|
||||
|
||||
%% boolean
|
||||
-define(START_AFTER_CREATED, true).
|
||||
-define(START_AFTER_CREATED_RAW, <<"true">>).
|
||||
|
||||
%% milliseconds
|
||||
-define(AUTO_RESTART_INTERVAL, 60000).
|
||||
-define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>).
|
||||
|
|
|
@ -116,11 +116,6 @@ create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) ->
|
|||
{ok, _Group, Data} = lookup(ResId),
|
||||
{ok, Data}.
|
||||
|
||||
%% internal configs
|
||||
-define(START_AFTER_CREATED, true).
|
||||
%% in milliseconds
|
||||
-define(START_TIMEOUT, 5000).
|
||||
|
||||
%% @doc Create a resource_manager and wait until it is running
|
||||
create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
|
||||
% The state machine will make the actual call to the callback/resource module after init
|
||||
|
|
|
@ -46,6 +46,8 @@ fields("creation_opts") ->
|
|||
[
|
||||
{worker_pool_size, fun worker_pool_size/1},
|
||||
{health_check_interval, fun health_check_interval/1},
|
||||
{start_after_created, fun start_after_created/1},
|
||||
{start_timeout, fun start_timeout/1},
|
||||
{auto_restart_interval, fun auto_restart_interval/1},
|
||||
{query_mode, fun query_mode/1},
|
||||
{request_timeout, fun request_timeout/1},
|
||||
|
@ -69,6 +71,18 @@ health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;
|
|||
health_check_interval(required) -> false;
|
||||
health_check_interval(_) -> undefined.
|
||||
|
||||
start_after_created(type) -> boolean();
|
||||
start_after_created(desc) -> ?DESC("start_after_created");
|
||||
start_after_created(default) -> ?START_AFTER_CREATED_RAW;
|
||||
start_after_created(required) -> false;
|
||||
start_after_created(_) -> undefined.
|
||||
|
||||
start_timeout(type) -> emqx_schema:duration_ms();
|
||||
start_timeout(desc) -> ?DESC("start_timeout");
|
||||
start_timeout(default) -> ?START_TIMEOUT_RAW;
|
||||
start_timeout(required) -> false;
|
||||
start_timeout(_) -> undefined.
|
||||
|
||||
auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
|
||||
auto_restart_interval(desc) -> ?DESC("auto_restart_interval");
|
||||
auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW;
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Expose additional resource configuration parameters: `start_after_created` and `start_timeout`.
|
|
@ -0,0 +1 @@
|
|||
为桥接资源增加了配置参数:`start_after_created` 和 `start_timeout`。
|
|
@ -137,6 +137,7 @@ end_per_suite(_Config) ->
|
|||
ok.
|
||||
|
||||
init_per_testcase(_Testcase, Config) ->
|
||||
ok = delete_all_rules(),
|
||||
ok = delete_all_bridges(),
|
||||
case ?config(transport_type, Config) of
|
||||
undefined ->
|
||||
|
@ -248,29 +249,27 @@ t_check_replay(Config) ->
|
|||
),
|
||||
|
||||
?check_trace(
|
||||
begin
|
||||
?wait_async_action(
|
||||
with_down_failure(Config, ProxyName, fun() ->
|
||||
{_, {ok, _}} =
|
||||
?wait_async_action(
|
||||
lists:foreach(
|
||||
fun(_) ->
|
||||
_ = publish_message(Topic, <<"test_payload">>)
|
||||
end,
|
||||
lists:seq(1, ?BATCH_SIZE)
|
||||
),
|
||||
#{
|
||||
?snk_kind := redis_ee_connector_send_done,
|
||||
batch := true,
|
||||
result := {error, _}
|
||||
},
|
||||
10_000
|
||||
)
|
||||
end),
|
||||
#{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}},
|
||||
10_000
|
||||
)
|
||||
end,
|
||||
?wait_async_action(
|
||||
with_down_failure(Config, ProxyName, fun() ->
|
||||
{_, {ok, _}} =
|
||||
?wait_async_action(
|
||||
lists:foreach(
|
||||
fun(_) ->
|
||||
_ = publish_message(Topic, <<"test_payload">>)
|
||||
end,
|
||||
lists:seq(1, ?BATCH_SIZE)
|
||||
),
|
||||
#{
|
||||
?snk_kind := redis_ee_connector_send_done,
|
||||
batch := true,
|
||||
result := {error, _}
|
||||
},
|
||||
10_000
|
||||
)
|
||||
end),
|
||||
#{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}},
|
||||
10_000
|
||||
),
|
||||
fun(Trace) ->
|
||||
?assert(
|
||||
?strict_causality(
|
||||
|
@ -340,7 +339,7 @@ with_down_failure(Config, Name, F) ->
|
|||
ProxyHost = ?config(proxy_host, Config),
|
||||
emqx_common_test_helpers:with_failure(down, Name, ProxyHost, ProxyPort, F).
|
||||
|
||||
check_resource_queries(ResourceId, Topic, IsBatch) ->
|
||||
check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
|
||||
RandomPayload = rand:bytes(20),
|
||||
N =
|
||||
case IsBatch of
|
||||
|
@ -348,18 +347,18 @@ check_resource_queries(ResourceId, Topic, IsBatch) ->
|
|||
false -> 1
|
||||
end,
|
||||
?check_trace(
|
||||
begin
|
||||
?wait_async_action(
|
||||
lists:foreach(
|
||||
fun(_) ->
|
||||
_ = publish_message(Topic, RandomPayload)
|
||||
end,
|
||||
lists:seq(1, N)
|
||||
),
|
||||
#{?snk_kind := redis_ee_connector_send_done, batch := IsBatch},
|
||||
1000
|
||||
)
|
||||
end,
|
||||
?wait_async_action(
|
||||
lists:foreach(
|
||||
fun(I) ->
|
||||
IBin = integer_to_binary(I),
|
||||
Topic = <<BaseTopic/binary, "/", IBin/binary>>,
|
||||
_ = publish_message(Topic, RandomPayload)
|
||||
end,
|
||||
lists:seq(1, N)
|
||||
),
|
||||
#{?snk_kind := redis_ee_connector_send_done, batch := IsBatch},
|
||||
5000
|
||||
),
|
||||
fun(Trace) ->
|
||||
AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)),
|
||||
case IsBatch of
|
||||
|
@ -394,6 +393,14 @@ conf_schema(StructName) ->
|
|||
roots => [{root, hoconsc:ref(emqx_ee_bridge_redis, StructName)}]
|
||||
}.
|
||||
|
||||
delete_all_rules() ->
|
||||
lists:foreach(
|
||||
fun(#{id := RuleId}) ->
|
||||
emqx_rule_engine:delete_rule(RuleId)
|
||||
end,
|
||||
emqx_rule_engine:get_rules()
|
||||
).
|
||||
|
||||
delete_all_bridges() ->
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
|
@ -490,7 +497,8 @@ toxiproxy_redis_bridge_config() ->
|
|||
<<"query_mode">> => <<"async">>,
|
||||
<<"worker_pool_size">> => <<"1">>,
|
||||
<<"batch_size">> => integer_to_binary(?BATCH_SIZE),
|
||||
<<"health_check_interval">> => <<"1s">>
|
||||
<<"health_check_interval">> => <<"1s">>,
|
||||
<<"start_timeout">> => <<"15s">>
|
||||
}
|
||||
},
|
||||
maps:merge(Conf0, ?COMMON_REDIS_OPTS).
|
||||
|
@ -500,8 +508,10 @@ invalid_command_bridge_config() ->
|
|||
Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS),
|
||||
Conf1#{
|
||||
<<"resource_opts">> => #{
|
||||
<<"query_mode">> => <<"sync">>,
|
||||
<<"batch_size">> => <<"1">>,
|
||||
<<"worker_pool_size">> => <<"1">>
|
||||
<<"worker_pool_size">> => <<"1">>,
|
||||
<<"start_timeout">> => <<"15s">>
|
||||
},
|
||||
<<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>]
|
||||
}.
|
||||
|
@ -510,12 +520,14 @@ resource_configs() ->
|
|||
#{
|
||||
batch_off => #{
|
||||
<<"query_mode">> => <<"sync">>,
|
||||
<<"batch_size">> => <<"1">>
|
||||
<<"batch_size">> => <<"1">>,
|
||||
<<"start_timeout">> => <<"15s">>
|
||||
},
|
||||
batch_on => #{
|
||||
<<"query_mode">> => <<"async">>,
|
||||
<<"worker_pool_size">> => <<"1">>,
|
||||
<<"batch_size">> => integer_to_binary(?BATCH_SIZE)
|
||||
<<"batch_size">> => integer_to_binary(?BATCH_SIZE),
|
||||
<<"start_timeout">> => <<"15s">>
|
||||
}
|
||||
}.
|
||||
|
||||
|
|
Loading…
Reference in New Issue