diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index c6e8af182..f08d3a829 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -44,7 +44,7 @@ query_mode(_) -> callback_mode() -> async_if_possible. %% @doc Config schema is defined in emqx_connector_kafka. -on_start(<<"connector:", _/binary>> = InstId, Config) -> +on_start(InstId, Config) -> #{ authentication := Auth, bootstrap_hosts := Hosts0, @@ -467,7 +467,7 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_manager' will kill the wolff producers and messages might be lost. on_get_status( - <<"connector:", _/binary>> = _InstId, + _InstId, #{client_id := ClientId} = State ) -> case wolff_client_sup:find_client(ClientId) of diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 527225e5a..1776dccd8 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -86,7 +86,7 @@ parse_connector_id(ConnectorId, Opts) -> case string:split(bin(ConnectorId), ":", all) of [Type, Name] -> {to_type_atom(Type), validate_name(Name, Opts)}; - [<<"connector">>, Type, Name] -> + [_, Type, Name] -> {to_type_atom(Type), validate_name(Name, Opts)}; _ -> invalid_data(