chore(gw): add ssl feilds for exproto

This commit is contained in:
JianBo He 2021-10-12 19:02:54 +08:00
parent b637764095
commit fc2e358f01
3 changed files with 66 additions and 26 deletions

View File

@ -109,12 +109,24 @@ fields(exproto) ->
fields(exproto_grpc_server) ->
[ {bind, sc(hoconsc:union([ip_port(), integer()]))}
%% TODO: ssl options
, {ssl, sc_meta(ref(simple_ssl_options),
#{nullable => {true, recursively}})}
];
fields(exproto_grpc_handler) ->
[ {address, sc(binary())}
%% TODO: ssl
, {ssl, sc_meta(ref(simple_ssl_options),
#{nullable => {true, recursively}})}
];
fields(simple_ssl_options) ->
[ {cacertfile, sc_meta(string(), #{nullable => true})}
, {certfile, sc_meta(string(), #{nullable => true})}
, {keyfile, sc_meta(string(), #{nullable => true})}
, {verify, sc(hoconsc:enum([verify_peer, verify_none]), verify_none)}
, {depth, sc(integer(), 10)}
, {password, sc_meta(string(), #{sensitive => true, nullable => true})}
%% XXX: More confs ???
];
fields(clientinfo_override) ->

View File

@ -37,6 +37,7 @@
]).
-export([ stringfy/1
, parse_address/1
]).
-export([ normalize_config/1
@ -182,6 +183,19 @@ stringfy(T) when is_list(T); is_binary(T) ->
stringfy(T) ->
iolist_to_binary(io_lib:format("~0p", [T])).
-spec parse_address(binary()|list()) -> {list(), integer()}.
parse_address(S) when is_binary(S); is_list(S) ->
S1 = case is_binary(S) of
true -> lists:reverse(binary_to_list(S));
_ -> lists:reverse(S)
end,
case re:split(S1, ":", [{parts, 2}, {return, list}]) of
[Port0, Host0] ->
{lists:reverse(Host0), list_to_integer(lists:reverse(Port0))};
_ ->
error(badarg)
end.
-spec normalize_config(emqx_config:config())
-> list({ Type :: udp | tcp | ssl | dtls
, Name :: atom()

View File

@ -62,26 +62,34 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
_ = grpc:start_server(GwName, ListenOn, Services, SvrOptions),
?ULOG("Start ~ts gRPC server on ~p successfully.~n", [GwName, ListenOn]).
start_grpc_client_channel(_GwType, undefined) ->
stop_grpc_server(GwName) ->
_ = grpc:stop_server(GwName),
?ULOG("Stop ~s gRPC server successfully.~n", [GwName]).
start_grpc_client_channel(_GwName, undefined) ->
undefined;
start_grpc_client_channel(GwName, Options = #{address := UriStr}) ->
UriMap = uri_string:parse(UriStr),
Scheme = maps:get(scheme, UriMap),
Host = maps:get(host, UriMap),
Port = maps:get(port, UriMap),
SvrAddr = lists:flatten(
io_lib:format(
"~ts://~ts:~w", [Scheme, Host, Port])
),
ClientOpts = case Scheme of
"https" ->
SslOpts = maps:to_list(maps:get(ssl, Options, #{})),
#{gun_opts =>
start_grpc_client_channel(GwName, Options = #{address := Address}) ->
{Host, Port} = emqx_gateway_utils:parse_address(Address),
case maps:to_list(maps:get(ssl, Options, #{})) of
[] ->
SvrAddr = compose_http_uri(http, Host, Port),
grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{});
SslOpts ->
ClientOpts = #{gun_opts =>
#{transport => ssl,
transport_opts => SslOpts}};
_ -> #{}
end,
grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts).
transport_opts => SslOpts}},
SvrAddr = compose_http_uri(https, Host, Port),
grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts)
end.
compose_http_uri(Scheme, Host, Port) ->
lists:flatten(
io_lib:format(
"~s://~s:~w", [Scheme, Host, Port])).
stop_grpc_client_channel(GwName) ->
_ = grpc_client_sup:stop_channel_pool(GwName),
ok.
on_gateway_load(_Gateway = #{ name := GwName,
config := Config
@ -90,10 +98,12 @@ on_gateway_load(_Gateway = #{ name := GwName,
%% Start grpc client pool & client channel
PoolName = pool_name(GwName),
PoolSize = emqx_vm:schedulers() * 2,
{ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize,
{ok, PoolSup} = emqx_pool_sup:start_link(
PoolName, hash, PoolSize,
{emqx_exproto_gcli, start_link, []}),
_ = start_grpc_client_channel(GwName, maps:get(handler, Config, undefined)),
_ = start_grpc_client_channel(GwName,
maps:get(handler, Config, undefined)
),
%% XXX: How to monitor it ?
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
@ -107,7 +117,7 @@ on_gateway_load(_Gateway = #{ name := GwName,
ListenerPids = lists:map(fun(Lis) ->
start_listener(GwName, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
{ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}.
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
GwName = maps:get(name, Gateway),
@ -126,8 +136,12 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
on_gateway_unload(_Gateway = #{ name := GwName,
config := Config
}, _GwState) ->
}, _GwState = #{pool := PoolSup}) ->
Listeners = emqx_gateway_utils:normalize_config(Config),
%% Stop funcs???
exit(PoolSup, kill),
stop_grpc_server(GwName),
stop_grpc_client_channel(GwName),
lists:foreach(fun(Lis) ->
stop_listener(GwName, Lis)
end, Listeners).