chore(bridges): reduce Redis bridge flakyness

This commit is contained in:
Ilya Averyanov 2023-01-18 14:34:11 +02:00
parent 44a6e5ed15
commit f6fbbf3ee3
2 changed files with 53 additions and 41 deletions

View File

@ -52,7 +52,7 @@ down:
down --remove-orphans down --remove-orphans
ct: ct:
docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' -v --suite $(SUITE)" docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' --readable true -v --suite $(SUITE)"
ct-all: ct-all:
docker exec -i "$(CONTAINER)" bash -c "make ct" docker exec -i "$(CONTAINER)" bash -c "make ct"

View File

@ -137,6 +137,7 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
ok = delete_all_rules(),
ok = delete_all_bridges(), ok = delete_all_bridges(),
case ?config(transport_type, Config) of case ?config(transport_type, Config) of
undefined -> undefined ->
@ -248,29 +249,27 @@ t_check_replay(Config) ->
), ),
?check_trace( ?check_trace(
begin ?wait_async_action(
?wait_async_action( with_down_failure(Config, ProxyName, fun() ->
with_down_failure(Config, ProxyName, fun() -> {_, {ok, _}} =
{_, {ok, _}} = ?wait_async_action(
?wait_async_action( lists:foreach(
lists:foreach( fun(_) ->
fun(_) -> _ = publish_message(Topic, <<"test_payload">>)
_ = publish_message(Topic, <<"test_payload">>) end,
end, lists:seq(1, ?BATCH_SIZE)
lists:seq(1, ?BATCH_SIZE) ),
), #{
#{ ?snk_kind := redis_ee_connector_send_done,
?snk_kind := redis_ee_connector_send_done, batch := true,
batch := true, result := {error, _}
result := {error, _} },
}, 10_000
10_000 )
) end),
end), #{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}},
#{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}}, 10_000
10_000 ),
)
end,
fun(Trace) -> fun(Trace) ->
?assert( ?assert(
?strict_causality( ?strict_causality(
@ -340,7 +339,7 @@ with_down_failure(Config, Name, F) ->
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
emqx_common_test_helpers:with_failure(down, Name, ProxyHost, ProxyPort, F). emqx_common_test_helpers:with_failure(down, Name, ProxyHost, ProxyPort, F).
check_resource_queries(ResourceId, Topic, IsBatch) -> check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
RandomPayload = rand:bytes(20), RandomPayload = rand:bytes(20),
N = N =
case IsBatch of case IsBatch of
@ -348,18 +347,18 @@ check_resource_queries(ResourceId, Topic, IsBatch) ->
false -> 1 false -> 1
end, end,
?check_trace( ?check_trace(
begin ?wait_async_action(
?wait_async_action( lists:foreach(
lists:foreach( fun(I) ->
fun(_) -> IBin = integer_to_binary(I),
_ = publish_message(Topic, RandomPayload) Topic = <<BaseTopic/binary, "/", IBin/binary>>,
end, _ = publish_message(Topic, RandomPayload)
lists:seq(1, N) end,
), lists:seq(1, N)
#{?snk_kind := redis_ee_connector_send_done, batch := IsBatch}, ),
1000 #{?snk_kind := redis_ee_connector_send_done, batch := IsBatch},
) 5000
end, ),
fun(Trace) -> fun(Trace) ->
AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)), AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)),
case IsBatch of case IsBatch of
@ -394,6 +393,14 @@ conf_schema(StructName) ->
roots => [{root, hoconsc:ref(emqx_ee_bridge_redis, StructName)}] roots => [{root, hoconsc:ref(emqx_ee_bridge_redis, StructName)}]
}. }.
delete_all_rules() ->
lists:foreach(
fun(#{id := RuleId}) ->
emqx_rule_engine:delete_rule(RuleId)
end,
emqx_rule_engine:get_rules()
).
delete_all_bridges() -> delete_all_bridges() ->
lists:foreach( lists:foreach(
fun(#{name := Name, type := Type}) -> fun(#{name := Name, type := Type}) ->
@ -490,7 +497,8 @@ toxiproxy_redis_bridge_config() ->
<<"query_mode">> => <<"async">>, <<"query_mode">> => <<"async">>,
<<"worker_pool_size">> => <<"1">>, <<"worker_pool_size">> => <<"1">>,
<<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
<<"health_check_interval">> => <<"1s">> <<"health_check_interval">> => <<"1s">>,
<<"start_timeout">> => <<"15s">>
} }
}, },
maps:merge(Conf0, ?COMMON_REDIS_OPTS). maps:merge(Conf0, ?COMMON_REDIS_OPTS).
@ -500,8 +508,10 @@ invalid_command_bridge_config() ->
Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS), Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS),
Conf1#{ Conf1#{
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"query_mode">> => <<"sync">>,
<<"batch_size">> => <<"1">>, <<"batch_size">> => <<"1">>,
<<"worker_pool_size">> => <<"1">> <<"worker_pool_size">> => <<"1">>,
<<"start_timeout">> => <<"15s">>
}, },
<<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>] <<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>]
}. }.
@ -510,12 +520,14 @@ resource_configs() ->
#{ #{
batch_off => #{ batch_off => #{
<<"query_mode">> => <<"sync">>, <<"query_mode">> => <<"sync">>,
<<"batch_size">> => <<"1">> <<"batch_size">> => <<"1">>,
<<"start_timeout">> => <<"15s">>
}, },
batch_on => #{ batch_on => #{
<<"query_mode">> => <<"async">>, <<"query_mode">> => <<"async">>,
<<"worker_pool_size">> => <<"1">>, <<"worker_pool_size">> => <<"1">>,
<<"batch_size">> => integer_to_binary(?BATCH_SIZE) <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
<<"start_timeout">> => <<"15s">>
} }
}. }.