Convert value of attribute table to map

This commit is contained in:
Gilbert Wong 2019-04-13 10:35:59 +08:00
parent 81ef5b9b8d
commit f1616c33d9
8 changed files with 68 additions and 65 deletions

View File

@ -74,6 +74,6 @@ drop_bridge(Id) ->
ok ->
supervisor:delete_child(?SUP, Id);
Error ->
?LOG(error, "[Bridge] Delete bridge failed", [Error]),
?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]),
Error
end.

View File

@ -929,7 +929,7 @@ handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
{stop, {shutdown, Reason}, State};
handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) ->
?LOG(error, "[Client] State: ~s, Unexpected Event: (info, ~p)",
?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)",
[StateName, EventContent]),
keep_state_and_data;

View File

@ -87,15 +87,15 @@ info(#state{transport = Transport,
rate_limit = RateLimit,
pub_limit = PubLimit,
proto_state = ProtoState}) ->
ConnInfo = [{socktype, Transport:type(Socket)},
{peername, Peername},
{sockname, Sockname},
{conn_state, ConnState},
{active_n, ActiveN},
{rate_limit, rate_limit_info(RateLimit)},
{pub_limit, rate_limit_info(PubLimit)}],
ConnInfo = #{socktype => Transport:type(Socket),
peername => Peername,
sockname => Sockname,
conn_state => ConnState,
active_n => ActiveN,
rate_limit => rate_limit_info(RateLimit),
pub_limit => rate_limit_info(PubLimit)},
ProtoInfo = emqx_protocol:info(ProtoState),
lists:usort(lists:append(ConnInfo, ProtoInfo)).
maps:merge(ConnInfo, ProtoInfo).
rate_limit_info(undefined) ->
#{};
@ -109,10 +109,10 @@ attrs(CPid) when is_pid(CPid) ->
attrs(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
SockAttrs = #{peername => Peername,
sockname => Sockname},
ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
maps:merge(SockAttrs, ProtoAttrs).
%% Conn stats
stats(CPid) when is_pid(CPid) ->

View File

@ -42,12 +42,15 @@ load(Env) ->
on_client_connected(#{client_id := ClientId,
username := Username,
peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) ->
Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs),
case emqx_json:safe_encode([{clientid, ClientId},
{username, Username},
{ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))},
{connack, ConnAck},
{ts, os:system_time(second)} | Attrs]) of
Attrs = maps:filter(fun(K, _) ->
lists:member(K, ?ATTR_KEYS)
end, ConnAttrs),
case emqx_json:safe_encode(Attrs#{clientid => ClientId,
username => Username,
ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
connack => ConnAck,
ts => os:system_time(second)
}) of
{ok, Payload} ->
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
{error, Reason} ->
@ -84,4 +87,3 @@ qos(Env) -> proplists:get_value(qos, Env, 0).
reason(Reason) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error.

View File

@ -139,12 +139,13 @@ info(PState = #pstate{conn_props = ConnProps,
topic_aliases = Aliases,
will_msg = WillMsg,
enable_acl = EnableAcl}) ->
attrs(PState) ++ [{conn_props, ConnProps},
{ack_props, AckProps},
{session, Session},
{topic_aliases, Aliases},
{will_msg, WillMsg},
{enable_acl, EnableAcl}].
maps:merge(attrs(PState), #{conn_props => ConnProps,
ack_props => AckProps,
session => Session,
topic_aliases => Aliases,
will_msg => WillMsg,
enable_acl => EnableAcl
}).
attrs(#pstate{zone = Zone,
client_id = ClientId,
@ -159,20 +160,20 @@ attrs(#pstate{zone = Zone,
connected_at = ConnectedAt,
conn_mod = ConnMod,
credentials = Credentials}) ->
[{zone, Zone},
{client_id, ClientId},
{username, Username},
{peername, Peername},
{peercert, Peercert},
{proto_ver, ProtoVer},
{proto_name, ProtoName},
{clean_start, CleanStart},
{keepalive, Keepalive},
{is_bridge, IsBridge},
{connected_at, ConnectedAt},
{conn_mod, ConnMod},
{credentials, Credentials}
].
#{ zone => Zone
, client_id => ClientId
, username => Username
, peername => Peername
, peercert => Peercert
, proto_ver => ProtoVer
, proto_name => ProtoName
, clean_start => CleanStart
, keepalive => Keepalive
, is_bridge => IsBridge
, connected_at => ConnectedAt
, conn_mod => ConnMod
, credentials => Credentials
}.
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Receive-Maximum', ConnProps, 65535);

