Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-04-13 14:48:50 +08:00
commit 0c104faef7
9 changed files with 91 additions and 70 deletions

View File

@ -74,6 +74,6 @@ drop_bridge(Id) ->
ok -> ok ->
supervisor:delete_child(?SUP, Id); supervisor:delete_child(?SUP, Id);
Error -> Error ->
?LOG(error, "[Bridge] Delete bridge failed", [Error]), ?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]),
Error Error
end. end.

View File

@ -929,7 +929,7 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
{stop, {shutdown, Reason}, State}; {stop, {shutdown, Reason}, State};
handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) ->
?LOG(error, "[Client] State: ~s, Unexpected Event: (info, ~p)", ?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)",
[StateName, EventContent]), [StateName, EventContent]),
keep_state_and_data; keep_state_and_data;

View File

@ -87,15 +87,15 @@ info(#state{transport = Transport,
rate_limit = RateLimit, rate_limit = RateLimit,
pub_limit = PubLimit, pub_limit = PubLimit,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
ConnInfo = [{socktype, Transport:type(Socket)}, ConnInfo = #{socktype => Transport:type(Socket),
{peername, Peername}, peername => Peername,
{sockname, Sockname}, sockname => Sockname,
{conn_state, ConnState}, conn_state => ConnState,
{active_n, ActiveN}, active_n => ActiveN,
{rate_limit, rate_limit_info(RateLimit)}, rate_limit => rate_limit_info(RateLimit),
{pub_limit, rate_limit_info(PubLimit)}], pub_limit => rate_limit_info(PubLimit)},
ProtoInfo = emqx_protocol:info(ProtoState), ProtoInfo = emqx_protocol:info(ProtoState),
lists:usort(lists:append(ConnInfo, ProtoInfo)). maps:merge(ConnInfo, ProtoInfo).
rate_limit_info(undefined) -> rate_limit_info(undefined) ->
#{}; #{};
@ -109,10 +109,10 @@ attrs(CPid) when is_pid(CPid) ->
attrs(#state{peername = Peername, attrs(#state{peername = Peername,
sockname = Sockname, sockname = Sockname,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername}, SockAttrs = #{peername => Peername,
{sockname, Sockname}], sockname => Sockname},
ProtoAttrs = emqx_protocol:attrs(ProtoState), ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)). maps:merge(SockAttrs, ProtoAttrs).
%% Conn stats %% Conn stats
stats(CPid) when is_pid(CPid) -> stats(CPid) when is_pid(CPid) ->

View File

@ -42,12 +42,15 @@ load(Env) ->
on_client_connected(#{client_id := ClientId, on_client_connected(#{client_id := ClientId,
username := Username, username := Username,
peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) -> peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) ->
Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs), Attrs = maps:filter(fun(K, _) ->
case emqx_json:safe_encode([{clientid, ClientId}, lists:member(K, ?ATTR_KEYS)
{username, Username}, end, ConnAttrs),
{ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, case emqx_json:safe_encode(Attrs#{clientid => ClientId,
{connack, ConnAck}, username => Username,
{ts, os:system_time(second)} | Attrs]) of ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
connack => ConnAck,
ts => os:system_time(second)
}) of
{ok, Payload} -> {ok, Payload} ->
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
{error, Reason} -> {error, Reason} ->
@ -84,4 +87,3 @@ qos(Env) -> proplists:get_value(qos, Env, 0).
reason(Reason) when is_atom(Reason) -> Reason; reason(Reason) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error; reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error. reason(_) -> internal_error.

View File

