diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 10e521a44..13b813928 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -145,7 +145,8 @@ init({Transport, RawSocket, Options}) -> ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, - sendfun => SendFun}, Options), + sendfun => SendFun, + conn_mod => ?MODULE}, Options), ParseState = emqx_protocol:parser(ProtoState), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), GcState = emqx_gc:init(GcPolicy), diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index a75fc25bf..9618a351e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -67,7 +67,8 @@ connected, connected_at, ignore_loop, - topic_alias_maximum + topic_alias_maximum, + conn_mod }). -opaque(state() :: #pstate{}). @@ -84,7 +85,7 @@ %%------------------------------------------------------------------------------ -spec(init(map(), list()) -> state()). -init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> +init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), #pstate{zone = Zone, sendfun = SendFun, @@ -109,7 +110,8 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) send_stats = #{msg => 0, pkt => 0}, connected = false, ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), - topic_alias_maximum = #{to_client => 0, from_client => 0}}. + topic_alias_maximum = #{to_client => 0, from_client => 0}, + conn_mod = maps:get(conn_mod, SocketOpts, undefined)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -153,7 +155,8 @@ attrs(#pstate{zone = Zone, mountpoint = Mountpoint, is_super = IsSuper, is_bridge = IsBridge, - connected_at = ConnectedAt}) -> + connected_at = ConnectedAt, + conn_mod = ConnMod}) -> [{zone, Zone}, {client_id, ClientId}, {username, Username}, @@ -166,7 +169,8 @@ attrs(#pstate{zone = Zone, {mountpoint, Mountpoint}, {is_super, IsSuper}, {is_bridge, IsBridge}, - {connected_at, ConnectedAt}]. + {connected_at, ConnectedAt}, + {conn_mod, ConnMod}]. attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Receive-Maximum', ConnProps, 65535); diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 83434e760..c9fca61f9 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -128,8 +128,9 @@ websocket_init(#state{request = Req, options = Options}) -> ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, - sendfun => send_fun(self())}, Options), - ParseState = emqx_protocol:parser(ProtoState), + sendfun => send_fun(self()), + conn_mod => ?MODULE}, Options), + ParserState = emqx_protocol:parser(ProtoState), Zone = proplists:get_value(zone, Options), EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 2f8eccabe..9c9c3ab55 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -23,59 +23,6 @@ -include("emqx_mqtt.hrl"). --define(STATS, [{mailbox_len, _}, - {heap_size, _}, - {reductions, _}, - {recv_pkt, _}, - {recv_msg, _}, - {send_pkt, _}, - {send_msg, _}, - {recv_oct, _}, - {recv_cnt, _}, - {send_oct, _}, - {send_cnt, _}, - {send_pend, _}]). - --define(ATTRS, [{clean_start, _}, - {client_id, _}, - {connected_at, _}, - {is_bridge, _}, - {is_super, _}, - {keepalive, _}, - {mountpoint, _}, - {peercert, _}, - {peername, _}, - {proto_name, _}, - {proto_ver, _}, - {sockname, _}, - {username, _}, - {zone, _}]). - --define(INFO, [{ack_props, _}, - {active_n, _}, - {clean_start, _}, - {client_id, _}, - {conn_props, _}, - {conn_state, _}, - {connected_at, _}, - {enable_acl, _}, - {is_bridge, _}, - {is_super, _}, - {keepalive, _}, - {mountpoint, _}, - {peercert, _}, - {peername, _}, - {proto_name, _}, - {proto_ver, _}, - {pub_limit, _}, - {rate_limit, _}, - {session, _}, - {sockname, _}, - {socktype, _}, - {topic_aliases, _}, - {username, _}, - {zone, _}]). - all() -> [t_connect_api]. @@ -93,10 +40,33 @@ t_connect_api(_Config) -> {password, <<"pass1">>}]), {ok, _} = emqx_client:connect(T1), CPid = emqx_cm:lookup_conn_pid(<<"client1">>), - ?STATS = emqx_connection:stats(CPid), - ?ATTRS = emqx_connection:attrs(CPid), - ?INFO = emqx_connection:info(CPid), - SPid = emqx_connection:session(CPid), - true = is_pid(SPid), + ConnStats = emqx_connection:stats(CPid), + ok = t_stats(ConnStats), + ConnAttrs = emqx_connection:attrs(CPid), + ok = t_attrs(ConnAttrs), + ConnInfo = emqx_connection:info(CPid), + ok = t_info(ConnInfo), + SessionPid = emqx_connection:session(CPid), + true = is_pid(SessionPid), emqx_client:disconnect(T1). +t_info(ConnInfo) -> + ?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)), + ?assertEqual(running, proplists:get_value(conn_state, ConnInfo)), + ?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)), + ?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)), + ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)). + +t_attrs(AttrsData) -> + ?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)), + ?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)), + ?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)). + +t_stats(StatsData) -> + ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), + ?assertEqual(true, proplists:get_value(mailbox_len, StatsData) >= 0), + ?assertEqual(true, proplists:get_value(heap_size, StatsData) >= 0), + ?assertEqual(true, proplists:get_value(reductions, StatsData) >=0), + ?assertEqual(true, proplists:get_value(recv_pkt, StatsData) =:=1), + ?assertEqual(true, proplists:get_value(recv_msg, StatsData) >=0), + ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 7fada411a..77830ef93 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -60,7 +60,8 @@ connected, connected_at, ignore_loop, - topic_alias_maximum + topic_alias_maximum, + conn_mod }). @@ -88,7 +89,8 @@ send_stats = SendStats, connected = false, ignore_loop = false, - topic_alias_maximum = #{to_client => 0, from_client => 0}}). + topic_alias_maximum = #{to_client => 0, from_client => 0}, + conn_mod = emqx_connection}). all() -> [ diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index c3698861b..c45344bae 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -35,56 +35,6 @@ -define(PUBQOS, 1). --define(INFO, [{socktype, _}, - {conn_state, _}, - {peername, _}, - {sockname, _}, - {zone, _}, - {client_id, <<"mqtt_client">>}, - {username, <<"admin">>}, - {peername, _}, - {peercert, _}, - {proto_ver, _}, - {proto_name, _}, - {clean_start, _}, - {keepalive, _}, - {mountpoint, _}, - {is_super, _}, - {is_bridge, _}, - {connected_at, _}, - {conn_props, _}, - {ack_props, _}, - {session, _}, - {topic_aliases, _}, - {enable_acl, _}]). - --define(ATTRS, [{clean_start,true}, - {client_id, <<"mqtt_client">>}, - {connected_at, _}, - {is_bridge, _}, - {is_super, _}, - {keepalive, _}, - {mountpoint, _}, - {peercert, _}, - {peername, _}, - {proto_name, _}, - {proto_ver, _}, - {sockname, _}, - {username, <<"admin">>}, - {zone, _}]). - --define(STATS, [{recv_oct, _}, - {recv_cnt, _}, - {send_oct, _}, - {send_cnt, _}, - {mailbox_len, _}, - {heap_size, _}, - {reductions, _}, - {recv_pkt, _}, - {recv_msg, _}, - {send_pkt, _}, - {send_msg, _}]). - all() -> [t_ws_connect_api]. @@ -103,9 +53,12 @@ t_ws_connect_api(_Config) -> {binary, CONACK} = rfc6455_client:recv(WS), {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), Pid = emqx_cm:lookup_conn_pid(<<"mqtt_client">>), - ?INFO = emqx_ws_connection:info(Pid), - ?ATTRS = emqx_ws_connection:attrs(Pid), - ?STATS = emqx_ws_connection:stats(Pid), + ConnInfo = emqx_ws_connection:info(Pid), + ok = t_info(ConnInfo), + ConnAttrs = emqx_ws_connection:attrs(Pid), + ok = t_attrs(ConnAttrs), + ConnStats = emqx_ws_connection:stats(Pid), + ok = t_stats(ConnStats), SessionPid = emqx_ws_connection:session(Pid), true = is_pid(SessionPid), ok = emqx_ws_connection:kick(Pid), @@ -118,3 +71,24 @@ raw_send_serialize(Packet) -> raw_recv_pase(P) -> emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, version => ?MQTT_PROTO_V4} }). + +t_info(InfoData) -> + ?assertEqual(websocket, proplists:get_value(socktype, InfoData)), + ?assertEqual(running, proplists:get_value(conn_state, InfoData)), + ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)), + ?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)), + ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)). + +t_attrs(AttrsData) -> + ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)), + ?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)), + ?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)). + +t_stats(StatsData) -> + ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), + ?assertEqual(true, proplists:get_value(mailbox_len, StatsData) >= 0), + ?assertEqual(true, proplists:get_value(heap_size, StatsData) >= 0), + ?assertEqual(true, proplists:get_value(reductions, StatsData) >=0), + ?assertEqual(true, proplists:get_value(recv_pkt, StatsData) =:=1), + ?assertEqual(true, proplists:get_value(recv_msg, StatsData) >=0), + ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). \ No newline at end of file