This commit is contained in:
Stefan Strigler 2023-10-25 11:43:56 +02:00 committed by Zaiming (Stone) Shi
parent b72abaf661
commit f760f0a5c5
1 changed files with 14 additions and 18 deletions

View File

@ -49,11 +49,11 @@
get_channels/2 get_channels/2
]). ]).
-callback connector_config(ParsedConfig, ConnectorName :: atom() | binary()) -> -callback connector_config(ParsedConfig) ->
ParsedConfig ParsedConfig
when when
ParsedConfig :: #{atom() => any()}. ParsedConfig :: #{atom() => any()}.
-optional_callbacks([connector_config/2]). -optional_callbacks([connector_config/1]).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
connector_to_resource_type(ConnectorType) -> connector_to_resource_type(ConnectorType) ->
@ -164,14 +164,15 @@ start(Type, Name) ->
create(Type, Name, Conf) -> create(Type, Name, Conf) ->
create(Type, Name, Conf, #{}). create(Type, Name, Conf, #{}).
create(Type, Name, Conf, Opts) -> create(Type, Name, Conf0, Opts) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "create connector", msg => "create connector",
type => Type, type => Type,
name => Name, name => Name,
config => emqx_utils:redact(Conf) config => emqx_utils:redact(Conf0)
}), }),
TypeBin = bin(Type), TypeBin = bin(Type),
Conf = Conf0#{connector_type => TypeBin, connector_name => Name},
{ok, _Data} = emqx_resource:create_local( {ok, _Data} = emqx_resource:create_local(
resource_id(Type, Name), resource_id(Type, Name),
<<"emqx_connector">>, <<"emqx_connector">>,
@ -263,14 +264,15 @@ create_dry_run(Type, Conf0, Callback) ->
Conf1 = maps:without([<<"name">>], Conf0), Conf1 = maps:without([<<"name">>], Conf0),
RawConf = #{<<"connectors">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, RawConf = #{<<"connectors">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
try try
CheckedConf = CheckedConf1 =
hocon_tconf:check_plain( hocon_tconf:check_plain(
emqx_connector_schema, emqx_connector_schema,
RawConf, RawConf,
#{atom_key => true, required => false} #{atom_key => true, required => false}
), ),
Conf = get_temp_conf(TypeAtom, CheckedConf), CheckedConf2 = get_temp_conf(TypeAtom, CheckedConf1),
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of CheckedConf = CheckedConf2#{connector_type => TypeBin, connector_name => TmpName},
case emqx_connector_ssl:convert_certs(TmpPath, CheckedConf) of
{error, Reason} -> {error, Reason} ->
{error, Reason}; {error, Reason};
{ok, ConfNew} -> {ok, ConfNew} ->
@ -386,20 +388,14 @@ parse_confs(<<"iotdb">>, Name, Conf) ->
); );
%% TODO: rename this to `kafka_producer' after alias support is added %% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility. %% to hocon; keeping this as just `kafka' for backwards compatibility.
parse_confs(<<"kafka">> = _Type, Name, Conf) -> parse_confs(ConnectorType, _Name, Config) ->
Conf#{connector_name => Name}; connector_config(ConnectorType, Config).
parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) ->
Conf#{connector_name => Name};
parse_confs(<<"kinesis_producer">> = _Type, Name, Conf) ->
Conf#{connector_name => Name};
parse_confs(ConnectorType, ConnectorName, Config) ->
connector_config(ConnectorType, ConnectorName, Config).
connector_config(ConnectorType, ConnectorName, Config) -> connector_config(ConnectorType, Config) ->
Mod = connector_impl_module(ConnectorType), Mod = connector_impl_module(ConnectorType),
case erlang:function_exported(Mod, connector_config, 2) of case erlang:function_exported(Mod, connector_config, 1) of
true -> true ->
Mod:connector_config(Config, ConnectorName); Mod:connector_config(Config);
false -> false ->
Config Config
end. end.