fix(kafka): some fixes for kafka producer

- MQTT topic should be a binary
- use correct gauge functions from `wolff_metrics`.
- don't double increment success counter for kafka action
- adds a few more metrics assertions
This commit is contained in:
Thales Macedo Garitezi 2022-10-10 16:55:25 -03:00
parent 57270fb8fc
commit 98500313eb
3 changed files with 55 additions and 32 deletions

View File

@ -145,7 +145,7 @@ fields(producer_opts) ->
})} })}
]; ];
fields(producer_mqtt_opts) -> fields(producer_mqtt_opts) ->
[{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}]; [{topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})}];
fields(producer_kafka_opts) -> fields(producer_kafka_opts) ->
[ [
{topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},

View File

@ -33,7 +33,7 @@ on_start(InstId, Config) ->
authentication := Auth, authentication := Auth,
ssl := SSL ssl := SSL
} = Config, } = Config,
maybe_install_wolff_telemetry_handlers(), _ = maybe_install_wolff_telemetry_handlers(),
%% it's a bug if producer config is not found %% it's a bug if producer config is not found
%% the caller should not try to start a producer if %% the caller should not try to start a producer if
%% there is no producer config %% there is no producer config
@ -226,7 +226,9 @@ producers_config(BridgeName, ClientId, Input) ->
disk -> {false, replayq_dir(ClientId)}; disk -> {false, replayq_dir(ClientId)};
hybrid -> {true, replayq_dir(ClientId)} hybrid -> {true, replayq_dir(ClientId)}
end, end,
BridgeNameBin = erlang:atom_to_binary(BridgeName), %% TODO: change this once we add kafka source
BridgeType = kafka,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
#{ #{
name => make_producer_name(BridgeName), name => make_producer_name(BridgeName),
partitioner => PartitionStrategy, partitioner => PartitionStrategy,
@ -240,8 +242,7 @@ producers_config(BridgeName, ClientId, Input) ->
max_batch_bytes => MaxBatchBytes, max_batch_bytes => MaxBatchBytes,
max_send_ahead => MaxInflight - 1, max_send_ahead => MaxInflight - 1,
compression => Compression, compression => Compression,
telemetry_meta_data => telemetry_meta_data => #{bridge_id => ResourceID}
#{bridge_id => <<<<"bridge:kafka:">>/binary, BridgeNameBin/binary>>}
}. }.
replayq_dir(ClientId) -> replayq_dir(ClientId) ->
@ -280,66 +281,59 @@ handle_telemetry_event(
[wolff, dropped], [wolff, dropped],
#{counter_inc := Val}, #{counter_inc := Val},
#{bridge_id := ID}, #{bridge_id := ID},
_ _HandlerConfig
) when is_integer(Val) -> ) when is_integer(Val) ->
emqx_resource_metrics:dropped_inc(ID, Val); emqx_resource_metrics:dropped_inc(ID, Val);
handle_telemetry_event( handle_telemetry_event(
[wolff, dropped_queue_full], [wolff, dropped_queue_full],
#{counter_inc := Val}, #{counter_inc := Val},
#{bridge_id := ID}, #{bridge_id := ID},
_ _HandlerConfig
) when is_integer(Val) -> ) when is_integer(Val) ->
emqx_resource_metrics:dropped_queue_full_inc(ID, Val); emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
handle_telemetry_event( handle_telemetry_event(
[wolff, queuing], [wolff, queuing],
#{counter_inc := Val}, #{counter_inc := Val},
#{bridge_id := ID}, #{bridge_id := ID},
_ _HandlerConfig
) when is_integer(Val) -> ) when is_integer(Val) ->
emqx_resource_metrics:queuing_inc(ID, Val); emqx_resource_metrics:queuing_change(ID, Val);
handle_telemetry_event( handle_telemetry_event(
[wolff, retried], [wolff, retried],
#{counter_inc := Val}, #{counter_inc := Val},
#{bridge_id := ID}, #{bridge_id := ID},
_ _HandlerConfig
) when is_integer(Val) -> ) when is_integer(Val) ->
emqx_resource_metrics:retried_inc(ID, Val); emqx_resource_metrics:retried_inc(ID, Val);
handle_telemetry_event( handle_telemetry_event(
[wolff, failed], [wolff, failed],
#{counter_inc := Val}, #{counter_inc := Val},
#{bridge_id := ID}, #{bridge_id := ID},
_ _HandlerConfig
) when is_integer(Val) -> ) when is_integer(Val) ->
emqx_resource_metrics:failed_inc(ID, Val); emqx_resource_metrics:failed_inc(ID, Val);
handle_telemetry_event( handle_telemetry_event(
[wolff, inflight], [wolff, inflight],
#{counter_inc := Val}, #{counter_inc := Val},
#{bridge_id := ID}, #{bridge_id := ID},
_ _HandlerConfig
) when is_integer(Val) -> ) when is_integer(Val) ->
emqx_resource_metrics:inflight_inc(ID, Val); emqx_resource_metrics:inflight_change(ID, Val);
handle_telemetry_event( handle_telemetry_event(
[wolff, retried_failed], [wolff, retried_failed],
#{counter_inc := Val}, #{counter_inc := Val},
#{bridge_id := ID}, #{bridge_id := ID},
_ _HandlerConfig
) when is_integer(Val) -> ) when is_integer(Val) ->
emqx_resource_metrics:retried_failed_inc(ID, Val); emqx_resource_metrics:retried_failed_inc(ID, Val);
handle_telemetry_event( handle_telemetry_event(
[wolff, retried_success], [wolff, retried_success],
#{counter_inc := Val}, #{counter_inc := Val},
#{bridge_id := ID}, #{bridge_id := ID},
_ _HandlerConfig
) when is_integer(Val) -> ) when is_integer(Val) ->
emqx_resource_metrics:retried_success_inc(ID, Val); emqx_resource_metrics:retried_success_inc(ID, Val);
handle_telemetry_event( handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
[wolff, success],
#{counter_inc := Val},
#{bridge_id := ID},
_
) when is_integer(Val) ->
emqx_resource_metrics:success_inc(ID, Val);
handle_telemetry_event(_EventId, _Metrics, _MetaData, _Config) ->
%% Event that we do not handle %% Event that we do not handle
ok. ok.
@ -349,6 +343,11 @@ maybe_install_wolff_telemetry_handlers() ->
telemetry:attach_many( telemetry:attach_many(
%% unique handler id %% unique handler id
<<"emqx-bridge-kafka-producer-telemetry-handler">>, <<"emqx-bridge-kafka-producer-telemetry-handler">>,
%% Note: we don't handle `[wolff, success]' because,
%% currently, we already increment the success counter for
%% this resource at `emqx_rule_runtime:handle_action' when
%% the response is `ok' and we would double increment it
%% here.
[ [
[wolff, dropped], [wolff, dropped],
[wolff, dropped_queue_full], [wolff, dropped_queue_full],
@ -357,8 +356,7 @@ maybe_install_wolff_telemetry_handlers() ->
[wolff, failed], [wolff, failed],
[wolff, inflight], [wolff, inflight],
[wolff, retried_failed], [wolff, retried_failed],
[wolff, retried_success], [wolff, retried_success]
[wolff, success]
], ],
fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4, fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4,
[] []

View File

@ -323,6 +323,22 @@ kafka_bridge_rest_api_helper(Config) ->
<<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">> <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">>
} }
), ),
%% counters should be empty before
?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
%% Get offset before sending message %% Get offset before sending message
{ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
%% Send message to topic and check that it got forwarded to Kafka %% Send message to topic and check that it got forwarded to Kafka
@ -335,12 +351,21 @@ kafka_bridge_rest_api_helper(Config) ->
{ok, {_, [KafkaMsg]}} = show(BrodOut), {ok, {_, [KafkaMsg]}} = show(BrodOut),
Body = KafkaMsg#kafka_message.value, Body = KafkaMsg#kafka_message.value,
%% Check crucial counters and gauges %% Check crucial counters and gauges
1 = emqx_resource_metrics:matched_get(ResourceId), ?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)),
1 = emqx_resource_metrics:success_get(ResourceId), ?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)),
0 = emqx_resource_metrics:dropped_get(ResourceId), ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
0 = emqx_resource_metrics:failed_get(ResourceId), ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
0 = emqx_resource_metrics:inflight_get(ResourceId), ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),
0 = emqx_resource_metrics:queuing_get(ResourceId), ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
%% Perform operations %% Perform operations
{ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),