diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 1a0332f5b..aa7aad064 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -76,6 +76,9 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, error(mqtt_frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; +%% Match DISCONNECT without payload +parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, _Options) -> + wrap(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}, Rest); %% Match PINGREQ. parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 0, Options); @@ -233,8 +236,8 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, parse_will_message(Packet, Bin) -> {Packet, Bin}. -protocol_approved(Ver, Name) -> - lists:member({Ver, Name}, ?PROTOCOL_NAMES). +% protocol_approved(Ver, Name) -> +% lists:member({Ver, Name}, ?PROTOCOL_NAMES). parse_packet_id(<>) -> {PacketId, Rest}. diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index e319a425b..8130a307e 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -34,32 +34,33 @@ %% Packets sent and received of broker -define(PACKET_METRICS, [ - {counter, 'packets/received'}, % All Packets received - {counter, 'packets/sent'}, % All Packets sent - {counter, 'packets/connect'}, % CONNECT Packets received - {counter, 'packets/connack'}, % CONNACK Packets sent - {counter, 'packets/publish/received'}, % PUBLISH packets received - {counter, 'packets/publish/sent'}, % PUBLISH packets sent - {counter, 'packets/puback/received'}, % PUBACK packets received - {counter, 'packets/puback/sent'}, % PUBACK packets sent - {counter, 'packets/puback/missed'}, % PUBACK packets missed - {counter, 'packets/pubrec/received'}, % PUBREC packets received - {counter, 'packets/pubrec/sent'}, % PUBREC packets sent - {counter, 'packets/pubrec/missed'}, % PUBREC packets missed - {counter, 'packets/pubrel/received'}, % PUBREL packets received - {counter, 'packets/pubrel/sent'}, % PUBREL packets sent - {counter, 'packets/pubrel/missed'}, % PUBREL packets missed - {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received - {counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent - {counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed - {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received - {counter, 'packets/suback'}, % SUBACK packets sent - {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received - {counter, 'packets/unsuback'}, % UNSUBACK Packets sent - {counter, 'packets/pingreq'}, % PINGREQ packets received - {counter, 'packets/pingresp'}, % PINGRESP Packets sent - {counter, 'packets/disconnect'}, % DISCONNECT Packets received - {counter, 'packets/auth'} % Auth Packets received + {counter, 'packets/received'}, % All Packets received + {counter, 'packets/sent'}, % All Packets sent + {counter, 'packets/connect'}, % CONNECT Packets received + {counter, 'packets/connack'}, % CONNACK Packets sent + {counter, 'packets/publish/received'}, % PUBLISH packets received + {counter, 'packets/publish/sent'}, % PUBLISH packets sent + {counter, 'packets/puback/received'}, % PUBACK packets received + {counter, 'packets/puback/sent'}, % PUBACK packets sent + {counter, 'packets/puback/missed'}, % PUBACK packets missed + {counter, 'packets/pubrec/received'}, % PUBREC packets received + {counter, 'packets/pubrec/sent'}, % PUBREC packets sent + {counter, 'packets/pubrec/missed'}, % PUBREC packets missed + {counter, 'packets/pubrel/received'}, % PUBREL packets received + {counter, 'packets/pubrel/sent'}, % PUBREL packets sent + {counter, 'packets/pubrel/missed'}, % PUBREL packets missed + {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received + {counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent + {counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed + {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received + {counter, 'packets/suback'}, % SUBACK packets sent + {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received + {counter, 'packets/unsuback'}, % UNSUBACK Packets sent + {counter, 'packets/pingreq'}, % PINGREQ packets received + {counter, 'packets/pingresp'}, % PINGRESP Packets sent + {counter, 'packets/disconnect/received'}, % DISCONNECT Packets received + {counter, 'packets/disconnect/sent'}, % DISCONNECT Packets sent + {counter, 'packets/auth'} % Auth Packets received ]). %% Messages sent and received of broker @@ -194,7 +195,7 @@ received2(?UNSUBSCRIBE) -> received2(?PINGREQ) -> inc('packets/pingreq'); received2(?DISCONNECT) -> - inc('packets/disconnect'); + inc('packets/disconnect/received'); received2(_) -> ignore. qos_received(?QOS_0) -> @@ -233,6 +234,8 @@ sent2(?UNSUBACK) -> inc('packets/unsuback'); sent2(?PINGRESP) -> inc('packets/pingresp'); +sent2(?DISCONNECT) -> + inc('packets/disconnect/sent'); sent2(_Type) -> ignore. qos_sent(?QOS_0) -> diff --git a/src/emqx_mqtt_types.erl b/src/emqx_mqtt_types.erl index 0b231fc88..6beb17780 100644 --- a/src/emqx_mqtt_types.erl +++ b/src/emqx_mqtt_types.erl @@ -32,8 +32,8 @@ -type(reason_code() :: 0..16#FF). -type(packet_id() :: 1..16#FFFF). -type(properties() :: #{atom() => term()}). --type(subopts() :: #{rh := 0 | 1, - rap := 0 | 1 | 2, +-type(subopts() :: #{rh := 0 | 1 | 2, + rap := 0 | 1, nl := 0 | 1, qos := qos(), rc => reason_code() diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index ec104799e..39929513e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -208,6 +208,9 @@ received(Packet = ?PACKET(Type), PState) -> true -> {Packet1, PState1} = preprocess_properties(Packet, PState), process_packet(Packet1, inc_stats(recv, Type, PState1)); + {'EXIT', {topic_filters_invalid, _Stacktrace}} -> + deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), + {error, topic_filters_invalid, PState}; {'EXIT', {Reason, _Stacktrace}} -> deliver({disconnect, ?RC_MALFORMED_PACKET}, PState), {error, Reason, PState} @@ -356,9 +359,17 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = Mountpoint}) -> + PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) -> + RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 -> + case IsBridge of + true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters]; + false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters] + end; + true -> + RawTopicFilters + end, case check_subscribe( - parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of + parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of {ok, TopicFilters} -> case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of {ok, TopicFilters1} -> @@ -396,9 +407,11 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?PACKET(?DISCONNECT), PState) -> +process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) -> %% Clean willmsg - {stop, normal, PState#pstate{will_msg = undefined}}. + {stop, normal, PState#pstate{will_msg = undefined}}; +process_packet(?DISCONNECT_PACKET(_), PState) -> + {stop, normal, PState}. %%------------------------------------------------------------------------------ %% ConnAck --> Client @@ -485,10 +498,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) -> +deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> _ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg), Msg1 = emqx_message:update_expiry(Msg), - Msg2 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg1)), + Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), send(emqx_packet:from_message(PacketId, Msg2), PState); deliver({puback, PacketId, ReasonCode}, PState) -> @@ -736,18 +749,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) -> lists:map(fun emqx_topic:parse/1, RawTopicFilters). -%%----------------------------------------------------------------------------- -%% The retained flag should be propagated for bridge. -%%----------------------------------------------------------------------------- - -clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}) -> - case maps:get(retained, Headers, false) of - true -> Msg; - false -> emqx_message:set_flag(retain, false, Msg) - end; -clean_retain(_IsBridge, Msg) -> - Msg. - %%------------------------------------------------------------------------------ %% Update mountpoint diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 75c3ac197..05142297e 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -448,11 +448,11 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), %% Why??? - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), maps:put(Topic, SubOpts, SubMap); error -> emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]), maps:put(Topic, SubOpts, SubMap) end} end, {[], Subscriptions}, TopicFilters), @@ -548,10 +548,10 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> %% Dispatch message handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) -> noreply(case maps:find(Topic, SubMap) of - {ok, #{nl := Nl, qos := QoS, subid := SubId}} -> - run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State); - {ok, #{nl := Nl, qos := QoS}} -> - run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State); + {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State); + {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State); error -> dispatch(emqx_message:unset_flag(dup, Msg), State) end); @@ -726,6 +726,11 @@ run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State); +run_dispatch_steps([{rap, 0}|Steps], Msg = #message{flags = Flags}, State = #state{}) -> + Flags1 = maps:put(retain, false, Flags), + run_dispatch_steps(Steps, Msg#message{flags = Flags1}, State); +run_dispatch_steps([{rap, _}|Steps], Msg, State) -> + run_dispatch_steps(Steps, Msg, State); run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index c244a40b3..b3b417717 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -184,9 +184,16 @@ parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) -> error({invalid_topic, Topic}); parse(<<"$queue/", Topic1/binary>>, Options) -> parse(Topic1, maps:put(share, <<"$queue">>, Options)); -parse(<<"$share/", Topic1/binary>>, Options) -> - [Group, Topic2] = binary:split(Topic1, <<"/">>), - {Topic2, maps:put(share, Group, Options)}; +parse(Topic = <<"$share/", Topic1/binary>>, Options) -> + case binary:split(Topic1, <<"/">>) of + [<<>>] -> error({invalid_topic, Topic}); + [_] -> error({invalid_topic, Topic}); + [Group, Topic2] -> + case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of + nomatch -> {Topic2, maps:put(share, Group, Options)}; + _ -> error({invalid_topic, Topic}) + end + end; parse(Topic, Options) -> {Topic, Options}.