View File

@ -61,11 +61,11 @@ info(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState),
ConnInfo = [{socktype, websocket},
{conn_state, running},
{peername, Peername},
{sockname, Sockname}],
lists:append([ConnInfo, ProtoInfo]).
ConnInfo = #{socktype => websocket,
conn_state => running,
peername => Peername,
sockname => Sockname},
maps:merge(ProtoInfo, ConnInfo).
%% for dashboard
attrs(WSPid) when is_pid(WSPid) ->
@ -74,10 +74,10 @@ attrs(WSPid) when is_pid(WSPid) ->
attrs(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
SockAttrs = #{peername => Peername,
sockname => Sockname},
ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
maps:merge(SockAttrs, ProtoAttrs).
stats(WSPid) when is_pid(WSPid) ->
call(WSPid, stats);

View File

@ -51,16 +51,16 @@ t_connect_api(_Config) ->
emqx_client:disconnect(T1).
t_info(ConnInfo) ->
?assertEqual(tcp, proplists:get_value(socktype, ConnInfo)),
?assertEqual(running, proplists:get_value(conn_state, ConnInfo)),
?assertEqual(<<"client1">>, proplists:get_value(client_id, ConnInfo)),
?assertEqual(<<"testuser1">>, proplists:get_value(username, ConnInfo)),
?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, ConnInfo)).
?assertEqual(tcp, maps:get(socktype, ConnInfo)),
?assertEqual(running, maps:get(conn_state, ConnInfo)),
?assertEqual(<<"client1">>, maps:get(client_id, ConnInfo)),
?assertEqual(<<"testuser1">>, maps:get(username, ConnInfo)),
?assertEqual(<<"MQTT">>, maps:get(proto_name, ConnInfo)).
t_attrs(AttrsData) ->
?assertEqual(<<"client1">>, proplists:get_value(client_id, AttrsData)),
?assertEqual(emqx_connection, proplists:get_value(conn_mod, AttrsData)),
?assertEqual(<<"testuser1">>, proplists:get_value(username, AttrsData)).
?assertEqual(<<"client1">>, maps:get(client_id, AttrsData)),
?assertEqual(emqx_connection, maps:get(conn_mod, AttrsData)),
?assertEqual(<<"testuser1">>, maps:get(username, AttrsData)).
t_stats(StatsData) ->
?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0),

View File

@ -73,16 +73,16 @@ raw_recv_pase(P) ->
version => ?MQTT_PROTO_V4} }).
t_info(InfoData) ->
?assertEqual(websocket, proplists:get_value(socktype, InfoData)),
?assertEqual(running, proplists:get_value(conn_state, InfoData)),
?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, InfoData)),
?assertEqual(<<"admin">>, proplists:get_value(username, InfoData)),
?assertEqual(<<"MQTT">>, proplists:get_value(proto_name, InfoData)).
?assertEqual(websocket, maps:get(socktype, InfoData)),
?assertEqual(running, maps:get(conn_state, InfoData)),
?assertEqual(<<"mqtt_client">>, maps:get(client_id, InfoData)),
?assertEqual(<<"admin">>, maps:get(username, InfoData)),
?assertEqual(<<"MQTT">>, maps:get(proto_name, InfoData)).
t_attrs(AttrsData) ->
?assertEqual(<<"mqtt_client">>, proplists:get_value(client_id, AttrsData)),
?assertEqual(emqx_ws_connection, proplists:get_value(conn_mod, AttrsData)),
?assertEqual(<<"admin">>, proplists:get_value(username, AttrsData)).
?assertEqual(<<"mqtt_client">>, maps:get(client_id, AttrsData)),
?assertEqual(emqx_ws_connection, maps:get(conn_mod, AttrsData)),
?assertEqual(<<"admin">>, maps:get(username, AttrsData)).
t_stats(StatsData) ->
?assertEqual(true, proplists:get_value(recv_oct, StatsData) >= 0),