fix(kafka_producer): send messages to wolff producer to buffer even when connector is in `connecting` state
Fixes https://emqx.atlassian.net/browse/EMQX-11085 Messages would not be sent to wolff if the connection was down, so they were effectively lost.
This commit is contained in:
parent
10bd74002a
commit
34186fcc74
|
@ -3,6 +3,8 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_bridge_kafka_impl_producer).
|
-module(emqx_bridge_kafka_impl_producer).
|
||||||
|
|
||||||
|
-behaviour(emqx_resource).
|
||||||
|
|
||||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
-include_lib("snabbkaffe/include/trace.hrl").
|
-include_lib("snabbkaffe/include/trace.hrl").
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_resource, [
|
{application, emqx_resource, [
|
||||||
{description, "Manager for all external resources"},
|
{description, "Manager for all external resources"},
|
||||||
{vsn, "0.1.23"},
|
{vsn, "0.1.24"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_resource_app, []}},
|
{mod, {emqx_resource_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -306,8 +306,7 @@ query(ResId, Request, Opts) ->
|
||||||
{simple_async, _} ->
|
{simple_async, _} ->
|
||||||
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
||||||
%% so the buffer worker does not need to lookup the cache again
|
%% so the buffer worker does not need to lookup the cache again
|
||||||
Opts1 = Opts#{is_buffer_supported => true},
|
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
|
||||||
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
|
|
||||||
{simple_sync, _} ->
|
{simple_sync, _} ->
|
||||||
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
||||||
%% so the buffer worker does not need to lookup the cache again
|
%% so the buffer worker does not need to lookup the cache again
|
||||||
|
|
|
@ -1048,7 +1048,9 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
|
||||||
?RESOURCE_ERROR(not_found, "resource not found")
|
?RESOURCE_ERROR(not_found, "resource not found")
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) ->
|
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
|
||||||
|
ResQM =:= simple_async; ResQM =:= simple_sync
|
||||||
|
->
|
||||||
%% The connector supports buffer, send even in disconnected state
|
%% The connector supports buffer, send even in disconnected state
|
||||||
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
|
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
|
||||||
CallMode = call_mode(QM, CBM),
|
CallMode = call_mode(QM, CBM),
|
||||||
|
@ -1059,7 +1061,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Res
|
||||||
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
|
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
|
||||||
CallMode = call_mode(QM, CBM),
|
CallMode = call_mode(QM, CBM),
|
||||||
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
|
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
|
||||||
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
|
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Resource) ->
|
||||||
?RESOURCE_ERROR(not_connected, "resource not connected").
|
?RESOURCE_ERROR(not_connected, "resource not connected").
|
||||||
|
|
||||||
-define(APPLY_RESOURCE(NAME, EXPR, REQ),
|
-define(APPLY_RESOURCE(NAME, EXPR, REQ),
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where a Kafka Producer bridge with `sync` query mode would not buffer messages when in the `connecting` state.
|
Loading…
Reference in New Issue