From 4974eab20e2fd91b8ef7c57e8aa4b1834c17655b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 25 Jun 2019 14:53:45 +0800 Subject: [PATCH] Improve the channel design --- src/emqx_broker.erl | 2 +- src/{emqx_connection.erl => emqx_channel.erl} | 187 ++-- src/emqx_frame.erl | 22 +- src/emqx_inflight.erl | 4 +- src/emqx_protocol.erl | 574 +++++------ src/emqx_session.erl | 971 +++++++----------- src/emqx_shared_sub.erl | 10 +- ..._ws_connection.erl => emqx_ws_channel.erl} | 31 +- 8 files changed, 757 insertions(+), 1044 deletions(-) rename src/{emqx_connection.erl => emqx_channel.erl} (77%) rename src/{emqx_ws_connection.erl => emqx_ws_channel.erl} (91%) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 6c3b6163a..6e74bcfa4 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -286,7 +286,7 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of true -> - SubPid ! {dispatch, Topic, Msg}, + SubPid ! {deliver, Topic, Msg}, 1; false -> 0 end; diff --git a/src/emqx_connection.erl b/src/emqx_channel.erl similarity index 77% rename from src/emqx_connection.erl rename to src/emqx_channel.erl index 65612b0a2..be192f22e 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_channel.erl @@ -14,6 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% MQTT TCP/SSL Channel -module(emqx_channel). -behaviour(gen_statem). @@ -21,6 +22,7 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). +-include("types.hrl"). -logger_header("[Channel]"). @@ -32,16 +34,10 @@ , stats/1 ]). --export([ kick/1 - , discard/1 - , takeover/1 - ]). - --export([session/1]). - %% gen_statem callbacks -export([ idle/3 , connected/3 + , disconnected/3 ]). -export([ init/1 @@ -51,28 +47,32 @@ ]). -record(state, { - transport, - socket, - peername, - sockname, - conn_state, - active_n, - proto_state, - parse_state, - gc_state, - keepalive, - rate_limit, - pub_limit, - limit_timer, - enable_stats, - stats_timer, - idle_timeout + transport :: esockd:transport(), + socket :: esockd:sock(), + peername :: {inet:ip_address(), inet:port_number()}, + sockname :: {inet:ip_address(), inet:port_number()}, + conn_state :: running | blocked, + active_n :: pos_integer(), + rate_limit :: maybe(esockd_rate_limit:bucket()), + pub_limit :: maybe(esockd_rate_limit:bucket()), + limit_timer :: maybe(reference()), + serializer :: emqx_frame:serializer(), %% TODO: remove it later. + parse_state :: emqx_frame:parse_state(), + proto_state :: emqx_protocol:protocol(), + gc_state :: emqx_gc:gc_state(), + keepalive :: maybe(reference()), + enable_stats :: boolean(), + stats_timer :: maybe(reference()), + idle_timeout :: timeout() }). -define(ACTIVE_N, 100). -define(HANDLE(T, C, D), handle((T), (C), (D))). +-define(CHAN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). +-spec(start_link(esockd:transport(), esockd:sock(), proplists:proplist()) + -> {ok, pid()}). start_link(Transport, Socket, Options) -> {ok, proc_lib:spawn_link(?MODULE, init, [{Transport, Socket, Options}])}. @@ -126,28 +126,13 @@ attrs(#state{peername = Peername, stats(CPid) when is_pid(CPid) -> call(CPid, stats); -stats(#state{transport = Transport, - socket = Socket, - proto_state = ProtoState}) -> +stats(#state{transport = Transport, socket = Socket}) -> SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end, - lists:append([SockStats, - emqx_misc:proc_stats(), - emqx_protocol:stats(ProtoState)]). - -kick(CPid) -> - call(CPid, kick). - -discard(CPid) -> - call(CPid, discard). - -takeover(CPid) -> - call(CPid, takeover). - -session(CPid) -> - call(CPid, session). + ChanStats = [{Name, emqx_pd:get_counter(Name)} || Name <- ?CHAN_STATS], + lists:append([SockStats, ChanStats, emqx_misc:proc_stats()]). call(CPid, Req) -> gen_statem:call(CPid, Req, infinity). @@ -166,23 +151,15 @@ init({Transport, RawSocket, Options}) -> RateLimit = init_limiter(proplists:get_value(rate_limit, Options)), PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), - SendFun = fun(Packet, Opts) -> - Data = emqx_frame:serialize(Packet, Opts), - case Transport:async_send(Socket, Data) of - ok -> {ok, Data}; - {error, Reason} -> - {error, Reason} - end - end, + MaxSize = emqx_zone:get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE), + ParseState = emqx_frame:initial_parse_state(#{max_size => MaxSize}), ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, - sendfun => SendFun, conn_mod => ?MODULE}, Options), - MaxSize = emqx_zone:get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE), - ParseState = emqx_frame:initial_parse_state(#{max_size => MaxSize}), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), GcState = emqx_gc:init(GcPolicy), + ok = emqx_misc:init_proc_mng_policy(Zone), EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), State = #state{transport = Transport, @@ -192,13 +169,12 @@ init({Transport, RawSocket, Options}) -> active_n = ActiveN, rate_limit = RateLimit, pub_limit = PubLimit, - proto_state = ProtoState, parse_state = ParseState, + proto_state = ProtoState, gc_state = GcState, enable_stats = EnableStats, idle_timeout = IdleTimout }, - ok = emqx_misc:init_proc_mng_policy(Zone), gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}], idle, State, self(), [IdleTimout]). @@ -218,12 +194,17 @@ idle(enter, _, State) -> keep_state_and_data; idle(timeout, _Timeout, State) -> - {stop, idle_timeout, State}; + stop(idle_timeout, State); + +idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnVar)}, State) -> + #mqtt_packet_connect{proto_ver = ProtoVer} = ConnVar, + Serializer = emqx_frame:init_serializer(#{version => ProtoVer}), + NState = State#state{serializer = Serializer}, + handle_incoming(Packet, fun(St) -> {next_state, connected, St} end, NState); idle(cast, {incoming, Packet}, State) -> - handle_incoming(Packet, fun(NState) -> - {next_state, connected, NState} - end, State); + ?LOG(warning, "Unexpected incoming: ~p", [Packet]), + shutdown(unexpected_incoming_packet, State); idle(EventType, Content, State) -> ?HANDLE(EventType, Content, State). @@ -235,18 +216,23 @@ connected(enter, _, _State) -> %% What to do? keep_state_and_data; -%% Handle Input +connected(cast, {incoming, Packet = ?PACKET(?CONNECT)}, State) -> + ?LOG(warning, "Unexpected connect: ~p", [Packet]), + shutdown(unexpected_incoming_connect, State); + connected(cast, {incoming, Packet = ?PACKET(Type)}, State) -> ok = emqx_metrics:inc_recv(Packet), (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1), - handle_incoming(Packet, fun(NState) -> {keep_state, NState} end, State); + handle_incoming(Packet, fun(St) -> {keep_state, St} end, State); -%% Handle Output -connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> - case emqx_protocol:deliver(PubOrAck, ProtoState) of +%% Handle delivery +connected(info, Devliery = {deliver, _Topic, Msg}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:handle_out(Devliery, ProtoState) of {ok, NProtoState} -> + {keep_state, State#state{proto_state = NProtoState}}; + {ok, Packet, NProtoState} -> NState = State#state{proto_state = NProtoState}, - {keep_state, maybe_gc(PubOrAck, NState)}; + handle_outgoing(Packet, fun(St) -> {keep_state, St} end, NState); {error, Reason} -> shutdown(Reason, State) end; @@ -281,6 +267,16 @@ connected(info, {keepalive, check}, State = #state{keepalive = KeepAlive}) -> connected(EventType, Content, State) -> ?HANDLE(EventType, Content, State). +%%-------------------------------------------------------------------- +%% Disconnected State + +disconnected(enter, _, _State) -> + %% TODO: What to do? + keep_state_and_data; + +disconnected(EventType, Content, State) -> + ?HANDLE(EventType, Content, State). + %% Handle call handle({call, From}, info, State) -> reply(From, info(State), State); @@ -299,9 +295,6 @@ handle({call, From}, discard, State) -> ok = gen_statem:reply(From, ok), shutdown(discard, State); -handle({call, From}, session, State = #state{proto_state = ProtoState}) -> - reply(From, emqx_protocol:session(ProtoState), State); - handle({call, From}, Req, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), reply(From, ignored, State); @@ -312,7 +305,8 @@ handle(cast, Msg, State) -> {keep_state, State}; %% Handle Incoming -handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> +handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; + Inet == ssl -> Oct = iolist_size(Data), ?LOG(debug, "RECV ~p", [Data]), emqx_pd:update_counter(incoming_bytes, Oct), @@ -350,7 +344,7 @@ handle(info, {inet_reply, _Sock, {error, Reason}}, State) -> handle(info, {timeout, Timer, emit_stats}, State = #state{stats_timer = Timer, proto_state = ProtoState, - gc_state = GcState}) -> + gc_state = GcState}) -> ClientId = emqx_protocol:client_id(ProtoState), emqx_cm:set_conn_stats(ClientId, stats(State)), NState = State#state{stats_timer = undefined}, @@ -390,15 +384,9 @@ terminate(Reason, _StateName, #state{transport = Transport, keepalive = KeepAlive, proto_state = ProtoState}) -> ?LOG(debug, "Terminated for ~p", [Reason]), - Transport:fast_close(Socket), - emqx_keepalive:cancel(KeepAlive), - case {ProtoState, Reason} of - {undefined, _} -> ok; - {_, {shutdown, Error}} -> - emqx_protocol:terminate(Error, ProtoState); - {_, Reason} -> - emqx_protocol:terminate(Reason, ProtoState) - end. + ok = Transport:fast_close(Socket), + ok = emqx_keepalive:cancel(KeepAlive), + emqx_protocol:terminate(Reason, ProtoState). %%-------------------------------------------------------------------- %% Process incoming data @@ -431,10 +419,16 @@ next_events(Packet) -> %%-------------------------------------------------------------------- %% Handle incoming packet -handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> - case emqx_protocol:received(Packet, ProtoState) of +handle_incoming(Packet = ?PACKET(Type), SuccFun, + State = #state{proto_state = ProtoState}) -> + _ = inc_incoming_stats(Type), + ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), + case emqx_protocol:handle_in(Packet, ProtoState) of {ok, NProtoState} -> SuccFun(State#state{proto_state = NProtoState}); + {ok, OutPacket, NProtoState} -> + handle_outgoing(OutPacket, SuccFun, + State#state{proto_state = NProtoState}); {error, Reason} -> shutdown(Reason, State); {error, Reason, NProtoState} -> @@ -443,6 +437,22 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> stop(Error, State#state{proto_state = NProtoState}) end. +%%-------------------------------------------------------------------- +%% Handle outgoing packet + +handle_outgoing(Packet = ?PACKET(Type), SuccFun, + State = #state{transport = Transport, + socket = Socket, + serializer = Serializer}) -> + _ = inc_outgoing_stats(Type), + ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + Data = Serializer(Packet), + case Transport:async_send(Socket, Data) of + ok -> SuccFun(State); + {error, Reason} -> + shutdown(Reason, State) + end. + %%-------------------------------------------------------------------- %% Ensure rate limit @@ -465,6 +475,12 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1) end. +%% start_keepalive(0, _PState) -> +%% ignore; +%% start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 -> +%% Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), +%% self() ! {keepalive, start, round(Secs * Backoff)}. + %%-------------------------------------------------------------------- %% Activate socket @@ -479,6 +495,17 @@ activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) -> ok end. +%%-------------------------------------------------------------------- +%% Inc incoming/outgoing stats + +inc_incoming_stats(Type) -> + emqx_pd:update_counter(recv_pkt, 1), + Type =:= ?PUBLISH andalso emqx_pd:update_counter(recv_msg, 1). + +inc_outgoing_stats(Type) -> + emqx_pd:update_counter(send_pkt, 1), + Type =:= ?PUBLISH andalso emqx_pd:update_counter(send_msg, 1). + %%-------------------------------------------------------------------- %% Ensure stats timer diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index a4ef34840..3c46e50db 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -21,6 +21,7 @@ -export([ initial_parse_state/0 , initial_parse_state/1 + , init_serializer/1 ]). -export([ parse/1 @@ -29,22 +30,22 @@ , serialize/2 ]). +-export_type([ options/0 + , parse_state/0 + , parse_result/0 + ]). + -type(options() :: #{max_size => 1..?MAX_PACKET_SIZE, - version => emqx_mqtt_types:version() + version => emqx_mqtt:version() }). -opaque(parse_state() :: {none, options()} | {more, cont_fun()}). -opaque(parse_result() :: {ok, parse_state()} - | {ok, emqx_mqtt_types:packet(), binary(), parse_state()}). + | {ok, emqx_mqtt:packet(), binary(), parse_state()}). -type(cont_fun() :: fun((binary()) -> parse_result())). --export_type([ options/0 - , parse_state/0 - , parse_result/0 - ]). - -define(none(Opts), {none, Opts}). -define(more(Cont), {more, Cont}). -define(DEFAULT_OPTIONS, @@ -385,11 +386,14 @@ parse_binary_data(<>) -> %% Serialize MQTT Packet %%-------------------------------------------------------------------- --spec(serialize(emqx_mqtt_types:packet()) -> iodata()). +init_serializer(Options) -> + fun(Packet) -> serialize(Packet, Options) end. + +-spec(serialize(emqx_mqtt:packet()) -> iodata()). serialize(Packet) -> serialize(Packet, ?DEFAULT_OPTIONS). --spec(serialize(emqx_mqtt_types:packet(), options()) -> iodata()). +-spec(serialize(emqx_mqtt:packet(), options()) -> iodata()). serialize(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, Options) when is_map(Options) -> diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index 4522b3510..88c4796c4 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -33,6 +33,8 @@ , window/1 ]). +-export_type([inflight/0]). + -type(key() :: term()). -type(max_size() :: pos_integer()). @@ -43,8 +45,6 @@ -define(Inflight(MaxSize, Tree), {?MODULE, MaxSize, (Tree)}). --export_type([inflight/0]). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index f7a337541..699963644 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -14,11 +14,13 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% MQTT Protocol -module(emqx_protocol). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). +-include("types.hrl"). -logger_header("[Protocol]"). @@ -27,53 +29,49 @@ , attr/2 , caps/1 , caps/2 - , stats/1 , client_id/1 , credentials/1 , session/1 ]). -export([ init/2 - , received/2 - , process/2 - , deliver/2 - , send/2 + , handle_in/2 + , handle_out/2 + , handle_timeout/3 , terminate/2 ]). --record(pstate, { - zone, +-export_type([protocol/0]). + +-record(protocol, { + zone :: emqx_zone:zone(), + conn_mod :: module(), sendfun, sockname, peername, peercert, - proto_ver, + proto_ver :: emqx_mqtt:version(), proto_name, - client_id, + client_id :: maybe(emqx_types:client_id()), is_assigned, + username :: maybe(emqx_types:username()), conn_props, ack_props, - username, - session, + credentials :: map(), + session :: maybe(emqx_session:session()), clean_start, topic_aliases, will_topic, will_msg, keepalive, - is_bridge, - recv_stats, - send_stats, - connected, - connected_at, + is_bridge :: boolean(), + connected :: boolean(), + connected_at :: erlang:timestamp(), topic_alias_maximum, - conn_mod, - credentials, ws_cookie - }). + }). --opaque(state() :: #pstate{}). - --export_type([state/0]). +-opaque(protocol() :: #protocol{}). -ifdef(TEST). -compile(export_all). @@ -86,33 +84,31 @@ %% Init %%-------------------------------------------------------------------- --spec(init(map(), list()) -> state()). +-spec(init(map(), list()) -> protocol()). init(SocketOpts = #{sockname := Sockname, peername := Peername, - peercert := Peercert, - sendfun := SendFun}, Options) -> + peercert := Peercert}, Options) -> Zone = proplists:get_value(zone, Options), - #pstate{zone = Zone, - sendfun = SendFun, - sockname = Sockname, - peername = Peername, - peercert = Peercert, - proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>, - client_id = <<>>, - is_assigned = false, - username = init_username(Peercert, Options), - clean_start = false, - topic_aliases = #{}, - is_bridge = false, - recv_stats = #{msg => 0, pkt => 0}, - send_stats = #{msg => 0, pkt => 0}, - connected = false, - %% TODO: ...? - topic_alias_maximum = #{to_client => 0, from_client => 0}, - conn_mod = maps:get(conn_mod, SocketOpts, undefined), - credentials = #{}, - ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}. + #protocol{zone = Zone, + %%sendfun = SendFun, + sockname = Sockname, + peername = Peername, + peercert = Peercert, + proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + is_assigned = false, + %%conn_pid = self(), + username = init_username(Peercert, Options), + clean_start = false, + topic_aliases = #{}, + is_bridge = false, + connected = false, + topic_alias_maximum = #{to_client => 0, from_client => 0}, + conn_mod = maps:get(conn_mod, SocketOpts, undefined), + credentials = #{}, + ws_cookie = maps:get(ws_cookie, SocketOpts, undefined) + }. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -122,8 +118,8 @@ init_username(Peercert, Options) -> _ -> undefined end. -set_username(Username, PState = #pstate{username = undefined}) -> - PState#pstate{username = Username}; +set_username(Username, PState = #protocol{username = undefined}) -> + PState#protocol{username = Username}; set_username(_Username, PState) -> PState. @@ -131,12 +127,12 @@ set_username(_Username, PState) -> %% API %%-------------------------------------------------------------------- -info(PState = #pstate{zone = Zone, - conn_props = ConnProps, - ack_props = AckProps, - session = Session, - topic_aliases = Aliases, - will_msg = WillMsg}) -> +info(PState = #protocol{zone = Zone, + conn_props = ConnProps, + ack_props = AckProps, + session = Session, + topic_aliases = Aliases, + will_msg = WillMsg}) -> maps:merge(attrs(PState), #{conn_props => ConnProps, ack_props => AckProps, session => Session, @@ -145,19 +141,19 @@ info(PState = #pstate{zone = Zone, enable_acl => emqx_zone:get_env(Zone, enable_acl, false) }). -attrs(#pstate{zone = Zone, - client_id = ClientId, - username = Username, - peername = Peername, - peercert = Peercert, - clean_start = CleanStart, - proto_ver = ProtoVer, - proto_name = ProtoName, - keepalive = Keepalive, - is_bridge = IsBridge, - connected_at = ConnectedAt, - conn_mod = ConnMod, - credentials = Credentials}) -> +attrs(#protocol{zone = Zone, + client_id = ClientId, + username = Username, + peername = Peername, + peercert = Peercert, + clean_start = CleanStart, + proto_ver = ProtoVer, + proto_name = ProtoName, + keepalive = Keepalive, + is_bridge = IsBridge, + connected_at = ConnectedAt, + conn_mod = ConnMod, + credentials = Credentials}) -> #{zone => Zone, client_id => ClientId, username => Username, @@ -173,25 +169,25 @@ attrs(#pstate{zone = Zone, credentials => Credentials }. -attr(proto_ver, #pstate{proto_ver = ProtoVer}) -> +attr(proto_ver, #protocol{proto_ver = ProtoVer}) -> ProtoVer; -attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> +attr(max_inflight, #protocol{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Receive-Maximum', ConnProps, 65535); -attr(max_inflight, #pstate{zone = Zone}) -> +attr(max_inflight, #protocol{zone = Zone}) -> emqx_zone:get_env(Zone, max_inflight, 65535); -attr(expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> +attr(expiry_interval, #protocol{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Session-Expiry-Interval', ConnProps, 0); -attr(expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}) -> +attr(expiry_interval, #protocol{zone = Zone, clean_start = CleanStart}) -> case CleanStart of true -> 0; false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) end; -attr(topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> +attr(topic_alias_maximum, #protocol{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Topic-Alias-Maximum', ConnProps, 0); -attr(topic_alias_maximum, #pstate{zone = Zone}) -> +attr(topic_alias_maximum, #protocol{zone = Zone}) -> emqx_zone:get_env(Zone, max_topic_alias, 0); attr(Name, PState) -> - Attrs = lists:zip(record_info(fields, pstate), tl(tuple_to_list(PState))), + Attrs = lists:zip(record_info(fields, protocol), tl(tuple_to_list(PState))), case lists:keyfind(Name, 1, Attrs) of {_, Value} -> Value; false -> undefined @@ -200,13 +196,13 @@ attr(Name, PState) -> caps(Name, PState) -> maps:get(Name, caps(PState)). -caps(#pstate{zone = Zone}) -> +caps(#protocol{zone = Zone}) -> emqx_mqtt_caps:get_caps(Zone). -client_id(#pstate{client_id = ClientId}) -> +client_id(#protocol{client_id = ClientId}) -> ClientId. -credentials(#pstate{zone = Zone, +credentials(#protocol{zone = Zone, client_id = ClientId, username = Username, sockname = Sockname, @@ -232,79 +228,61 @@ keepsafety(Credentials) -> (cn, _) -> false; (_, _) -> true end, Credentials). -stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg}, - send_stats = #{pkt := SendPkt, msg := SendMsg}}) -> - [{recv_pkt, RecvPkt}, - {recv_msg, RecvMsg}, - {send_pkt, SendPkt}, - {send_msg, SendMsg}]. - -session(#pstate{session = Session}) -> +session(#protocol{session = Session}) -> Session. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Packet Received -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- set_protover(?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = ProtoVer}), PState) -> - PState#pstate{proto_ver = ProtoVer}; + PState#protocol{proto_ver = ProtoVer}; set_protover(_Packet, PState) -> PState. --spec(received(emqx_mqtt_types:packet(), state()) - -> {ok, state()} - | {error, term()} - | {error, term(), state()} - | {stop, term(), state()}). -received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> +handle_in(?PACKET(Type), PState = #protocol{connected = false}) when Type =/= ?CONNECT -> {error, proto_not_connected, PState}; -received(?PACKET(?CONNECT), PState = #pstate{connected = true}) -> +handle_in(?PACKET(?CONNECT), PState = #protocol{connected = true}) -> {error, proto_unexpected_connect, PState}; -received(Packet = ?PACKET(Type), PState) -> - trace(recv, Packet), +handle_in(Packet = ?PACKET(_Type), PState) -> PState1 = set_protover(Packet, PState), try emqx_packet:validate(Packet) of true -> case preprocess_properties(Packet, PState1) of {ok, Packet1, PState2} -> - process(Packet1, inc_stats(recv, Type, PState2)); + process(Packet1, PState2); {error, ReasonCode} -> - {error, ReasonCode, PState1} + handle_out({disconnect, ReasonCode}, PState1) end catch error:protocol_error -> - deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState1), - {error, protocol_error, PState}; + handle_out({disconnect, ?RC_PROTOCOL_ERROR}, PState1); error:subscription_identifier_invalid -> - deliver({disconnect, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}, PState1), - {error, subscription_identifier_invalid, PState1}; + handle_out({disconnect, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}, PState1); error:topic_alias_invalid -> - deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState1), - {error, topic_alias_invalid, PState1}; + handle_out({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState1); error:topic_filters_invalid -> - deliver({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1), - {error, topic_filters_invalid, PState1}; + handle_out({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1); error:topic_name_invalid -> - deliver({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1), - {error, topic_filters_invalid, PState1}; - error:Reason -> - deliver({disconnect, ?RC_MALFORMED_PACKET}, PState1), - {error, Reason, PState1} + handle_out({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1); + error:_Reason -> + %% TODO: {error, Reason, PState1} + handle_out({disconnect, ?RC_MALFORMED_PACKET}, PState1) end. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Preprocess MQTT Properties -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- preprocess_properties(Packet = #mqtt_packet{ variable = #mqtt_packet_connect{ properties = #{'Topic-Alias-Maximum' := ToClient} } }, - PState = #pstate{topic_alias_maximum = TopicAliasMaximum}) -> - {ok, Packet, PState#pstate{topic_alias_maximum = TopicAliasMaximum#{to_client => ToClient}}}; + PState = #protocol{topic_alias_maximum = TopicAliasMaximum}) -> + {ok, Packet, PState#protocol{topic_alias_maximum = TopicAliasMaximum#{to_client => ToClient}}}; %% Subscription Identifier preprocess_properties(Packet = #mqtt_packet{ @@ -313,7 +291,7 @@ preprocess_properties(Packet = #mqtt_packet{ topic_filters = TopicFilters } }, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> + PState = #protocol{proto_ver = ?MQTT_PROTO_V5}) -> TopicFilters1 = [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters], {ok, Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState}; @@ -323,7 +301,6 @@ preprocess_properties(#mqtt_packet{ properties = #{'Topic-Alias' := 0}} }, PState) -> - deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState), {error, ?RC_TOPIC_ALIAS_INVALID}; preprocess_properties(Packet = #mqtt_packet{ @@ -331,7 +308,7 @@ preprocess_properties(Packet = #mqtt_packet{ topic_name = <<>>, properties = #{'Topic-Alias' := AliasId}} }, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5, + PState = #protocol{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases, topic_alias_maximum = #{from_client := TopicAliasMaximum}}) -> case AliasId =< TopicAliasMaximum of @@ -339,7 +316,6 @@ preprocess_properties(Packet = #mqtt_packet{ {ok, Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{ topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState}; false -> - deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState), {error, ?RC_TOPIC_ALIAS_INVALID} end; @@ -348,23 +324,22 @@ preprocess_properties(Packet = #mqtt_packet{ topic_name = Topic, properties = #{'Topic-Alias' := AliasId}} }, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5, + PState = #protocol{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases, topic_alias_maximum = #{from_client := TopicAliasMaximum}}) -> case AliasId =< TopicAliasMaximum of true -> - {ok, Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; + {ok, Packet, PState#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; false -> - deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState), {error, ?RC_TOPIC_ALIAS_INVALID} end; preprocess_properties(Packet, PState) -> {ok, Packet, PState}. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Process MQTT Packet -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- process(?CONNECT_PACKET( #mqtt_packet_connect{proto_name = ProtoName, @@ -381,7 +356,7 @@ process(?CONNECT_PACKET( %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) PState0 = maybe_use_username_as_clientid(ClientId, set_username(Username, - PState#pstate{proto_ver = ProtoVer, + PState#protocol{proto_ver = ProtoVer, proto_name = ProtoName, clean_start = CleanStart, keepalive = Keepalive, @@ -389,120 +364,115 @@ process(?CONNECT_PACKET( is_bridge = IsBridge, connected_at = os:timestamp()})), - NewClientId = PState0#pstate.client_id, + NewClientId = PState0#protocol.client_id, emqx_logger:set_metadata_client_id(NewClientId), Credentials = credentials(PState0), - PState1 = PState0#pstate{credentials = Credentials}, + PState1 = PState0#protocol{credentials = Credentials}, connack( case check_connect(ConnPkt, PState1) of ok -> case emqx_access_control:authenticate(Credentials#{password => Password}) of {ok, Credentials0} -> PState3 = maybe_assign_client_id(PState1), - emqx_logger:set_metadata_client_id(PState3#pstate.client_id), + emqx_logger:set_metadata_client_id(PState3#protocol.client_id), %% Open session SessAttrs = #{will_msg => make_will_msg(ConnPkt)}, case try_open_session(SessAttrs, PState3) of {ok, Session, SP} -> - PState4 = PState3#pstate{session = Session, connected = true, + PState4 = PState3#protocol{session = Session, connected = true, credentials = keepsafety(Credentials0)}, ok = emqx_cm:register_channel(client_id(PState4)), - ok = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), + true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), %% Start keepalive start_keepalive(Keepalive, PState4), %% Success {?RC_SUCCESS, SP, PState4}; {error, Error} -> ?LOG(error, "Failed to open session: ~p", [Error]), - {?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}} + {?RC_UNSPECIFIED_ERROR, PState1#protocol{credentials = Credentials0}} end; {error, Reason} -> ?LOG(warning, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]), - {emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}} + {emqx_reason_codes:connack_error(Reason), PState1#protocol{credentials = Credentials}} end; {error, ReasonCode} -> {ReasonCode, PState1} end); -process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState = #pstate{zone = Zone}) -> +process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState = #protocol{zone = Zone}) -> case check_publish(Packet, PState) of ok -> do_publish(Packet, PState); {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), + %% TODO: ... AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState) end; -process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState = #pstate{zone = Zone}) -> +process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState = #protocol{zone = Zone}) -> case check_publish(Packet, PState) of ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), - case deliver({puback, PacketId, ReasonCode}, PState) of - {ok, PState1} -> - AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), - do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState1); - Error -> Error - end + ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", + [Topic, emqx_reason_codes:text(ReasonCode)]), + handle_out({puback, PacketId, ReasonCode}, PState) end; -process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState = #pstate{zone = Zone}) -> +process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState = #protocol{zone = Zone}) -> case check_publish(Packet, PState) of ok -> do_publish(Packet, PState); {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos2 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), - case deliver({pubrec, PacketId, ReasonCode}, PState) of - {ok, PState1} -> - AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), - do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState1); - Error -> Error - end + handle_out({pubrec, PacketId, ReasonCode}, PState) end; -process(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = Session}) -> - NSession = emqx_session:puback(PacketId, ReasonCode, Session), - {ok, PState#pstate{session = NSession}}; +process(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> + case emqx_session:puback(PacketId, ReasonCode, Session) of + {ok, NSession} -> + {ok, PState#protocol{session = NSession}}; + {error, _NotFound} -> + {ok, PState} %% TODO: Fixme later + end; -process(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = Session}) -> +process(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> case emqx_session:pubrec(PacketId, ReasonCode, Session) of {ok, NSession} -> - send(?PUBREL_PACKET(PacketId), PState#pstate{session = NSession}); + {ok, ?PUBREL_PACKET(PacketId), PState#protocol{session = NSession}}; {error, NotFound} -> - send(?PUBREL_PACKET(PacketId, NotFound), PState) + {ok, ?PUBREL_PACKET(PacketId, NotFound), PState} end; -process(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = Session}) -> +process(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> case emqx_session:pubrel(PacketId, ReasonCode, Session) of {ok, NSession} -> - send(?PUBCOMP_PACKET(PacketId), PState#pstate{session = NSession}); + {ok, ?PUBCOMP_PACKET(PacketId), PState#protocol{session = NSession}}; {error, NotFound} -> - send(?PUBCOMP_PACKET(PacketId, NotFound), PState) + {ok, ?PUBCOMP_PACKET(PacketId, NotFound), PState} end; -process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = Session}) -> +process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> case emqx_session:pubcomp(PacketId, ReasonCode, Session) of {ok, NSession} -> - {ok, PState#pstate{session = NSession}}; - {error, _NotFound} -> + {ok, PState#protocol{session = NSession}}; + {error, _NotFound} -> ok %% TODO: How to handle NotFound? - {ok, PState} end; process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{zone = Zone, proto_ver = ProtoVer, session = Session, credentials = Credentials}) -> + PState = #protocol{zone = Zone, session = Session, credentials = Credentials}) -> case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of {ok, TopicFilters} -> TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters), TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0), {ok, ReasonCodes, NSession} = emqx_session:subscribe(TopicFilters1, Session), - deliver({suback, PacketId, ReasonCodes}, PState#pstate{session = NSession}); + handle_out({suback, PacketId, ReasonCodes}, PState#protocol{session = NSession}); {error, TopicFilters} -> {SubTopics, ReasonCodes} = lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) -> @@ -512,119 +482,102 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), end, {[], []}, TopicFilters), ?LOG(warning, "Cannot subscribe ~p for ~p", [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]), - case deliver({suback, PacketId, ReasonCodes}, PState) of - {ok, PState1} -> - AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), - do_acl_deny_action(AclDenyAction, Packet, ReasonCodes, PState1); - Error -> - Error - end + handle_out({suback, PacketId, ReasonCodes}, PState) end; process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = Session, credentials = Credentials}) -> + PState = #protocol{session = Session, credentials = Credentials}) -> TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials], parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)), TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters), {ok, ReasonCodes, NSession} = emqx_session:unsubscribe(TopicFilters1, Session), - deliver({unsuback, PacketId, ReasonCodes}, PState#pstate{session = NSession}); + handle_out({unsuback, PacketId, ReasonCodes}, PState#protocol{session = NSession}); process(?PACKET(?PINGREQ), PState) -> - send(?PACKET(?PINGRESP), PState); + {ok, ?PACKET(?PINGRESP), PState}; process(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), - PState = #pstate{session = Session, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> + PState = #protocol{session = Session, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> case Interval =/= 0 andalso OldInterval =:= 0 of true -> - deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), - {error, protocol_error, PState#pstate{will_msg = undefined}}; + handle_out({disconnect, ?RC_PROTOCOL_ERROR}, PState#protocol{will_msg = undefined}); false -> - NSession = emqx_session:update_expiry_interval(Interval, Session), + %% TODO: + %% emqx_session:update_expiry_interval(SPid, Interval), %% Clean willmsg - {stop, normal, PState#pstate{will_msg = undefined, session = NSession}} + {stop, normal, PState#protocol{will_msg = undefined}} end; process(?DISCONNECT_PACKET(?RC_SUCCESS), PState) -> - {stop, normal, PState#pstate{will_msg = undefined}}; + {stop, normal, PState#protocol{will_msg = undefined}}; process(?DISCONNECT_PACKET(_), PState) -> - {stop, {shutdown, abnormal_disconnet}, PState}. + {stop, {shutdown, abnormal_disconnet}, PState}; -%%------------------------------------------------------------------------------ +process(?AUTH_PACKET(), State) -> + %%TODO: implement later. + {ok, State}. + +%%-------------------------------------------------------------------- %% ConnAck --> Client -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -connack({?RC_SUCCESS, SP, PState = #pstate{credentials = Credentials}}) -> +connack({?RC_SUCCESS, SP, PState = #protocol{credentials = Credentials}}) -> ok = emqx_hooks:run('client.connected', [Credentials, ?RC_SUCCESS, attrs(PState)]), - deliver({connack, ?RC_SUCCESS, sp(SP)}, PState); + handle_out({connack, ?RC_SUCCESS, sp(SP)}, PState); -connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Credentials}}) -> +connack({ReasonCode, PState = #protocol{proto_ver = ProtoVer, credentials = Credentials}}) -> ok = emqx_hooks:run('client.connected', [Credentials, ReasonCode, attrs(PState)]), [ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer), - _ = deliver({connack, ReasonCode1}, PState), - {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}. + handle_out({connack, ReasonCode1}, PState). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Publish Message -> Broker -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), - PState = #pstate{session = Session, credentials = Credentials}) -> + PState = #protocol{session = Session, credentials = Credentials}) -> Msg = emqx_mountpoint:mount(mountpoint(Credentials), emqx_packet:to_message(Credentials, Packet)), Msg1 = emqx_message:set_flag(dup, false, Msg), case emqx_session:publish(PacketId, Msg1, Session) of - {ok, Result} -> - puback(QoS, PacketId, {ok, Result}, PState); - {ok, Result, NSession} -> - puback(QoS, PacketId, {ok, Result}, PState#pstate{session = NSession}); - {error, ReasonCode} -> - puback(QoS, PacketId, {error, ReasonCode}, PState) + {ok, Results} -> + puback(QoS, PacketId, Results, PState); + {ok, Results, NSession} -> + puback(QoS, PacketId, Results, PState#protocol{session = NSession}); + {error, Reason} -> + puback(QoS, PacketId, {error, Reason}, PState) end. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Puback -> Client -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ puback(?QOS_0, _PacketId, _Result, PState) -> {ok, PState}; puback(?QOS_1, PacketId, {ok, []}, PState) -> - deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); + handle_out({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); %%TODO: calc the deliver count? puback(?QOS_1, PacketId, {ok, _Result}, PState) -> - deliver({puback, PacketId, ?RC_SUCCESS}, PState); + handle_out({puback, PacketId, ?RC_SUCCESS}, PState); puback(?QOS_1, PacketId, {error, ReasonCode}, PState) -> - deliver({puback, PacketId, ReasonCode}, PState); + handle_out({puback, PacketId, ReasonCode}, PState); puback(?QOS_2, PacketId, {ok, []}, PState) -> - deliver({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); + handle_out({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); puback(?QOS_2, PacketId, {ok, _Result}, PState) -> - deliver({pubrec, PacketId, ?RC_SUCCESS}, PState); + handle_out({pubrec, PacketId, ?RC_SUCCESS}, PState); puback(?QOS_2, PacketId, {error, ReasonCode}, PState) -> - deliver({pubrec, PacketId, ReasonCode}, PState). + handle_out({pubrec, PacketId, ReasonCode}, PState). -%%------------------------------------------------------------------------------ -%% Deliver Packet -> Client -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- +%% Handle outgoing +%%-------------------------------------------------------------------- --spec(deliver(list(tuple()) | tuple(), state()) -> {ok, state()} | {error, term()}). -deliver([], PState) -> - {ok, PState}; -deliver([Pub|More], PState) -> - case deliver(Pub, PState) of - {ok, PState1} -> - deliver(More, PState1); - {error, _} = Error -> - Error - end; - -deliver({connack, ReasonCode}, PState) -> - send(?CONNACK_PACKET(ReasonCode), PState); - -deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, - proto_ver = ?MQTT_PROTO_V5, - client_id = ClientId, - is_assigned = IsAssigned, - topic_alias_maximum = TopicAliasMaximum}) -> +handle_out({connack, ?RC_SUCCESS, SP}, PState = #protocol{zone = Zone, + proto_ver = ?MQTT_PROTO_V5, + client_id = ClientId, + is_assigned = IsAssigned, + topic_alias_maximum = TopicAliasMaximum}) -> #{max_packet_size := MaxPktSize, max_qos_allowed := MaxQoS, mqtt_retain_available := Retain, @@ -668,81 +621,76 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, Keepalive -> Props2#{'Server-Keep-Alive' => Keepalive} end, - PState1 = PState#pstate{topic_alias_maximum = TopicAliasMaximum#{from_client => MaxAlias}}, + PState1 = PState#protocol{topic_alias_maximum = TopicAliasMaximum#{from_client => MaxAlias}}, - send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState1); + {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState1}; -deliver({connack, ReasonCode, SP}, PState) -> - send(?CONNACK_PACKET(ReasonCode, SP), PState); +handle_out({connack, ?RC_SUCCESS, SP}, PState) -> + {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP), PState}; -deliver({publish, PacketId, Msg}, PState = #pstate{credentials = Credentials}) -> - Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg), - Msg1 = emqx_message:update_expiry(Msg0), - Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1), - send(emqx_packet:from_message(PacketId, Msg2), PState); +handle_out({connack, ReasonCode}, PState = #protocol{proto_ver = ProtoVer}) -> + Reason = emqx_reason_codes:name(ReasonCode, ProtoVer), + {error, Reason, ?CONNACK_PACKET(ReasonCode), PState}; -deliver({puback, PacketId, ReasonCode}, PState) -> - send(?PUBACK_PACKET(PacketId, ReasonCode), PState); +handle_out({puback, PacketId, ReasonCode}, PState) -> + {ok, ?PUBACK_PACKET(PacketId, ReasonCode), PState}; + %% TODO: + %% AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), + %% do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState1); -deliver({pubrel, PacketId}, PState) -> - send(?PUBREL_PACKET(PacketId), PState); +handle_out({pubrel, PacketId}, PState) -> + {ok, ?PUBREL_PACKET(PacketId), PState}; -deliver({pubrec, PacketId, ReasonCode}, PState) -> - send(?PUBREC_PACKET(PacketId, ReasonCode), PState); +handle_out({pubrec, PacketId, ReasonCode}, PState) -> + %% TODO: + %% AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), + %% do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState1); + {ok, ?PUBREC_PACKET(PacketId, ReasonCode), PState}; -deliver({suback, PacketId, ReasonCodes}, PState = #pstate{proto_ver = ProtoVer}) -> - send(?SUBACK_PACKET(PacketId, reason_codes_compat(suback, ReasonCodes, ProtoVer)), PState); +%%handle_out({pubrec, PacketId, ReasonCode}, PState) -> +%% {ok, ?PUBREC_PACKET(PacketId, ReasonCode), PState}; -deliver({unsuback, PacketId, ReasonCodes}, PState = #pstate{proto_ver = ProtoVer}) -> - send(?UNSUBACK_PACKET(PacketId, reason_codes_compat(unsuback, ReasonCodes, ProtoVer)), PState); +handle_out({suback, PacketId, ReasonCodes}, PState = #protocol{proto_ver = ProtoVer}) -> + %% TODO: ACL Deny + {ok, ?SUBACK_PACKET(PacketId, reason_codes_compat(suback, ReasonCodes, ProtoVer)), PState}; + +handle_out({unsuback, PacketId, ReasonCodes}, PState = #protocol{proto_ver = ProtoVer}) -> + {ok, ?UNSUBACK_PACKET(PacketId, reason_codes_compat(unsuback, ReasonCodes, ProtoVer)), PState}; %% Deliver a disconnect for mqtt 5.0 -deliver({disconnect, ReasonCode}, PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> - send(?DISCONNECT_PACKET(ReasonCode), PState); +handle_out({disconnect, RC}, PState = #protocol{proto_ver = ?MQTT_PROTO_V5}) -> + {error, emqx_reason_codes:name(RC), ?DISCONNECT_PACKET(RC), PState}; -deliver({disconnect, _ReasonCode}, PState) -> +handle_out({disconnect, RC}, PState) -> + {error, emqx_reason_codes:name(RC), PState}. + +handle_timeout(Timer, Name, PState) -> {ok, PState}. -%%------------------------------------------------------------------------------ -%% Send Packet to Client - --spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). -send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) -> - case Send(Packet, #{version => Ver}) of - ok -> - trace(send, Packet), - {ok, PState}; - {ok, Data} -> - trace(send, Packet), - emqx_metrics:inc_sent(Packet), - ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)), - {ok, inc_stats(send, Type, PState)}; - {error, Reason} -> - {error, Reason} - end. - %%------------------------------------------------------------------------------ %% Maybe use username replace client id -maybe_use_username_as_clientid(ClientId, PState = #pstate{username = undefined}) -> - PState#pstate{client_id = ClientId}; -maybe_use_username_as_clientid(ClientId, PState = #pstate{username = Username, zone = Zone}) -> +maybe_use_username_as_clientid(ClientId, PState = #protocol{username = undefined}) -> + PState#protocol{client_id = ClientId}; +maybe_use_username_as_clientid(ClientId, PState = #protocol{username = Username, zone = Zone}) -> case emqx_zone:get_env(Zone, use_username_as_clientid, false) of - true -> PState#pstate{client_id = Username}; - false -> PState#pstate{client_id = ClientId} + true -> + PState#protocol{client_id = Username}; + false -> + PState#protocol{client_id = ClientId} end. %%------------------------------------------------------------------------------ %% Assign a clientId -maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) -> +maybe_assign_client_id(PState = #protocol{client_id = <<>>, ack_props = AckProps}) -> ClientId = emqx_guid:to_base62(emqx_guid:gen()), AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), - PState#pstate{client_id = ClientId, is_assigned = true, ack_props = AckProps1}; + PState#protocol{client_id = ClientId, is_assigned = true, ack_props = AckProps1}; maybe_assign_client_id(PState) -> PState. -try_open_session(SessAttrs, PState = #pstate{zone = Zone, +try_open_session(SessAttrs, PState = #protocol{zone = Zone, client_id = ClientId, username = Username, clean_start = CleanStart}) -> @@ -782,9 +730,9 @@ make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, ConnPkt end). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Check Packet -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- check_connect(Packet, PState) -> run_check_steps([fun check_proto_ver/2, @@ -815,7 +763,7 @@ check_client_id(#mqtt_packet_connect{client_id = <<>>, clean_start = true}, _PState) -> ok; -check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}) -> +check_client_id(#mqtt_packet_connect{client_id = ClientId}, #protocol{zone = Zone}) -> Len = byte_size(ClientId), MaxLen = emqx_zone:get_env(Zone, max_clientid_len), case (1 =< Len) andalso (Len =< MaxLen) of @@ -827,7 +775,7 @@ check_flapping(#mqtt_packet_connect{}, PState) -> do_flapping_detect(connect, PState). check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, - #pstate{zone = Zone, peername = Peername}) -> + #protocol{zone = Zone, peername = Peername}) -> Credentials = #{client_id => ClientId, username => Username, peername => Peername}, @@ -845,7 +793,7 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState) check_will_retain(#mqtt_packet_connect{will_retain = false, proto_ver = ?MQTT_PROTO_V5}, _PState) -> ok; -check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) -> +check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #protocol{zone = Zone}) -> case emqx_zone:get_env(Zone, mqtt_retain_available, true) of true -> {error, ?RC_RETAIN_NOT_SUPPORTED}; false -> ok @@ -854,7 +802,7 @@ check_will_retain(_Packet, _PState) -> ok. check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, - #pstate{zone = Zone, credentials = Credentials}) -> + #protocol{zone = Zone, credentials = Credentials}) -> EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), case do_acl_check(EnableAcl, publish, Credentials, WillTopic) of ok -> ok; @@ -869,14 +817,14 @@ check_publish(Packet, PState) -> check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{properties = _Properties}}, - #pstate{zone = Zone}) -> + #protocol{zone = Zone}) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). -check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}}) +check_pub_acl(_Packet, #protocol{credentials = #{is_superuser := IsSuper}}) when IsSuper -> ok; check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, - #pstate{zone = Zone, credentials = Credentials}) -> + #protocol{zone = Zone, credentials = Credentials}) -> EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), do_acl_check(EnableAcl, publish, Credentials, Topic). @@ -890,7 +838,7 @@ run_check_steps([Check|Steps], Packet, PState) -> Error end. -check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) -> +check_subscribe(TopicFilters, PState = #protocol{zone = Zone}) -> case emqx_mqtt_caps:check_sub(Zone, TopicFilters) of {ok, TopicFilter1} -> check_sub_acl(TopicFilter1, PState); @@ -898,10 +846,10 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) -> {error, TopicFilter1} end. -check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}}) +check_sub_acl(TopicFilters, #protocol{credentials = #{is_superuser := IsSuper}}) when IsSuper -> {ok, TopicFilters}; -check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) -> +check_sub_acl(TopicFilters, #protocol{zone = Zone, credentials = Credentials}) -> EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), lists:foldr( fun({Topic, SubOpts}, {Ok, Acc}) when EnableAcl -> @@ -912,26 +860,9 @@ check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) -> {ok, [TopicFilter | Acc]} end, {ok, []}, TopicFilters). -trace(recv, Packet) -> - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]); -trace(send, Packet) -> - ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]). - -inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) -> - PState#pstate{recv_stats = inc_stats(Type, Stats)}; - -inc_stats(send, Type, PState = #pstate{send_stats = Stats}) -> - PState#pstate{send_stats = inc_stats(Type, Stats)}. - -inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) -> - Stats#{pkt := PktCnt + 1, msg := case Type =:= ?PUBLISH of - true -> MsgCnt + 1; - false -> MsgCnt - end}. - -terminate(_Reason, #pstate{client_id = undefined}) -> +terminate(_Reason, #protocol{client_id = undefined}) -> ok; -terminate(_Reason, PState = #pstate{connected = false}) -> +terminate(_Reason, PState = #protocol{connected = false}) -> do_flapping_detect(disconnect, PState), ok; terminate(Reason, PState) when Reason =:= conflict; @@ -939,20 +870,20 @@ terminate(Reason, PState) when Reason =:= conflict; do_flapping_detect(disconnect, PState), ok; -terminate(Reason, PState = #pstate{credentials = Credentials}) -> +terminate(Reason, PState = #protocol{credentials = Credentials}) -> do_flapping_detect(disconnect, PState), ?LOG(info, "Shutdown for ~p", [Reason]), ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). start_keepalive(0, _PState) -> ignore; -start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 -> +start_keepalive(Secs, #protocol{zone = Zone}) when Secs > 0 -> Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), self() ! {keepalive, start, round(Secs * Backoff)}. -%%----------------------------------------------------------------------------- +%%-------------------------------------------------------------------- %% Parse topic filters -%%----------------------------------------------------------------------------- +%%-------------------------------------------------------------------- parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> [emqx_topic:parse(RawTopic, SubOpts) || {RawTopic, SubOpts} <- RawTopicFilters]; @@ -966,17 +897,17 @@ sp(false) -> 0. flag(false) -> 0; flag(true) -> 1. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Execute actions in case acl deny -do_flapping_detect(Action, #pstate{zone = Zone, +do_flapping_detect(Action, #protocol{zone = Zone, client_id = ClientId}) -> ok = case emqx_zone:get_env(Zone, enable_flapping_detect, false) of true -> Threshold = emqx_zone:get_env(Zone, flapping_threshold, {10, 60}), case emqx_flapping:check(Action, ClientId, Threshold) of flapping -> - BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), + BanExpiryInterval = emqx_zone:get_env(Zone, flapping_banned_expiry_interval, 3600000), Until = erlang:system_time(second) + BanExpiryInterval, emqx_banned:add(#banned{who = {client_id, ClientId}, reason = <<"flapping">>, @@ -990,13 +921,14 @@ do_flapping_detect(Action, #pstate{zone = Zone, end. do_acl_deny_action(disconnect, ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), - ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer}) -> + ?RC_NOT_AUTHORIZED, PState = #protocol{proto_ver = ProtoVer}) -> {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; do_acl_deny_action(disconnect, ?PUBLISH_PACKET(QoS, _Topic, _PacketId, _Payload), - ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer}) + ?RC_NOT_AUTHORIZED, PState = #protocol{proto_ver = ProtoVer}) when QoS =:= ?QOS_1; QoS =:= ?QOS_2 -> - deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), + %% TODO:... + %% deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; do_acl_deny_action(Action, ?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters), ReasonCodes, PState) @@ -1004,18 +936,18 @@ do_acl_deny_action(Action, ?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFi traverse_reason_codes(ReasonCodes, Action, PState); do_acl_deny_action(_OtherAction, _PubSubPacket, ?RC_NOT_AUTHORIZED, PState) -> {ok, PState}; -do_acl_deny_action(_OtherAction, _PubSubPacket, ReasonCode, PState = #pstate{proto_ver = ProtoVer}) -> +do_acl_deny_action(_OtherAction, _PubSubPacket, ReasonCode, PState = #protocol{proto_ver = ProtoVer}) -> {error, emqx_reason_codes:name(ReasonCode, ProtoVer), PState}. traverse_reason_codes([], _Action, PState) -> {ok, PState}; traverse_reason_codes([?RC_SUCCESS | LeftReasonCodes], Action, PState) -> traverse_reason_codes(LeftReasonCodes, Action, PState); -traverse_reason_codes([?RC_NOT_AUTHORIZED | _LeftReasonCodes], disconnect, PState = #pstate{proto_ver = ProtoVer}) -> +traverse_reason_codes([?RC_NOT_AUTHORIZED | _LeftReasonCodes], disconnect, PState = #protocol{proto_ver = ProtoVer}) -> {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; traverse_reason_codes([?RC_NOT_AUTHORIZED | LeftReasonCodes], Action, PState) -> traverse_reason_codes(LeftReasonCodes, Action, PState); -traverse_reason_codes([OtherCode | _LeftReasonCodes], _Action, PState = #pstate{proto_ver = ProtoVer}) -> +traverse_reason_codes([OtherCode | _LeftReasonCodes], _Action, PState = #protocol{proto_ver = ProtoVer}) -> {error, emqx_reason_codes:name(OtherCode, ProtoVer), PState}. %% Reason code compat @@ -1026,7 +958,7 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) -> reason_codes_compat(PktType, ReasonCodes, _ProtoVer) -> [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes]. -raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridge}, RawTopicFilters) -> +raw_topic_filters(#protocol{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridge}, RawTopicFilters) -> IgnoreLoop = emqx_zone:get_env(Zone, ignore_loop_deliver, false), case ProtoVer < ?MQTT_PROTO_V5 of true -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 74cbee9d0..f5e7414d5 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -40,6 +40,7 @@ %% @end %%-------------------------------------------------------------------- +%% MQTT Session -module(emqx_session). -include("emqx.hrl"). @@ -49,91 +50,86 @@ -logger_header("[Session]"). --export([ new/1 - , handle/2 - , close/1 - ]). +-export([new/1]). -export([ info/1 , attrs/1 , stats/1 ]). --export([ subscribe/2 - , unsubscribe/2 - , publish/3 - , puback/2 - , puback/3 - , pubrec/2 +-export([ subscribe/3 + , unsubscribe/3 + ]). + +-export([publish/3]). + +-export([ puback/3 , pubrec/3 , pubrel/3 , pubcomp/3 ]). +-export([ deliver/3 + , await/3 + , enqueue/2 + ]). + +-export_type([ session/0 + , puback_ret/0 + ]). + +-import(emqx_zone, + [ get_env/2 + , get_env/3 + ]). + -record(session, { - %% ClientId: Identifier of Session - client_id :: binary(), - %% Clean Start Flag - clean_start = false :: boolean(), + clean_start :: boolean(), - %% Username - username :: maybe(binary()), - - %% Conn Binding: local | remote - %% binding = local :: local | remote, - - %% Deliver fun - deliver_fun :: function(), - - %% Next packet id of the session - next_pkt_id = 1 :: emqx_mqtt_types:packet_id(), - - %% Max subscriptions + %% Max subscriptions allowed max_subscriptions :: non_neg_integer(), %% Client’s Subscriptions. subscriptions :: map(), %% Upgrade QoS? - upgrade_qos = false :: boolean(), + upgrade_qos :: boolean(), - %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. + %% Client <- Broker: + %% Inflight QoS1, QoS2 messages sent to the client but unacked. inflight :: emqx_inflight:inflight(), - %% Max Inflight Size. DEPRECATED: Get from inflight - %% max_inflight = 32 :: non_neg_integer(), - - %% Retry interval for redelivering QoS1/2 messages - retry_interval = 20000 :: timeout(), - - %% Retry Timer - retry_timer :: maybe(reference()), - %% All QoS1, QoS2 messages published to when client is disconnected. %% QoS 1 and QoS 2 messages pending transmission to the Client. %% %% Optionally, QoS 0 messages pending transmission to the Client. mqueue :: emqx_mqueue:mqueue(), - %% Max Packets Awaiting PUBREL - max_awaiting_rel = 100 :: non_neg_integer(), + %% Next packet id of the session + next_pkt_id = 1 :: emqx_mqtt:packet_id(), - %% Awaiting PUBREL Timeout - await_rel_timeout = 20000 :: timeout(), + %% Retry interval for redelivering QoS1/2 messages + retry_interval :: timeout(), - %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. + %% Retry delivery timer + retry_timer :: maybe(reference()), + + %% Client -> Broker: + %% Inflight QoS2 messages received from client and waiting for pubrel. awaiting_rel :: map(), %% Awaiting PUBREL Timer await_rel_timer :: maybe(reference()), - will_msg :: emqx:message(), + %% Max Packets Awaiting PUBREL + max_awaiting_rel :: non_neg_integer(), - will_delay_timer :: maybe(reference()), + %% Awaiting PUBREL Timeout + await_rel_timeout :: timeout(), %% Session Expiry Interval - expiry_interval = 7200 :: timeout(), + expiry_interval :: timeout(), %% Expired Timer expiry_timer :: maybe(reference()), @@ -144,287 +140,155 @@ -opaque(session() :: #session{}). --export_type([session/0]). +-type(puback_ret() :: {ok, session()} + | {ok, emqx_types:message(), session()} + | {error, emqx_mqtt:reason_code()}). %% @doc Create a session. -spec(new(Attrs :: map()) -> session()). new(#{zone := Zone, - client_id := ClientId, clean_start := CleanStart, - username := Username, - expiry_interval := ExpiryInterval, max_inflight := MaxInflight, - will_msg := WillMsg}) -> - emqx_logger:set_metadata_client_id(ClientId), - #session{client_id = ClientId, - clean_start = CleanStart, - username = Username, - subscriptions = #{}, - inflight = emqx_inflight:new(MaxInflight), - mqueue = init_mqueue(Zone), - awaiting_rel = #{}, - expiry_interval = ExpiryInterval, - created_at = os:timestamp(), - will_msg = WillMsg}. + expiry_interval := ExpiryInterval}) -> + %% emqx_logger:set_metadata_client_id(ClientId), + #session{clean_start = CleanStart, + max_subscriptions = get_env(Zone, max_subscriptions, 0), + subscriptions = #{}, + upgrade_qos = get_env(Zone, upgrade_qos, false), + inflight = emqx_inflight:new(MaxInflight), + mqueue = init_mqueue(Zone), + next_pkt_id = 1, + retry_interval = get_env(Zone, retry_interval, 0), + awaiting_rel = #{}, + max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), + await_rel_timeout = get_env(Zone, await_rel_timeout), + expiry_interval = ExpiryInterval, + created_at = os:timestamp() + }. init_mqueue(Zone) -> - emqx_mqueue:init(#{max_len => emqx_zone:get_env(Zone, max_mqueue_len, 1000), - store_qos0 => emqx_zone:get_env(Zone, mqueue_store_qos0, true), - priorities => emqx_zone:get_env(Zone, mqueue_priorities), - default_priority => emqx_zone:get_env(Zone, mqueue_default_priority) + emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000), + store_qos0 => get_env(Zone, mqueue_store_qos0, true), + priorities => get_env(Zone, mqueue_priorities), + default_priority => get_env(Zone, mqueue_default_priority) }). -%% @doc Get session info --spec(info(session()) -> list({atom(), term()})). -info(Session = #session{next_pkt_id = PktId, - max_subscriptions = MaxSubscriptions, - subscriptions = Subscriptions, - upgrade_qos = UpgradeQoS, - inflight = Inflight, - retry_interval = RetryInterval, - mqueue = MQueue, - max_awaiting_rel = MaxAwaitingRel, - awaiting_rel = AwaitingRel, - await_rel_timeout = AwaitRelTimeout}) -> - attrs(Session) ++ [{next_pkt_id, PktId}, - {max_subscriptions, MaxSubscriptions}, - {subscriptions, Subscriptions}, - {upgrade_qos, UpgradeQoS}, - {inflight, Inflight}, - {retry_interval, RetryInterval}, - {mqueue_len, emqx_mqueue:len(MQueue)}, - {awaiting_rel, AwaitingRel}, - {max_awaiting_rel, MaxAwaitingRel}, - {await_rel_timeout, AwaitRelTimeout}]. +%%------------------------------------------------------------------------------ +%% Info, Attrs, Stats +%%------------------------------------------------------------------------------ -%% @doc Get session attrs --spec(attrs(session()) -> list({atom(), term()})). -attrs(#session{client_id = ClientId, - clean_start = CleanStart, - username = Username, +%% @doc Get session info +-spec(info(session()) -> map()). +info(#session{clean_start = CleanStart, + max_subscriptions = MaxSubscriptions, + subscriptions = Subscriptions, + upgrade_qos = UpgradeQoS, + inflight = Inflight, + retry_interval = RetryInterval, + mqueue = MQueue, + next_pkt_id = PktId, + max_awaiting_rel = MaxAwaitingRel, + awaiting_rel = AwaitingRel, + await_rel_timeout = AwaitRelTimeout, + expiry_interval = ExpiryInterval, + created_at = CreatedAt}) -> + #{clean_start => CleanStart, + max_subscriptions => MaxSubscriptions, + subscriptions => Subscriptions, + upgrade_qos => UpgradeQoS, + inflight => Inflight, + retry_interval => RetryInterval, + mqueue_len => emqx_mqueue:len(MQueue), + next_pkt_id => PktId, + awaiting_rel => AwaitingRel, + max_awaiting_rel => MaxAwaitingRel, + await_rel_timeout => AwaitRelTimeout, + expiry_interval => ExpiryInterval div 1000, + created_at => CreatedAt + }. + +%% @doc Get session attrs. +-spec(attrs(session()) -> map()). +attrs(#session{clean_start = CleanStart, expiry_interval = ExpiryInterval, created_at = CreatedAt}) -> - [{client_id, ClientId}, - {clean_start, CleanStart}, - {username, Username}, - {expiry_interval, ExpiryInterval div 1000}, - {created_at, CreatedAt}]. + #{clean_start => CleanStart, + expiry_interval => ExpiryInterval div 1000, + created_at => CreatedAt + }. -%% @doc Get session stats --spec(stats(session()) -> list({atom(), non_neg_integer()})). +%% @doc Get session stats. +-spec(stats(session()) -> #{atom() => non_neg_integer()}). stats(#session{max_subscriptions = MaxSubscriptions, subscriptions = Subscriptions, inflight = Inflight, mqueue = MQueue, max_awaiting_rel = MaxAwaitingRel, awaiting_rel = AwaitingRel}) -> - lists:append(emqx_misc:proc_stats(), - [{max_subscriptions, MaxSubscriptions}, - {subscriptions_count, maps:size(Subscriptions)}, - {max_inflight, emqx_inflight:max_size(Inflight)}, - {inflight_len, emqx_inflight:size(Inflight)}, - {max_mqueue, emqx_mqueue:max_len(MQueue)}, - {mqueue_len, emqx_mqueue:len(MQueue)}, - {mqueue_dropped, emqx_mqueue:dropped(MQueue)}, - {max_awaiting_rel, MaxAwaitingRel}, - {awaiting_rel_len, maps:size(AwaitingRel)}, - {deliver_msg, emqx_pd:get_counter(deliver_stats)}, - {enqueue_msg, emqx_pd:get_counter(enqueue_stats)}]). + #{max_subscriptions => MaxSubscriptions, + subscriptions_count => maps:size(Subscriptions), + max_inflight => emqx_inflight:max_size(Inflight), + inflight_len => emqx_inflight:size(Inflight), + max_mqueue => emqx_mqueue:max_len(MQueue), + mqueue_len => emqx_mqueue:len(MQueue), + mqueue_dropped => emqx_mqueue:dropped(MQueue), + max_awaiting_rel => MaxAwaitingRel, + awaiting_rel_len => maps:size(AwaitingRel) + }. %%-------------------------------------------------------------------- %% PubSub API %%-------------------------------------------------------------------- -%% SUBSCRIBE: --spec(subscribe(list({emqx_topic:topic(), emqx_types:subopts()}), session()) - -> {ok, list(emqx_mqtt_types:reason_code()), session()}). -subscribe(RawTopicFilters, Session = #session{client_id = ClientId, - username = Username, - subscriptions = Subscriptions}) +%% Client -> Broker: SUBSCRIBE +-spec(subscribe(emqx_types:credentials(), emqx_mqtt:topic_filters(), session()) + -> {ok, list(emqx_mqtt:reason_code()), session()}). +subscribe(Credentials, RawTopicFilters, Session = #session{subscriptions = Subscriptions}) when is_list(RawTopicFilters) -> TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts)) || {RawTopic, SubOpts} <- RawTopicFilters], {ReasonCodes, Subscriptions1} = lists:foldr( - fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when - RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 -> - {[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)}; - ({_Topic, #{rc := RC}}, {RcAcc, SubMap}) -> - {[RC|RcAcc], SubMap} - end, {[], Subscriptions}, TopicFilters), + fun({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) + when RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 -> + {[QoS|RcAcc], do_subscribe(Credentials, Topic, SubOpts, SubMap)}; + ({_Topic, #{rc := RC}}, {RcAcc, SubMap}) -> + {[RC|RcAcc], SubMap} + end, {[], Subscriptions}, TopicFilters), {ok, ReasonCodes, Session#session{subscriptions = Subscriptions1}}. -%% TODO: improve later. -do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) -> +do_subscribe(Credentials = #{client_id := ClientId}, Topic, SubOpts, SubMap) -> case maps:find(Topic, SubMap) of {ok, SubOpts} -> - ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]), + ok = emqx_hooks:run('session.subscribed', [Credentials, Topic, SubOpts#{first => false}]), SubMap; {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, SubOpts), %% Why??? - ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]), + ok = emqx_hooks:run('session.subscribed', [Credentials, Topic, SubOpts#{first => false}]), maps:put(Topic, SubOpts, SubMap); error -> - emqx_broker:subscribe(Topic, ClientId, SubOpts), - ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]), + ok = emqx_broker:subscribe(Topic, ClientId, SubOpts), + ok = emqx_hooks:run('session.subscribed', [Credentials, Topic, SubOpts#{first => true}]), maps:put(Topic, SubOpts, SubMap) end. -%% PUBLISH: --spec(publish(emqx_mqtt_types:packet_id(), emqx_types:message(), session()) - -> {ok, emqx_types:deliver_results()} | {error, term()}). -publish(_PacketId, Msg = #message{qos = ?QOS_0}, _Session) -> - %% Publish QoS0 message directly - {ok, emqx_broker:publish(Msg)}; - -publish(_PacketId, Msg = #message{qos = ?QOS_1}, _Session) -> - %% Publish QoS1 message directly - {ok, emqx_broker:publish(Msg)}; - -%% PUBLISH: This is only to register packetId to session state. -%% The actual message dispatching should be done by the caller. -publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, - Session = #session{awaiting_rel = AwaitingRel, - max_awaiting_rel = MaxAwaitingRel}) -> - %% Register QoS2 message packet ID (and timestamp) to session, then publish - case is_awaiting_full(MaxAwaitingRel, AwaitingRel) of - false -> - case maps:is_key(PacketId, AwaitingRel) of - false -> - NewAwaitingRel = maps:put(PacketId, Ts, AwaitingRel), - NSession = Session#session{awaiting_rel = NewAwaitingRel}, - {ok, emqx_broker:publish(Msg), ensure_await_rel_timer(NSession)}; - true -> - {error, ?RC_PACKET_IDENTIFIER_IN_USE} - end; - true -> - ?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), - ok = emqx_metrics:inc('messages.qos2.dropped'), - {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} - end. - -%% PUBACK: --spec(puback(emqx_mqtt_types:packet_id(), session()) -> session()). -puback(PacketId, Session) -> - puback(PacketId, ?RC_SUCCESS, Session). - --spec(puback(emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code(), session()) - -> session()). -puback(PacketId, ReasonCode, Session = #session{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - dequeue(acked(puback, PacketId, Session)); - false -> - ?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]), - ok = emqx_metrics:inc('packets.puback.missed'), - Session - end. - -%% PUBREC: --spec(pubrec(emqx_mqtt_types:packet_id(), session()) - -> ok | {error, emqx_mqtt_types:reason_code()}). -pubrec(PacketId, Session) -> - pubrec(PacketId, ?RC_SUCCESS, Session). - --spec(pubrec(emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code(), session()) - -> {ok, session()} | {error, emqx_mqtt_types:reason_code()}). -pubrec(PacketId, ReasonCode, Session = #session{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - {ok, acked(pubrec, PacketId, Session)}; - false -> - ?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]), - ok = emqx_metrics:inc('packets.pubrec.missed'), - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} - end. - -%% PUBREL: --spec(pubrel(emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code(), session()) - -> {ok, session()} | {error, emqx_mqtt_types:reason_code()}). -pubrel(PacketId, ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) -> - case maps:take(PacketId, AwaitingRel) of - {_Ts, AwaitingRel1} -> - {ok, Session#session{awaiting_rel = AwaitingRel1}}; - error -> - ?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]), - ok = emqx_metrics:inc('packets.pubrel.missed'), - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} - end. - -%% PUBCOMP: --spec(pubcomp(emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code(), session()) - -> {ok, session()} | {error, emqx_mqtt_types:reason_code()}). -pubcomp(PacketId, ReasonCode, Session = #session{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - {ok, dequeue(acked(pubcomp, PacketId, Session))}; - false -> - ?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]), - ok = emqx_metrics:inc('packets.pubcomp.missed'), - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} - end. - -%%------------------------------------------------------------------------------ -%% Awaiting ACK for QoS1/QoS2 Messages -%%------------------------------------------------------------------------------ - -await(PacketId, Msg, Session = #session{inflight = Inflight}) -> - Inflight1 = emqx_inflight:insert( - PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight), - ensure_retry_timer(Session#session{inflight = Inflight1}). - -acked(puback, PacketId, Session = #session{client_id = ClientId, username = Username, inflight = Inflight}) -> - case emqx_inflight:lookup(PacketId, Inflight) of - {value, {publish, {_, Msg}, _Ts}} -> - ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), - Session#session{inflight = emqx_inflight:delete(PacketId, Inflight)}; - none -> - ?LOG(warning, "[Session] Duplicated PUBACK PacketId ~w", [PacketId]), - Session - end; - -acked(pubrec, PacketId, Session = #session{client_id = ClientId, username = Username, inflight = Inflight}) -> - case emqx_inflight:lookup(PacketId, Inflight) of - {value, {publish, {_, Msg}, _Ts}} -> - ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), - Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), - Session#session{inflight = Inflight1}; - {value, {pubrel, PacketId, _Ts}} -> - ?LOG(warning, "[Session] Duplicated PUBREC PacketId ~w", [PacketId]), - Session; - none -> - ?LOG(warning, "[Session] Unexpected PUBREC PacketId ~w", [PacketId]), - Session - end; - -acked(pubcomp, PacketId, Session = #session{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - Session#session{inflight = emqx_inflight:delete(PacketId, Inflight)}; - false -> - ?LOG(warning, "PUBCOMP PacketId ~w is not found", [PacketId]), - Session - end. - -%% UNSUBSCRIBE: --spec(unsubscribe(emqx_mqtt_types:topic_filters(), session()) - -> {ok, list(emqx_mqtt_types:reason_code()), session()}). -unsubscribe(RawTopicFilters, Session = #session{client_id = ClientId, - username = Username, - subscriptions = Subscriptions}) +%% Client -> Broker: UNSUBSCRIBE +-spec(unsubscribe(emqx_types:credentials(), emqx_mqtt:topic_filters(), session()) + -> {ok, list(emqx_mqtt:reason_code()), session()}). +unsubscribe(Credentials, RawTopicFilters, Session = #session{subscriptions = Subscriptions}) when is_list(RawTopicFilters) -> - TopicFilters = lists:map(fun({RawTopic, Opts}) -> - emqx_topic:parse(RawTopic, Opts); - (RawTopic) when is_binary(RawTopic) -> - emqx_topic:parse(RawTopic) - end, RawTopicFilters), + TopicFilters = lists:map(fun({RawTopic, Opts}) -> + emqx_topic:parse(RawTopic, Opts); + (RawTopic) when is_binary(RawTopic) -> + emqx_topic:parse(RawTopic) + end, RawTopicFilters), {ReasonCodes, Subscriptions1} = lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> case maps:find(Topic, SubMap) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(Topic), - ok = emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]), + ok = emqx_hooks:run('session.unsubscribed', [Credentials, Topic, SubOpts]), {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; error -> {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} @@ -432,124 +296,225 @@ unsubscribe(RawTopicFilters, Session = #session{client_id = ClientId, end, {[], Subscriptions}, TopicFilters), {ok, ReasonCodes, Session#session{subscriptions = Subscriptions1}}. --spec(resume(map(), session()) -> session()). -resume(#{will_msg := WillMsg, - expiry_interval := ExpiryInterval, - max_inflight := MaxInflight}, - Session = #session{client_id = ClientId, - clean_start = CleanStart, - retry_timer = RetryTimer, - await_rel_timer = AwaitTimer, - expiry_timer = ExpireTimer, - will_delay_timer = WillDelayTimer}) -> - - %% ?LOG(info, "[Session] Resumed by ~p ", [self()]), - - %% Cancel Timers - lists:foreach(fun emqx_misc:cancel_timer/1, - [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]), - - %% case kick(ClientId, OldConnPid, ConnPid) of - %% ok -> ?LOG(warning, "[Session] Connection ~p kickout ~p", [ConnPid, OldConnPid]); - %% ignore -> ok - %% end, - - Inflight = emqx_inflight:update_size(MaxInflight, Session#session.inflight), - - Session1 = Session#session{clean_start = false, - retry_timer = undefined, - awaiting_rel = #{}, - await_rel_timer = undefined, - expiry_timer = undefined, - expiry_interval = ExpiryInterval, - inflight = Inflight, - will_delay_timer = undefined, - will_msg = WillMsg - }, - - %% Clean Session: true -> false??? - CleanStart andalso emqx_cm:set_session_attrs(ClientId, attrs(Session1)), - - %%ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(Session1)]), - - %% Replay delivery and Dequeue pending messages - dequeue(retry_delivery(true, Session1)). - --spec(update_expiry_interval(timeout(), session()) -> session()). -update_expiry_interval(Interval, Session) -> - Session#session{expiry_interval = Interval}. - --spec(close(session()) -> ok). -close(_Session) -> ok. - -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - - -%%deliver_fun(ConnPid) when node(ConnPid) == node() -> -%% fun(Packet) -> ConnPid ! {deliver, Packet}, ok end; -%%deliver_fun(ConnPid) -> -%% Node = node(ConnPid), -%% fun(Packet) -> -%% true = emqx_rpc:cast(Node, erlang, send, [ConnPid, {deliver, Packet}]), ok -%% end. - -%%------------------------------------------------------------------------------ -%% Replay or Retry Delivery - -%% Redeliver at once if force is true -retry_delivery(Force, Session = #session{inflight = Inflight}) -> - case emqx_inflight:is_empty(Inflight) of - true -> Session; +%% Client -> Broker: QoS2 PUBLISH +-spec(publish(emqx_mqtt:packet_id(), emqx_types:message(), session()) + -> {ok, emqx_types:deliver_results(), session()} | {error, emqx_mqtt:reason_code()}). +publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, + Session = #session{awaiting_rel = AwaitingRel, + max_awaiting_rel = MaxAwaitingRel}) -> + case is_awaiting_full(MaxAwaitingRel, AwaitingRel) of false -> - SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end, - Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)), - retry_delivery(Force, Msgs, os:timestamp(), Session) - end. - -retry_delivery(_Force, [], _Now, Session) -> - %% Retry again... - ensure_retry_timer(Session); - -retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, - Session = #session{inflight = Inflight, - retry_interval = Interval}) -> - %% Microseconds -> MilliSeconds - Age = timer:now_diff(Now, Ts) div 1000, - if - Force orelse (Age >= Interval) -> - Inflight1 = case {Type, Msg0} of - {publish, {PacketId, Msg}} -> - case emqx_message:is_expired(Msg) of - true -> - ok = emqx_metrics:inc('messages.expired'), - emqx_inflight:delete(PacketId, Inflight); - false -> - redeliver({PacketId, Msg}, Session), - emqx_inflight:update(PacketId, {publish, {PacketId, Msg}, Now}, Inflight) - end; - {pubrel, PacketId} -> - redeliver({pubrel, PacketId}, Session), - emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight) - end, - retry_delivery(Force, Msgs, Now, Session#session{inflight = Inflight1}); + case maps:is_key(PacketId, AwaitingRel) of + false -> + DeliverResults = emqx_broker:publish(Msg), + AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel), + NSession = Session#session{awaiting_rel = AwaitingRel1}, + {ok, DeliverResults, ensure_await_rel_timer(NSession)}; + true -> + {error, ?RC_PACKET_IDENTIFIER_IN_USE} + end; true -> - ensure_retry_timer(Interval - max(0, Age), Session) + ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), + ok = emqx_metrics:inc('messages.qos2.dropped'), + {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} + end; + +%% QoS0/1 +publish(_PacketId, Msg, Session) -> + {ok, emqx_broker:publish(Msg)}. + +%% Client -> Broker: PUBACK +-spec(puback(emqx_mqtt:packet_id(), emqx_mqtt:reason_code(), session()) + -> puback_ret()). +puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight, mqueue = Q}) -> + case emqx_inflight:lookup(PacketId, Inflight) of + {value, {publish, {_, Msg}, _Ts}} -> + %% #{client_id => ClientId, username => Username} + %% ok = emqx_hooks:run('message.acked', [], Msg]), + Inflight1 = emqx_inflight:delete(PacketId, Inflight), + Session1 = Session#session{inflight = Inflight1}, + case (emqx_mqueue:is_empty(Q) orelse emqx_mqueue:out(Q)) of + true -> {ok, Session1}; + {{value, Msg}, Q1} -> + {ok, Msg, Session1#session{mqueue = Q1}} + end; + false -> + ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), + ok = emqx_metrics:inc('packets.puback.missed'), + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. -%%------------------------------------------------------------------------------ -%% Send Will Message -%%------------------------------------------------------------------------------ +%% Client -> Broker: PUBREC +-spec(pubrec(emqx_mqtt:packet_id(), emqx_mqtt:reason_code(), session()) + -> {ok, session()} | {error, emqx_mqtt:reason_code()}). +pubrec(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> + case emqx_inflight:lookup(PacketId, Inflight) of + {value, {publish, {_, Msg}, _Ts}} -> + %% ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), + Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), + {ok, Session#session{inflight = Inflight1}}; + {value, {pubrel, PacketId, _Ts}} -> + ?LOG(warning, "The PUBREC ~w is duplicated", [PacketId]), + {error, ?RC_PACKET_IDENTIFIER_IN_USE}; + none -> + ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), + ok = emqx_metrics:inc('packets.pubrec.missed'), + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end. -send_willmsg(undefined) -> - ignore; -send_willmsg(WillMsg) -> - emqx_broker:publish(WillMsg). +%% Client -> Broker: PUBREL +-spec(pubrel(emqx_mqtt:packet_id(), emqx_mqtt:reason_code(), session()) + -> {ok, session()} | {error, emqx_mqtt:reason_code()}). +pubrel(PacketId, ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) -> + case maps:take(PacketId, AwaitingRel) of + {_Ts, AwaitingRel1} -> + {ok, Session#session{awaiting_rel = AwaitingRel1}}; + error -> + ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), + ok = emqx_metrics:inc('packets.pubrel.missed'), + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end. + +%% Client -> Broker: PUBCOMP +-spec(pubcomp(emqx_mqtt:packet_id(), emqx_mqtt:reason_code(), session()) -> puback_ret()). +pubcomp(PacketId, ReasonCode, Session = #session{inflight = Inflight, mqueue = Q}) -> + case emqx_inflight:contain(PacketId, Inflight) of + true -> + Inflight1 = emqx_inflight:delete(PacketId, Inflight), + Session1 = Session#session{inflight = Inflight1}, + case (emqx_mqueue:is_empty(Q) orelse emqx_mqueue:out(Q)) of + true -> {ok, Session1}; + {{value, Msg}, Q1} -> + {ok, Msg, Session1#session{mqueue = Q1}} + end; + false -> + ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), + ok = emqx_metrics:inc('packets.pubcomp.missed'), + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end. + +%%-------------------------------------------------------------------- +%% Handle delivery +%%-------------------------------------------------------------------- + +deliver(Topic, Msg, Session = #session{subscriptions = SubMap}) -> + SubOpts = get_subopts(Topic, SubMap), + case enrich(SubOpts, Msg, Session) of + {ok, Msg1} -> + deliver(Msg1, Session); + ignore -> ignore + end. + +%% Enqueue message if the client has been disconnected +%% process_msg(Msg, Session = #session{conn_pid = undefined}) -> +%% {ignore, enqueue_msg(Msg, Session)}; + +deliver(Msg = #message{qos = ?QOS_0}, Session) -> + {ok, {publish, undefined, Msg}, Session}; + +deliver(Msg = #message{qos = QoS}, + Session = #session{next_pkt_id = PacketId, inflight = Inflight}) + when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> + case emqx_inflight:is_full(Inflight) of + true -> + {ignore, enqueue(Msg, Session)}; + false -> + Publish = {publish, PacketId, Msg}, + NSession = await(PacketId, Msg, Session), + {ok, Publish, next_pkt_id(NSession)} + end. + +enqueue(Msg, Session = #session{mqueue = Q}) -> + emqx_pd:update_counter(enqueue_stats, 1), + {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), + if + Dropped =/= undefined -> + %% SessProps = #{client_id => ClientId, username => Username}, + ok; %% = emqx_hooks:run('message.dropped', [SessProps, Dropped]); + true -> ok + end, + Session#session{mqueue = NewQ}. %%------------------------------------------------------------------------------ +%% Awaiting ACK for QoS1/QoS2 Messages +%%------------------------------------------------------------------------------ + +await(PacketId, Msg, Session = #session{inflight = Inflight}) -> + Publish = {publish, {PacketId, Msg}, os:timestamp()}, + Inflight1 = emqx_inflight:insert(PacketId, Publish, Inflight), + ensure_retry_timer(Session#session{inflight = Inflight1}). + +get_subopts(Topic, SubMap) -> + case maps:find(Topic, SubMap) of + {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> + [{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}]; + {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> + [{nl, Nl}, {qos, QoS}, {rap, Rap}]; + error -> [] + end. + +enrich([], Msg, _Session) -> + {ok, Msg}; +%%enrich([{nl, 1}|_Opts], #message{from = ClientId}, #session{client_id = ClientId}) -> +%% ignore; +enrich([{nl, _}|Opts], Msg, Session) -> + enrich(Opts, Msg, Session); +enrich([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos= true}) -> + enrich(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session); +enrich([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos= false}) -> + enrich(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); +enrich([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, Session = #session{}) -> + enrich(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, Session); +enrich([{rap, 0}|Opts], Msg = #message{flags = Flags}, Session) -> + enrich(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session); +enrich([{rap, _}|Opts], Msg, Session) -> + enrich(Opts, Msg, Session); +enrich([{subid, SubId}|Opts], Msg, Session) -> + enrich(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), Session). + +%%-------------------------------------------------------------------- +%% Ensure retry timer +%%-------------------------------------------------------------------- + +ensure_retry_timer(Session = #session{retry_interval = Interval, retry_timer = undefined}) -> + ensure_retry_timer(Interval, Session); +ensure_retry_timer(Session) -> + Session. + +ensure_retry_timer(Interval, Session = #session{retry_timer = undefined}) -> + TRef = emqx_misc:start_timer(Interval, retry_delivery), + Session#session{retry_timer = TRef}; +ensure_retry_timer(_Interval, Session) -> + Session. + +%%-------------------------------------------------------------------- +%% Check awaiting rel +%%-------------------------------------------------------------------- + +is_awaiting_full(_MaxAwaitingRel = 0, _AwaitingRel) -> + false; +is_awaiting_full(MaxAwaitingRel, AwaitingRel) -> + maps:size(AwaitingRel) >= MaxAwaitingRel. + +%%-------------------------------------------------------------------- +%% Ensure await_rel timer +%%-------------------------------------------------------------------- + +ensure_await_rel_timer(Session = #session{await_rel_timeout = Timeout, + await_rel_timer = undefined}) -> + ensure_await_rel_timer(Timeout, Session); +ensure_await_rel_timer(Session) -> + Session. + +ensure_await_rel_timer(Timeout, Session = #session{await_rel_timer = undefined}) -> + TRef = emqx_misc:start_timer(Timeout, check_awaiting_rel), + Session#session{await_rel_timer = TRef}; +ensure_await_rel_timer(_Timeout, Session) -> + Session. + +%%-------------------------------------------------------------------- %% Expire Awaiting Rel -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- expire_awaiting_rel(Session = #session{awaiting_rel = AwaitingRel}) -> case maps:size(AwaitingRel) of @@ -573,237 +538,9 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, ensure_await_rel_timer(Timeout - max(0, Age), Session) end. -%%------------------------------------------------------------------------------ -%% Check awaiting rel -%%------------------------------------------------------------------------------ - -is_awaiting_full(_MaxAwaitingRel = 0, _AwaitingRel) -> - false; -is_awaiting_full(MaxAwaitingRel, AwaitingRel) -> - maps:size(AwaitingRel) >= MaxAwaitingRel. - -%%------------------------------------------------------------------------------ -%% Dispatch messages -%%------------------------------------------------------------------------------ - -handle(Msgs, Session = #session{inflight = Inflight, - client_id = ClientId, - username = Username, - subscriptions = SubMap}) -> - SessProps = #{client_id => ClientId, username => Username}, - %% Drain the mailbox and batch deliver - Msgs1 = Msgs, %% drain_m(batch_n(Inflight), Msgs), - %% Ack the messages for shared subscription - Msgs2 = maybe_ack_shared(Msgs1, Session), - %% Process suboptions - Msgs3 = lists:foldr( - fun({Topic, Msg}, Acc) -> - SubOpts = find_subopts(Topic, SubMap), - case process_subopts(SubOpts, Msg, Session) of - {ok, Msg1} -> [Msg1|Acc]; - ignore -> - emqx_hooks:run('message.dropped', [SessProps, Msg]), - Acc - end - end, [], Msgs2), - batch_process(Msgs3, Session). - -%% Ack or nack the messages of shared subscription? -maybe_ack_shared(Msgs, Session) when is_list(Msgs) -> - lists:foldr( - fun({Topic, Msg}, Acc) -> - case maybe_ack_shared(Msg, Session) of - ok -> Acc; - Msg1 -> [{Topic, Msg1}|Acc] - end - end, [], Msgs); - -maybe_ack_shared(Msg, Session) -> - case emqx_shared_sub:is_ack_required(Msg) of - true -> do_ack_shared(Msg, Session); - false -> Msg - end. - -do_ack_shared(Msg, Session = #session{inflight = Inflight}) -> - case {true, %% is_connection_alive(Session), - emqx_inflight:is_full(Inflight)} of - {false, _} -> - %% Require ack, but we do not have connection - %% negative ack the message so it can try the next subscriber in the group - emqx_shared_sub:nack_no_connection(Msg); - {_, true} -> - emqx_shared_sub:maybe_nack_dropped(Msg); - _ -> - %% Ack QoS1/QoS2 messages when message is delivered to connection. - %% NOTE: NOT to wait for PUBACK because: - %% The sender is monitoring this session process, - %% if the message is delivered to client but connection or session crashes, - %% sender will try to dispatch the message to the next shared subscriber. - %% This violates spec as QoS2 messages are not allowed to be sent to more - %% than one member in the group. - emqx_shared_sub:maybe_ack(Msg) - end. - -process_subopts([], Msg, _Session) -> - {ok, Msg}; -process_subopts([{nl, 1}|_Opts], #message{from = ClientId}, #session{client_id = ClientId}) -> - ignore; -process_subopts([{nl, _}|Opts], Msg, Session) -> - process_subopts(Opts, Msg, Session); -process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, - Session = #session{upgrade_qos= true}) -> - process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session); -process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, - Session = #session{upgrade_qos= false}) -> - process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); -process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, Session = #session{}) -> - process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, Session); -process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, Session = #session{}) -> - process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session); -process_subopts([{rap, _}|Opts], Msg, Session) -> - process_subopts(Opts, Msg, Session); -process_subopts([{subid, SubId}|Opts], Msg, Session) -> - process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), Session). - -find_subopts(Topic, SubMap) -> - case maps:find(Topic, SubMap) of - {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> - [{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}]; - {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> - [{nl, Nl}, {qos, QoS}, {rap, Rap}]; - error -> [] - end. - -batch_process(Msgs, Session) -> - {ok, Publishes, NSession} = process_msgs(Msgs, [], Session), - ok = batch_deliver(Publishes, NSession), - NSession. - -process_msgs([], Publishes, Session) -> - {ok, lists:reverse(Publishes), Session}; - -process_msgs([Msg|Msgs], Publishes, Session) -> - case process_msg(Msg, Session) of - {ok, Publish, NSession} -> - process_msgs(Msgs, [Publish|Publishes], NSession); - {ignore, NSession} -> - process_msgs(Msgs, Publishes, NSession) - end. - -%% Enqueue message if the client has been disconnected -%% process_msg(Msg, Session = #session{conn_pid = undefined}) -> -%% {ignore, enqueue_msg(Msg, Session)}; - -%% Prepare the qos0 message delivery -process_msg(Msg = #message{qos = ?QOS_0}, Session) -> - {ok, {publish, undefined, Msg}, Session}; - -process_msg(Msg = #message{qos = QoS}, - Session = #session{next_pkt_id = PacketId, inflight = Inflight}) - when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> - case emqx_inflight:is_full(Inflight) of - true -> - {ignore, enqueue_msg(Msg, Session)}; - false -> - Publish = {publish, PacketId, Msg}, - NSession = await(PacketId, Msg, Session), - {ok, Publish, next_pkt_id(NSession)} - end. - -enqueue_msg(Msg, Session = #session{mqueue = Q, client_id = ClientId, username = Username}) -> - emqx_pd:update_counter(enqueue_stats, 1), - {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - if - Dropped =/= undefined -> - SessProps = #{client_id => ClientId, username => Username}, - ok = emqx_hooks:run('message.dropped', [SessProps, Dropped]); - true -> ok - end, - Session#session{mqueue = NewQ}. - -%%------------------------------------------------------------------------------ -%% Deliver -%%------------------------------------------------------------------------------ - -redeliver({PacketId, Msg = #message{qos = QoS}}, Session) when QoS =/= ?QOS_0 -> - Msg1 = emqx_message:set_flag(dup, Msg), - do_deliver(PacketId, Msg1, Session); - -redeliver({pubrel, PacketId}, #session{deliver_fun = DeliverFun}) -> - DeliverFun({pubrel, PacketId}). - -do_deliver(PacketId, Msg, #session{deliver_fun = DeliverFun}) -> - emqx_pd:update_counter(deliver_stats, 1), - DeliverFun({publish, PacketId, Msg}). - -batch_deliver(Publishes, #session{deliver_fun = DeliverFun}) -> - emqx_pd:update_counter(deliver_stats, length(Publishes)), - DeliverFun(Publishes). - -%%------------------------------------------------------------------------------ -%% Dequeue -%%------------------------------------------------------------------------------ - -dequeue(Session = #session{inflight = Inflight, mqueue = Q}) -> - case emqx_mqueue:is_empty(Q) - orelse emqx_inflight:is_full(Inflight) of - true -> Session; - false -> - %% TODO: - Msgs = [], - Q1 = Q, - %% {Msgs, Q1} = drain_q(batch_n(Inflight), [], Q), - batch_process(lists:reverse(Msgs), Session#session{mqueue = Q1}) - end. - -drain_q(Cnt, Msgs, Q) when Cnt =< 0 -> - {Msgs, Q}; - -drain_q(Cnt, Msgs, Q) -> - case emqx_mqueue:out(Q) of - {empty, _Q} -> {Msgs, Q}; - {{value, Msg}, Q1} -> - drain_q(Cnt-1, [Msg|Msgs], Q1) - end. - -%%------------------------------------------------------------------------------ -%% Ensure timers - -ensure_await_rel_timer(Session = #session{await_rel_timeout = Timeout, - await_rel_timer = undefined}) -> - ensure_await_rel_timer(Timeout, Session); -ensure_await_rel_timer(Session) -> - Session. - -ensure_await_rel_timer(Timeout, Session = #session{await_rel_timer = undefined}) -> - Session#session{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; -ensure_await_rel_timer(_Timeout, Session) -> - Session. - -ensure_retry_timer(Session = #session{retry_interval = Interval, retry_timer = undefined}) -> - ensure_retry_timer(Interval, Session); -ensure_retry_timer(Session) -> - Session. - -ensure_retry_timer(Interval, Session = #session{retry_timer = undefined}) -> - Session#session{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)}; -ensure_retry_timer(_Timeout, Session) -> - Session. - -ensure_expire_timer(Session = #session{expiry_interval = Interval}) - when Interval > 0 andalso Interval =/= 16#ffffffff -> - Session#session{expiry_timer = emqx_misc:start_timer(Interval * 1000, expired)}; -ensure_expire_timer(Session) -> - Session. - -ensure_will_delay_timer(Session = #session{will_msg = #message{headers = #{'Will-Delay-Interval' := WillDelayInterval}}}) -> - Session#session{will_delay_timer = emqx_misc:start_timer(WillDelayInterval * 1000, will_delay)}; -ensure_will_delay_timer(Session = #session{will_msg = WillMsg}) -> - send_willmsg(WillMsg), - Session#session{will_msg = undefined}. - %%-------------------------------------------------------------------- %% Next Packet Id +%%-------------------------------------------------------------------- next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) -> Session#session{next_pkt_id = 1}; diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 33924888c..e6c8deefc 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -135,7 +135,7 @@ ack_enabled() -> do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - _ = erlang:send(SubPid, {dispatch, Topic, Msg}), + _ = erlang:send(SubPid, {deliver, Topic, Msg}), ok; do_dispatch(SubPid, Topic, Msg, Type) -> dispatch_per_qos(SubPid, Topic, Msg, Type). @@ -143,18 +143,18 @@ do_dispatch(SubPid, Topic, Msg, Type) -> %% return either 'ok' (when everything is fine) or 'error' dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - _ = erlang:send(SubPid, {dispatch, Topic, Msg}), + _ = erlang:send(SubPid, {deliver, Topic, Msg}), ok; dispatch_per_qos(SubPid, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack - _ = erlang:send(SubPid, {dispatch, Topic, Msg}), + _ = erlang:send(SubPid, {deliver, Topic, Msg}), ok; dispatch_per_qos(SubPid, Topic, Msg, fresh) -> case ack_enabled() of true -> dispatch_with_ack(SubPid, Topic, Msg); false -> - _ = erlang:send(SubPid, {dispatch, Topic, Msg}), + _ = erlang:send(SubPid, {deliver, Topic, Msg}), ok end. @@ -162,7 +162,7 @@ dispatch_with_ack(SubPid, Topic, Msg) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - _ = erlang:send(SubPid, {dispatch, Topic, with_ack_ref(Msg, {Sender, Ref})}), + _ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}), Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_channel.erl similarity index 91% rename from src/emqx_ws_connection.erl rename to src/emqx_ws_channel.erl index fcf145341..e2ce4e238 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_channel.erl @@ -14,6 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% MQTT WebSocket Channel -module(emqx_ws_channel). -include("emqx.hrl"). @@ -170,7 +171,8 @@ websocket_init(#state{request = Req, options = Options}) -> parse_state = ParseState, proto_state = ProtoState, enable_stats = EnableStats, - idle_timeout = IdleTimout}}. + idle_timeout = IdleTimout + }}. send_fun(WsPid) -> fun(Packet, Options) -> @@ -242,10 +244,13 @@ websocket_info({call, From, session}, State = #state{proto_state = ProtoState}) gen_server:reply(From, emqx_protocol:session(ProtoState)), {ok, State}; -websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> - case emqx_protocol:deliver(PubOrAck, ProtoState) of - {ok, ProtoState1} -> - {ok, ensure_stats_timer(State#state{proto_state = ProtoState1})}; +websocket_info(Delivery, State = #state{proto_state = ProtoState}) + when element(1, Delivery) =:= deliver -> + case emqx_protocol:handle_out(Delivery, ProtoState) of + {ok, NProtoState} -> + {ok, State#state{proto_state = NProtoState}}; + {ok, Packet, NProtoState} -> + handle_outgoing(Packet, State#state{proto_state = NProtoState}); {error, Reason} -> shutdown(Reason, State) end; @@ -285,8 +290,8 @@ websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); -websocket_info({binary, Data}, State) -> - {reply, {binary, Data}, State}; +%% websocket_info({binary, Data}, State) -> +%% {reply, {binary, Data}, State}; websocket_info({shutdown, Reason}, State) -> shutdown(Reason, State); @@ -317,9 +322,12 @@ terminate(SockError, _Req, #state{keepalive = Keepalive, %%-------------------------------------------------------------------- handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> - case emqx_protocol:received(Packet, ProtoState) of + case emqx_protocol:handle_in(Packet, ProtoState) of {ok, NProtoState} -> SuccFun(State#state{proto_state = NProtoState}); + {ok, OutPacket, NProtoState} -> + %% TODO: How to call SuccFun??? + handle_outgoing(OutPacket, State#state{proto_state = NProtoState}); {error, Reason} -> ?LOG(error, "Protocol error: ~p", [Reason]), shutdown(Reason, State); @@ -329,7 +337,12 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> shutdown(Error, State#state{proto_state = NProtoState}) end. - +handle_outgoing(Packet, State = #state{proto_state = _NProtoState}) -> + Data = emqx_frame:serialize(Packet), %% TODO:, Options), + BinSize = iolist_size(Data), + emqx_pd:update_counter(send_cnt, 1), + emqx_pd:update_counter(send_oct, BinSize), + {reply, {binary, Data}, ensure_stats_timer(State)}. ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,