From 61488ec407fef3b34837277c4432a8224afa3eb1 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 9 Jun 2023 16:04:38 +0200 Subject: [PATCH 1/3] fix: lost messages when clickhouse closes while sending messages Fixes: https://emqx.atlassian.net/browse/EMQX-10217 --- .../src/emqx_bridge_clickhouse_connector.erl | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 211e41174..ceaf42a64 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -465,8 +465,19 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) -> reason => ClickhouseErrorResult }), case ClickhouseErrorResult of + %% TODO: The hackeny errors that the clickhouse library forwards are + %% very loosely defined. We should try to make sure that the following + %% handles all error cases that we need to handle as recoverable_error {error, ecpool_empty} -> {error, {recoverable_error, ecpool_empty}}; + {error, econnrefused} -> + {error, {recoverable_error, econnrefused}}; + {error, closed} -> + {error, {recoverable_error, closed}}; + {error, {closed, PartialBody}} -> + {error, {recoverable_error, {closed_partial_body, PartialBody}}}; + {error, disconnected} -> + {error, {recoverable_error, disconnected}}; _ -> {error, ClickhouseErrorResult} end. 
From c6eb1af82ce99953a9c909cbce2f37f2bd080c08 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 9 Jun 2023 16:17:13 +0200 Subject: [PATCH 2/3] docs: add changelog entry --- changes/ee/fix-10997.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/fix-10997.en.md diff --git a/changes/ee/fix-10997.en.md b/changes/ee/fix-10997.en.md new file mode 100644 index 000000000..08ccfbe43 --- /dev/null +++ b/changes/ee/fix-10997.en.md @@ -0,0 +1 @@ +The ClickHouse bridge could drop messages if the connection to the ClickHouse server was closed while messages were being sent, even when `request_ttl` was set to infinity. This has been fixed by treating errors caused by a closed connection as recoverable errors. From 1b3af2ac91d7a57858dcd01cafc1bbf32956204f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 9 Jun 2023 17:22:16 +0200 Subject: [PATCH 3/3] refactor: nicer handling of errors and warning log for recoverable errors --- .../src/emqx_bridge_clickhouse_connector.erl | 68 +++++++++++++++---- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index ceaf42a64..857a7c119 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -450,10 +450,14 @@ execute_sql_in_clickhouse_server_using_connection(Connection, SQL) -> %% This function transforms the result received from clickhouse to something %% that is a little bit more readable and creates approprieate log messages -transform_and_log_clickhouse_result({ok, 200, <<"">>} = _ClickhouseResult, _, _) -> +transform_and_log_clickhouse_result({ok, ResponseCode, <<"">>} = _ClickhouseResult, _, _) when + ResponseCode =:= 200; ResponseCode =:= 204 +-> snabbkaffe_log_return(ok), ok; -transform_and_log_clickhouse_result({ok, 200, Data}, _, _) -> 
+transform_and_log_clickhouse_result({ok, ResponseCode, Data}, _, _) when + ResponseCode =:= 200; ResponseCode =:= 204 +-> Result = {ok, Data}, snabbkaffe_log_return(Result), Result; @@ -464,24 +468,58 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) -> sql => SQL, reason => ClickhouseErrorResult }), - case ClickhouseErrorResult of + case is_recoverable_error(ClickhouseErrorResult) of %% TODO: The hackeny errors that the clickhouse library forwards are %% very loosely defined. We should try to make sure that the following %% handles all error cases that we need to handle as recoverable_error - {error, ecpool_empty} -> - {error, {recoverable_error, ecpool_empty}}; - {error, econnrefused} -> - {error, {recoverable_error, econnrefused}}; - {error, closed} -> - {error, {recoverable_error, closed}}; - {error, {closed, PartialBody}} -> - {error, {recoverable_error, {closed_partial_body, PartialBody}}}; - {error, disconnected} -> - {error, {recoverable_error, disconnected}}; - _ -> - {error, ClickhouseErrorResult} + true -> + ?SLOG(warning, #{ + msg => "clickhouse connector: sql query failed (recoverable)", + recoverable_error => true, + connector => ResourceID, + sql => SQL, + reason => ClickhouseErrorResult + }), + to_recoverable_error(ClickhouseErrorResult); + false -> + ?SLOG(error, #{ + msg => "clickhouse connector: sql query failed (unrecoverable)", + recoverable_error => false, + connector => ResourceID, + sql => SQL, + reason => ClickhouseErrorResult + }), + to_error_tuple(ClickhouseErrorResult) end. +to_recoverable_error({error, Reason}) -> + {error, {recoverable_error, Reason}}; +to_recoverable_error(Error) -> + {error, {recoverable_error, Error}}. + +to_error_tuple({error, Reason}) -> + {error, {unrecoverable_error, Reason}}; +to_error_tuple(Error) -> + {error, {unrecoverable_error, Error}}. + +is_recoverable_error({error, Reason}) -> + is_recoverable_error_reason(Reason); +is_recoverable_error(_) -> + false. 
+ +is_recoverable_error_reason(ecpool_empty) -> + true; +is_recoverable_error_reason(econnrefused) -> + true; +is_recoverable_error_reason(closed) -> + true; +is_recoverable_error_reason({closed, _PartialBody}) -> + true; +is_recoverable_error_reason(disconnected) -> + true; +is_recoverable_error_reason(_) -> + false. + snabbkaffe_log_return(_Result) -> ?tp( clickhouse_connector_query_return,