From 981afd38e32d964ef2a261e4d5f3a587675a1a54 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 18 Sep 2019 19:58:12 +0800 Subject: [PATCH] Rewrite the 'presence' extended module --- src/emqx_mod_presence.erl | 114 ++++++++++++++++++---------------- src/emqx_mod_rewrite.erl | 5 +- src/emqx_mod_subscription.erl | 8 +-- 3 files changed, 68 insertions(+), 59 deletions(-) diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 3947c2c8c..a1032ea57 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -23,70 +23,74 @@ -logger_header("[Presence]"). -%% APIs --export([ on_client_connected/4 - , on_client_disconnected/3 - ]). - %% emqx_gen_mod callbacks -export([ load/1 , unload/1 ]). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- +-export([ on_client_connected/4 + , on_client_disconnected/4 + ]). -load(_Env) -> - ok. - %% emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}), - %% emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}). - -on_client_connected(#{client_id := ClientId, - username := Username, - peername := {IpAddr, _} - }, ConnAck, - #{session := Session, - proto_name := ProtoName, - proto_ver := ProtoVer, - keepalive := Keepalive - }, Env) -> - case emqx_json:safe_encode(maps:merge(#{clientid => ClientId, - username => Username, - ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), - proto_name => ProtoName, - proto_ver => ProtoVer, - keepalive => Keepalive, - connack => ConnAck, - ts => erlang:system_time(millisecond) - }, maps:with([clean_start, expiry_interval], Session))) of - {ok, Payload} -> - emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); - {error, Reason} -> - ?LOG(error, "Encoding connected event error: ~p", [Reason]) - end. - - - - -on_client_disconnected(#{client_id := ClientId, - username := Username}, Reason, Env) -> - case emqx_json:safe_encode(#{clientid => ClientId, - username => Username, - reason => reason(Reason), - ts => erlang:system_time(millisecond) - }) of - {ok, Payload} -> - emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload)); - {error, Reason} -> - ?LOG(error, "Encoding disconnected event error: ~p", [Reason]) - end. +load(Env) -> + emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}), + emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}). unload(_Env) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}). -message(QoS, Topic, Payload) -> +on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) -> + #{peerhost := PeerHost} = ClientInfo, + #{clean_start := CleanStart, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive, + expiry_interval := ExpiryInterval} = ConnInfo, + ClientId = clientid(ClientInfo, ConnInfo), + Username = username(ClientInfo, ConnInfo), + Presence = #{clientid => ClientId, + username => Username, + ipaddress => ntoa(PeerHost), + proto_name => ProtoName, + proto_ver => ProtoVer, + keepalive => Keepalive, + connack => ConnAck, + clean_start => CleanStart, + expiry_interval => ExpiryInterval, + ts => emqx_time:now_ms() + }, + case emqx_json:safe_encode(Presence) of + {ok, Payload} -> + emqx_broker:safe_publish( + make_msg(qos(Env), topic(connected, ClientId), Payload)); + {error, _Reason} -> + ?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence]) + end. + +on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> + ClientId = clientid(ClientInfo, ConnInfo), + Username = username(ClientInfo, ConnInfo), + Presence = #{clientid => ClientId, + username => Username, + reason => reason(Reason), + ts => emqx_time:now_ms() + }, + case emqx_json:safe_encode(Presence) of + {ok, Payload} -> + emqx_broker:safe_publish( + make_msg(qos(Env), topic(disconnected, ClientId), Payload)); + {error, _Reason} -> + ?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence]) + end. + +clientid(#{client_id := undefined}, #{client_id := ClientId}) -> ClientId; +clientid(#{client_id := ClientId}, _ConnInfo) -> ClientId. + +username(#{username := undefined}, #{username := Username}) -> Username; +username(#{username := Username}, _ConnInfo) -> Username. + +make_msg(QoS, Topic, Payload) -> emqx_message:set_flag( sys, emqx_message:make( ?MODULE, QoS, Topic, iolist_to_binary(Payload))). @@ -99,6 +103,10 @@ topic(disconnected, ClientId) -> qos(Env) -> proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; +reason({shutdown, Reason}) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. +-compile({inline, [ntoa/1]}). +ntoa(IpAddr) -> iolist_to_binary(esockd_net:ntoa(IpAddr)). + diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 17cff974a..7630831a9 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -47,10 +47,10 @@ load(RawRules) -> emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [Rules]}), emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [Rules]}). -rewrite_subscribe(_Client, _Properties, TopicFilters, Rules) -> +rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. -rewrite_unsubscribe(_Client, _Properties, TopicFilters, Rules) -> +rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. rewrite_publish(Message = #message{topic = Topic}, Rules) -> @@ -91,3 +91,4 @@ compile(Rules) -> {ok, MP} = re:compile(Re), {rewrite, Topic, MP, Dest} end, Rules). + diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index f46ef0f0b..a42234856 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -21,14 +21,14 @@ -include_lib("emqx.hrl"). -include_lib("emqx_mqtt.hrl"). -%% APIs --export([on_client_connected/4]). - %% emqx_gen_mod callbacks -export([ load/1 , unload/1 ]). +%% APIs +-export([on_client_connected/4]). + %%-------------------------------------------------------------------- %% Load/Unload Hook %%-------------------------------------------------------------------- @@ -37,7 +37,7 @@ load(Topics) -> emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). on_client_connected(#{client_id := ClientId, - username := Username}, ?RC_SUCCESS, _ConnAttrs, Topics) -> + username := Username}, ?RC_SUCCESS, _ConnInfo, Topics) -> Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end,