diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index ad7f30b47..1c5eecfbb 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -264,7 +264,8 @@ query(ResId, Request, Opts) -> case {IsBufferSupported, QM} of {true, _} -> %% only Kafka so far - emqx_resource_buffer_worker:simple_async_query(ResId, Request); + Opts1 = Opts#{is_buffer_supported => true}, + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); {false, sync} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {false, async} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 4ef384da6..c7b143381 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -38,7 +38,7 @@ -export([ simple_sync_query/2, - simple_async_query/2 + simple_async_query/3 ]). -export([ @@ -130,10 +130,10 @@ simple_sync_query(Id, Request) -> Result. %% simple async-query the resource without batching and queuing. --spec simple_async_query(id(), request()) -> term(). -simple_async_query(Id, Request) -> +-spec simple_async_query(id(), request(), query_opts()) -> term(). +simple_async_query(Id, Request, QueryOpts0) -> Index = undefined, - QueryOpts = simple_query_opts(), + QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), @@ -851,23 +851,33 @@ handle_async_worker_down(Data0, Pid) -> call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of - {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> - QM = - case QM0 =:= configured of - true -> maps:get(query_mode, Data); - false -> QM0 - end, - CBM = maps:get(callback_mode, Data), - CallMode = call_mode(QM, CBM), - apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); - {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> - ?RESOURCE_ERROR(not_connected, "resource not connected"); + {ok, _Group, Resource} -> + QM = + case QM0 =:= configured of + true -> maps:get(query_mode, Resource); + false -> QM0 + end, + do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") end. +do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) -> + %% The connector supprots buffer, send even in disconnected state + #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, + CallMode = call_mode(QM, CBM), + apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); +do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) -> + %% when calling from the buffer worker or other simple queries, + %% only apply the query fun when it's at connected status + #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, + CallMode = call_mode(QM, CBM), + apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); +do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> + ?RESOURCE_ERROR(not_connected, "resource not connected"). + -define(APPLY_RESOURCE(NAME, EXPR, REQ), try %% if the callback module (connector) wants to return an error that diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index 6ca554c72..fa6dd560e 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,5 +1,5 @@ {erl_opts, [debug_info]}. -{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.4"}}} +{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.7"}}} diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 1ac619626..ac98209ed 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -91,6 +91,7 @@ on_start(InstId, Config) -> {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, + kafka_topic => KafkaTopic, producers => Producers, resource_id => ResourceID }}; @@ -234,8 +235,35 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% do not apply the callback (which is basically to bump success or fail counter) ok. -on_get_status(_InstId, _State) -> - connected. +on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) -> + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + do_get_status(Pid, KafkaTopic); + {error, _Reason} -> + disconnected + end. + +do_get_status(Client, KafkaTopic) -> + %% TODO: add a wolff_producers:check_connectivity + case wolff_client:get_leader_connections(Client, KafkaTopic) of + {ok, Leaders} -> + %% Kafka is considered healthy as long as any of the partition leader is reachable + case + lists:any( + fun({_Partition, Pid}) -> + is_pid(Pid) andalso erlang:is_process_alive(Pid) + end, + Leaders + ) + of + true -> + connected; + false -> + disconnected + end; + {error, _} -> + disconnected + end. %% Parse comma separated host:port list into a [{Host,Port}] list hosts(Hosts) when is_binary(Hosts) -> diff --git a/mix.exs b/mix.exs index 8e271390f..c6e497d37 100644 --- a/mix.exs +++ b/mix.exs @@ -130,7 +130,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.7", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.7"},