diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 04965ec02..082198309 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -49,11 +49,11 @@ get_channels/2 ]). --callback connector_config(ParsedConfig, ConnectorName :: atom() | binary()) -> +-callback connector_config(ParsedConfig) -> ParsedConfig when ParsedConfig :: #{atom() => any()}. --optional_callbacks([connector_config/2]). +-optional_callbacks([connector_config/1]). -if(?EMQX_RELEASE_EDITION == ee). connector_to_resource_type(ConnectorType) -> @@ -164,14 +164,15 @@ start(Type, Name) -> create(Type, Name, Conf) -> create(Type, Name, Conf, #{}). -create(Type, Name, Conf, Opts) -> +create(Type, Name, Conf0, Opts) -> ?SLOG(info, #{ msg => "create connector", type => Type, name => Name, - config => emqx_utils:redact(Conf) + config => emqx_utils:redact(Conf0) }), TypeBin = bin(Type), + Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, {ok, _Data} = emqx_resource:create_local( resource_id(Type, Name), <<"emqx_connector">>, @@ -263,14 +264,15 @@ create_dry_run(Type, Conf0, Callback) -> Conf1 = maps:without([<<"name">>], Conf0), RawConf = #{<<"connectors">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, try - CheckedConf = + CheckedConf1 = hocon_tconf:check_plain( emqx_connector_schema, RawConf, #{atom_key => true, required => false} ), - Conf = get_temp_conf(TypeAtom, CheckedConf), - case emqx_connector_ssl:convert_certs(TmpPath, Conf) of + CheckedConf2 = get_temp_conf(TypeAtom, CheckedConf1), + CheckedConf = CheckedConf2#{connector_type => TypeBin, connector_name => TmpName}, + case emqx_connector_ssl:convert_certs(TmpPath, CheckedConf) of {error, Reason} -> {error, Reason}; {ok, ConfNew} -> @@ -386,20 +388,14 @@ parse_confs(<<"iotdb">>, Name, Conf) -> ); %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. -parse_confs(<<"kafka">> = _Type, Name, Conf) -> - Conf#{connector_name => Name}; -parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) -> - Conf#{connector_name => Name}; -parse_confs(<<"kinesis_producer">> = _Type, Name, Conf) -> - Conf#{connector_name => Name}; -parse_confs(ConnectorType, ConnectorName, Config) -> - connector_config(ConnectorType, ConnectorName, Config). +parse_confs(ConnectorType, _Name, Config) -> + connector_config(ConnectorType, Config). -connector_config(ConnectorType, ConnectorName, Config) -> +connector_config(ConnectorType, Config) -> Mod = connector_impl_module(ConnectorType), - case erlang:function_exported(Mod, connector_config, 2) of + case erlang:function_exported(Mod, connector_config, 1) of true -> - Mod:connector_config(Config, ConnectorName); + Mod:connector_config(Config); false -> Config end.