@ -68,7 +68,8 @@
ignore_loop, ignore_loop,
topic_alias_maximum, topic_alias_maximum,
conn_mod, conn_mod,
credentials credentials,
ws_cookie
}). }).
-opaque(state() :: #pstate{}). -opaque(state() :: #pstate{}).
@ -85,7 +86,9 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(init(map(), list()) -> state()). -spec(init(map(), list()) -> state()).
init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> init(SocketOpts = #{ peername := Peername
, peercert := Peercert
, sendfun := SendFun}, Options) ->
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
#pstate{zone = Zone, #pstate{zone = Zone,
sendfun = SendFun, sendfun = SendFun,
@ -110,7 +113,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
topic_alias_maximum = #{to_client => 0, from_client => 0}, topic_alias_maximum = #{to_client => 0, from_client => 0},
conn_mod = maps:get(conn_mod, SocketOpts, undefined), conn_mod = maps:get(conn_mod, SocketOpts, undefined),
credentials = #{}}. credentials = #{},
ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}.
init_username(Peercert, Options) -> init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of case proplists:get_value(peer_cert_as_username, Options) of
@ -135,12 +139,13 @@ info(PState = #pstate{conn_props = ConnProps,
topic_aliases = Aliases, topic_aliases = Aliases,
will_msg = WillMsg, will_msg = WillMsg,
enable_acl = EnableAcl}) -> enable_acl = EnableAcl}) ->
attrs(PState) ++ [{conn_props, ConnProps}, maps:merge(attrs(PState), #{conn_props => ConnProps,
{ack_props, AckProps}, ack_props => AckProps,
{session, Session}, session => Session,
{topic_aliases, Aliases}, topic_aliases => Aliases,
{will_msg, WillMsg}, will_msg => WillMsg,
{enable_acl, EnableAcl}]. enable_acl => EnableAcl
}).
attrs(#pstate{zone = Zone, attrs(#pstate{zone = Zone,
client_id = ClientId, client_id = ClientId,
@ -155,20 +160,20 @@ attrs(#pstate{zone = Zone,
connected_at = ConnectedAt, connected_at = ConnectedAt,
conn_mod = ConnMod, conn_mod = ConnMod,
credentials = Credentials}) -> credentials = Credentials}) ->
[{zone, Zone}, #{ zone => Zone
{client_id, ClientId}, , client_id => ClientId
{username, Username}, , username => Username
{peername, Peername}, , peername => Peername
{peercert, Peercert}, , peercert => Peercert
{proto_ver, ProtoVer}, , proto_ver => ProtoVer
{proto_name, ProtoName}, , proto_name => ProtoName
{clean_start, CleanStart}, , clean_start => CleanStart
{keepalive, Keepalive}, , keepalive => Keepalive
{is_bridge, IsBridge}, , is_bridge => IsBridge
{connected_at, ConnectedAt}, , connected_at => ConnectedAt
{conn_mod, ConnMod}, , conn_mod => ConnMod
{credentials, Credentials} , credentials => Credentials
]. }.
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Receive-Maximum', ConnProps, 65535); get_property('Receive-Maximum', ConnProps, 65535);
@ -202,11 +207,13 @@ credentials(#pstate{zone = Zone,
client_id = ClientId, client_id = ClientId,
username = Username, username = Username,
peername = Peername, peername = Peername,
peercert = Peercert}) -> peercert = Peercert,
ws_cookie = WsCookie}) ->
with_cert(#{zone => Zone, with_cert(#{zone => Zone,
client_id => ClientId, client_id => ClientId,
username => Username, username => Username,
peername => Peername, peername => Peername,
ws_cookie => WsCookie,
mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert). mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert).
with_cert(Credentials, undefined) -> Credentials; with_cert(Credentials, undefined) -> Credentials;

View File

