diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 7adf6064e..357eacb7b 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -174,7 +174,7 @@ delivery(Msg) -> %%------------------------------------------------------------------------------ route([], Delivery = #delivery{message = Msg}) -> - emqx_hooks:run('message.dropped', [node(), Msg]), + emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), inc_dropped_cnt(Msg#message.topic), Delivery; route([{To, Node}], Delivery) when Node =:= node() -> @@ -215,7 +215,7 @@ forward(Node, To, Delivery) -> dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> case subscribers(Topic) of [] -> - emqx_hooks:run('message.dropped', [node(), Msg]), + emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), inc_dropped_cnt(Topic), Delivery; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg), diff --git a/src/emqx_message.erl b/src/emqx_message.erl index da762703e..86f6a825f 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -22,6 +22,7 @@ -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). -export([set_headers/2]). -export([get_header/2, get_header/3, set_header/3]). +-export([format/1]). -spec(make(topic(), payload()) -> message()). make(Topic, Payload) -> @@ -55,10 +56,14 @@ get_flag(Flag, #message{flags = Flags}, Default) -> maps:get(Flag, Flags, Default). -spec(set_flag(message_flag(), message()) -> message()). +set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) -> + Msg#message{flags = #{Flag => true}}; set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, true, Flags)}. -spec(set_flag(message_flag(), boolean() | integer(), message()) -> message()). +set_flag(Flag, Val, Msg = #message{flags = undefined}) when is_atom(Flag) -> + Msg#message{flags = #{Flag => Val}}; set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, Val, Flags)}. @@ -83,3 +88,14 @@ set_header(Hdr, Val, Msg = #message{headers = undefined}) -> set_header(Hdr, Val, Msg = #message{headers = Headers}) -> Msg#message{headers = maps:put(Hdr, Val, Headers)}. +format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) -> + io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)", + [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). + +format(_, undefined) -> + ""; +format(flags, Flags) -> + io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); +format(headers, Headers) -> + io_lib:format("~p", [Headers]). + diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index ef70dc28d..812c3267d 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -19,50 +19,48 @@ -include("emqx.hrl"). -export([load/1, unload/1]). --export([on_client_connected/3, on_client_disconnected/3]). +-export([on_client_connected/4, on_client_disconnected/3]). load(Env) -> - emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), - emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). + emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]), + emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). -on_client_connected(ConnAck, Client = #client{id = ClientId, - username = Username, - peername = {IpAddr, _} - %%clean_sess = CleanSess, - %%proto_ver = ProtoVer - }, Env) -> +on_client_connected(#{client_id := ClientId, + username := Username, + peername := {IpAddr, _}}, ConnAck, ConnInfo, Env) -> case emqx_json:safe_encode([{clientid, ClientId}, {username, Username}, {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, - %%{clean_sess, CleanSess}, %%TODO:: fixme later - %%{protocol, ProtoVer}, + {clean_start, proplists:get_value(clean_start, ConnInfo)}, + {proto_ver, proplists:get_value(proto_ver, ConnInfo)}, + {proto_name, proplists:get_value(proto_name, ConnInfo)}, + {keepalive, proplists:get_value(keepalive, ConnInfo)}, {connack, ConnAck}, - {ts, emqx_time:now_secs()}]) of + {ts, os:system_time(second)}]) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> emqx_logger:error("[Presence Module] Json error: ~p", [Reason]) - end, - {ok, Client}. + end. -on_client_disconnected(Reason, #client{id = ClientId, username = Username}, Env) -> +on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) -> case emqx_json:safe_encode([{clientid, ClientId}, {username, Username}, {reason, reason(Reason)}, - {ts, emqx_time:now_secs()}]) of + {ts, os:system_time(second)}]) of {ok, Payload} -> emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload)); {error, Reason} -> emqx_logger:error("[Presence Module] Json error: ~p", [Reason]) - end, ok. + end. unload(_Env) -> - emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3), - emqx:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3). + emqx_hooks:delete('client.connected', fun ?MODULE:on_client_connected/4), + emqx_hooks:delete('client.disconnected', fun ?MODULE:on_client_disconnected/3). message(QoS, Topic, Payload) -> Msg = emqx_message:make(?MODULE, QoS, Topic, iolist_to_binary(Payload)), - emqx_message:set_flags(#{sys => true}, Msg). + emqx_message:set_flag(sys, Msg). topic(connected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"])); diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 8ddd07f6c..2a92793eb 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -15,47 +15,31 @@ -module(emqx_mod_rewrite). -include_lib("emqx.hrl"). - -include_lib("emqx_mqtt.hrl"). -export([load/1, unload/1]). --export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- +-export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]). load(Rules0) -> Rules = compile(Rules0), - emqx:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]), - emqx:hook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]), - emqx:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). + emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]), + emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]), + emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). -rewrite_subscribe(_ClientId, _Username, TopicTable, Rules) -> - emqx_logger:info("Rewrite subscribe: ~p", [TopicTable]), +rewrite_subscribe(_Credentials, TopicTable, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. -rewrite_unsubscribe(_ClientId, _Username, TopicTable, Rules) -> - emqx_logger:info("Rewrite unsubscribe: ~p", [TopicTable]), +rewrite_unsubscribe(_Credentials, TopicTable, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. rewrite_publish(Message = #message{topic = Topic}, Rules) -> - %%TODO: this will not work if the client is always online. - RewriteTopic = - case get({rewrite, Topic}) of - undefined -> - DestTopic = match_rule(Topic, Rules), - put({rewrite, Topic}, DestTopic), DestTopic; - DestTopic -> - DestTopic - end, - {ok, Message#message{topic = RewriteTopic}}. + {ok, Message#message{topic = match_rule(Topic, Rules)}}. unload(_) -> - emqx:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4), - emqx:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4), - emqx:unhook('message.publish', fun ?MODULE:rewrite_publish/2). + emqx_hooks:delete('client.subscribe', fun ?MODULE:rewrite_subscribe/3), + emqx_hooks:delete('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3), + emqx_hooks:delete('message.publish', fun ?MODULE:rewrite_publish/2). %%-------------------------------------------------------------------- %% Internal functions @@ -79,8 +63,7 @@ match_regx(Topic, MP, Dest) -> fun({Var, Val}, Acc) -> re:replace(Acc, Var, Val, [global]) end, Dest, Vars)); - nomatch -> - Topic + nomatch -> Topic end. compile(Rules) -> diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index 978b46a3b..b0da175c6 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -17,32 +17,26 @@ -behaviour(emqx_gen_mod). -include_lib("emqx.hrl"). - -include_lib("emqx_mqtt.hrl"). --export([load/1, on_client_connected/3, unload/1]). - --define(TAB, ?MODULE). +-export([load/1, on_session_created/3, unload/1]). %%-------------------------------------------------------------------- %% Load/Unload Hook %%-------------------------------------------------------------------- load(Topics) -> - emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]). + emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]). -on_client_connected(RC, Client = #client{id = ClientId, pid = ClientPid, username = Username}, Topics) - when RC < 16#80 -> - Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, - TopicTable = [{Replace(Topic), QoS} || {Topic, QoS} <- Topics], - ClientPid ! {subscribe, TopicTable}, - {ok, Client}; - -on_client_connected(_ConnAck, _Client, _State) -> - ok. +on_session_created(#{client_id := ClientId}, SessInfo, Topics) -> + Username = proplists:get_value(username, SessInfo), + Replace = fun(Topic) -> + rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) + end, + emqx_session:subscribe(self(), [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics]). unload(_) -> - emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3). + emqx_hooks:delete('session.created', fun ?MODULE:on_session_created/3). %%-------------------------------------------------------------------- %% Internal functions diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 939252c3e..c6ab19c3c 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -22,6 +22,7 @@ -export([validate/1]). -export([format/1]). -export([to_message/2, from_message/2]). +-export([will_msg/1]). %% @doc Protocol name of version -spec(protocol_name(mqtt_version()) -> binary()). @@ -57,7 +58,7 @@ validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) -> validate(?PUBLISH_PACKET(_QoS, <<>>, _, _)) -> error(topic_name_invalid); validate(?PUBLISH_PACKET(_QoS, Topic, _, _)) -> - emqx_topic:wildcard(Topic) orelse error(topic_name_invalid); + (not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid); validate(_Packet) -> true. @@ -85,14 +86,15 @@ validate_qos(_) -> error(bad_qos). from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payload}) -> Dup = emqx_message:get_flag(dup, Msg, false), Retain = emqx_message:get_flag(retain, Msg, false), + Publish = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId, + %% TODO: Properties + properties = #{}}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = Dup, qos = QoS, - retain = Retain, - dup = Dup}, - variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId, - properties = #{}}, %%TODO: - payload = Payload}. + retain = Retain}, + variable = Publish, payload = Payload}. %% @doc Message from Packet -spec(to_message(emqx_types:credentials(), mqtt_packet()) -> message()). @@ -106,19 +108,21 @@ to_message(#{client_id := ClientId, username := Username}, payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg#message{flags = #{dup => Dup, retain => Retain}, - headers = merge_props(#{username => Username}, Props)}; + headers = merge_props(#{username => Username}, Props)}. -to_message(_Credentials, #mqtt_packet_connect{will_flag = false}) -> +-spec(will_msg(#mqtt_packet_connect{}) -> message()). +will_msg(#mqtt_packet_connect{will_flag = false}) -> undefined; -to_message(#{client_id := ClientId, username := Username}, - #mqtt_packet_connect{will_retain = Retain, - will_qos = QoS, - will_topic = Topic, - will_props = Props, - will_payload = Payload}) -> +will_msg(#mqtt_packet_connect{client_id = ClientId, + username = Username, + will_retain = Retain, + will_qos = QoS, + will_topic = Topic, + will_props = Properties, + will_payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg#message{flags = #{dup => false, retain => Retain}, - headers = merge_props(#{username => Username}, Props)}. + headers = merge_props(#{username => Username}, Properties)}. merge_props(Headers, undefined) -> Headers; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9d59fff77..6ca559a56 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -112,13 +112,13 @@ set_username(_Username, PState) -> %%------------------------------------------------------------------------------ info(#pstate{zone = Zone, + client_id = ClientId, + username = Username, peername = Peername, proto_ver = ProtoVer, proto_name = ProtoName, - conn_props = ConnProps, - client_id = ClientId, - username = Username, clean_start = CleanStart, + conn_props = ConnProps, keepalive = Keepalive, mountpoint = Mountpoint, is_super = IsSuper, @@ -126,12 +126,12 @@ info(#pstate{zone = Zone, connected = Connected, connected_at = ConnectedAt}) -> [{zone, Zone}, + {client_id, ClientId}, + {username, Username}, {peername, Peername}, {proto_ver, ProtoVer}, {proto_name, ProtoName}, {conn_props, ConnProps}, - {client_id, ClientId}, - {username, Username}, {clean_start, CleanStart}, {keepalive, Keepalive}, {mountpoint, Mountpoint}, @@ -242,6 +242,10 @@ process_packet(?CONNECT_PACKET( username = Username, password = Password} = Connect), PState) -> + %% TODO: Mountpoint... + %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) + WillMsg = emqx_packet:will_msg(Connect), + PState1 = set_username(Username, PState#pstate{client_id = ClientId, proto_ver = ProtoVer, @@ -249,7 +253,7 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, conn_props = ConnProps, - will_msg = willmsg(Connect, PState), + will_msg = WillMsg, is_bridge = IsBridge, connected = true, connected_at = os:timestamp()}), @@ -379,10 +383,11 @@ process_packet(?PACKET(?DISCONNECT), PState) -> %%------------------------------------------------------------------------------ connack({?RC_SUCCESS, SP, PState}) -> - emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS]), + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> ReasonCode; true -> @@ -637,13 +642,6 @@ shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> emqx_hooks:run('client.disconnected', [credentials(PState), Error]), emqx_cm:unregister_connection(ClientId). -willmsg(Packet, #pstate{client_id = ClientId, mountpoint = MountPoint}) - when is_record(Packet, mqtt_packet_connect) -> - case emqx_packet:to_message(ClientId, Packet) of - undefined -> undefined; - Msg -> emqx_mountpoint:mount(MountPoint, Msg) - end. - send_willmsg(undefined) -> ignore; send_willmsg(WillMsg) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 8c7795682..9719d1bca 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -73,11 +73,11 @@ %% Username username :: binary() | undefined, - %% Client pid binding with session - client_pid :: pid(), + %% Connection pid binding with session + conn_pid :: pid(), - %% Old client Pid that has been kickout - old_client_pid :: pid(), + %% Old Connection Pid that has been kickout + old_conn_pid :: pid(), %% Next packet id of the session next_pkt_id = 1 :: mqtt_packet_id(), @@ -145,7 +145,7 @@ -define(TIMEOUT, 60000). --define(INFO_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid, +-define(INFO_KEYS, [clean_start, client_id, username, binding, conn_pid, old_conn_pid, next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight, max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, await_rel_timeout, expiry_interval, enable_stats, created_at]). @@ -306,19 +306,18 @@ close(SPid) -> init(#{zone := Zone, client_id := ClientId, - client_pid := ClientPid, - clean_start := CleanStart, username := Username, - %% TODO: + conn_pid := ConnPid, + clean_start := CleanStart, conn_props := _ConnProps}) -> process_flag(trap_exit, true), - true = link(ClientPid), + true = link(ConnPid), MaxInflight = get_env(Zone, max_inflight), State = #state{clean_start = CleanStart, - binding = binding(ClientPid), + binding = binding(ConnPid), client_id = ClientId, - client_pid = ClientPid, username = Username, + conn_pid = ConnPid, subscriptions = #{}, max_subscriptions = get_env(Zone, max_subscriptions, 0), upgrade_qos = get_env(Zone, upgrade_qos, false), @@ -335,7 +334,7 @@ init(#{zone := Zone, enqueue_stats = 0, created_at = os:timestamp()}, emqx_sm:register_session(ClientId, info(State)), - emqx_hooks:run('session.created', [ClientId]), + emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), {ok, ensure_stats_timer(State), hibernate}. init_mqueue(Zone, ClientId) -> @@ -346,12 +345,12 @@ init_mqueue(Zone, ClientId) -> binding(ClientPid) -> case node(ClientPid) =:= node() of true -> local; false -> remote end. -handle_call({discard, ClientPid}, _From, State = #state{client_pid = undefined}) -> - ?LOG(warning, "Discarded by ~p", [ClientPid], State), +handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) -> + ?LOG(warning, "Discarded by ~p", [ConnPid], State), {stop, {shutdown, discard}, ok, State}; -handle_call({discard, ClientPid}, _From, State = #state{client_pid = OldClientPid}) -> - ?LOG(warning, " ~p kickout ~p", [ClientPid, OldClientPid], State), +handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> + ?LOG(warning, " ~p kickout ~p", [ConnPid, OldConnPid], State), {stop, {shutdown, conflict}, ok, State}; %% PUBLISH: @@ -418,11 +417,11 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, SubMap; {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), maps:put(Topic, SubOpts, SubMap); error -> emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), maps:put(Topic, SubOpts, SubMap) end} end, {[], Subscriptions}, TopicFilters), @@ -437,7 +436,7 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, case maps:find(Topic, SubMap) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(Topic, ClientId), - emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]), + emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; error -> {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} @@ -469,30 +468,28 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight end; %% RESUME: -handle_cast({resume, ClientPid}, - State = #state{client_id = ClientId, - client_pid = OldClientPid, - clean_start = CleanStart, - retry_timer = RetryTimer, - await_rel_timer = AwaitTimer, - expiry_timer = ExpireTimer}) -> +handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, + conn_pid = OldConnPid, + clean_start = CleanStart, + retry_timer = RetryTimer, + await_rel_timer = AwaitTimer, + expiry_timer = ExpireTimer}) -> - ?LOG(info, "Resumed by ~p ", [ClientPid], State), + ?LOG(info, "Resumed by connection ~p ", [ConnPid], State), %% Cancel Timers - lists:foreach(fun emqx_misc:cancel_timer/1, - [RetryTimer, AwaitTimer, ExpireTimer]), + lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]), - case kick(ClientId, OldClientPid, ClientPid) of - ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State); + case kick(ClientId, OldConnPid, ConnPid) of + ok -> ?LOG(warning, "connection ~p kickout ~p", [ConnPid, OldConnPid], State); ignore -> ok end, - true = link(ClientPid), + true = link(ConnPid), - State1 = State#state{client_pid = ClientPid, - binding = binding(ClientPid), - old_client_pid = OldClientPid, + State1 = State#state{conn_pid = ConnPid, + binding = binding(ConnPid), + old_conn_pid = OldConnPid, clean_start = false, retry_timer = undefined, awaiting_rel = #{}, @@ -500,14 +497,9 @@ handle_cast({resume, ClientPid}, expiry_timer = undefined}, %% Clean Session: true -> false? - if - CleanStart =:= true -> - ?LOG(error, "CleanSess changed to false.", [], State1); - %%TODO:: - %%emqx_sm:register_session(ClientId, info(State1)); - CleanStart =:= false -> - ok - end, + CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)), + + emqx_hooks:run('session.resumed', [#{client_id => ClientId}, info(State)]), %% Replay delivery and Dequeue pending messages {noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))}; @@ -534,7 +526,7 @@ handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when end}; %% Do nothing if the client has been disconnected. -handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> +handle_info({timeout, _Timer, retry_delivery}, State = #state{conn_pid = undefined}) -> {noreply, ensure_stats_timer(State#state{retry_timer = undefined})}; handle_info({timeout, _Timer, retry_delivery}, State) -> @@ -547,27 +539,25 @@ handle_info({timeout, _Timer, expired}, State) -> ?LOG(info, "Expired, shutdown now.", [], State), shutdown(expired, State); -handle_info({'EXIT', ClientPid, _Reason}, - State = #state{clean_start= true, client_pid = ClientPid}) -> +handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start= true, conn_pid = ConnPid}) -> {stop, normal, State}; -handle_info({'EXIT', ClientPid, Reason}, - State = #state{clean_start = false, - client_pid = ClientPid, - expiry_interval = Interval}) -> - ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), +handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = false, + conn_pid = ConnPid, + expiry_interval = Interval}) -> + ?LOG(info, "Connection ~p EXIT for ~p", [ConnPid, Reason], State), ExpireTimer = emqx_misc:start_timer(Interval, expired), - State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, - {noreply, State1, hibernate}; + State1 = State#state{conn_pid = undefined, expiry_timer = ExpireTimer}, + {noreply, State1}; -handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) -> +handle_info({'EXIT', Pid, _Reason}, State = #state{old_conn_pid = Pid}) -> %% ignore {noreply, State, hibernate}; -handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) -> - ?LOG(error, "unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", - [ClientPid, Pid, Reason], State), - {noreply, State, hibernate}; +handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> + ?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", + [ConnPid, Pid, Reason], State), + {noreply, State}; handle_info(emit_stats, State = #state{client_id = ClientId}) -> emqx_sm:set_session_stats(ClientId, stats(State)), @@ -577,8 +567,8 @@ handle_info(Info, State) -> emqx_logger:error("[Session] unexpected info: ~p", [Info]), {noreply, State}. -terminate(Reason, #state{client_id = ClientId, username = Username}) -> - emqx_hooks:run('session.terminated', [ClientId, Username, Reason]), +terminate(Reason, #state{client_id = ClientId}) -> + emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), emqx_sm:unregister_session(ClientId). code_change(_OldVsn, State, _Extra) -> @@ -713,8 +703,8 @@ run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). %% Enqueue message if the client has been disconnected -dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) -> - case emqx_hooks:run('message.dropped', [ClientId, Msg]) of +dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> + case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of ok -> enqueue_msg(Msg, State); stop -> State end; @@ -747,12 +737,12 @@ redeliver({PacketId, Msg = #message{qos = QoS}}, State) -> true -> emqx_message:set_flag(dup, Msg) end, State); -redeliver({pubrel, PacketId}, #state{client_pid = Pid}) -> - Pid ! {deliver, {pubrel, PacketId}}. +redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) -> + ConnPid ! {deliver, {pubrel, PacketId}}. -deliver(PacketId, Msg, #state{client_pid = Pid, binding = local}) -> +deliver(PacketId, Msg, #state{conn_pid = Pid, binding = local}) -> Pid ! {deliver, {publish, PacketId, Msg}}; -deliver(PacketId, Msg, #state{client_pid = Pid, binding = remote}) -> +deliver(PacketId, Msg, #state{conn_pid = Pid, binding = remote}) -> emqx_rpc:cast(node(Pid), erlang, send, [Pid, {deliver, PacketId, Msg}]). %%------------------------------------------------------------------------------ @@ -769,24 +759,20 @@ await(PacketId, Msg, State = #state{inflight = Inflight, end, State1#state{inflight = emqx_inflight:insert(PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight)}. -acked(puback, PacketId, State = #state{client_id = ClientId, - username = Username, - inflight = Inflight}) -> +acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, Msg, _Ts}} -> - emqx_hooks:run('message.acked', [ClientId, Username], Msg), + emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> ?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State), State end; -acked(pubrec, PacketId, State = #state{client_id = ClientId, - username = Username, - inflight = Inflight}) -> +acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, Msg, _Ts}} -> - emqx_hooks:run('message.acked', [ClientId, Username], Msg), + emqx_hooks:run('message.acked', [ClientId], Msg), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State), @@ -804,7 +790,7 @@ acked(pubcomp, PacketId, State = #state{inflight = Inflight}) -> %%------------------------------------------------------------------------------ %% Do nothing if client is disconnected -dequeue(State = #state{client_pid = undefined}) -> +dequeue(State = #state{conn_pid = undefined}) -> State; dequeue(State = #state{inflight = Inflight}) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index e31adb141..577927b02 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -35,8 +35,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {session_pmon}). - -define(SM, ?MODULE). %% ETS Tables @@ -45,26 +43,22 @@ -define(SESSION_ATTRS_TAB, emqx_session_attrs). -define(SESSION_STATS_TAB, emqx_session_stats). --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SM}, ?MODULE, [], []). %% @doc Open a session. -spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}). -open_session(Attrs = #{clean_start := true, - client_id := ClientId, - client_pid := ClientPid}) -> +open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> CleanStart = fun(_) -> - ok = discard_session(ClientId, ClientPid), + ok = discard_session(ClientId, ConnPid), emqx_session_sup:start_session(Attrs) end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(Attrs = #{clean_start := false, - client_id := ClientId, - client_pid := ClientPid}) -> +open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) -> ResumeStart = fun(_) -> - case resume_session(ClientId, ClientPid) of + case resume_session(ClientId, ConnPid) of {ok, SPid} -> {ok, SPid, true}; {error, not_found} -> @@ -80,34 +74,33 @@ open_session(Attrs = #{clean_start := false, discard_session(ClientId) when is_binary(ClientId) -> discard_session(ClientId, self()). -discard_session(ClientId, ClientPid) when is_binary(ClientId) -> - lists:foreach( - fun({_ClientId, SPid}) -> - case catch emqx_session:discard(SPid, ClientPid) of - {Err, Reason} when Err =:= 'EXIT'; Err =:= error -> - emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]); - ok -> ok - end - end, lookup_session(ClientId)). +discard_session(ClientId, ConnPid) when is_binary(ClientId) -> + lists:foreach(fun({_ClientId, SPid}) -> + case catch emqx_session:discard(SPid, ConnPid) of + {Err, Reason} when Err =:= 'EXIT'; Err =:= error -> + emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]); + ok -> ok + end + end, lookup_session(ClientId)). %% @doc Try to resume a session. -spec(resume_session(client_id()) -> {ok, pid()} | {error, term()}). resume_session(ClientId) -> resume_session(ClientId, self()). -resume_session(ClientId, ClientPid) -> +resume_session(ClientId, ConnPid) -> case lookup_session(ClientId) of [] -> {error, not_found}; [{_ClientId, SPid}] -> - ok = emqx_session:resume(SPid, ClientPid), + ok = emqx_session:resume(SPid, ConnPid), {ok, SPid}; Sessions -> [{_, SPid}|StaleSessions] = lists:reverse(Sessions), emqx_logger:error("[SM] More than one session found: ~p", [Sessions]), lists:foreach(fun({_, StalePid}) -> - catch emqx_session:discard(StalePid, ClientPid) + catch emqx_session:discard(StalePid, ConnPid) end, StaleSessions), - ok = emqx_session:resume(SPid, ClientPid), + ok = emqx_session:resume(SPid, ConnPid), {ok, SPid} end. @@ -224,11 +217,11 @@ handle_call(Req, _From, State) -> emqx_logger:error("[SM] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({notify, {registered, ClientId, SPid}}, State = #state{session_pmon = PMon}) -> - {noreply, State#state{session_pmon = emqx_pmon:monitor(SPid, ClientId, PMon)}}; +handle_cast({notify, {registered, ClientId, SPid}}, State = #{session_pmon := PMon}) -> + {noreply, State#{session_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}}; -handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #state{session_pmon = PMon}) -> - {noreply, State#state{session_pmon = emqx_pmon:demonitor(SPid, PMon)}}; +handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{session_pmon := PMon}) -> + {noreply, State#{session_pmon := emqx_pmon:demonitor(SPid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[SM] unexpected cast: ~p", [Msg]), @@ -236,7 +229,8 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) -> case emqx_pmon:find(DownPid, PMon) of - undefined -> {noreply, State}; + undefined -> + {noreply, State}; ClientId -> unregister_session({ClientId, DownPid}), {noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}} diff --git a/src/emqx_time.erl b/src/emqx_time.erl index 0d74168c4..83e9deff5 100644 --- a/src/emqx_time.erl +++ b/src/emqx_time.erl @@ -26,4 +26,5 @@ now_ms() -> erlang:system_time(millisecond). now_ms({MegaSecs, Secs, MicroSecs}) -> - (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). \ No newline at end of file + (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). +