Merge pull request #12688 from lafirest/fix/tdengine
fix(tdengine): enhanced health check result, make it more sense
This commit is contained in:
commit
35fb6ee656
|
@ -6,10 +6,11 @@
|
||||||
|
|
||||||
-behaviour(emqx_resource).
|
-behaviour(emqx_resource).
|
||||||
|
|
||||||
-include_lib("typerefl/include/types.hrl").
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
||||||
-include_lib("hocon/include/hoconsc.hrl").
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
|
||||||
-export([namespace/0, roots/0, fields/1, desc/1]).
|
-export([namespace/0, roots/0, fields/1, desc/1]).
|
||||||
|
|
||||||
|
@ -209,18 +210,50 @@ on_batch_query(InstanceId, BatchReq, State) ->
|
||||||
?SLOG(error, LogMeta#{msg => "invalid_request"}),
|
?SLOG(error, LogMeta#{msg => "invalid_request"}),
|
||||||
{error, {unrecoverable_error, invalid_request}}.
|
{error, {unrecoverable_error, invalid_request}}.
|
||||||
|
|
||||||
on_get_status(_InstanceId, #{pool_name := PoolName}) ->
|
on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
|
||||||
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
|
case
|
||||||
status_result(Health).
|
emqx_resource_pool:health_check_workers(
|
||||||
|
PoolName,
|
||||||
do_get_status(Conn) ->
|
fun ?MODULE:do_get_status/1,
|
||||||
case tdengine:insert(Conn, "select server_version()", []) of
|
emqx_resource_pool:health_check_timeout(),
|
||||||
{ok, _} -> true;
|
#{return_values => true}
|
||||||
_ -> false
|
)
|
||||||
|
of
|
||||||
|
{ok, []} ->
|
||||||
|
{?status_connecting, State, undefined};
|
||||||
|
{ok, Values} ->
|
||||||
|
case lists:keyfind(error, 1, Values) of
|
||||||
|
false ->
|
||||||
|
?status_connected;
|
||||||
|
{error, Reason} ->
|
||||||
|
{?status_connecting, State, enhance_reason(Reason)}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
{?status_connecting, State, enhance_reason(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
status_result(_Status = true) -> connected;
|
do_get_status(Conn) ->
|
||||||
status_result(_Status = false) -> connecting.
|
try
|
||||||
|
tdengine:insert(
|
||||||
|
Conn,
|
||||||
|
"select server_version()",
|
||||||
|
[],
|
||||||
|
emqx_resource_pool:health_check_timeout()
|
||||||
|
)
|
||||||
|
of
|
||||||
|
{ok, _} ->
|
||||||
|
true;
|
||||||
|
{error, _} = Error ->
|
||||||
|
Error
|
||||||
|
catch
|
||||||
|
_Type:Reason ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
enhance_reason(timeout) ->
|
||||||
|
connection_timeout;
|
||||||
|
enhance_reason(Reason) ->
|
||||||
|
Reason.
|
||||||
|
|
||||||
on_add_channel(
|
on_add_channel(
|
||||||
_InstanceId,
|
_InstanceId,
|
||||||
|
@ -253,7 +286,12 @@ on_get_channels(InstanceId) ->
|
||||||
on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) ->
|
on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) ->
|
||||||
case maps:is_key(ChannelId, Channels) of
|
case maps:is_key(ChannelId, Channels) of
|
||||||
true ->
|
true ->
|
||||||
on_get_status(InstanceId, State);
|
case on_get_status(InstanceId, State) of
|
||||||
|
{Status, _State, Reason} ->
|
||||||
|
{Status, Reason};
|
||||||
|
Status ->
|
||||||
|
Status
|
||||||
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
{error, not_exists}
|
{error, not_exists}
|
||||||
end.
|
end.
|
||||||
|
|
Loading…
Reference in New Issue