Rewrite the 'presence' extended module

This commit is contained in:
Feng Lee 2019-09-18 19:58:12 +08:00
parent 8404fce6a6
commit 981afd38e3
3 changed files with 68 additions and 59 deletions

View File

@ -23,70 +23,74 @@
-logger_header("[Presence]").
%% APIs
-export([ on_client_connected/4
, on_client_disconnected/3
]).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-export([ on_client_connected/4
, on_client_disconnected/4
]).
load(_Env) ->
ok.
%% emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}),
%% emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}).
on_client_connected(#{client_id := ClientId,
username := Username,
peername := {IpAddr, _}
}, ConnAck,
#{session := Session,
proto_name := ProtoName,
proto_ver := ProtoVer,
keepalive := Keepalive
}, Env) ->
case emqx_json:safe_encode(maps:merge(#{clientid => ClientId,
username => Username,
ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
proto_name => ProtoName,
proto_ver => ProtoVer,
keepalive => Keepalive,
connack => ConnAck,
ts => erlang:system_time(millisecond)
}, maps:with([clean_start, expiry_interval], Session))) of
{ok, Payload} ->
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
{error, Reason} ->
?LOG(error, "Encoding connected event error: ~p", [Reason])
end.
on_client_disconnected(#{client_id := ClientId,
username := Username}, Reason, Env) ->
case emqx_json:safe_encode(#{clientid => ClientId,
username => Username,
reason => reason(Reason),
ts => erlang:system_time(millisecond)
}) of
{ok, Payload} ->
emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload));
{error, Reason} ->
?LOG(error, "Encoding disconnected event error: ~p", [Reason])
end.
load(Env) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}),
emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}).
unload(_Env) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
message(QoS, Topic, Payload) ->
on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) ->
#{peerhost := PeerHost} = ClientInfo,
#{clean_start := CleanStart,
proto_name := ProtoName,
proto_ver := ProtoVer,
keepalive := Keepalive,
expiry_interval := ExpiryInterval} = ConnInfo,
ClientId = clientid(ClientInfo, ConnInfo),
Username = username(ClientInfo, ConnInfo),
Presence = #{clientid => ClientId,
username => Username,
ipaddress => ntoa(PeerHost),
proto_name => ProtoName,
proto_ver => ProtoVer,
keepalive => Keepalive,
connack => ConnAck,
clean_start => CleanStart,
expiry_interval => ExpiryInterval,
ts => emqx_time:now_ms()
},
case emqx_json:safe_encode(Presence) of
{ok, Payload} ->
emqx_broker:safe_publish(
make_msg(qos(Env), topic(connected, ClientId), Payload));
{error, _Reason} ->
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
end.
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
ClientId = clientid(ClientInfo, ConnInfo),
Username = username(ClientInfo, ConnInfo),
Presence = #{clientid => ClientId,
username => Username,
reason => reason(Reason),
ts => emqx_time:now_ms()
},
case emqx_json:safe_encode(Presence) of
{ok, Payload} ->
emqx_broker:safe_publish(
make_msg(qos(Env), topic(disconnected, ClientId), Payload));
{error, _Reason} ->
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
end.
clientid(#{client_id := undefined}, #{client_id := ClientId}) -> ClientId;
clientid(#{client_id := ClientId}, _ConnInfo) -> ClientId.
username(#{username := undefined}, #{username := Username}) -> Username;
username(#{username := Username}, _ConnInfo) -> Username.
make_msg(QoS, Topic, Payload) ->
emqx_message:set_flag(
sys, emqx_message:make(
?MODULE, QoS, Topic, iolist_to_binary(Payload))).
@ -99,6 +103,10 @@ topic(disconnected, ClientId) ->
qos(Env) -> proplists:get_value(qos, Env, 0).
reason(Reason) when is_atom(Reason) -> Reason;
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error.
-compile({inline, [ntoa/1]}).
ntoa(IpAddr) -> iolist_to_binary(esockd_net:ntoa(IpAddr)).

View File

@ -47,10 +47,10 @@ load(RawRules) ->
emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [Rules]}),
emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [Rules]}).
rewrite_subscribe(_Client, _Properties, TopicFilters, Rules) ->
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
rewrite_unsubscribe(_Client, _Properties, TopicFilters, Rules) ->
rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
@ -91,3 +91,4 @@ compile(Rules) ->
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end, Rules).

View File

@ -21,14 +21,14 @@
-include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl").
%% APIs
-export([on_client_connected/4]).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
]).
%% APIs
-export([on_client_connected/4]).
%%--------------------------------------------------------------------
%% Load/Unload Hook
%%--------------------------------------------------------------------
@ -37,7 +37,7 @@ load(Topics) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
on_client_connected(#{client_id := ClientId,
username := Username}, ?RC_SUCCESS, _ConnAttrs, Topics) ->
username := Username}, ?RC_SUCCESS, _ConnInfo, Topics) ->
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
end,