diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index eb81c4b6e..ff5f0c2f2 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -326,7 +326,7 @@ mk_client_opts( ], Config ), - Options#{ + mk_client_opt_password(Options#{ hosts => [HostPort], clientid => clientid(ResourceId, ClientScope, Config), connect_timeout => 30, @@ -334,7 +334,13 @@ mk_client_opts( force_ping => true, ssl => EnableSsl, ssl_opts => maps:to_list(maps:remove(enable, Ssl)) - }. + }). + +mk_client_opt_password(Options = #{password := Secret}) -> + %% TODO: Teach `emqtt` to accept 0-arity closures as passwords. + Options#{password := emqx_secret:unwrap(Secret)}; +mk_client_opt_password(Options) -> + Options. ms_to_s(Ms) -> erlang:ceil(Ms / 1000). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl index 1dc3ca5f8..eb298c5ff 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl @@ -99,13 +99,9 @@ fields("server_configs") -> } )}, {password, - mk( - binary(), + emqx_schema_secret:mk( #{ - format => <<"password">>, - sensitive => true, - desc => ?DESC("password"), - converter => fun emqx_schema:password_converter/2 + desc => ?DESC("password") } )}, {clean_start, diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index bc0f2450a..6b5cc86da 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -21,13 +21,15 @@ -import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). -include("emqx/include/emqx.hrl"). +-include("emqx/include/emqx_hooks.hrl"). +-include("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% output functions -export([inspect/3]). --define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(TYPE_MQTT, <<"mqtt">>). -define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>). -define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>). @@ -38,14 +40,18 @@ -define(EGRESS_REMOTE_TOPIC, "egress_remote_topic"). -define(EGRESS_LOCAL_TOPIC, "egress_local_topic"). --define(SERVER_CONF(Username), #{ +-define(SERVER_CONF, #{ + <<"type">> => ?TYPE_MQTT, <<"server">> => <<"127.0.0.1:1883">>, - <<"username">> => Username, - <<"password">> => <<"">>, <<"proto_ver">> => <<"v4">>, <<"ssl">> => #{<<"enable">> => false} }). +-define(SERVER_CONF(Username, Password), (?SERVER_CONF)#{ + <<"username">> => Username, + <<"password">> => Password +}). + -define(INGRESS_CONF, #{ <<"remote">> => #{ <<"topic">> => <>, @@ -129,43 +135,32 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - _ = application:load(emqx_conf), - ok = emqx_common_test_helpers:start_apps( + Apps = emqx_cth_suite:start( [ - emqx_rule_engine, + emqx_conf, emqx_bridge, + emqx_rule_engine, emqx_bridge_mqtt, - emqx_dashboard + {emqx_dashboard, + "dashboard {" + "\n listeners.http { bind = 18083 }" + "\n default_username = connector_admin" + "\n default_password = public" + "\n }"} ], - fun set_special_configs/1 + #{work_dir => emqx_cth_suite:work_dir(Config)} ), - ok = emqx_common_test_helpers:load_config( - emqx_rule_engine_schema, - <<"rule_engine {rules {}}">> - ), - ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), - Config. + [{suite_apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([ - emqx_dashboard, - emqx_bridge_mqtt, - emqx_bridge, - emqx_rule_engine - ]), - ok. - -set_special_configs(emqx_dashboard) -> - emqx_dashboard_api_test_helpers:set_default_config(<<"connector_admin">>); -set_special_configs(_) -> - ok. +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)). init_per_testcase(_, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = snabbkaffe:start_trace(), Config. end_per_testcase(_, _Config) -> + ok = unhook_authenticate(), clear_resources(), snabbkaffe:stop(), ok. @@ -187,14 +182,84 @@ clear_resources() -> %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ + +t_conf_bridge_authn_anonymous(_) -> + ok = hook_authenticate(), + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF#{ + <<"name">> => <<"t_conf_bridge_anonymous">>, + <<"ingress">> => ?INGRESS_CONF#{<<"pool_size">> => 1} + } + ), + ?assertReceive( + {authenticate, #{username := undefined, password := undefined}} + ). + +t_conf_bridge_authn_password(_) -> + Username1 = <<"user1">>, + Password1 = <<"from-here">>, + ok = hook_authenticate(), + {ok, 201, _Bridge1} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(Username1, Password1)#{ + <<"name">> => <<"t_conf_bridge_authn_password">>, + <<"ingress">> => ?INGRESS_CONF#{<<"pool_size">> => 1} + } + ), + ?assertReceive( + {authenticate, #{username := Username1, password := Password1}} + ). + +t_conf_bridge_authn_passfile(Config) -> + DataDir = ?config(data_dir, Config), + Username2 = <<"user2">>, + PasswordFilename = filename:join(DataDir, "password"), + Password2 = <<"from-there">>, + ok = hook_authenticate(), + {ok, 201, _Bridge2} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(Username2, iolist_to_binary(["file://", PasswordFilename]))#{ + <<"name">> => <<"t_conf_bridge_authn_passfile">>, + <<"ingress">> => ?INGRESS_CONF#{<<"pool_size">> => 1} + } + ), + ?assertReceive( + {authenticate, #{username := Username2, password := Password2}} + ), + {ok, 400, #{<<"message">> := Message}} = request_json( + post, + uri(["bridges"]), + ?SERVER_CONF(<<>>, <<"file://im/pretty/sure/theres/no/such/file">>)#{ + <<"name">> => <<"t_conf_bridge_authn_no_passfile">> + } + ), + ?assertMatch( + #{<<"reason">> := <<"{inaccessible_secret_file,enoent}">>}, + emqx_utils_json:decode(Message) + ). + +hook_authenticate() -> + emqx_hooks:add('client.authenticate', {?MODULE, authenticate, [self()]}, ?HP_HIGHEST). + +unhook_authenticate() -> + emqx_hooks:del('client.authenticate', {?MODULE, authenticate}). + +authenticate(Credential, _, TestRunnerPid) -> + _ = TestRunnerPid ! {authenticate, Credential}, + ignore. + +%%------------------------------------------------------------------------------ + t_mqtt_conn_bridge_ingress(_) -> - User1 = <<"user1">>, %% create an MQTT bridge, using POST {ok, 201, Bridge} = request( post, uri(["bridges"]), - ServerConf = ?SERVER_CONF(User1)#{ - <<"type">> => ?TYPE_MQTT, + ServerConf = ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => ?INGRESS_CONF } @@ -249,7 +314,6 @@ t_mqtt_conn_bridge_ingress(_) -> ok. t_mqtt_conn_bridge_ingress_full_context(_Config) -> - User1 = <<"user1">>, IngressConf = emqx_utils_maps:deep_merge( ?INGRESS_CONF, @@ -258,8 +322,7 @@ t_mqtt_conn_bridge_ingress_full_context(_Config) -> {ok, 201, _Bridge} = request( post, uri(["bridges"]), - ?SERVER_CONF(User1)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => IngressConf } @@ -297,8 +360,7 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) -> Ns = lists:seq(1, 10), BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeID = create_bridge( - ?SERVER_CONF(<<>>)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => BridgeName, <<"ingress">> => #{ <<"pool_size">> => PoolSize, @@ -337,8 +399,7 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) -> t_mqtt_egress_bridge_ignores_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeID = create_bridge( - ?SERVER_CONF(<<"user1">>)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => BridgeName, <<"egress">> => ?EGRESS_CONF, <<"clean_start">> => false @@ -366,8 +427,7 @@ t_mqtt_egress_bridge_ignores_clean_start(_) -> t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeID = create_bridge( - ?SERVER_CONF(<<"user1">>)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => BridgeName, <<"ingress">> => emqx_utils_maps:deep_merge( ?INGRESS_CONF, @@ -392,9 +452,8 @@ t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) -> ok. t_mqtt_conn_bridge_ingress_no_payload_template(_) -> - User1 = <<"user1">>, BridgeIDIngress = create_bridge( - ?SERVER_CONF(User1)#{ + ?SERVER_CONF#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE @@ -428,10 +487,8 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) -> t_mqtt_conn_bridge_egress(_) -> %% then we add a mqtt connector, using POST - User1 = <<"user1">>, BridgeIDEgress = create_bridge( - ?SERVER_CONF(User1)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF } @@ -473,11 +530,8 @@ t_mqtt_conn_bridge_egress(_) -> t_mqtt_conn_bridge_egress_no_payload_template(_) -> %% then we add a mqtt connector, using POST - User1 = <<"user1">>, - BridgeIDEgress = create_bridge( - ?SERVER_CONF(User1)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE } @@ -520,11 +574,9 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> ok. t_egress_custom_clientid_prefix(_Config) -> - User1 = <<"user1">>, BridgeIDEgress = create_bridge( - ?SERVER_CONF(User1)#{ + ?SERVER_CONF#{ <<"clientid_prefix">> => <<"my-custom-prefix">>, - <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF } @@ -545,17 +597,14 @@ t_egress_custom_clientid_prefix(_Config) -> ok. t_mqtt_conn_bridge_ingress_and_egress(_) -> - User1 = <<"user1">>, BridgeIDIngress = create_bridge( - ?SERVER_CONF(User1)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => ?INGRESS_CONF } ), BridgeIDEgress = create_bridge( - ?SERVER_CONF(User1)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF } @@ -627,8 +676,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> t_ingress_mqtt_bridge_with_rules(_) -> BridgeIDIngress = create_bridge( - ?SERVER_CONF(<<"user1">>)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => ?INGRESS_CONF } @@ -712,8 +760,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> t_egress_mqtt_bridge_with_rules(_) -> BridgeIDEgress = create_bridge( - ?SERVER_CONF(<<"user1">>)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF } @@ -789,10 +836,8 @@ t_egress_mqtt_bridge_with_rules(_) -> t_mqtt_conn_bridge_egress_reconnect(_) -> %% then we add a mqtt connector, using POST - User1 = <<"user1">>, BridgeIDEgress = create_bridge( - ?SERVER_CONF(User1)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF, <<"resource_opts">> => #{ @@ -897,10 +942,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> ok. t_mqtt_conn_bridge_egress_async_reconnect(_) -> - User1 = <<"user1">>, BridgeIDEgress = create_bridge( - ?SERVER_CONF(User1)#{ - <<"type">> => ?TYPE_MQTT, + ?SERVER_CONF#{ <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF, <<"resource_opts">> => #{ @@ -1018,5 +1061,9 @@ request_bridge_metrics(BridgeID) -> {ok, 200, BridgeMetrics} = request(get, uri(["bridges", BridgeID, "metrics"]), []), emqx_utils_json:decode(BridgeMetrics). +request_json(Method, Url, Body) -> + {ok, Code, Response} = request(Method, Url, Body), + {ok, Code, emqx_utils_json:decode(Response)}. + request(Method, Url, Body) -> request(<<"connector_admin">>, Method, Url, Body). diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE_data/password b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE_data/password new file mode 100644 index 000000000..d68418fda --- /dev/null +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE_data/password @@ -0,0 +1 @@ +from-there