@ -61,11 +61,11 @@ info(#state{peername = Peername,
sockname = Sockname, sockname = Sockname,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState), ProtoInfo = emqx_protocol:info(ProtoState),
ConnInfo = [{socktype, websocket}, ConnInfo = #{socktype => websocket,
{conn_state, running}, conn_state => running,
{peername, Peername}, peername => Peername,
{sockname, Sockname}], sockname => Sockname},
lists:append([ConnInfo, ProtoInfo]). maps:merge(ProtoInfo, ConnInfo).
%% for dashboard %% for dashboard
attrs(WSPid) when is_pid(WSPid) -> attrs(WSPid) when is_pid(WSPid) ->
@ -74,10 +74,10 @@ attrs(WSPid) when is_pid(WSPid) ->
attrs(#state{peername = Peername, attrs(#state{peername = Peername,
sockname = Sockname, sockname = Sockname,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername}, SockAttrs = #{peername => Peername,
{sockname, Sockname}], sockname => Sockname},
ProtoAttrs = emqx_protocol:attrs(ProtoState), ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)). maps:merge(SockAttrs, ProtoAttrs).
stats(WSPid) when is_pid(WSPid) -> stats(WSPid) when is_pid(WSPid) ->
call(WSPid, stats); call(WSPid, stats);
@ -138,10 +138,22 @@ websocket_init(#state{request = Req, options = Options}) ->
Peername = cowboy_req:peer(Req), Peername = cowboy_req:peer(Req),
Sockname = cowboy_req:sock(Req), Sockname = cowboy_req:sock(Req),
Peercert = cowboy_req:cert(Req), Peercert = cowboy_req:cert(Req),
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->
?LOG(error, "[WS Connection] Illegal cookie"),
undefined;
Error:Reason ->
?LOG(error,
"[WS Connection] Cookie is parsed failed, Error: ~p, Reason ~p",
[Error, Reason]),
undefined
end,
ProtoState = emqx_protocol:init(#{peername => Peername, ProtoState = emqx_protocol:init(#{peername => Peername,
sockname => Sockname, sockname => Sockname,
peercert => Peercert, peercert => Peercert,
sendfun => send_fun(self()), sendfun => send_fun(self()),
ws_cookie => WsCookie,
conn_mod => ?MODULE}, Options), conn_mod => ?MODULE}, Options),
ParserState = emqx_protocol:parser(ProtoState), ParserState = emqx_protocol:parser(ProtoState),
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),

View File

@ -51,16 +51,16 @@ t_connect_api(_Config) ->
emqx_client:disconnect(T1). emqx_client:disconnect(T1).
t_info(ConnInfo) -> t_info(ConnInfo) ->
?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)), ?assertEqual(tcp, maps:get(socktype, ConnInfo)),
?assertEqual(running, proplists:get_value(conn_state, ConnInfo)), ?assertEqual(running, maps:get(conn_state, ConnInfo)),
?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)), ?assertEqual(<<"client1">>, maps:get(client_id, ConnInfo)),
?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)), ?assertEqual(<<"testuser1">>, maps:get(username, ConnInfo)),
?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)). ?assertEqual(<<"MQTT">>, maps:get(proto_name, ConnInfo)).
t_attrs(AttrsData) -> t_attrs(AttrsData) ->
?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)), ?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)),
?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)), ?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)),
?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)). ?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)).
t_stats(StatsData) -> t_stats(StatsData) ->
?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0),

View File

@ -73,16 +73,16 @@ raw_recv_pase(P) ->
version => ?MQTT_PROTO_V4} }). version => ?MQTT_PROTO_V4} }).
t_info(InfoData) -> t_info(InfoData) ->
?assertEqual(websocket, proplists:get_value(socktype, InfoData)), ?assertEqual(websocket, maps:get(socktype, InfoData)),
?assertEqual(running, proplists:get_value(conn_state, InfoData)), ?assertEqual(running, maps:get(conn_state, InfoData)),
?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)), ?assertEqual(<<"mqtt_client">>, maps:get(client_id, InfoData)),
?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)), ?assertEqual(<<"admin">>, maps:get(username, InfoData)),
?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)). ?assertEqual(<<"MQTT">>, maps:get(proto_name, InfoData)).
t_attrs(AttrsData) -> t_attrs(AttrsData) ->
?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)), ?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)),
?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)), ?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)),
?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)). ?assertEqual(<<"admin">>, maps:get(username, AttrsData)).
t_stats(StatsData) -> t_stats(StatsData) ->
?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0), ?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0),

View File

@ -36,7 +36,7 @@ new(WsUrl, PPid) ->
addr = Addr, addr = Addr,
path = "/" ++ Path, path = "/" ++ Path,
ppid = PPid}, ppid = PPid},
spawn(fun () -> spawn(fun() ->
start_conn(State) start_conn(State)
end). end).