Improve the hooks design for emqx 3.0

This commit is contained in:
Feng Lee 2018-08-29 17:27:56 +08:00
parent 594819b752
commit 1a7d60a7e3
10 changed files with 174 additions and 200 deletions

View File

@ -174,7 +174,7 @@ delivery(Msg) ->
%%------------------------------------------------------------------------------
route([], Delivery = #delivery{message = Msg}) ->
emqx_hooks:run('message.dropped', [node(), Msg]),
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
inc_dropped_cnt(Msg#message.topic), Delivery;
route([{To, Node}], Delivery) when Node =:= node() ->
@ -215,7 +215,7 @@ forward(Node, To, Delivery) ->
dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
case subscribers(Topic) of
[] ->
emqx_hooks:run('message.dropped', [node(), Msg]),
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
inc_dropped_cnt(Topic), Delivery;
[Sub] -> %% optimize?
dispatch(Sub, Topic, Msg),

View File

@ -22,6 +22,7 @@
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
-export([set_headers/2]).
-export([get_header/2, get_header/3, set_header/3]).
-export([format/1]).
-spec(make(topic(), payload()) -> message()).
make(Topic, Payload) ->
@ -55,10 +56,14 @@ get_flag(Flag, #message{flags = Flags}, Default) ->
maps:get(Flag, Flags, Default).
-spec(set_flag(message_flag(), message()) -> message()).
set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) ->
Msg#message{flags = #{Flag => true}};
set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) ->
Msg#message{flags = maps:put(Flag, true, Flags)}.
-spec(set_flag(message_flag(), boolean() | integer(), message()) -> message()).
set_flag(Flag, Val, Msg = #message{flags = undefined}) when is_atom(Flag) ->
Msg#message{flags = #{Flag => Val}};
set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) ->
Msg#message{flags = maps:put(Flag, Val, Flags)}.
@ -83,3 +88,14 @@ set_header(Hdr, Val, Msg = #message{headers = undefined}) ->
set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
Msg#message{headers = maps:put(Hdr, Val, Headers)}.
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)",
[Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
format(_, undefined) ->
"";
format(flags, Flags) ->
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
format(headers, Headers) ->
io_lib:format("~p", [Headers]).

View File

@ -19,50 +19,48 @@
-include("emqx.hrl").
-export([load/1, unload/1]).
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_connected/4, on_client_disconnected/3]).
load(Env) ->
emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).
emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]),
emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).
on_client_connected(ConnAck, Client = #client{id = ClientId,
username = Username,
peername = {IpAddr, _}
%%clean_sess = CleanSess,
%%proto_ver = ProtoVer
}, Env) ->
on_client_connected(#{client_id := ClientId,
username := Username,
peername := {IpAddr, _}}, ConnAck, ConnInfo, Env) ->
case emqx_json:safe_encode([{clientid, ClientId},
{username, Username},
{ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))},
%%{clean_sess, CleanSess}, %%TODO:: fixme later
%%{protocol, ProtoVer},
{clean_start, proplists:get_value(clean_start, ConnInfo)},
{proto_ver, proplists:get_value(proto_ver, ConnInfo)},
{proto_name, proplists:get_value(proto_name, ConnInfo)},
{keepalive, proplists:get_value(keepalive, ConnInfo)},
{connack, ConnAck},
{ts, emqx_time:now_secs()}]) of
{ts, os:system_time(second)}]) of
{ok, Payload} ->
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
{error, Reason} ->
emqx_logger:error("[Presence Module] Json error: ~p", [Reason])
end,
{ok, Client}.
end.
on_client_disconnected(Reason, #client{id = ClientId, username = Username}, Env) ->
on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) ->
case emqx_json:safe_encode([{clientid, ClientId},
{username, Username},
{reason, reason(Reason)},
{ts, emqx_time:now_secs()}]) of
{ts, os:system_time(second)}]) of
{ok, Payload} ->
emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload));
{error, Reason} ->
emqx_logger:error("[Presence Module] Json error: ~p", [Reason])
end, ok.
end.
unload(_Env) ->
emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3),
emqx:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3).
emqx_hooks:delete('client.connected', fun ?MODULE:on_client_connected/4),
emqx_hooks:delete('client.disconnected', fun ?MODULE:on_client_disconnected/3).
message(QoS, Topic, Payload) ->
Msg = emqx_message:make(?MODULE, QoS, Topic, iolist_to_binary(Payload)),
emqx_message:set_flags(#{sys => true}, Msg).
emqx_message:set_flag(sys, Msg).
topic(connected, ClientId) ->
emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"]));

View File

@ -15,47 +15,31 @@
-module(emqx_mod_rewrite).
-include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl").
-export([load/1, unload/1]).
-export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]).
load(Rules0) ->
Rules = compile(Rules0),
emqx:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]),
emqx:hook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]),
emqx:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]).
emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]),
emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]),
emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]).
rewrite_subscribe(_ClientId, _Username, TopicTable, Rules) ->
emqx_logger:info("Rewrite subscribe: ~p", [TopicTable]),
rewrite_subscribe(_Credentials, TopicTable, Rules) ->
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_unsubscribe(_ClientId, _Username, TopicTable, Rules) ->
emqx_logger:info("Rewrite unsubscribe: ~p", [TopicTable]),
rewrite_unsubscribe(_Credentials, TopicTable, Rules) ->
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
%%TODO: this will not work if the client is always online.
RewriteTopic =
case get({rewrite, Topic}) of
undefined ->
DestTopic = match_rule(Topic, Rules),
put({rewrite, Topic}, DestTopic), DestTopic;
DestTopic ->
DestTopic
end,
{ok, Message#message{topic = RewriteTopic}}.
{ok, Message#message{topic = match_rule(Topic, Rules)}}.
unload(_) ->
emqx:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4),
emqx:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4),
emqx:unhook('message.publish', fun ?MODULE:rewrite_publish/2).
emqx_hooks:delete('client.subscribe', fun ?MODULE:rewrite_subscribe/3),
emqx_hooks:delete('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3),
emqx_hooks:delete('message.publish', fun ?MODULE:rewrite_publish/2).
%%--------------------------------------------------------------------
%% Internal functions
@ -79,8 +63,7 @@ match_regx(Topic, MP, Dest) ->
fun({Var, Val}, Acc) ->
re:replace(Acc, Var, Val, [global])
end, Dest, Vars));
nomatch ->
Topic
nomatch -> Topic
end.
compile(Rules) ->

