fix(connector api): handle bad tls config file conversion errors
Fixes https://emqx.atlassian.net/browse/EMQX-12581
This commit is contained in:
parent
7e089dce6b
commit
795d280861
|
@ -62,7 +62,9 @@ init_per_group(_Group, Config) ->
|
||||||
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
|
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
Apps = ?config(apps, Config),
|
||||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
|
emqx_cth_suite:stop(Apps),
|
||||||
ok;
|
ok;
|
||||||
end_per_group(_Group, _Config) ->
|
end_per_group(_Group, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -109,22 +111,24 @@ common_init(ConfigT) ->
|
||||||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
% Ensure enterprise bridge module is loaded
|
Apps = emqx_cth_suite:start(
|
||||||
ok = emqx_common_test_helpers:start_apps([
|
[
|
||||||
emqx_conf, emqx_resource, emqx_bridge, rocketmq
|
emqx,
|
||||||
]),
|
emqx_conf,
|
||||||
_ = emqx_bridge_enterprise:module_info(),
|
emqx_connector,
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
emqx_bridge_rocketmq,
|
||||||
|
emqx_bridge,
|
||||||
|
emqx_rule_engine,
|
||||||
|
emqx_management,
|
||||||
|
emqx_mgmt_api_test_util:emqx_dashboard()
|
||||||
|
],
|
||||||
|
#{work_dir => emqx_cth_suite:work_dir(Config0)}
|
||||||
|
),
|
||||||
{Name, RocketMQConf} = rocketmq_config(BridgeType, Config0),
|
{Name, RocketMQConf} = rocketmq_config(BridgeType, Config0),
|
||||||
RocketMQSSLConf = RocketMQConf#{
|
RocketMQSSLConf = rocketmq_ssl_config(RocketMQConf, Config0),
|
||||||
<<"servers">> => <<"rocketmq_namesrv_ssl:9876">>,
|
|
||||||
<<"ssl">> => #{
|
|
||||||
<<"enable">> => true,
|
|
||||||
<<"verify">> => verify_none
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Config =
|
Config =
|
||||||
[
|
[
|
||||||
|
{apps, Apps},
|
||||||
{rocketmq_config, RocketMQConf},
|
{rocketmq_config, RocketMQConf},
|
||||||
{rocketmq_config_ssl, RocketMQSSLConf},
|
{rocketmq_config_ssl, RocketMQSSLConf},
|
||||||
{rocketmq_bridge_type, BridgeType},
|
{rocketmq_bridge_type, BridgeType},
|
||||||
|
@ -143,6 +147,24 @@ common_init(ConfigT) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
rocketmq_ssl_config(NonSSLConfig, _TCConfig) ->
|
||||||
|
%% TODO: generate fixed files for server and client to actually test TLS...
|
||||||
|
%% DataDir = ?config(data_dir, TCConfig),
|
||||||
|
%% emqx_test_tls_certs_helper:generate_tls_certs(TCConfig),
|
||||||
|
%% Keyfile = filename:join([DataDir, "client1.key"]),
|
||||||
|
%% Certfile = filename:join([DataDir, "client1.pem"]),
|
||||||
|
%% CACertfile = filename:join([DataDir, "intermediate1.pem"]),
|
||||||
|
NonSSLConfig#{
|
||||||
|
<<"servers">> => <<"rocketmq_namesrv_ssl:9876">>,
|
||||||
|
<<"ssl">> => #{
|
||||||
|
%% <<"keyfile">> => iolist_to_binary(Keyfile),
|
||||||
|
%% <<"certfile">> => iolist_to_binary(Certfile),
|
||||||
|
%% <<"cacertfile">> => iolist_to_binary(CACertfile),
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"verify">> => <<"verify_none">>
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
rocketmq_config(BridgeType, Config) ->
|
rocketmq_config(BridgeType, Config) ->
|
||||||
Port = integer_to_list(?GET_CONFIG(port, Config)),
|
Port = integer_to_list(?GET_CONFIG(port, Config)),
|
||||||
Server = ?GET_CONFIG(host, Config) ++ ":" ++ Port,
|
Server = ?GET_CONFIG(host, Config) ++ ":" ++ Port,
|
||||||
|
@ -174,13 +196,7 @@ rocketmq_config(BridgeType, Config) ->
|
||||||
QueryMode
|
QueryMode
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
{Name, parse_and_check(ConfigString, BridgeType, Name)}.
|
{Name, emqx_bridge_testlib:parse_and_check(BridgeType, Name, ConfigString)}.
|
||||||
|
|
||||||
parse_and_check(ConfigString, BridgeType, Name) ->
|
|
||||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
|
||||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
|
||||||
#{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
|
|
||||||
Config.
|
|
||||||
|
|
||||||
create_bridge(Config) ->
|
create_bridge(Config) ->
|
||||||
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
|
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
|
||||||
|
@ -192,7 +208,11 @@ create_bridge_ssl(Config) ->
|
||||||
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
|
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
|
||||||
Name = ?GET_CONFIG(rocketmq_name, Config),
|
Name = ?GET_CONFIG(rocketmq_name, Config),
|
||||||
RocketMQConf = ?GET_CONFIG(rocketmq_config_ssl, Config),
|
RocketMQConf = ?GET_CONFIG(rocketmq_config_ssl, Config),
|
||||||
emqx_bridge:create(BridgeType, Name, RocketMQConf).
|
emqx_bridge_testlib:create_bridge_api([
|
||||||
|
{bridge_type, BridgeType},
|
||||||
|
{bridge_name, Name},
|
||||||
|
{bridge_config, RocketMQConf}
|
||||||
|
]).
|
||||||
|
|
||||||
create_bridge_ssl_bad_ssl_opts(Config) ->
|
create_bridge_ssl_bad_ssl_opts(Config) ->
|
||||||
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
|
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
|
||||||
|
|
|
@ -121,8 +121,8 @@ pre_config_update(Path, _Name, Conf = #{<<"transport_options">> := TransportOpts
|
||||||
case emqx_connector_ssl:convert_certs(filename:join(Path), TransportOpts) of
|
case emqx_connector_ssl:convert_certs(filename:join(Path), TransportOpts) of
|
||||||
{ok, NTransportOpts} ->
|
{ok, NTransportOpts} ->
|
||||||
{ok, Conf#{<<"transport_options">> := NTransportOpts}};
|
{ok, Conf#{<<"transport_options">> := NTransportOpts}};
|
||||||
{error, {bad_ssl_config, Error}} ->
|
{error, Error} ->
|
||||||
{error, Error#{reason => <<"bad_ssl_config">>}}
|
{error, Error}
|
||||||
end;
|
end;
|
||||||
pre_config_update(_Path, _Name, Conf, _ConfOld) ->
|
pre_config_update(_Path, _Name, Conf, _ConfOld) ->
|
||||||
{ok, Conf}.
|
{ok, Conf}.
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_connector, [
|
{application, emqx_connector, [
|
||||||
{description, "EMQX Data Integration Connectors"},
|
{description, "EMQX Data Integration Connectors"},
|
||||||
{vsn, "0.3.2"},
|
{vsn, "0.3.3"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_connector_app, []}},
|
{mod, {emqx_connector_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -352,7 +352,7 @@ convert_certs(ConnectorsConf) ->
|
||||||
name => Name,
|
name => Name,
|
||||||
reason => Reason
|
reason => Reason
|
||||||
}),
|
}),
|
||||||
throw({bad_ssl_config, Reason});
|
throw(Reason);
|
||||||
{ok, ConnectorConf1} ->
|
{ok, ConnectorConf1} ->
|
||||||
ConnectorConf1
|
ConnectorConf1
|
||||||
end
|
end
|
||||||
|
|
|
@ -35,7 +35,7 @@ new_ssl_config(RltvDir, Config, SSL) ->
|
||||||
{ok, NewSSL} ->
|
{ok, NewSSL} ->
|
||||||
{ok, new_ssl_config(Config, NewSSL)};
|
{ok, new_ssl_config(Config, NewSSL)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, {bad_ssl_config, Reason}}
|
{error, map_bad_ssl_error(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
new_ssl_config(#{connector := Connector} = Config, NewSSL) ->
|
new_ssl_config(#{connector := Connector} = Config, NewSSL) ->
|
||||||
|
@ -48,3 +48,25 @@ new_ssl_config(#{<<"ssl">> := _} = Config, NewSSL) ->
|
||||||
Config#{<<"ssl">> => NewSSL};
|
Config#{<<"ssl">> => NewSSL};
|
||||||
new_ssl_config(Config, _NewSSL) ->
|
new_ssl_config(Config, _NewSSL) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
map_bad_ssl_error(#{pem_check := invalid_pem} = TLSLibError) ->
|
||||||
|
#{which_options := Paths, file_read := Reason} = TLSLibError,
|
||||||
|
#{
|
||||||
|
kind => validation_error,
|
||||||
|
reason => <<"bad_ssl_config">>,
|
||||||
|
bad_fields => Paths,
|
||||||
|
file_read_error => Reason
|
||||||
|
};
|
||||||
|
map_bad_ssl_error(#{which_options := Paths, reason := Reason}) ->
|
||||||
|
#{
|
||||||
|
kind => validation_error,
|
||||||
|
reason => <<"bad_ssl_config">>,
|
||||||
|
bad_fields => Paths,
|
||||||
|
details => Reason
|
||||||
|
};
|
||||||
|
map_bad_ssl_error(TLSLibError) ->
|
||||||
|
#{
|
||||||
|
kind => validation_error,
|
||||||
|
reason => <<"bad_ssl_config">>,
|
||||||
|
details => TLSLibError
|
||||||
|
}.
|
||||||
|
|
|
@ -718,6 +718,37 @@ t_create_with_bad_name(Config) ->
|
||||||
?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
|
?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Checks that we correctly handle `throw({bad_ssl_config, _})' from
|
||||||
|
%% `emqx_connector:convert_certs' and massage the error message accordingly.
|
||||||
|
t_create_with_bad_tls_files(Config) ->
|
||||||
|
ConnectorName = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
Conf0 = ?KAFKA_CONNECTOR(ConnectorName),
|
||||||
|
Conf = Conf0#{<<"ssl">> => #{<<"cacertfile">> => <<"bad_pem_file">>}},
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
{ok, 400, #{
|
||||||
|
<<"message">> := Msg0
|
||||||
|
}} = request_json(
|
||||||
|
post,
|
||||||
|
uri(["connectors"]),
|
||||||
|
Conf,
|
||||||
|
Config
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"kind">> := <<"validation_error">>,
|
||||||
|
<<"reason">> := <<"bad_ssl_config">>,
|
||||||
|
<<"file_read_error">> := <<"enoent">>,
|
||||||
|
<<"bad_fields">> := [[<<"cacertfile">>]]
|
||||||
|
},
|
||||||
|
json(Msg0)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_actions_field(Config) ->
|
t_actions_field(Config) ->
|
||||||
Name = ?CONNECTOR_NAME,
|
Name = ?CONNECTOR_NAME,
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
|
Loading…
Reference in New Issue