diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 2d9af6bee..d0164b57c 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -450,10 +450,14 @@ execute_sql_in_clickhouse_server_using_connection(Connection, SQL) -> %% This function transforms the result received from clickhouse to something %% that is a little bit more readable and creates approprieate log messages -transform_and_log_clickhouse_result({ok, 200, <<"">>} = _ClickhouseResult, _, _) -> +transform_and_log_clickhouse_result({ok, ResponseCode, <<"">>} = _ClickhouseResult, _, _) when + ResponseCode =:= 200; ResponseCode =:= 204 +-> snabbkaffe_log_return(ok), ok; -transform_and_log_clickhouse_result({ok, 200, Data}, _, _) -> +transform_and_log_clickhouse_result({ok, ResponseCode, Data}, _, _) when + ResponseCode =:= 200; ResponseCode =:= 204 +-> Result = {ok, Data}, snabbkaffe_log_return(Result), Result; @@ -464,13 +468,58 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) -> sql => SQL, reason => ClickhouseErrorResult }), - case ClickhouseErrorResult of - {error, ecpool_empty} -> - {error, {recoverable_error, ecpool_empty}}; - _ -> - {error, ClickhouseErrorResult} + case is_recoverable_error(ClickhouseErrorResult) of + %% TODO: The hackney errors that the clickhouse library forwards are + %% very loosely defined.
We should try to make sure that the following + %% handles all error cases that we need to handle as recoverable_error + true -> + ?SLOG(warning, #{ + msg => "clickhouse connector: sql query failed (recoverable)", + recoverable_error => true, + connector => ResourceID, + sql => SQL, + reason => ClickhouseErrorResult + }), + to_recoverable_error(ClickhouseErrorResult); + false -> + ?SLOG(error, #{ + msg => "clickhouse connector: sql query failed (unrecoverable)", + recoverable_error => false, + connector => ResourceID, + sql => SQL, + reason => ClickhouseErrorResult + }), + to_error_tuple(ClickhouseErrorResult) end. +to_recoverable_error({error, Reason}) -> + {error, {recoverable_error, Reason}}; +to_recoverable_error(Error) -> + {error, {recoverable_error, Error}}. + +to_error_tuple({error, Reason}) -> + {error, {unrecoverable_error, Reason}}; +to_error_tuple(Error) -> + {error, {unrecoverable_error, Error}}. + +is_recoverable_error({error, Reason}) -> + is_recoverable_error_reason(Reason); +is_recoverable_error(_) -> + false. + +is_recoverable_error_reason(ecpool_empty) -> + true; +is_recoverable_error_reason(econnrefused) -> + true; +is_recoverable_error_reason(closed) -> + true; +is_recoverable_error_reason({closed, _PartialBody}) -> + true; +is_recoverable_error_reason(disconnected) -> + true; +is_recoverable_error_reason(_) -> + false. + snabbkaffe_log_return(_Result) -> ?tp( clickhouse_connector_query_return, diff --git a/changes/ee/fix-10997.en.md b/changes/ee/fix-10997.en.md new file mode 100644 index 000000000..08ccfbe43 --- /dev/null +++ b/changes/ee/fix-10997.en.md @@ -0,0 +1 @@ +The ClickHouse bridge could drop messages if the ClickHouse server closed the connection while messages were being sent, even when `request_ttl` was set to infinity. This has been fixed by treating errors caused by a closed connection as recoverable errors.