From 0072f4a59866347846b810ea6f0c2d91b028199c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Jun 2023 17:13:06 -0300 Subject: [PATCH] fix(ecpool,bridge): treat `{error, ecpool_empty}` as a retriable error Example from flaky CI test: ``` /emqx/apps/emqx_resource/src/emqx_resource_buffer_worker.erl:716 2023-06-01T19:30:37.119862+00:00 [buffer_worker_flush_ack] #{batch_or_query => [{query,undefined,{send_message,#{payload => <<"-576460752303420927">>,timestamp => 1668602148000,topic => <<"t_write_failure">>}},false,-576460614954536809}],queue_count => 0,result => {async_return,{error,{unrecoverable_error,ecpool_empty}}},'~meta' => #{gl => <0.3883.5>,node => 'test@127.0.0.1',pid => <0.4724.5>}}. ``` --- .../src/emqx_bridge_cassandra.app.src | 2 +- .../src/emqx_bridge_cassandra_connector.erl | 2 ++ .../src/emqx_bridge_clickhouse.app.src | 2 +- .../src/emqx_bridge_clickhouse_connector.erl | 7 ++++++- .../src/emqx_bridge_dynamo.app.src | 2 +- .../src/emqx_bridge_dynamo_connector.erl | 7 ++++++- .../src/emqx_bridge_opents.app.src | 2 +- .../src/emqx_bridge_opents_connector.erl | 7 ++++++- .../src/emqx_bridge_rabbitmq_connector.erl | 15 +++++++++++---- .../src/emqx_bridge_sqlserver.app.src | 2 +- .../src/emqx_bridge_sqlserver_connector.erl | 8 +++++++- .../src/emqx_bridge_tdengine_connector.erl | 7 ++++++- apps/emqx_connector/src/emqx_connector_ldap.erl | 13 +++++++++---- apps/emqx_connector/src/emqx_connector_mongo.erl | 9 ++++++++- apps/emqx_connector/src/emqx_connector_pgsql.erl | 7 ++++++- apps/emqx_connector/src/emqx_connector_redis.erl | 2 ++ apps/emqx_oracle/src/emqx_oracle.app.src | 2 +- apps/emqx_oracle/src/emqx_oracle.erl | 7 ++++++- 18 files changed, 81 insertions(+), 22 deletions(-) diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 1bde274f3..ea3495e0f 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, ecql]}, {env, []}, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index a3032a9df..98ba587e8 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -480,6 +480,8 @@ prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co handle_result({error, disconnected}) -> {error, {recoverable_error, disconnected}}; +handle_result({error, ecpool_empty}) -> + {error, {recoverable_error, ecpool_empty}}; handle_result({error, Error}) -> {error, {unrecoverable_error, Error}}; handle_result(Res) -> diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index 72669ba8f..58a92fde4 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [kernel, stdlib, clickhouse, emqx_resource]}, {env, []}, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index a2de1d3c9..aefd9112f 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -464,7 +464,12 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) -> sql => SQL, reason => ClickhouseErrorResult }), - {error, ClickhouseErrorResult}. + case ClickhouseErrorResult of + {error, ecpool_empty} -> + {error, {recoverable_error, ecpool_empty}}; + _ -> + {error, ClickhouseErrorResult} + end. snabbkaffe_log_return(_Result) -> ?tp( diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index 2d2e299d2..0e202b714 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, erlcloud]}, {env, []}, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 981c31090..86e816c5d 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -170,7 +170,12 @@ do_query( query => Query, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> ?tp( dynamo_connector_query_return, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index d001446b3..9037b8840 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_opents, [ {description, "EMQX Enterprise OpenTSDB Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 0366c9dc2..e49f552e9 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -142,7 +142,12 @@ do_query(InstanceId, Query, #{pool_name := PoolName} = State) -> query => Query, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> ?tp( opents_connector_query_return, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 749cb8bc1..cbdcbc845 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -431,11 +431,12 @@ on_query( state => emqx_utils:redact(State) }), MessageData = format_data(PayloadTemplate, Data), - ecpool:pick_and_do( + Res = ecpool:pick_and_do( PoolName, {?MODULE, publish_messages, [Config, [MessageData]]}, no_handover - ). + ), + handle_result(Res). %% emqx_resource callback that is called when a batch query is received @@ -467,11 +468,12 @@ on_batch_query( || Data <- MessagesToInsert ], %% Publish the messages - ecpool:pick_and_do( + Res = ecpool:pick_and_do( PoolName, {?MODULE, publish_messages, [Config, FormattedMessages]}, no_handover - ). + ), + handle_result(Res). publish_messages( {_Connection, Channel}, @@ -543,3 +545,8 @@ format_data([], Msg) -> emqx_utils_json:encode(Msg); format_data(Tokens, Msg) -> emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg). + +handle_result({error, ecpool_empty}) -> + {error, {recoverable_error, ecpool_empty}}; +handle_result(Res) -> + Res. diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index a0b4e287b..e5c5ae73d 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, odbc]}, {env, []}, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index ed8134051..341f89852 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -336,6 +336,7 @@ conn_str([{_, _} | Opts], Acc) -> ) -> {ok, list()} | {error, {recoverable_error, term()}} + | {error, {unrecoverable_error, term()}} | {error, term()}. do_query( ResourceId, @@ -374,7 +375,12 @@ do_query( query => Query, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> ?tp( sqlserver_connector_query_return, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 25fa3a84d..aed5e5e0a 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -200,7 +200,12 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> job => Job, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> ?tp( tdengine_connector_query_return, diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 84048901f..1d969e6f1 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -135,11 +135,16 @@ on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = request => Request, connector => InstId, reason => Reason - }); + }), + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; _ -> - ok - end, - Result. + Result + end. on_get_status(_InstId, _State) -> connected. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 3657bb74c..5b63daef3 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -233,6 +233,8 @@ on_query( connector => InstId }), {error, Reason}; + {error, ecpool_empty} -> + {error, {recoverable_error, ecpool_empty}}; {{true, _Info}, _Document} -> ok end; @@ -261,7 +263,12 @@ on_query( reason => Reason, connector => InstId }), - {error, Reason}; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + {error, Reason} + end; {ok, Cursor} when is_pid(Cursor) -> {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)}; Result -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index ba80d0c1d..a2c8df3d3 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -241,7 +241,12 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> sql => NameOrSQL, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; Result -> ?tp( pgsql_connector_query_return, diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index e7b474326..89aab0a8c 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -220,6 +220,8 @@ do_query(InstId, Query, #{pool_name := PoolName, type := Type} = State) -> case is_unrecoverable_error(Reason) of true -> {error, {unrecoverable_error, Reason}}; + false when Reason =:= ecpool_empty -> + {error, {recoverable_error, Reason}}; false -> Result end; diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index 3beda05a4..10dbe7990 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index c5d8ecc77..ae2128a7e 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -222,7 +222,12 @@ on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) -> sql => NameOrSQL, reason => Reason }), - Result; + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; Result -> ?tp( oracle_connector_query_return,