From 9864587389ac7d8c457e8f84d1a29a2c06260a80 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 2 Feb 2023 12:03:02 +0100 Subject: [PATCH] fix: send to buffer-supported connector even when disconnected --- apps/emqx_resource/src/emqx_resource.erl | 3 +- .../src/emqx_resource_buffer_worker.erl | 40 ++++++++++++------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index ad7f30b47..1c5eecfbb 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -264,7 +264,8 @@ query(ResId, Request, Opts) -> case {IsBufferSupported, QM} of {true, _} -> %% only Kafka so far - emqx_resource_buffer_worker:simple_async_query(ResId, Request); + Opts1 = Opts#{is_buffer_supported => true}, + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); {false, sync} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {false, async} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 4ef384da6..c7b143381 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -38,7 +38,7 @@ -export([ simple_sync_query/2, - simple_async_query/2 + simple_async_query/3 ]). -export([ @@ -130,10 +130,10 @@ simple_sync_query(Id, Request) -> Result. %% simple async-query the resource without batching and queuing. --spec simple_async_query(id(), request()) -> term(). -simple_async_query(Id, Request) -> +-spec simple_async_query(id(), request(), query_opts()) -> term(). +simple_async_query(Id, Request, QueryOpts0) -> Index = undefined, - QueryOpts = simple_query_opts(), + QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), @@ -851,23 +851,33 @@ handle_async_worker_down(Data0, Pid) -> call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of - {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> - QM = - case QM0 =:= configured of - true -> maps:get(query_mode, Data); - false -> QM0 - end, - CBM = maps:get(callback_mode, Data), - CallMode = call_mode(QM, CBM), - apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); - {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> - ?RESOURCE_ERROR(not_connected, "resource not connected"); + {ok, _Group, Resource} -> + QM = + case QM0 =:= configured of + true -> maps:get(query_mode, Resource); + false -> QM0 + end, + do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") end. +do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) -> + %% The connector supprots buffer, send even in disconnected state + #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, + CallMode = call_mode(QM, CBM), + apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); +do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) -> + %% when calling from the buffer worker or other simple queries, + %% only apply the query fun when it's at connected status + #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, + CallMode = call_mode(QM, CBM), + apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); +do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> + ?RESOURCE_ERROR(not_connected, "resource not connected"). + -define(APPLY_RESOURCE(NAME, EXPR, REQ), try %% if the callback module (connector) wants to return an error that