diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 9574fc80a..9ef7c28e8 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -31,6 +31,7 @@ on_query/3, on_query_async/4, on_get_status/2, + on_get_status/3, on_add_channel/4, on_remove_channel/3, on_get_channels/1, @@ -50,7 +51,7 @@ %% for other http-like connectors. -export([redact_request/1]). --export([validate_method/1, join_paths/2]). +-export([validate_method/1, join_paths/2, formalize_request/3, transform_result/1]). -define(DEFAULT_PIPELINE_SIZE, 100). -define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000). @@ -511,8 +512,11 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId). -on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State) -> - case do_get_status(InstId, Timeout) of +on_get_status(InstId, State) -> + on_get_status(InstId, State, fun default_health_checker/2). + +on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State, DoPerWorker) -> + case do_get_status(InstId, Timeout, DoPerWorker) of ok -> connected; {error, still_connecting} -> @@ -522,17 +526,11 @@ on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State end. do_get_status(PoolName, Timeout) -> + do_get_status(PoolName, Timeout, fun default_health_checker/2). + +do_get_status(PoolName, Timeout, DoPerWorker) -> Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)], - DoPerWorker = - fun(Worker) -> - case ehttpc:health_check(Worker, Timeout) of - ok -> - ok; - {error, _} = Error -> - Error - end - end, - try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of + try emqx_utils:pmap(fun(Worker) -> DoPerWorker(Worker, Timeout) end, Workers, Timeout) of [] -> {error, still_connecting}; [_ | _] = Results -> @@ -557,6 +555,14 @@ do_get_status(PoolName, Timeout) -> {error, timeout} end. +default_health_checker(Worker, Timeout) -> + case ehttpc:health_check(Worker, Timeout) of + ok -> + ok; + {error, _} = Error -> + Error + end. + on_get_channel_status( InstId, _ChannelId, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index ccf97f143..d42ccfd71 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -66,6 +66,7 @@ -type manager_id() :: binary(). -define(CONNECTOR_TYPE, iotdb). +-define(IOTDB_PING_PATH, <<"ping">>). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -238,8 +239,26 @@ on_stop(InstanceId, State) -> -spec on_get_status(manager_id(), state()) -> {connected, state()} | {disconnected, state(), term()}. -on_get_status(InstanceId, State) -> - emqx_bridge_http_connector:on_get_status(InstanceId, State). +on_get_status(InstanceId, #{base_path := BasePath} = State) -> + Func = fun(Worker, Timeout) -> + Request = {?IOTDB_PING_PATH, [], undefined}, + NRequest = emqx_bridge_http_connector:formalize_request(get, BasePath, Request), + Result0 = ehttpc:request(Worker, get, NRequest, Timeout), + case emqx_bridge_http_connector:transform_result(Result0) of + {ok, 200, _, Body} -> + case emqx_utils_json:decode(Body) of + #{<<"code">> := 200} -> + ok; + Json -> + {error, {unexpected_status, Json}} + end; + {error, _} = Error -> + Error; + Result -> + {error, {unexpected_ping_result, Result}} + end + end, + emqx_bridge_http_connector:on_get_status(InstanceId, State, Func). -spec on_query(manager_id(), {send_message, map()}, state()) -> {ok, pos_integer(), [term()], term()} @@ -352,13 +371,8 @@ on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> on_get_channels(InstanceId) -> emqx_bridge_v2:get_channels_for_connector(InstanceId). -on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> - case maps:is_key(ChannelId, Channels) of - true -> - connected; - _ -> - {error, not_exists} - end. +on_get_channel_status(InstanceId, _ChannelId, State) -> + on_get_status(InstanceId, State). %%-------------------------------------------------------------------- %% Internal Functions