From 0a6468cf484a4c55ec164bd0187effa2174827dc Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 31 Jul 2019 08:09:47 +0800 Subject: [PATCH] Introduce the 'pipeline' design pattern - Introduce the 'pipeline' design pattern in emqx_protocol module - Reame the '{dispatch, ...' to '{deliver, ...' in emqx_broker module - Rename type 'credentials' to 'client' --- src/emqx_broker.erl | 4 +- src/emqx_client.erl | 2 +- src/emqx_cm.erl | 16 +- src/emqx_mod_acl_internal.erl | 2 +- src/emqx_mod_rewrite.erl | 13 +- src/emqx_mqtt_caps.erl | 22 +- src/emqx_packet.erl | 2 +- src/emqx_protocol.erl | 565 +++++++++++++++++++++------------- src/emqx_session.erl | 33 +- src/emqx_shared_sub.erl | 2 +- 10 files changed, 408 insertions(+), 253 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 674c50194..d6acc8cdb 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -274,13 +274,13 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> Delivery; [Sub] -> %% optimize? Cnt = dispatch(Sub, Topic, Msg), - Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}; + Delivery#delivery{results = [{deliver, Topic, Cnt}|Results]}; Subs -> Cnt = lists:foldl( fun(Sub, Acc) -> dispatch(Sub, Topic, Msg) + Acc end, 0, Subs), - Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]} + Delivery#delivery{results = [{deliver, Topic, Cnt}|Results]} end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 7e4120628..ab7e9386a 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -205,7 +205,7 @@ | {nl, boolean()} | {qos, qos()}). --type(reason_code() :: emqx_mqtt_types:reason_code()). +-type(reason_code() :: emqx_types:reason_code()). -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index b1e3982fe..26e5a4a2f 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -42,7 +42,7 @@ , set_chan_stats/2 ]). --export([ open_session/1 +-export([ open_session/3 , discard_session/1 , resume_session/1 ]). @@ -168,24 +168,22 @@ set_chan_stats(ClientId, ChanPid, Stats) -> ok. %% @doc Open a session. --spec(open_session(map()) -> {ok, emqx_session:session()} - | {error, Reason :: term()}). -open_session(Attrs = #{clean_start := true, - client_id := ClientId}) -> +-spec(open_session(boolean(), emqx_types:client(), map()) + -> {ok, emqx_session:session()} | {error, Reason :: term()}). +open_session(true, Client = #{client_id := ClientId}, Options) -> CleanStart = fun(_) -> ok = discard_session(ClientId), - {ok, emqx_session:init(Attrs), false} + {ok, emqx_session:init(true, Client, Options), false} end, emqx_cm_locker:trans(ClientId, CleanStart); -open_session(Attrs = #{clean_start := false, - client_id := ClientId}) -> +open_session(false, Client = #{client_id := ClientId}, Options) -> ResumeStart = fun(_) -> case resume_session(ClientId) of {ok, Session} -> {ok, Session, true}; {error, not_found} -> - {ok, emqx_session:init(Attrs), false} + {ok, emqx_session:init(false, Client, Options), false} end end, emqx_cm_locker:trans(ClientId, ResumeStart). diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index f1288e323..2629ed60a 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -61,7 +61,7 @@ all_rules() -> %%-------------------------------------------------------------------- %% @doc Check ACL --spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(), +-spec(check_acl(emqx_types:client(), emqx_types:pubsub(), emqx_topic:topic(), emqx_access_rule:acl_result(), acl_rules()) -> {ok, allow} | {ok, deny} | ok). check_acl(Client, PubSub, Topic, _AclResult, Rules) -> diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 0b0285ddf..4a278dd53 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -32,9 +32,9 @@ , unload/1 ]). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Load/Unload -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- load(RawRules) -> Rules = compile(RawRules), @@ -42,10 +42,10 @@ load(RawRules) -> emqx_hooks:add('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Rules]), emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). -rewrite_subscribe(_Credentials, TopicTable, Rules) -> +rewrite_subscribe(_Client, TopicTable, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. -rewrite_unsubscribe(_Credentials, TopicTable, Rules) -> +rewrite_unsubscribe(_Client, TopicTable, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. rewrite_publish(Message = #message{topic = Topic}, Rules) -> @@ -56,9 +56,9 @@ unload(_) -> emqx_hooks:del('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3), emqx_hooks:del('message.publish', fun ?MODULE:rewrite_publish/2). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- match_rule(Topic, []) -> Topic; @@ -86,3 +86,4 @@ compile(Rules) -> {ok, MP} = re:compile(Re), {rewrite, Topic, MP, Dest} end, Rules). + diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index a95641723..5e0d2a3fc 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTTv5 capabilities +%% @doc MQTTv5 Capabilities -module(emqx_mqtt_caps). -include("emqx.hrl"). @@ -28,14 +28,17 @@ -export([default_caps/0]). +-export_type([caps/0]). + -type(caps() :: #{max_packet_size => integer(), max_clientid_len => integer(), max_topic_alias => integer(), max_topic_levels => integer(), max_qos_allowed => emqx_types:qos(), - mqtt_retain_available => boolean(), - mqtt_shared_subscription => boolean(), - mqtt_wildcard_subscription => boolean()}). + mqtt_retain_available => boolean(), + mqtt_shared_subscription => boolean(), + mqtt_wildcard_subscription => boolean() + }). -define(UNLIMITED, 0). @@ -44,18 +47,21 @@ {max_topic_alias, ?UNLIMITED}, {max_topic_levels, ?UNLIMITED}, {max_qos_allowed, ?QOS_2}, - {mqtt_retain_available, true}, - {mqtt_shared_subscription, true}, - {mqtt_wildcard_subscription, true}]). + {mqtt_retain_available, true}, + {mqtt_shared_subscription, true}, + {mqtt_wildcard_subscription, true} + ]). -define(PUBCAP_KEYS, [max_qos_allowed, mqtt_retain_available, max_topic_alias ]). + -define(SUBCAP_KEYS, [max_qos_allowed, max_topic_levels, mqtt_shared_subscription, - mqtt_wildcard_subscription]). + mqtt_wildcard_subscription + ]). -spec(check_pub(emqx_types:zone(), map()) -> ok | {error, emqx_types:reason_code()}). check_pub(Zone, Props) when is_map(Props) -> diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 71d1fb116..be21f41d1 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -142,7 +142,7 @@ publish_props(Headers) -> 'Message-Expiry-Interval'], Headers). %% @doc Message from Packet --spec(to_message(emqx_types:credentials(), emqx_ypes:packet()) +-spec(to_message(emqx_types:client(), emqx_ypes:packet()) -> emqx_types:message()). to_message(#{client_id := ClientId, username := Username, peername := Peername}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index ea052f05f..12bb008ac 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -27,6 +27,7 @@ -export([ info/1 , info/2 , attrs/1 + , caps/1 ]). -export([ init/2 @@ -44,14 +45,12 @@ proto_ver :: emqx_types:version(), client :: emqx_types:client(), session :: emqx_session:session(), - mountfun :: fun((emqx_topic:topic()) -> emqx_topic:topic()), keepalive :: non_neg_integer(), will_msg :: emqx_types:message(), enable_acl :: boolean(), - is_bridge :: boolean(), - topic_aliases :: map(), - alias_maximum :: map() - }). + topic_aliases :: maybe(map()), + alias_maximum :: maybe(map()) + }). -opaque(proto_state() :: #protocol{}). @@ -62,8 +61,6 @@ info(#protocol{proto_name = ProtoName, session = Session, keepalive = Keepalive, will_msg = WillMsg, - enable_acl = EnableAcl, - is_bridge = IsBridge, topic_aliases = Aliases}) -> #{proto_name => ProtoName, proto_ver => ProtoVer, @@ -71,8 +68,6 @@ info(#protocol{proto_name = ProtoName, session => emqx_session:info(Session), keepalive => Keepalive, will_msg => WillMsg, - enable_acl => EnableAcl, - is_bridge => IsBridge, topic_aliases => Aliases }. @@ -91,8 +86,6 @@ info(session, #protocol{session = Session}) -> Session; info(keepalive, #protocol{keepalive = Keepalive}) -> Keepalive; -info(is_bridge, #protocol{is_bridge = IsBridge}) -> - IsBridge; info(topic_aliases, #protocol{topic_aliases = Aliases}) -> Aliases. @@ -100,34 +93,35 @@ attrs(#protocol{proto_name = ProtoName, proto_ver = ProtoVer, client = Client, session = Session, - keepalive = Keepalive, - is_bridge = IsBridge}) -> + keepalive = Keepalive}) -> #{proto_name => ProtoName, - proto_ver => ProtoVer, - client => Client, - session => emqx_session:attrs(Session), - keepalive => Keepalive, - is_bridge => IsBridge + proto_ver => ProtoVer, + client => Client, + session => emqx_session:attrs(Session), + keepalive => Keepalive }. +caps(#protocol{client = #{zone := Zone}}) -> + emqx_mqtt_caps:get_caps(Zone). + -spec(init(map(), proplists:proplist()) -> proto_state()). init(ConnInfo, Options) -> Zone = proplists:get_value(zone, Options), - Peercert = maps:get(peercert, ConnInfo, nossl), + Peercert = maps:get(peercert, ConnInfo, undefined), Username = peer_cert_as_username(Peercert, Options), Mountpoint = emqx_zone:get_env(Zone, mountpoint), - Client = maps:merge(#{zone => Zone, - username => Username, - mountpoint => Mountpoint + EnableAcl = emqx_zone:get_env(Zone, enable_acl, true), + Client = maps:merge(#{zone => Zone, + username => Username, + mountpoint => Mountpoint, + is_bridge => false, + is_superuser => false }, ConnInfo), - EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), - MountFun = fun(Topic) -> - emqx_mountpoint:mount(Mountpoint, Topic) - end, - #protocol{client = Client, - mountfun = MountFun, - enable_acl = EnableAcl, - is_bridge = false + #protocol{proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client = Client, + %%mountfun = MountFun, + enable_acl = EnableAcl }. peer_cert_as_username(Peercert, Options) -> @@ -147,61 +141,39 @@ peer_cert_as_username(Peercert, Options) -> | {ok, emqx_types:packet(), proto_state()} | {error, Reason :: term(), proto_state()} | {stop, Error :: atom(), proto_state()}). -handle_in(?CONNECT_PACKET( - #mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = IsBridge, - client_id = ClientId, - username = Username, - password = Password, - keepalive = Keepalive} = ConnPkt), - PState = #protocol{client = Client}) -> - Client1 = maps:merge(Client, #{client_id => ClientId, - username => Username, - password => Password - }), - emqx_logger:set_metadata_client_id(ClientId), - WillMsg = emqx_packet:will_msg(ConnPkt), - PState1 = PState#protocol{client = Client1, - proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = IsBridge, - keepalive = Keepalive, - will_msg = WillMsg - }, - %% fun validate_packet/2, - case pipeline([fun check_connect/2, - fun handle_connect/2], ConnPkt, PState1) of - {ok, SP, PState2} -> - handle_out({connack, ?RC_SUCCESS, sp(SP)}, PState2); - {error, ReasonCode} -> - handle_out({connack, ReasonCode}, PState1); - {error, ReasonCode, PState2} -> - handle_out({connack, ReasonCode}, PState2) +handle_in(?CONNECT_PACKET(#mqtt_packet_connect{client_id = ClientId} = ConnPkt), + PState) -> + ok = emqx_logger:set_metadata_client_id(ClientId), + case pipeline([fun validate_in/2, + fun preprocess_props/2, + fun check_connect/2, + fun enrich_pstate/2, + fun auth_connect/2], ConnPkt, PState) of + {ok, NConnPkt, NPState} -> + handle_connect(NConnPkt, NPState); + {error, ReasonCode, NPState} -> + handle_out({disconnect, ReasonCode}, NPState) end; handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) -> - case pipeline([fun validate_packet/2, - fun check_pub_caps/2, - fun check_pub_acl/2, - fun handle_publish/2], Packet, PState) of - {error, ReasonCode} -> - ?LOG(warning, "Cannot publish qos~w message to ~s due to ~s", - [QoS, Topic, emqx_reason_codes:text(ReasonCode)]), - handle_out(case QoS of - ?QOS_0 -> {puberr, ReasonCode}; - ?QOS_1 -> {puback, PacketId, ReasonCode}; - ?QOS_2 -> {pubrec, PacketId, ReasonCode} - end, PState); - Result -> Result + case pipeline([fun validate_in/2, + fun preprocess_props/2, + fun check_publish/2], Packet, PState) of + {ok, NPacket, NPState} -> + handle_publish(NPacket, NPState); + {error, ReasonCode, PState1} -> + ?LOG(warning, "Cannot publish message to ~s due to ~s", + [Topic, emqx_reason_codes:text(ReasonCode)]), + handle_puback(QoS, PacketId, ReasonCode, PState1) end; handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> case emqx_session:puback(PacketId, ReasonCode, Session) of {ok, NSession} -> {ok, PState#protocol{session = NSession}}; + {ok, Publishes, NSession} -> + handle_out({publish, Publishes}, PState#protocol{session = NSession}); {error, _NotFound} -> - %% TODO: metrics? error msg? {ok, PState} end; @@ -221,35 +193,37 @@ handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses handle_out({pubcomp, PacketId, ReasonCode}, PState) end; -handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), - PState = #protocol{session = Session}) -> +handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> case emqx_session:pubcomp(PacketId, ReasonCode, Session) of {ok, NSession} -> {ok, PState#protocol{session = NSession}}; - {error, _ReasonCode} -> - %% TODO: How to handle the reason code? + {ok, Publishes, NSession} -> + handle_out({publish, Publishes}, PState#protocol{session = NSession}); + {error, _NotFound} -> {ok, PState} end; handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), PState = #protocol{client = Client}) -> - case validate(Packet) of - ok -> ok = emqx_hooks:run('client.subscribe', - [Client, Properties, TopicFilters]), - TopicFilters1 = enrich_subid(Properties, TopicFilters), - {ReasonCodes, PState1} = handle_subscribe(TopicFilters1, PState), - handle_out({suback, PacketId, ReasonCodes}, PState1); + case validate_in(Packet, PState) of + ok -> + ok = emqx_hooks:run('client.subscribe', + [Client, Properties, TopicFilters]), + TopicFilters1 = enrich_subid(Properties, TopicFilters), + {ReasonCodes, PState1} = handle_subscribe(TopicFilters1, PState), + handle_out({suback, PacketId, ReasonCodes}, PState1); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, PState) end; handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), PState = #protocol{client = Client}) -> - case validate(Packet) of - ok -> ok = emqx_hooks:run('client.unsubscribe', - [Client, Properties, TopicFilters]), - {ReasonCodes, PState1} = handle_unsubscribe(TopicFilters, PState), - handle_out({unsuback, PacketId, ReasonCodes}, PState1); + case validate_in(Packet, PState) of + ok -> + ok = emqx_hooks:run('client.unsubscribe', + [Client, Properties, TopicFilters]), + {ReasonCodes, PState1} = handle_unsubscribe(TopicFilters, PState), + handle_out({unsuback, PacketId, ReasonCodes}, PState1); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, PState) end; @@ -280,7 +254,7 @@ handle_in(Packet, PState) -> handle_deliver(Delivers, PState = #protocol{client = Client, session = Session}) when is_list(Delivers) -> - case emqx_session:handle(Delivers, Session) of + case emqx_session:deliver(Delivers, Session) of {ok, Publishes, NSession} -> Packets = lists:map(fun({publish, PacketId, Msg}) -> Msg0 = emqx_hooks:run_fold('message.deliver', [Client], Msg), @@ -293,12 +267,24 @@ handle_deliver(Delivers, PState = #protocol{client = Client, session = Session}) {ok, PState#protocol{session = NSession}} end. +%%-------------------------------------------------------------------- +%% Handle puback +%%-------------------------------------------------------------------- + +handle_puback(?QOS_0, _PacketId, ReasonCode, PState) -> + handle_out({puberr, ReasonCode}, PState); +handle_puback(?QOS_1, PacketId, ReasonCode, PState) -> + handle_out({puback, PacketId, ReasonCode}, PState); +handle_puback(?QOS_2, PacketId, ReasonCode, PState) -> + handle_out({pubrec, PacketId, ReasonCode}, PState). + %%-------------------------------------------------------------------- %% Handle outgoing packet %%-------------------------------------------------------------------- handle_out({connack, ?RC_SUCCESS, SP}, PState = #protocol{client = Client}) -> - ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, info(PState)]), + ok = emqx_hooks:run('client.connected', + [Client, ?RC_SUCCESS, info(PState)]), Props = #{}, %% TODO: ... {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, Props), PState}; @@ -312,13 +298,24 @@ handle_out({connack, ReasonCode}, PState = #protocol{client = Client, Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), {error, Reason, ?CONNACK_PACKET(ReasonCode1), PState}; +handle_out({publish, Publishes}, PState = #protocol{client = Client = #{mountpoint := Mountpoint}}) -> + Mount = fun(Msg) -> emqx_mountpoint:mount(Mountpoint, Msg) end, + Packets = lists:map( + fun({publish, PacketId, Msg}) -> + Msg1 = emqx_hooks:run_fold('message.deliver', [Client], Msg), + Msg2 = Mount(emqx_message:update_expiry(Msg1)), + emqx_packet:from_message(PacketId, Msg2) + end, Publishes), + {ok, Packets, PState}; + handle_out({publish, PacketId, Msg}, PState = #protocol{client = Client}) -> Msg0 = emqx_hooks:run_fold('message.deliver', [Client], Msg), Msg1 = emqx_message:update_expiry(Msg0), Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, Client), Msg1), {ok, emqx_packet:from_message(PacketId, Msg2), PState}; -handle_out({puberr, ReasonCode}, PState) -> +%% TODO: How to handle the err? +handle_out({puberr, _ReasonCode}, PState) -> {ok, PState}; handle_out({puback, PacketId, ReasonCode}, PState) -> @@ -364,77 +361,265 @@ handle_timeout(TRef, Msg, PState = #protocol{session = Session}) -> {ok, NSession} -> {ok, PState#protocol{session = NSession}}; {ok, Publishes, NSession} -> - %% TODO: handle out... - io:format("Timeout publishes: ~p~n", [Publishes]), - {ok, PState#protocol{session = NSession}} + handle_out({publish, Publishes}, PState#protocol{session = NSession}) end. -terminate(Reason, _State) -> +terminate(Reason, _PState) -> io:format("Terminated for ~p~n", [Reason]), ok. +%%-------------------------------------------------------------------- +%% Validate incoming packet +%%-------------------------------------------------------------------- + +-spec(validate_in(emqx_types:packet(), proto_state()) + -> ok | {error, emqx_types:reason_code()}). +validate_in(Packet, _PState) -> + try emqx_packet:validate(Packet) of + true -> ok + catch + error:protocol_error -> + {error, ?RC_PROTOCOL_ERROR}; + error:subscription_identifier_invalid -> + {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}; + error:topic_alias_invalid -> + {error, ?RC_TOPIC_ALIAS_INVALID}; + error:topic_filters_invalid -> + {error, ?RC_TOPIC_FILTER_INVALID}; + error:topic_name_invalid -> + {error, ?RC_TOPIC_FILTER_INVALID}; + error:_Reason -> + {error, ?RC_MALFORMED_PACKET} + end. + +%%-------------------------------------------------------------------- +%% PreProcess Properties +%%-------------------------------------------------------------------- + +preprocess_props(#mqtt_packet_connect{ + properties = #{'Topic-Alias-Maximum' := Max} + }, + PState = #protocol{alias_maximum = AliasMaximum}) -> + {ok, PState#protocol{alias_maximum = AliasMaximum#{outbound => Max}}}; + +preprocess_props(Packet = #mqtt_packet{variable = Publish}, PState) -> + case preprocess_props(Publish, PState) of + {ok, Publish1, PState1} -> + {ok, Packet#mqtt_packet{variable = Publish1}, PState1}; + Error -> Error + end; + +preprocess_props(Publish = #mqtt_packet_publish{topic_name = <<>>, + properties = #{'Topic-Alias' := AliasId} + }, + PState = #protocol{topic_aliases = TopicAliases}) -> + case maps:find(AliasId, TopicAliases) of + {ok, Topic} -> + {ok, Publish#mqtt_packet_publish{topic_name = Topic}, PState}; + false -> {error, ?RC_TOPIC_ALIAS_INVALID} + end; + +preprocess_props(Publish = #mqtt_packet_publish{topic_name = Topic, + properties = #{'Topic-Alias' := AliasId} + }, + PState = #protocol{topic_aliases = Aliases}) -> + Aliases1 = maps:put(AliasId, Topic, Aliases), + {ok, Publish, PState#protocol{topic_aliases = Aliases1}}; + +preprocess_props(Packet, PState) -> + {ok, Packet, PState}. + %%-------------------------------------------------------------------- %% Check Connect Packet %%-------------------------------------------------------------------- -check_connect(_ConnPkt, PState) -> - {ok, PState}. +check_connect(ConnPkt, PState) -> + pipeline([fun check_proto_ver/2, + fun check_client_id/2, + %%fun check_flapping/2, + fun check_banned/2, + fun check_will_topic/2, + fun check_will_retain/2], ConnPkt, PState). -%%-------------------------------------------------------------------- -%% Handle Connect Packet -%%-------------------------------------------------------------------- +check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, + proto_name = Name}, _PState) -> + case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of + true -> ok; + false -> {error, ?RC_PROTOCOL_ERROR} + end. -handle_connect(#mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = IsBridge, - clean_start = CleanStart, - keepalive = Keepalive, - properties = ConnProps, - client_id = ClientId, - username = Username, - password = Password - } = ConnPkt, - PState = #protocol{client = Client}) -> - case emqx_access_control:authenticate( - Client#{password => Password}) of - {ok, AuthResult} -> - Client1 = maps:merge(Client, AuthResult), - %% Open session - case open_session(ConnPkt, PState) of - {ok, Session, SP} -> - PState1 = PState#protocol{client = Client1, - session = Session}, - ok = emqx_cm:register_channel(ClientId), - {ok, SP, PState1}; - {error, Error} -> - ?LOG(error, "Failed to open session: ~p", [Error]), - {error, ?RC_UNSPECIFIED_ERROR, PState#protocol{client = Client1}} +%% MQTT3.1 does not allow null clientId +check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, + client_id = <<>>}, _PState) -> + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; + +%% Issue#599: Null clientId and clean_start = false +check_client_id(#mqtt_packet_connect{client_id = <<>>, + clean_start = false}, _PState) -> + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; +check_client_id(#mqtt_packet_connect{client_id = <<>>, + clean_start = true}, _PState) -> + ok; +check_client_id(#mqtt_packet_connect{client_id = ClientId}, + #protocol{client = #{zone := Zone}}) -> + Len = byte_size(ClientId), + MaxLen = emqx_zone:get_env(Zone, max_clientid_len), + case (1 =< Len) andalso (Len =< MaxLen) of + true -> ok; + false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} + end. + +check_banned(#mqtt_packet_connect{client_id = ClientId, + username = Username}, + #protocol{client = Client = #{zone := Zone}}) -> + case emqx_zone:get_env(Zone, enable_ban, false) of + true -> + case emqx_banned:check(Client#{client_id => ClientId, + username => Username}) of + true -> {error, ?RC_BANNED}; + false -> ok end; + false -> ok + end. + +check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) -> + ok; +check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _PState) -> + try emqx_topic:validate(WillTopic) of + true -> ok + catch error:_Error -> + {error, ?RC_TOPIC_NAME_INVALID} + end. + +check_will_retain(#mqtt_packet_connect{will_retain = false}, _PState) -> + ok; +check_will_retain(#mqtt_packet_connect{will_retain = true}, + #protocol{client = #{zone := Zone}}) -> + case emqx_zone:get_env(Zone, mqtt_retain_available, true) of + true -> ok; + false -> {error, ?RC_RETAIN_NOT_SUPPORTED} + end. + +%%-------------------------------------------------------------------- +%% Enrich state +%%-------------------------------------------------------------------- + +enrich_pstate(#mqtt_packet_connect{proto_name = ProtoName, + proto_ver = ProtoVer, + keepalive = Keepalive, + client_id = ClientId, + username = Username, + is_bridge = IsBridge + }, + PState = #protocol{client = Client}) -> + Client1 = maps:merge(Client, #{client_id => ClientId, + username => Username, + is_bridge => IsBridge + }), + {ok, PState#protocol{proto_name = ProtoName, + proto_ver = ProtoVer, + client = Client1, + keepalive = Keepalive}}. + +%%-------------------------------------------------------------------- +%% Auth Connect +%%-------------------------------------------------------------------- + +auth_connect(#mqtt_packet_connect{client_id = ClientId, + username = Username, + password = Password}, + PState = #protocol{client = Client}) -> + case emqx_access_control:authenticate(Client#{password => Password}) of + {ok, AuthResult} -> + {ok, PState#protocol{client = maps:merge(Client, AuthResult)}}; {error, Reason} -> ?LOG(warning, "Client ~s (Username: '~s') login failed for ~p", [ClientId, Username, Reason]), - {error, emqx_reason_codes:connack_error(Reason), PState} + {error, Reason} end. +%%-------------------------------------------------------------------- +%% Handle Connect +%%-------------------------------------------------------------------- + +handle_connect(ConnPkt, PState) -> + case open_session(ConnPkt, PState) of + {ok, Session, SP} -> + WillMsg = emqx_packet:will_msg(ConnPkt), + handle_out({connack, ?RC_SUCCESS, sp(SP)}, + PState#protocol{session = Session, will_msg = WillMsg}); + {error, Reason} -> + %% TODO: Unknown error? + ?LOG(error, "Failed to open session: ~p", [Reason]), + handle_out({connack, ?RC_UNSPECIFIED_ERROR}, PState) + end. + +%%-------------------------------------------------------------------- +%% Open session +%%-------------------------------------------------------------------- + open_session(#mqtt_packet_connect{clean_start = CleanStart, - %%properties = ConnProps, - client_id = ClientId, - username = Username} = ConnPkt, - PState = #protocol{client = Client}) -> - emqx_cm:open_session(maps:merge(Client, #{clean_start => CleanStart, - max_inflight => 0, - expiry_interval => 0})). + properties = ConnProps}, + #protocol{client = Client = #{zone := Zone}}) -> + MaxInflight = maps:get('Receive-Maximum', ConnProps, + emqx_zone:get_env(Zone, max_inflight, 65535)), + Interval = maps:get('Session-Expiry-Interval', ConnProps, + emqx_zone:get_env(Zone, session_expiry_interval, 0)), + emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight, + expiry_interval => Interval + }). %%-------------------------------------------------------------------- %% Handle Publish Message: Client -> Broker %%-------------------------------------------------------------------- -handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), - PState = #protocol{client = Client = #{mountpoint := Mountpoint}}) -> + +%% Check Publish +check_publish(Packet, PState) -> + pipeline([fun check_pub_alias/2, + fun check_pub_caps/2, + fun check_pub_acl/2], Packet, PState). + +%% Check Pub Alias +check_pub_alias(#mqtt_packet{ + variable = #mqtt_packet_publish{ + properties = #{'Topic-Alias' := AliasId} + } + }, + #protocol{alias_maximum = Limits}) -> + case (Limits == undefined) + orelse (Max = maps:get(inbound, Limits, 0)) == 0 + orelse (AliasId > Max) of + true -> {error, ?RC_TOPIC_ALIAS_INVALID}; + false -> ok + end; +check_pub_alias(_Packet, _PState) -> ok. + +%% Check Pub Caps +check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, + retain = Retain + }}, + #protocol{client = #{zone := Zone}}) -> + emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). + +%% Check Pub ACL +check_pub_acl(_Packet, #protocol{enable_acl = false}) -> + ok; +check_pub_acl(_Packet, #protocol{client = #{is_superuser := true}}) -> + ok; +check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, + #protocol{client = Client}) -> + case emqx_access_control:check_acl(Client, publish, Topic) of + allow -> ok; + deny -> {error, ?RC_NOT_AUTHORIZED} + end. + +handle_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), + PState = #protocol{client = Client = #{mountpoint := MountPoint}}) -> %% TODO: ugly... publish_to_msg(...) - Msg = emqx_packet:to_message(Client, Packet), - Msg1 = emqx_mountpoint:mount(Mountpoint, Msg), - Msg2 = emqx_message:set_flag(dup, false, Msg1), + Mount = fun(Msg) -> emqx_mountpoint:mount(MountPoint, Msg) end, + Msg1 = emqx_packet:to_message(Client, Packet), + Msg2 = Mount(emqx_message:set_flag(dup, false, Msg1)), handle_publish(PacketId, Msg2, PState). handle_publish(_PacketId, Msg = #message{qos = ?QOS_0}, PState) -> @@ -472,15 +657,15 @@ handle_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) -> handle_subscribe(More, [RC|Acc], PState1). do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, - PState = #protocol{client = Client, - session = Session, - mountfun = Mount}) -> + PState = #protocol{client = Client = #{mountpoint := Mountpoint}, + session = Session}) -> + Mount = fun(Msg) -> emqx_mountpoint:mount(Mountpoint, Msg) end, %% 1. Parse 2. Check 3. Enrich 5. MountPoint 6. Session SubOpts1 = maps:merge(?DEFAULT_SUBOPTS, SubOpts), {TopicFilter1, SubOpts2} = emqx_topic:parse(TopicFilter, SubOpts1), SubOpts3 = enrich_subopts(SubOpts2, PState), case check_subscribe(TopicFilter1, PState) of - ok -> + {ok, _, _} -> %% TODO:... TopicFilter2 = Mount(TopicFilter1), case emqx_session:subscribe(Client, TopicFilter2, SubOpts3, Session) of {ok, NSession} -> @@ -497,14 +682,30 @@ enrich_subid(_Properties, TopicFilters) -> enrich_subopts(SubOpts, #protocol{proto_ver = ?MQTT_PROTO_V5}) -> SubOpts; -enrich_subopts(SubOpts, #protocol{client = #{zone := Zone}, - is_bridge = IsBridge}) -> +enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, + is_bridge := IsBridge}}) -> Rap = flag(IsBridge), Nl = flag(emqx_zone:get_env(Zone, ignore_loop_deliver, false)), SubOpts#{rap => Rap, nl => Nl}. -check_subscribe(_TopicFilter, _PState) -> - ok. +check_subscribe(TopicFilter, PState) -> + pipeline([%%TODO: fun check_sub_caps/2, + fun check_sub_acl/2], TopicFilter, PState). + +%% Check Sub Caps +check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) -> + emqx_mqtt_caps:check_sub(Zone, TopicFilter). + +%% Check Sub ACL +check_sub_acl(_TopicFilter, #protocol{enable_acl = false}) -> + ok; +check_sub_acl(_TopicFilter, #protocol{client = #{is_superuser := true}}) -> + ok; +check_sub_acl(TopicFilter, #protocol{client = Client}) -> + case emqx_access_control:check_acl(Client, subscribe, TopicFilter) of + allow -> ok; + deny -> {error, ?RC_NOT_AUTHORIZED} + end. %%-------------------------------------------------------------------- %% Handle Unsubscribe Request @@ -520,9 +721,9 @@ handle_unsubscribe([TopicFilter|More], Acc, PState) -> {RC, PState1} = do_unsubscribe(TopicFilter, PState), handle_unsubscribe(More, [RC|Acc], PState1). -do_unsubscribe(TopicFilter, PState = #protocol{client = Client, - session = Session, - mountfun = Mount}) -> +do_unsubscribe(TopicFilter, PState = #protocol{client = Client = #{mountpoint := Mountpoint}, + session = Session}) -> + Mount = fun(Topic) -> emqx_mountpoint:mount(Mountpoint, Topic) end, TopicFilter1 = Mount(element(1, emqx_topic:parse(TopicFilter))), case emqx_session:unsubscribe(Client, TopicFilter1, Session) of {ok, NSession} -> @@ -530,65 +731,13 @@ do_unsubscribe(TopicFilter, PState = #protocol{client = Client, {error, RC} -> {RC, PState} end. -%%-------------------------------------------------------------------- -%% Validate Incoming Packet -%%-------------------------------------------------------------------- - -validate_packet(Packet, _PState) -> - validate(Packet). - --spec(validate(emqx_types:packet()) -> ok | {error, emqx_types:reason_code()}). -validate(Packet) -> - try emqx_packet:validate(Packet) of - true -> ok - catch - error:protocol_error -> - {error, ?RC_PROTOCOL_ERROR}; - error:subscription_identifier_invalid -> - {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}; - error:topic_alias_invalid -> - {error, ?RC_TOPIC_ALIAS_INVALID}; - error:topic_filters_invalid -> - {error, ?RC_TOPIC_FILTER_INVALID}; - error:topic_name_invalid -> - {error, ?RC_TOPIC_FILTER_INVALID}; - error:_Reason -> - {error, ?RC_MALFORMED_PACKET} - end. - -%%-------------------------------------------------------------------- -%% Check Publish -%%-------------------------------------------------------------------- - -check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, - retain = Retain}, - variable = #mqtt_packet_publish{}}, - #protocol{client = #{zone := Zone}}) -> - emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). - -check_pub_acl(_Packet, #protocol{enable_acl = false}) -> - ok; -check_pub_acl(_Packet, #protocol{client = #{is_superuser := true}}) -> - ok; -check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, - #protocol{client = Client}) -> - do_acl_check(Client, publish, Topic). - -check_sub_acl(_Packet, #protocol{enable_acl = false}) -> - ok. - -do_acl_check(Client, PubSub, Topic) -> - case emqx_access_control:check_acl(Client, PubSub, Topic) of - allow -> ok; - deny -> {error, ?RC_NOT_AUTHORIZED} - end. - %%-------------------------------------------------------------------- %% Pipeline %%-------------------------------------------------------------------- -pipeline([Fun], Packet, PState) -> - Fun(Packet, PState); +pipeline([], Packet, PState) -> + {ok, Packet, PState}; + pipeline([Fun|More], Packet, PState) -> case Fun(Packet, PState) of ok -> pipeline(More, Packet, PState); @@ -597,7 +746,9 @@ pipeline([Fun|More], Packet, PState) -> {ok, NPacket, NPState} -> pipeline(More, NPacket, NPState); {error, Reason} -> - {error, Reason} + {error, Reason, PState}; + {error, Reason, NPState} -> + {error, Reason, NPState} end. %%-------------------------------------------------------------------- diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 95f8b3727..8d529a921 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -50,7 +50,7 @@ -logger_header("[Session]"). --export([init/1]). +-export([init/3]). -export([ info/1 , attrs/1 @@ -68,7 +68,7 @@ , pubcomp/3 ]). --export([handle/2]). +-export([deliver/2]). -export([timeout/3]). @@ -143,11 +143,9 @@ %%-------------------------------------------------------------------- %% @doc Init a session. --spec(init(Attrs :: map()) -> session()). -init(#{zone := Zone, - clean_start := CleanStart, - max_inflight := MaxInflight, - expiry_interval := ExpiryInterval}) -> +-spec(init(boolean(), emqx_types:client(), Options :: map()) -> session()). +init(CleanStart, #{zone := Zone}, #{max_inflight := MaxInflight, + expiry_interval := ExpiryInterval}) -> #session{clean_start = CleanStart, max_subscriptions = get_env(Zone, max_subscriptions, 0), subscriptions = #{}, @@ -361,6 +359,7 @@ pubrec(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> {ok, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> ?LOG(warning, "The PUBREC ~w is duplicated", [PacketId]), + ok = emqx_metrics:inc('packets.pubrec.inuse'), {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), @@ -410,7 +409,7 @@ dequeue(Session = #session{inflight = Inflight, mqueue = Q}) -> true -> {ok, Session}; false -> {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q), - handle(lists:reverse(Msgs), [], Session#session{mqueue = Q1}) + deliver(lists:reverse(Msgs), [], Session#session{mqueue = Q1}) end. dequeue(Cnt, Msgs, Q) when Cnt =< 0 -> @@ -433,28 +432,28 @@ batch_n(Inflight) -> %% Broker -> Client: Publish | Msg %%-------------------------------------------------------------------- -handle(Delivers, Session = #session{subscriptions = Subs}) +deliver(Delivers, Session = #session{subscriptions = Subs}) when is_list(Delivers) -> Msgs = [enrich(get_subopts(Topic, Subs), Msg, Session) || {deliver, Topic, Msg} <- Delivers], - handle(Msgs, [], Session). + deliver(Msgs, [], Session). -handle([], Publishes, Session) -> +deliver([], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; -handle([Msg = #message{qos = ?QOS_0}|More], Acc, Session) -> - handle(More, [{publish, undefined, Msg}|Acc], Session); +deliver([Msg = #message{qos = ?QOS_0}|More], Acc, Session) -> + deliver(More, [{publish, undefined, Msg}|Acc], Session); -handle([Msg = #message{qos = QoS}|More], Acc, - Session = #session{next_pkt_id = PacketId, inflight = Inflight}) +deliver([Msg = #message{qos = QoS}|More], Acc, + Session = #session{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> - handle(More, Acc, enqueue(Msg, Session)); + deliver(More, Acc, enqueue(Msg, Session)); false -> Publish = {publish, PacketId, Msg}, Session1 = await(PacketId, Msg, Session), - handle(More, [Publish|Acc], next_pkt_id(Session1)) + deliver(More, [Publish|Acc], next_pkt_id(Session1)) end. enqueue(Msg, Session = #session{mqueue = Q}) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index e6c8deefc..ac64e1d7a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -118,7 +118,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, F {Type, SubPid} -> case do_dispatch(SubPid, Topic, Msg, Type) of ok -> - Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}; + Delivery#delivery{results = [{deliver, {Group, Topic}, 1} | Results]}; {error, _Reason} -> %% Failed to dispatch to this sub, try next. dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])