diff --git a/TODO b/TODO deleted file mode 100644 index 814b42a3d..000000000 --- a/TODO +++ /dev/null @@ -1,61 +0,0 @@ - -## MQTT 5.0 - -1. Topic Alias -2. Subscriber ID -3. Session ensure stats -4. Message Expiration - -## Connection - -## WebSocket - -## Listeners - -## Protocol - -1. Global ACL cache with limited age and size? -2. Whether to enable ACL for each zone? - -## Session - -## Bridges - -Config -CLI -Remote Bridge -replay queue - -## Access Control - - Global ACL Cache - Add ACL cache emqx_access_control module - -## Zone - -## Hooks - -The hooks design... - -## MQueue - -Bound Queue -LastValue Queue -Priority Queue - -## Supervisor tree - -KernelSup - -## Managment - -## Dashboard - -## Testcases - -1. Update the README.md -2. Update the Documentation -3. Shared subscription and dispatch strategy -4. Remove lager syslog: - dep_lager_syslog = git https://github.com/basho/lager_syslog - diff --git a/etc/emqx.conf b/etc/emqx.conf index cd1b4a8d4..64e116e2d 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -605,12 +605,6 @@ zone.external.max_awaiting_rel = 100 ## Value: Duration zone.external.await_rel_timeout = 60s -## Whether to ignore loop delivery of messages. -## -## Value: true | false -## Default: false -zone.external.ignore_loop_deliver = false - ## Default session expiry interval for MQTT V3.1.1 connections. ## ## Value: Duration diff --git a/priv/emqx.schema b/priv/emqx.schema index bfadd1b66..a3edc70fe 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -768,7 +768,6 @@ end}. %% @doc Ignore loop delivery of messages {mapping, "zone.$name.ignore_loop_deliver", "emqx.zones", [ - {default, false}, {datatype, {enum, [true, false]}} ]}. diff --git a/src/emqx.erl b/src/emqx.erl index de89a9981..60c43768f 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -84,10 +84,8 @@ subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)-> {SubPid, SubId} = Subscriber, emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options). -%% @doc Publish Message --spec(publish(message()) -> {ok, delivery()} | {error, term()}). -publish(Msg) -> - emqx_broker:publish(Msg). +-spec(publish(message()) -> {ok, emqx_types:dispatches()}). +publish(Msg) -> emqx_broker:publish(Msg). -spec(unsubscribe(topic() | string()) -> ok | {error, term()}). unsubscribe(Topic) -> diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 17d88878d..dd6a4c1ba 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -143,16 +143,18 @@ multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) - %% Publish %%------------------------------------------------------------------------------ --spec(publish(message()) -> delivery()). +-spec(publish(message()) -> {ok, emqx_types:dispatches()}). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), - case emqx_hooks:run('message.publish', [], Msg) of - {ok, Msg1 = #message{topic = Topic}} -> - route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)); - {stop, Msg1} -> - emqx_logger:warning("Stop publishing: ~p", [Msg]), delivery(Msg1) - end. + {ok, case emqx_hooks:run('message.publish', [], Msg) of + {ok, Msg1 = #message{topic = Topic}} -> + Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), + Delivery#delivery.flows; + {stop, _} -> + emqx_logger:warning("Stop publishing: ~p", [Msg]), [] + end}. +-spec(safe_publish(message()) -> ok). %% Called internally safe_publish(Msg) when is_record(Msg, message) -> try @@ -172,8 +174,8 @@ delivery(Msg) -> %%------------------------------------------------------------------------------ route([], Delivery = #delivery{message = Msg}) -> - emqx_hooks:run('message.dropped', [undefined, Msg]), - dropped(Msg#message.topic), Delivery; + emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), + inc_dropped_cnt(Msg#message.topic), Delivery; route([{To, Node}], Delivery) when Node =:= node() -> dispatch(To, Delivery); @@ -215,8 +217,8 @@ forward(Node, To, Delivery) -> dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> case subscribers(Topic) of [] -> - emqx_hooks:run('message.dropped', [undefined, Msg]), - dropped(Topic), Delivery; + emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), + inc_dropped_cnt(Topic), Delivery; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg), Delivery#delivery{flows = [{dispatch, Topic, 1}|Flows]}; @@ -232,9 +234,9 @@ dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) -> dispatch({share, _Group, _Sub}, _Topic, _Msg) -> ignored. -dropped(<<"$SYS/", _/binary>>) -> +inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; -dropped(_Topic) -> +inc_dropped_cnt(_Topic) -> emqx_metrics:inc('messages/dropped'). -spec(subscribers(topic()) -> [subscriber()]). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 59a195e33..c1cc34d59 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -331,7 +331,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> {Value + Len * Multiplier, Rest}. parse_topic_filters(subscribe, Bin) -> - [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0, subid => 0}} + [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0}} || <> <= Bin]; parse_topic_filters(unsubscribe, Bin) -> diff --git a/src/emqx_message.erl b/src/emqx_message.erl index da762703e..86f6a825f 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -22,6 +22,7 @@ -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). -export([set_headers/2]). -export([get_header/2, get_header/3, set_header/3]). +-export([format/1]). -spec(make(topic(), payload()) -> message()). make(Topic, Payload) -> @@ -55,10 +56,14 @@ get_flag(Flag, #message{flags = Flags}, Default) -> maps:get(Flag, Flags, Default). -spec(set_flag(message_flag(), message()) -> message()). +set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) -> + Msg#message{flags = #{Flag => true}}; set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, true, Flags)}. -spec(set_flag(message_flag(), boolean() | integer(), message()) -> message()). +set_flag(Flag, Val, Msg = #message{flags = undefined}) when is_atom(Flag) -> + Msg#message{flags = #{Flag => Val}}; set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, Val, Flags)}. @@ -83,3 +88,14 @@ set_header(Hdr, Val, Msg = #message{headers = undefined}) -> set_header(Hdr, Val, Msg = #message{headers = Headers}) -> Msg#message{headers = maps:put(Hdr, Val, Headers)}. +format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) -> + io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)", + [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). + +format(_, undefined) -> + ""; +format(flags, Flags) -> + io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); +format(headers, Headers) -> + io_lib:format("~p", [Headers]). + diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index ef70dc28d..812c3267d 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -19,50 +19,48 @@ -include("emqx.hrl"). -export([load/1, unload/1]). --export([on_client_connected/3, on_client_disconnected/3]). +-export([on_client_connected/4, on_client_disconnected/3]). load(Env) -> - emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), - emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). + emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]), + emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). -on_client_connected(ConnAck, Client = #client{id = ClientId, - username = Username, - peername = {IpAddr, _} - %%clean_sess = CleanSess, - %%proto_ver = ProtoVer - }, Env) -> +on_client_connected(#{client_id := ClientId, + username := Username, + peername := {IpAddr, _}}, ConnAck, ConnInfo, Env) -> case emqx_json:safe_encode([{clientid, ClientId}, {username, Username}, {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, - %%{clean_sess, CleanSess}, %%TODO:: fixme later - %%{protocol, ProtoVer}, + {clean_start, proplists:get_value(clean_start, ConnInfo)}, + {proto_ver, proplists:get_value(proto_ver, ConnInfo)}, + {proto_name, proplists:get_value(proto_name, ConnInfo)}, + {keepalive, proplists:get_value(keepalive, ConnInfo)}, {connack, ConnAck}, - {ts, emqx_time:now_secs()}]) of + {ts, os:system_time(second)}]) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> emqx_logger:error("[Presence Module] Json error: ~p", [Reason]) - end, - {ok, Client}. + end. -on_client_disconnected(Reason, #client{id = ClientId, username = Username}, Env) -> +on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) -> case emqx_json:safe_encode([{clientid, ClientId}, {username, Username}, {reason, reason(Reason)}, - {ts, emqx_time:now_secs()}]) of + {ts, os:system_time(second)}]) of {ok, Payload} -> emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload)); {error, Reason} -> emqx_logger:error("[Presence Module] Json error: ~p", [Reason]) - end, ok. + end. unload(_Env) -> - emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3), - emqx:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3). + emqx_hooks:delete('client.connected', fun ?MODULE:on_client_connected/4), + emqx_hooks:delete('client.disconnected', fun ?MODULE:on_client_disconnected/3). message(QoS, Topic, Payload) -> Msg = emqx_message:make(?MODULE, QoS, Topic, iolist_to_binary(Payload)), - emqx_message:set_flags(#{sys => true}, Msg). + emqx_message:set_flag(sys, Msg). topic(connected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"])); diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 8ddd07f6c..2a92793eb 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -15,47 +15,31 @@ -module(emqx_mod_rewrite). -include_lib("emqx.hrl"). - -include_lib("emqx_mqtt.hrl"). -export([load/1, unload/1]). --export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- +-export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]). load(Rules0) -> Rules = compile(Rules0), - emqx:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]), - emqx:hook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]), - emqx:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). + emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]), + emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]), + emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). -rewrite_subscribe(_ClientId, _Username, TopicTable, Rules) -> - emqx_logger:info("Rewrite subscribe: ~p", [TopicTable]), +rewrite_subscribe(_Credentials, TopicTable, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. -rewrite_unsubscribe(_ClientId, _Username, TopicTable, Rules) -> - emqx_logger:info("Rewrite unsubscribe: ~p", [TopicTable]), +rewrite_unsubscribe(_Credentials, TopicTable, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. rewrite_publish(Message = #message{topic = Topic}, Rules) -> - %%TODO: this will not work if the client is always online. - RewriteTopic = - case get({rewrite, Topic}) of - undefined -> - DestTopic = match_rule(Topic, Rules), - put({rewrite, Topic}, DestTopic), DestTopic; - DestTopic -> - DestTopic - end, - {ok, Message#message{topic = RewriteTopic}}. + {ok, Message#message{topic = match_rule(Topic, Rules)}}. unload(_) -> - emqx:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4), - emqx:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4), - emqx:unhook('message.publish', fun ?MODULE:rewrite_publish/2). + emqx_hooks:delete('client.subscribe', fun ?MODULE:rewrite_subscribe/3), + emqx_hooks:delete('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3), + emqx_hooks:delete('message.publish', fun ?MODULE:rewrite_publish/2). %%-------------------------------------------------------------------- %% Internal functions @@ -79,8 +63,7 @@ match_regx(Topic, MP, Dest) -> fun({Var, Val}, Acc) -> re:replace(Acc, Var, Val, [global]) end, Dest, Vars)); - nomatch -> - Topic + nomatch -> Topic end. compile(Rules) -> diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index 978b46a3b..b0da175c6 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -17,32 +17,26 @@ -behaviour(emqx_gen_mod). -include_lib("emqx.hrl"). - -include_lib("emqx_mqtt.hrl"). --export([load/1, on_client_connected/3, unload/1]). - --define(TAB, ?MODULE). +-export([load/1, on_session_created/3, unload/1]). %%-------------------------------------------------------------------- %% Load/Unload Hook %%-------------------------------------------------------------------- load(Topics) -> - emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]). + emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]). -on_client_connected(RC, Client = #client{id = ClientId, pid = ClientPid, username = Username}, Topics) - when RC < 16#80 -> - Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, - TopicTable = [{Replace(Topic), QoS} || {Topic, QoS} <- Topics], - ClientPid ! {subscribe, TopicTable}, - {ok, Client}; - -on_client_connected(_ConnAck, _Client, _State) -> - ok. +on_session_created(#{client_id := ClientId}, SessInfo, Topics) -> + Username = proplists:get_value(username, SessInfo), + Replace = fun(Topic) -> + rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) + end, + emqx_session:subscribe(self(), [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics]). unload(_) -> - emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3). + emqx_hooks:delete('session.created', fun ?MODULE:on_session_created/3). %%-------------------------------------------------------------------- %% Internal functions diff --git a/src/emqx_mountpoint.erl b/src/emqx_mountpoint.erl new file mode 100644 index 000000000..f2046d5ee --- /dev/null +++ b/src/emqx_mountpoint.erl @@ -0,0 +1,52 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_mountpoint). + +-include("emqx.hrl"). + +-export([mount/2, unmount/2]). +-export([replvar/2]). + +-type(mountpoint() :: binary()). +-export_type([mountpoint/0]). + +mount(undefined, Any) -> + Any; +mount(MountPoint, Msg = #message{topic = Topic}) -> + Msg#message{topic = <>}; + +mount(MountPoint, TopicFilters) when is_list(TopicFilters) -> + [{<>, SubOpts} || {Topic, SubOpts} <- TopicFilters]. + +unmount(undefined, Msg) -> + Msg; +unmount(MountPoint, Msg = #message{topic = Topic}) -> + case catch split_binary(Topic, byte_size(MountPoint)) of + {MountPoint, Topic1} -> Msg#message{topic = Topic1}; + _Other -> Msg + end. + +replvar(undefined, _Vars) -> + undefined; +replvar(MountPoint, #{client_id := ClientId, username := Username}) -> + lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]). + +feed_var({<<"%c">>, ClientId}, MountPoint) -> + emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint); +feed_var({<<"%u">>, undefined}, MountPoint) -> + MountPoint; +feed_var({<<"%u">>, Username}, MountPoint) -> + emqx_topic:feed_var(<<"%u">>, Username, MountPoint). + diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index c8a751526..c6ab19c3c 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -22,6 +22,7 @@ -export([validate/1]). -export([format/1]). -export([to_message/2, from_message/2]). +-export([will_msg/1]). %% @doc Protocol name of version -spec(protocol_name(mqtt_version()) -> binary()). @@ -37,30 +38,40 @@ protocol_name(?MQTT_PROTO_V5) -> type_name(Type) when Type > ?RESERVED andalso Type =< ?AUTH -> lists:nth(Type, ?TYPE_NAMES). +%%------------------------------------------------------------------------------ +%% Validate MQTT Packet +%%------------------------------------------------------------------------------ + validate(?SUBSCRIBE_PACKET(_PacketId, _Properties, [])) -> - error(packet_empty_topic_filters); + error(topic_filters_invalid); validate(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters)) -> validate_packet_id(PacketId) andalso validate_properties(?SUBSCRIBE, Properties) andalso ok == lists:foreach(fun validate_subscription/1, TopicFilters); validate(?UNSUBSCRIBE_PACKET(_PacketId, [])) -> - error(packet_empty_topic_filters); + error(topic_filters_invalid); validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) -> validate_packet_id(PacketId) andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters); +validate(?PUBLISH_PACKET(_QoS, <<>>, _, _)) -> + error(topic_name_invalid); +validate(?PUBLISH_PACKET(_QoS, Topic, _, _)) -> + (not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid); + validate(_Packet) -> true. validate_packet_id(0) -> - error(bad_packet_id); + error(packet_id_invalid); validate_packet_id(_) -> true. -validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := 0}) -> - error(bad_subscription_identifier); -validate_properties(?SUBSCRIBE, _) -> +validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) + when I =< 0; I >= 16#FFFFFFF -> + error(subscription_identifier_invalid); +validate_properties(_, _) -> true. validate_subscription({Topic, #{qos := QoS}}) -> @@ -75,40 +86,48 @@ validate_qos(_) -> error(bad_qos). from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payload}) -> Dup = emqx_message:get_flag(dup, Msg, false), Retain = emqx_message:get_flag(retain, Msg, false), + Publish = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId, + %% TODO: Properties + properties = #{}}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = Dup, qos = QoS, - retain = Retain, - dup = Dup}, - variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId, - properties = #{}}, %%TODO: - payload = Payload}. + retain = Retain}, + variable = Publish, payload = Payload}. %% @doc Message from Packet --spec(to_message(client_id(), mqtt_packet()) -> message()). -to_message(ClientId, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - retain = Retain, - qos = QoS, - dup = Dup}, - variable = #mqtt_packet_publish{topic_name = Topic, - properties = Props}, - payload = Payload}) -> +-spec(to_message(emqx_types:credentials(), mqtt_packet()) -> message()). +to_message(#{client_id := ClientId, username := Username}, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + retain = Retain, + qos = QoS, + dup = Dup}, + variable = #mqtt_packet_publish{topic_name = Topic, + properties = Props}, + payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg#message{flags = #{dup => Dup, retain => Retain}, - headers = if - Props =:= undefined -> #{}; - true -> Props - end}; + headers = merge_props(#{username => Username}, Props)}. -to_message(_ClientId, #mqtt_packet_connect{will_flag = false}) -> +-spec(will_msg(#mqtt_packet_connect{}) -> message()). +will_msg(#mqtt_packet_connect{will_flag = false}) -> undefined; -to_message(ClientId, #mqtt_packet_connect{will_retain = Retain, - will_qos = QoS, - will_topic = Topic, - will_props = Props, - will_payload = Payload}) -> +will_msg(#mqtt_packet_connect{client_id = ClientId, + username = Username, + will_retain = Retain, + will_qos = QoS, + will_topic = Topic, + will_props = Properties, + will_payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), - Msg#message{flags = #{qos => QoS, retain => Retain}, headers = Props}. + Msg#message{flags = #{dup => false, retain => Retain}, + headers = merge_props(#{username => Username}, Properties)}. + +merge_props(Headers, undefined) -> + Headers; +merge_props(Headers, Props) -> + maps:merge(Headers, Props). %% @doc Format packet -spec(format(mqtt_packet()) -> iolist()). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9c28d1fcb..1ee80fbbf 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -18,11 +18,14 @@ -include("emqx_mqtt.hrl"). -export([init/2, info/1, caps/1, stats/1]). +-export([client_id/1]). -export([credentials/1]). --export([client/1, client_id/1]). --export([session/1]). -export([parser/1]). --export([received/2, process/2, deliver/2, send/2]). +-export([session/1]). +-export([received/2]). +-export([process_packet/2]). +-export([deliver/2]). +-export([send/2]). -export([shutdown/2]). -record(pstate, { @@ -34,12 +37,13 @@ proto_name, ackprops, client_id, - client_pid, + conn_pid, conn_props, ack_props, username, session, clean_start, + topic_aliases, packet_size, will_msg, keepalive, @@ -54,9 +58,12 @@ }). -type(state() :: #pstate{}). - -export_type([state/0]). +-ifdef(TEST). +-compile(export_all). +-endif. + -define(LOG(Level, Format, Args, PState), emqx_logger:Level([{client, PState#pstate.client_id}], "Client(~s@~s): " ++ Format, [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])). @@ -75,10 +82,11 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) proto_ver = ?MQTT_PROTO_V4, proto_name = <<"MQTT">>, client_id = <<>>, - client_pid = self(), + conn_pid = self(), username = init_username(Peercert, Options), is_super = false, clean_start = false, + topic_aliases = #{}, packet_size = emqx_zone:get_env(Zone, max_packet_size), mountpoint = emqx_zone:get_env(Zone, mountpoint), is_bridge = false, @@ -104,13 +112,13 @@ set_username(_Username, PState) -> %%------------------------------------------------------------------------------ info(#pstate{zone = Zone, + client_id = ClientId, + username = Username, peername = Peername, proto_ver = ProtoVer, proto_name = ProtoName, - conn_props = ConnProps, - client_id = ClientId, - username = Username, clean_start = CleanStart, + conn_props = ConnProps, keepalive = Keepalive, mountpoint = Mountpoint, is_super = IsSuper, @@ -118,12 +126,12 @@ info(#pstate{zone = Zone, connected = Connected, connected_at = ConnectedAt}) -> [{zone, Zone}, + {client_id, ClientId}, + {username, Username}, {peername, Peername}, {proto_ver, ProtoVer}, {proto_name, ProtoName}, {conn_props, ConnProps}, - {client_id, ClientId}, - {username, Username}, {clean_start, CleanStart}, {keepalive, Keepalive}, {mountpoint, Mountpoint}, @@ -135,6 +143,9 @@ info(#pstate{zone = Zone, caps(#pstate{zone = Zone}) -> emqx_mqtt_caps:get_caps(Zone). +client_id(#pstate{client_id = ClientId}) -> + ClientId. + credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, @@ -144,20 +155,6 @@ credentials(#pstate{zone = Zone, username => Username, peername => Peername}. -client(#pstate{zone = Zone, - client_id = ClientId, - client_pid = ClientPid, - peername = Peername, - username = Username}) -> - #client{id = ClientId, - pid = ClientPid, - zone = Zone, - peername = Peername, - username = Username}. - -client_id(#pstate{client_id = ClientId}) -> - ClientId. - stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg}, send_stats = #{pkt := SendPkt, msg := SendMsg}}) -> [{recv_pkt, RecvPkt}, @@ -177,42 +174,77 @@ parser(#pstate{packet_size = Size, proto_ver = Ver}) -> -spec(received(mqtt_packet(), state()) -> {ok, state()} | {error, term()} | {error, term(), state()}). -received(?PACKET(Type), PState = #pstate{connected = false}) - when Type =/= ?CONNECT -> +received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> {error, proto_not_connected, PState}; received(?PACKET(?CONNECT), PState = #pstate{connected = true}) -> - {error, proto_bad_connect, PState}; + {error, proto_unexpected_connect, PState}; received(Packet = ?PACKET(Type), PState) -> trace(recv, Packet, PState), case catch emqx_packet:validate(Packet) of true -> - process(Packet, inc_stats(recv, Type, PState)); - {'EXIT', {ReasonCode, _Stacktrace}} when is_integer(ReasonCode) -> - deliver({disconnect, ReasonCode}, PState), - {error, protocol_error, PState}; + {Packet1, PState1} = preprocess_properties(Packet, PState), + process_packet(Packet1, inc_stats(recv, Type, PState1)); {'EXIT', {Reason, _Stacktrace}} -> deliver({disconnect, ?RC_MALFORMED_PACKET}, PState), {error, Reason, PState} end. %%------------------------------------------------------------------------------ -%% Process Packet +%% Preprocess MQTT Properties %%------------------------------------------------------------------------------ -process(?CONNECT_PACKET( - #mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = IsBridge, - clean_start = CleanStart, - keepalive = Keepalive, - properties = ConnProps, - client_id = ClientId, - username = Username, - password = Password} = Connect), PState) -> +%% Subscription Identifier +preprocess_properties(Packet = #mqtt_packet{ + variable = Subscribe = #mqtt_packet_subscribe{ + properties = #{'Subscription-Identifier' := SubId}, + topic_filters = TopicFilters + } + }, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> + TopicFilters1 = [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters], + {Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState}; - io:format("~p~n", [Connect]), +%% Topic Alias Mapping +preprocess_properties(Packet = #mqtt_packet{ + variable = Publish = #mqtt_packet_publish{ + topic_name = <<>>, + properties = #{'Topic-Alias' := AliasId}} + }, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> + {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{ + topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState}; + +preprocess_properties(Packet = #mqtt_packet{ + variable = #mqtt_packet_publish{ + topic_name = Topic, + properties = #{'Topic-Alias' := AliasId}} + }, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> + {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; + +preprocess_properties(Packet, PState) -> + {Packet, PState}. + +%%------------------------------------------------------------------------------ +%% Process MQTT Packet +%%------------------------------------------------------------------------------ + +process_packet(?CONNECT_PACKET( + #mqtt_packet_connect{proto_name = ProtoName, + proto_ver = ProtoVer, + is_bridge = IsBridge, + clean_start = CleanStart, + keepalive = Keepalive, + properties = ConnProps, + client_id = ClientId, + username = Username, + password = Password} = Connect), PState) -> + + %% TODO: Mountpoint... + %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) + WillMsg = emqx_packet:will_msg(Connect), PState1 = set_username(Username, PState#pstate{client_id = ClientId, @@ -221,7 +253,7 @@ process(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, conn_props = ConnProps, - will_msg = willmsg(Connect, PState), + will_msg = WillMsg, is_bridge = IsBridge, connected = true, connected_at = os:timestamp()}), @@ -240,10 +272,8 @@ process(?CONNECT_PACKET( ok = emqx_cm:register_connection(client_id(PState4), info(PState4)), %% Start keepalive start_keepalive(Keepalive, PState4), - %% TODO: 'Run hooks' before open_session? - emqx_hooks:run('client.connected', [?RC_SUCCESS], client(PState4)), %% Success - {?RC_SUCCESS, SP, replvar(PState4)}; + {?RC_SUCCESS, SP, PState4}; {error, Error} -> ?LOG(error, "Failed to open session: ~p", [Error], PState1), {?RC_UNSPECIFIED_ERROR, PState1} @@ -256,7 +286,7 @@ process(?CONNECT_PACKET( {ReasonCode, PState1} end); -process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> +process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); @@ -265,7 +295,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> {ok, PState} end; -process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> +process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); @@ -273,7 +303,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> deliver({puback, PacketId, ReasonCode}, PState) end; -process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) -> +process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) -> case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); @@ -281,30 +311,37 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) -> deliver({pubrec, PacketId, ReasonCode}, PState) end; -process(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> - ok = emqx_session:puback(SPid, PacketId, ReasonCode), - {ok, PState}; +process_packet(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> + {ok = emqx_session:puback(SPid, PacketId, ReasonCode), PState}; -process(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> - ok = emqx_session:pubrec(SPid, PacketId, ReasonCode), - send(?PUBREL_PACKET(PacketId), PState); +process_packet(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> + case emqx_session:pubrec(SPid, PacketId, ReasonCode) of + ok -> + send(?PUBREL_PACKET(PacketId), PState); + {error, NotFound} -> + send(?PUBREL_PACKET(PacketId, NotFound), PState) + end; -process(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> - ok = emqx_session:pubrel(SPid, PacketId, ReasonCode), - send(?PUBCOMP_PACKET(PacketId), PState); +process_packet(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> + case emqx_session:pubrel(SPid, PacketId, ReasonCode) of + ok -> + send(?PUBCOMP_PACKET(PacketId), PState); + {error, NotFound} -> + send(?PUBCOMP_PACKET(PacketId, NotFound), PState) + end; -process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> - ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), - {ok, PState}; +process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> + {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; -process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{client_id = ClientId, session = SPid}) -> +process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), + PState = #pstate{session = SPid, mountpoint = Mountpoint}) -> case check_subscribe( parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of {ok, TopicFilters} -> - case emqx_hooks:run('client.subscribe', [ClientId], TopicFilters) of + case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of {ok, TopicFilters1} -> - ok = emqx_session:subscribe(SPid, PacketId, Properties, mount(TopicFilters1, PState)), + ok = emqx_session:subscribe(SPid, PacketId, Properties, + emqx_mountpoint:mount(Mountpoint, TopicFilters1)), {ok, PState}; {stop, _} -> ReasonCodes = lists:duplicate(length(TopicFilters), @@ -320,12 +357,13 @@ process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), deliver({suback, PacketId, ReasonCodes}, PState) end; -process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{client_id = ClientId, session = SPid}) -> - case emqx_hooks:run('client.unsubscribe', [ClientId], +process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), + PState = #pstate{session = SPid, mountpoint = MountPoint}) -> + case emqx_hooks:run('client.unsubscribe', [credentials(PState)], parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)) of {ok, TopicFilters} -> - ok = emqx_session:unsubscribe(SPid, PacketId, Properties, mount(TopicFilters, PState)), + ok = emqx_session:unsubscribe(SPid, PacketId, Properties, + emqx_mountpoint:mount(MountPoint, TopicFilters)), {ok, PState}; {stop, _Acc} -> ReasonCodes = lists:duplicate(length(RawTopicFilters), @@ -333,21 +371,23 @@ process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), deliver({unsuback, PacketId, ReasonCodes}, PState) end; -process(?PACKET(?PINGREQ), PState) -> +process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process(?PACKET(?DISCONNECT), PState) -> +process_packet(?PACKET(?DISCONNECT), PState) -> %% Clean willmsg {stop, normal, PState#pstate{will_msg = undefined}}. %%------------------------------------------------------------------------------ -%% ConnAck -> Client +%% ConnAck --> Client %%------------------------------------------------------------------------------ connack({?RC_SUCCESS, SP, PState}) -> - deliver({connack, ?RC_SUCCESS, sp(SP)}, PState); + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), + deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> ReasonCode; true -> @@ -360,20 +400,28 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> %%------------------------------------------------------------------------------ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), - PState = #pstate{client_id = ClientId, session = SPid}) -> - Msg = mount(emqx_packet:to_message(ClientId, Packet), PState), - _ = emqx_session:publish(SPid, PacketId, Msg), - puback(QoS, PacketId, PState). + PState = #pstate{session = SPid, mountpoint = MountPoint}) -> + Msg = emqx_mountpoint:mount(MountPoint, + emqx_packet:to_message(credentials(PState), Packet)), + puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, Msg), PState). %%------------------------------------------------------------------------------ %% Puback -> Client %%------------------------------------------------------------------------------ -puback(?QOS_0, _PacketId, PState) -> +puback(?QOS_0, _PacketId, _Result, PState) -> {ok, PState}; -puback(?QOS_1, PacketId, PState) -> +puback(?QOS_1, PacketId, {error, ReasonCode}, PState) -> + deliver({puback, PacketId, ReasonCode}, PState); +puback(?QOS_1, PacketId, {ok, []}, PState) -> + deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); +puback(?QOS_1, PacketId, {ok, _}, PState) -> deliver({puback, PacketId, ?RC_SUCCESS}, PState); -puback(?QOS_2, PacketId, PState) -> +puback(?QOS_2, PacketId, {error, ReasonCode}, PState) -> + deliver({pubrec, PacketId, ReasonCode}, PState); +puback(?QOS_2, PacketId, {ok, []}, PState) -> + deliver({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); +puback(?QOS_2, PacketId, {ok, _}, PState) -> deliver({pubrec, PacketId, ?RC_SUCCESS}, PState). %%------------------------------------------------------------------------------ @@ -386,10 +434,9 @@ deliver({connack, ReasonCode}, PState) -> deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{client_id = ClientId, - is_bridge = IsBridge}) -> - _ = emqx_hooks:run('message.delivered', [ClientId], Msg), - Msg1 = unmount(clean_retain(IsBridge, Msg), PState), +deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) -> + _ = emqx_hooks:run('message.delivered', credentials(PState), Msg), + Msg1 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg)), send(emqx_packet:from_message(PacketId, Msg1), PState); deliver({puback, PacketId, ReasonCode}, PState) -> @@ -448,13 +495,13 @@ maybe_assign_client_id(PState) -> try_open_session(#pstate{zone = Zone, client_id = ClientId, - client_pid = ClientPid, + conn_pid = ConnPid, conn_props = ConnProps, username = Username, clean_start = CleanStart}) -> case emqx_sm:open_session(#{zone => Zone, client_id => ClientId, - client_pid => ClientPid, + conn_pid => ConnPid, username => Username, clean_start => CleanStart, conn_props => ConnProps}) of @@ -595,16 +642,9 @@ shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> true -> ok; false -> send_willmsg(WillMsg) end, - emqx_hooks:run('client.disconnected', [Error], client(PState)), + emqx_hooks:run('client.disconnected', [credentials(PState), Error]), emqx_cm:unregister_connection(ClientId). -willmsg(Packet, PState = #pstate{client_id = ClientId}) - when is_record(Packet, mqtt_packet_connect) -> - case emqx_packet:to_message(ClientId, Packet) of - undefined -> undefined; - Msg -> mount(Msg, PState) - end. - send_willmsg(undefined) -> ignore; send_willmsg(WillMsg) -> @@ -620,14 +660,11 @@ start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 -> %% Parse topic filters %%----------------------------------------------------------------------------- -parse_topic_filters(?SUBSCRIBE, TopicFilters) -> - [begin - {Topic, TOpts} = emqx_topic:parse(RawTopic), - {Topic, maps:merge(SubOpts, TOpts)} - end || {RawTopic, SubOpts} <- TopicFilters]; +parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> + [emqx_topic:parse(RawTopic, SubOpts) || {RawTopic, SubOpts} <- RawTopicFilters]; -parse_topic_filters(?UNSUBSCRIBE, TopicFilters) -> - lists:map(fun emqx_topic:parse/1, TopicFilters). +parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) -> + lists:map(fun emqx_topic:parse/1, RawTopicFilters). %%----------------------------------------------------------------------------- %% The retained flag should be propagated for bridge. @@ -641,37 +678,14 @@ clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers} clean_retain(_IsBridge, Msg) -> Msg. -%%----------------------------------------------------------------------------- -%% Mount Point -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ +%% Update mountpoint -mount(Any, #pstate{mountpoint = undefined}) -> - Any; -mount(Msg = #message{topic = Topic}, #pstate{mountpoint = MountPoint}) -> - Msg#message{topic = <>}; -mount(TopicFilters, #pstate{mountpoint = MountPoint}) when is_list(TopicFilters) -> - [{<>, SubOpts} || {Topic, SubOpts} <- TopicFilters]. - -unmount(Any, #pstate{mountpoint = undefined}) -> - Any; -unmount(Msg = #message{topic = Topic}, #pstate{mountpoint = MountPoint}) -> - case catch split_binary(Topic, byte_size(MountPoint)) of - {MountPoint, Topic1} -> Msg#message{topic = Topic1}; - _Other -> Msg - end. - -replvar(PState = #pstate{mountpoint = undefined}) -> +update_mountpoint(PState = #pstate{mountpoint = undefined}) -> PState; -replvar(PState = #pstate{client_id = ClientId, username = Username, mountpoint = MountPoint}) -> - Vars = [{<<"%c">>, ClientId}, {<<"%u">>, Username}], - PState#pstate{mountpoint = lists:foldl(fun feed_var/2, MountPoint, Vars)}. - -feed_var({<<"%c">>, ClientId}, MountPoint) -> - emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint); -feed_var({<<"%u">>, undefined}, MountPoint) -> - MountPoint; -feed_var({<<"%u">>, Username}, MountPoint) -> - emqx_topic:feed_var(<<"%u">>, Username, MountPoint). +update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> + PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}. sp(true) -> 1; sp(false) -> 0. + diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 526b31c25..33661e2c3 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -73,11 +73,11 @@ %% Username username :: binary() | undefined, - %% Client pid binding with session - client_pid :: pid(), + %% Connection pid binding with session + conn_pid :: pid(), - %% Old client Pid that has been kickout - old_client_pid :: pid(), + %% Old Connection Pid that has been kickout + old_conn_pid :: pid(), %% Next packet id of the session next_pkt_id = 1 :: mqtt_packet_id(), @@ -133,9 +133,6 @@ %% Stats timer stats_timer :: reference() | undefined, - %% Ignore loop deliver? - ignore_loop_deliver = false :: boolean(), - %% TODO: deliver_stats = 0, @@ -148,7 +145,7 @@ -define(TIMEOUT, 60000). --define(INFO_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid, +-define(INFO_KEYS, [clean_start, client_id, username, binding, conn_pid, old_conn_pid, next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight, max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, await_rel_timeout, expiry_interval, enable_stats, created_at]). @@ -169,20 +166,17 @@ start_link(SessAttrs) -> -spec(subscribe(pid(), list({topic(), map()}) | {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok). -subscribe(SPid, TopicFilters) when is_list(TopicFilters) -> - gen_server:cast(SPid, {subscribe, [begin - {Topic, Opts} = emqx_topic:parse(RawTopic), - {Topic, maps:merge( - maps:merge( - ?DEFAULT_SUBOPTS, SubOpts), Opts)} - end || {RawTopic, SubOpts} <- TopicFilters]}). +subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> + TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts)) + || {RawTopic, SubOpts} <- RawTopicFilters], + subscribe(SPid, undefined, #{}, TopicFilters). %% for mqtt 5.0 subscribe(SPid, PacketId, Properties, TopicFilters) -> SubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {subscribe, self(), SubReq}). --spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, delivery()} | {error, term()}). +-spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, emqx_types:dispatches()}). publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message to broker directly emqx_broker:publish(Msg); @@ -202,27 +196,29 @@ puback(SPid, PacketId) -> puback(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {puback, PacketId, ReasonCode}). --spec(pubrec(pid(), mqtt_packet_id()) -> ok). +-spec(pubrec(pid(), mqtt_packet_id()) -> ok | {error, mqtt_reason_code()}). pubrec(SPid, PacketId) -> - gen_server:cast(SPid, {pubrec, PacketId}). + pubrec(SPid, PacketId, ?RC_SUCCESS). +-spec(pubrec(pid(), mqtt_packet_id(), mqtt_reason_code()) + -> ok | {error, mqtt_reason_code()}). pubrec(SPid, PacketId, ReasonCode) -> - gen_server:cast(SPid, {pubrec, PacketId, ReasonCode}). + gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity). --spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok). +-spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code()) + -> ok | {error, mqtt_reason_code()}). pubrel(SPid, PacketId, ReasonCode) -> - gen_server:cast(SPid, {pubrel, PacketId, ReasonCode}). + gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). -spec(pubcomp(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok). pubcomp(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}). --spec(unsubscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok). -unsubscribe(SPid, TopicFilters) when is_list(TopicFilters) -> - %%TODO: Parse the topic filters? - unsubscribe(SPid, undefined, #{}, TopicFilters). +-spec(unsubscribe(pid(), topic_table()) -> ok). +unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> + unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)). -%% TODO:... +-spec(unsubscribe(pid(), mqtt_packet_id(), mqtt_properties(), topic_table()) -> ok). unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). @@ -310,19 +306,18 @@ close(SPid) -> init(#{zone := Zone, client_id := ClientId, - client_pid := ClientPid, - clean_start := CleanStart, username := Username, - %% TODO: + conn_pid := ConnPid, + clean_start := CleanStart, conn_props := _ConnProps}) -> process_flag(trap_exit, true), - true = link(ClientPid), + true = link(ConnPid), MaxInflight = get_env(Zone, max_inflight), State = #state{clean_start = CleanStart, - binding = binding(ClientPid), + binding = binding(ConnPid), client_id = ClientId, - client_pid = ClientPid, username = Username, + conn_pid = ConnPid, subscriptions = #{}, max_subscriptions = get_env(Zone, max_subscriptions, 0), upgrade_qos = get_env(Zone, upgrade_qos, false), @@ -335,12 +330,11 @@ init(#{zone := Zone, max_awaiting_rel = get_env(Zone, max_awaiting_rel), expiry_interval = get_env(Zone, session_expiry_interval), enable_stats = get_env(Zone, enable_stats, true), - ignore_loop_deliver = get_env(Zone, ignore_loop_deliver, false), deliver_stats = 0, enqueue_stats = 0, created_at = os:timestamp()}, emqx_sm:register_session(ClientId, info(State)), - emqx_hooks:run('session.created', [ClientId]), + emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), {ok, ensure_stats_timer(State), hibernate}. init_mqueue(Zone, ClientId) -> @@ -351,29 +345,53 @@ init_mqueue(Zone, ClientId) -> binding(ClientPid) -> case node(ClientPid) =:= node() of true -> local; false -> remote end. -handle_call({discard, ClientPid}, _From, State = #state{client_pid = undefined}) -> - ?LOG(warning, "Discarded by ~p", [ClientPid], State), +handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) -> + ?LOG(warning, "Discarded by ~p", [ConnPid], State), {stop, {shutdown, discard}, ok, State}; -handle_call({discard, ClientPid}, _From, State = #state{client_pid = OldClientPid}) -> - ?LOG(warning, " ~p kickout ~p", [ClientPid, OldClientPid], State), +handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> + ?LOG(warning, " ~p kickout ~p", [ConnPid, OldConnPid], State), {stop, {shutdown, conflict}, ok, State}; +%% PUBLISH: handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, - State = #state{awaiting_rel = AwaitingRel, - await_rel_timer = Timer, - await_rel_timeout = Timeout}) -> + State = #state{awaiting_rel = AwaitingRel}) -> case is_awaiting_full(State) of false -> - State1 = case Timer == undefined of - true -> State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; - false -> State - end, - reply(ok, State1#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}); + case maps:is_key(PacketId, AwaitingRel) of + true -> + reply({error, ?RC_PACKET_IDENTIFIER_IN_USE}, State); + false -> + State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}, + reply(emqx_broker:publish(Msg), ensure_await_rel_timer(State1)) + end; true -> ?LOG(warning, "Dropped QoS2 Message for too many awaiting_rel: ~p", [Msg], State), emqx_metrics:inc('messages/qos2/dropped'), - reply({error, dropped}, State) + reply({error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State) + end; + +%% PUBREC: +handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) -> + case emqx_inflight:contain(PacketId, Inflight) of + true -> + reply(ok, acked(pubrec, PacketId, State)); + false -> + ?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State), + emqx_metrics:inc('packets/pubrec/missed'), + reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State) + end; + +%% PUBREL: +handle_call({pubrel, PacketId, _ReasonCode}, _From, + State = #state{awaiting_rel = AwaitingRel}) -> + case maps:take(PacketId, AwaitingRel) of + {_, AwaitingRel1} -> + reply(ok, State#state{awaiting_rel = AwaitingRel1}); + error -> + ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), + emqx_metrics:inc('packets/pubrel/missed'), + reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State) end; handle_call(info, _From, State) -> @@ -390,57 +408,38 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. %% SUBSCRIBE: -handle_cast({subscribe, TopicFilters}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> - Subscriptions1 = lists:foldl( - fun({Topic, SubOpts}, SubMap) -> - case maps:find(Topic, SubMap) of - {ok, _OldOpts} -> - emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), - ?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State); - error -> - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]) - end, - maps:put(Topic, SubOpts, SubMap) - end, Subscriptions, TopicFilters), - {noreply, State#state{subscriptions = Subscriptions1}}; - -handle_cast({subscribe, From, {PacketId, Properties, TopicFilters}}, +handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> - {[QoS|RcAcc], - case maps:find(Topic, SubMap) of - {ok, SubOpts} -> - ?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State), - SubMap; - {ok, OldOpts} -> - emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), - ?LOG(warning, "Duplicated subscribe ~s, old_opts: ~p, new_opts: ~p", [Topic, OldOpts, SubOpts], State), - maps:put(Topic, with_subid(Properties, SubOpts), SubMap); - error -> - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), - maps:put(Topic, with_subid(Properties, SubOpts), SubMap) - end} + {[QoS|RcAcc], case maps:find(Topic, SubMap) of + {ok, SubOpts} -> + SubMap; + {ok, _SubOpts} -> + emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + maps:put(Topic, SubOpts, SubMap); + error -> + emqx_broker:subscribe(Topic, ClientId, SubOpts), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + maps:put(Topic, SubOpts, SubMap) + end} end, {[], Subscriptions}, TopicFilters), - suback(From, PacketId, ReasonCodes), + suback(FromPid, PacketId, ReasonCodes), {noreply, State#state{subscriptions = Subscriptions1}}; %% UNSUBSCRIBE: handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = - lists:foldr(fun({Topic, _Opts}, {RcAcc, SubMap}) -> + lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> case maps:find(Topic, SubMap) of {ok, SubOpts} -> - emqx_broker:unsubscribe(Topic, ClientId), - emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]), - {[?RC_SUCCESS|RcAcc], maps:remove(Topic, SubMap)}; + ok = emqx_broker:unsubscribe(Topic, ClientId), + emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), + {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; error -> - {[?RC_NO_SUBSCRIPTION_EXISTED|RcAcc], SubMap} + {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} end end, {[], Subscriptions}, TopicFilters), unsuback(From, PacketId, ReasonCodes), @@ -452,73 +451,45 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight} true -> {noreply, dequeue(acked(puback, PacketId, State))}; false -> - ?LOG(warning, "The PUBACK PacketId is not found: ~p", [PacketId], State), + ?LOG(warning, "The PUBACK PacketId is not found: ~w", [PacketId], State), emqx_metrics:inc('packets/puback/missed'), {noreply, State} end; -%% PUBREC: How to handle ReasonCode? -handle_cast({pubrec, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - {noreply, acked(pubrec, PacketId, State)}; - false -> - ?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State), - emqx_metrics:inc('packets/pubrec/missed'), - {noreply, State} - end; - -%% PUBREL: -handle_cast({pubrel, PacketId, _ReasonCode}, State = #state{awaiting_rel = AwaitingRel}) -> - {noreply, - case maps:take(PacketId, AwaitingRel) of - {Msg, AwaitingRel1} -> - %% Implement Qos2 by method A [MQTT 4.33] - %% Dispatch to subscriber when received PUBREL - emqx_broker:publish(Msg), %% FIXME: - maybe_gc(State#state{awaiting_rel = AwaitingRel1}); - error -> - ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), - emqx_metrics:inc('packets/pubrel/missed'), - State - end, hibernate}; - %% PUBCOMP: handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> case emqx_inflight:contain(PacketId, Inflight) of true -> {noreply, dequeue(acked(pubcomp, PacketId, State))}; false -> - ?LOG(warning, "The PUBCOMP Packet Identifier is not found: ~w", [PacketId], State), + ?LOG(warning, "The PUBCOMP PacketId is not found: ~w", [PacketId], State), emqx_metrics:inc('packets/pubcomp/missed'), {noreply, State} end; %% RESUME: -handle_cast({resume, ClientPid}, - State = #state{client_id = ClientId, - client_pid = OldClientPid, - clean_start = CleanStart, - retry_timer = RetryTimer, - await_rel_timer = AwaitTimer, - expiry_timer = ExpireTimer}) -> +handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, + conn_pid = OldConnPid, + clean_start = CleanStart, + retry_timer = RetryTimer, + await_rel_timer = AwaitTimer, + expiry_timer = ExpireTimer}) -> - ?LOG(info, "Resumed by ~p ", [ClientPid], State), + ?LOG(info, "Resumed by connection ~p ", [ConnPid], State), %% Cancel Timers - lists:foreach(fun emqx_misc:cancel_timer/1, - [RetryTimer, AwaitTimer, ExpireTimer]), + lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]), - case kick(ClientId, OldClientPid, ClientPid) of - ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State); + case kick(ClientId, OldConnPid, ConnPid) of + ok -> ?LOG(warning, "connection ~p kickout ~p", [ConnPid, OldConnPid], State); ignore -> ok end, - true = link(ClientPid), + true = link(ConnPid), - State1 = State#state{client_pid = ClientPid, - binding = binding(ClientPid), - old_client_pid = OldClientPid, + State1 = State#state{conn_pid = ConnPid, + binding = binding(ConnPid), + old_conn_pid = OldConnPid, clean_start = false, retry_timer = undefined, awaiting_rel = #{}, @@ -526,14 +497,9 @@ handle_cast({resume, ClientPid}, expiry_timer = undefined}, %% Clean Session: true -> false? - if - CleanStart =:= true -> - ?LOG(error, "CleanSess changed to false.", [], State1); - %%TODO:: - %%emqx_sm:register_session(ClientId, info(State1)); - CleanStart =:= false -> - ok - end, + CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)), + + emqx_hooks:run('session.resumed', [#{client_id => ClientId}, info(State)]), %% Replay delivery and Dequeue pending messages {noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))}; @@ -542,22 +508,25 @@ handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. +%% Batch dispatch handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> {noreply, lists:foldl(fun(Msg, NewState) -> element(2, handle_info({dispatch, Topic, Msg}, NewState)) end, State, Msgs)}; -%% Ignore messages delivered by self -handle_info({dispatch, _Topic, #message{from = ClientId}}, - State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> - {noreply, State}; - -%% Dispatch Message -handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) -> - {noreply, maybe_gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))}; +%% Dispatch message +handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) -> + {noreply, case maps:find(Topic, SubMap) of + {ok, #{nl := Nl, qos := QoS, subid := SubId}} -> + run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State); + {ok, #{nl := Nl, qos := QoS}} -> + run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State); + error -> + dispatch(reset_dup(Msg), State) + end}; %% Do nothing if the client has been disconnected. -handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> +handle_info({timeout, _Timer, retry_delivery}, State = #state{conn_pid = undefined}) -> {noreply, ensure_stats_timer(State#state{retry_timer = undefined})}; handle_info({timeout, _Timer, retry_delivery}, State) -> @@ -570,27 +539,25 @@ handle_info({timeout, _Timer, expired}, State) -> ?LOG(info, "Expired, shutdown now.", [], State), shutdown(expired, State); -handle_info({'EXIT', ClientPid, _Reason}, - State = #state{clean_start= true, client_pid = ClientPid}) -> +handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start= true, conn_pid = ConnPid}) -> {stop, normal, State}; -handle_info({'EXIT', ClientPid, Reason}, - State = #state{clean_start = false, - client_pid = ClientPid, - expiry_interval = Interval}) -> - ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), +handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = false, + conn_pid = ConnPid, + expiry_interval = Interval}) -> + ?LOG(info, "Connection ~p EXIT for ~p", [ConnPid, Reason], State), ExpireTimer = emqx_misc:start_timer(Interval, expired), - State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, - {noreply, State1, hibernate}; + State1 = State#state{conn_pid = undefined, expiry_timer = ExpireTimer}, + {noreply, State1}; -handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) -> +handle_info({'EXIT', Pid, _Reason}, State = #state{old_conn_pid = Pid}) -> %% ignore {noreply, State, hibernate}; -handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) -> - ?LOG(error, "unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", - [ClientPid, Pid, Reason], State), - {noreply, State, hibernate}; +handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> + ?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", + [ConnPid, Pid, Reason], State), + {noreply, State}; handle_info(emit_stats, State = #state{client_id = ClientId}) -> emqx_sm:set_session_stats(ClientId, stats(State)), @@ -600,8 +567,8 @@ handle_info(Info, State) -> emqx_logger:error("[Session] unexpected info: ~p", [Info]), {noreply, State}. -terminate(Reason, #state{client_id = ClientId, username = Username}) -> - emqx_hooks:run('session.terminated', [ClientId, Username, Reason]), +terminate(Reason, #state{client_id = ClientId}) -> + emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), emqx_sm:unregister_session(ClientId). code_change(_OldVsn, State, _Extra) -> @@ -611,10 +578,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -with_subid(#{'Subscription-Identifier' := SubId}, Opts) -> - maps:put(subid, SubId, Opts); -with_subid(_Props, Opts) -> Opts. - suback(_From, undefined, _ReasonCodes) -> ignore; suback(From, PacketId, ReasonCodes) -> @@ -726,9 +689,22 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) %% Dispatch Messages %%------------------------------------------------------------------------------ +run_dispatch_steps([], Msg, State) -> + dispatch(Msg, State); +run_dispatch_steps([{nl, 1}|_Steps], #message{from = ClientId}, State = #state{client_id = ClientId}) -> + State; +run_dispatch_steps([{nl, 0}|Steps], Msg, State) -> + run_dispatch_steps(Steps, Msg, State); +run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) -> + run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); +run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> + run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State); +run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> + run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). + %% Enqueue message if the client has been disconnected -dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) -> - case emqx_hooks:run('message.dropped', [ClientId, Msg]) of +dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> + case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of ok -> enqueue_msg(Msg, State); stop -> State end; @@ -761,12 +737,12 @@ redeliver({PacketId, Msg = #message{qos = QoS}}, State) -> true -> emqx_message:set_flag(dup, Msg) end, State); -redeliver({pubrel, PacketId}, #state{client_pid = Pid}) -> - Pid ! {deliver, {pubrel, PacketId}}. +redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) -> + ConnPid ! {deliver, {pubrel, PacketId}}. -deliver(PacketId, Msg, #state{client_pid = Pid, binding = local}) -> +deliver(PacketId, Msg, #state{conn_pid = Pid, binding = local}) -> Pid ! {deliver, {publish, PacketId, Msg}}; -deliver(PacketId, Msg, #state{client_pid = Pid, binding = remote}) -> +deliver(PacketId, Msg, #state{conn_pid = Pid, binding = remote}) -> emqx_rpc:cast(node(Pid), erlang, send, [Pid, {deliver, PacketId, Msg}]). %%------------------------------------------------------------------------------ @@ -783,24 +759,20 @@ await(PacketId, Msg, State = #state{inflight = Inflight, end, State1#state{inflight = emqx_inflight:insert(PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight)}. -acked(puback, PacketId, State = #state{client_id = ClientId, - username = Username, - inflight = Inflight}) -> +acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, Msg, _Ts}} -> - emqx_hooks:run('message.acked', [ClientId, Username], Msg), + emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> ?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State), State end; -acked(pubrec, PacketId, State = #state{client_id = ClientId, - username = Username, - inflight = Inflight}) -> +acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, Msg, _Ts}} -> - emqx_hooks:run('message.acked', [ClientId, Username], Msg), + emqx_hooks:run('message.acked', [ClientId], Msg), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State), @@ -818,7 +790,7 @@ acked(pubcomp, PacketId, State = #state{inflight = Inflight}) -> %%------------------------------------------------------------------------------ %% Do nothing if client is disconnected -dequeue(State = #state{client_pid = undefined}) -> +dequeue(State = #state{conn_pid = undefined}) -> State; dequeue(State = #state{inflight = Inflight}) -> @@ -836,19 +808,14 @@ dequeue2(State = #state{mqueue = Q}) -> dequeue(dispatch(Msg, State#state{mqueue = Q1})) end. -%%------------------------------------------------------------------------------ -%% Tune QoS -tune_qos(Topic, Msg = #message{qos = PubQoS}, - #state{subscriptions = SubMap, upgrade_qos = UpgradeQoS}) -> - case maps:find(Topic, SubMap) of - {ok, #{qos := SubQoS}} when UpgradeQoS andalso (SubQoS > PubQoS) -> - Msg#message{qos = SubQoS}; - {ok, #{qos := SubQoS}} when (not UpgradeQoS) andalso (SubQoS < PubQoS) -> - Msg#message{qos = SubQoS}; - {ok, _} -> Msg; - error -> Msg - end. +%%------------------------------------------------------------------------------ +%% Ensure timers + +ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) -> + State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; +ensure_await_rel_timer(State) -> + State. %%------------------------------------------------------------------------------ %% Reset Dup @@ -888,5 +855,5 @@ reply(Reply, State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. -maybe_gc(State) -> State. +%%TODO: maybe_gc(State) -> State. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index e31adb141..577927b02 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -35,8 +35,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {session_pmon}). - -define(SM, ?MODULE). %% ETS Tables @@ -45,26 +43,22 @@ -define(SESSION_ATTRS_TAB, emqx_session_attrs). -define(SESSION_STATS_TAB, emqx_session_stats). --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SM}, ?MODULE, [], []). %% @doc Open a session. -spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}). -open_session(Attrs = #{clean_start := true, - client_id := ClientId, - client_pid := ClientPid}) -> +open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> CleanStart = fun(_) -> - ok = discard_session(ClientId, ClientPid), + ok = discard_session(ClientId, ConnPid), emqx_session_sup:start_session(Attrs) end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(Attrs = #{clean_start := false, - client_id := ClientId, - client_pid := ClientPid}) -> +open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) -> ResumeStart = fun(_) -> - case resume_session(ClientId, ClientPid) of + case resume_session(ClientId, ConnPid) of {ok, SPid} -> {ok, SPid, true}; {error, not_found} -> @@ -80,34 +74,33 @@ open_session(Attrs = #{clean_start := false, discard_session(ClientId) when is_binary(ClientId) -> discard_session(ClientId, self()). -discard_session(ClientId, ClientPid) when is_binary(ClientId) -> - lists:foreach( - fun({_ClientId, SPid}) -> - case catch emqx_session:discard(SPid, ClientPid) of - {Err, Reason} when Err =:= 'EXIT'; Err =:= error -> - emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]); - ok -> ok - end - end, lookup_session(ClientId)). +discard_session(ClientId, ConnPid) when is_binary(ClientId) -> + lists:foreach(fun({_ClientId, SPid}) -> + case catch emqx_session:discard(SPid, ConnPid) of + {Err, Reason} when Err =:= 'EXIT'; Err =:= error -> + emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]); + ok -> ok + end + end, lookup_session(ClientId)). %% @doc Try to resume a session. -spec(resume_session(client_id()) -> {ok, pid()} | {error, term()}). resume_session(ClientId) -> resume_session(ClientId, self()). -resume_session(ClientId, ClientPid) -> +resume_session(ClientId, ConnPid) -> case lookup_session(ClientId) of [] -> {error, not_found}; [{_ClientId, SPid}] -> - ok = emqx_session:resume(SPid, ClientPid), + ok = emqx_session:resume(SPid, ConnPid), {ok, SPid}; Sessions -> [{_, SPid}|StaleSessions] = lists:reverse(Sessions), emqx_logger:error("[SM] More than one session found: ~p", [Sessions]), lists:foreach(fun({_, StalePid}) -> - catch emqx_session:discard(StalePid, ClientPid) + catch emqx_session:discard(StalePid, ConnPid) end, StaleSessions), - ok = emqx_session:resume(SPid, ClientPid), + ok = emqx_session:resume(SPid, ConnPid), {ok, SPid} end. @@ -224,11 +217,11 @@ handle_call(Req, _From, State) -> emqx_logger:error("[SM] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({notify, {registered, ClientId, SPid}}, State = #state{session_pmon = PMon}) -> - {noreply, State#state{session_pmon = emqx_pmon:monitor(SPid, ClientId, PMon)}}; +handle_cast({notify, {registered, ClientId, SPid}}, State = #{session_pmon := PMon}) -> + {noreply, State#{session_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}}; -handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #state{session_pmon = PMon}) -> - {noreply, State#state{session_pmon = emqx_pmon:demonitor(SPid, PMon)}}; +handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{session_pmon := PMon}) -> + {noreply, State#{session_pmon := emqx_pmon:demonitor(SPid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[SM] unexpected cast: ~p", [Msg]), @@ -236,7 +229,8 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) -> case emqx_pmon:find(DownPid, PMon) of - undefined -> {noreply, State}; + undefined -> + {noreply, State}; ClientId -> unregister_session({ClientId, DownPid}), {noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}} diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 32654b121..6fb0647c8 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -19,7 +19,7 @@ -export_type([startlink_ret/0]). -export_type([zone/0, client_id/0, username/0, password/0, peername/0, protocol/0, credentials/0]). --export_type([payload/0]). +-export_type([topic/0, payload/0, dispatches/0]). %%-export_type([payload/0, message/0, delivery/0]). -type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}). @@ -36,7 +36,10 @@ zone => zone(), atom() => term()}). +-type(topic() :: binary()). -type(payload() :: binary() | iodata()). %-type(message() :: #message{}). %-type(delivery() :: #delivery{}). +-type(dispatches() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]). +