fix: uninstall telemetry handler on resource stop, use unique id

This commit is contained in:
Thales Macedo Garitezi 2022-10-13 09:58:57 -03:00
parent 24eda247ae
commit 1ad3b5df17
1 changed files with 20 additions and 5 deletions

View File

@ -33,7 +33,7 @@ on_start(InstId, Config) ->
authentication := Auth, authentication := Auth,
ssl := SSL ssl := SSL
} = Config, } = Config,
_ = maybe_install_wolff_telemetry_handlers(), _ = maybe_install_wolff_telemetry_handlers(InstId),
%% it's a bug if producer config is not found %% it's a bug if producer config is not found
%% the caller should not try to start a producer if %% the caller should not try to start a producer if
%% there is no producer config %% there is no producer config
@ -89,7 +89,7 @@ on_start(InstId, Config) ->
throw(failed_to_start_kafka_producer) throw(failed_to_start_kafka_producer)
end. end.
on_stop(_InstId, #{client_id := ClientID, producers := Producers}) -> on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
with_log_at_error( with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
#{ #{
@ -103,6 +103,13 @@ on_stop(_InstId, #{client_id := ClientID, producers := Producers}) ->
msg => "failed_to_delete_kafka_client", msg => "failed_to_delete_kafka_client",
client_id => ClientID client_id => ClientID
} }
),
with_log_at_error(
fun() -> uninstall_telemetry_handlers(InstanceID) end,
#{
msg => "failed_to_uninstall_telemetry_handlers",
client_id => ClientID
}
). ).
%% @doc The callback API for rule-engine (or bridge without rules) %% @doc The callback API for rule-engine (or bridge without rules)
@ -337,12 +344,20 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
%% Event that we do not handle %% Event that we do not handle
ok. ok.
maybe_install_wolff_telemetry_handlers() -> -spec telemetry_handler_id(emqx_resource:resource_id()) -> binary().
telemetry_handler_id(InstanceID) ->
<<"emqx-bridge-kafka-producer-", InstanceID/binary, "-telemetry-handler">>.
uninstall_telemetry_handlers(InstanceID) ->
HandlerID = telemetry_handler_id(InstanceID),
telemetry:detach(HandlerID).
maybe_install_wolff_telemetry_handlers(InstanceID) ->
%% Attach event handlers for Kafka telemetry events. If a handler with the %% Attach event handlers for Kafka telemetry events. If a handler with the
%% handler id already exists, the attach_many function does nothing %% handler id already exists, the attach_many function does nothing
telemetry:attach_many( telemetry:attach_many(
%% unique handler id %% unique handler id
<<"emqx-bridge-kafka-producer-telemetry-handler">>, telemetry_handler_id(InstanceID),
%% Note: we don't handle `[wolff, success]' because, %% Note: we don't handle `[wolff, success]' because,
%% currently, we already increment the success counter for %% currently, we already increment the success counter for
%% this resource at `emqx_rule_runtime:handle_action' when %% this resource at `emqx_rule_runtime:handle_action' when
@ -358,6 +373,6 @@ maybe_install_wolff_telemetry_handlers() ->
[wolff, retried_failed], [wolff, retried_failed],
[wolff, retried_success] [wolff, retried_success]
], ],
fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4, fun ?MODULE:handle_telemetry_event/4,
[] []
). ).