diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 78198c4e4..d9980b829 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -20,7 +20,7 @@ -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). --logger_header("[GW-Conn]"). +-logger_header("[PGW-Conn]"). %% API -export([ start_link/3 diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 270d7de4b..1708a417b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -14,7 +14,11 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc The gateway connection management +%% @doc The Gateway Connection-Manager +%% +%% For a certain type of protocol, this is a single instance of the manager. +%% It means that no matter how many instances of the stomp gateway are created, +%% they all share a single this Connection-Manager -module(emqx_gateway_cm). -behaviour(gen_server). @@ -57,19 +61,23 @@ ]). -record(state, { - type :: atom(), %% Gateway Id + type :: atom(), %% Gateway Type locker :: pid(), %% ClientId Locker for CM registry :: pid(), %% ClientId Registry server chan_pmon :: emqx_pmon:pmon() }). +-type option() :: {type, gateway_type()}. +-type options() :: list(option()). + -define(T_TAKEOVER, 15000). +-define(DEFAULT_BATCH_SIZE, 10000). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -%% XXX: Options for cm process +-spec start_link(options()) -> {ok, pid()} | ignore | {error, any()}. start_link(Options) -> Type = proplists:get_value(type, Options), gen_server:start_link({local, procname(Type)}, ?MODULE, Options, []). @@ -416,7 +424,7 @@ handle_cast(_Msg, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state{type = Type, chan_pmon = PMon}) -> - ChanPids = [Pid | emqx_misc:drain_down(10000)], %% XXX: Fixed BATCH_SIZE + ChanPids = [Pid | emqx_misc:drain_down(?DEFAULT_BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), CmTabs = cmtabs(Type), diff --git a/apps/emqx_gateway/src/emqx_gateway_sup.erl b/apps/emqx_gateway/src/emqx_gateway_sup.erl index d56b27e52..c974060d0 100644 --- a/apps/emqx_gateway/src/emqx_gateway_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_sup.erl @@ -191,4 +191,3 @@ started_gateway_pid() -> is_a_gateway_id(Id) -> Id /= emqx_gateway_registry. - diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 04eca9e5d..8adf06249 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -41,6 +41,10 @@ , oom_policy/1 ]). +-export([ default_tcp_options/0 + , default_udp_options/0 + ]). + -define(ACTIVE_N, 100). -define(DEFAULT_IDLE_TIMEOUT, 30000). -define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}). @@ -172,3 +176,13 @@ stats_timer(Options) -> -spec enable_stats(map()) -> boolean(). enable_stats(Options) -> maps:get(enable_stats, Options, true). + +%%-------------------------------------------------------------------- +%% Envs2 + +default_tcp_options() -> + [binary, {packet, raw}, {reuseaddr, true}, + {nodelay, true}, {backlog, 512}]. + +default_udp_options() -> + [binary]. diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 844df2b28..b55d30729 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -32,11 +32,6 @@ , on_insta_destroy/3 ]). --define(TCP_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, - {backlog, 512}, {nodelay, true}]). - --define(UDP_SOCKOPTS, []). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -186,21 +181,23 @@ name(InstaId, Type) -> merge_default_by_type(Type, Options) when Type =:= tcp; Type =:= ssl -> + Default = emqx_gateway_utils:default_tcp_options(), case lists:keytake(tcp_options, 1, Options) of {value, {tcp_options, TcpOpts}, Options1} -> - [{tcp_options, emqx_misc:merge_opts(?TCP_SOCKOPTS, TcpOpts)} + [{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)} | Options1]; false -> - [{tcp_options, ?TCP_SOCKOPTS} | Options] + [{tcp_options, Default} | Options] end; merge_default_by_type(Type, Options) when Type =:= udp; Type =:= dtls -> + Default = emqx_gateway_utils:default_udp_options(), case lists:keytake(udp_options, 1, Options) of {value, {udp_options, TcpOpts}, Options1} -> - [{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} + [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)} | Options1]; false -> - [{udp_options, ?UDP_SOCKOPTS} | Options] + [{udp_options, Default} | Options] end. stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> diff --git a/apps/emqx_gateway/src/exproto/include/emqx_exproto.hrl b/apps/emqx_gateway/src/exproto/include/emqx_exproto.hrl index a599b3039..75adfb6d1 100644 --- a/apps/emqx_gateway/src/exproto/include/emqx_exproto.hrl +++ b/apps/emqx_gateway/src/exproto/include/emqx_exproto.hrl @@ -19,7 +19,6 @@ -define(TCP_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, {backlog, 512}, {nodelay, true}]). -%% TODO: -define(UDP_SOCKOPTS, []). %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl index 11a52327a..e13b19e0a 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl @@ -102,7 +102,7 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1)) end), emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)), - emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), + emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}}; {error, Error} -> diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 411ec7bb4..bd27c9fb5 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -32,8 +32,6 @@ , on_insta_destroy/3 ]). --define(UDP_SOCKOPTS, []). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -41,8 +39,7 @@ load() -> RegistryOptions = [ {cbkmod, ?MODULE} ], - YourOptions = [params1, params2], - emqx_gateway_registry:load(mqttsn, RegistryOptions, YourOptions). + emqx_gateway_registry:load(mqttsn, RegistryOptions, []). unload() -> emqx_gateway_registry:unload(mqttsn). @@ -140,11 +137,13 @@ name(InstaId, Type) -> list_to_atom(lists:concat([InstaId, ":", Type])). merge_default(Options) -> + Default = emqx_gateway_utils:default_udp_options(), case lists:keytake(udp_options, 1, Options) of {value, {udp_options, TcpOpts}, Options1} -> - [{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} | Options1]; + [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)} + | Options1]; false -> - [{udp_options, ?UDP_SOCKOPTS} | Options] + [{udp_options, Default} | Options] end. stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 487b15077..64d0b32e5 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -31,8 +31,6 @@ , on_insta_destroy/3 ]). --define(TCP_OPTS, [binary, {packet, raw}, {reuseaddr, true}, {nodelay, true}]). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -41,15 +39,13 @@ load() -> RegistryOptions = [ {cbkmod, ?MODULE} ], - - YourOptions = [param1, param2], - emqx_gateway_registry:load(stomp, RegistryOptions, YourOptions). + emqx_gateway_registry:load(stomp, RegistryOptions, []). -spec unload() -> ok | {error, any()}. unload() -> emqx_gateway_registry:unload(stomp). -init([param1, param2]) -> +init(_) -> GwState = #{}, {ok, GwState}. @@ -125,11 +121,13 @@ name(InstaId, Type) -> list_to_atom(lists:concat([InstaId, ":", Type])). merge_default(Options) -> + Default = emqx_gateway_utils:default_tcp_options(), case lists:keytake(tcp_options, 1, Options) of {value, {tcp_options, TcpOpts}, Options1} -> - [{tcp_options, emqx_misc:merge_opts(?TCP_OPTS, TcpOpts)} | Options1]; + [{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)} + | Options1]; false -> - [{tcp_options, ?TCP_OPTS} | Options] + [{tcp_options, Default} | Options] end. stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->