diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index badddb20f..8c3223e8b 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_greptimedb, [ {description, "EMQX GreptimeDB Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 1cd808e46..be52f4469 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -107,6 +107,10 @@ on_start(InstId, Config) -> %% See: greptimedb:start_client/1 start_client(InstId, Config). +on_stop(InstId, #{client := Client}) -> + Res = greptimedb:stop_client(Client), + ?tp(greptimedb_client_stopped, #{instance_id => InstId}), + Res; on_stop(InstId, _State) -> case emqx_resource:get_allocated_resources(InstId) of #{?greptime_client := Client} -> diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl index 96cf0d7c9..f9d778b2b 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl @@ -50,18 +50,15 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - delete_all_bridges(), - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_connector_test_helpers:stop_apps([ - emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine - ]), - _ = application:stop(emqx_connector), ok. init_per_group(GreptimedbType, Config0) when GreptimedbType =:= grpcv1_tcp; GreptimedbType =:= grpcv1_tls -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), #{ host := GreptimedbHost, port := GreptimedbPort, @@ -89,13 +86,19 @@ init_per_group(GreptimedbType, Config0) when end, case emqx_common_test_helpers:is_tcp_server_available(GreptimedbHost, GreptimedbHttpPort) of true -> - ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), - ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - ok = start_apps(), - {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(greptimedb), - emqx_mgmt_api_test_util:init_suite(), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_bridge_greptimedb, + emqx_bridge, + emqx_rule_engine, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config0)} + ), + {ok, _Api} = emqx_common_test_http:create_default_app(), Config = [{use_tls, UseTLS} | Config0], {Name, ConfigString, GreptimedbConfig} = greptimedb_config( grpcv1, GreptimedbHost, GreptimedbPort, Config @@ -116,6 +119,7 @@ init_per_group(GreptimedbType, Config0) when ], {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts), [ + {group_apps, Apps}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, {proxy_name, ProxyName}, @@ -150,18 +154,21 @@ end_per_group(Group, Config) when Group =:= grpcv1_tcp; Group =:= grpcv1_tls -> + Apps = ?config(group_apps, Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), EHttpcPoolName = ?config(ehttpc_pool_name, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ehttpc_sup:stop_pool(EHttpcPoolName), - delete_bridge(Config), - _ = application:stop(greptimedb), + emqx_cth_suite:stop(Apps), ok; end_per_group(_Group, _Config) -> ok. init_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), delete_all_rules(), delete_all_bridges(), Config. @@ -179,14 +186,6 @@ end_per_testcase(_Testcase, Config) -> %% Helper fns %%------------------------------------------------------------------------------ -start_apps() -> - %% some configs in emqx_conf app are mandatory - %% we want to make sure they are loaded before - %% ekka start in emqx_common_test_helpers:start_apps/1 - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]). - example_write_syntax() -> %% N.B.: this single space character is relevant <<"${topic},clientid=${clientid}", " ", "payload=${payload},", @@ -215,6 +214,7 @@ greptimedb_config(grpcv1 = Type, GreptimedbHost, GreptimedbPort, Config) -> " request_ttl = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" + " health_check_interval = 5s\n" " }\n" " ssl {\n" " enable = ~p\n" @@ -259,6 +259,7 @@ delete_bridge(Config) -> emqx_bridge:remove(Type, Name). delete_all_bridges() -> + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), lists:foreach( fun(#{name := Name, type := Type}) -> emqx_bridge:remove(Type, Name) @@ -692,6 +693,12 @@ t_boolean_variants(Config) -> {ok, _}, create_bridge(Config) ), + ResourceId = resource_id(Config), + ?retry( + _Sleep1 = 1_000, + _Attempts1 = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), BoolVariants = #{ true => true, false => false, @@ -728,14 +735,22 @@ t_boolean_variants(Config) -> async -> ct:sleep(500); sync -> ok end, - PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config), - Expected = #{ - bool => Translation, - int_value => -123, - uint_value => 123, - payload => emqx_utils_json:encode(Payload) - }, - assert_persisted_data(ClientId, Expected, PersistedData), + ?retry( + _Sleep2 = 500, + _Attempts2 = 20, + begin + PersistedData = query_by_clientid( + atom_to_binary(?FUNCTION_NAME), ClientId, Config + ), + Expected = #{ + bool => Translation, + int_value => -123, + uint_value => 123, + payload => emqx_utils_json:encode(Payload) + }, + assert_persisted_data(ClientId, Expected, PersistedData) + end + ), ok end, BoolVariants @@ -841,6 +856,11 @@ t_get_status(Config) -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) end), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), ok. t_create_disconnected(Config) -> @@ -859,6 +879,12 @@ t_create_disconnected(Config) -> ok end ), + ResourceId = resource_id(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), ok. t_start_error(Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c042054e3..c3b746d8e 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -809,17 +809,18 @@ maybe_stop_resource(#data{status = Status} = Data) when Status =/= ?rm_status_st maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) -> Data. -stop_resource(#data{state = ResState, id = ResId} = Data) -> +stop_resource(#data{id = ResId} = Data) -> %% We don't care about the return value of `Mod:on_stop/2'. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), %% Before stop is called we remove all the channels from the resource NewData = remove_channels(Data), - case ResState =/= undefined orelse HasAllocatedResources of + NewResState = NewData#data.state, + case NewResState =/= undefined orelse HasAllocatedResources of true -> %% we clear the allocated resources after stop is successful - emqx_resource:call_stop(NewData#data.id, NewData#data.mod, ResState); + emqx_resource:call_stop(NewData#data.id, NewData#data.mod, NewResState); false -> ok end,