From 57270fb8fc67ac56845313ccba4bc0fde1b7f7aa Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 28 Sep 2022 11:54:25 +0200 Subject: [PATCH 01/13] feat: add support for counters and gauges to the Kafka Bridge This commit adds support for counters and gauges to the Kafka Bridge. The Kafka bridge uses [Wolff](https://github.com/kafka4beam/wolff) for the Kafka connection. Wolff does its own batching and does not use the batching functionality in `emqx_resource_worker` that is used by other bridge types. Therefore, the counter events have to be generated by Wolff. We have added [telemetry](https://github.com/beam-telemetry/telemetry) events to Wolff that we hook into to change counters and gauges for the Kafka bridge. The counter called `matched` does not depend on specific functionality of any bridge type so the updates of this counter are moved higher up in the call chain than previously so that it also gets updated for Kafka bridges. --- .../src/emqx_resource_metrics.erl | 197 ++++++++++++++++++ .../src/emqx_resource_worker.erl | 67 +++--- lib-ee/emqx_ee_bridge/rebar.config | 2 +- .../emqx_ee_bridge/src/emqx_ee_bridge.app.src | 3 +- .../kafka/emqx_bridge_impl_kafka_producer.erl | 99 ++++++++- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 71 +++++-- mix.exs | 4 +- 7 files changed, 389 insertions(+), 54 deletions(-) create mode 100644 apps/emqx_resource/src/emqx_resource_metrics.erl diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl new file mode 100644 index 000000000..4fe3d3182 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -0,0 +1,197 @@ +-module(emqx_resource_metrics). 
+ +-export([ + batching_change/2, + batching_get/1, + inflight_change/2, + inflight_get/1, + queuing_change/2, + queuing_get/1, + dropped_inc/1, + dropped_inc/2, + dropped_get/1, + dropped_other_inc/1, + dropped_other_inc/2, + dropped_other_get/1, + dropped_queue_full_inc/1, + dropped_queue_full_inc/2, + dropped_queue_full_get/1, + dropped_queue_not_enabled_inc/1, + dropped_queue_not_enabled_inc/2, + dropped_queue_not_enabled_get/1, + dropped_resource_not_found_inc/1, + dropped_resource_not_found_inc/2, + dropped_resource_not_found_get/1, + dropped_resource_stopped_inc/1, + dropped_resource_stopped_inc/2, + dropped_resource_stopped_get/1, + failed_inc/1, + failed_inc/2, + failed_get/1, + matched_inc/1, + matched_inc/2, + matched_get/1, + retried_inc/1, + retried_inc/2, + retried_get/1, + retried_failed_inc/1, + retried_failed_inc/2, + retried_failed_get/1, + retried_success_inc/1, + retried_success_inc/2, + retried_success_get/1, + success_inc/1, + success_inc/2, + success_get/1 +]). + +-define(RES_METRICS, resource_metrics). + +%% Gauges (value can go both up and down): +%% -------------------------------------- + +%% @doc Count of messages that are currently accumulated in memory waiting for +%% being sent in one batch +batching_change(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val). + +batching_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'batching'). + +%% @doc Count of messages that are currently queuing. [Gauge] +queuing_change(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val). + +queuing_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing'). + +%% @doc Count of messages that were sent asynchronously but ACKs are not +%% received. [Gauge] +inflight_change(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val). + +inflight_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight'). 
+ +%% Counters (value can only got up): +%% -------------------------------------- + +%% @doc Count of messages dropped +dropped_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped'). + +dropped_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val). + +dropped_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped'). + +%% @doc Count of messages dropped due to other reasons +dropped_other_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other'). + +dropped_other_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val). + +dropped_other_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other'). + +%% @doc Count of messages dropped because the queue was full +dropped_queue_full_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full'). + +dropped_queue_full_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val). + +dropped_queue_full_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full'). + +%% @doc Count of messages dropped because the queue was not enabled +dropped_queue_not_enabled_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled'). + +dropped_queue_not_enabled_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val). + +dropped_queue_not_enabled_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled'). + +%% @doc Count of messages dropped because the resource was not found +dropped_resource_not_found_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found'). + +dropped_resource_not_found_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val). + +dropped_resource_not_found_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found'). 
+ +%% @doc Count of messages dropped because the resource was stopped +dropped_resource_stopped_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped'). + +dropped_resource_stopped_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val). + +dropped_resource_stopped_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped'). + +%% @doc Count of how many times this bridge has been matched and queried +matched_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched'). + +matched_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val). + +matched_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'matched'). + +%% @doc The number of times message sends have been retried +retried_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried'). + +retried_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val). + +retried_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'retried'). + +%% @doc Count of message sends that have failed +failed_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed'). + +failed_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val). + +failed_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'failed'). + +%%% @doc Count of message sends that have failed after having been retried +retried_failed_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed'). + +retried_failed_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val). + +retried_failed_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed'). + +%% @doc Count messages that were sucessfully sent after at least one retry +retried_success_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success'). + +retried_success_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val). 
+ +retried_success_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success'). + +%% @doc Count of messages that have been sent successfully +success_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'success'). + +success_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val). + +success_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'success'). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 288edcf4f..75a63d427 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -80,27 +80,23 @@ start_link(Id, Index, Opts) -> sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_cast(Id, PickKey, {query, Request, Opts}). %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false, false), Result. 
@@ -134,7 +130,7 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), + emqx_resource_metrics:queuing_change(Id, queue_count(Queue)), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), ok = inflight_new(Name, InfltWinSZ), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), @@ -297,7 +293,7 @@ retry_inflight_sync( query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> Acc1 = [?QUERY(From, Request, false) | Acc], - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'), + emqx_resource_metrics:batching_change(Id, 1), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); @@ -330,7 +326,7 @@ flush( QueryOpts = #{ inflight_name => maps:get(name, St) }, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)), + emqx_resource_metrics:batching_change(Id, -length(Batch)), Result = call_query(configured, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of @@ -380,18 +376,18 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when true; handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_resource_not_found_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), + emqx_resource_metrics:dropped_inc(Id), + 
emqx_resource_metrics:dropped_resource_stopped_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_other_inc(Id), BlockWorker; handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) -> %% the message will be queued in replayq or inflight window, @@ -425,6 +421,7 @@ call_query(QM0, Id, Query, QueryOpts) -> _ -> QM0 end, CM = maps:get(callback_mode, Data), + emqx_resource_metrics:matched_inc(Id), apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); @@ -464,7 +461,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'), + ok = emqx_resource_metrics:inflight_change(Id, 1), ReplyFun = fun ?MODULE:reply_after_query/6, Ref = make_message_ref(), Args = [self(), Id, Name, Ref, Query], @@ -488,7 +485,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) {async_return, inflight_full}; false -> BatchLen = length(Batch), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen), + ok = emqx_resource_metrics:inflight_change(Id, BatchLen), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, @@ -503,12 +500,12 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed 
in the inflight window. - emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1), + emqx_resource_metrics:inflight_change(Id, -1), case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of true -> %% we marked these messages are 'queuing' although they are actually %% keeped in inflight window, not replayq - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), + emqx_resource_metrics:queuing_change(Id, 1), ?MODULE:block(Pid); false -> drop_inflight_and_resume(Pid, Name, Ref) @@ -518,12 +515,12 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed in the inflight window. BatchLen = length(Batch), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen), + emqx_resource_metrics:inflight_change(Id, -BatchLen), case batch_reply_caller(Id, Result, Batch) of true -> %% we marked these messages are 'queuing' although they are actually - %% keeped in inflight window, not replayq - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), + %% kept in inflight window, not replayq + emqx_resource_metrics:queuing_change(Id, BatchLen), ?MODULE:block(Pid); false -> drop_inflight_and_resume(Pid, Name, Ref) @@ -549,8 +546,8 @@ estimate_size(QItem) -> size(queue_item_marshaller(QItem)). 
maybe_append_queue(Id, undefined, _Items) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), undefined; maybe_append_queue(Id, Q, Items) -> Q2 = @@ -562,13 +559,13 @@ maybe_append_queue(Id, Q, Items) -> {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), + emqx_resource_metrics:queuing_change(Id, -Dropped), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), + emqx_resource_metrics:queuing_change(Id, 1), replayq:append(Q2, Items). get_first_n_from_queue(Q, N) -> @@ -590,7 +587,7 @@ drop_first_n_from_queue(Q, N, Id) when N > 0 -> drop_head(Q, Id) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), ok = replayq:ack(Q1, AckRef), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1), + emqx_resource_metrics:queuing_change(Id, -1), Q1. %%============================================================================== @@ -645,18 +642,18 @@ inflight_drop(Name, Ref) -> %%============================================================================== inc_sent_failed(Id, true) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.failed'); + emqx_resource_metrics:failed_inc(Id), + emqx_resource_metrics:retried_inc(Id), + emqx_resource_metrics:retried_failed_inc(Id); inc_sent_failed(Id, _HasSent) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'). 
+ emqx_resource_metrics:failed_inc(Id). inc_sent_success(Id, true) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.success'); + emqx_resource_metrics:success_inc(Id), + emqx_resource_metrics:retried_inc(Id), + emqx_resource_metrics:retried_success_inc(Id); inc_sent_success(Id, _HasSent) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'). + emqx_resource_metrics:success_inc(Id). call_mode(sync, _) -> sync; call_mode(async, always_sync) -> sync; diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index 8c79e7274..f281a9b85 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [debug_info]}. {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} - , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}} + , {wolff, {git, "https://github.com/kjellwinblad/wolff.git", {branch, "kjell/add_counters_support_ok"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 0ede2a6a5..7759ef2a2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -4,7 +4,8 @@ {applications, [ kernel, stdlib, - emqx_ee_connector + emqx_ee_connector, + telemetry ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index ce82dbe2d..5e8635d44 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ 
b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -12,7 +12,10 @@ on_get_status/2 ]). --export([on_kafka_ack/3]). +-export([ + on_kafka_ack/3, + handle_telemetry_event/4 +]). -include_lib("emqx/include/logger.hrl"). @@ -30,6 +33,7 @@ on_start(InstId, Config) -> authentication := Auth, ssl := SSL } = Config, + maybe_install_wolff_telemetry_handlers(), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -222,6 +226,7 @@ producers_config(BridgeName, ClientId, Input) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, + BridgeNameBin = erlang:atom_to_binary(BridgeName), #{ name => make_producer_name(BridgeName), partitioner => PartitionStrategy, @@ -234,7 +239,9 @@ producers_config(BridgeName, ClientId, Input) -> required_acks => RequiredAcks, max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, - compression => Compression + compression => Compression, + telemetry_meta_data => + #{bridge_id => <<<<"bridge:kafka:">>/binary, BridgeNameBin/binary>>} }. replayq_dir(ClientId) -> @@ -268,3 +275,91 @@ get_required(Field, Config, Throw) -> Value = maps:get(Field, Config, none), Value =:= none andalso throw(Throw), Value. 
+ +handle_telemetry_event( + [wolff, dropped], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:dropped_inc(ID, Val); +handle_telemetry_event( + [wolff, dropped_queue_full], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:dropped_queue_full_inc(ID, Val); +handle_telemetry_event( + [wolff, queuing], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:queuing_inc(ID, Val); +handle_telemetry_event( + [wolff, retried], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:retried_inc(ID, Val); +handle_telemetry_event( + [wolff, failed], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:failed_inc(ID, Val); +handle_telemetry_event( + [wolff, inflight], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:inflight_inc(ID, Val); +handle_telemetry_event( + [wolff, retried_failed], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:retried_failed_inc(ID, Val); +handle_telemetry_event( + [wolff, retried_success], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:retried_success_inc(ID, Val); +handle_telemetry_event( + [wolff, success], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:success_inc(ID, Val); +handle_telemetry_event(_EventId, _Metrics, _MetaData, _Config) -> + %% Event that we do not handle + ok. + +maybe_install_wolff_telemetry_handlers() -> + %% Attach event handlers for Kafka telemetry events. 
If a handler with the + %% handler id already exists, the attach_many function does nothing + telemetry:attach_many( + %% unique handler id + <<"emqx-bridge-kafka-producer-telemetry-handler">>, + [ + [wolff, dropped], + [wolff, dropped_queue_full], + [wolff, queuing], + [wolff, retried], + [wolff, failed], + [wolff, inflight], + [wolff, retried_failed], + [wolff, retried_success], + [wolff, success] + ], + fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4, + [] + ). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index fb929e692..152862f6b 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -184,10 +184,6 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> true -> kafka_hosts_string_ssl(); false -> kafka_hosts_string() end, - kafka_bridge_rest_api_helper(#{ - <<"bootstrap_hosts">> => NormalHostsString, - <<"authentication">> => <<"none">> - }), SASLHostsString = case UseSSL of true -> kafka_hosts_string_ssl_sasl(); @@ -204,6 +200,15 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())}; false -> #{} end, + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => NormalHostsString, + <<"authentication">> => <<"none">> + }, + SSLSettings + ) + ), kafka_bridge_rest_api_helper( maps:merge( #{ @@ -243,10 +248,20 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> ok. 
kafka_bridge_rest_api_helper(Config) -> + BridgeType = "kafka", + BridgeName = "my_kafka_bridge", + BridgeID = emqx_bridge_resource:bridge_id( + erlang:list_to_binary(BridgeType), + erlang:list_to_binary(BridgeName) + ), + ResourceId = emqx_bridge_resource:resource_id( + erlang:list_to_binary(BridgeType), + erlang:list_to_binary(BridgeName) + ), UrlEscColon = "%3A", - BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge", + BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName, BridgesParts = ["bridges"], - BridgesPartsId = ["bridges", BridgeIdUrlEnc], + BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"], OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end, BridgesPartsOpDisable = OpUrlFun("disable"), BridgesPartsOpEnable = OpUrlFun("enable"), @@ -268,15 +283,13 @@ kafka_bridge_rest_api_helper(Config) -> case MyKafkaBridgeExists() of true -> %% Delete the bridge my_kafka_bridge - show( - '========================================== DELETE ========================================' - ), - {ok, 204, <<>>} = show(http_delete(BridgesPartsId)); + {ok, 204, <<>>} = show(http_delete(BridgesPartsIdDeleteAlsoActions)); false -> ok end, false = MyKafkaBridgeExists(), %% Create new Kafka bridge + KafkaTopic = "test-topic-one-partition", CreateBodyTmp = #{ <<"type">> => <<"kafka">>, <<"name">> => <<"my_kafka_bridge">>, @@ -288,7 +301,7 @@ kafka_bridge_rest_api_helper(Config) -> topic => <<"t/#">> }, <<"kafka">> => #{ - <<"topic">> => <<"test-topic-one-partition">> + <<"topic">> => erlang:list_to_binary(KafkaTopic) } } }, @@ -300,6 +313,34 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))), %% Check that the new bridge is in the list of bridges true = MyKafkaBridgeExists(), + %% Create a rule that uses the bridge + {ok, 201, _Rule} = http_post( + ["rules"], + #{ + <<"name">> => <<"kafka_bridge_rest_api_helper_rule">>, + <<"enable">> => true, + 
<<"actions">> => [BridgeID], + <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">> + } + ), + %% Get offset before sending message + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + %% Send message to topic and check that it got forwarded to Kafka + Body = <<"message from EMQX">>, + emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)), + %% Give Kafka some time to get message + timer:sleep(100), + %% Check that Kafka got message + BrodOut = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + {ok, {_, [KafkaMsg]}} = show(BrodOut), + Body = KafkaMsg#kafka_message.value, + %% Check crucial counters and gauges + 1 = emqx_resource_metrics:matched_get(ResourceId), + 1 = emqx_resource_metrics:success_get(ResourceId), + 0 = emqx_resource_metrics:dropped_get(ResourceId), + 0 = emqx_resource_metrics:failed_get(ResourceId), + 0 = emqx_resource_metrics:inflight_get(ResourceId), + 0 = emqx_resource_metrics:queuing_get(ResourceId), %% Perform operations {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), @@ -309,7 +350,7 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), %% Cleanup - {ok, 204, _} = show(http_delete(BridgesPartsId)), + {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)), false = MyKafkaBridgeExists(), ok. @@ -325,7 +366,8 @@ publish_with_and_without_ssl(AuthSettings) -> publish_helper(#{ auth_settings => AuthSettings, ssl_settings => valid_ssl_settings() - }). + }), + ok. 
publish_helper(#{ auth_settings := AuthSettings, @@ -345,6 +387,7 @@ publish_helper(#{ Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]), Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), InstId = emqx_bridge_resource:resource_id("kafka", Name), + BridgeId = emqx_bridge_resource:bridge_id("kafka", Name), KafkaTopic = "test-topic-one-partition", Conf = config(#{ "authentication" => AuthSettings, @@ -353,6 +396,7 @@ publish_helper(#{ "instance_id" => InstId, "ssl" => SSLSettings }), + emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}), %% To make sure we get unique value timer:sleep(1), Time = erlang:monotonic_time(), @@ -371,6 +415,7 @@ publish_helper(#{ {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), ok = ?PRODUCER:on_stop(InstId, State), + ok = emqx_bridge_resource:remove(BridgeId), ok. config(Args) -> diff --git a/mix.exs b/mix.exs index 69538142e..f9941912a 100644 --- a/mix.exs +++ b/mix.exs @@ -130,7 +130,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"}, + {:wolff, github: "kjellwinblad/wolff", branch: "kjell/add_counters_support_ok"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.4"}, @@ -516,7 +516,7 @@ defmodule EMQXUmbrella.MixProject do |> Path.join("RELEASES") |> File.open!([:write, :utf8], fn handle -> IO.puts(handle, "%% coding: utf-8") - :io.format(handle, '~tp.~n', [release_entry]) + :io.format(handle, ~c"~tp.~n", [release_entry]) end) release From 98500313eb91fdc30765b8d0122601698c6cb9e2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 10 Oct 2022 16:55:25 -0300 
Subject: [PATCH 02/13] fix(kafka): some fixes for kafka producer - MQTT topic should be a binary - use correct gauge functions from `wolff_metrics`. - don't double increment success counter for kafka action - adds a few more metrics assertions --- .../src/emqx_ee_bridge_kafka.erl | 2 +- .../kafka/emqx_bridge_impl_kafka_producer.erl | 46 +++++++++---------- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 39 +++++++++++++--- 3 files changed, 55 insertions(+), 32 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index ac5177f6e..2540b987c 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -145,7 +145,7 @@ fields(producer_opts) -> })} ]; fields(producer_mqtt_opts) -> - [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}]; + [{topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})}]; fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 5e8635d44..09712c29d 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -33,7 +33,7 @@ on_start(InstId, Config) -> authentication := Auth, ssl := SSL } = Config, - maybe_install_wolff_telemetry_handlers(), + _ = maybe_install_wolff_telemetry_handlers(), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -226,7 +226,9 @@ producers_config(BridgeName, ClientId, Input) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, - BridgeNameBin = erlang:atom_to_binary(BridgeName), + %% TODO: change this once we add kafka source + BridgeType = kafka, + ResourceID = 
emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ name => make_producer_name(BridgeName), partitioner => PartitionStrategy, @@ -240,8 +242,7 @@ producers_config(BridgeName, ClientId, Input) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, - telemetry_meta_data => - #{bridge_id => <<<<"bridge:kafka:">>/binary, BridgeNameBin/binary>>} + telemetry_meta_data => #{bridge_id => ResourceID} }. replayq_dir(ClientId) -> @@ -280,66 +281,59 @@ handle_telemetry_event( [wolff, dropped], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:dropped_inc(ID, Val); handle_telemetry_event( [wolff, dropped_queue_full], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:dropped_queue_full_inc(ID, Val); handle_telemetry_event( [wolff, queuing], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> - emqx_resource_metrics:queuing_inc(ID, Val); + emqx_resource_metrics:queuing_change(ID, Val); handle_telemetry_event( [wolff, retried], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:retried_inc(ID, Val); handle_telemetry_event( [wolff, failed], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:failed_inc(ID, Val); handle_telemetry_event( [wolff, inflight], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> - emqx_resource_metrics:inflight_inc(ID, Val); + emqx_resource_metrics:inflight_change(ID, Val); handle_telemetry_event( [wolff, retried_failed], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:retried_failed_inc(ID, Val); handle_telemetry_event( [wolff, retried_success], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when 
is_integer(Val) -> emqx_resource_metrics:retried_success_inc(ID, Val); -handle_telemetry_event( - [wolff, success], - #{counter_inc := Val}, - #{bridge_id := ID}, - _ -) when is_integer(Val) -> - emqx_resource_metrics:success_inc(ID, Val); -handle_telemetry_event(_EventId, _Metrics, _MetaData, _Config) -> +handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Event that we do not handle ok. @@ -349,6 +343,11 @@ maybe_install_wolff_telemetry_handlers() -> telemetry:attach_many( %% unique handler id <<"emqx-bridge-kafka-producer-telemetry-handler">>, + %% Note: we don't handle `[wolff, success]' because, + %% currently, we already increment the success counter for + %% this resource at `emqx_rule_runtime:handle_action' when + %% the response is `ok' and we would double increment it + %% here. [ [wolff, dropped], [wolff, dropped_queue_full], @@ -357,8 +356,7 @@ maybe_install_wolff_telemetry_handlers() -> [wolff, failed], [wolff, inflight], [wolff, retried_failed], - [wolff, retried_success], - [wolff, success] + [wolff, retried_success] ], fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4, [] diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 152862f6b..2eef1170d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -323,6 +323,22 @@ kafka_bridge_rest_api_helper(Config) -> <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">> } ), + %% counters should be empty before + ?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), + ?assertEqual(0, 
emqx_resource_metrics:batching_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), %% Get offset before sending message {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), %% Send message to topic and check that it got forwarded to Kafka @@ -335,12 +351,21 @@ kafka_bridge_rest_api_helper(Config) -> {ok, {_, [KafkaMsg]}} = show(BrodOut), Body = KafkaMsg#kafka_message.value, %% Check crucial counters and gauges - 1 = emqx_resource_metrics:matched_get(ResourceId), - 1 = emqx_resource_metrics:success_get(ResourceId), - 0 = emqx_resource_metrics:dropped_get(ResourceId), - 0 = emqx_resource_metrics:failed_get(ResourceId), - 0 = emqx_resource_metrics:inflight_get(ResourceId), - 0 = emqx_resource_metrics:queuing_get(ResourceId), + ?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)), + ?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), + ?assertEqual(0, 
emqx_resource_metrics:dropped_queue_full_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), %% Perform operations {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), @@ -452,7 +477,7 @@ hocon_config_template() -> """ bootstrap_hosts = \"{{ kafka_hosts_string }}\" enable = true -authentication = {{{ authentication }}} +authentication = {{{ authentication }}} ssl = {{{ ssl }}} producer = { mqtt { From cf361546f827c99b5e15260d4696a90418b4cce8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 10 Oct 2022 17:38:11 -0300 Subject: [PATCH 03/13] feat: update emqx dashboard version -> e1.0.1-beta.5 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 654cec168..3dd11aec3 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) export EMQX_DASHBOARD_VERSION ?= v1.0.9 -export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.4 +export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.5 export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 ifeq ($(OS),Windows_NT) From 4475289ce40a626501265745370a4c5242279afd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 11 Oct 2022 09:47:40 -0300 Subject: [PATCH 04/13] feat: use upstream newly tagged 1.7.0 wolff --- lib-ee/emqx_ee_bridge/rebar.config | 2 +- mix.exs | 2 +- 2 files changed, 2 
insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index f281a9b85..bfd1c957e 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [debug_info]}. {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} - , {wolff, {git, "https://github.com/kjellwinblad/wolff.git", {branch, "kjell/add_counters_support_ok"}}} + , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.0"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}} diff --git a/mix.exs b/mix.exs index f9941912a..ad6a7d7b0 100644 --- a/mix.exs +++ b/mix.exs @@ -130,7 +130,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true}, - {:wolff, github: "kjellwinblad/wolff", branch: "kjell/add_counters_support_ok"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.7.0"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.4"}, From 357e5919cec9c4d8a456c438c9d05b8efff71e4c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 11 Oct 2022 09:51:16 -0300 Subject: [PATCH 05/13] chore: add copyright disclaimer --- apps/emqx_resource/src/emqx_resource_metrics.erl | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index 4fe3d3182..96d955db0 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ 
-1,3 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -module(emqx_resource_metrics). -export([ From f0ff32c031dd3d27ae0f2ff8f8bd8bd65fa1f8d1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 11 Oct 2022 17:37:59 -0300 Subject: [PATCH 06/13] test: fix tests after counter changes --- .../test/emqx_bridge_mqtt_SUITE.erl | 3 ++- .../test/emqx_connector_demo.erl | 24 ++++++++++++++++--- .../test/emqx_resource_SUITE.erl | 12 ++++++++-- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 4ffeee71f..819556d81 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -540,6 +540,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% stop the listener 1883 to make the bridge disconnected ok = emqx_listeners:stop_listener('tcp:default'), + ct:sleep(1500), %% PUBLISH 2 messages to the 'local' broker, the message should emqx:publish(emqx_message:make(LocalTopic, Payload)), @@ -551,7 +552,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> #{ <<"status">> := Status, <<"metrics">> := #{ - <<"matched">> := 3, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 + 
<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 } } when Status == <<"connected">> orelse Status == <<"connecting">>, jsx:decode(BridgeStr1) diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 3b83cf7ed..105bcad77 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -100,6 +100,15 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> after 1000 -> {error, timeout} end; +on_query(_InstId, get_incorrect_status_count, #{pid := Pid}) -> + ReqRef = make_ref(), + From = {self(), ReqRef}, + Pid ! {From, get_incorrect_status_count}, + receive + {ReqRef, Count} -> {ok, Count} + after 1000 -> + {error, timeout} + end; on_query(_InstId, get_counter, #{pid := Pid}) -> ReqRef = make_ref(), From = {self(), ReqRef}, @@ -157,9 +166,15 @@ spawn_counter_process(Name, Register) -> Pid. counter_loop() -> - counter_loop(#{counter => 0, status => running}). + counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}). -counter_loop(#{counter := Num, status := Status} = State) -> +counter_loop( + #{ + counter := Num, + status := Status, + incorrect_status_count := IncorrectCount + } = State +) -> NewState = receive block -> @@ -179,10 +194,13 @@ counter_loop(#{counter := Num, status := Status} = State) -> State#{counter => Num + N}; {{FromPid, ReqRef}, {inc, _N}} when Status == blocked -> FromPid ! {ReqRef, incorrect_status}, - State; + State#{incorrect_status_count := IncorrectCount + 1}; {get, ReplyFun} -> apply_reply(ReplyFun, Num), State; + {{FromPid, ReqRef}, get_incorrect_status_count} -> + FromPid ! {ReqRef, IncorrectCount}, + State; {{FromPid, ReqRef}, get} -> FromPid ! 
{ReqRef, Num}, State diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 2446c8102..672e01896 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -420,10 +420,18 @@ t_query_counter_async_inflight(_) -> {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), ct:pal("metrics: ~p", [C]), + {ok, IncorrectStatusCount} = emqx_resource:simple_sync_query(?ID, get_incorrect_status_count), + %% The `simple_sync_query' we just did also increases the matched + %% count, hence the + 1. + ExtraSimpleCallCount = IncorrectStatusCount + 1, ?assertMatch( #{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when - M == Ss + Dp - Rs, - C + M == Ss + Dp - Rs + ExtraSimpleCallCount, + C, + #{ + metrics => C, + extra_simple_call_count => ExtraSimpleCallCount + } ), ?assert( lists:all( From 24eda247ae58838f47b33f30bc4c0c12c8e16370 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Oct 2022 09:27:24 -0300 Subject: [PATCH 07/13] chore: pin `telemetry` version --- mix.exs | 3 ++- rebar.config | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index ad6a7d7b0..5750598ae 100644 --- a/mix.exs +++ b/mix.exs @@ -63,6 +63,7 @@ defmodule EMQXUmbrella.MixProject do {:rulesql, github: "emqx/rulesql", tag: "0.1.4"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, + {:telemetry, "1.1.0"}, # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true}, @@ -516,7 +517,7 @@ defmodule EMQXUmbrella.MixProject do |> Path.join("RELEASES") |> File.open!([:write, :utf8], fn handle -> IO.puts(handle, "%% coding: utf-8") - :io.format(handle, ~c"~tp.~n", [release_entry]) + :io.format(handle, '~tp.~n', [release_entry]) end) release diff --git a/rebar.config b/rebar.config 
index 769fe6e78..505c475cc 100644 --- a/rebar.config +++ b/rebar.config @@ -71,6 +71,7 @@ , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} + , {telemetry, "1.1.0"} ]}. {xref_ignores, From 1ad3b5df17c44279bb97289fdf1671c73acd726a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Oct 2022 09:58:57 -0300 Subject: [PATCH 08/13] fix: uninstall telemetry handler on resource stop, use unique id --- .../kafka/emqx_bridge_impl_kafka_producer.erl | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 09712c29d..0a03f582f 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -33,7 +33,7 @@ on_start(InstId, Config) -> authentication := Auth, ssl := SSL } = Config, - _ = maybe_install_wolff_telemetry_handlers(), + _ = maybe_install_wolff_telemetry_handlers(InstId), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -89,7 +89,7 @@ on_start(InstId, Config) -> throw(failed_to_start_kafka_producer) end. 
-on_stop(_InstId, #{client_id := ClientID, producers := Producers}) -> +on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> with_log_at_error( fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, #{ @@ -103,6 +103,13 @@ on_stop(_InstId, #{client_id := ClientID, producers := Producers}) -> msg => "failed_to_delete_kafka_client", client_id => ClientID } + ), + with_log_at_error( + fun() -> uninstall_telemetry_handlers(InstanceID) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + client_id => ClientID + } ). %% @doc The callback API for rule-engine (or bridge without rules) @@ -337,12 +344,20 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Event that we do not handle ok. -maybe_install_wolff_telemetry_handlers() -> +-spec telemetry_handler_id(emqx_resource:resource_id()) -> binary(). +telemetry_handler_id(InstanceID) -> + <<"emqx-bridge-kafka-producer-", InstanceID/binary, "-telemetry-handler">>. + +uninstall_telemetry_handlers(InstanceID) -> + HandlerID = telemetry_handler_id(InstanceID), + telemetry:detach(HandlerID). + +maybe_install_wolff_telemetry_handlers(InstanceID) -> %% Attach event handlers for Kafka telemetry events. If a handler with the %% handler id already exists, the attach_many function does nothing telemetry:attach_many( %% unique handler id - <<"emqx-bridge-kafka-producer-telemetry-handler">>, + telemetry_handler_id(InstanceID), %% Note: we don't handle `[wolff, success]' because, %% currently, we already increment the success counter for %% this resource at `emqx_rule_runtime:handle_action' when @@ -358,6 +373,6 @@ maybe_install_wolff_telemetry_handlers() -> [wolff, retried_failed], [wolff, retried_success] ], - fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4, + fun ?MODULE:handle_telemetry_event/4, [] ). 
From a6ba8494d8cfd4dd50532b96b29ed36f5741b95a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Oct 2022 11:16:37 -0300 Subject: [PATCH 09/13] feat: start `telemetry` app before `emqx` --- mix.exs | 1 + rebar.config.erl | 1 + 2 files changed, 2 insertions(+) diff --git a/mix.exs b/mix.exs index 5750598ae..e9f861ce5 100644 --- a/mix.exs +++ b/mix.exs @@ -208,6 +208,7 @@ defmodule EMQXUmbrella.MixProject do redbug: :permanent, xmerl: :permanent, hocon: :load, + telemetry: :permanent, emqx: :load, emqx_conf: :load, emqx_machine: :permanent diff --git a/rebar.config.erl b/rebar.config.erl index a8c54e5f4..f745b5cca 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -360,6 +360,7 @@ relx_apps(ReleaseType, Edition) -> redbug, xmerl, {hocon, load}, + telemetry, % started by emqx_machine {emqx, load}, {emqx_conf, load}, From 1b2b629cdde72c433327e2d55f7eef310da42083 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Oct 2022 11:04:47 -0300 Subject: [PATCH 10/13] feat: emit telemetry events for all resource worker metrics --- apps/emqx_resource/src/emqx_resource.app.src | 3 +- apps/emqx_resource/src/emqx_resource_app.erl | 9 + .../src/emqx_resource_metrics.erl | 160 +++++++++++++++--- .../src/emqx_resource_worker.erl | 13 +- .../kafka/emqx_bridge_impl_kafka_producer.erl | 4 +- 5 files changed, 148 insertions(+), 41 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index b688e3c11..38dac5449 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -9,7 +9,8 @@ stdlib, gproc, jsx, - emqx + emqx, + telemetry ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_resource/src/emqx_resource_app.erl b/apps/emqx_resource/src/emqx_resource_app.erl index 72838a8c1..51e7b2556 100644 --- a/apps/emqx_resource/src/emqx_resource_app.erl +++ b/apps/emqx_resource/src/emqx_resource_app.erl @@ -23,9 +23,18 @@ -export([start/2, 
stop/1]). start(_StartType, _StartArgs) -> + %% since the handler is generic and executed in the process + %% emitting the event, we need to install only a single handler + %% for the whole app. + TelemetryHandlerID = telemetry_handler_id(), + ok = emqx_resource_metrics:install_telemetry_handler(TelemetryHandlerID), emqx_resource_sup:start_link(). stop(_State) -> + TelemetryHandlerID = telemetry_handler_id(), + ok = emqx_resource_metrics:uninstall_telemetry_handler(TelemetryHandlerID), ok. %% internal functions +telemetry_handler_id() -> + <<"emqx-resource-app-telemetry-handler">>. diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index 96d955db0..e6637b68f 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -16,6 +16,13 @@ -module(emqx_resource_metrics). +-export([ + events/0, + install_telemetry_handler/1, + uninstall_telemetry_handler/1, + handle_telemetry_event/4 +]). + -export([ batching_change/2, batching_get/1, @@ -62,6 +69,91 @@ ]). -define(RES_METRICS, resource_metrics). +-define(TELEMETRY_PREFIX, emqx, resource). + +-spec events() -> [telemetry:event_name()]. +events() -> + [ + [?TELEMETRY_PREFIX, Event] + || Event <- [ + batching, + dropped_other, + dropped_queue_full, + dropped_queue_not_enabled, + dropped_resource_not_found, + dropped_resource_stopped, + failed, + inflight, + matched, + queuing, + retried_failed, + retried_success, + success + ] + ]. + +-spec install_telemetry_handler(binary()) -> ok. +install_telemetry_handler(HandlerID) -> + _ = telemetry:attach_many( + HandlerID, + events(), + fun ?MODULE:handle_telemetry_event/4, + _HandlerConfig = #{} + ), + ok. + +-spec uninstall_telemetry_handler(binary()) -> ok. +uninstall_telemetry_handler(HandlerID) -> + _ = telemetry:detach(HandlerID), + ok. 
+ +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{counter_inc := Val}, + _Metadata = #{resource_id := ID}, + _HandlerConfig +) -> + case Event of + batching -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val); + dropped_other -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val); + dropped_queue_full -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val); + dropped_queue_not_enabled -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val); + dropped_resource_not_found -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val); + dropped_resource_stopped -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val); + failed -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); + inflight -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val); + matched -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val); + queuing -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val); + retried_failed -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val); + retried_success -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val); + success -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val); + _ -> + ok + end; +handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> + ok. 
%% Gauges (value can go both up and down): %% -------------------------------------- @@ -69,14 +161,14 @@ %% @doc Count of messages that are currently accumulated in memory waiting for %% being sent in one batch batching_change(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val). + telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}). batching_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'batching'). %% @doc Count of messages that are currently queuing. [Gauge] queuing_change(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val). + telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}). queuing_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing'). @@ -84,7 +176,7 @@ queuing_get(ID) -> %% @doc Count of messages that were sent asynchronously but ACKs are not %% received. [Gauge] inflight_change(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val). + telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}). inflight_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight'). @@ -94,120 +186,134 @@ inflight_get(ID) -> %% @doc Count of messages dropped dropped_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped'). + dropped_inc(ID, 1). dropped_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped], #{counter_inc => Val}, #{resource_id => ID}). dropped_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped'). %% @doc Count of messages dropped due to other reasons dropped_other_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other'). + dropped_other_inc(ID, 1). dropped_other_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_other], #{counter_inc => Val}, #{ + resource_id => ID + }). 
dropped_other_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other'). %% @doc Count of messages dropped because the queue was full dropped_queue_full_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full'). + dropped_queue_full_inc(ID, 1). dropped_queue_full_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_full], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_queue_full_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full'). %% @doc Count of messages dropped because the queue was not enabled dropped_queue_not_enabled_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled'). + dropped_queue_not_enabled_inc(ID, 1). dropped_queue_not_enabled_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_queue_not_enabled_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled'). %% @doc Count of messages dropped because the resource was not found dropped_resource_not_found_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found'). + dropped_resource_not_found_inc(ID, 1). dropped_resource_not_found_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_not_found], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_resource_not_found_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found'). %% @doc Count of messages dropped because the resource was stopped dropped_resource_stopped_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped'). + dropped_resource_stopped_inc(ID, 1). 
dropped_resource_stopped_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_stopped], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_resource_stopped_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped'). %% @doc Count of how many times this bridge has been matched and queried matched_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched'). + matched_inc(ID, 1). matched_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val). + telemetry:execute([?TELEMETRY_PREFIX, matched], #{counter_inc => Val}, #{resource_id => ID}). matched_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'matched'). %% @doc The number of times message sends have been retried retried_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried'). + retried_inc(ID, 1). retried_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val). + telemetry:execute([?TELEMETRY_PREFIX, retried], #{counter_inc => Val}, #{resource_id => ID}). retried_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'retried'). %% @doc Count of message sends that have failed failed_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed'). + failed_inc(ID, 1). failed_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val). + telemetry:execute([?TELEMETRY_PREFIX, failed], #{counter_inc => Val}, #{resource_id => ID}). failed_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'failed'). %%% @doc Count of message sends that have failed after having been retried retried_failed_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed'). + retried_failed_inc(ID, 1). retried_failed_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val). + telemetry:execute([?TELEMETRY_PREFIX, retried_failed], #{counter_inc => Val}, #{ + resource_id => ID + }). 
retried_failed_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed'). %% @doc Count messages that were sucessfully sent after at least one retry retried_success_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success'). + retried_success_inc(ID, 1). retried_success_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val). + telemetry:execute([?TELEMETRY_PREFIX, retried_success], #{counter_inc => Val}, #{ + resource_id => ID + }). retried_success_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success'). %% @doc Count of messages that have been sent successfully success_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'success'). + success_inc(ID, 1). success_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val). + telemetry:execute([?TELEMETRY_PREFIX, success], #{counter_inc => Val}, #{resource_id => ID}). success_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'success'). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 75a63d427..b309299f6 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -376,17 +376,14 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when true; handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_resource_not_found_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_resource_stopped_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => 
other_resource_error, reason => Reason}), - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_other_inc(Id), BlockWorker; handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) -> @@ -546,7 +543,6 @@ estimate_size(QItem) -> size(queue_item_marshaller(QItem)). maybe_append_queue(Id, undefined, _Items) -> - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), undefined; maybe_append_queue(Id, Q, Items) -> @@ -560,7 +556,6 @@ maybe_append_queue(Id, Q, Items) -> ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), emqx_resource_metrics:queuing_change(Id, -Dropped), - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 @@ -641,16 +636,12 @@ inflight_drop(Name, Ref) -> %%============================================================================== -inc_sent_failed(Id, true) -> - emqx_resource_metrics:failed_inc(Id), - emqx_resource_metrics:retried_inc(Id), +inc_sent_failed(Id, _HasSent = true) -> emqx_resource_metrics:retried_failed_inc(Id); inc_sent_failed(Id, _HasSent) -> emqx_resource_metrics:failed_inc(Id). -inc_sent_success(Id, true) -> - emqx_resource_metrics:success_inc(Id), - emqx_resource_metrics:retried_inc(Id), +inc_sent_success(Id, _HasSent = true) -> emqx_resource_metrics:retried_success_inc(Id); inc_sent_success(Id, _HasSent) -> emqx_resource_metrics:success_inc(Id). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 0a03f582f..19f110bd9 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -90,14 +90,14 @@ on_start(InstId, Config) -> end. 
on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> - with_log_at_error( + _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, #{ msg => "failed_to_delete_kafka_producer", client_id => ClientID } ), - with_log_at_error( + _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, #{ msg => "failed_to_delete_kafka_client", From 2d01726b223dbcf8c33172bed3ae9fd873d77a10 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Oct 2022 15:27:35 -0300 Subject: [PATCH 11/13] fix: account calls when resource is not connected as matched --- .../test/emqx_bridge_mqtt_SUITE.erl | 46 ++++++++++++++----- .../src/emqx_resource_worker.erl | 3 ++ 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 819556d81..84152efc6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -23,6 +23,7 @@ -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include("emqx_dashboard/include/emqx_dashboard.hrl"). %% output functions @@ -511,15 +512,15 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% we now test if the bridge works as expected LocalTopic = <<"local_topic/1">>, RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, - Payload = <<"hello">>, + Payload0 = <<"hello">>, emqx:subscribe(RemoteTopic), timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. 
- emqx:publish(emqx_message:make(LocalTopic, Payload)), + emqx:publish(emqx_message:make(LocalTopic, Payload0)), %% we should receive a message on the "remote" broker, with specified topic - assert_mqtt_msg_received(RemoteTopic, Payload), + assert_mqtt_msg_received(RemoteTopic, Payload0), %% verify the metrics of the bridge {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), @@ -543,18 +544,40 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> ct:sleep(1500), %% PUBLISH 2 messages to the 'local' broker, the message should - emqx:publish(emqx_message:make(LocalTopic, Payload)), - emqx:publish(emqx_message:make(LocalTopic, Payload)), + ok = snabbkaffe:start_trace(), + {ok, SRef} = + snabbkaffe:subscribe( + fun + ( + #{ + ?snk_kind := call_query_enter, + query := {query, _From, {send_message, #{}}, _Sent} + } + ) -> + true; + (_) -> + false + end, + _NEvents = 2, + _Timeout = 1_000 + ), + Payload1 = <<"hello2">>, + Payload2 = <<"hello3">>, + emqx:publish(emqx_message:make(LocalTopic, Payload1)), + emqx:publish(emqx_message:make(LocalTopic, Payload2)), + {ok, _} = snabbkaffe:receive_events(SRef), + ok = snabbkaffe:stop(), %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + %% matched >= 3 because of possible retries. 
?assertMatch( #{ <<"status">> := Status, <<"metrics">> := #{ - <<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 + <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 } - } when Status == <<"connected">> orelse Status == <<"connecting">>, + } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>), jsx:decode(BridgeStr1) ), @@ -563,22 +586,23 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> timer:sleep(1500), %% verify the metrics of the bridge, the 2 queued messages should have been sent {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + %% matched >= 3 because of possible retries. ?assertMatch( #{ <<"status">> := <<"connected">>, <<"metrics">> := #{ - <<"matched">> := 3, + <<"matched">> := Matched, <<"success">> := 3, <<"failed">> := 0, <<"queuing">> := 0, <<"retried">> := _ } - }, + } when Matched >= 3, jsx:decode(BridgeStr2) ), %% also verify the 2 messages have been sent to the remote broker - assert_mqtt_msg_received(RemoteTopic, Payload), - assert_mqtt_msg_received(RemoteTopic, Payload), + assert_mqtt_msg_received(RemoteTopic, Payload1), + assert_mqtt_msg_received(RemoteTopic, Payload2), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index b309299f6..a36cb15b7 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -410,6 +410,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) -> BlockWorker. 
call_query(QM0, Id, Query, QueryOpts) -> + ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> QM = @@ -421,8 +422,10 @@ call_query(QM0, Id, Query, QueryOpts) -> emqx_resource_metrics:matched_inc(Id), apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> + emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> + emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(not_connected, "resource not connected"); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") From ee4c723fcb08420c1fd8d80d9de0935fca92a5d7 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 17 Oct 2022 16:26:04 -0300 Subject: [PATCH 12/13] refactor: simplify wolff telemetry handler id --- .../src/kafka/emqx_bridge_impl_kafka_producer.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 19f110bd9..aceafd6d1 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -346,7 +346,7 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> -spec telemetry_handler_id(emqx_resource:resource_id()) -> binary(). telemetry_handler_id(InstanceID) -> - <<"emqx-bridge-kafka-producer-", InstanceID/binary, "-telemetry-handler">>. + <<"emqx-bridge-kafka-producer-", InstanceID/binary>>. 
uninstall_telemetry_handlers(InstanceID) -> HandlerID = telemetry_handler_id(InstanceID), From 62eeb4b8e8290f5e9869fe987fdb9c2d872f26ee Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 18 Oct 2022 09:32:35 -0300 Subject: [PATCH 13/13] feat(resource): reset metrics when stopping a resource --- .../emqx_resource/src/emqx_resource_manager.erl | 5 ++++- apps/emqx_resource/test/emqx_resource_SUITE.erl | 17 ++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c8d5e4194..10c501865 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -507,13 +507,16 @@ start_resource(Data, From) -> stop_resource(#data{state = undefined, id = ResId} = _Data) -> _ = maybe_clear_alarm(ResId), + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok; stop_resource(Data) -> %% We don't care the return value of the Mod:on_stop/2. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. + ResId = Data#data.id, _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state), - _ = maybe_clear_alarm(Data#data.id), + _ = maybe_clear_alarm(ResId), + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok. 
make_test_id() -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 672e01896..107ca2a93 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -502,6 +502,10 @@ t_stop_start(_) -> #{<<"name">> => <<"test_resource">>} ), + %% add some metrics to test their persistence + emqx_resource_metrics:batching_change(?ID, 5), + ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), + {ok, _} = emqx_resource:check_and_recreate( ?ID, ?TEST_RESOURCE, @@ -513,6 +517,9 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid0)), + %% metrics are reset when recreating + ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), + ok = emqx_resource:stop(?ID), ?assertNot(is_process_alive(Pid0)), @@ -527,7 +534,15 @@ t_stop_start(_) -> {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid1)). + ?assert(is_process_alive(Pid1)), + + %% now stop while resetting the metrics + emqx_resource_metrics:batching_change(?ID, 5), + ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), + ok = emqx_resource:stop(?ID), + ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), + + ok. t_stop_start_local(_) -> {error, _} = emqx_resource:check_and_create_local(