From 77239da7ed8279faedf6b380caada1f81b9377da Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 27 Feb 2024 22:07:41 +0800 Subject: [PATCH 1/3] fix(iotdb): use the `ping` API to check the status of the IOTDB connections --- .../src/emqx_bridge_http_connector.erl | 32 +++++++++++-------- .../src/emqx_bridge_iotdb_connector.erl | 32 +++++++++++++------ 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 9574fc80a..9ef7c28e8 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -31,6 +31,7 @@ on_query/3, on_query_async/4, on_get_status/2, + on_get_status/3, on_add_channel/4, on_remove_channel/3, on_get_channels/1, @@ -50,7 +51,7 @@ %% for other http-like connectors. -export([redact_request/1]). --export([validate_method/1, join_paths/2]). +-export([validate_method/1, join_paths/2, formalize_request/3, transform_result/1]). -define(DEFAULT_PIPELINE_SIZE, 100). -define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000). @@ -511,8 +512,11 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId). -on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State) -> - case do_get_status(InstId, Timeout) of +on_get_status(InstId, State) -> + on_get_status(InstId, State, fun default_health_checker/2). + +on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State, DoPerWorker) -> + case do_get_status(InstId, Timeout, DoPerWorker) of ok -> connected; {error, still_connecting} -> @@ -522,17 +526,11 @@ on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State end. do_get_status(PoolName, Timeout) -> + do_get_status(PoolName, Timeout, fun default_health_checker/2). + +do_get_status(PoolName, Timeout, DoPerWorker) -> Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)], - DoPerWorker = - fun(Worker) -> - case ehttpc:health_check(Worker, Timeout) of - ok -> - ok; - {error, _} = Error -> - Error - end - end, - try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of + try emqx_utils:pmap(fun(Worker) -> DoPerWorker(Worker, Timeout) end, Workers, Timeout) of [] -> {error, still_connecting}; [_ | _] = Results -> @@ -557,6 +555,14 @@ do_get_status(PoolName, Timeout) -> {error, timeout} end. +default_health_checker(Worker, Timeout) -> + case ehttpc:health_check(Worker, Timeout) of + ok -> + ok; + {error, _} = Error -> + Error + end. + on_get_channel_status( InstId, _ChannelId, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index ccf97f143..d42ccfd71 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -66,6 +66,7 @@ -type manager_id() :: binary(). -define(CONNECTOR_TYPE, iotdb). +-define(IOTDB_PING_PATH, <<"ping">>). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -238,8 +239,26 @@ on_stop(InstanceId, State) -> -spec on_get_status(manager_id(), state()) -> {connected, state()} | {disconnected, state(), term()}. -on_get_status(InstanceId, State) -> - emqx_bridge_http_connector:on_get_status(InstanceId, State). +on_get_status(InstanceId, #{base_path := BasePath} = State) -> + Func = fun(Worker, Timeout) -> + Request = {?IOTDB_PING_PATH, [], undefined}, + NRequest = emqx_bridge_http_connector:formalize_request(get, BasePath, Request), + Result0 = ehttpc:request(Worker, get, NRequest, Timeout), + case emqx_bridge_http_connector:transform_result(Result0) of + {ok, 200, _, Body} -> + case emqx_utils_json:decode(Body) of + #{<<"code">> := 200} -> + ok; + Json -> + {error, {unexpected_status, Json}} + end; + {error, _} = Error -> + Error; + Result -> + {error, {unexpected_ping_result, Result}} + end + end, + emqx_bridge_http_connector:on_get_status(InstanceId, State, Func). -spec on_query(manager_id(), {send_message, map()}, state()) -> {ok, pos_integer(), [term()], term()} @@ -352,13 +371,8 @@ on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> on_get_channels(InstanceId) -> emqx_bridge_v2:get_channels_for_connector(InstanceId). -on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> - case maps:is_key(ChannelId, Channels) of - true -> - connected; - _ -> - {error, not_exists} - end. +on_get_channel_status(InstanceId, _ChannelId, State) -> + on_get_status(InstanceId, State). %%-------------------------------------------------------------------- %% Internal Functions From c3a2cf62206c14593b454868245fe9d9084735c6 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 28 Feb 2024 09:39:46 +0800 Subject: [PATCH 2/3] chore: update change && bump version --- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src | 2 +- .../src/emqx_bridge_iotdb_connector.erl | 9 ++++++++- changes/ee/feat-12602.en.md | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 changes/ee/feat-12602.en.md diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 4fd96d5e7..86d2a93b3 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_connector diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index d42ccfd71..0090e472a 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -70,6 +70,8 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). +-dialyzer({no_match, [on_get_channel_status/3]}). + %%------------------------------------------------------------------------------------- %% connector examples %%------------------------------------------------------------------------------------- @@ -372,7 +374,12 @@ on_get_channels(InstanceId) -> emqx_bridge_v2:get_channels_for_connector(InstanceId). on_get_channel_status(InstanceId, _ChannelId, State) -> - on_get_status(InstanceId, State). + case on_get_status(InstanceId, State) of + connected -> + connected; + _ -> + disconnected + end. %%-------------------------------------------------------------------- %% Internal Functions diff --git a/changes/ee/feat-12602.en.md b/changes/ee/feat-12602.en.md new file mode 100644 index 000000000..2a62dfb75 --- /dev/null +++ b/changes/ee/feat-12602.en.md @@ -0,0 +1 @@ +Enhanced health checking for IOTDB connector, using its `ping` API replacing the old method which only validating via a socket connection. From 0bebd66f056d4194fae43b84dc06aec4615ac8be Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 29 Feb 2024 10:51:12 +0800 Subject: [PATCH 3/3] fix(iotdb): make dialyzer happy --- .../src/emqx_bridge_iotdb_connector.erl | 11 +++++------ changes/ee/feat-12602.en.md | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 0090e472a..e68520f47 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -9,6 +9,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). %% `emqx_resource' API -export([ @@ -70,8 +71,6 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). --dialyzer({no_match, [on_get_channel_status/3]}). - %%------------------------------------------------------------------------------------- %% connector examples %%------------------------------------------------------------------------------------- @@ -240,7 +239,7 @@ on_stop(InstanceId, State) -> Res. -spec on_get_status(manager_id(), state()) -> - {connected, state()} | {disconnected, state(), term()}. + connected | connecting | {disconnected, state(), term()}. on_get_status(InstanceId, #{base_path := BasePath} = State) -> Func = fun(Worker, Timeout) -> Request = {?IOTDB_PING_PATH, [], undefined}, @@ -375,10 +374,10 @@ on_get_channels(InstanceId) -> on_get_channel_status(InstanceId, _ChannelId, State) -> case on_get_status(InstanceId, State) of - connected -> - connected; + ?status_connected -> + ?status_connected; _ -> - disconnected + ?status_disconnected end. %%-------------------------------------------------------------------- diff --git a/changes/ee/feat-12602.en.md b/changes/ee/feat-12602.en.md index 2a62dfb75..b5171b9e1 100644 --- a/changes/ee/feat-12602.en.md +++ b/changes/ee/feat-12602.en.md @@ -1 +1 @@ -Enhanced health checking for IOTDB connector, using its `ping` API replacing the old method which only validating via a socket connection. +Enhanced health checking for IoTDB connector, using its `ping` API instead of just checking for an existing socket connection.