From 57270fb8fc67ac56845313ccba4bc0fde1b7f7aa Mon Sep 17 00:00:00 2001
From: Kjell Winblad
Date: Wed, 28 Sep 2022 11:54:25 +0200
Subject: [PATCH] feat: add support for counters and gauges to the Kafka Bridge

This commit adds support for counters and gauges to the Kafka Bridge.

The Kafka bridge uses [Wolff](https://github.com/kafka4beam/wolff) for the
Kafka connection. Wolff does its own batching and does not use the batching
functionality in `emqx_resource_worker` that is used by other bridge types.
Therefore, the counter events have to be generated by Wolff. We have added
[telemetry](https://github.com/beam-telemetry/telemetry) events to Wolff that
we hook into to update counters and gauges for the Kafka bridge.

The counter called `matched` does not depend on specific functionality of any
bridge type, so the update of this counter has been moved higher up in the
call chain than before, so that it also gets updated for Kafka bridges.
---
 .../src/emqx_resource_metrics.erl             | 197 ++++++++++++++++++
 .../src/emqx_resource_worker.erl              |  67 +++---
 lib-ee/emqx_ee_bridge/rebar.config            |   2 +-
 .../emqx_ee_bridge/src/emqx_ee_bridge.app.src |   3 +-
 .../kafka/emqx_bridge_impl_kafka_producer.erl |  99 ++++++++-
 .../emqx_bridge_impl_kafka_producer_SUITE.erl |  71 +++++--
 mix.exs                                       |   4 +-
 7 files changed, 389 insertions(+), 54 deletions(-)
 create mode 100644 apps/emqx_resource/src/emqx_resource_metrics.erl

diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl
new file mode 100644
index 000000000..4fe3d3182
--- /dev/null
+++ b/apps/emqx_resource/src/emqx_resource_metrics.erl
@@ -0,0 +1,197 @@
+-module(emqx_resource_metrics).
+
+-export([
+    batching_change/2,
+    batching_get/1,
+    inflight_change/2,
+    inflight_get/1,
+    queuing_change/2,
+    queuing_get/1,
+    dropped_inc/1,
+    dropped_inc/2,
+    dropped_get/1,
+    dropped_other_inc/1,
+    dropped_other_inc/2,
+    dropped_other_get/1,
+    dropped_queue_full_inc/1,
+    dropped_queue_full_inc/2,
+    dropped_queue_full_get/1,
+    dropped_queue_not_enabled_inc/1,
+    dropped_queue_not_enabled_inc/2,
+    dropped_queue_not_enabled_get/1,
+    dropped_resource_not_found_inc/1,
+    dropped_resource_not_found_inc/2,
+    dropped_resource_not_found_get/1,
+    dropped_resource_stopped_inc/1,
+    dropped_resource_stopped_inc/2,
+    dropped_resource_stopped_get/1,
+    failed_inc/1,
+    failed_inc/2,
+    failed_get/1,
+    matched_inc/1,
+    matched_inc/2,
+    matched_get/1,
+    retried_inc/1,
+    retried_inc/2,
+    retried_get/1,
+    retried_failed_inc/1,
+    retried_failed_inc/2,
+    retried_failed_get/1,
+    retried_success_inc/1,
+    retried_success_inc/2,
+    retried_success_get/1,
+    success_inc/1,
+    success_inc/2,
+    success_get/1
+]).
+
+-define(RES_METRICS, resource_metrics).
+
+%% Gauges (value can go both up and down):
+%% --------------------------------------
+
+%% @doc Count of messages that are currently accumulated in memory, waiting to
+%% be sent in one batch. [Gauge]
+batching_change(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val).
+
+batching_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'batching').
+
+%% @doc Count of messages that are currently queuing. [Gauge]
+queuing_change(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val).
+
+queuing_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing').
+
+%% @doc Count of messages that were sent asynchronously but whose ACKs have
+%% not yet been received. [Gauge]
+inflight_change(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val).
+
+inflight_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight').
+
+%% Counters (value can only go up):
+%% --------------------------------------
+
+%% @doc Count of messages dropped
+dropped_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped').
+
+dropped_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val).
+
+dropped_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped').
+
+%% @doc Count of messages dropped due to other reasons
+dropped_other_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other').
+
+dropped_other_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val).
+
+dropped_other_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other').
+
+%% @doc Count of messages dropped because the queue was full
+dropped_queue_full_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full').
+
+dropped_queue_full_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val).
+
+dropped_queue_full_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full').
+
+%% @doc Count of messages dropped because the queue was not enabled
+dropped_queue_not_enabled_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled').
+
+dropped_queue_not_enabled_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val).
+
+dropped_queue_not_enabled_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled').
+
+%% @doc Count of messages dropped because the resource was not found
+dropped_resource_not_found_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found').
+
+dropped_resource_not_found_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val).
+
+dropped_resource_not_found_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found').
+
+%% @doc Count of messages dropped because the resource was stopped
+dropped_resource_stopped_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped').
+
+dropped_resource_stopped_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val).
+
+dropped_resource_stopped_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped').
+
+%% @doc Count of how many times this bridge has been matched and queried
+matched_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched').
+
+matched_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val).
+
+matched_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'matched').
+
+%% @doc The number of times message sends have been retried
+retried_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried').
+
+retried_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val).
+
+retried_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried').
+
+%% @doc Count of message sends that have failed
+failed_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed').
+
+failed_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val).
+
+failed_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'failed').
+
+%% @doc Count of message sends that have failed after having been retried
+retried_failed_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed').
+
+retried_failed_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val).
+
+retried_failed_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed').
+
+%% @doc Count of messages that were successfully sent after at least one retry
+retried_success_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success').
+
+retried_success_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val).
+
+retried_success_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success').
+
+%% @doc Count of messages that have been sent successfully
+success_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'success').
+
+success_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val).
+
+success_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'success').
diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl
index 288edcf4f..75a63d427 100644
--- a/apps/emqx_resource/src/emqx_resource_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_worker.erl
@@ -80,27 +80,23 @@ start_link(Id, Index, Opts) ->
 sync_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_cast(Id, PickKey, {query, Request, Opts}).
 
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
     Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
 simple_async_query(Id, Request, ReplyFun) ->
     Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
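Note: the new `emqx_resource_metrics` module introduced above is a thin facade over `emqx_metrics_worker`. The gauge helpers (`batching_change/2`, `queuing_change/2`, `inflight_change/2`) apply a signed delta, while the counter helpers (`*_inc/1` and `*_inc/2`) only ever add. A minimal usage sketch, assuming the metrics for the resource id have already been created by the resource manager; the id value here is only a placeholder:

    %% Illustrative only, not code from the patch:
    Id = <<"bridge:kafka:my_kafka_bridge">>,
    ok = emqx_resource_metrics:success_inc(Id),           %% counter, +1
    ok = emqx_resource_metrics:success_inc(Id, 5),        %% counter, +5
    ok = emqx_resource_metrics:inflight_change(Id, 3),    %% gauge, up by 3
    ok = emqx_resource_metrics:inflight_change(Id, -3).   %% gauge, back down by 3
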
@@ -134,7 +130,7 @@ init({Id, Index, Opts}) ->
             false ->
                 undefined
         end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
+    emqx_resource_metrics:queuing_change(Id, queue_count(Queue)),
     InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
     ok = inflight_new(Name, InfltWinSZ),
     HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
@@ -297,7 +293,7 @@ retry_inflight_sync(
 
 query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
     Acc1 = [?QUERY(From, Request, false) | Acc],
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'),
+    emqx_resource_metrics:batching_change(Id, 1),
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
         true -> flush(St);
@@ -330,7 +326,7 @@ flush(
     QueryOpts = #{
         inflight_name => maps:get(name, St)
     },
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)),
+    emqx_resource_metrics:batching_change(Id, -length(Batch)),
     Result = call_query(configured, Id, Batch, QueryOpts),
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
@@ -380,18 +376,18 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when
     true;
 handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
+    emqx_resource_metrics:dropped_inc(Id),
+    emqx_resource_metrics:dropped_resource_not_found_inc(Id),
     BlockWorker;
 handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
+    emqx_resource_metrics:dropped_inc(Id),
+    emqx_resource_metrics:dropped_resource_stopped_inc(Id),
     BlockWorker;
 handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
+    emqx_resource_metrics:dropped_inc(Id),
+    emqx_resource_metrics:dropped_other_inc(Id),
     BlockWorker;
 handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) ->
     %% the message will be queued in replayq or inflight window,
@@ -425,6 +421,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
                 _ -> QM0
             end,
             CM = maps:get(callback_mode, Data),
+            emqx_resource_metrics:matched_inc(Id),
             apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts);
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
@@ -464,7 +461,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts)
         true ->
             {async_return, inflight_full};
         false ->
-            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'),
+            ok = emqx_resource_metrics:inflight_change(Id, 1),
             ReplyFun = fun ?MODULE:reply_after_query/6,
             Ref = make_message_ref(),
             Args = [self(), Id, Name, Ref, Query],
@@ -488,7 +485,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts)
             {async_return, inflight_full};
         false ->
             BatchLen = length(Batch),
-            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen),
+            ok = emqx_resource_metrics:inflight_change(Id, BatchLen),
             ReplyFun = fun ?MODULE:batch_reply_after_query/6,
             Ref = make_message_ref(),
             Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
@@ -503,12 +500,12 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts)
 reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
     %% NOTE: 'inflight' is message count that sent async but no ACK received,
     %% NOT the message number ququed in the inflight window.
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1),
+    emqx_resource_metrics:inflight_change(Id, -1),
     case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of
         true ->
             %% we marked these messages are 'queuing' although they are actually
             %% keeped in inflight window, not replayq
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
+            emqx_resource_metrics:queuing_change(Id, 1),
             ?MODULE:block(Pid);
         false ->
             drop_inflight_and_resume(Pid, Name, Ref)
@@ -518,12 +515,12 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
     %% NOTE: 'inflight' is message count that sent async but no ACK received,
     %% NOT the message number ququed in the inflight window.
     BatchLen = length(Batch),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen),
+    emqx_resource_metrics:inflight_change(Id, -BatchLen),
     case batch_reply_caller(Id, Result, Batch) of
         true ->
             %% we marked these messages are 'queuing' although they are actually
-            %% keeped in inflight window, not replayq
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
+            %% kept in inflight window, not replayq
+            emqx_resource_metrics:queuing_change(Id, BatchLen),
             ?MODULE:block(Pid);
         false ->
             drop_inflight_and_resume(Pid, Name, Ref)
@@ -549,8 +546,8 @@ estimate_size(QItem) ->
     size(queue_item_marshaller(QItem)).
 
 maybe_append_queue(Id, undefined, _Items) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
+    emqx_resource_metrics:dropped_inc(Id),
+    emqx_resource_metrics:dropped_queue_not_enabled_inc(Id),
     undefined;
 maybe_append_queue(Id, Q, Items) ->
     Q2 =
@@ -562,13 +559,13 @@ maybe_append_queue(Id, Q, Items) ->
             {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
             ok = replayq:ack(Q1, QAckRef),
             Dropped = length(Items2),
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
+            emqx_resource_metrics:queuing_change(Id, -Dropped),
+            emqx_resource_metrics:dropped_inc(Id),
+            emqx_resource_metrics:dropped_queue_full_inc(Id),
             ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
             Q1
         end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
+    emqx_resource_metrics:queuing_change(Id, 1),
     replayq:append(Q2, Items).
 
 get_first_n_from_queue(Q, N) ->
@@ -590,7 +587,7 @@ drop_first_n_from_queue(Q, N, Id) when N > 0 ->
 
 drop_head(Q, Id) ->
     {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
     ok = replayq:ack(Q1, AckRef),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
+    emqx_resource_metrics:queuing_change(Id, -1),
     Q1.
 
 %%==============================================================================
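Two things are worth noting about the worker hunks above: `matched` is now bumped exactly once inside `call_query/4`, the common funnel for the sync, async and simple query paths, and every positive gauge move is paired with a negative one when the async reply arrives or the queued item is dropped. A hedged sketch of that pairing, not code from the patch, with a placeholder id and assuming the metrics already exist:

    Id = <<"bridge:kafka:my_kafka_bridge">>,
    BatchLen = 5,
    %% when a batch is handed to the async driver:
    ok = emqx_resource_metrics:inflight_change(Id, BatchLen),
    %% later, in the reply callback once the ACK arrives:
    ok = emqx_resource_metrics:inflight_change(Id, -BatchLen).
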
@@ -645,18 +642,18 @@ inflight_drop(Name, Ref) ->
 %%==============================================================================
 
 inc_sent_failed(Id, true) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.failed');
+    emqx_resource_metrics:failed_inc(Id),
+    emqx_resource_metrics:retried_inc(Id),
+    emqx_resource_metrics:retried_failed_inc(Id);
 inc_sent_failed(Id, _HasSent) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed').
+    emqx_resource_metrics:failed_inc(Id).
 
 inc_sent_success(Id, true) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.success');
+    emqx_resource_metrics:success_inc(Id),
+    emqx_resource_metrics:retried_inc(Id),
+    emqx_resource_metrics:retried_success_inc(Id);
 inc_sent_success(Id, _HasSent) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'success').
+    emqx_resource_metrics:success_inc(Id).
 
 call_mode(sync, _) -> sync;
 call_mode(async, always_sync) -> sync;
diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config
index 8c79e7274..f281a9b85 100644
--- a/lib-ee/emqx_ee_bridge/rebar.config
+++ b/lib-ee/emqx_ee_bridge/rebar.config
@@ -1,6 +1,6 @@
 {erl_opts, [debug_info]}.
 {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}
-       , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}}
+       , {wolff, {git, "https://github.com/kjellwinblad/wolff.git", {branch, "kjell/add_counters_support_ok"}}}
       , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}}
       , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}}
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
index 0ede2a6a5..7759ef2a2 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
@@ -4,7 +4,8 @@
   {applications, [
     kernel,
     stdlib,
-    emqx_ee_connector
+    emqx_ee_connector,
+    telemetry
   ]},
   {env, []},
   {modules, []},
diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
index ce82dbe2d..5e8635d44 100644
--- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
@@ -12,7 +12,10 @@
     on_get_status/2
 ]).
 
--export([on_kafka_ack/3]).
+-export([
+    on_kafka_ack/3,
+    handle_telemetry_event/4
+]).
 
 -include_lib("emqx/include/logger.hrl").
@@ -30,6 +33,7 @@ on_start(InstId, Config) ->
         authentication := Auth,
         ssl := SSL
     } = Config,
+    maybe_install_wolff_telemetry_handlers(),
     %% it's a bug if producer config is not found
     %% the caller should not try to start a producer if
     %% there is no producer config
@@ -222,6 +226,7 @@ producers_config(BridgeName, ClientId, Input) ->
             disk -> {false, replayq_dir(ClientId)};
             hybrid -> {true, replayq_dir(ClientId)}
         end,
+    BridgeNameBin = erlang:atom_to_binary(BridgeName),
     #{
         name => make_producer_name(BridgeName),
         partitioner => PartitionStrategy,
@@ -234,7 +239,9 @@ producers_config(BridgeName, ClientId, Input) ->
         required_acks => RequiredAcks,
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
-        compression => Compression
+        compression => Compression,
+        telemetry_meta_data =>
+            #{bridge_id => <<<<"bridge:kafka:">>/binary, BridgeNameBin/binary>>}
     }.
 
 replayq_dir(ClientId) ->
@@ -268,3 +275,91 @@ get_required(Field, Config, Throw) ->
     Value = maps:get(Field, Config, none),
     Value =:= none andalso throw(Throw),
     Value.
+
+handle_telemetry_event(
+    [wolff, dropped],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:dropped_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, dropped_queue_full],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, queuing],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_change(ID, Val);
+handle_telemetry_event(
+    [wolff, retried],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, failed],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:failed_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, inflight],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:inflight_change(ID, Val);
+handle_telemetry_event(
+    [wolff, retried_failed],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_failed_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, retried_success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_success_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:success_inc(ID, Val);
+handle_telemetry_event(_EventId, _Metrics, _MetaData, _Config) ->
+    %% Event that we do not handle
+    ok.
+
+maybe_install_wolff_telemetry_handlers() ->
+    %% Attach event handlers for Kafka telemetry events. If a handler with the
+    %% handler id already exists, the attach_many function does nothing.
+    telemetry:attach_many(
+        %% unique handler id
+        <<"emqx-bridge-kafka-producer-telemetry-handler">>,
+        [
+            [wolff, dropped],
+            [wolff, dropped_queue_full],
+            [wolff, queuing],
+            [wolff, retried],
+            [wolff, failed],
+            [wolff, inflight],
+            [wolff, retried_failed],
+            [wolff, retried_success],
+            [wolff, success]
+        ],
+        fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4,
+        []
+    ).
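The handler above only consumes events; the emitting side lives in the patched Wolff branch referenced in rebar.config and is not shown here. The match patterns do make the expected contract visible: an event name of the form `[wolff, ...]`, a `counter_inc` measurement, and a `bridge_id` key in the metadata (which `producers_config/3` hands to Wolff as `telemetry_meta_data` above). A hedged sketch of the kind of call the instrumented producer is expected to make, using the standard `telemetry:execute/3` API; the real call sites are in Wolff, and the id below is a placeholder:

    %% Illustrative only: emitted by the producer after a message is ACKed.
    telemetry:execute(
        [wolff, success],
        #{counter_inc => 1},
        #{bridge_id => <<"bridge:kafka:my_kafka_bridge">>}
    ).
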
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
index fb929e692..152862f6b 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
@@ -184,10 +184,6 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
             true -> kafka_hosts_string_ssl();
             false -> kafka_hosts_string()
         end,
-    kafka_bridge_rest_api_helper(#{
-        <<"bootstrap_hosts">> => NormalHostsString,
-        <<"authentication">> => <<"none">>
-    }),
     SASLHostsString =
         case UseSSL of
             true -> kafka_hosts_string_ssl_sasl();
@@ -204,6 +200,15 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
             true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())};
             false -> #{}
         end,
+    kafka_bridge_rest_api_helper(
+        maps:merge(
+            #{
+                <<"bootstrap_hosts">> => NormalHostsString,
+                <<"authentication">> => <<"none">>
+            },
+            SSLSettings
+        )
+    ),
     kafka_bridge_rest_api_helper(
         maps:merge(
             #{
@@ -243,10 +248,20 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
     ok.
 
 kafka_bridge_rest_api_helper(Config) ->
+    BridgeType = "kafka",
+    BridgeName = "my_kafka_bridge",
+    BridgeID = emqx_bridge_resource:bridge_id(
+        erlang:list_to_binary(BridgeType),
+        erlang:list_to_binary(BridgeName)
+    ),
+    ResourceId = emqx_bridge_resource:resource_id(
+        erlang:list_to_binary(BridgeType),
+        erlang:list_to_binary(BridgeName)
+    ),
     UrlEscColon = "%3A",
-    BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge",
+    BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
     BridgesParts = ["bridges"],
-    BridgesPartsId = ["bridges", BridgeIdUrlEnc],
+    BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"],
     OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end,
     BridgesPartsOpDisable = OpUrlFun("disable"),
     BridgesPartsOpEnable = OpUrlFun("enable"),
@@ -268,15 +283,13 @@ kafka_bridge_rest_api_helper(Config) ->
     case MyKafkaBridgeExists() of
         true ->
             %% Delete the bridge my_kafka_bridge
-            show(
-                '========================================== DELETE ========================================'
-            ),
-            {ok, 204, <<>>} = show(http_delete(BridgesPartsId));
+            {ok, 204, <<>>} = show(http_delete(BridgesPartsIdDeleteAlsoActions));
         false ->
             ok
     end,
     false = MyKafkaBridgeExists(),
     %% Create new Kafka bridge
+    KafkaTopic = "test-topic-one-partition",
     CreateBodyTmp = #{
         <<"type">> => <<"kafka">>,
         <<"name">> => <<"my_kafka_bridge">>,
@@ -288,7 +301,7 @@ kafka_bridge_rest_api_helper(Config) ->
                 topic => <<"t/#">>
             },
             <<"kafka">> => #{
-                <<"topic">> => <<"test-topic-one-partition">>
+                <<"topic">> => erlang:list_to_binary(KafkaTopic)
             }
         }
     },
@@ -300,6 +313,34 @@ kafka_bridge_rest_api_helper(Config) ->
     {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))),
     %% Check that the new bridge is in the list of bridges
     true = MyKafkaBridgeExists(),
+    %% Create a rule that uses the bridge
+    {ok, 201, _Rule} = http_post(
+        ["rules"],
+        #{
+            <<"name">> => <<"kafka_bridge_rest_api_helper_rule">>,
+            <<"enable">> => true,
+            <<"actions">> => [BridgeID],
+            <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">>
+        }
+    ),
+    %% Get offset before sending message
+    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
+    %% Send message to topic and check that it got forwarded to Kafka
+    Body = <<"message from EMQX">>,
+    emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)),
+    %% Give Kafka some time to get message
+    timer:sleep(100),
+    %% Check that Kafka got message
+    BrodOut = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
+    {ok, {_, [KafkaMsg]}} = show(BrodOut),
+    Body = KafkaMsg#kafka_message.value,
+    %% Check crucial counters and gauges
+    1 = emqx_resource_metrics:matched_get(ResourceId),
+    1 = emqx_resource_metrics:success_get(ResourceId),
+    0 = emqx_resource_metrics:dropped_get(ResourceId),
+    0 = emqx_resource_metrics:failed_get(ResourceId),
+    0 = emqx_resource_metrics:inflight_get(ResourceId),
+    0 = emqx_resource_metrics:queuing_get(ResourceId),
     %% Perform operations
     {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
@@ -309,7 +350,7 @@ kafka_bridge_rest_api_helper(Config) ->
     {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
     %% Cleanup
-    {ok, 204, _} = show(http_delete(BridgesPartsId)),
+    {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)),
     false = MyKafkaBridgeExists(),
     ok.
 
@@ -325,7 +366,8 @@ publish_with_and_without_ssl(AuthSettings) ->
     publish_helper(#{
         auth_settings => AuthSettings,
         ssl_settings => valid_ssl_settings()
-    }).
+    }),
+    ok.
 
 publish_helper(#{
     auth_settings := AuthSettings,
@@ -345,6 +387,7 @@ publish_helper(#{
     Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
     Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
     InstId = emqx_bridge_resource:resource_id("kafka", Name),
+    BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
     KafkaTopic = "test-topic-one-partition",
     Conf = config(#{
         "authentication" => AuthSettings,
@@ -353,6 +396,7 @@ publish_helper(#{
         "instance_id" => InstId,
         "ssl" => SSLSettings
     }),
+    emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}),
     %% To make sure we get unique value
     timer:sleep(1),
     Time = erlang:monotonic_time(),
@@ -371,6 +415,7 @@ publish_helper(#{
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     ok = ?PRODUCER:on_stop(InstId, State),
+    ok = emqx_bridge_resource:remove(BridgeId),
     ok.
 
 config(Args) ->
diff --git a/mix.exs b/mix.exs
index 69538142e..f9941912a 100644
--- a/mix.exs
+++ b/mix.exs
@@ -130,7 +130,7 @@ defmodule EMQXUmbrella.MixProject do
       [
         {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
         {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true},
-        {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"},
+        {:wolff, github: "kjellwinblad/wolff", branch: "kjell/add_counters_support_ok"},
         {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true},
         {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
         {:brod, github: "kafka4beam/brod", tag: "3.16.4"},
@@ -516,7 +516,7 @@ defmodule EMQXUmbrella.MixProject do
       |> Path.join("RELEASES")
       |> File.open!([:write, :utf8], fn handle ->
         IO.puts(handle, "%% coding: utf-8")
-        :io.format(handle, '~tp.~n', [release_entry])
+        :io.format(handle, ~c"~tp.~n", [release_entry])
       end)
 
     release
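With the wiring above in place, a Kafka bridge's counters and gauges can be read back through the same accessors the test suite uses, keyed by the bridge's resource id. A small sketch, with a placeholder bridge name (illustrative only, not part of the patch):

    %% Resource id built the same way as in the test suite above:
    ResourceId = emqx_bridge_resource:resource_id(<<"kafka">>, <<"my_kafka_bridge">>),
    Matched = emqx_resource_metrics:matched_get(ResourceId),
    Success = emqx_resource_metrics:success_get(ResourceId),
    InFlight = emqx_resource_metrics:inflight_get(ResourceId),
    {Matched, Success, InFlight}.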