feat(mqttbridge): support file-sourced secrets as passwords

This commit is contained in:
Andrew Mayorov 2023-10-24 14:52:25 +07:00
parent 1c2f9321d1
commit 52f4519eeb
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 126 additions and 76 deletions

View File

@ -326,7 +326,7 @@ mk_client_opts(
], ],
Config Config
), ),
Options#{ mk_client_opt_password(Options#{
hosts => [HostPort], hosts => [HostPort],
clientid => clientid(ResourceId, ClientScope, Config), clientid => clientid(ResourceId, ClientScope, Config),
connect_timeout => 30, connect_timeout => 30,
@ -334,7 +334,13 @@ mk_client_opts(
force_ping => true, force_ping => true,
ssl => EnableSsl, ssl => EnableSsl,
ssl_opts => maps:to_list(maps:remove(enable, Ssl)) ssl_opts => maps:to_list(maps:remove(enable, Ssl))
}. }).
mk_client_opt_password(Options = #{password := Secret}) ->
%% TODO: Teach `emqtt` to accept 0-arity closures as passwords.
Options#{password := emqx_secret:unwrap(Secret)};
mk_client_opt_password(Options) ->
Options.
ms_to_s(Ms) -> ms_to_s(Ms) ->
erlang:ceil(Ms / 1000). erlang:ceil(Ms / 1000).

View File

@ -99,13 +99,9 @@ fields("server_configs") ->
} }
)}, )},
{password, {password,
mk( emqx_schema_secret:mk(
binary(),
#{ #{
format => <<"password">>, desc => ?DESC("password")
sensitive => true,
desc => ?DESC("password"),
converter => fun emqx_schema:password_converter/2
} }
)}, )},
{clean_start, {clean_start,

View File

@ -21,13 +21,15 @@
-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). -import(emqx_dashboard_api_test_helpers, [request/4, uri/1]).
-include("emqx/include/emqx.hrl"). -include("emqx/include/emqx.hrl").
-include("emqx/include/emqx_hooks.hrl").
-include("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% output functions %% output functions
-export([inspect/3]). -export([inspect/3]).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(TYPE_MQTT, <<"mqtt">>). -define(TYPE_MQTT, <<"mqtt">>).
-define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>). -define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>).
-define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>). -define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>).
@ -38,14 +40,18 @@
-define(EGRESS_REMOTE_TOPIC, "egress_remote_topic"). -define(EGRESS_REMOTE_TOPIC, "egress_remote_topic").
-define(EGRESS_LOCAL_TOPIC, "egress_local_topic"). -define(EGRESS_LOCAL_TOPIC, "egress_local_topic").
-define(SERVER_CONF(Username), #{ -define(SERVER_CONF, #{
<<"type">> => ?TYPE_MQTT,
<<"server">> => <<"127.0.0.1:1883">>, <<"server">> => <<"127.0.0.1:1883">>,
<<"username">> => Username,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v4">>, <<"proto_ver">> => <<"v4">>,
<<"ssl">> => #{<<"enable">> => false} <<"ssl">> => #{<<"enable">> => false}
}). }).
-define(SERVER_CONF(Username, Password), (?SERVER_CONF)#{
<<"username">> => Username,
<<"password">> => Password
}).
-define(INGRESS_CONF, #{ -define(INGRESS_CONF, #{
<<"remote">> => #{ <<"remote">> => #{
<<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>, <<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
@ -129,43 +135,32 @@ suite() ->
[{timetrap, {seconds, 30}}]. [{timetrap, {seconds, 30}}].
init_per_suite(Config) -> init_per_suite(Config) ->
_ = application:load(emqx_conf), Apps = emqx_cth_suite:start(
ok = emqx_common_test_helpers:start_apps(
[ [
emqx_rule_engine, emqx_conf,
emqx_bridge, emqx_bridge,
emqx_rule_engine,
emqx_bridge_mqtt, emqx_bridge_mqtt,
emqx_dashboard {emqx_dashboard,
"dashboard {"
"\n listeners.http { bind = 18083 }"
"\n default_username = connector_admin"
"\n default_password = public"
"\n }"}
], ],
fun set_special_configs/1 #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
ok = emqx_common_test_helpers:load_config( [{suite_apps, Apps} | Config].
emqx_rule_engine_schema,
<<"rule_engine {rules {}}">>
),
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
Config.
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_common_test_helpers:stop_apps([ emqx_cth_suite:stop(?config(suite_apps, Config)).
emqx_dashboard,
emqx_bridge_mqtt,
emqx_bridge,
emqx_rule_engine
]),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(<<"connector_admin">>);
set_special_configs(_) ->
ok.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
Config. Config.
end_per_testcase(_, _Config) -> end_per_testcase(_, _Config) ->
ok = unhook_authenticate(),
clear_resources(), clear_resources(),
snabbkaffe:stop(), snabbkaffe:stop(),
ok. ok.
@ -187,14 +182,84 @@ clear_resources() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_conf_bridge_authn_anonymous(_) ->
ok = hook_authenticate(),
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?SERVER_CONF#{
<<"name">> => <<"t_conf_bridge_anonymous">>,
<<"ingress">> => ?INGRESS_CONF#{<<"pool_size">> => 1}
}
),
?assertReceive(
{authenticate, #{username := undefined, password := undefined}}
).
t_conf_bridge_authn_password(_) ->
Username1 = <<"user1">>,
Password1 = <<"from-here">>,
ok = hook_authenticate(),
{ok, 201, _Bridge1} = request(
post,
uri(["bridges"]),
?SERVER_CONF(Username1, Password1)#{
<<"name">> => <<"t_conf_bridge_authn_password">>,
<<"ingress">> => ?INGRESS_CONF#{<<"pool_size">> => 1}
}
),
?assertReceive(
{authenticate, #{username := Username1, password := Password1}}
).
t_conf_bridge_authn_passfile(Config) ->
DataDir = ?config(data_dir, Config),
Username2 = <<"user2">>,
PasswordFilename = filename:join(DataDir, "password"),
Password2 = <<"from-there">>,
ok = hook_authenticate(),
{ok, 201, _Bridge2} = request(
post,
uri(["bridges"]),
?SERVER_CONF(Username2, iolist_to_binary(["file://", PasswordFilename]))#{
<<"name">> => <<"t_conf_bridge_authn_passfile">>,
<<"ingress">> => ?INGRESS_CONF#{<<"pool_size">> => 1}
}
),
?assertReceive(
{authenticate, #{username := Username2, password := Password2}}
),
{ok, 400, #{<<"message">> := Message}} = request_json(
post,
uri(["bridges"]),
?SERVER_CONF(<<>>, <<"file://im/pretty/sure/theres/no/such/file">>)#{
<<"name">> => <<"t_conf_bridge_authn_no_passfile">>
}
),
?assertMatch(
#{<<"reason">> := <<"{inaccessible_secret_file,enoent}">>},
emqx_utils_json:decode(Message)
).
hook_authenticate() ->
emqx_hooks:add('client.authenticate', {?MODULE, authenticate, [self()]}, ?HP_HIGHEST).
unhook_authenticate() ->
emqx_hooks:del('client.authenticate', {?MODULE, authenticate}).
authenticate(Credential, _, TestRunnerPid) ->
_ = TestRunnerPid ! {authenticate, Credential},
ignore.
%%------------------------------------------------------------------------------
t_mqtt_conn_bridge_ingress(_) -> t_mqtt_conn_bridge_ingress(_) ->
User1 = <<"user1">>,
%% create an MQTT bridge, using POST %% create an MQTT bridge, using POST
{ok, 201, Bridge} = request( {ok, 201, Bridge} = request(
post, post,
uri(["bridges"]), uri(["bridges"]),
ServerConf = ?SERVER_CONF(User1)#{ ServerConf = ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_INGRESS, <<"name">> => ?BRIDGE_NAME_INGRESS,
<<"ingress">> => ?INGRESS_CONF <<"ingress">> => ?INGRESS_CONF
} }
@ -249,7 +314,6 @@ t_mqtt_conn_bridge_ingress(_) ->
ok. ok.
t_mqtt_conn_bridge_ingress_full_context(_Config) -> t_mqtt_conn_bridge_ingress_full_context(_Config) ->
User1 = <<"user1">>,
IngressConf = IngressConf =
emqx_utils_maps:deep_merge( emqx_utils_maps:deep_merge(
?INGRESS_CONF, ?INGRESS_CONF,
@ -258,8 +322,7 @@ t_mqtt_conn_bridge_ingress_full_context(_Config) ->
{ok, 201, _Bridge} = request( {ok, 201, _Bridge} = request(
post, post,
uri(["bridges"]), uri(["bridges"]),
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_INGRESS, <<"name">> => ?BRIDGE_NAME_INGRESS,
<<"ingress">> => IngressConf <<"ingress">> => IngressConf
} }
@ -297,8 +360,7 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
Ns = lists:seq(1, 10), Ns = lists:seq(1, 10),
BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeName = atom_to_binary(?FUNCTION_NAME),
BridgeID = create_bridge( BridgeID = create_bridge(
?SERVER_CONF(<<>>)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => BridgeName, <<"name">> => BridgeName,
<<"ingress">> => #{ <<"ingress">> => #{
<<"pool_size">> => PoolSize, <<"pool_size">> => PoolSize,
@ -337,8 +399,7 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
t_mqtt_egress_bridge_ignores_clean_start(_) -> t_mqtt_egress_bridge_ignores_clean_start(_) ->
BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeName = atom_to_binary(?FUNCTION_NAME),
BridgeID = create_bridge( BridgeID = create_bridge(
?SERVER_CONF(<<"user1">>)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => BridgeName, <<"name">> => BridgeName,
<<"egress">> => ?EGRESS_CONF, <<"egress">> => ?EGRESS_CONF,
<<"clean_start">> => false <<"clean_start">> => false
@ -366,8 +427,7 @@ t_mqtt_egress_bridge_ignores_clean_start(_) ->
t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) -> t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) ->
BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeName = atom_to_binary(?FUNCTION_NAME),
BridgeID = create_bridge( BridgeID = create_bridge(
?SERVER_CONF(<<"user1">>)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => BridgeName, <<"name">> => BridgeName,
<<"ingress">> => emqx_utils_maps:deep_merge( <<"ingress">> => emqx_utils_maps:deep_merge(
?INGRESS_CONF, ?INGRESS_CONF,
@ -392,9 +452,8 @@ t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) ->
ok. ok.
t_mqtt_conn_bridge_ingress_no_payload_template(_) -> t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
User1 = <<"user1">>,
BridgeIDIngress = create_bridge( BridgeIDIngress = create_bridge(
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT, <<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_INGRESS, <<"name">> => ?BRIDGE_NAME_INGRESS,
<<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE <<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE
@ -428,10 +487,8 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
t_mqtt_conn_bridge_egress(_) -> t_mqtt_conn_bridge_egress(_) ->
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST
User1 = <<"user1">>,
BridgeIDEgress = create_bridge( BridgeIDEgress = create_bridge(
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF <<"egress">> => ?EGRESS_CONF
} }
@ -473,11 +530,8 @@ t_mqtt_conn_bridge_egress(_) ->
t_mqtt_conn_bridge_egress_no_payload_template(_) -> t_mqtt_conn_bridge_egress_no_payload_template(_) ->
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST
User1 = <<"user1">>,
BridgeIDEgress = create_bridge( BridgeIDEgress = create_bridge(
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
} }
@ -520,11 +574,9 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
ok. ok.
t_egress_custom_clientid_prefix(_Config) -> t_egress_custom_clientid_prefix(_Config) ->
User1 = <<"user1">>,
BridgeIDEgress = create_bridge( BridgeIDEgress = create_bridge(
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"clientid_prefix">> => <<"my-custom-prefix">>, <<"clientid_prefix">> => <<"my-custom-prefix">>,
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF <<"egress">> => ?EGRESS_CONF
} }
@ -545,17 +597,14 @@ t_egress_custom_clientid_prefix(_Config) ->
ok. ok.
t_mqtt_conn_bridge_ingress_and_egress(_) -> t_mqtt_conn_bridge_ingress_and_egress(_) ->
User1 = <<"user1">>,
BridgeIDIngress = create_bridge( BridgeIDIngress = create_bridge(
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_INGRESS, <<"name">> => ?BRIDGE_NAME_INGRESS,
<<"ingress">> => ?INGRESS_CONF <<"ingress">> => ?INGRESS_CONF
} }
), ),
BridgeIDEgress = create_bridge( BridgeIDEgress = create_bridge(
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF <<"egress">> => ?EGRESS_CONF
} }
@ -627,8 +676,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
t_ingress_mqtt_bridge_with_rules(_) -> t_ingress_mqtt_bridge_with_rules(_) ->
BridgeIDIngress = create_bridge( BridgeIDIngress = create_bridge(
?SERVER_CONF(<<"user1">>)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_INGRESS, <<"name">> => ?BRIDGE_NAME_INGRESS,
<<"ingress">> => ?INGRESS_CONF <<"ingress">> => ?INGRESS_CONF
} }
@ -712,8 +760,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
t_egress_mqtt_bridge_with_rules(_) -> t_egress_mqtt_bridge_with_rules(_) ->
BridgeIDEgress = create_bridge( BridgeIDEgress = create_bridge(
?SERVER_CONF(<<"user1">>)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF <<"egress">> => ?EGRESS_CONF
} }
@ -789,10 +836,8 @@ t_egress_mqtt_bridge_with_rules(_) ->
t_mqtt_conn_bridge_egress_reconnect(_) -> t_mqtt_conn_bridge_egress_reconnect(_) ->
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST
User1 = <<"user1">>,
BridgeIDEgress = create_bridge( BridgeIDEgress = create_bridge(
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF, <<"egress">> => ?EGRESS_CONF,
<<"resource_opts">> => #{ <<"resource_opts">> => #{
@ -897,10 +942,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
ok. ok.
t_mqtt_conn_bridge_egress_async_reconnect(_) -> t_mqtt_conn_bridge_egress_async_reconnect(_) ->
User1 = <<"user1">>,
BridgeIDEgress = create_bridge( BridgeIDEgress = create_bridge(
?SERVER_CONF(User1)#{ ?SERVER_CONF#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF, <<"egress">> => ?EGRESS_CONF,
<<"resource_opts">> => #{ <<"resource_opts">> => #{
@ -1018,5 +1061,9 @@ request_bridge_metrics(BridgeID) ->
{ok, 200, BridgeMetrics} = request(get, uri(["bridges", BridgeID, "metrics"]), []), {ok, 200, BridgeMetrics} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
emqx_utils_json:decode(BridgeMetrics). emqx_utils_json:decode(BridgeMetrics).
request_json(Method, Url, Body) ->
{ok, Code, Response} = request(Method, Url, Body),
{ok, Code, emqx_utils_json:decode(Response)}.
request(Method, Url, Body) -> request(Method, Url, Body) ->
request(<<"connector_admin">>, Method, Url, Body). request(<<"connector_admin">>, Method, Url, Body).

View File

@ -0,0 +1 @@
from-there