fix: don't assume resource-id starts with "connector:"

This commit is contained in:
Stefan Strigler 2023-10-09 17:16:40 +02:00 committed by Zaiming (Stone) Shi
parent 4b4eb19b0b
commit 8567ccafc1
2 changed files with 3 additions and 3 deletions

View File

@ -44,7 +44,7 @@ query_mode(_) ->
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
%% @doc Config schema is defined in emqx_connector_kafka. %% @doc Config schema is defined in emqx_connector_kafka.
on_start(<<"connector:", _/binary>> = InstId, Config) -> on_start(InstId, Config) ->
#{ #{
authentication := Auth, authentication := Auth,
bootstrap_hosts := Hosts0, bootstrap_hosts := Hosts0,
@ -467,7 +467,7 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
%% `emqx_resource_manager' will kill the wolff producers and messages might be lost. %% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
on_get_status( on_get_status(
<<"connector:", _/binary>> = _InstId, _InstId,
#{client_id := ClientId} = State #{client_id := ClientId} = State
) -> ) ->
case wolff_client_sup:find_client(ClientId) of case wolff_client_sup:find_client(ClientId) of

View File

@ -86,7 +86,7 @@ parse_connector_id(ConnectorId, Opts) ->
case string:split(bin(ConnectorId), ":", all) of case string:split(bin(ConnectorId), ":", all) of
[Type, Name] -> [Type, Name] ->
{to_type_atom(Type), validate_name(Name, Opts)}; {to_type_atom(Type), validate_name(Name, Opts)};
[<<"connector">>, Type, Name] -> [_, Type, Name] ->
{to_type_atom(Type), validate_name(Name, Opts)}; {to_type_atom(Type), validate_name(Name, Opts)};
_ -> _ ->
invalid_data( invalid_data(