chore(gw): some format improvement

This commit is contained in:
JianBo He 2021-07-23 14:13:42 +08:00
parent 6f084fe6cf
commit 67bb8d0564
9 changed files with 45 additions and 31 deletions

View File

@ -20,7 +20,7 @@
-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-logger_header("[GW-Conn]"). -logger_header("[PGW-Conn]").
%% API %% API
-export([ start_link/3 -export([ start_link/3

View File

@ -14,7 +14,11 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc The gateway connection management %% @doc The Gateway Connection-Manager
%%
%% For a certain type of protocol, this is a single instance of the manager.
%% It means that no matter how many instances of the stomp gateway are created,
%% they all share a single this Connection-Manager
-module(emqx_gateway_cm). -module(emqx_gateway_cm).
-behaviour(gen_server). -behaviour(gen_server).
@ -57,19 +61,23 @@
]). ]).
-record(state, { -record(state, {
type :: atom(), %% Gateway Id type :: atom(), %% Gateway Type
locker :: pid(), %% ClientId Locker for CM locker :: pid(), %% ClientId Locker for CM
registry :: pid(), %% ClientId Registry server registry :: pid(), %% ClientId Registry server
chan_pmon :: emqx_pmon:pmon() chan_pmon :: emqx_pmon:pmon()
}). }).
-type option() :: {type, gateway_type()}.
-type options() :: list(option()).
-define(T_TAKEOVER, 15000). -define(T_TAKEOVER, 15000).
-define(DEFAULT_BATCH_SIZE, 10000).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% XXX: Options for cm process -spec start_link(options()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Options) -> start_link(Options) ->
Type = proplists:get_value(type, Options), Type = proplists:get_value(type, Options),
gen_server:start_link({local, procname(Type)}, ?MODULE, Options, []). gen_server:start_link({local, procname(Type)}, ?MODULE, Options, []).
@ -416,7 +424,7 @@ handle_cast(_Msg, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason}, handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{type = Type, chan_pmon = PMon}) -> State = #state{type = Type, chan_pmon = PMon}) ->
ChanPids = [Pid | emqx_misc:drain_down(10000)], %% XXX: Fixed BATCH_SIZE ChanPids = [Pid | emqx_misc:drain_down(?DEFAULT_BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
CmTabs = cmtabs(Type), CmTabs = cmtabs(Type),

View File

@ -191,4 +191,3 @@ started_gateway_pid() ->
is_a_gateway_id(Id) -> is_a_gateway_id(Id) ->
Id /= emqx_gateway_registry. Id /= emqx_gateway_registry.

View File

@ -41,6 +41,10 @@
, oom_policy/1 , oom_policy/1
]). ]).
-export([ default_tcp_options/0
, default_udp_options/0
]).
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(DEFAULT_IDLE_TIMEOUT, 30000). -define(DEFAULT_IDLE_TIMEOUT, 30000).
-define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}). -define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}).
@ -172,3 +176,13 @@ stats_timer(Options) ->
-spec enable_stats(map()) -> boolean(). -spec enable_stats(map()) -> boolean().
enable_stats(Options) -> enable_stats(Options) ->
maps:get(enable_stats, Options, true). maps:get(enable_stats, Options, true).
%%--------------------------------------------------------------------
%% Envs2
default_tcp_options() ->
[binary, {packet, raw}, {reuseaddr, true},
{nodelay, true}, {backlog, 512}].
default_udp_options() ->
[binary].

View File

