Merge pull request #10997 from kjellwinblad/kjell/fix/clickhouse_lost_messages/EMQX-10217
fix: lost messages when clickhouse closes while sending messages
This commit is contained in:
commit
43292c06e4
|
@ -450,10 +450,14 @@ execute_sql_in_clickhouse_server_using_connection(Connection, SQL) ->
|
||||||
|
|
||||||
%% This function transforms the result received from clickhouse to something
|
%% This function transforms the result received from clickhouse to something
|
||||||
%% that is a little bit more readable and creates approprieate log messages
|
%% that is a little bit more readable and creates approprieate log messages
|
||||||
transform_and_log_clickhouse_result({ok, 200, <<"">>} = _ClickhouseResult, _, _) ->
|
transform_and_log_clickhouse_result({ok, ResponseCode, <<"">>} = _ClickhouseResult, _, _) when
|
||||||
|
ResponseCode =:= 200; ResponseCode =:= 204
|
||||||
|
->
|
||||||
snabbkaffe_log_return(ok),
|
snabbkaffe_log_return(ok),
|
||||||
ok;
|
ok;
|
||||||
transform_and_log_clickhouse_result({ok, 200, Data}, _, _) ->
|
transform_and_log_clickhouse_result({ok, ResponseCode, Data}, _, _) when
|
||||||
|
ResponseCode =:= 200; ResponseCode =:= 204
|
||||||
|
->
|
||||||
Result = {ok, Data},
|
Result = {ok, Data},
|
||||||
snabbkaffe_log_return(Result),
|
snabbkaffe_log_return(Result),
|
||||||
Result;
|
Result;
|
||||||
|
@ -464,13 +468,58 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
|
||||||
sql => SQL,
|
sql => SQL,
|
||||||
reason => ClickhouseErrorResult
|
reason => ClickhouseErrorResult
|
||||||
}),
|
}),
|
||||||
case ClickhouseErrorResult of
|
case is_recoverable_error(ClickhouseErrorResult) of
|
||||||
{error, ecpool_empty} ->
|
%% TODO: The hackeny errors that the clickhouse library forwards are
|
||||||
{error, {recoverable_error, ecpool_empty}};
|
%% very loosely defined. We should try to make sure that the following
|
||||||
_ ->
|
%% handles all error cases that we need to handle as recoverable_error
|
||||||
{error, ClickhouseErrorResult}
|
true ->
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "clickhouse connector: sql query failed (recoverable)",
|
||||||
|
recoverable_error => true,
|
||||||
|
connector => ResourceID,
|
||||||
|
sql => SQL,
|
||||||
|
reason => ClickhouseErrorResult
|
||||||
|
}),
|
||||||
|
to_recoverable_error(ClickhouseErrorResult);
|
||||||
|
false ->
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => "clickhouse connector: sql query failed (unrecoverable)",
|
||||||
|
recoverable_error => false,
|
||||||
|
connector => ResourceID,
|
||||||
|
sql => SQL,
|
||||||
|
reason => ClickhouseErrorResult
|
||||||
|
}),
|
||||||
|
to_error_tuple(ClickhouseErrorResult)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
to_recoverable_error({error, Reason}) ->
|
||||||
|
{error, {recoverable_error, Reason}};
|
||||||
|
to_recoverable_error(Error) ->
|
||||||
|
{error, {recoverable_error, Error}}.
|
||||||
|
|
||||||
|
to_error_tuple({error, Reason}) ->
|
||||||
|
{error, {unrecoverable_error, Reason}};
|
||||||
|
to_error_tuple(Error) ->
|
||||||
|
{error, {unrecoverable_error, Error}}.
|
||||||
|
|
||||||
|
is_recoverable_error({error, Reason}) ->
|
||||||
|
is_recoverable_error_reason(Reason);
|
||||||
|
is_recoverable_error(_) ->
|
||||||
|
false.
|
||||||
|
|
||||||
|
is_recoverable_error_reason(ecpool_empty) ->
|
||||||
|
true;
|
||||||
|
is_recoverable_error_reason(econnrefused) ->
|
||||||
|
true;
|
||||||
|
is_recoverable_error_reason(closed) ->
|
||||||
|
true;
|
||||||
|
is_recoverable_error_reason({closed, _PartialBody}) ->
|
||||||
|
true;
|
||||||
|
is_recoverable_error_reason(disconnected) ->
|
||||||
|
true;
|
||||||
|
is_recoverable_error_reason(_) ->
|
||||||
|
false.
|
||||||
|
|
||||||
snabbkaffe_log_return(_Result) ->
|
snabbkaffe_log_return(_Result) ->
|
||||||
?tp(
|
?tp(
|
||||||
clickhouse_connector_query_return,
|
clickhouse_connector_query_return,
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
The ClickHouse bridge had a problem that could cause messages to be dropped when the ClickHouse server is closed while sending messages even when the request_ttl is set to infinity. This has been fixed by treating errors due to a closed connection as recoverable errors.
|
Loading…
Reference in New Issue