diff --git a/etc/emqx.conf b/etc/emqx.conf index a88e7a33e..788c11c3f 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1672,21 +1672,25 @@ bridge.aws.mqueue_type = memory ## Value: Number bridge.aws.max_pending_messages = 10000 +## Bribge to remote server via SSL. +## +## Value: on | off +bridge.aws.ssl = off ## PEM-encoded CA certificates of the bridge. ## ## Value: File -## bridge.aws.cacertfile = cacert.pem +## bridge.aws.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem ## SSL Certfile of the bridge. ## ## Value: File -## bridge.aws.certfile = cert.pem +## bridge.aws.certfile = {{ platform_etc_dir }}/certs/client-cert.pem ## SSL Keyfile of the bridge. ## ## Value: File -## bridge.aws.keyfile = key.pem +## bridge.aws.keyfile = {{ platform_etc_dir }}/certs/client-key.pem ## SSL Ciphers used by the bridge. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index d09937bb2..fc89586ba 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1552,6 +1552,11 @@ end}. {datatype, string} ]}. +{mapping, "bridge.$name.ssl", "emqx.bridges", [ + {datatype, flag}, + {default, off} +]}. + {mapping, "bridge.$name.cacertfile", "emqx.bridges", [ {datatype, string} ]}. @@ -1575,11 +1580,12 @@ end}. {mapping, "bridge.$name.keepalive", "emqx.bridges", [ {default, "10s"}, - {datatype, {duration, s}} + {datatype, {duration, ms}} ]}. {mapping, "bridge.$name.tls_versions", "emqx.bridges", [ - {datatype, string} + {datatype, string}, + {default, "tlsv1,tlsv1.1,tlsv1.2"} ]}. {mapping, "bridge.$name.subscription.$id.topic", "emqx.bridges", [ @@ -1597,12 +1603,11 @@ end}. {mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [ {default, "30s"}, - {datatype, {duration, s}} + {datatype, {duration, ms}} ]}. {translation, "emqx.bridges", fun(Conf) -> - Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, IsSsl = fun(cacertfile) -> true; @@ -1625,11 +1630,12 @@ end}. case IsSsl(Opt) of true -> SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])], - lists:ukeymerge(1, [{ssl_opts, SslOpts}], Opts); + lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); false -> [{Opt, Val}|Opts] end end, + Subscriptions = fun(Name) -> Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf), lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], @@ -1639,11 +1645,10 @@ end}. maps:to_list( lists:foldl( fun({["bridge", Name, Opt], Val}, Acc) -> + %% e.g #{aws => [{OptKey, OptVal}]} + Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}], maps:update_with(list_to_atom(Name), - fun(Opts) -> - Merge(list_to_atom(Opt), Val, Opts) - end, [{list_to_atom(Opt), Val}, - {subscriptions, Subscriptions(Name)}], Acc); + fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc); (_, Acc) -> Acc end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf)))) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index a4dc840cc..d335d8655 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -199,7 +199,8 @@ handle_info(start, State = #state{options = Options, [emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs], [emqx_broker:subscribe(Topic) || Topic <- Forwards], {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}}; - {error,_} -> + {error, Reason} -> + logger:error("[Bridge] start failed! error: ~p", [Reason]), erlang:send_after(ReconnectInterval, self(), start), {noreply, State} end; @@ -285,6 +286,10 @@ options([{clean_start, CleanStart}| Options], Acc) -> options([{address, Address}| Options], Acc) -> {Host, Port} = address(Address), options(Options, [{host, Host}, {port, Port}|Acc]); +options([{ssl, Ssl}| Options], Acc) -> + options(Options, [{ssl, Ssl}|Acc]); +options([{ssl_opts, SslOpts}| Options], Acc) -> + options(Options, [{ssl_opts, SslOpts}|Acc]); options([_Option | Options], Acc) -> options(Options, Acc). diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 3825583bc..e4077e4d9 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -538,14 +538,24 @@ init([{hosts, Hosts} | Opts], State) -> init(Opts, State#state{hosts = Hosts1}); init([{tcp_opts, TcpOpts} | Opts], State = #state{sock_opts = SockOpts}) -> init(Opts, State#state{sock_opts = emqx_misc:merge_opts(SockOpts, TcpOpts)}); -init([ssl | Opts], State = #state{sock_opts = SockOpts}) -> - ok = ssl:start(), - SockOpts1 = emqx_misc:merge_opts([{ssl_opts, []}], SockOpts), - init(Opts, State#state{sock_opts = SockOpts1}); +init([{ssl, EnableSsl} | Opts], State) -> + case lists:keytake(ssl_opts, 1, Opts) of + {value, SslOpts, WithOutSslOpts} -> + init([SslOpts, {ssl, EnableSsl}| WithOutSslOpts], State); + false -> + init([{ssl_opts, []}, {ssl, EnableSsl}| Opts], State) + end; init([{ssl_opts, SslOpts} | Opts], State = #state{sock_opts = SockOpts}) -> - ok = ssl:start(), - SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]), - init(Opts, State#state{sock_opts = SockOpts1}); + case lists:keytake(ssl, 1, Opts) of + {value, {ssl, true}, WithOutEnableSsl} -> + ok = ssl:start(), + SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]), + init(WithOutEnableSsl, State#state{sock_opts = SockOpts1}); + {value, {ssl, false}, WithOutEnableSsl} -> + init(WithOutEnableSsl, State); + false -> + init(Opts, State) + end; init([{client_id, ClientId} | Opts], State) -> init(Opts, State#state{client_id = iolist_to_binary(ClientId)}); init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) -> diff --git a/src/emqx_client_sock.erl b/src/emqx_client_sock.erl index dc19a8d91..505454e2d 100644 --- a/src/emqx_client_sock.erl +++ b/src/emqx_client_sock.erl @@ -49,7 +49,10 @@ connect(Host, Port, SockOpts, Timeout) -> end. ssl_upgrade(Sock, SslOpts, Timeout) -> - case ssl:connect(Sock, SslOpts, Timeout) of + TlsVersions = proplists:get_value(versions, SslOpts, []), + Ciphers = proplists:get_value(ciphers, SslOpts, default_ciphers(TlsVersions)), + SslOpts2 = emqx_misc:merge_opts(SslOpts, [{ciphers, Ciphers}]), + case ssl:connect(Sock, SslOpts2, Timeout) of {ok, SslSock} -> ok = ssl:controlling_process(SslSock, self()), {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; @@ -91,3 +94,8 @@ sockname(Sock) when is_port(Sock) -> sockname(#ssl_socket{ssl = SslSock}) -> ssl:sockname(SslSock). +default_ciphers(TlsVersions) -> + lists:foldl( + fun(TlsVer, Ciphers) -> + Ciphers ++ ssl:cipher_suites(all, TlsVer) + end, [], TlsVersions).