refactor(kafka): fix typo and take connector type from input

This commit is contained in:
Thales Macedo Garitezi 2023-10-26 15:41:45 -03:00 committed by Zaiming (Stone) Shi
parent 3cb700827f
commit 04a832a80a
1 changed files with 12 additions and 13 deletions

View File

@ -31,10 +31,9 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% Allocatable resources %% Allocatable resources
-define(kafka_telementry_id, kafka_telementry_id). -define(kafka_telemetry_id, kafka_telemetry_id).
-define(kafka_client_id, kafka_client_id). -define(kafka_client_id, kafka_client_id).
-define(kafka_producers, kafka_producers). -define(kafka_producers, kafka_producers).
-define(CONNECTOR_TYPE, kafka).
query_mode(#{kafka := #{query_mode := sync}}) -> query_mode(#{kafka := #{query_mode := sync}}) ->
simple_sync_internal_buffer; simple_sync_internal_buffer;
@ -43,22 +42,22 @@ query_mode(_) ->
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
%% @doc Config schema is defined in emqx_connector_kafka. %% @doc Config schema is defined in emqx_bridge_kafka.
on_start(InstId, Config) -> on_start(InstId, Config) ->
#{ #{
authentication := Auth, authentication := Auth,
bootstrap_hosts := Hosts0, bootstrap_hosts := Hosts0,
connector_name := ConnectorName, connector_name := ConnectorName,
connector_type := ConnectorType,
connect_timeout := ConnTimeout, connect_timeout := ConnTimeout,
metadata_request_timeout := MetaReqTimeout, metadata_request_timeout := MetaReqTimeout,
min_metadata_refresh_interval := MinMetaRefreshInterval, min_metadata_refresh_interval := MinMetaRefreshInterval,
socket_opts := SocketOpts, socket_opts := SocketOpts,
ssl := SSL ssl := SSL
} = Config, } = Config,
ConnectorType = ?CONNECTOR_TYPE,
ResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName), ResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0),
ClientId = emqx_bridge_kafka_impl:make_client_id(?CONNECTOR_TYPE, ConnectorName), ClientId = emqx_bridge_kafka_impl:make_client_id(ConnectorType, ConnectorName),
ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
ClientConfig = #{ ClientConfig = #{
min_metadata_refresh_interval => MinMetaRefreshInterval, min_metadata_refresh_interval => MinMetaRefreshInterval,
@ -160,7 +159,7 @@ create_producers_for_bridge_v2(
{ok, Producers} -> {ok, Producers} ->
ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers),
ok = emqx_resource:allocate_resource( ok = emqx_resource:allocate_resource(
InstId, {?kafka_telementry_id, BridgeV2Id}, BridgeV2Id InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id
), ),
_ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id),
{ok, #{ {ok, #{
@ -203,8 +202,8 @@ on_stop(InstanceId, _State) ->
fun fun
({?kafka_producers, _BridgeV2Id}, Producers) -> ({?kafka_producers, _BridgeV2Id}, Producers) ->
deallocate_producers(ClientId, Producers); deallocate_producers(ClientId, Producers);
({?kafka_telementry_id, _BridgeV2Id}, TelementryId) -> ({?kafka_telemetry_id, _BridgeV2Id}, TelemetryId) ->
deallocate_telementry_handlers(TelementryId); deallocate_telemetry_handlers(TelemetryId);
(_, _) -> (_, _) ->
ok ok
end, end,
@ -231,12 +230,12 @@ deallocate_producers(ClientId, Producers) ->
} }
). ).
deallocate_telementry_handlers(TelementryId) -> deallocate_telemetry_handlers(TelemetryId) ->
_ = with_log_at_error( _ = with_log_at_error(
fun() -> uninstall_telemetry_handlers(TelementryId) end, fun() -> uninstall_telemetry_handlers(TelemetryId) end,
#{ #{
msg => "failed_to_uninstall_telemetry_handlers", msg => "failed_to_uninstall_telemetry_handlers",
resource_id => TelementryId resource_id => TelemetryId
} }
). ).
@ -249,10 +248,10 @@ remove_producers_for_bridge_v2(
fun fun
({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id ->
deallocate_producers(ClientId, Producers); deallocate_producers(ClientId, Producers);
({?kafka_telementry_id, BridgeV2IdCheck}, TelementryId) when ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when
BridgeV2IdCheck =:= BridgeV2Id BridgeV2IdCheck =:= BridgeV2Id
-> ->
deallocate_telementry_handlers(TelementryId); deallocate_telemetry_handlers(TelemetryId);
(_, _) -> (_, _) ->
ok ok
end, end,