diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl new file mode 100644 index 000000000..4fe3d3182 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -0,0 +1,197 @@ +-module(emqx_resource_metrics). + +-export([ + batching_change/2, + batching_get/1, + inflight_change/2, + inflight_get/1, + queuing_change/2, + queuing_get/1, + dropped_inc/1, + dropped_inc/2, + dropped_get/1, + dropped_other_inc/1, + dropped_other_inc/2, + dropped_other_get/1, + dropped_queue_full_inc/1, + dropped_queue_full_inc/2, + dropped_queue_full_get/1, + dropped_queue_not_enabled_inc/1, + dropped_queue_not_enabled_inc/2, + dropped_queue_not_enabled_get/1, + dropped_resource_not_found_inc/1, + dropped_resource_not_found_inc/2, + dropped_resource_not_found_get/1, + dropped_resource_stopped_inc/1, + dropped_resource_stopped_inc/2, + dropped_resource_stopped_get/1, + failed_inc/1, + failed_inc/2, + failed_get/1, + matched_inc/1, + matched_inc/2, + matched_get/1, + retried_inc/1, + retried_inc/2, + retried_get/1, + retried_failed_inc/1, + retried_failed_inc/2, + retried_failed_get/1, + retried_success_inc/1, + retried_success_inc/2, + retried_success_get/1, + success_inc/1, + success_inc/2, + success_get/1 +]). + +-define(RES_METRICS, resource_metrics). + +%% Gauges (value can go both up and down): +%% -------------------------------------- + +%% @doc Count of messages that are currently accumulated in memory waiting for +%% being sent in one batch. [Gauge] +batching_change(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val). + +batching_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'batching'). + +%% @doc Count of messages that are currently queuing. [Gauge] +queuing_change(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val). + +queuing_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing'). 
+ +%% @doc Count of messages that were sent asynchronously but ACKs are not +%% received. [Gauge] +inflight_change(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val). + +inflight_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight'). + +%% Counters (value can only go up): +%% -------------------------------------- + +%% @doc Count of messages dropped +dropped_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped'). + +dropped_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val). + +dropped_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped'). + +%% @doc Count of messages dropped due to other reasons +dropped_other_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other'). + +dropped_other_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val). + +dropped_other_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other'). + +%% @doc Count of messages dropped because the queue was full +dropped_queue_full_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full'). + +dropped_queue_full_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val). + +dropped_queue_full_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full'). + +%% @doc Count of messages dropped because the queue was not enabled +dropped_queue_not_enabled_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled'). + +dropped_queue_not_enabled_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val). + +dropped_queue_not_enabled_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled'). + +%% @doc Count of messages dropped because the resource was not found +dropped_resource_not_found_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found'). 
+ +dropped_resource_not_found_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val). + +dropped_resource_not_found_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found'). + +%% @doc Count of messages dropped because the resource was stopped +dropped_resource_stopped_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped'). + +dropped_resource_stopped_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val). + +dropped_resource_stopped_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped'). + +%% @doc Count of how many times this bridge has been matched and queried +matched_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched'). + +matched_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val). + +matched_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'matched'). + +%% @doc The number of times message sends have been retried +retried_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried'). + +retried_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val). + +retried_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'retried'). + +%% @doc Count of message sends that have failed +failed_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed'). + +failed_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val). + +failed_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'failed'). + +%% @doc Count of message sends that have failed after having been retried +retried_failed_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed'). + +retried_failed_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val). + +retried_failed_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed'). 
+ +%% @doc Count of messages that were successfully sent after at least one retry +retried_success_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success'). + +retried_success_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val). + +retried_success_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success'). + +%% @doc Count of messages that have been sent successfully +success_inc(ID) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'success'). + +success_inc(ID, Val) -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val). + +success_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'success'). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 288edcf4f..75a63d427 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -80,27 +80,23 @@ start_link(Id, Index, Opts) -> sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_cast(Id, PickKey, {query, Request, Opts}). %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). 
simple_async_query(Id, Request, ReplyFun) -> Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false, false), Result. @@ -134,7 +130,7 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), + emqx_resource_metrics:queuing_change(Id, queue_count(Queue)), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), ok = inflight_new(Name, InfltWinSZ), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), @@ -297,7 +293,7 @@ retry_inflight_sync( query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> Acc1 = [?QUERY(From, Request, false) | Acc], - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'), + emqx_resource_metrics:batching_change(Id, 1), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); @@ -330,7 +326,7 @@ flush( QueryOpts = #{ inflight_name => maps:get(name, St) }, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)), + emqx_resource_metrics:batching_change(Id, -length(Batch)), Result = call_query(configured, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of @@ -380,18 +376,18 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when true; handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_resource_not_found_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => 
resource_stopped, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_resource_stopped_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_other_inc(Id), BlockWorker; handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) -> %% the message will be queued in replayq or inflight window, @@ -425,6 +421,7 @@ call_query(QM0, Id, Query, QueryOpts) -> _ -> QM0 end, CM = maps:get(callback_mode, Data), + emqx_resource_metrics:matched_inc(Id), apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); @@ -464,7 +461,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'), + ok = emqx_resource_metrics:inflight_change(Id, 1), ReplyFun = fun ?MODULE:reply_after_query/6, Ref = make_message_ref(), Args = [self(), Id, Name, Ref, Query], @@ -488,7 +485,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) {async_return, inflight_full}; false -> BatchLen = length(Batch), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen), + ok = emqx_resource_metrics:inflight_change(Id, BatchLen), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, @@ -503,12 +500,12 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, 
ResSt, QueryOpts) reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed in the inflight window. - emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1), + emqx_resource_metrics:inflight_change(Id, -1), case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of true -> %% we marked these messages are 'queuing' although they are actually %% keeped in inflight window, not replayq - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), + emqx_resource_metrics:queuing_change(Id, 1), ?MODULE:block(Pid); false -> drop_inflight_and_resume(Pid, Name, Ref) @@ -518,12 +515,12 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed in the inflight window. BatchLen = length(Batch), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen), + emqx_resource_metrics:inflight_change(Id, -BatchLen), case batch_reply_caller(Id, Result, Batch) of true -> %% we marked these messages are 'queuing' although they are actually - %% keeped in inflight window, not replayq - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), + %% kept in inflight window, not replayq + emqx_resource_metrics:queuing_change(Id, BatchLen), ?MODULE:block(Pid); false -> drop_inflight_and_resume(Pid, Name, Ref) @@ -549,8 +546,8 @@ estimate_size(QItem) -> size(queue_item_marshaller(QItem)). 
maybe_append_queue(Id, undefined, _Items) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), undefined; maybe_append_queue(Id, Q, Items) -> Q2 = @@ -562,13 +559,13 @@ maybe_append_queue(Id, Q, Items) -> {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), + emqx_resource_metrics:queuing_change(Id, -Dropped), + emqx_resource_metrics:dropped_inc(Id), + emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), + emqx_resource_metrics:queuing_change(Id, 1), replayq:append(Q2, Items). get_first_n_from_queue(Q, N) -> @@ -590,7 +587,7 @@ drop_first_n_from_queue(Q, N, Id) when N > 0 -> drop_head(Q, Id) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), ok = replayq:ack(Q1, AckRef), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1), + emqx_resource_metrics:queuing_change(Id, -1), Q1. %%============================================================================== @@ -645,18 +642,18 @@ inflight_drop(Name, Ref) -> %%============================================================================== inc_sent_failed(Id, true) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.failed'); + emqx_resource_metrics:failed_inc(Id), + emqx_resource_metrics:retried_inc(Id), + emqx_resource_metrics:retried_failed_inc(Id); inc_sent_failed(Id, _HasSent) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'). 
+ emqx_resource_metrics:failed_inc(Id). inc_sent_success(Id, true) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.success'); + emqx_resource_metrics:success_inc(Id), + emqx_resource_metrics:retried_inc(Id), + emqx_resource_metrics:retried_success_inc(Id); inc_sent_success(Id, _HasSent) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'). + emqx_resource_metrics:success_inc(Id). call_mode(sync, _) -> sync; call_mode(async, always_sync) -> sync; diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index 8c79e7274..f281a9b85 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [debug_info]}. {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} - , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}} + , {wolff, {git, "https://github.com/kjellwinblad/wolff.git", {branch, "kjell/add_counters_support_ok"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 0ede2a6a5..7759ef2a2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -4,7 +4,8 @@ {applications, [ kernel, stdlib, - emqx_ee_connector + emqx_ee_connector, + telemetry ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index ce82dbe2d..5e8635d44 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ 
b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -12,7 +12,10 @@ on_get_status/2 ]). --export([on_kafka_ack/3]). +-export([ + on_kafka_ack/3, + handle_telemetry_event/4 +]). -include_lib("emqx/include/logger.hrl"). @@ -30,6 +33,7 @@ on_start(InstId, Config) -> authentication := Auth, ssl := SSL } = Config, + maybe_install_wolff_telemetry_handlers(), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -222,6 +226,7 @@ producers_config(BridgeName, ClientId, Input) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, + BridgeNameBin = erlang:atom_to_binary(BridgeName), #{ name => make_producer_name(BridgeName), partitioner => PartitionStrategy, @@ -234,7 +239,9 @@ producers_config(BridgeName, ClientId, Input) -> required_acks => RequiredAcks, max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, - compression => Compression + compression => Compression, + telemetry_meta_data => + #{bridge_id => <<<<"bridge:kafka:">>/binary, BridgeNameBin/binary>>} }. replayq_dir(ClientId) -> @@ -268,3 +275,91 @@ get_required(Field, Config, Throw) -> Value = maps:get(Field, Config, none), Value =:= none andalso throw(Throw), Value. 
+ +handle_telemetry_event( + [wolff, dropped], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:dropped_inc(ID, Val); +handle_telemetry_event( + [wolff, dropped_queue_full], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:dropped_queue_full_inc(ID, Val); +handle_telemetry_event( + [wolff, queuing], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:queuing_change(ID, Val); +handle_telemetry_event( + [wolff, retried], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:retried_inc(ID, Val); +handle_telemetry_event( + [wolff, failed], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:failed_inc(ID, Val); +handle_telemetry_event( + [wolff, inflight], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:inflight_change(ID, Val); +handle_telemetry_event( + [wolff, retried_failed], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:retried_failed_inc(ID, Val); +handle_telemetry_event( + [wolff, retried_success], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:retried_success_inc(ID, Val); +handle_telemetry_event( + [wolff, success], + #{counter_inc := Val}, + #{bridge_id := ID}, + _ +) when is_integer(Val) -> + emqx_resource_metrics:success_inc(ID, Val); +handle_telemetry_event(_EventId, _Metrics, _MetaData, _Config) -> + %% Event that we do not handle + ok. + +maybe_install_wolff_telemetry_handlers() -> + %% Attach event handlers for Kafka telemetry events. 
If a handler with the + %% handler id already exists, the attach_many function does nothing + telemetry:attach_many( + %% unique handler id + <<"emqx-bridge-kafka-producer-telemetry-handler">>, + [ + [wolff, dropped], + [wolff, dropped_queue_full], + [wolff, queuing], + [wolff, retried], + [wolff, failed], + [wolff, inflight], + [wolff, retried_failed], + [wolff, retried_success], + [wolff, success] + ], + fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4, + [] + ). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index fb929e692..152862f6b 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -184,10 +184,6 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> true -> kafka_hosts_string_ssl(); false -> kafka_hosts_string() end, - kafka_bridge_rest_api_helper(#{ - <<"bootstrap_hosts">> => NormalHostsString, - <<"authentication">> => <<"none">> - }), SASLHostsString = case UseSSL of true -> kafka_hosts_string_ssl_sasl(); @@ -204,6 +200,15 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())}; false -> #{} end, + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => NormalHostsString, + <<"authentication">> => <<"none">> + }, + SSLSettings + ) + ), kafka_bridge_rest_api_helper( maps:merge( #{ @@ -243,10 +248,20 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> ok. 
kafka_bridge_rest_api_helper(Config) -> + BridgeType = "kafka", + BridgeName = "my_kafka_bridge", + BridgeID = emqx_bridge_resource:bridge_id( + erlang:list_to_binary(BridgeType), + erlang:list_to_binary(BridgeName) + ), + ResourceId = emqx_bridge_resource:resource_id( + erlang:list_to_binary(BridgeType), + erlang:list_to_binary(BridgeName) + ), UrlEscColon = "%3A", - BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge", + BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName, BridgesParts = ["bridges"], - BridgesPartsId = ["bridges", BridgeIdUrlEnc], + BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"], OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end, BridgesPartsOpDisable = OpUrlFun("disable"), BridgesPartsOpEnable = OpUrlFun("enable"), @@ -268,15 +283,13 @@ kafka_bridge_rest_api_helper(Config) -> case MyKafkaBridgeExists() of true -> %% Delete the bridge my_kafka_bridge - show( - '========================================== DELETE ========================================' - ), - {ok, 204, <<>>} = show(http_delete(BridgesPartsId)); + {ok, 204, <<>>} = show(http_delete(BridgesPartsIdDeleteAlsoActions)); false -> ok end, false = MyKafkaBridgeExists(), %% Create new Kafka bridge + KafkaTopic = "test-topic-one-partition", CreateBodyTmp = #{ <<"type">> => <<"kafka">>, <<"name">> => <<"my_kafka_bridge">>, @@ -288,7 +301,7 @@ kafka_bridge_rest_api_helper(Config) -> topic => <<"t/#">> }, <<"kafka">> => #{ - <<"topic">> => <<"test-topic-one-partition">> + <<"topic">> => erlang:list_to_binary(KafkaTopic) } } }, @@ -300,6 +313,34 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))), %% Check that the new bridge is in the list of bridges true = MyKafkaBridgeExists(), + %% Create a rule that uses the bridge + {ok, 201, _Rule} = http_post( + ["rules"], + #{ + <<"name">> => <<"kafka_bridge_rest_api_helper_rule">>, + <<"enable">> => true, + 
<<"actions">> => [BridgeID], + <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">> + } + ), + %% Get offset before sending message + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + %% Send message to topic and check that it got forwarded to Kafka + Body = <<"message from EMQX">>, + emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)), + %% Give Kafka some time to get message + timer:sleep(100), + %% Check that Kafka got message + BrodOut = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + {ok, {_, [KafkaMsg]}} = show(BrodOut), + Body = KafkaMsg#kafka_message.value, + %% Check crucial counters and gauges + 1 = emqx_resource_metrics:matched_get(ResourceId), + 1 = emqx_resource_metrics:success_get(ResourceId), + 0 = emqx_resource_metrics:dropped_get(ResourceId), + 0 = emqx_resource_metrics:failed_get(ResourceId), + 0 = emqx_resource_metrics:inflight_get(ResourceId), + 0 = emqx_resource_metrics:queuing_get(ResourceId), %% Perform operations {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), @@ -309,7 +350,7 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), %% Cleanup - {ok, 204, _} = show(http_delete(BridgesPartsId)), + {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)), false = MyKafkaBridgeExists(), ok. @@ -325,7 +366,8 @@ publish_with_and_without_ssl(AuthSettings) -> publish_helper(#{ auth_settings => AuthSettings, ssl_settings => valid_ssl_settings() - }). + }), + ok. 
publish_helper(#{ auth_settings := AuthSettings, @@ -345,6 +387,7 @@ publish_helper(#{ Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]), Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), InstId = emqx_bridge_resource:resource_id("kafka", Name), + BridgeId = emqx_bridge_resource:bridge_id("kafka", Name), KafkaTopic = "test-topic-one-partition", Conf = config(#{ "authentication" => AuthSettings, @@ -353,6 +396,7 @@ publish_helper(#{ "instance_id" => InstId, "ssl" => SSLSettings }), + emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}), %% To make sure we get unique value timer:sleep(1), Time = erlang:monotonic_time(), @@ -371,6 +415,7 @@ publish_helper(#{ {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), ok = ?PRODUCER:on_stop(InstId, State), + ok = emqx_bridge_resource:remove(BridgeId), ok. config(Args) -> diff --git a/mix.exs b/mix.exs index 69538142e..f9941912a 100644 --- a/mix.exs +++ b/mix.exs @@ -130,7 +130,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"}, + {:wolff, github: "kjellwinblad/wolff", branch: "kjell/add_counters_support_ok"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.4"}, @@ -516,7 +516,7 @@ defmodule EMQXUmbrella.MixProject do |> Path.join("RELEASES") |> File.open!([:write, :utf8], fn handle -> IO.puts(handle, "%% coding: utf-8") - :io.format(handle, '~tp.~n', [release_entry]) + :io.format(handle, ~c"~tp.~n", [release_entry]) end) release