From a58ee801b210ee1f029a12287b9d0c6933db3cb0 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 13 Mar 2024 13:58:36 +0800 Subject: [PATCH] fix(tdengine): enhanced health check result, make it more sense --- .../src/emqx_bridge_tdengine_connector.erl | 66 +++++++++++++++---- 1 file changed, 52 insertions(+), 14 deletions(-) diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 7d2815f54..f0c3a6e35 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -6,10 +6,11 @@ -behaviour(emqx_resource). --include_lib("typerefl/include/types.hrl"). --include_lib("emqx/include/logger.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -export([namespace/0, roots/0, fields/1, desc/1]). @@ -209,18 +210,50 @@ on_batch_query(InstanceId, BatchReq, State) -> ?SLOG(error, LogMeta#{msg => "invalid_request"}), {error, {unrecoverable_error, invalid_request}}. -on_get_status(_InstanceId, #{pool_name := PoolName}) -> - Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), - status_result(Health). - -do_get_status(Conn) -> - case tdengine:insert(Conn, "select server_version()", []) of - {ok, _} -> true; - _ -> false +on_get_status(_InstanceId, #{pool_name := PoolName} = State) -> + case + emqx_resource_pool:health_check_workers( + PoolName, + fun ?MODULE:do_get_status/1, + emqx_resource_pool:health_check_timeout(), + #{return_values => true} + ) + of + {ok, []} -> + {?status_connecting, State, undefined}; + {ok, Values} -> + case lists:keyfind(error, 1, Values) of + false -> + ?status_connected; + {error, Reason} -> + {?status_connecting, State, enhance_reason(Reason)} + end; + {error, Reason} -> + {?status_connecting, State, enhance_reason(Reason)} end. -status_result(_Status = true) -> connected; -status_result(_Status = false) -> connecting. +do_get_status(Conn) -> + try + tdengine:insert( + Conn, + "select server_version()", + [], + emqx_resource_pool:health_check_timeout() + ) + of + {ok, _} -> + true; + {error, _} = Error -> + Error + catch + _Type:Reason -> + {error, Reason} + end. + +enhance_reason(timeout) -> + connection_timeout; +enhance_reason(Reason) -> + Reason. on_add_channel( _InstanceId, @@ -253,7 +286,12 @@ on_get_channels(InstanceId) -> on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) -> case maps:is_key(ChannelId, Channels) of true -> - on_get_status(InstanceId, State); + case on_get_status(InstanceId, State) of + {Status, _State, Reason} -> + {Status, Reason}; + Status -> + Status + end; _ -> {error, not_exists} end.