Merge pull request #13295 from thalesmg/test-flaky-greptime-mkII-r57-20240619

test(greptime): attempt to fix flaky tests (attempt without driver patch)
This commit is contained in:
zmstone 2024-06-20 14:49:21 +02:00 committed by GitHub
commit 2a0071aa01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 65 additions and 35 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_greptimedb, [ {application, emqx_bridge_greptimedb, [
{description, "EMQX GreptimeDB Bridge"}, {description, "EMQX GreptimeDB Bridge"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -107,6 +107,10 @@ on_start(InstId, Config) ->
%% See: greptimedb:start_client/1 %% See: greptimedb:start_client/1
start_client(InstId, Config). start_client(InstId, Config).
on_stop(InstId, #{client := Client}) ->
Res = greptimedb:stop_client(Client),
?tp(greptimedb_client_stopped, #{instance_id => InstId}),
Res;
on_stop(InstId, _State) -> on_stop(InstId, _State) ->
case emqx_resource:get_allocated_resources(InstId) of case emqx_resource:get_allocated_resources(InstId) of
#{?greptime_client := Client} -> #{?greptime_client := Client} ->

View File

@ -50,18 +50,15 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
delete_all_bridges(),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_connector_test_helpers:stop_apps([
emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine
]),
_ = application:stop(emqx_connector),
ok. ok.
init_per_group(GreptimedbType, Config0) when init_per_group(GreptimedbType, Config0) when
GreptimedbType =:= grpcv1_tcp; GreptimedbType =:= grpcv1_tcp;
GreptimedbType =:= grpcv1_tls GreptimedbType =:= grpcv1_tls
-> ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
#{ #{
host := GreptimedbHost, host := GreptimedbHost,
port := GreptimedbPort, port := GreptimedbPort,
@ -89,13 +86,18 @@ init_per_group(GreptimedbType, Config0) when
end, end,
case emqx_common_test_helpers:is_tcp_server_available(GreptimedbHost, GreptimedbHttpPort) of case emqx_common_test_helpers:is_tcp_server_available(GreptimedbHost, GreptimedbHttpPort) of
true -> true ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), Apps = emqx_cth_suite:start(
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), [
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx,
ok = start_apps(), emqx_conf,
{ok, _} = application:ensure_all_started(emqx_connector), emqx_bridge_greptimedb,
{ok, _} = application:ensure_all_started(greptimedb), emqx_bridge,
emqx_mgmt_api_test_util:init_suite(), emqx_rule_engine,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config0)}
),
Config = [{use_tls, UseTLS} | Config0], Config = [{use_tls, UseTLS} | Config0],
{Name, ConfigString, GreptimedbConfig} = greptimedb_config( {Name, ConfigString, GreptimedbConfig} = greptimedb_config(
grpcv1, GreptimedbHost, GreptimedbPort, Config grpcv1, GreptimedbHost, GreptimedbPort, Config
@ -116,6 +118,7 @@ init_per_group(GreptimedbType, Config0) when
], ],
{ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts), {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts),
[ [
{group_apps, Apps},
{proxy_host, ProxyHost}, {proxy_host, ProxyHost},
{proxy_port, ProxyPort}, {proxy_port, ProxyPort},
{proxy_name, ProxyName}, {proxy_name, ProxyName},
@ -150,18 +153,21 @@ end_per_group(Group, Config) when
Group =:= grpcv1_tcp; Group =:= grpcv1_tcp;
Group =:= grpcv1_tls Group =:= grpcv1_tls
-> ->
Apps = ?config(group_apps, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
EHttpcPoolName = ?config(ehttpc_pool_name, Config), EHttpcPoolName = ?config(ehttpc_pool_name, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ehttpc_sup:stop_pool(EHttpcPoolName), ehttpc_sup:stop_pool(EHttpcPoolName),
delete_bridge(Config), emqx_cth_suite:stop(Apps),
_ = application:stop(greptimedb),
ok; ok;
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_rules(), delete_all_rules(),
delete_all_bridges(), delete_all_bridges(),
Config. Config.
@ -179,14 +185,6 @@ end_per_testcase(_Testcase, Config) ->
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
start_apps() ->
%% some configs in emqx_conf app are mandatory
%% we want to make sure they are loaded before
%% ekka start in emqx_common_test_helpers:start_apps/1
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]).
example_write_syntax() -> example_write_syntax() ->
%% N.B.: this single space character is relevant %% N.B.: this single space character is relevant
<<"${topic},clientid=${clientid}", " ", "payload=${payload},", <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
@ -215,6 +213,7 @@ greptimedb_config(grpcv1 = Type, GreptimedbHost, GreptimedbPort, Config) ->
" request_ttl = 1s\n" " request_ttl = 1s\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" health_check_interval = 5s\n"
" }\n" " }\n"
" ssl {\n" " ssl {\n"
" enable = ~p\n" " enable = ~p\n"
@ -259,6 +258,7 @@ delete_bridge(Config) ->
emqx_bridge:remove(Type, Name). emqx_bridge:remove(Type, Name).
delete_all_bridges() -> delete_all_bridges() ->
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
lists:foreach( lists:foreach(
fun(#{name := Name, type := Type}) -> fun(#{name := Name, type := Type}) ->
emqx_bridge:remove(Type, Name) emqx_bridge:remove(Type, Name)
@ -692,6 +692,12 @@ t_boolean_variants(Config) ->
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
ResourceId = resource_id(Config),
?retry(
_Sleep1 = 1_000,
_Attempts1 = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
BoolVariants = #{ BoolVariants = #{
true => true, true => true,
false => false, false => false,
@ -728,14 +734,22 @@ t_boolean_variants(Config) ->
async -> ct:sleep(500); async -> ct:sleep(500);
sync -> ok sync -> ok
end, end,
PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config), ?retry(
Expected = #{ _Sleep2 = 500,
bool => Translation, _Attempts2 = 20,
int_value => -123, begin
uint_value => 123, PersistedData = query_by_clientid(
payload => emqx_utils_json:encode(Payload) atom_to_binary(?FUNCTION_NAME), ClientId, Config
}, ),
assert_persisted_data(ClientId, Expected, PersistedData), Expected = #{
bool => Translation,
int_value => -123,
uint_value => 123,
payload => emqx_utils_json:encode(Payload)
},
assert_persisted_data(ClientId, Expected, PersistedData)
end
),
ok ok
end, end,
BoolVariants BoolVariants
@ -841,6 +855,11 @@ t_get_status(Config) ->
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
end), end),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
ok. ok.
t_create_disconnected(Config) -> t_create_disconnected(Config) ->
@ -859,6 +878,12 @@ t_create_disconnected(Config) ->
ok ok
end end
), ),
ResourceId = resource_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
ok. ok.
t_start_error(Config) -> t_start_error(Config) ->

View File

@ -809,17 +809,18 @@ maybe_stop_resource(#data{status = Status} = Data) when Status =/= ?rm_status_st
maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) -> maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) ->
Data. Data.
stop_resource(#data{state = ResState, id = ResId} = Data) -> stop_resource(#data{id = ResId} = Data) ->
%% We don't care about the return value of `Mod:on_stop/2'. %% We don't care about the return value of `Mod:on_stop/2'.
%% The callback mod should make sure the resource is stopped after on_stop/2 %% The callback mod should make sure the resource is stopped after on_stop/2
%% is returned. %% is returned.
HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), HasAllocatedResources = emqx_resource:has_allocated_resources(ResId),
%% Before stop is called we remove all the channels from the resource %% Before stop is called we remove all the channels from the resource
NewData = remove_channels(Data), NewData = remove_channels(Data),
case ResState =/= undefined orelse HasAllocatedResources of NewResState = NewData#data.state,
case NewResState =/= undefined orelse HasAllocatedResources of
true -> true ->
%% we clear the allocated resources after stop is successful %% we clear the allocated resources after stop is successful
emqx_resource:call_stop(NewData#data.id, NewData#data.mod, ResState); emqx_resource:call_stop(NewData#data.id, NewData#data.mod, NewResState);
false -> false ->
ok ok
end, end,