diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index c35fc45de..64cf6d331 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -109,12 +109,24 @@ fields(exproto) -> fields(exproto_grpc_server) -> [ {bind, sc(hoconsc:union([ip_port(), integer()]))} - %% TODO: ssl options + , {ssl, sc_meta(ref(simple_ssl_options), + #{nullable => {true, recursively}})} ]; fields(exproto_grpc_handler) -> [ {address, sc(binary())} - %% TODO: ssl + , {ssl, sc_meta(ref(simple_ssl_options), + #{nullable => {true, recursively}})} + ]; + +fields(simple_ssl_options) -> + [ {cacertfile, sc_meta(string(), #{nullable => true})} + , {certfile, sc_meta(string(), #{nullable => true})} + , {keyfile, sc_meta(string(), #{nullable => true})} + , {verify, sc(hoconsc:enum([verify_peer, verify_none]), verify_none)} + , {depth, sc(integer(), 10)} + , {password, sc_meta(string(), #{sensitive => true, nullable => true})} + %% XXX: More confs ??? ]; fields(clientinfo_override) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 91168eb70..a497e11d0 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -37,6 +37,7 @@ ]). -export([ stringfy/1 + , parse_address/1 ]). -export([ normalize_config/1 @@ -182,6 +183,19 @@ stringfy(T) when is_list(T); is_binary(T) -> stringfy(T) -> iolist_to_binary(io_lib:format("~0p", [T])). +-spec parse_address(binary()|list()) -> {list(), integer()}. +parse_address(S) when is_binary(S); is_list(S) -> + S1 = case is_binary(S) of + true -> lists:reverse(binary_to_list(S)); + _ -> lists:reverse(S) + end, + case re:split(S1, ":", [{parts, 2}, {return, list}]) of + [Port0, Host0] -> + {lists:reverse(Host0), list_to_integer(lists:reverse(Port0))}; + _ -> + error(badarg) + end. + -spec normalize_config(emqx_config:config()) -> list({ Type :: udp | tcp | ssl | dtls , Name :: atom() diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 07207fb87..92169d67c 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -62,26 +62,34 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) -> _ = grpc:start_server(GwName, ListenOn, Services, SvrOptions), ?ULOG("Start ~ts gRPC server on ~p successfully.~n", [GwName, ListenOn]). -start_grpc_client_channel(_GwType, undefined) -> +stop_grpc_server(GwName) -> + _ = grpc:stop_server(GwName), + ?ULOG("Stop ~s gRPC server successfully.~n", [GwName]). + +start_grpc_client_channel(_GwName, undefined) -> undefined; -start_grpc_client_channel(GwName, Options = #{address := UriStr}) -> - UriMap = uri_string:parse(UriStr), - Scheme = maps:get(scheme, UriMap), - Host = maps:get(host, UriMap), - Port = maps:get(port, UriMap), - SvrAddr = lists:flatten( - io_lib:format( - "~ts://~ts:~w", [Scheme, Host, Port]) - ), - ClientOpts = case Scheme of - "https" -> - SslOpts = maps:to_list(maps:get(ssl, Options, #{})), - #{gun_opts => +start_grpc_client_channel(GwName, Options = #{address := Address}) -> + {Host, Port} = emqx_gateway_utils:parse_address(Address), + case maps:to_list(maps:get(ssl, Options, #{})) of + [] -> + SvrAddr = compose_http_uri(http, Host, Port), + grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{}); + SslOpts -> + ClientOpts = #{gun_opts => #{transport => ssl, - transport_opts => SslOpts}}; - _ -> #{} - end, - grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts). + transport_opts => SslOpts}}, + SvrAddr = compose_http_uri(https, Host, Port), + grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts) + end. + +compose_http_uri(Scheme, Host, Port) -> + lists:flatten( + io_lib:format( + "~s://~s:~w", [Scheme, Host, Port])). + +stop_grpc_client_channel(GwName) -> + _ = grpc_client_sup:stop_channel_pool(GwName), + ok. on_gateway_load(_Gateway = #{ name := GwName, config := Config @@ -90,10 +98,12 @@ on_gateway_load(_Gateway = #{ name := GwName, %% Start grpc client pool & client channel PoolName = pool_name(GwName), PoolSize = emqx_vm:schedulers() * 2, - {ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize, - {emqx_exproto_gcli, start_link, []}), - _ = start_grpc_client_channel(GwName, maps:get(handler, Config, undefined)), - + {ok, PoolSup} = emqx_pool_sup:start_link( + PoolName, hash, PoolSize, + {emqx_exproto_gcli, start_link, []}), + _ = start_grpc_client_channel(GwName, + maps:get(handler, Config, undefined) + ), %% XXX: How to monitor it ? _ = start_grpc_server(GwName, maps:get(server, Config, undefined)), @@ -107,7 +117,7 @@ on_gateway_load(_Gateway = #{ name := GwName, ListenerPids = lists:map(fun(Lis) -> start_listener(GwName, Ctx, Lis) end, Listeners), - {ok, ListenerPids, _GwState = #{ctx => Ctx}}. + {ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}. on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> GwName = maps:get(name, Gateway), @@ -126,8 +136,12 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> on_gateway_unload(_Gateway = #{ name := GwName, config := Config - }, _GwState) -> + }, _GwState = #{pool := PoolSup}) -> Listeners = emqx_gateway_utils:normalize_config(Config), + %% Stop funcs??? + exit(PoolSup, kill), + stop_grpc_server(GwName), + stop_grpc_client_channel(GwName), lists:foreach(fun(Lis) -> stop_listener(GwName, Lis) end, Listeners).