fix: send to buffer-supported connector even when disconnected

This commit is contained in:
Zaiming (Stone) Shi 2023-02-02 12:03:02 +01:00
parent 0eb554a62e
commit 9864587389
2 changed files with 27 additions and 16 deletions

View File

@ -264,7 +264,8 @@ query(ResId, Request, Opts) ->
case {IsBufferSupported, QM} of
{true, _} ->
%% only Kafka so far
emqx_resource_buffer_worker:simple_async_query(ResId, Request);
Opts1 = Opts#{is_buffer_supported => true},
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
{false, sync} ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{false, async} ->

View File

@ -38,7 +38,7 @@
-export([
simple_sync_query/2,
simple_async_query/2
simple_async_query/3
]).
-export([
@ -130,10 +130,10 @@ simple_sync_query(Id, Request) ->
Result.
%% simple async-query the resource without batching and queuing.
-spec simple_async_query(id(), request()) -> term().
simple_async_query(Id, Request) ->
-spec simple_async_query(id(), request(), query_opts()) -> term().
simple_async_query(Id, Request, QueryOpts0) ->
Index = undefined,
QueryOpts = simple_query_opts(),
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
emqx_resource_metrics:matched_inc(Id),
Ref = make_request_ref(),
Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
@ -851,23 +851,33 @@ handle_async_worker_down(Data0, Pid) ->
call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query}),
case emqx_resource_manager:ets_lookup(Id) of
{ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
QM =
case QM0 =:= configured of
true -> maps:get(query_mode, Data);
false -> QM0
end,
CBM = maps:get(callback_mode, Data),
CallMode = call_mode(QM, CBM),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
{ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
?RESOURCE_ERROR(not_connected, "resource not connected");
{ok, _Group, Resource} ->
QM =
case QM0 =:= configured of
true -> maps:get(query_mode, Resource);
false -> QM0
end,
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
{error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found")
end.
do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) ->
%% The connector supprots buffer, send even in disconnected state
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
CallMode = call_mode(QM, CBM),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
%% when calling from the buffer worker or other simple queries,
%% only apply the query fun when it's at connected status
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
CallMode = call_mode(QM, CBM),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
?RESOURCE_ERROR(not_connected, "resource not connected").
-define(APPLY_RESOURCE(NAME, EXPR, REQ),
try
%% if the callback module (connector) wants to return an error that