226 lines
8.5 KiB
Erlang
226 lines
8.5 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc The ExProto Gateway Implement interface
|
|
-module(emqx_exproto_impl).
|
|
|
|
-behaviour(emqx_gateway_impl).
|
|
|
|
%% APIs
|
|
-export([ reg/0
|
|
, unreg/0
|
|
]).
|
|
|
|
-export([ on_gateway_load/2
|
|
, on_gateway_update/3
|
|
, on_gateway_unload/2
|
|
]).
|
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
reg() ->
|
|
RegistryOptions = [ {cbkmod, ?MODULE}
|
|
],
|
|
emqx_gateway_registry:reg(exproto, RegistryOptions).
|
|
|
|
unreg() ->
|
|
emqx_gateway_registry:unreg(exproto).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% emqx_gateway_registry callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
start_grpc_server(_GwName, undefined) ->
|
|
undefined;
|
|
start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
|
|
Services = #{protos => [emqx_exproto_pb],
|
|
services => #{
|
|
'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr}
|
|
},
|
|
SvrOptions = case maps:to_list(maps:get(ssl, Options, #{})) of
|
|
[] -> [];
|
|
SslOpts ->
|
|
[{ssl_options, SslOpts}]
|
|
end,
|
|
_ = grpc:start_server(GwName, ListenOn, Services, SvrOptions),
|
|
?ULOG("Start ~ts gRPC server on ~p successfully.~n", [GwName, ListenOn]).
|
|
|
|
stop_grpc_server(GwName) ->
|
|
_ = grpc:stop_server(GwName),
|
|
?ULOG("Stop ~s gRPC server successfully.~n", [GwName]).
|
|
|
|
start_grpc_client_channel(_GwName, undefined) ->
|
|
undefined;
|
|
start_grpc_client_channel(GwName, Options = #{address := Address}) ->
|
|
{Host, Port} = emqx_gateway_utils:parse_address(Address),
|
|
case maps:to_list(maps:get(ssl, Options, #{})) of
|
|
[] ->
|
|
SvrAddr = compose_http_uri(http, Host, Port),
|
|
grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{});
|
|
SslOpts ->
|
|
ClientOpts = #{gun_opts =>
|
|
#{transport => ssl,
|
|
transport_opts => SslOpts}},
|
|
SvrAddr = compose_http_uri(https, Host, Port),
|
|
grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts)
|
|
end.
|
|
|
|
compose_http_uri(Scheme, Host, Port) ->
|
|
lists:flatten(
|
|
io_lib:format(
|
|
"~s://~s:~w", [Scheme, Host, Port])).
|
|
|
|
stop_grpc_client_channel(GwName) ->
|
|
_ = grpc_client_sup:stop_channel_pool(GwName),
|
|
ok.
|
|
|
|
on_gateway_load(_Gateway = #{ name := GwName,
|
|
config := Config
|
|
}, Ctx) ->
|
|
%% XXX: How to monitor it ?
|
|
%% Start grpc client pool & client channel
|
|
PoolName = pool_name(GwName),
|
|
PoolSize = emqx_vm:schedulers() * 2,
|
|
{ok, PoolSup} = emqx_pool_sup:start_link(
|
|
PoolName, hash, PoolSize,
|
|
{emqx_exproto_gcli, start_link, []}),
|
|
_ = start_grpc_client_channel(GwName,
|
|
maps:get(handler, Config, undefined)
|
|
),
|
|
%% XXX: How to monitor it ?
|
|
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
|
|
|
|
NConfig = maps:without(
|
|
[server, handler],
|
|
Config#{pool_name => PoolName}
|
|
),
|
|
Listeners = emqx_gateway_utils:normalize_config(
|
|
NConfig#{handler => GwName}
|
|
),
|
|
ListenerPids = lists:map(fun(Lis) ->
|
|
start_listener(GwName, Ctx, Lis)
|
|
end, Listeners),
|
|
{ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}.
|
|
|
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
|
GwName = maps:get(name, Gateway),
|
|
try
|
|
%% XXX: 1. How hot-upgrade the changes ???
|
|
%% XXX: 2. Check the New confs first before destroy old instance ???
|
|
on_gateway_unload(Gateway, GwState),
|
|
on_gateway_load(Gateway#{config => Config}, Ctx)
|
|
catch
|
|
Class : Reason : Stk ->
|
|
logger:error("Failed to update ~ts; "
|
|
"reason: {~0p, ~0p} stacktrace: ~0p",
|
|
[GwName, Class, Reason, Stk]),
|
|
{error, {Class, Reason}}
|
|
end.
|
|
|
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
|
config := Config
|
|
}, _GwState = #{pool := PoolSup}) ->
|
|
Listeners = emqx_gateway_utils:normalize_config(Config),
|
|
%% Stop funcs???
|
|
exit(PoolSup, kill),
|
|
stop_grpc_server(GwName),
|
|
stop_grpc_client_channel(GwName),
|
|
lists:foreach(fun(Lis) ->
|
|
stop_listener(GwName, Lis)
|
|
end, Listeners).
|
|
|
|
pool_name(GwName) ->
|
|
list_to_atom(lists:concat([GwName, "_gcli_pool"])).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal funcs
|
|
%%--------------------------------------------------------------------
|
|
|
|
start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
|
|
{ok, Pid} ->
|
|
?ULOG("Gateway ~ts:~ts:~ts on ~ts started.~n",
|
|
[GwName, Type, LisName, ListenOnStr]),
|
|
Pid;
|
|
{error, Reason} ->
|
|
?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
[GwName, Type, LisName, ListenOnStr, Reason]),
|
|
throw({badconf, Reason})
|
|
end.
|
|
|
|
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
|
|
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
NCfg = Cfg#{
|
|
ctx => Ctx,
|
|
listener => {GwName, Type, LisName},
|
|
frame_mod => emqx_exproto_frame,
|
|
chann_mod => emqx_exproto_channel
|
|
},
|
|
MFA = {emqx_gateway_conn, start_link, [NCfg]},
|
|
NSockOpts = merge_default_by_type(Type, SocketOpts),
|
|
do_start_listener(Type, Name, ListenOn, NSockOpts, MFA).
|
|
|
|
do_start_listener(Type, Name, ListenOn, Opts, MFA)
|
|
when Type == tcp;
|
|
Type == ssl ->
|
|
esockd:open(Name, ListenOn, Opts, MFA);
|
|
do_start_listener(udp, Name, ListenOn, Opts, MFA) ->
|
|
esockd:open_udp(Name, ListenOn, Opts, MFA);
|
|
do_start_listener(dtls, Name, ListenOn, Opts, MFA) ->
|
|
esockd:open_dtls(Name, ListenOn, Opts, MFA).
|
|
|
|
merge_default_by_type(Type, Options) when Type =:= tcp;
|
|
Type =:= ssl ->
|
|
Default = emqx_gateway_utils:default_tcp_options(),
|
|
case lists:keytake(tcp_options, 1, Options) of
|
|
{value, {tcp_options, TcpOpts}, Options1} ->
|
|
[{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)}
|
|
| Options1];
|
|
false ->
|
|
[{tcp_options, Default} | Options]
|
|
end;
|
|
merge_default_by_type(Type, Options) when Type =:= udp;
|
|
Type =:= dtls ->
|
|
Default = emqx_gateway_utils:default_udp_options(),
|
|
case lists:keytake(udp_options, 1, Options) of
|
|
{value, {udp_options, TcpOpts}, Options1} ->
|
|
[{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
|
|
| Options1];
|
|
false ->
|
|
[{udp_options, Default} | Options]
|
|
end.
|
|
|
|
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
|
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
case StopRet of
|
|
ok -> ?ULOG("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
|
|
[GwName, Type, LisName, ListenOnStr]);
|
|
{error, Reason} ->
|
|
?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
[GwName, Type, LisName, ListenOnStr, Reason])
|
|
end,
|
|
StopRet.
|
|
|
|
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
|
|
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
esockd:close(Name, ListenOn).
|