From 594819b752e899a3b8caf37e10b218fd134eb139 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 29 Aug 2018 02:53:22 +0800 Subject: [PATCH] Upgrade the publish sequence of QoS1/2 messages --- TODO | 61 ---------- src/emqx.erl | 6 +- src/emqx_broker.erl | 28 ++--- src/emqx_packet.erl | 65 +++++++---- src/emqx_protocol.erl | 258 ++++++++++++++++++++++-------------------- src/emqx_session.erl | 224 +++++++++++++++++------------------- src/emqx_types.erl | 5 +- 7 files changed, 300 insertions(+), 347 deletions(-) delete mode 100644 TODO diff --git a/TODO b/TODO deleted file mode 100644 index 814b42a3d..000000000 --- a/TODO +++ /dev/null @@ -1,61 +0,0 @@ - -## MQTT 5.0 - -1. Topic Alias -2. Subscriber ID -3. Session ensure stats -4. Message Expiration - -## Connection - -## WebSocket - -## Listeners - -## Protocol - -1. Global ACL cache with limited age and size? -2. Whether to enable ACL for each zone? - -## Session - -## Bridges - -Config -CLI -Remote Bridge -replay queue - -## Access Control - - Global ACL Cache - Add ACL cache emqx_access_control module - -## Zone - -## Hooks - -The hooks design... - -## MQueue - -Bound Queue -LastValue Queue -Priority Queue - -## Supervisor tree - -KernelSup - -## Managment - -## Dashboard - -## Testcases - -1. Update the README.md -2. Update the Documentation -3. Shared subscription and dispatch strategy -4. Remove lager syslog: - dep_lager_syslog = git https://github.com/basho/lager_syslog - diff --git a/src/emqx.erl b/src/emqx.erl index c39dde931..a51a41608 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -84,10 +84,8 @@ subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)-> {SubPid, SubId} = Subscriber, emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options). -%% @doc Publish Message --spec(publish(message()) -> {ok, delivery()} | {error, term()}). -publish(Msg) -> - emqx_broker:publish(Msg). +-spec(publish(message()) -> {ok, emqx_types:dispatches()}). +publish(Msg) -> emqx_broker:publish(Msg). -spec(unsubscribe(topic() | string()) -> ok | {error, term()}). unsubscribe(Topic) -> diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 7184bb1b3..7adf6064e 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -143,16 +143,18 @@ multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) - %% Publish %%------------------------------------------------------------------------------ --spec(publish(message()) -> delivery()). +-spec(publish(message()) -> {ok, emqx_types:dispatches()}). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), - case emqx_hooks:run('message.publish', [], Msg) of - {ok, Msg1 = #message{topic = Topic}} -> - route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)); - {stop, Msg1} -> - emqx_logger:warning("Stop publishing: ~p", [Msg]), delivery(Msg1) - end. + {ok, case emqx_hooks:run('message.publish', [], Msg) of + {ok, Msg1 = #message{topic = Topic}} -> + Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), + Delivery#delivery.flows; + {stop, _} -> + emqx_logger:warning("Stop publishing: ~p", [Msg]), [] + end}. +-spec(safe_publish(message()) -> ok). %% Called internally safe_publish(Msg) when is_record(Msg, message) -> try @@ -172,8 +174,8 @@ delivery(Msg) -> %%------------------------------------------------------------------------------ route([], Delivery = #delivery{message = Msg}) -> - emqx_hooks:run('message.dropped', [undefined, Msg]), - dropped(Msg#message.topic), Delivery; + emqx_hooks:run('message.dropped', [node(), Msg]), + inc_dropped_cnt(Msg#message.topic), Delivery; route([{To, Node}], Delivery) when Node =:= node() -> dispatch(To, Delivery); @@ -213,8 +215,8 @@ forward(Node, To, Delivery) -> dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> case subscribers(Topic) of [] -> - emqx_hooks:run('message.dropped', [undefined, Msg]), - dropped(Topic), Delivery; + emqx_hooks:run('message.dropped', [node(), Msg]), + inc_dropped_cnt(Topic), Delivery; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg), Delivery#delivery{flows = [{dispatch, Topic, 1}|Flows]}; @@ -230,9 +232,9 @@ dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) -> dispatch({share, _Group, _Sub}, _Topic, _Msg) -> ignored. -dropped(<<"$SYS/", _/binary>>) -> +inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; -dropped(_Topic) -> +inc_dropped_cnt(_Topic) -> emqx_metrics:inc('messages/dropped'). -spec(subscribers(topic()) -> [subscriber()]). diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index c8a751526..939252c3e 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -37,30 +37,40 @@ protocol_name(?MQTT_PROTO_V5) -> type_name(Type) when Type > ?RESERVED andalso Type =< ?AUTH -> lists:nth(Type, ?TYPE_NAMES). +%%------------------------------------------------------------------------------ +%% Validate MQTT Packet +%%------------------------------------------------------------------------------ + validate(?SUBSCRIBE_PACKET(_PacketId, _Properties, [])) -> - error(packet_empty_topic_filters); + error(topic_filters_invalid); validate(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters)) -> validate_packet_id(PacketId) andalso validate_properties(?SUBSCRIBE, Properties) andalso ok == lists:foreach(fun validate_subscription/1, TopicFilters); validate(?UNSUBSCRIBE_PACKET(_PacketId, [])) -> - error(packet_empty_topic_filters); + error(topic_filters_invalid); validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) -> validate_packet_id(PacketId) andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters); +validate(?PUBLISH_PACKET(_QoS, <<>>, _, _)) -> + error(topic_name_invalid); +validate(?PUBLISH_PACKET(_QoS, Topic, _, _)) -> + emqx_topic:wildcard(Topic) orelse error(topic_name_invalid); + validate(_Packet) -> true. validate_packet_id(0) -> - error(bad_packet_id); + error(packet_id_invalid); validate_packet_id(_) -> true. -validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := 0}) -> - error(bad_subscription_identifier); -validate_properties(?SUBSCRIBE, _) -> +validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) + when I =< 0; I >= 16#FFFFFFF -> + error(subscription_identifier_invalid); +validate_properties(_, _) -> true. validate_subscription({Topic, #{qos := QoS}}) -> @@ -85,30 +95,35 @@ from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payloa payload = Payload}. %% @doc Message from Packet --spec(to_message(client_id(), mqtt_packet()) -> message()). -to_message(ClientId, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - retain = Retain, - qos = QoS, - dup = Dup}, - variable = #mqtt_packet_publish{topic_name = Topic, - properties = Props}, - payload = Payload}) -> +-spec(to_message(emqx_types:credentials(), mqtt_packet()) -> message()). +to_message(#{client_id := ClientId, username := Username}, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + retain = Retain, + qos = QoS, + dup = Dup}, + variable = #mqtt_packet_publish{topic_name = Topic, + properties = Props}, + payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg#message{flags = #{dup => Dup, retain => Retain}, - headers = if - Props =:= undefined -> #{}; - true -> Props - end}; + headers = merge_props(#{username => Username}, Props)}; -to_message(_ClientId, #mqtt_packet_connect{will_flag = false}) -> +to_message(_Credentials, #mqtt_packet_connect{will_flag = false}) -> undefined; -to_message(ClientId, #mqtt_packet_connect{will_retain = Retain, - will_qos = QoS, - will_topic = Topic, - will_props = Props, - will_payload = Payload}) -> +to_message(#{client_id := ClientId, username := Username}, + #mqtt_packet_connect{will_retain = Retain, + will_qos = QoS, + will_topic = Topic, + will_props = Props, + will_payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), - Msg#message{flags = #{qos => QoS, retain => Retain}, headers = Props}. + Msg#message{flags = #{dup => false, retain => Retain}, + headers = merge_props(#{username => Username}, Props)}. + +merge_props(Headers, undefined) -> + Headers; +merge_props(Headers, Props) -> + maps:merge(Headers, Props). %% @doc Format packet -spec(format(mqtt_packet()) -> iolist()). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 2945954b1..9d59fff77 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -18,11 +18,14 @@ -include("emqx_mqtt.hrl"). -export([init/2, info/1, caps/1, stats/1]). +-export([client_id/1]). -export([credentials/1]). --export([client/1, client_id/1]). --export([session/1]). -export([parser/1]). --export([received/2, process/2, deliver/2, send/2]). +-export([session/1]). +-export([received/2]). +-export([process_packet/2]). +-export([deliver/2]). +-export([send/2]). -export([shutdown/2]). -record(pstate, { @@ -34,12 +37,13 @@ proto_name, ackprops, client_id, - client_pid, + conn_pid, conn_props, ack_props, username, session, clean_start, + topic_aliases, packet_size, will_msg, keepalive, @@ -54,9 +58,12 @@ }). -type(state() :: #pstate{}). - -export_type([state/0]). +-ifdef(TEST). +-compile(export_all). +-endif. + -define(LOG(Level, Format, Args, PState), emqx_logger:Level([{client, PState#pstate.client_id}], "Client(~s@~s): " ++ Format, [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])). @@ -75,10 +82,11 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) proto_ver = ?MQTT_PROTO_V4, proto_name = <<"MQTT">>, client_id = <<>>, - client_pid = self(), + conn_pid = self(), username = init_username(Peercert, Options), is_super = false, clean_start = false, + topic_aliases = #{}, packet_size = emqx_zone:get_env(Zone, max_packet_size), mountpoint = emqx_zone:get_env(Zone, mountpoint), is_bridge = false, @@ -135,6 +143,9 @@ info(#pstate{zone = Zone, caps(#pstate{zone = Zone}) -> emqx_mqtt_caps:get_caps(Zone). +client_id(#pstate{client_id = ClientId}) -> + ClientId. + credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, @@ -144,20 +155,6 @@ credentials(#pstate{zone = Zone, username => Username, peername => Peername}. -client(#pstate{zone = Zone, - client_id = ClientId, - client_pid = ClientPid, - peername = Peername, - username = Username}) -> - #client{id = ClientId, - pid = ClientPid, - zone = Zone, - peername = Peername, - username = Username}. - -client_id(#pstate{client_id = ClientId}) -> - ClientId. - stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg}, send_stats = #{pkt := SendPkt, msg := SendMsg}}) -> [{recv_pkt, RecvPkt}, @@ -177,42 +174,73 @@ parser(#pstate{packet_size = Size, proto_ver = Ver}) -> -spec(received(mqtt_packet(), state()) -> {ok, state()} | {error, term()} | {error, term(), state()}). -received(?PACKET(Type), PState = #pstate{connected = false}) - when Type =/= ?CONNECT -> +received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> {error, proto_not_connected, PState}; received(?PACKET(?CONNECT), PState = #pstate{connected = true}) -> - {error, proto_bad_connect, PState}; + {error, proto_unexpected_connect, PState}; received(Packet = ?PACKET(Type), PState) -> trace(recv, Packet, PState), case catch emqx_packet:validate(Packet) of true -> - process(Packet, inc_stats(recv, Type, PState)); - {'EXIT', {ReasonCode, _Stacktrace}} when is_integer(ReasonCode) -> - deliver({disconnect, ReasonCode}, PState), - {error, protocol_error, PState}; + {Packet1, PState1} = preprocess_properties(Packet, PState), + process_packet(Packet1, inc_stats(recv, Type, PState1)); {'EXIT', {Reason, _Stacktrace}} -> deliver({disconnect, ?RC_MALFORMED_PACKET}, PState), {error, Reason, PState} end. %%------------------------------------------------------------------------------ -%% Process Packet +%% Preprocess MQTT Properties %%------------------------------------------------------------------------------ -process(?CONNECT_PACKET( - #mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = IsBridge, - clean_start = CleanStart, - keepalive = Keepalive, - properties = ConnProps, - client_id = ClientId, - username = Username, - password = Password} = Connect), PState) -> +%% Subscription Identifier +preprocess_properties(Packet = #mqtt_packet{ + variable = Subscribe = #mqtt_packet_subscribe{ + properties = #{'Subscription-Identifier' := SubId}, + topic_filters = TopicFilters + } + }, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) -> + TopicFilters1 = [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters], + {Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState}; - io:format("~p~n", [Connect]), +%% Topic Alias Mapping +preprocess_properties(Packet = #mqtt_packet{ + variable = Publish = #mqtt_packet_publish{ + topic_name = <<>>, + properties = #{'Topic-Alias' := AliasId}} + }, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> + {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{ + topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState}; + +preprocess_properties(Packet = #mqtt_packet{ + variable = #mqtt_packet_publish{ + topic_name = Topic, + properties = #{'Topic-Alias' := AliasId}} + }, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> + {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; + +preprocess_properties(Packet, PState) -> + {Packet, PState}. + +%%------------------------------------------------------------------------------ +%% Process MQTT Packet +%%------------------------------------------------------------------------------ + +process_packet(?CONNECT_PACKET( + #mqtt_packet_connect{proto_name = ProtoName, + proto_ver = ProtoVer, + is_bridge = IsBridge, + clean_start = CleanStart, + keepalive = Keepalive, + properties = ConnProps, + client_id = ClientId, + username = Username, + password = Password} = Connect), PState) -> PState1 = set_username(Username, PState#pstate{client_id = ClientId, @@ -240,10 +268,8 @@ process(?CONNECT_PACKET( ok = emqx_cm:register_connection(client_id(PState4), info(PState4)), %% Start keepalive start_keepalive(Keepalive, PState4), - %% TODO: 'Run hooks' before open_session? - emqx_hooks:run('client.connected', [?RC_SUCCESS], client(PState4)), %% Success - {?RC_SUCCESS, SP, replvar(PState4)}; + {?RC_SUCCESS, SP, PState4}; {error, Error} -> ?LOG(error, "Failed to open session: ~p", [Error], PState1), {?RC_UNSPECIFIED_ERROR, PState1} @@ -256,7 +282,7 @@ process(?CONNECT_PACKET( {ReasonCode, PState1} end); -process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> +process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); @@ -265,7 +291,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> {ok, PState} end; -process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> +process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); @@ -273,7 +299,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> deliver({puback, PacketId, ReasonCode}, PState) end; -process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) -> +process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) -> case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); @@ -281,30 +307,37 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) -> deliver({pubrec, PacketId, ReasonCode}, PState) end; -process(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> - ok = emqx_session:puback(SPid, PacketId, ReasonCode), - {ok, PState}; +process_packet(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> + {ok = emqx_session:puback(SPid, PacketId, ReasonCode), PState}; -process(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> - ok = emqx_session:pubrec(SPid, PacketId, ReasonCode), - send(?PUBREL_PACKET(PacketId), PState); +process_packet(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> + case emqx_session:pubrec(SPid, PacketId, ReasonCode) of + ok -> + send(?PUBREL_PACKET(PacketId), PState); + {error, NotFound} -> + send(?PUBREL_PACKET(PacketId, NotFound), PState) + end; -process(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> - ok = emqx_session:pubrel(SPid, PacketId, ReasonCode), - send(?PUBCOMP_PACKET(PacketId), PState); +process_packet(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> + case emqx_session:pubrel(SPid, PacketId, ReasonCode) of + ok -> + send(?PUBCOMP_PACKET(PacketId), PState); + {error, NotFound} -> + send(?PUBCOMP_PACKET(PacketId, NotFound), PState) + end; -process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> - ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), - {ok, PState}; +process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> + {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; -process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{client_id = ClientId, session = SPid}) -> +process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), + PState = #pstate{session = SPid, mountpoint = Mountpoint}) -> case check_subscribe( parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of {ok, TopicFilters} -> - case emqx_hooks:run('client.subscribe', [ClientId], TopicFilters) of + case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of {ok, TopicFilters1} -> - ok = emqx_session:subscribe(SPid, PacketId, Properties, mount(TopicFilters1, PState)), + ok = emqx_session:subscribe(SPid, PacketId, Properties, + emqx_mountpoint:mount(Mountpoint, TopicFilters1)), {ok, PState}; {stop, _} -> ReasonCodes = lists:duplicate(length(TopicFilters), @@ -320,12 +353,13 @@ process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), deliver({suback, PacketId, ReasonCodes}, PState) end; -process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{client_id = ClientId, session = SPid}) -> - case emqx_hooks:run('client.unsubscribe', [ClientId], +process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), + PState = #pstate{session = SPid, mountpoint = MountPoint}) -> + case emqx_hooks:run('client.unsubscribe', [credentials(PState)], parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)) of {ok, TopicFilters} -> - ok = emqx_session:unsubscribe(SPid, PacketId, Properties, mount(TopicFilters, PState)), + ok = emqx_session:unsubscribe(SPid, PacketId, Properties, + emqx_mountpoint:mount(MountPoint, TopicFilters)), {ok, PState}; {stop, _Acc} -> ReasonCodes = lists:duplicate(length(RawTopicFilters), @@ -333,19 +367,20 @@ process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), deliver({unsuback, PacketId, ReasonCodes}, PState) end; -process(?PACKET(?PINGREQ), PState) -> +process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process(?PACKET(?DISCONNECT), PState) -> +process_packet(?PACKET(?DISCONNECT), PState) -> %% Clean willmsg {stop, normal, PState#pstate{will_msg = undefined}}. %%------------------------------------------------------------------------------ -%% ConnAck -> Client +%% ConnAck --> Client %%------------------------------------------------------------------------------ connack({?RC_SUCCESS, SP, PState}) -> - deliver({connack, ?RC_SUCCESS, sp(SP)}, PState); + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS]), + deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> @@ -360,20 +395,28 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> %%------------------------------------------------------------------------------ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), - PState = #pstate{client_id = ClientId, session = SPid}) -> - Msg = mount(emqx_packet:to_message(ClientId, Packet), PState), - _ = emqx_session:publish(SPid, PacketId, Msg), - puback(QoS, PacketId, PState). + PState = #pstate{session = SPid, mountpoint = MountPoint}) -> + Msg = emqx_mountpoint:mount(MountPoint, + emqx_packet:to_message(credentials(PState), Packet)), + puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, Msg), PState). %%------------------------------------------------------------------------------ %% Puback -> Client %%------------------------------------------------------------------------------ -puback(?QOS_0, _PacketId, PState) -> +puback(?QOS_0, _PacketId, _Result, PState) -> {ok, PState}; -puback(?QOS_1, PacketId, PState) -> +puback(?QOS_1, PacketId, {error, ReasonCode}, PState) -> + deliver({puback, PacketId, ReasonCode}, PState); +puback(?QOS_1, PacketId, {ok, []}, PState) -> + deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); +puback(?QOS_1, PacketId, {ok, _}, PState) -> deliver({puback, PacketId, ?RC_SUCCESS}, PState); -puback(?QOS_2, PacketId, PState) -> +puback(?QOS_2, PacketId, {error, ReasonCode}, PState) -> + deliver({pubrec, PacketId, ReasonCode}, PState); +puback(?QOS_2, PacketId, {ok, []}, PState) -> + deliver({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); +puback(?QOS_2, PacketId, {ok, _}, PState) -> deliver({pubrec, PacketId, ?RC_SUCCESS}, PState). %%------------------------------------------------------------------------------ @@ -386,10 +429,9 @@ deliver({connack, ReasonCode}, PState) -> deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{client_id = ClientId, - is_bridge = IsBridge}) -> - _ = emqx_hooks:run('message.delivered', [ClientId], Msg), - Msg1 = unmount(clean_retain(IsBridge, Msg), PState), +deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) -> + _ = emqx_hooks:run('message.delivered', credentials(PState), Msg), + Msg1 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg)), send(emqx_packet:from_message(PacketId, Msg1), PState); deliver({puback, PacketId, ReasonCode}, PState) -> @@ -445,13 +487,13 @@ maybe_assign_client_id(PState) -> try_open_session(#pstate{zone = Zone, client_id = ClientId, - client_pid = ClientPid, + conn_pid = ConnPid, conn_props = ConnProps, username = Username, clean_start = CleanStart}) -> case emqx_sm:open_session(#{zone => Zone, client_id => ClientId, - client_pid => ClientPid, + conn_pid => ConnPid, username => Username, clean_start => CleanStart, conn_props => ConnProps}) of @@ -592,14 +634,14 @@ shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> true -> ok; false -> send_willmsg(WillMsg) end, - emqx_hooks:run('client.disconnected', [Error], client(PState)), + emqx_hooks:run('client.disconnected', [credentials(PState), Error]), emqx_cm:unregister_connection(ClientId). -willmsg(Packet, PState = #pstate{client_id = ClientId}) +willmsg(Packet, #pstate{client_id = ClientId, mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) -> case emqx_packet:to_message(ClientId, Packet) of undefined -> undefined; - Msg -> mount(Msg, PState) + Msg -> emqx_mountpoint:mount(MountPoint, Msg) end. send_willmsg(undefined) -> @@ -617,14 +659,11 @@ start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 -> %% Parse topic filters %%----------------------------------------------------------------------------- -parse_topic_filters(?SUBSCRIBE, TopicFilters) -> - [begin - {Topic, TOpts} = emqx_topic:parse(RawTopic), - {Topic, maps:merge(SubOpts, TOpts)} - end || {RawTopic, SubOpts} <- TopicFilters]; +parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> + [emqx_topic:parse(RawTopic, SubOpts) || {RawTopic, SubOpts} <- RawTopicFilters]; -parse_topic_filters(?UNSUBSCRIBE, TopicFilters) -> - lists:map(fun emqx_topic:parse/1, TopicFilters). +parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) -> + lists:map(fun emqx_topic:parse/1, RawTopicFilters). %%----------------------------------------------------------------------------- %% The retained flag should be propagated for bridge. @@ -638,37 +677,14 @@ clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers} clean_retain(_IsBridge, Msg) -> Msg. -%%----------------------------------------------------------------------------- -%% Mount Point -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ +%% Update mountpoint -mount(Any, #pstate{mountpoint = undefined}) -> - Any; -mount(Msg = #message{topic = Topic}, #pstate{mountpoint = MountPoint}) -> - Msg#message{topic = <>}; -mount(TopicFilters, #pstate{mountpoint = MountPoint}) when is_list(TopicFilters) -> - [{<>, SubOpts} || {Topic, SubOpts} <- TopicFilters]. - -unmount(Any, #pstate{mountpoint = undefined}) -> - Any; -unmount(Msg = #message{topic = Topic}, #pstate{mountpoint = MountPoint}) -> - case catch split_binary(Topic, byte_size(MountPoint)) of - {MountPoint, Topic1} -> Msg#message{topic = Topic1}; - _Other -> Msg - end. - -replvar(PState = #pstate{mountpoint = undefined}) -> +update_mountpoint(PState = #pstate{mountpoint = undefined}) -> PState; -replvar(PState = #pstate{client_id = ClientId, username = Username, mountpoint = MountPoint}) -> - Vars = [{<<"%c">>, ClientId}, {<<"%u">>, Username}], - PState#pstate{mountpoint = lists:foldl(fun feed_var/2, MountPoint, Vars)}. - -feed_var({<<"%c">>, ClientId}, MountPoint) -> - emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint); -feed_var({<<"%u">>, undefined}, MountPoint) -> - MountPoint; -feed_var({<<"%u">>, Username}, MountPoint) -> - emqx_topic:feed_var(<<"%u">>, Username, MountPoint). +update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> + PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}. sp(true) -> 1; sp(false) -> 0. + diff --git a/src/emqx_session.erl b/src/emqx_session.erl index a96fac8f3..8c7795682 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -133,9 +133,6 @@ %% Stats timer stats_timer :: reference() | undefined, - %% Ignore loop deliver? - ignore_loop_deliver = false :: boolean(), - %% TODO: deliver_stats = 0, @@ -169,20 +166,17 @@ start_link(SessAttrs) -> -spec(subscribe(pid(), list({topic(), map()}) | {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok). -subscribe(SPid, TopicFilters) when is_list(TopicFilters) -> - gen_server:cast(SPid, {subscribe, [begin - {Topic, Opts} = emqx_topic:parse(RawTopic), - {Topic, maps:merge( - maps:merge( - ?DEFAULT_SUBOPTS, SubOpts), Opts)} - end || {RawTopic, SubOpts} <- TopicFilters]}). +subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> + TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts)) + || {RawTopic, SubOpts} <- RawTopicFilters], + subscribe(SPid, undefined, #{}, TopicFilters). %% for mqtt 5.0 subscribe(SPid, PacketId, Properties, TopicFilters) -> SubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {subscribe, self(), SubReq}). --spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, delivery()} | {error, term()}). +-spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, emqx_types:dispatches()}). publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message to broker directly emqx_broker:publish(Msg); @@ -202,27 +196,29 @@ puback(SPid, PacketId) -> puback(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {puback, PacketId, ReasonCode}). --spec(pubrec(pid(), mqtt_packet_id()) -> ok). +-spec(pubrec(pid(), mqtt_packet_id()) -> ok | {error, mqtt_reason_code()}). pubrec(SPid, PacketId) -> - gen_server:cast(SPid, {pubrec, PacketId}). + pubrec(SPid, PacketId, ?RC_SUCCESS). +-spec(pubrec(pid(), mqtt_packet_id(), mqtt_reason_code()) + -> ok | {error, mqtt_reason_code()}). pubrec(SPid, PacketId, ReasonCode) -> - gen_server:cast(SPid, {pubrec, PacketId, ReasonCode}). + gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity). --spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok). +-spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code()) + -> ok | {error, mqtt_reason_code()}). pubrel(SPid, PacketId, ReasonCode) -> - gen_server:cast(SPid, {pubrel, PacketId, ReasonCode}). + gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). -spec(pubcomp(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok). pubcomp(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}). --spec(unsubscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok). -unsubscribe(SPid, TopicFilters) when is_list(TopicFilters) -> - %%TODO: Parse the topic filters? - unsubscribe(SPid, {undefined, #{}, TopicFilters}). +-spec(unsubscribe(pid(), topic_table()) -> ok). +unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> + unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)). -%% TODO:... +-spec(unsubscribe(pid(), mqtt_packet_id(), mqtt_properties(), topic_table()) -> ok). unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). @@ -335,7 +331,6 @@ init(#{zone := Zone, max_awaiting_rel = get_env(Zone, max_awaiting_rel), expiry_interval = get_env(Zone, session_expiry_interval), enable_stats = get_env(Zone, enable_stats, true), - ignore_loop_deliver = get_env(Zone, ignore_loop_deliver, false), deliver_stats = 0, enqueue_stats = 0, created_at = os:timestamp()}, @@ -359,21 +354,45 @@ handle_call({discard, ClientPid}, _From, State = #state{client_pid = OldClientPi ?LOG(warning, " ~p kickout ~p", [ClientPid, OldClientPid], State), {stop, {shutdown, conflict}, ok, State}; +%% PUBLISH: handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, - State = #state{awaiting_rel = AwaitingRel, - await_rel_timer = Timer, - await_rel_timeout = Timeout}) -> + State = #state{awaiting_rel = AwaitingRel}) -> case is_awaiting_full(State) of false -> - State1 = case Timer == undefined of - true -> State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; - false -> State - end, - reply(ok, State1#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}); + case maps:is_key(PacketId, AwaitingRel) of + true -> + reply({error, ?RC_PACKET_IDENTIFIER_IN_USE}, State); + false -> + State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}, + reply(emqx_broker:publish(Msg), ensure_await_rel_timer(State1)) + end; true -> ?LOG(warning, "Dropped QoS2 Message for too many awaiting_rel: ~p", [Msg], State), emqx_metrics:inc('messages/qos2/dropped'), - reply({error, dropped}, State) + reply({error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State) + end; + +%% PUBREC: +handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) -> + case emqx_inflight:contain(PacketId, Inflight) of + true -> + reply(ok, acked(pubrec, PacketId, State)); + false -> + ?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State), + emqx_metrics:inc('packets/pubrec/missed'), + reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State) + end; + +%% PUBREL: +handle_call({pubrel, PacketId, _ReasonCode}, _From, + State = #state{awaiting_rel = AwaitingRel}) -> + case maps:take(PacketId, AwaitingRel) of + {_, AwaitingRel1} -> + reply(ok, State#state{awaiting_rel = AwaitingRel1}); + error -> + ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), + emqx_metrics:inc('packets/pubrel/missed'), + reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State) end; handle_call(info, _From, State) -> @@ -390,57 +409,38 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. %% SUBSCRIBE: -handle_cast({subscribe, TopicFilters}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> - Subscriptions1 = lists:foldl( - fun({Topic, SubOpts}, SubMap) -> - case maps:find(Topic, SubMap) of - {ok, _OldOpts} -> - emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), - ?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State); - error -> - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]) - end, - maps:put(Topic, SubOpts, SubMap) - end, Subscriptions, TopicFilters), - {noreply, State#state{subscriptions = Subscriptions1}}; - -handle_cast({subscribe, From, {PacketId, Properties, TopicFilters}}, +handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> - {[QoS|RcAcc], - case maps:find(Topic, SubMap) of - {ok, SubOpts} -> - ?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State), - SubMap; - {ok, OldOpts} -> - emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), - ?LOG(warning, "Duplicated subscribe ~s, old_opts: ~p, new_opts: ~p", [Topic, OldOpts, SubOpts], State), - maps:put(Topic, with_subid(Properties, SubOpts), SubMap); - error -> - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), - maps:put(Topic, with_subid(Properties, SubOpts), SubMap) - end} + {[QoS|RcAcc], case maps:find(Topic, SubMap) of + {ok, SubOpts} -> + SubMap; + {ok, _SubOpts} -> + emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), + emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), + maps:put(Topic, SubOpts, SubMap); + error -> + emqx_broker:subscribe(Topic, ClientId, SubOpts), + emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), + maps:put(Topic, SubOpts, SubMap) + end} end, {[], Subscriptions}, TopicFilters), - suback(From, PacketId, ReasonCodes), + suback(FromPid, PacketId, ReasonCodes), {noreply, State#state{subscriptions = Subscriptions1}}; %% UNSUBSCRIBE: handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = - lists:foldr(fun({Topic, _Opts}, {RcAcc, SubMap}) -> + lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> case maps:find(Topic, SubMap) of {ok, SubOpts} -> - emqx_broker:unsubscribe(Topic, ClientId), + ok = emqx_broker:unsubscribe(Topic, ClientId), emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]), - {[?RC_SUCCESS|RcAcc], maps:remove(Topic, SubMap)}; + {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; error -> - {[?RC_NO_SUBSCRIPTION_EXISTED|RcAcc], SubMap} + {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} end end, {[], Subscriptions}, TopicFilters), unsuback(From, PacketId, ReasonCodes), @@ -452,44 +452,18 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight} true -> {noreply, dequeue(acked(puback, PacketId, State))}; false -> - ?LOG(warning, "The PUBACK PacketId is not found: ~p", [PacketId], State), + ?LOG(warning, "The PUBACK PacketId is not found: ~w", [PacketId], State), emqx_metrics:inc('packets/puback/missed'), {noreply, State} end; -%% PUBREC: How to handle ReasonCode? -handle_cast({pubrec, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - {noreply, acked(pubrec, PacketId, State)}; - false -> - ?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State), - emqx_metrics:inc('packets/pubrec/missed'), - {noreply, State} - end; - -%% PUBREL: -handle_cast({pubrel, PacketId, _ReasonCode}, State = #state{awaiting_rel = AwaitingRel}) -> - {noreply, - case maps:take(PacketId, AwaitingRel) of - {Msg, AwaitingRel1} -> - %% Implement Qos2 by method A [MQTT 4.33] - %% Dispatch to subscriber when received PUBREL - emqx_broker:publish(Msg), %% FIXME: - maybe_gc(State#state{awaiting_rel = AwaitingRel1}); - error -> - ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), - emqx_metrics:inc('packets/pubrel/missed'), - State - end, hibernate}; - %% PUBCOMP: handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> case emqx_inflight:contain(PacketId, Inflight) of true -> {noreply, dequeue(acked(pubcomp, PacketId, State))}; false -> - ?LOG(warning, "The PUBCOMP Packet Identifier is not found: ~w", [PacketId], State), + ?LOG(warning, "The PUBCOMP PacketId is not found: ~w", [PacketId], State), emqx_metrics:inc('packets/pubcomp/missed'), {noreply, State} end; @@ -542,19 +516,22 @@ handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. +%% Batch dispatch handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> {noreply, lists:foldl(fun(Msg, NewState) -> element(2, handle_info({dispatch, Topic, Msg}, NewState)) end, State, Msgs)}; -%% Ignore messages delivered by self -handle_info({dispatch, _Topic, #message{from = ClientId}}, - State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> - {noreply, State}; - -%% Dispatch Message -handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) -> - {noreply, maybe_gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))}; +%% Dispatch message +handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) -> + {noreply, case maps:find(Topic, SubMap) of + {ok, #{nl := Nl, qos := QoS, subid := SubId}} -> + run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State); + {ok, #{nl := Nl, qos := QoS}} -> + run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State); + error -> + dispatch(reset_dup(Msg), State) + end}; %% Do nothing if the client has been disconnected. handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> @@ -611,11 +588,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -with_subid(#{'Subscription-Identifier' := SubId}, SubOpts) -> - maps:put(subid, SubId, SubOpts); -with_subid(_Props, SubOpts) -> - SubOpts. - suback(_From, undefined, _ReasonCodes) -> ignore; suback(From, PacketId, ReasonCodes) -> @@ -727,6 +699,19 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) %% Dispatch Messages %%------------------------------------------------------------------------------ +run_dispatch_steps([], Msg, State) -> + dispatch(Msg, State); +run_dispatch_steps([{nl, 1}|_Steps], #message{from = ClientId}, State = #state{client_id = ClientId}) -> + State; +run_dispatch_steps([{nl, 0}|Steps], Msg, State) -> + run_dispatch_steps(Steps, Msg, State); +run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) -> + run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); +run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> + run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State); +run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> + run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). + %% Enqueue message if the client has been disconnected dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) -> case emqx_hooks:run('message.dropped', [ClientId, Msg]) of @@ -837,19 +822,14 @@ dequeue2(State = #state{mqueue = Q}) -> dequeue(dispatch(Msg, State#state{mqueue = Q1})) end. -%%------------------------------------------------------------------------------ -%% Tune QoS -tune_qos(Topic, Msg = #message{qos = PubQoS}, - #state{subscriptions = SubMap, upgrade_qos = UpgradeQoS}) -> - case maps:find(Topic, SubMap) of - {ok, #{qos := SubQoS}} when UpgradeQoS andalso (SubQoS > PubQoS) -> - Msg#message{qos = SubQoS}; - {ok, #{qos := SubQoS}} when (not UpgradeQoS) andalso (SubQoS < PubQoS) -> - Msg#message{qos = SubQoS}; - {ok, _} -> Msg; - error -> Msg - end. +%%------------------------------------------------------------------------------ +%% Ensure timers + +ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) -> + State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; +ensure_await_rel_timer(State) -> + State. %%------------------------------------------------------------------------------ %% Reset Dup @@ -889,5 +869,5 @@ reply(Reply, State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. -maybe_gc(State) -> State. +%%TODO: maybe_gc(State) -> State. diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 32654b121..6fb0647c8 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -19,7 +19,7 @@ -export_type([startlink_ret/0]). -export_type([zone/0, client_id/0, username/0, password/0, peername/0, protocol/0, credentials/0]). --export_type([payload/0]). +-export_type([topic/0, payload/0, dispatches/0]). %%-export_type([payload/0, message/0, delivery/0]). -type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}). @@ -36,7 +36,10 @@ zone => zone(), atom() => term()}). +-type(topic() :: binary()). -type(payload() :: binary() | iodata()). %-type(message() :: #message{}). %-type(delivery() :: #delivery{}). +-type(dispatches() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]). +