chore(gw): delete needless files
This commit is contained in:
parent
fd828ad216
commit
dbd78b83b1
|
@ -1,187 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_exproto).
|
|
||||||
|
|
||||||
-include("src/exproto/include/emqx_exproto.hrl").
|
|
||||||
|
|
||||||
-export([ start_listeners/0
|
|
||||||
, stop_listeners/0
|
|
||||||
, start_listener/1
|
|
||||||
, start_listener/4
|
|
||||||
, stop_listener/4
|
|
||||||
, stop_listener/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([ start_servers/0
|
|
||||||
, stop_servers/0
|
|
||||||
, start_server/1
|
|
||||||
, stop_server/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% APIs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec(start_listeners() -> ok).
|
|
||||||
start_listeners() ->
|
|
||||||
Listeners = application:get_env(?APP, listeners, []),
|
|
||||||
NListeners = [start_connection_handler_instance(Listener)
|
|
||||||
|| Listener <- Listeners],
|
|
||||||
lists:foreach(fun start_listener/1, NListeners).
|
|
||||||
|
|
||||||
-spec(stop_listeners() -> ok).
|
|
||||||
stop_listeners() ->
|
|
||||||
Listeners = application:get_env(?APP, listeners, []),
|
|
||||||
lists:foreach(fun stop_connection_handler_instance/1, Listeners),
|
|
||||||
lists:foreach(fun stop_listener/1, Listeners).
|
|
||||||
|
|
||||||
-spec(start_servers() -> ok).
|
|
||||||
start_servers() ->
|
|
||||||
lists:foreach(fun start_server/1, application:get_env(?APP, servers, [])).
|
|
||||||
|
|
||||||
-spec(stop_servers() -> ok).
|
|
||||||
stop_servers() ->
|
|
||||||
lists:foreach(fun stop_server/1, application:get_env(?APP, servers, [])).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_connection_handler_instance({_Proto, _LisType, _ListenOn, Opts}) ->
|
|
||||||
Name = name(_Proto, _LisType),
|
|
||||||
{value, {_, HandlerOpts}, LisOpts} = lists:keytake(handler, 1, Opts),
|
|
||||||
{SvrAddr, ChannelOptions} = handler_opts(HandlerOpts),
|
|
||||||
case emqx_exproto_sup:start_grpc_client_channel(Name, SvrAddr, ChannelOptions) of
|
|
||||||
{ok, _ClientChannelPid} ->
|
|
||||||
{_Proto, _LisType, _ListenOn, [{handler, Name} | LisOpts]};
|
|
||||||
{error, Reason} ->
|
|
||||||
io:format(standard_error, "Failed to start ~s's connection handler: ~0p~n",
|
|
||||||
[Name, Reason]),
|
|
||||||
error(Reason)
|
|
||||||
end.
|
|
||||||
|
|
||||||
stop_connection_handler_instance({_Proto, _LisType, _ListenOn, _Opts}) ->
|
|
||||||
Name = name(_Proto, _LisType),
|
|
||||||
_ = emqx_exproto_sup:stop_grpc_client_channel(Name),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
start_server({Name, Port, SSLOptions}) ->
|
|
||||||
case emqx_exproto_sup:start_grpc_server(Name, Port, SSLOptions) of
|
|
||||||
{ok, _} ->
|
|
||||||
io:format("Start ~s gRPC server on ~w successfully.~n",
|
|
||||||
[Name, Port]);
|
|
||||||
{error, Reason} ->
|
|
||||||
io:format(standard_error, "Failed to start ~s gRPC server on ~w: ~0p~n",
|
|
||||||
[Name, Port, Reason]),
|
|
||||||
error({failed_start_server, Reason})
|
|
||||||
end.
|
|
||||||
|
|
||||||
stop_server({Name, Port, _SSLOptions}) ->
|
|
||||||
ok = emqx_exproto_sup:stop_grpc_server(Name),
|
|
||||||
io:format("Stop ~s gRPC server on ~w successfully.~n", [Name, Port]).
|
|
||||||
|
|
||||||
start_listener({Proto, LisType, ListenOn, Opts}) ->
|
|
||||||
Name = name(Proto, LisType),
|
|
||||||
case start_listener(LisType, Name, ListenOn, Opts) of
|
|
||||||
{ok, _} ->
|
|
||||||
io:format("Start ~s listener on ~s successfully.~n",
|
|
||||||
[Name, format(ListenOn)]);
|
|
||||||
{error, Reason} ->
|
|
||||||
io:format(standard_error, "Failed to start ~s listener on ~s: ~0p~n",
|
|
||||||
[Name, format(ListenOn), Reason]),
|
|
||||||
error(Reason)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
start_listener(LisType, Name, ListenOn, LisOpts)
|
|
||||||
when LisType =:= tcp;
|
|
||||||
LisType =:= ssl ->
|
|
||||||
SockOpts = esockd:parse_opt(LisOpts),
|
|
||||||
esockd:open(Name, ListenOn, merge_tcp_default(SockOpts),
|
|
||||||
{emqx_exproto_conn, start_link, [LisOpts-- SockOpts]});
|
|
||||||
|
|
||||||
start_listener(udp, Name, ListenOn, LisOpts) ->
|
|
||||||
SockOpts = esockd:parse_opt(LisOpts),
|
|
||||||
esockd:open_udp(Name, ListenOn, merge_udp_default(SockOpts),
|
|
||||||
{emqx_exproto_conn, start_link, [LisOpts-- SockOpts]});
|
|
||||||
|
|
||||||
start_listener(dtls, Name, ListenOn, LisOpts) ->
|
|
||||||
SockOpts = esockd:parse_opt(LisOpts),
|
|
||||||
esockd:open_dtls(Name, ListenOn, merge_udp_default(SockOpts),
|
|
||||||
{emqx_exproto_conn, start_link, [LisOpts-- SockOpts]}).
|
|
||||||
|
|
||||||
stop_listener({Proto, LisType, ListenOn, Opts}) ->
|
|
||||||
Name = name(Proto, LisType),
|
|
||||||
StopRet = stop_listener(LisType, Name, ListenOn, Opts),
|
|
||||||
case StopRet of
|
|
||||||
ok ->
|
|
||||||
io:format("Stop ~s listener on ~s successfully.~n",
|
|
||||||
[Name, format(ListenOn)]);
|
|
||||||
{error, Reason} ->
|
|
||||||
io:format(standard_error, "Failed to stop ~s listener on ~s: ~0p~n",
|
|
||||||
[Name, format(ListenOn), Reason])
|
|
||||||
end,
|
|
||||||
StopRet.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
stop_listener(_LisType, Name, ListenOn, _Opts) ->
|
|
||||||
esockd:close(Name, ListenOn).
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
name(Proto, LisType) ->
|
|
||||||
list_to_atom(lists:flatten(io_lib:format("~s:~s", [Proto, LisType]))).
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
format(Port) when is_integer(Port) ->
|
|
||||||
io_lib:format("0.0.0.0:~w", [Port]);
|
|
||||||
format({Addr, Port}) when is_list(Addr) ->
|
|
||||||
io_lib:format("~s:~w", [Addr, Port]);
|
|
||||||
format({Addr, Port}) when is_tuple(Addr) ->
|
|
||||||
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
merge_tcp_default(Opts) ->
|
|
||||||
case lists:keytake(tcp_options, 1, Opts) of
|
|
||||||
{value, {tcp_options, TcpOpts}, Opts1} ->
|
|
||||||
[{tcp_options, emqx_misc:merge_opts(?TCP_SOCKOPTS, TcpOpts)} | Opts1];
|
|
||||||
false ->
|
|
||||||
[{tcp_options, ?TCP_SOCKOPTS} | Opts]
|
|
||||||
end.
|
|
||||||
|
|
||||||
merge_udp_default(Opts) ->
|
|
||||||
case lists:keytake(udp_options, 1, Opts) of
|
|
||||||
{value, {udp_options, TcpOpts}, Opts1} ->
|
|
||||||
[{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} | Opts1];
|
|
||||||
false ->
|
|
||||||
[{udp_options, ?UDP_SOCKOPTS} | Opts]
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
handler_opts(Opts) ->
|
|
||||||
Scheme = proplists:get_value(scheme, Opts),
|
|
||||||
Host = proplists:get_value(host, Opts),
|
|
||||||
Port = proplists:get_value(port, Opts),
|
|
||||||
SvrAddr = lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])),
|
|
||||||
ClientOpts = case Scheme of
|
|
||||||
https ->
|
|
||||||
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
|
|
||||||
#{gun_opts =>
|
|
||||||
#{transport => ssl,
|
|
||||||
transport_opts => SslOpts}};
|
|
||||||
_ -> #{}
|
|
||||||
end,
|
|
||||||
{SvrAddr, ClientOpts}.
|
|
|
@ -1,37 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_exproto_app).
|
|
||||||
|
|
||||||
-behaviour(application).
|
|
||||||
|
|
||||||
-emqx_plugin(extension).
|
|
||||||
|
|
||||||
-export([start/2, prep_stop/1, stop/1]).
|
|
||||||
|
|
||||||
start(_StartType, _StartArgs) ->
|
|
||||||
{ok, Sup} = emqx_exproto_sup:start_link(),
|
|
||||||
emqx_exproto:start_servers(),
|
|
||||||
emqx_exproto:start_listeners(),
|
|
||||||
{ok, Sup}.
|
|
||||||
|
|
||||||
prep_stop(State) ->
|
|
||||||
emqx_exproto:stop_servers(),
|
|
||||||
emqx_exproto:stop_listeners(),
|
|
||||||
State.
|
|
||||||
|
|
||||||
stop(_State) ->
|
|
||||||
ok.
|
|
|
@ -1,83 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_exproto_sup).
|
|
||||||
|
|
||||||
-behaviour(supervisor).
|
|
||||||
|
|
||||||
-export([start_link/0]).
|
|
||||||
|
|
||||||
-export([ start_grpc_server/3
|
|
||||||
, stop_grpc_server/1
|
|
||||||
, start_grpc_client_channel/3
|
|
||||||
, stop_grpc_client_channel/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([init/1]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% APIs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
|
||||||
|
|
||||||
-spec start_grpc_server(atom(), inet:port_number(), list())
|
|
||||||
-> {ok, pid()} | {error, term()}.
|
|
||||||
start_grpc_server(Name, Port, SSLOptions) ->
|
|
||||||
Services = #{protos => [emqx_exproto_pb],
|
|
||||||
services => #{'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr}
|
|
||||||
},
|
|
||||||
Options = case SSLOptions of
|
|
||||||
[] -> [];
|
|
||||||
_ ->
|
|
||||||
[{ssl_options, lists:keydelete(ssl, 1, SSLOptions)}]
|
|
||||||
end,
|
|
||||||
grpc:start_server(prefix(Name), Port, Services, Options).
|
|
||||||
|
|
||||||
-spec stop_grpc_server(atom()) -> ok.
|
|
||||||
stop_grpc_server(Name) ->
|
|
||||||
grpc:stop_server(prefix(Name)).
|
|
||||||
|
|
||||||
-spec start_grpc_client_channel(
|
|
||||||
atom(),
|
|
||||||
uri_string:uri_string(),
|
|
||||||
grpc_client:grpc_opts()) -> {ok, pid()} | {error, term()}.
|
|
||||||
start_grpc_client_channel(Name, SvrAddr, ClientOpts) ->
|
|
||||||
grpc_client_sup:create_channel_pool(Name, SvrAddr, ClientOpts).
|
|
||||||
|
|
||||||
-spec stop_grpc_client_channel(atom()) -> ok.
|
|
||||||
stop_grpc_client_channel(Name) ->
|
|
||||||
grpc_client_sup:stop_channel_pool(Name).
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
prefix(Name) when is_atom(Name) ->
|
|
||||||
"exproto:" ++ atom_to_list(Name);
|
|
||||||
prefix(Name) when is_binary(Name) ->
|
|
||||||
"exproto:" ++ binary_to_list(Name);
|
|
||||||
prefix(Name) when is_list(Name) ->
|
|
||||||
"exproto:" ++ Name.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Supervisor callbacks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
%% gRPC Client Pool
|
|
||||||
PoolSize = emqx_vm:schedulers() * 2,
|
|
||||||
Pool = emqx_pool_sup:spec([exproto_gcli_pool, hash, PoolSize,
|
|
||||||
{emqx_exproto_gcli, start_link, []}]),
|
|
||||||
{ok, {{one_for_one, 10, 5}, [Pool]}}.
|
|
Loading…
Reference in New Issue