@ -32,11 +32,6 @@
, on_insta_destroy/3 , on_insta_destroy/3
]). ]).
-define(TCP_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true},
{backlog, 512}, {nodelay, true}]).
-define(UDP_SOCKOPTS, []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -186,21 +181,23 @@ name(InstaId, Type) ->
merge_default_by_type(Type, Options) when Type =:= tcp; merge_default_by_type(Type, Options) when Type =:= tcp;
Type =:= ssl -> Type =:= ssl ->
Default = emqx_gateway_utils:default_tcp_options(),
case lists:keytake(tcp_options, 1, Options) of case lists:keytake(tcp_options, 1, Options) of
{value, {tcp_options, TcpOpts}, Options1} -> {value, {tcp_options, TcpOpts}, Options1} ->
[{tcp_options, emqx_misc:merge_opts(?TCP_SOCKOPTS, TcpOpts)} [{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)}
| Options1]; | Options1];
false -> false ->
[{tcp_options, ?TCP_SOCKOPTS} | Options] [{tcp_options, Default} | Options]
end; end;
merge_default_by_type(Type, Options) when Type =:= udp; merge_default_by_type(Type, Options) when Type =:= udp;
Type =:= dtls -> Type =:= dtls ->
Default = emqx_gateway_utils:default_udp_options(),
case lists:keytake(udp_options, 1, Options) of case lists:keytake(udp_options, 1, Options) of
{value, {udp_options, TcpOpts}, Options1} -> {value, {udp_options, TcpOpts}, Options1} ->
[{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
| Options1]; | Options1];
false -> false ->
[{udp_options, ?UDP_SOCKOPTS} | Options] [{udp_options, Default} | Options]
end. end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->

View File

@ -19,7 +19,6 @@
-define(TCP_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, -define(TCP_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true},
{backlog, 512}, {nodelay, true}]). {backlog, 512}, {nodelay, true}]).
%% TODO:
-define(UDP_SOCKOPTS, []). -define(UDP_SOCKOPTS, []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -32,8 +32,6 @@
, on_insta_destroy/3 , on_insta_destroy/3
]). ]).
-define(UDP_SOCKOPTS, []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -41,8 +39,7 @@
load() -> load() ->
RegistryOptions = [ {cbkmod, ?MODULE} RegistryOptions = [ {cbkmod, ?MODULE}
], ],
YourOptions = [params1, params2], emqx_gateway_registry:load(mqttsn, RegistryOptions, []).
emqx_gateway_registry:load(mqttsn, RegistryOptions, YourOptions).
unload() -> unload() ->
emqx_gateway_registry:unload(mqttsn). emqx_gateway_registry:unload(mqttsn).
@ -140,11 +137,13 @@ name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])). list_to_atom(lists:concat([InstaId, ":", Type])).
merge_default(Options) -> merge_default(Options) ->
Default = emqx_gateway_utils:default_udp_options(),
case lists:keytake(udp_options, 1, Options) of case lists:keytake(udp_options, 1, Options) of
{value, {udp_options, TcpOpts}, Options1} -> {value, {udp_options, TcpOpts}, Options1} ->
[{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} | Options1]; [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
| Options1];
false -> false ->
[{udp_options, ?UDP_SOCKOPTS} | Options] [{udp_options, Default} | Options]
end. end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->

View File

@ -31,8 +31,6 @@
, on_insta_destroy/3 , on_insta_destroy/3
]). ]).
-define(TCP_OPTS, [binary, {packet, raw}, {reuseaddr, true}, {nodelay, true}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -41,15 +39,13 @@
load() -> load() ->
RegistryOptions = [ {cbkmod, ?MODULE} RegistryOptions = [ {cbkmod, ?MODULE}
], ],
emqx_gateway_registry:load(stomp, RegistryOptions, []).
YourOptions = [param1, param2],
emqx_gateway_registry:load(stomp, RegistryOptions, YourOptions).
-spec unload() -> ok | {error, any()}. -spec unload() -> ok | {error, any()}.
unload() -> unload() ->
emqx_gateway_registry:unload(stomp). emqx_gateway_registry:unload(stomp).
init([param1, param2]) -> init(_) ->
GwState = #{}, GwState = #{},
{ok, GwState}. {ok, GwState}.
@ -125,11 +121,13 @@ name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])). list_to_atom(lists:concat([InstaId, ":", Type])).
merge_default(Options) -> merge_default(Options) ->
Default = emqx_gateway_utils:default_tcp_options(),
case lists:keytake(tcp_options, 1, Options) of case lists:keytake(tcp_options, 1, Options) of
{value, {tcp_options, TcpOpts}, Options1} -> {value, {tcp_options, TcpOpts}, Options1} ->
[{tcp_options, emqx_misc:merge_opts(?TCP_OPTS, TcpOpts)} | Options1]; [{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)}
| Options1];
false -> false ->
[{tcp_options, ?TCP_OPTS} | Options] [{tcp_options, Default} | Options]
end. end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->