View File

@ -17,32 +17,26 @@
-behaviour(emqx_gen_mod).
-include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl").
-export([load/1, on_client_connected/3, unload/1]).
-define(TAB, ?MODULE).
-export([load/1, on_session_created/3, unload/1]).
%%--------------------------------------------------------------------
%% Load/Unload Hook
%%--------------------------------------------------------------------
load(Topics) ->
emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]).
emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]).
on_client_connected(RC, Client = #client{id = ClientId, pid = ClientPid, username = Username}, Topics)
when RC < 16#80 ->
Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end,
TopicTable = [{Replace(Topic), QoS} || {Topic, QoS} <- Topics],
ClientPid ! {subscribe, TopicTable},
{ok, Client};
on_client_connected(_ConnAck, _Client, _State) ->
ok.
on_session_created(#{client_id := ClientId}, SessInfo, Topics) ->
Username = proplists:get_value(username, SessInfo),
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
end,
emqx_session:subscribe(self(), [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics]).
unload(_) ->
emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3).
emqx_hooks:delete('session.created', fun ?MODULE:on_session_created/3).
%%--------------------------------------------------------------------
%% Internal functions

View File

@ -22,6 +22,7 @@
-export([validate/1]).
-export([format/1]).
-export([to_message/2, from_message/2]).
-export([will_msg/1]).
%% @doc Protocol name of version
-spec(protocol_name(mqtt_version()) -> binary()).
@ -57,7 +58,7 @@ validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) ->
validate(?PUBLISH_PACKET(_QoS, <<>>, _, _)) ->
error(topic_name_invalid);
validate(?PUBLISH_PACKET(_QoS, Topic, _, _)) ->
emqx_topic:wildcard(Topic) orelse error(topic_name_invalid);
(not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid);
validate(_Packet) ->
true.
@ -85,14 +86,15 @@ validate_qos(_) -> error(bad_qos).
from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payload}) ->
Dup = emqx_message:get_flag(dup, Msg, false),
Retain = emqx_message:get_flag(retain, Msg, false),
Publish = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId,
%% TODO: Properties
properties = #{}},
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = Dup,
qos = QoS,
retain = Retain,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId,
properties = #{}}, %%TODO:
payload = Payload}.
retain = Retain},
variable = Publish, payload = Payload}.
%% @doc Message from Packet
-spec(to_message(emqx_types:credentials(), mqtt_packet()) -> message()).
@ -106,19 +108,21 @@ to_message(#{client_id := ClientId, username := Username},
payload = Payload}) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
Msg#message{flags = #{dup => Dup, retain => Retain},
headers = merge_props(#{username => Username}, Props)};
headers = merge_props(#{username => Username}, Props)}.
to_message(_Credentials, #mqtt_packet_connect{will_flag = false}) ->
-spec(will_msg(#mqtt_packet_connect{}) -> message()).
will_msg(#mqtt_packet_connect{will_flag = false}) ->
undefined;
to_message(#{client_id := ClientId, username := Username},
#mqtt_packet_connect{will_retain = Retain,
will_qos = QoS,
will_topic = Topic,
will_props = Props,
will_payload = Payload}) ->
will_msg(#mqtt_packet_connect{client_id = ClientId,
username = Username,
will_retain = Retain,
will_qos = QoS,
will_topic = Topic,
will_props = Properties,
will_payload = Payload}) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
Msg#message{flags = #{dup => false, retain => Retain},
headers = merge_props(#{username => Username}, Props)}.
headers = merge_props(#{username => Username}, Properties)}.
merge_props(Headers, undefined) ->
Headers;

View File

@ -112,13 +112,13 @@ set_username(_Username, PState) ->
%%------------------------------------------------------------------------------
info(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
peername = Peername,
proto_ver = ProtoVer,
proto_name = ProtoName,
conn_props = ConnProps,
client_id = ClientId,
username = Username,
clean_start = CleanStart,
conn_props = ConnProps,
keepalive = Keepalive,
mountpoint = Mountpoint,
is_super = IsSuper,
@ -126,12 +126,12 @@ info(#pstate{zone = Zone,
connected = Connected,
connected_at = ConnectedAt}) ->
[{zone, Zone},
{client_id, ClientId},
{username, Username},
{peername, Peername},
{proto_ver, ProtoVer},
{proto_name, ProtoName},
{conn_props, ConnProps},
{client_id, ClientId},
{username, Username},
{clean_start, CleanStart},
{keepalive, Keepalive},
{mountpoint, Mountpoint},
@ -242,6 +242,10 @@ process_packet(?CONNECT_PACKET(
username = Username,
password = Password} = Connect), PState) ->
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
WillMsg = emqx_packet:will_msg(Connect),
PState1 = set_username(Username,
PState#pstate{client_id = ClientId,
proto_ver = ProtoVer,
@ -249,7 +253,7 @@ process_packet(?CONNECT_PACKET(
clean_start = CleanStart,
keepalive = Keepalive,
conn_props = ConnProps,
will_msg = willmsg(Connect, PState),
will_msg = WillMsg,
is_bridge = IsBridge,
connected = true,
connected_at = os:timestamp()}),
@ -379,10 +383,11 @@ process_packet(?PACKET(?DISCONNECT), PState) ->
%%------------------------------------------------------------------------------
connack({?RC_SUCCESS, SP, PState}) ->
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS]),
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]),
deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]),
_ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 ->
ReasonCode;
true ->
@ -637,13 +642,6 @@ shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) ->
emqx_hooks:run('client.disconnected', [credentials(PState), Error]),
emqx_cm:unregister_connection(ClientId).
willmsg(Packet, #pstate{client_id = ClientId, mountpoint = MountPoint})
when is_record(Packet, mqtt_packet_connect) ->
case emqx_packet:to_message(ClientId, Packet) of
undefined -> undefined;
Msg -> emqx_mountpoint:mount(MountPoint, Msg)
end.
send_willmsg(undefined) ->
ignore;
send_willmsg(WillMsg) ->

View File

@ -73,11 +73,11 @@
%% Username
username :: binary() | undefined,
%% Client pid binding with session
client_pid :: pid(),
%% Connection pid binding with session
conn_pid :: pid(),
%% Old client Pid that has been kickout
old_client_pid :: pid(),
%% Old Connection Pid that has been kickout
old_conn_pid :: pid(),
%% Next packet id of the session
next_pkt_id = 1 :: mqtt_packet_id(),
@ -145,7 +145,7 @@
-define(TIMEOUT, 60000).
-define(INFO_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid,
-define(INFO_KEYS, [clean_start, client_id, username, binding, conn_pid, old_conn_pid,
next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
await_rel_timeout, expiry_interval, enable_stats, created_at]).
@ -306,19 +306,18 @@ close(SPid) ->
init(#{zone := Zone,
client_id := ClientId,
client_pid := ClientPid,
clean_start := CleanStart,
username := Username,
%% TODO:
conn_pid := ConnPid,
clean_start := CleanStart,
conn_props := _ConnProps}) ->
process_flag(trap_exit, true),
true = link(ClientPid),
true = link(ConnPid),
MaxInflight = get_env(Zone, max_inflight),
State = #state{clean_start = CleanStart,
binding = binding(ClientPid),
binding = binding(ConnPid),
client_id = ClientId,
client_pid = ClientPid,
username = Username,
conn_pid = ConnPid,
subscriptions = #{},
max_subscriptions = get_env(Zone, max_subscriptions, 0),
upgrade_qos = get_env(Zone, upgrade_qos, false),
@ -335,7 +334,7 @@ init(#{zone := Zone,
enqueue_stats = 0,
created_at = os:timestamp()},
emqx_sm:register_session(ClientId, info(State)),
emqx_hooks:run('session.created', [ClientId]),
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
{ok, ensure_stats_timer(State), hibernate}.
init_mqueue(Zone, ClientId) ->
@ -346,12 +345,12 @@ init_mqueue(Zone, ClientId) ->
binding(ClientPid) ->
case node(ClientPid) =:= node() of true -> local; false -> remote end.
handle_call({discard, ClientPid}, _From, State = #state{client_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ClientPid], State),
handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ConnPid], State),
{stop, {shutdown, discard}, ok, State};
handle_call({discard, ClientPid}, _From, State = #state{client_pid = OldClientPid}) ->
?LOG(warning, " ~p kickout ~p", [ClientPid, OldClientPid], State),
handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) ->
?LOG(warning, " ~p kickout ~p", [ConnPid, OldConnPid], State),
{stop, {shutdown, conflict}, ok, State};
%% PUBLISH:
@ -418,11 +417,11 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
@ -437,7 +436,7 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(Topic, ClientId),
emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]),
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
@ -469,30 +468,28 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
end;
%% RESUME:
handle_cast({resume, ClientPid},
State = #state{client_id = ClientId,
client_pid = OldClientPid,
clean_start = CleanStart,
retry_timer = RetryTimer,
await_rel_timer = AwaitTimer,
expiry_timer = ExpireTimer}) ->
handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
conn_pid = OldConnPid,
clean_start = CleanStart,
retry_timer = RetryTimer,
await_rel_timer = AwaitTimer,
expiry_timer = ExpireTimer}) ->
?LOG(info, "Resumed by ~p ", [ClientPid], State),
?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
%% Cancel Timers
lists:foreach(fun emqx_misc:cancel_timer/1,
[RetryTimer, AwaitTimer, ExpireTimer]),
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]),
case kick(ClientId, OldClientPid, ClientPid) of
ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State);
case kick(ClientId, OldConnPid, ConnPid) of
ok -> ?LOG(warning, "connection ~p kickout ~p", [ConnPid, OldConnPid], State);
ignore -> ok
end,
true = link(ClientPid),
true = link(ConnPid),
State1 = State#state{client_pid = ClientPid,
binding = binding(ClientPid),
old_client_pid = OldClientPid,
State1 = State#state{conn_pid = ConnPid,
binding = binding(ConnPid),
old_conn_pid = OldConnPid,
clean_start = false,
retry_timer = undefined,
awaiting_rel = #{},
@ -500,14 +497,9 @@ handle_cast({resume, ClientPid},
expiry_timer = undefined},
%% Clean Session: true -> false?
if
CleanStart =:= true ->
?LOG(error, "CleanSess changed to false.", [], State1);
%%TODO::
%%emqx_sm:register_session(ClientId, info(State1));
CleanStart =:= false ->
ok
end,
CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)),
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, info(State)]),
%% Replay delivery and Dequeue pending messages
{noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))};
@ -534,7 +526,7 @@ handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when
end};
%% Do nothing if the client has been disconnected.
handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
handle_info({timeout, _Timer, retry_delivery}, State = #state{conn_pid = undefined}) ->
{noreply, ensure_stats_timer(State#state{retry_timer = undefined})};
handle_info({timeout, _Timer, retry_delivery}, State) ->
@ -547,27 +539,25 @@ handle_info({timeout, _Timer, expired}, State) ->
?LOG(info, "Expired, shutdown now.", [], State),
shutdown(expired, State);
handle_info({'EXIT', ClientPid, _Reason},
State = #state{clean_start= true, client_pid = ClientPid}) ->
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start= true, conn_pid = ConnPid}) ->
{stop, normal, State};
handle_info({'EXIT', ClientPid, Reason},
State = #state{clean_start = false,
client_pid = ClientPid,
expiry_interval = Interval}) ->
?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = false,
conn_pid = ConnPid,
expiry_interval = Interval}) ->
?LOG(info, "Connection ~p EXIT for ~p", [ConnPid, Reason], State),
ExpireTimer = emqx_misc:start_timer(Interval, expired),
State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
{noreply, State1, hibernate};
State1 = State#state{conn_pid = undefined, expiry_timer = ExpireTimer},
{noreply, State1};
handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) ->
handle_info({'EXIT', Pid, _Reason}, State = #state{old_conn_pid = Pid}) ->
%% ignore
{noreply, State, hibernate};
handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) ->
?LOG(error, "unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
[ClientPid, Pid, Reason], State),
{noreply, State, hibernate};
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
[ConnPid, Pid, Reason], State),
{noreply, State};
handle_info(emit_stats, State = #state{client_id = ClientId}) ->
emqx_sm:set_session_stats(ClientId, stats(State)),
@ -577,8 +567,8 @@ handle_info(Info, State) ->
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(Reason, #state{client_id = ClientId, username = Username}) ->
emqx_hooks:run('session.terminated', [ClientId, Username, Reason]),
terminate(Reason, #state{client_id = ClientId}) ->
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
emqx_sm:unregister_session(ClientId).
code_change(_OldVsn, State, _Extra) ->
@ -713,8 +703,8 @@ run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
%% Enqueue message if the client has been disconnected
dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) ->
case emqx_hooks:run('message.dropped', [ClientId, Msg]) of
dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) ->
case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of
ok -> enqueue_msg(Msg, State);
stop -> State
end;
@ -747,12 +737,12 @@ redeliver({PacketId, Msg = #message{qos = QoS}}, State) ->
true -> emqx_message:set_flag(dup, Msg)
end, State);
redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
Pid ! {deliver, {pubrel, PacketId}}.
redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) ->
ConnPid ! {deliver, {pubrel, PacketId}}.
deliver(PacketId, Msg, #state{client_pid = Pid, binding = local}) ->
deliver(PacketId, Msg, #state{conn_pid = Pid, binding = local}) ->
Pid ! {deliver, {publish, PacketId, Msg}};
deliver(PacketId, Msg, #state{client_pid = Pid, binding = remote}) ->
deliver(PacketId, Msg, #state{conn_pid = Pid, binding = remote}) ->
emqx_rpc:cast(node(Pid), erlang, send, [Pid, {deliver, PacketId, Msg}]).
%%------------------------------------------------------------------------------
@ -769,24 +759,20 @@ await(PacketId, Msg, State = #state{inflight = Inflight,
end,
State1#state{inflight = emqx_inflight:insert(PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight)}.
acked(puback, PacketId, State = #state{client_id = ClientId,
username = Username,
inflight = Inflight}) ->
acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, Msg, _Ts}} ->
emqx_hooks:run('message.acked', [ClientId, Username], Msg),
emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State),
State
end;
acked(pubrec, PacketId, State = #state{client_id = ClientId,
username = Username,
inflight = Inflight}) ->
acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, Msg, _Ts}} ->
emqx_hooks:run('message.acked', [ClientId, Username], Msg),
emqx_hooks:run('message.acked', [ClientId], Msg),
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
{value, {pubrel, PacketId, _Ts}} ->
?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State),
@ -804,7 +790,7 @@ acked(pubcomp, PacketId, State = #state{inflight = Inflight}) ->
%%------------------------------------------------------------------------------
%% Do nothing if client is disconnected
dequeue(State = #state{client_pid = undefined}) ->
dequeue(State = #state{conn_pid = undefined}) ->
State;
dequeue(State = #state{inflight = Inflight}) ->

View File

@ -35,8 +35,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {session_pmon}).
-define(SM, ?MODULE).
%% ETS Tables
@ -45,26 +43,22 @@
-define(SESSION_ATTRS_TAB, emqx_session_attrs).
-define(SESSION_STATS_TAB, emqx_session_stats).
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_server:start_link({local, ?SM}, ?MODULE, [], []).
%% @doc Open a session.
-spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}).
open_session(Attrs = #{clean_start := true,
client_id := ClientId,
client_pid := ClientPid}) ->
open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
CleanStart = fun(_) ->
ok = discard_session(ClientId, ClientPid),
ok = discard_session(ClientId, ConnPid),
emqx_session_sup:start_session(Attrs)
end,
emqx_sm_locker:trans(ClientId, CleanStart);
open_session(Attrs = #{clean_start := false,
client_id := ClientId,
client_pid := ClientPid}) ->
open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) ->
ResumeStart = fun(_) ->
case resume_session(ClientId, ClientPid) of
case resume_session(ClientId, ConnPid) of
{ok, SPid} ->
{ok, SPid, true};
{error, not_found} ->
@ -80,34 +74,33 @@ open_session(Attrs = #{clean_start := false,
discard_session(ClientId) when is_binary(ClientId) ->
discard_session(ClientId, self()).
discard_session(ClientId, ClientPid) when is_binary(ClientId) ->
lists:foreach(
fun({_ClientId, SPid}) ->
case catch emqx_session:discard(SPid, ClientPid) of
{Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
ok -> ok
end
end, lookup_session(ClientId)).
discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
lists:foreach(fun({_ClientId, SPid}) ->
case catch emqx_session:discard(SPid, ConnPid) of
{Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
ok -> ok
end
end, lookup_session(ClientId)).
%% @doc Try to resume a session.
-spec(resume_session(client_id()) -> {ok, pid()} | {error, term()}).
resume_session(ClientId) ->
resume_session(ClientId, self()).
resume_session(ClientId, ClientPid) ->
resume_session(ClientId, ConnPid) ->
case lookup_session(ClientId) of
[] -> {error, not_found};
[{_ClientId, SPid}] ->
ok = emqx_session:resume(SPid, ClientPid),
ok = emqx_session:resume(SPid, ConnPid),
{ok, SPid};
Sessions ->
[{_, SPid}|StaleSessions] = lists:reverse(Sessions),
emqx_logger:error("[SM] More than one session found: ~p", [Sessions]),
lists:foreach(fun({_, StalePid}) ->
catch emqx_session:discard(StalePid, ClientPid)
catch emqx_session:discard(StalePid, ConnPid)
end, StaleSessions),
ok = emqx_session:resume(SPid, ClientPid),
ok = emqx_session:resume(SPid, ConnPid),
{ok, SPid}
end.
@ -224,11 +217,11 @@ handle_call(Req, _From, State) ->
emqx_logger:error("[SM] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({notify, {registered, ClientId, SPid}}, State = #state{session_pmon = PMon}) ->
{noreply, State#state{session_pmon = emqx_pmon:monitor(SPid, ClientId, PMon)}};
handle_cast({notify, {registered, ClientId, SPid}}, State = #{session_pmon := PMon}) ->
{noreply, State#{session_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}};
handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #state{session_pmon = PMon}) ->
{noreply, State#state{session_pmon = emqx_pmon:demonitor(SPid, PMon)}};
handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{session_pmon := PMon}) ->
{noreply, State#{session_pmon := emqx_pmon:demonitor(SPid, PMon)}};
handle_cast(Msg, State) ->
emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
@ -236,7 +229,8 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) ->
case emqx_pmon:find(DownPid, PMon) of
undefined -> {noreply, State};
undefined ->
{noreply, State};
ClientId ->
unregister_session({ClientId, DownPid}),
{noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}}

View File

@ -26,4 +26,5 @@ now_ms() ->
erlang:system_time(millisecond).
now_ms({MegaSecs, Secs, MicroSecs}) ->
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).