fix(iotdb): use the `ping` API to check the status of the IOTDB connections
This commit is contained in:
parent
bb6a95bae8
commit
77239da7ed
|
@ -31,6 +31,7 @@
|
||||||
on_query/3,
|
on_query/3,
|
||||||
on_query_async/4,
|
on_query_async/4,
|
||||||
on_get_status/2,
|
on_get_status/2,
|
||||||
|
on_get_status/3,
|
||||||
on_add_channel/4,
|
on_add_channel/4,
|
||||||
on_remove_channel/3,
|
on_remove_channel/3,
|
||||||
on_get_channels/1,
|
on_get_channels/1,
|
||||||
|
@ -50,7 +51,7 @@
|
||||||
%% for other http-like connectors.
|
%% for other http-like connectors.
|
||||||
-export([redact_request/1]).
|
-export([redact_request/1]).
|
||||||
|
|
||||||
-export([validate_method/1, join_paths/2]).
|
-export([validate_method/1, join_paths/2, formalize_request/3, transform_result/1]).
|
||||||
|
|
||||||
-define(DEFAULT_PIPELINE_SIZE, 100).
|
-define(DEFAULT_PIPELINE_SIZE, 100).
|
||||||
-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
|
-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
|
||||||
|
@ -511,8 +512,11 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
|
||||||
on_get_channels(ResId) ->
|
on_get_channels(ResId) ->
|
||||||
emqx_bridge_v2:get_channels_for_connector(ResId).
|
emqx_bridge_v2:get_channels_for_connector(ResId).
|
||||||
|
|
||||||
on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State) ->
|
on_get_status(InstId, State) ->
|
||||||
case do_get_status(InstId, Timeout) of
|
on_get_status(InstId, State, fun default_health_checker/2).
|
||||||
|
|
||||||
|
on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State, DoPerWorker) ->
|
||||||
|
case do_get_status(InstId, Timeout, DoPerWorker) of
|
||||||
ok ->
|
ok ->
|
||||||
connected;
|
connected;
|
||||||
{error, still_connecting} ->
|
{error, still_connecting} ->
|
||||||
|
@ -522,17 +526,11 @@ on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_get_status(PoolName, Timeout) ->
|
do_get_status(PoolName, Timeout) ->
|
||||||
|
do_get_status(PoolName, Timeout, fun default_health_checker/2).
|
||||||
|
|
||||||
|
do_get_status(PoolName, Timeout, DoPerWorker) ->
|
||||||
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
|
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
|
||||||
DoPerWorker =
|
try emqx_utils:pmap(fun(Worker) -> DoPerWorker(Worker, Timeout) end, Workers, Timeout) of
|
||||||
fun(Worker) ->
|
|
||||||
case ehttpc:health_check(Worker, Timeout) of
|
|
||||||
ok ->
|
|
||||||
ok;
|
|
||||||
{error, _} = Error ->
|
|
||||||
Error
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
|
|
||||||
[] ->
|
[] ->
|
||||||
{error, still_connecting};
|
{error, still_connecting};
|
||||||
[_ | _] = Results ->
|
[_ | _] = Results ->
|
||||||
|
@ -557,6 +555,14 @@ do_get_status(PoolName, Timeout) ->
|
||||||
{error, timeout}
|
{error, timeout}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
default_health_checker(Worker, Timeout) ->
|
||||||
|
case ehttpc:health_check(Worker, Timeout) of
|
||||||
|
ok ->
|
||||||
|
ok;
|
||||||
|
{error, _} = Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
on_get_channel_status(
|
on_get_channel_status(
|
||||||
InstId,
|
InstId,
|
||||||
_ChannelId,
|
_ChannelId,
|
||||||
|
|
|
@ -66,6 +66,7 @@
|
||||||
-type manager_id() :: binary().
|
-type manager_id() :: binary().
|
||||||
|
|
||||||
-define(CONNECTOR_TYPE, iotdb).
|
-define(CONNECTOR_TYPE, iotdb).
|
||||||
|
-define(IOTDB_PING_PATH, <<"ping">>).
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||||
|
|
||||||
|
@ -238,8 +239,26 @@ on_stop(InstanceId, State) ->
|
||||||
|
|
||||||
-spec on_get_status(manager_id(), state()) ->
|
-spec on_get_status(manager_id(), state()) ->
|
||||||
{connected, state()} | {disconnected, state(), term()}.
|
{connected, state()} | {disconnected, state(), term()}.
|
||||||
on_get_status(InstanceId, State) ->
|
on_get_status(InstanceId, #{base_path := BasePath} = State) ->
|
||||||
emqx_bridge_http_connector:on_get_status(InstanceId, State).
|
Func = fun(Worker, Timeout) ->
|
||||||
|
Request = {?IOTDB_PING_PATH, [], undefined},
|
||||||
|
NRequest = emqx_bridge_http_connector:formalize_request(get, BasePath, Request),
|
||||||
|
Result0 = ehttpc:request(Worker, get, NRequest, Timeout),
|
||||||
|
case emqx_bridge_http_connector:transform_result(Result0) of
|
||||||
|
{ok, 200, _, Body} ->
|
||||||
|
case emqx_utils_json:decode(Body) of
|
||||||
|
#{<<"code">> := 200} ->
|
||||||
|
ok;
|
||||||
|
Json ->
|
||||||
|
{error, {unexpected_status, Json}}
|
||||||
|
end;
|
||||||
|
{error, _} = Error ->
|
||||||
|
Error;
|
||||||
|
Result ->
|
||||||
|
{error, {unexpected_ping_result, Result}}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
emqx_bridge_http_connector:on_get_status(InstanceId, State, Func).
|
||||||
|
|
||||||
-spec on_query(manager_id(), {send_message, map()}, state()) ->
|
-spec on_query(manager_id(), {send_message, map()}, state()) ->
|
||||||
{ok, pos_integer(), [term()], term()}
|
{ok, pos_integer(), [term()], term()}
|
||||||
|
@ -352,13 +371,8 @@ on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
|
||||||
on_get_channels(InstanceId) ->
|
on_get_channels(InstanceId) ->
|
||||||
emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
||||||
|
|
||||||
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
|
on_get_channel_status(InstanceId, _ChannelId, State) ->
|
||||||
case maps:is_key(ChannelId, Channels) of
|
on_get_status(InstanceId, State).
|
||||||
true ->
|
|
||||||
connected;
|
|
||||||
_ ->
|
|
||||||
{error, not_exists}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
|
|
Loading…
Reference in New Issue