Use conn_mod instead of socktype

This commit is contained in:
linjun 2019-02-28 11:32:05 +08:00
parent 1fe7431370
commit 16165ce935
5 changed files with 21 additions and 13 deletions

View File

@ -62,14 +62,17 @@ start_link(Transport, Socket, Options) ->
info(CPid) when is_pid(CPid) -> info(CPid) when is_pid(CPid) ->
call(CPid, info); call(CPid, info);
info(#state{peername = Peername, info(#state{transport = Transport,
socket = Socket,
peername = Peername,
sockname = Sockname, sockname = Sockname,
conn_state = ConnState, conn_state = ConnState,
active_n = ActiveN, active_n = ActiveN,
rate_limit = RateLimit, rate_limit = RateLimit,
pub_limit = PubLimit, pub_limit = PubLimit,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
ConnInfo = [{peername, Peername}, ConnInfo = [{socktype, Transport:type(Socket)},
{peername, Peername},
{sockname, Sockname}, {sockname, Sockname},
{conn_state, ConnState}, {conn_state, ConnState},
{active_n, ActiveN}, {active_n, ActiveN},
@ -131,11 +134,11 @@ init([Transport, RawSocket, Options]) ->
EnableStats = emqx_zone:get_env(Zone, enable_stats, true), EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
SendFun = send_fun(Transport, Socket), SendFun = send_fun(Transport, Socket),
SockType = Transport:type(Socket),
ProtoState = emqx_protocol:init(#{peername => Peername, ProtoState = emqx_protocol:init(#{peername => Peername,
sockname => Sockname, sockname => Sockname,
peercert => Peercert, peercert => Peercert,
sendfun => SendFun}, Options ++ [{socktype, SockType}]), sendfun => SendFun,
conn_mod => ?MODULE}, Options),
ParserState = emqx_protocol:parser(ProtoState), ParserState = emqx_protocol:parser(ProtoState),
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
GcState = emqx_gc:init(GcPolicy), GcState = emqx_gc:init(GcPolicy),

View File

@ -66,7 +66,7 @@
connected_at, connected_at,
ignore_loop, ignore_loop,
topic_alias_maximum, topic_alias_maximum,
socktype conn_mod
}). }).
-opaque(state() :: #pstate{}). -opaque(state() :: #pstate{}).
@ -83,9 +83,9 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(init(map(), list()) -> state()). -spec(init(map(), list()) -> state()).
init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
ConnMod = maps:get(conn_mod, SocketOpts),
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
SockType = proplists:get_value(socktype, Options, tcp),
#pstate{zone = Zone, #pstate{zone = Zone,
sendfun = SendFun, sendfun = SendFun,
peername = Peername, peername = Peername,
@ -110,7 +110,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
connected = false, connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
topic_alias_maximum = #{to_client => 0, from_client => 0}, topic_alias_maximum = #{to_client => 0, from_client => 0},
socktype = SockType}. conn_mod = ConnMod}.
init_username(Peercert, Options) -> init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of case proplists:get_value(peer_cert_as_username, Options) of
@ -153,7 +153,7 @@ attrs(#pstate{zone = Zone,
is_super = IsSuper, is_super = IsSuper,
is_bridge = IsBridge, is_bridge = IsBridge,
connected_at = ConnectedAt, connected_at = ConnectedAt,
socktype = SockType}) -> conn_mod = ConnMod}) ->
[{zone, Zone}, [{zone, Zone},
{client_id, ClientId}, {client_id, ClientId},
{username, Username}, {username, Username},
@ -167,7 +167,7 @@ attrs(#pstate{zone = Zone,
{is_super, IsSuper}, {is_super, IsSuper},
{is_bridge, IsBridge}, {is_bridge, IsBridge},
{connected_at, ConnectedAt}, {connected_at, ConnectedAt},
{socktype, SockType}]. {conn_mod, ConnMod}].
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Receive-Maximum', ConnProps, 65535); get_property('Receive-Maximum', ConnProps, 65535);

View File

@ -58,7 +58,8 @@ info(#state{peername = Peername,
sockname = Sockname, sockname = Sockname,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState), ProtoInfo = emqx_protocol:info(ProtoState),
ConnInfo = [{conn_state, running}, ConnInfo = [{socktype, websocket},
{conn_state, running},
{peername, Peername}, {peername, Peername},
{sockname, Sockname}], {sockname, Sockname}],
lists:append([ConnInfo, ProtoInfo]). lists:append([ConnInfo, ProtoInfo]).
@ -126,7 +127,8 @@ websocket_init(#state{request = Req, options = Options}) ->
ProtoState = emqx_protocol:init(#{peername => Peername, ProtoState = emqx_protocol:init(#{peername => Peername,
sockname => Sockname, sockname => Sockname,
peercert => Peercert, peercert => Peercert,
sendfun => send_fun(self())}, Options ++ [{socktype, websocket}]), sendfun => send_fun(self()),
conn_mod => ?MODULE}, Options),
ParserState = emqx_protocol:parser(ProtoState), ParserState = emqx_protocol:parser(ProtoState),
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
EnableStats = emqx_zone:get_env(Zone, enable_stats, true), EnableStats = emqx_zone:get_env(Zone, enable_stats, true),

View File

@ -38,6 +38,7 @@
-define(ATTRS, [{clean_start, _}, -define(ATTRS, [{clean_start, _},
{client_id, _}, {client_id, _},
{conn_mod, _},
{connected_at, _}, {connected_at, _},
{is_bridge, _}, {is_bridge, _},
{is_super, _}, {is_super, _},
@ -56,6 +57,7 @@
{active_n, _}, {active_n, _},
{clean_start, _}, {clean_start, _},
{client_id, _}, {client_id, _},
{conn_mod, _},
{conn_props, _}, {conn_props, _},
{conn_state, _}, {conn_state, _},
{connected_at, _}, {connected_at, _},

View File

@ -52,6 +52,7 @@
{is_super, _}, {is_super, _},
{is_bridge, _}, {is_bridge, _},
{connected_at, _}, {connected_at, _},
{conn_mod, _},
{conn_props, _}, {conn_props, _},
{ack_props, _}, {ack_props, _},
{session, _}, {session, _},
@ -60,6 +61,7 @@
-define(ATTRS, [{clean_start,true}, -define(ATTRS, [{clean_start,true},
{client_id, <<"mqtt_client">>}, {client_id, <<"mqtt_client">>},
{conn_mod, emqx_connection},
{connected_at, _}, {connected_at, _},
{is_bridge, _}, {is_bridge, _},
{is_super, _}, {is_super, _},
@ -70,7 +72,6 @@
{proto_name, _}, {proto_name, _},
{proto_ver, _}, {proto_ver, _},
{sockname, _}, {sockname, _},
{socktype, _},
{username, <<"admin">>}, {username, <<"admin">>},
{zone, _}]). {zone, _}]).