diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index b00bb9012..a40e7b2e3 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -74,6 +74,6 @@ drop_bridge(Id) -> ok -> supervisor:delete_child(?SUP, Id); Error -> - ?LOG(error, "[Bridge] Delete bridge failed", [Error]), + ?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]), Error end. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 737c699ac..1c9b26a0f 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -929,7 +929,7 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> {stop, {shutdown, Reason}, State}; handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> - ?LOG(error, "[Client] State: ~s, Unexpected Event: (info, ~p)", + ?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)", [StateName, EventContent]), keep_state_and_data; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e9cfd6ae4..5beee28bb 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -87,15 +87,15 @@ info(#state{transport = Transport, rate_limit = RateLimit, pub_limit = PubLimit, proto_state = ProtoState}) -> - ConnInfo = [{socktype, Transport:type(Socket)}, - {peername, Peername}, - {sockname, Sockname}, - {conn_state, ConnState}, - {active_n, ActiveN}, - {rate_limit, rate_limit_info(RateLimit)}, - {pub_limit, rate_limit_info(PubLimit)}], + ConnInfo = #{socktype => Transport:type(Socket), + peername => Peername, + sockname => Sockname, + conn_state => ConnState, + active_n => ActiveN, + rate_limit => rate_limit_info(RateLimit), + pub_limit => rate_limit_info(PubLimit)}, ProtoInfo = emqx_protocol:info(ProtoState), - lists:usort(lists:append(ConnInfo, ProtoInfo)). + maps:merge(ConnInfo, ProtoInfo). rate_limit_info(undefined) -> #{}; @@ -109,10 +109,10 @@ attrs(CPid) when is_pid(CPid) -> attrs(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], + SockAttrs = #{peername => Peername, + sockname => Sockname}, ProtoAttrs = emqx_protocol:attrs(ProtoState), - lists:usort(lists:append(SockAttrs, ProtoAttrs)). + maps:merge(SockAttrs, ProtoAttrs). %% Conn stats stats(CPid) when is_pid(CPid) -> diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index d2ce98abc..9789474d7 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -42,12 +42,15 @@ load(Env) -> on_client_connected(#{client_id := ClientId, username := Username, peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) -> - Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs), - case emqx_json:safe_encode([{clientid, ClientId}, - {username, Username}, - {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, - {connack, ConnAck}, - {ts, os:system_time(second)} | Attrs]) of + Attrs = maps:filter(fun(K, _) -> + lists:member(K, ?ATTR_KEYS) + end, ConnAttrs), + case emqx_json:safe_encode(Attrs#{clientid => ClientId, + username => Username, + ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), + connack => ConnAck, + ts => os:system_time(second) + }) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> @@ -84,4 +87,3 @@ qos(Env) -> proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index a8bc0b288..54baac636 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -68,7 +68,8 @@ ignore_loop, topic_alias_maximum, conn_mod, - credentials + credentials, + ws_cookie }). -opaque(state() :: #pstate{}). @@ -85,7 +86,9 @@ %%------------------------------------------------------------------------------ -spec(init(map(), list()) -> state()). -init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> +init(SocketOpts = #{ peername := Peername + , peercert := Peercert + , sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), #pstate{zone = Zone, sendfun = SendFun, @@ -110,7 +113,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), topic_alias_maximum = #{to_client => 0, from_client => 0}, conn_mod = maps:get(conn_mod, SocketOpts, undefined), - credentials = #{}}. + credentials = #{}, + ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -135,12 +139,13 @@ info(PState = #pstate{conn_props = ConnProps, topic_aliases = Aliases, will_msg = WillMsg, enable_acl = EnableAcl}) -> - attrs(PState) ++ [{conn_props, ConnProps}, - {ack_props, AckProps}, - {session, Session}, - {topic_aliases, Aliases}, - {will_msg, WillMsg}, - {enable_acl, EnableAcl}]. + maps:merge(attrs(PState), #{conn_props => ConnProps, + ack_props => AckProps, + session => Session, + topic_aliases => Aliases, + will_msg => WillMsg, + enable_acl => EnableAcl + }). attrs(#pstate{zone = Zone, client_id = ClientId, @@ -155,20 +160,20 @@ attrs(#pstate{zone = Zone, connected_at = ConnectedAt, conn_mod = ConnMod, credentials = Credentials}) -> - [{zone, Zone}, - {client_id, ClientId}, - {username, Username}, - {peername, Peername}, - {peercert, Peercert}, - {proto_ver, ProtoVer}, - {proto_name, ProtoName}, - {clean_start, CleanStart}, - {keepalive, Keepalive}, - {is_bridge, IsBridge}, - {connected_at, ConnectedAt}, - {conn_mod, ConnMod}, - {credentials, Credentials} - ]. + #{ zone => Zone + , client_id => ClientId + , username => Username + , peername => Peername + , peercert => Peercert + , proto_ver => ProtoVer + , proto_name => ProtoName + , clean_start => CleanStart + , keepalive => Keepalive + , is_bridge => IsBridge + , connected_at => ConnectedAt + , conn_mod => ConnMod + , credentials => Credentials + }. attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Receive-Maximum', ConnProps, 65535); @@ -202,11 +207,13 @@ credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, peername = Peername, - peercert = Peercert}) -> + peercert = Peercert, + ws_cookie = WsCookie}) -> with_cert(#{zone => Zone, client_id => ClientId, username => Username, peername => Peername, + ws_cookie => WsCookie, mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert). with_cert(Credentials, undefined) -> Credentials; diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 92ae6a7dd..44e1eb681 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -61,11 +61,11 @@ info(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> ProtoInfo = emqx_protocol:info(ProtoState), - ConnInfo = [{socktype, websocket}, - {conn_state, running}, - {peername, Peername}, - {sockname, Sockname}], - lists:append([ConnInfo, ProtoInfo]). + ConnInfo = #{socktype => websocket, + conn_state => running, + peername => Peername, + sockname => Sockname}, + maps:merge(ProtoInfo, ConnInfo). %% for dashboard attrs(WSPid) when is_pid(WSPid) -> @@ -74,10 +74,10 @@ attrs(WSPid) when is_pid(WSPid) -> attrs(#state{peername = Peername, sockname = Sockname, proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], + SockAttrs = #{peername => Peername, + sockname => Sockname}, ProtoAttrs = emqx_protocol:attrs(ProtoState), - lists:usort(lists:append(SockAttrs, ProtoAttrs)). + maps:merge(SockAttrs, ProtoAttrs). stats(WSPid) when is_pid(WSPid) -> call(WSPid, stats); @@ -138,10 +138,22 @@ websocket_init(#state{request = Req, options = Options}) -> Peername = cowboy_req:peer(Req), Sockname = cowboy_req:sock(Req), Peercert = cowboy_req:cert(Req), + WsCookie = try cowboy_req:parse_cookies(Req) + catch + error:badarg -> + ?LOG(error, "[WS Connection] Illegal cookie"), + undefined; + Error:Reason -> + ?LOG(error, + "[WS Connection] Cookie is parsed failed, Error: ~p, Reason ~p", + [Error, Reason]), + undefined + end, ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, sendfun => send_fun(self()), + ws_cookie => WsCookie, conn_mod => ?MODULE}, Options), ParserState = emqx_protocol:parser(ProtoState), Zone = proplists:get_value(zone, Options), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 9c9c3ab55..bd2293c97 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -51,16 +51,16 @@ t_connect_api(_Config) -> emqx_client:disconnect(T1). t_info(ConnInfo) -> - ?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)), - ?assertEqual(running, proplists:get_value(conn_state, ConnInfo)), - ?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)), - ?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)), - ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)). + ?assertEqual(tcp, maps:get(socktype, ConnInfo)), + ?assertEqual(running, maps:get(conn_state, ConnInfo)), + ?assertEqual(<<"client1">>, maps:get(client_id, ConnInfo)), + ?assertEqual(<<"testuser1">>, maps:get(username, ConnInfo)), + ?assertEqual(<<"MQTT">>, maps:get(proto_name, ConnInfo)). t_attrs(AttrsData) -> - ?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)), - ?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)), - ?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)). + ?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)), + ?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)). t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index c45344bae..289608428 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -73,16 +73,16 @@ raw_recv_pase(P) -> version => ?MQTT_PROTO_V4} }). t_info(InfoData) -> - ?assertEqual(websocket, proplists:get_value(socktype, InfoData)), - ?assertEqual(running, proplists:get_value(conn_state, InfoData)), - ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)), - ?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)), - ?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)). + ?assertEqual(websocket, maps:get(socktype, InfoData)), + ?assertEqual(running, maps:get(conn_state, InfoData)), + ?assertEqual(<<"mqtt_client">>, maps:get(client_id, InfoData)), + ?assertEqual(<<"admin">>, maps:get(username, InfoData)), + ?assertEqual(<<"MQTT">>, maps:get(proto_name, InfoData)). t_attrs(AttrsData) -> - ?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)), - ?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)), - ?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)). + ?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)), + ?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)), + ?assertEqual(<<"admin">>, maps:get(username, AttrsData)). t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), @@ -91,4 +91,4 @@ t_stats(StatsData) -> ?assertEqual(true, proplists:get_value(reductions, StatsData) >=0), ?assertEqual(true, proplists:get_value(recv_pkt, StatsData) =:=1), ?assertEqual(true, proplists:get_value(recv_msg, StatsData) >=0), - ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). \ No newline at end of file + ?assertEqual(true, proplists:get_value(send_pkt, StatsData) =:=1). diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl index f5d8f1ef4..987b72407 100644 --- a/test/rfc6455_client.erl +++ b/test/rfc6455_client.erl @@ -36,7 +36,7 @@ new(WsUrl, PPid) -> addr = Addr, path = "/" ++ Path, ppid = PPid}, - spawn(fun () -> + spawn(fun() -> start_conn(State) end).