Merge pull request #12602 from lafirest/fix/iotdb_ping

fix(iotdb): use the `ping` API to check the status of the IOTDB connections
This commit is contained in:
lafirest 2024-02-29 12:26:10 +08:00 committed by GitHub
commit d2dda2535b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 49 additions and 22 deletions

View File

@ -31,6 +31,7 @@
on_query/3,
on_query_async/4,
on_get_status/2,
on_get_status/3,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
@ -50,7 +51,7 @@
%% for other http-like connectors.
-export([redact_request/1]).
-export([validate_method/1, join_paths/2]).
-export([validate_method/1, join_paths/2, formalize_request/3, transform_result/1]).
-define(DEFAULT_PIPELINE_SIZE, 100).
-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
@ -511,8 +512,11 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State) ->
case do_get_status(InstId, Timeout) of
on_get_status(InstId, State) ->
on_get_status(InstId, State, fun default_health_checker/2).
on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State, DoPerWorker) ->
case do_get_status(InstId, Timeout, DoPerWorker) of
ok ->
connected;
{error, still_connecting} ->
@ -522,17 +526,11 @@ on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State
end.
do_get_status(PoolName, Timeout) ->
do_get_status(PoolName, Timeout, fun default_health_checker/2).
do_get_status(PoolName, Timeout, DoPerWorker) ->
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
DoPerWorker =
fun(Worker) ->
case ehttpc:health_check(Worker, Timeout) of
ok ->
ok;
{error, _} = Error ->
Error
end
end,
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
try emqx_utils:pmap(fun(Worker) -> DoPerWorker(Worker, Timeout) end, Workers, Timeout) of
[] ->
{error, still_connecting};
[_ | _] = Results ->
@ -557,6 +555,14 @@ do_get_status(PoolName, Timeout) ->
{error, timeout}
end.
default_health_checker(Worker, Timeout) ->
case ehttpc:health_check(Worker, Timeout) of
ok ->
ok;
{error, _} = Error ->
Error
end.
on_get_channel_status(
InstId,
_ChannelId,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{modules, [
emqx_bridge_iotdb,
emqx_bridge_iotdb_connector

View File

@ -9,6 +9,7 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
%% `emqx_resource' API
-export([
@ -66,6 +67,7 @@
-type manager_id() :: binary().
-define(CONNECTOR_TYPE, iotdb).
-define(IOTDB_PING_PATH, <<"ping">>).
-import(hoconsc, [mk/2, enum/1, ref/2]).
@ -237,9 +239,27 @@ on_stop(InstanceId, State) ->
Res.
-spec on_get_status(manager_id(), state()) ->
{connected, state()} | {disconnected, state(), term()}.
on_get_status(InstanceId, State) ->
emqx_bridge_http_connector:on_get_status(InstanceId, State).
connected | connecting | {disconnected, state(), term()}.
on_get_status(InstanceId, #{base_path := BasePath} = State) ->
Func = fun(Worker, Timeout) ->
Request = {?IOTDB_PING_PATH, [], undefined},
NRequest = emqx_bridge_http_connector:formalize_request(get, BasePath, Request),
Result0 = ehttpc:request(Worker, get, NRequest, Timeout),
case emqx_bridge_http_connector:transform_result(Result0) of
{ok, 200, _, Body} ->
case emqx_utils_json:decode(Body) of
#{<<"code">> := 200} ->
ok;
Json ->
{error, {unexpected_status, Json}}
end;
{error, _} = Error ->
Error;
Result ->
{error, {unexpected_ping_result, Result}}
end
end,
emqx_bridge_http_connector:on_get_status(InstanceId, State, Func).
-spec on_query(manager_id(), {send_message, map()}, state()) ->
{ok, pos_integer(), [term()], term()}
@ -352,12 +372,12 @@ on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
case maps:is_key(ChannelId, Channels) of
true ->
connected;
on_get_channel_status(InstanceId, _ChannelId, State) ->
case on_get_status(InstanceId, State) of
?status_connected ->
?status_connected;
_ ->
{error, not_exists}
?status_disconnected
end.
%%--------------------------------------------------------------------

View File

@ -0,0 +1 @@
Enhanced health checking for IoTDB connector, using its `ping` API instead of just checking for an existing socket connection.