Merge pull request #10918 from thalesmg/fix-ecpool-empty-unrecoverable-v50

fix(ecpool,bridge): treat `{error, ecpool_empty}` as a retriable error
This commit is contained in:
Thales Macedo Garitezi 2023-06-02 09:21:00 -03:00 committed by GitHub
commit 74ffd9ef96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 81 additions and 22 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_cassandra, [
{description, "EMQX Enterprise Cassandra Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, ecql]},
{env, []},

View File

@ -480,6 +480,8 @@ prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co
handle_result({error, disconnected}) ->
{error, {recoverable_error, disconnected}};
handle_result({error, ecpool_empty}) ->
{error, {recoverable_error, ecpool_empty}};
handle_result({error, Error}) ->
{error, {unrecoverable_error, Error}};
handle_result(Res) ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_clickhouse, [
{description, "EMQX Enterprise ClickHouse Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [kernel, stdlib, clickhouse, emqx_resource]},
{env, []},

View File

@ -464,7 +464,12 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
sql => SQL,
reason => ClickhouseErrorResult
}),
{error, ClickhouseErrorResult}.
case ClickhouseErrorResult of
{error, ecpool_empty} ->
{error, {recoverable_error, ecpool_empty}};
_ ->
{error, ClickhouseErrorResult}
end.
snabbkaffe_log_return(_Result) ->
?tp(

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_dynamo, [
{description, "EMQX Enterprise Dynamo Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, erlcloud]},
{env, []},

View File

@ -170,7 +170,12 @@ do_query(
query => Query,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
_ ->
?tp(
dynamo_connector_query_return,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_opents, [
{description, "EMQX Enterprise OpenTSDB Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -142,7 +142,12 @@ do_query(InstanceId, Query, #{pool_name := PoolName} = State) ->
query => Query,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
_ ->
?tp(
opents_connector_query_return,

View File

@ -431,11 +431,12 @@ on_query(
state => emqx_utils:redact(State)
}),
MessageData = format_data(PayloadTemplate, Data),
ecpool:pick_and_do(
Res = ecpool:pick_and_do(
PoolName,
{?MODULE, publish_messages, [Config, [MessageData]]},
no_handover
).
),
handle_result(Res).
%% emqx_resource callback that is called when a batch query is received
@ -467,11 +468,12 @@ on_batch_query(
|| Data <- MessagesToInsert
],
%% Publish the messages
ecpool:pick_and_do(
Res = ecpool:pick_and_do(
PoolName,
{?MODULE, publish_messages, [Config, FormattedMessages]},
no_handover
).
),
handle_result(Res).
publish_messages(
{_Connection, Channel},
@ -543,3 +545,8 @@ format_data([], Msg) ->
emqx_utils_json:encode(Msg);
format_data(Tokens, Msg) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg).
handle_result({error, ecpool_empty}) ->
{error, {recoverable_error, ecpool_empty}};
handle_result(Res) ->
Res.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_sqlserver, [
{description, "EMQX Enterprise SQL Server Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, odbc]},
{env, []},

View File

@ -336,6 +336,7 @@ conn_str([{_, _} | Opts], Acc) ->
) ->
{ok, list()}
| {error, {recoverable_error, term()}}
| {error, {unrecoverable_error, term()}}
| {error, term()}.
do_query(
ResourceId,
@ -374,7 +375,12 @@ do_query(
query => Query,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
_ ->
?tp(
sqlserver_connector_query_return,

View File

@ -200,7 +200,12 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
job => Job,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
_ ->
?tp(
tdengine_connector_query_return,

View File

@ -135,11 +135,16 @@ on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} =
request => Request,
connector => InstId,
reason => Reason
});
}),
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
ok
end,
Result.
Result
end;
_ ->
Result
end.
on_get_status(_InstId, _State) -> connected.

View File

@ -233,6 +233,8 @@ on_query(
connector => InstId
}),
{error, Reason};
{error, ecpool_empty} ->
{error, {recoverable_error, ecpool_empty}};
{{true, _Info}, _Document} ->
ok
end;
@ -261,7 +263,12 @@ on_query(
reason => Reason,
connector => InstId
}),
{error, Reason};
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
{error, Reason}
end;
{ok, Cursor} when is_pid(Cursor) ->
{ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)};
Result ->

View File

@ -241,7 +241,12 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
sql => NameOrSQL,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
Result ->
?tp(
pgsql_connector_query_return,

View File

@ -220,6 +220,8 @@ do_query(InstId, Query, #{pool_name := PoolName, type := Type} = State) ->
case is_unrecoverable_error(Reason) of
true ->
{error, {unrecoverable_error, Reason}};
false when Reason =:= ecpool_empty ->
{error, {recoverable_error, Reason}};
false ->
Result
end;

View File

@ -1,6 +1,6 @@
{application, emqx_oracle, [
{description, "EMQX Enterprise Oracle Database Connector"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,

View File

@ -222,7 +222,12 @@ on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
sql => NameOrSQL,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
Result ->
?tp(
oracle_connector_query_return,