diff --git a/Makefile b/Makefile index 654cec168..3dd11aec3 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) export EMQX_DASHBOARD_VERSION ?= v1.0.9 -export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.4 +export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.5 export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 ifeq ($(OS),Windows_NT) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 4ffeee71f..84152efc6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -23,6 +23,7 @@ -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include("emqx_dashboard/include/emqx_dashboard.hrl"). %% output functions @@ -511,15 +512,15 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% we now test if the bridge works as expected LocalTopic = <<"local_topic/1">>, RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, - Payload = <<"hello">>, + Payload0 = <<"hello">>, emqx:subscribe(RemoteTopic), timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. 
- emqx:publish(emqx_message:make(LocalTopic, Payload)), + emqx:publish(emqx_message:make(LocalTopic, Payload0)), %% we should receive a message on the "remote" broker, with specified topic - assert_mqtt_msg_received(RemoteTopic, Payload), + assert_mqtt_msg_received(RemoteTopic, Payload0), %% verify the metrics of the bridge {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), @@ -540,20 +541,43 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% stop the listener 1883 to make the bridge disconnected ok = emqx_listeners:stop_listener('tcp:default'), + ct:sleep(1500), %% PUBLISH 2 messages to the 'local' broker, the message should - emqx:publish(emqx_message:make(LocalTopic, Payload)), - emqx:publish(emqx_message:make(LocalTopic, Payload)), + ok = snabbkaffe:start_trace(), + {ok, SRef} = + snabbkaffe:subscribe( + fun + ( + #{ + ?snk_kind := call_query_enter, + query := {query, _From, {send_message, #{}}, _Sent} + } + ) -> + true; + (_) -> + false + end, + _NEvents = 2, + _Timeout = 1_000 + ), + Payload1 = <<"hello2">>, + Payload2 = <<"hello3">>, + emqx:publish(emqx_message:make(LocalTopic, Payload1)), + emqx:publish(emqx_message:make(LocalTopic, Payload2)), + {ok, _} = snabbkaffe:receive_events(SRef), + ok = snabbkaffe:stop(), %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + %% matched >= 3 because of possible retries. 
?assertMatch( #{ <<"status">> := Status, <<"metrics">> := #{ - <<"matched">> := 3, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 + <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 } - } when Status == <<"connected">> orelse Status == <<"connecting">>, + } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>), jsx:decode(BridgeStr1) ), @@ -562,22 +586,23 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> timer:sleep(1500), %% verify the metrics of the bridge, the 2 queued messages should have been sent {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + %% matched >= 3 because of possible retries. ?assertMatch( #{ <<"status">> := <<"connected">>, <<"metrics">> := #{ - <<"matched">> := 3, + <<"matched">> := Matched, <<"success">> := 3, <<"failed">> := 0, <<"queuing">> := 0, <<"retried">> := _ } - }, + } when Matched >= 3, jsx:decode(BridgeStr2) ), %% also verify the 2 messages have been sent to the remote broker - assert_mqtt_msg_received(RemoteTopic, Payload), - assert_mqtt_msg_received(RemoteTopic, Payload), + assert_mqtt_msg_received(RemoteTopic, Payload1), + assert_mqtt_msg_received(RemoteTopic, Payload2), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index b688e3c11..38dac5449 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -9,7 +9,8 @@ stdlib, gproc, jsx, - emqx + emqx, + telemetry ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_resource/src/emqx_resource_app.erl b/apps/emqx_resource/src/emqx_resource_app.erl index 72838a8c1..51e7b2556 100644 --- a/apps/emqx_resource/src/emqx_resource_app.erl +++ b/apps/emqx_resource/src/emqx_resource_app.erl @@ -23,9 +23,18 @@ -export([start/2, 
stop/1]). start(_StartType, _StartArgs) -> + %% since the handler is generic and executed in the process + %% emitting the event, we need to install only a single handler + %% for the whole app. + TelemetryHandlerID = telemetry_handler_id(), + ok = emqx_resource_metrics:install_telemetry_handler(TelemetryHandlerID), emqx_resource_sup:start_link(). stop(_State) -> + TelemetryHandlerID = telemetry_handler_id(), + ok = emqx_resource_metrics:uninstall_telemetry_handler(TelemetryHandlerID), ok. %% internal functions +telemetry_handler_id() -> + <<"emqx-resource-app-telemetry-handler">>. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c8d5e4194..10c501865 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -507,13 +507,16 @@ start_resource(Data, From) -> stop_resource(#data{state = undefined, id = ResId} = _Data) -> _ = maybe_clear_alarm(ResId), + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok; stop_resource(Data) -> %% We don't care the return value of the Mod:on_stop/2. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. + ResId = Data#data.id, _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state), - _ = maybe_clear_alarm(Data#data.id), + _ = maybe_clear_alarm(ResId), + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok. make_test_id() -> diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl new file mode 100644 index 000000000..e6637b68f --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -0,0 +1,319 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource_metrics). + +-export([ + events/0, + install_telemetry_handler/1, + uninstall_telemetry_handler/1, + handle_telemetry_event/4 +]). + +-export([ + batching_change/2, + batching_get/1, + inflight_change/2, + inflight_get/1, + queuing_change/2, + queuing_get/1, + dropped_inc/1, + dropped_inc/2, + dropped_get/1, + dropped_other_inc/1, + dropped_other_inc/2, + dropped_other_get/1, + dropped_queue_full_inc/1, + dropped_queue_full_inc/2, + dropped_queue_full_get/1, + dropped_queue_not_enabled_inc/1, + dropped_queue_not_enabled_inc/2, + dropped_queue_not_enabled_get/1, + dropped_resource_not_found_inc/1, + dropped_resource_not_found_inc/2, + dropped_resource_not_found_get/1, + dropped_resource_stopped_inc/1, + dropped_resource_stopped_inc/2, + dropped_resource_stopped_get/1, + failed_inc/1, + failed_inc/2, + failed_get/1, + matched_inc/1, + matched_inc/2, + matched_get/1, + retried_inc/1, + retried_inc/2, + retried_get/1, + retried_failed_inc/1, + retried_failed_inc/2, + retried_failed_get/1, + retried_success_inc/1, + retried_success_inc/2, + retried_success_get/1, + success_inc/1, + success_inc/2, + success_get/1 +]). + +-define(RES_METRICS, resource_metrics). +-define(TELEMETRY_PREFIX, emqx, resource). + +-spec events() -> [telemetry:event_name()]. 
+events() ->
+    %% Every event name has the shape [emqx, resource, Name] (the expansion
+    %% of ?TELEMETRY_PREFIX followed by one of the atoms below). Only the
+    %% names listed here are passed to telemetry:attach_many/4 by
+    %% install_telemetry_handler/1.
+    Names = [
+        batching,
+        dropped_other,
+        dropped_queue_full,
+        dropped_queue_not_enabled,
+        dropped_resource_not_found,
+        dropped_resource_stopped,
+        failed,
+        inflight,
+        matched,
+        queuing,
+        retried_failed,
+        retried_success,
+        success
+    ],
+    [[?TELEMETRY_PREFIX, Name] || Name <- Names].
+
+%% @doc Attach handle_telemetry_event/4 to every event returned by events/0
+%% under the given handler id. The result of the attach call is discarded,
+%% so this always returns `ok'.
+-spec install_telemetry_handler(binary()) -> ok.
+install_telemetry_handler(HandlerID) ->
+    Events = events(),
+    Handler = fun ?MODULE:handle_telemetry_event/4,
+    Config = #{},
+    _ = telemetry:attach_many(HandlerID, Events, Handler, Config),
+    ok.
+
+%% @doc Detach the handler registered under the given id. The result of the
+%% detach call is discarded, so this always returns `ok'.
+-spec uninstall_telemetry_handler(binary()) -> ok.
+uninstall_telemetry_handler(HandlerID) ->
+    _ = telemetry:detach(HandlerID),
+    ok.
+
+%% @doc Telemetry callback. For an event named [emqx, resource, Event] whose
+%% measurements carry `counter_inc' and whose metadata carries `resource_id',
+%% adds the increment to every metric mapped to Event (in the listed order).
+%% Anything else is ignored.
+handle_telemetry_event(
+    [?TELEMETRY_PREFIX, Event],
+    #{counter_inc := Delta},
+    #{resource_id := ResId},
+    _HandlerConfig
+) ->
+    lists:foreach(
+        fun(Metric) ->
+            emqx_metrics_worker:inc(?RES_METRICS, ResId, Metric, Delta)
+        end,
+        metrics_for_event(Event)
+    );
+handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
+    ok.
+
+%% Metrics bumped by each event, in the order they are incremented.
+%% The `dropped_*' events also feed the aggregate 'dropped' metric, and the
+%% `retried_*' events also feed 'retried' plus 'failed' / 'success'.
+%% Unknown events map to no metric at all.
+metrics_for_event(batching) ->
+    [batching];
+metrics_for_event(dropped_other) ->
+    [dropped, 'dropped.other'];
+metrics_for_event(dropped_queue_full) ->
+    [dropped, 'dropped.queue_full'];
+metrics_for_event(dropped_queue_not_enabled) ->
+    [dropped, 'dropped.queue_not_enabled'];
+metrics_for_event(dropped_resource_not_found) ->
+    [dropped, 'dropped.resource_not_found'];
+metrics_for_event(dropped_resource_stopped) ->
+    [dropped, 'dropped.resource_stopped'];
+metrics_for_event(failed) ->
+    [failed];
+metrics_for_event(inflight) ->
+    [inflight];
+metrics_for_event(matched) ->
+    [matched];
+metrics_for_event(queuing) ->
+    [queuing];
+metrics_for_event(retried_failed) ->
+    [retried, failed, 'retried.failed'];
+metrics_for_event(retried_success) ->
+    [retried, success, 'retried.success'];
+metrics_for_event(success) ->
+    [success];
+metrics_for_event(_Other) ->
+    [].
+
+%% Gauges (value can go both up and down):
+%% --------------------------------------
+%% Gauges reuse the `counter_inc' measurement; Val is the signed change.
+
+%% @doc Change the count of messages currently accumulated in memory,
+%% waiting to be sent in one batch. [Gauge]
+batching_change(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, batching], Measurements, Metadata).
+
+%% @doc Read the 'batching' metric of the resource.
+batching_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, batching).
+
+%% @doc Change the count of messages that are currently queuing. [Gauge]
+queuing_change(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, queuing], Measurements, Metadata).
+
+%% @doc Read the 'queuing' metric of the resource.
+queuing_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, queuing).
+
+%% @doc Change the count of messages that were sent asynchronously but whose
+%% ACKs have not been received yet. [Gauge]
+inflight_change(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, inflight], Measurements, Metadata).
+
+%% @doc Read the 'inflight' metric of the resource.
+inflight_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, inflight).
+
+%% Counters (value can only go up):
+%% --------------------------------------
+
+%% @doc Count of messages dropped (increment by one).
+dropped_inc(ID) ->
+    dropped_inc(ID, 1).
+
+%% @doc Count of messages dropped (increment by Val).
+%% NOTE(review): this emits [emqx, resource, dropped], which is not among the
+%% names returned by events/0, so install_telemetry_handler/1 does not attach
+%% handle_telemetry_event/4 to it. Confirm whether that is intended (the
+%% aggregate 'dropped' metric is bumped by the dropped_* events instead).
+dropped_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, dropped], Measurements, Metadata).
+
+%% @doc Read the aggregate 'dropped' metric of the resource.
+dropped_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, dropped).
+
+%% @doc Count of messages dropped for a reason not covered by the more
+%% specific dropped_* counters (increment by one).
+dropped_other_inc(ID) ->
+    dropped_other_inc(ID, 1).
+
+%% @doc Same as dropped_other_inc/1, incrementing by Val.
+dropped_other_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_other], Measurements, Metadata).
+
+%% @doc Read the 'dropped.other' metric of the resource.
+dropped_other_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other').
+
+%% @doc Count of messages dropped because the queue was full
+%% (increment by one).
+dropped_queue_full_inc(ID) ->
+    dropped_queue_full_inc(ID, 1).
+
+%% @doc Same as dropped_queue_full_inc/1, incrementing by Val.
+dropped_queue_full_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_full], Measurements, Metadata).
+
+%% @doc Read the 'dropped.queue_full' metric of the resource.
+dropped_queue_full_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full').
+
+%% @doc Count of messages dropped because the queue was not enabled
+%% (increment by one).
+dropped_queue_not_enabled_inc(ID) ->
+    dropped_queue_not_enabled_inc(ID, 1).
+
+%% @doc Same as dropped_queue_not_enabled_inc/1, incrementing by Val.
+dropped_queue_not_enabled_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], Measurements, Metadata).
+
+%% @doc Read the 'dropped.queue_not_enabled' metric of the resource.
+dropped_queue_not_enabled_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled').
+
+%% @doc Count of messages dropped because the resource was not found
+%% (increment by one).
+dropped_resource_not_found_inc(ID) ->
+    dropped_resource_not_found_inc(ID, 1).
+
+%% @doc Same as dropped_resource_not_found_inc/1, incrementing by Val.
+dropped_resource_not_found_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_not_found], Measurements, Metadata).
+
+%% @doc Read the 'dropped.resource_not_found' metric of the resource.
+dropped_resource_not_found_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found').
+
+%% @doc Count of messages dropped because the resource was stopped
+%% (increment by one).
+dropped_resource_stopped_inc(ID) ->
+    dropped_resource_stopped_inc(ID, 1).
+
+%% @doc Same as dropped_resource_stopped_inc/1, incrementing by Val.
+dropped_resource_stopped_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_stopped], Measurements, Metadata).
+
+%% @doc Read the 'dropped.resource_stopped' metric of the resource.
+dropped_resource_stopped_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped').
+
+%% @doc Count of how many times this bridge has been matched and queried
+%% (increment by one).
+matched_inc(ID) ->
+    matched_inc(ID, 1).
+
+%% @doc Same as matched_inc/1, incrementing by Val.
+matched_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, matched], Measurements, Metadata).
+
+%% @doc Read the 'matched' metric of the resource.
+matched_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, matched).
+
+%% @doc The number of times message sends have been retried
+%% (increment by one).
+retried_inc(ID) ->
+    retried_inc(ID, 1).
+
+%% @doc Same as retried_inc/1, incrementing by Val.
+%% NOTE(review): this emits [emqx, resource, retried], which is not among the
+%% names returned by events/0 in this module, so the handler installed by
+%% install_telemetry_handler/1 is not attached to it. Confirm whether that is
+%% intended ('retried' is bumped by the retried_failed / retried_success
+%% events in handle_telemetry_event/4).
+retried_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, retried], Measurements, Metadata).
+
+%% @doc Read the 'retried' metric of the resource.
+retried_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, retried).
+
+%% @doc Count of message sends that have failed (increment by one).
+failed_inc(ID) ->
+    failed_inc(ID, 1).
+
+%% @doc Same as failed_inc/1, incrementing by Val.
+failed_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, failed], Measurements, Metadata).
+
+%% @doc Read the 'failed' metric of the resource.
+failed_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, failed).
+
+%% @doc Count of message sends that have failed after having been retried
+%% (increment by one).
+retried_failed_inc(ID) ->
+    retried_failed_inc(ID, 1).
+
+%% @doc Same as retried_failed_inc/1, incrementing by Val.
+retried_failed_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, retried_failed], Measurements, Metadata).
+
+%% @doc Read the 'retried.failed' metric of the resource.
+retried_failed_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed').
+
+%% @doc Count of messages that were successfully sent after at least one
+%% retry (increment by one).
+retried_success_inc(ID) ->
+    retried_success_inc(ID, 1).
+
+%% @doc Same as retried_success_inc/1, incrementing by Val.
+retried_success_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, retried_success], Measurements, Metadata).
+
+%% @doc Read the 'retried.success' metric of the resource.
+retried_success_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success').
+
+%% @doc Count of messages that have been sent successfully
+%% (increment by one).
+success_inc(ID) ->
+    success_inc(ID, 1).
+
+%% @doc Same as success_inc/1, incrementing by Val.
+success_inc(ID, Val) ->
+    Measurements = #{counter_inc => Val},
+    Metadata = #{resource_id => ID},
+    telemetry:execute([?TELEMETRY_PREFIX, success], Measurements, Metadata).
+ +success_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'success'). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 288edcf4f..a36cb15b7 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -80,27 +80,23 @@ start_link(Id, Index, Opts) -> sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_cast(Id, PickKey, {query, Request, Opts}). %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false, false), Result. 
@@ -134,7 +130,7 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), + emqx_resource_metrics:queuing_change(Id, queue_count(Queue)), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), ok = inflight_new(Name, InfltWinSZ), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), @@ -297,7 +293,7 @@ retry_inflight_sync( query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> Acc1 = [?QUERY(From, Request, false) | Acc], - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'), + emqx_resource_metrics:batching_change(Id, 1), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); @@ -330,7 +326,7 @@ flush( QueryOpts = #{ inflight_name => maps:get(name, St) }, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)), + emqx_resource_metrics:batching_change(Id, -length(Batch)), Result = call_query(configured, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of @@ -380,18 +376,15 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when true; handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), + emqx_resource_metrics:dropped_resource_not_found_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), + emqx_resource_metrics:dropped_resource_stopped_inc(Id), BlockWorker; handle_query_result(Id, 
?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), + emqx_resource_metrics:dropped_other_inc(Id), BlockWorker; handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) -> %% the message will be queued in replayq or inflight window, @@ -417,6 +410,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) -> BlockWorker. call_query(QM0, Id, Query, QueryOpts) -> + ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> QM = @@ -425,10 +419,13 @@ call_query(QM0, Id, Query, QueryOpts) -> _ -> QM0 end, CM = maps:get(callback_mode, Data), + emqx_resource_metrics:matched_inc(Id), apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> + emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> + emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(not_connected, "resource not connected"); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") @@ -464,7 +461,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) true -> {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'), + ok = emqx_resource_metrics:inflight_change(Id, 1), ReplyFun = fun ?MODULE:reply_after_query/6, Ref = make_message_ref(), Args = [self(), Id, Name, Ref, Query], @@ -488,7 +485,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) {async_return, inflight_full}; false -> BatchLen = length(Batch), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen), + ok = 
emqx_resource_metrics:inflight_change(Id, BatchLen), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, @@ -503,12 +500,12 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed in the inflight window. - emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1), + emqx_resource_metrics:inflight_change(Id, -1), case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of true -> %% we marked these messages are 'queuing' although they are actually %% keeped in inflight window, not replayq - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), + emqx_resource_metrics:queuing_change(Id, 1), ?MODULE:block(Pid); false -> drop_inflight_and_resume(Pid, Name, Ref) @@ -518,12 +515,12 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed in the inflight window. BatchLen = length(Batch), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen), + emqx_resource_metrics:inflight_change(Id, -BatchLen), case batch_reply_caller(Id, Result, Batch) of true -> %% we marked these messages are 'queuing' although they are actually - %% keeped in inflight window, not replayq - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), + %% kept in inflight window, not replayq + emqx_resource_metrics:queuing_change(Id, BatchLen), ?MODULE:block(Pid); false -> drop_inflight_and_resume(Pid, Name, Ref) @@ -549,8 +546,7 @@ estimate_size(QItem) -> size(queue_item_marshaller(QItem)). 
maybe_append_queue(Id, undefined, _Items) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), + emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), undefined; maybe_append_queue(Id, Q, Items) -> Q2 = @@ -562,13 +558,12 @@ maybe_append_queue(Id, Q, Items) -> {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), + emqx_resource_metrics:queuing_change(Id, -Dropped), + emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), + emqx_resource_metrics:queuing_change(Id, 1), replayq:append(Q2, Items). get_first_n_from_queue(Q, N) -> @@ -590,7 +585,7 @@ drop_first_n_from_queue(Q, N, Id) when N > 0 -> drop_head(Q, Id) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), ok = replayq:ack(Q1, AckRef), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1), + emqx_resource_metrics:queuing_change(Id, -1), Q1. %%============================================================================== @@ -644,19 +639,15 @@ inflight_drop(Name, Ref) -> %%============================================================================== -inc_sent_failed(Id, true) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.failed'); +inc_sent_failed(Id, _HasSent = true) -> + emqx_resource_metrics:retried_failed_inc(Id); inc_sent_failed(Id, _HasSent) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'). + emqx_resource_metrics:failed_inc(Id). 
-inc_sent_success(Id, true) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.success'); +inc_sent_success(Id, _HasSent = true) -> + emqx_resource_metrics:retried_success_inc(Id); inc_sent_success(Id, _HasSent) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'). + emqx_resource_metrics:success_inc(Id). call_mode(sync, _) -> sync; call_mode(async, always_sync) -> sync; diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 3b83cf7ed..105bcad77 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -100,6 +100,15 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> after 1000 -> {error, timeout} end; +on_query(_InstId, get_incorrect_status_count, #{pid := Pid}) -> + ReqRef = make_ref(), + From = {self(), ReqRef}, + Pid ! {From, get_incorrect_status_count}, + receive + {ReqRef, Count} -> {ok, Count} + after 1000 -> + {error, timeout} + end; on_query(_InstId, get_counter, #{pid := Pid}) -> ReqRef = make_ref(), From = {self(), ReqRef}, @@ -157,9 +166,15 @@ spawn_counter_process(Name, Register) -> Pid. counter_loop() -> - counter_loop(#{counter => 0, status => running}). + counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}). -counter_loop(#{counter := Num, status := Status} = State) -> +counter_loop( + #{ + counter := Num, + status := Status, + incorrect_status_count := IncorrectCount + } = State +) -> NewState = receive block -> @@ -179,10 +194,13 @@ counter_loop(#{counter := Num, status := Status} = State) -> State#{counter => Num + N}; {{FromPid, ReqRef}, {inc, _N}} when Status == blocked -> FromPid ! 
{ReqRef, incorrect_status}, - State; + State#{incorrect_status_count := IncorrectCount + 1}; {get, ReplyFun} -> apply_reply(ReplyFun, Num), State; + {{FromPid, ReqRef}, get_incorrect_status_count} -> + FromPid ! {ReqRef, IncorrectCount}, + State; {{FromPid, ReqRef}, get} -> FromPid ! {ReqRef, Num}, State diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 2446c8102..107ca2a93 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -420,10 +420,18 @@ t_query_counter_async_inflight(_) -> {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), ct:pal("metrics: ~p", [C]), + {ok, IncorrectStatusCount} = emqx_resource:simple_sync_query(?ID, get_incorrect_status_count), + %% The `simple_sync_query' we just did also increases the matched + %% count, hence the + 1. + ExtraSimpleCallCount = IncorrectStatusCount + 1, ?assertMatch( #{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when - M == Ss + Dp - Rs, - C + M == Ss + Dp - Rs + ExtraSimpleCallCount, + C, + #{ + metrics => C, + extra_simple_call_count => ExtraSimpleCallCount + } ), ?assert( lists:all( @@ -494,6 +502,10 @@ t_stop_start(_) -> #{<<"name">> => <<"test_resource">>} ), + %% add some metrics to test their persistence + emqx_resource_metrics:batching_change(?ID, 5), + ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), + {ok, _} = emqx_resource:check_and_recreate( ?ID, ?TEST_RESOURCE, @@ -505,6 +517,9 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid0)), + %% metrics are reset when recreating + ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), + ok = emqx_resource:stop(?ID), ?assertNot(is_process_alive(Pid0)), @@ -519,7 +534,15 @@ t_stop_start(_) -> {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid1)). 
+ ?assert(is_process_alive(Pid1)), + + %% now stop while resetting the metrics + emqx_resource_metrics:batching_change(?ID, 5), + ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), + ok = emqx_resource:stop(?ID), + ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), + + ok. t_stop_start_local(_) -> {error, _} = emqx_resource:check_and_create_local( diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index 8c79e7274..bfd1c957e 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [debug_info]}. {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} - , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}} + , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.0"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 0ede2a6a5..7759ef2a2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -4,7 +4,8 @@ {applications, [ kernel, stdlib, - emqx_ee_connector + emqx_ee_connector, + telemetry ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index ac5177f6e..2540b987c 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -145,7 +145,7 @@ fields(producer_opts) -> })} ]; fields(producer_mqtt_opts) -> - [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}]; + [{topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})}]; fields(producer_kafka_opts) -> [ {topic, 
mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index ce82dbe2d..aceafd6d1 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -12,7 +12,10 @@ on_get_status/2 ]). --export([on_kafka_ack/3]). +-export([ + on_kafka_ack/3, + handle_telemetry_event/4 +]). -include_lib("emqx/include/logger.hrl"). @@ -30,6 +33,7 @@ on_start(InstId, Config) -> authentication := Auth, ssl := SSL } = Config, + _ = maybe_install_wolff_telemetry_handlers(InstId), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -85,20 +89,27 @@ on_start(InstId, Config) -> throw(failed_to_start_kafka_producer) end. -on_stop(_InstId, #{client_id := ClientID, producers := Producers}) -> - with_log_at_error( +on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> + _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, #{ msg => "failed_to_delete_kafka_producer", client_id => ClientID } ), - with_log_at_error( + _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, #{ msg => "failed_to_delete_kafka_client", client_id => ClientID } + ), + with_log_at_error( + fun() -> uninstall_telemetry_handlers(InstanceID) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + client_id => ClientID + } ). 
%% @doc The callback API for rule-engine (or bridge without rules) @@ -222,6 +233,9 @@ producers_config(BridgeName, ClientId, Input) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, + %% TODO: change this once we add kafka source + BridgeType = kafka, + ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ name => make_producer_name(BridgeName), partitioner => PartitionStrategy, @@ -234,7 +248,8 @@ producers_config(BridgeName, ClientId, Input) -> required_acks => RequiredAcks, max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, - compression => Compression + compression => Compression, + telemetry_meta_data => #{bridge_id => ResourceID} }. replayq_dir(ClientId) -> @@ -268,3 +283,96 @@ get_required(Field, Config, Throw) -> Value = maps:get(Field, Config, none), Value =:= none andalso throw(Throw), Value. + +handle_telemetry_event( + [wolff, dropped], + #{counter_inc := Val}, + #{bridge_id := ID}, + _HandlerConfig +) when is_integer(Val) -> + emqx_resource_metrics:dropped_inc(ID, Val); +handle_telemetry_event( + [wolff, dropped_queue_full], + #{counter_inc := Val}, + #{bridge_id := ID}, + _HandlerConfig +) when is_integer(Val) -> + emqx_resource_metrics:dropped_queue_full_inc(ID, Val); +handle_telemetry_event( + [wolff, queuing], + #{counter_inc := Val}, + #{bridge_id := ID}, + _HandlerConfig +) when is_integer(Val) -> + emqx_resource_metrics:queuing_change(ID, Val); +handle_telemetry_event( + [wolff, retried], + #{counter_inc := Val}, + #{bridge_id := ID}, + _HandlerConfig +) when is_integer(Val) -> + emqx_resource_metrics:retried_inc(ID, Val); +handle_telemetry_event( + [wolff, failed], + #{counter_inc := Val}, + #{bridge_id := ID}, + _HandlerConfig +) when is_integer(Val) -> + emqx_resource_metrics:failed_inc(ID, Val); +handle_telemetry_event( + [wolff, inflight], + #{counter_inc := Val}, + #{bridge_id := ID}, + _HandlerConfig +) when is_integer(Val) -> + 
emqx_resource_metrics:inflight_change(ID, Val); +handle_telemetry_event( + [wolff, retried_failed], + #{counter_inc := Val}, + #{bridge_id := ID}, + _HandlerConfig +) when is_integer(Val) -> + emqx_resource_metrics:retried_failed_inc(ID, Val); +handle_telemetry_event( + [wolff, retried_success], + #{counter_inc := Val}, + #{bridge_id := ID}, + _HandlerConfig +) when is_integer(Val) -> + emqx_resource_metrics:retried_success_inc(ID, Val); +handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> + %% Event that we do not handle + ok. + +-spec telemetry_handler_id(emqx_resource:resource_id()) -> binary(). +telemetry_handler_id(InstanceID) -> + <<"emqx-bridge-kafka-producer-", InstanceID/binary>>. + +uninstall_telemetry_handlers(InstanceID) -> + HandlerID = telemetry_handler_id(InstanceID), + telemetry:detach(HandlerID). + +maybe_install_wolff_telemetry_handlers(InstanceID) -> + %% Attach event handlers for Kafka telemetry events. If a handler with the + %% handler id already exists, the attach_many function does nothing + telemetry:attach_many( + %% unique handler id + telemetry_handler_id(InstanceID), + %% Note: we don't handle `[wolff, success]' because, + %% currently, we already increment the success counter for + %% this resource at `emqx_rule_runtime:handle_action' when + %% the response is `ok' and we would double increment it + %% here. + [ + [wolff, dropped], + [wolff, dropped_queue_full], + [wolff, queuing], + [wolff, retried], + [wolff, failed], + [wolff, inflight], + [wolff, retried_failed], + [wolff, retried_success] + ], + fun ?MODULE:handle_telemetry_event/4, + [] + ). 
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index fb929e692..2eef1170d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -184,10 +184,6 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> true -> kafka_hosts_string_ssl(); false -> kafka_hosts_string() end, - kafka_bridge_rest_api_helper(#{ - <<"bootstrap_hosts">> => NormalHostsString, - <<"authentication">> => <<"none">> - }), SASLHostsString = case UseSSL of true -> kafka_hosts_string_ssl_sasl(); @@ -204,6 +200,15 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())}; false -> #{} end, + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => NormalHostsString, + <<"authentication">> => <<"none">> + }, + SSLSettings + ) + ), kafka_bridge_rest_api_helper( maps:merge( #{ @@ -243,10 +248,20 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> ok. 
kafka_bridge_rest_api_helper(Config) -> + BridgeType = "kafka", + BridgeName = "my_kafka_bridge", + BridgeID = emqx_bridge_resource:bridge_id( + erlang:list_to_binary(BridgeType), + erlang:list_to_binary(BridgeName) + ), + ResourceId = emqx_bridge_resource:resource_id( + erlang:list_to_binary(BridgeType), + erlang:list_to_binary(BridgeName) + ), UrlEscColon = "%3A", - BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge", + BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName, BridgesParts = ["bridges"], - BridgesPartsId = ["bridges", BridgeIdUrlEnc], + BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"], OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end, BridgesPartsOpDisable = OpUrlFun("disable"), BridgesPartsOpEnable = OpUrlFun("enable"), @@ -268,15 +283,13 @@ kafka_bridge_rest_api_helper(Config) -> case MyKafkaBridgeExists() of true -> %% Delete the bridge my_kafka_bridge - show( - '========================================== DELETE ========================================' - ), - {ok, 204, <<>>} = show(http_delete(BridgesPartsId)); + {ok, 204, <<>>} = show(http_delete(BridgesPartsIdDeleteAlsoActions)); false -> ok end, false = MyKafkaBridgeExists(), %% Create new Kafka bridge + KafkaTopic = "test-topic-one-partition", CreateBodyTmp = #{ <<"type">> => <<"kafka">>, <<"name">> => <<"my_kafka_bridge">>, @@ -288,7 +301,7 @@ kafka_bridge_rest_api_helper(Config) -> topic => <<"t/#">> }, <<"kafka">> => #{ - <<"topic">> => <<"test-topic-one-partition">> + <<"topic">> => erlang:list_to_binary(KafkaTopic) } } }, @@ -300,6 +313,59 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))), %% Check that the new bridge is in the list of bridges true = MyKafkaBridgeExists(), + %% Create a rule that uses the bridge + {ok, 201, _Rule} = http_post( + ["rules"], + #{ + <<"name">> => <<"kafka_bridge_rest_api_helper_rule">>, + <<"enable">> => true, + 
<<"actions">> => [BridgeID], + <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">> + } + ), + %% counters should be empty before + ?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), + %% Get offset before sending message + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + %% Send message to topic and check that it got forwarded to Kafka + Body = <<"message from EMQX">>, + emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)), + %% Give Kafka some time to get message + timer:sleep(100), + %% Check that Kafka got message + BrodOut = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + {ok, {_, [KafkaMsg]}} = show(BrodOut), + Body = KafkaMsg#kafka_message.value, + %% Check crucial counters and gauges + ?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)), + ?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), + ?assertEqual(0, 
emqx_resource_metrics:failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), %% Perform operations {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), @@ -309,7 +375,7 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), %% Cleanup - {ok, 204, _} = show(http_delete(BridgesPartsId)), + {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)), false = MyKafkaBridgeExists(), ok. @@ -325,7 +391,8 @@ publish_with_and_without_ssl(AuthSettings) -> publish_helper(#{ auth_settings => AuthSettings, ssl_settings => valid_ssl_settings() - }). + }), + ok. 
publish_helper(#{ auth_settings := AuthSettings, @@ -345,6 +412,7 @@ publish_helper(#{ Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]), Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), InstId = emqx_bridge_resource:resource_id("kafka", Name), + BridgeId = emqx_bridge_resource:bridge_id("kafka", Name), KafkaTopic = "test-topic-one-partition", Conf = config(#{ "authentication" => AuthSettings, @@ -353,6 +421,7 @@ publish_helper(#{ "instance_id" => InstId, "ssl" => SSLSettings }), + emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}), %% To make sure we get unique value timer:sleep(1), Time = erlang:monotonic_time(), @@ -371,6 +440,7 @@ publish_helper(#{ {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), ok = ?PRODUCER:on_stop(InstId, State), + ok = emqx_bridge_resource:remove(BridgeId), ok. config(Args) -> @@ -407,7 +477,7 @@ hocon_config_template() -> """ bootstrap_hosts = \"{{ kafka_hosts_string }}\" enable = true -authentication = {{{ authentication }}} +authentication = {{{ authentication }}} ssl = {{{ ssl }}} producer = { mqtt { diff --git a/mix.exs b/mix.exs index 69538142e..e9f861ce5 100644 --- a/mix.exs +++ b/mix.exs @@ -63,6 +63,7 @@ defmodule EMQXUmbrella.MixProject do {:rulesql, github: "emqx/rulesql", tag: "0.1.4"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, + {:telemetry, "1.1.0"}, # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true}, @@ -130,7 +131,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.7.0"}, {:kafka_protocol, github: 
"kafka4beam/kafka_protocol", tag: "4.1.0", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.4"}, @@ -207,6 +208,7 @@ defmodule EMQXUmbrella.MixProject do redbug: :permanent, xmerl: :permanent, hocon: :load, + telemetry: :permanent, emqx: :load, emqx_conf: :load, emqx_machine: :permanent diff --git a/rebar.config b/rebar.config index 769fe6e78..505c475cc 100644 --- a/rebar.config +++ b/rebar.config @@ -71,6 +71,7 @@ , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} + , {telemetry, "1.1.0"} ]}. {xref_ignores, diff --git a/rebar.config.erl b/rebar.config.erl index a8c54e5f4..f745b5cca 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -360,6 +360,7 @@ relx_apps(ReleaseType, Edition) -> redbug, xmerl, {hocon, load}, + telemetry, % started by emqx_machine {emqx, load}, {emqx_conf, load},