From aa34258f1e36a2e85eb48f09d06c06b065f5767d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 5 Sep 2018 14:25:33 +0800 Subject: [PATCH 01/10] Support Retain As Published in Subscription Options --- src/emqx_session.erl | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 75c3ac197..e7e63763c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -548,10 +548,10 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> %% Dispatch message handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) -> noreply(case maps:find(Topic, SubMap) of - {ok, #{nl := Nl, qos := QoS, subid := SubId}} -> - run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State); - {ok, #{nl := Nl, qos := QoS}} -> - run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State); + {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State); + {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State); error -> dispatch(emqx_message:unset_flag(dup, Msg), State) end); @@ -726,6 +726,11 @@ run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State); +run_dispatch_steps([{rap, false}|Steps], Msg = #message{flags = Flags}, State = #state{}) -> + Flags1 = maps:put(retain, false, Flags), + run_dispatch_steps(Steps, Msg#message{flags = Flags1}, State); +run_dispatch_steps([{rap, _}|Steps], Msg, State) -> + run_dispatch_steps(Steps, Msg, State); run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). From 47955f4309a4b2e9158c56f51b44a42b161cbaf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 5 Sep 2018 15:18:26 +0800 Subject: [PATCH 02/10] fix bug in retain as published flag --- src/emqx_mqtt_types.erl | 4 ++-- src/emqx_session.erl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqx_mqtt_types.erl b/src/emqx_mqtt_types.erl index 0b231fc88..6beb17780 100644 --- a/src/emqx_mqtt_types.erl +++ b/src/emqx_mqtt_types.erl @@ -32,8 +32,8 @@ -type(reason_code() :: 0..16#FF). -type(packet_id() :: 1..16#FFFF). -type(properties() :: #{atom() => term()}). --type(subopts() :: #{rh := 0 | 1, - rap := 0 | 1 | 2, +-type(subopts() :: #{rh := 0 | 1 | 2, + rap := 0 | 1, nl := 0 | 1, qos := qos(), rc => reason_code() diff --git a/src/emqx_session.erl b/src/emqx_session.erl index e7e63763c..e8f8ed249 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -726,7 +726,7 @@ run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State); -run_dispatch_steps([{rap, false}|Steps], Msg = #message{flags = Flags}, State = #state{}) -> +run_dispatch_steps([{rap, 0}|Steps], Msg = #message{flags = Flags}, State = #state{}) -> Flags1 = maps:put(retain, false, Flags), run_dispatch_steps(Steps, Msg#message{flags = Flags1}, State); run_dispatch_steps([{rap, _}|Steps], Msg, State) -> From 9029ee29d33cfa07acfe98d6d7b52b4dfbbe9d1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Wed, 5 Sep 2018 18:03:28 +0800 Subject: [PATCH 03/10] Drop will msg when receive the DISCONNECT packet whose reason code is equal to 0x00 --- src/emqx_protocol.erl | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index ec104799e..013fa3c41 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -396,9 +396,15 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?PACKET(?DISCONNECT), PState) -> +process_packet(?DISCONNECT_PACKET(16#00), PState) -> %% Clean willmsg - {stop, normal, PState#pstate{will_msg = undefined}}. + {stop, normal, PState#pstate{will_msg = undefined}}; +process_packet(?DISCONNECT_PACKET(_), PState) -> + {stop, normal, PState}; +process_packet(Packet = ?PACKET(?DISCONNECT), PState) -> + if Packet#mqtt_packet.variable =:= undefined -> + {stop, normal, PState#pstate{will_msg = undefined}} + end. %%------------------------------------------------------------------------------ %% ConnAck --> Client From 42b3c9b4d6ce08e6e495fe03da68dce9ac1166d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 6 Sep 2018 14:47:34 +0800 Subject: [PATCH 04/10] Send DISCONNECT packet with reason code PROTOCOL_ERROR when topic is empty, add checks for topics --- src/emqx_protocol.erl | 3 +++ src/emqx_topic.erl | 13 ++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 013fa3c41..cee0af03a 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -208,6 +208,9 @@ received(Packet = ?PACKET(Type), PState) -> true -> {Packet1, PState1} = preprocess_properties(Packet, PState), process_packet(Packet1, inc_stats(recv, Type, PState1)); + {'EXIT', {topic_filters_invalid, _Stacktrace}} -> + deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), + {error, topic_filters_invalid, PState}; {'EXIT', {Reason, _Stacktrace}} -> deliver({disconnect, ?RC_MALFORMED_PACKET}, PState), {error, Reason, PState} diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index c244a40b3..3dcad0b33 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -184,9 +184,16 @@ parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) -> error({invalid_topic, Topic}); parse(<<"$queue/", Topic1/binary>>, Options) -> parse(Topic1, maps:put(share, <<"$queue">>, Options)); -parse(<<"$share/", Topic1/binary>>, Options) -> - [Group, Topic2] = binary:split(Topic1, <<"/">>), - {Topic2, maps:put(share, Group, Options)}; +parse(Topic = <<"$share/", Topic1/binary>>, Options) -> + case binary:split(Topic1, <<"/">>) of + [<<>>] -> error({invalid_topic, Topic}); + [_] -> error({invalid_topic, Topic}); + [Group, Topic2] -> + case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of + nomatch -> error({invalid_topic, Topic}); + _ -> {Topic2, maps:put(share, Group, Options)} + end + end; parse(Topic, Options) -> {Topic, Options}. From 917eb8e29fd3bf43a22f869773671ec5eed83956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 6 Sep 2018 17:17:09 +0800 Subject: [PATCH 05/10] Make DISCONNECT packet with reason code 0x00 when this packet doesn't have payload --- src/emqx_frame.erl | 3 +++ src/emqx_protocol.erl | 8 ++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 1a0332f5b..f3cd33ddb 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -76,6 +76,9 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, error(mqtt_frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; +%% Match DISCONNECT without payload +parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, _Options) -> + wrap(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}, Rest); %% Match PINGREQ. parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 0, Options); diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index cee0af03a..4f4f4bfb8 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -399,15 +399,11 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?DISCONNECT_PACKET(16#00), PState) -> +process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) -> %% Clean willmsg {stop, normal, PState#pstate{will_msg = undefined}}; process_packet(?DISCONNECT_PACKET(_), PState) -> - {stop, normal, PState}; -process_packet(Packet = ?PACKET(?DISCONNECT), PState) -> - if Packet#mqtt_packet.variable =:= undefined -> - {stop, normal, PState#pstate{will_msg = undefined}} - end. + {stop, normal, PState}. %%------------------------------------------------------------------------------ %% ConnAck --> Client From f95c82e37a6061235988f3bd97f5782f54670140 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 6 Sep 2018 18:14:14 +0800 Subject: [PATCH 06/10] Add metric for DISCONNECT packet --- src/emqx_metrics.erl | 57 +++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index e319a425b..8130a307e 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -34,32 +34,33 @@ %% Packets sent and received of broker -define(PACKET_METRICS, [ - {counter, 'packets/received'}, % All Packets received - {counter, 'packets/sent'}, % All Packets sent - {counter, 'packets/connect'}, % CONNECT Packets received - {counter, 'packets/connack'}, % CONNACK Packets sent - {counter, 'packets/publish/received'}, % PUBLISH packets received - {counter, 'packets/publish/sent'}, % PUBLISH packets sent - {counter, 'packets/puback/received'}, % PUBACK packets received - {counter, 'packets/puback/sent'}, % PUBACK packets sent - {counter, 'packets/puback/missed'}, % PUBACK packets missed - {counter, 'packets/pubrec/received'}, % PUBREC packets received - {counter, 'packets/pubrec/sent'}, % PUBREC packets sent - {counter, 'packets/pubrec/missed'}, % PUBREC packets missed - {counter, 'packets/pubrel/received'}, % PUBREL packets received - {counter, 'packets/pubrel/sent'}, % PUBREL packets sent - {counter, 'packets/pubrel/missed'}, % PUBREL packets missed - {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received - {counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent - {counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed - {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received - {counter, 'packets/suback'}, % SUBACK packets sent - {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received - {counter, 'packets/unsuback'}, % UNSUBACK Packets sent - {counter, 'packets/pingreq'}, % PINGREQ packets received - {counter, 'packets/pingresp'}, % PINGRESP Packets sent - {counter, 'packets/disconnect'}, % DISCONNECT Packets received - {counter, 'packets/auth'} % Auth Packets received + {counter, 'packets/received'}, % All Packets received + {counter, 'packets/sent'}, % All Packets sent + {counter, 'packets/connect'}, % CONNECT Packets received + {counter, 'packets/connack'}, % CONNACK Packets sent + {counter, 'packets/publish/received'}, % PUBLISH packets received + {counter, 'packets/publish/sent'}, % PUBLISH packets sent + {counter, 'packets/puback/received'}, % PUBACK packets received + {counter, 'packets/puback/sent'}, % PUBACK packets sent + {counter, 'packets/puback/missed'}, % PUBACK packets missed + {counter, 'packets/pubrec/received'}, % PUBREC packets received + {counter, 'packets/pubrec/sent'}, % PUBREC packets sent + {counter, 'packets/pubrec/missed'}, % PUBREC packets missed + {counter, 'packets/pubrel/received'}, % PUBREL packets received + {counter, 'packets/pubrel/sent'}, % PUBREL packets sent + {counter, 'packets/pubrel/missed'}, % PUBREL packets missed + {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received + {counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent + {counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed + {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received + {counter, 'packets/suback'}, % SUBACK packets sent + {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received + {counter, 'packets/unsuback'}, % UNSUBACK Packets sent + {counter, 'packets/pingreq'}, % PINGREQ packets received + {counter, 'packets/pingresp'}, % PINGRESP Packets sent + {counter, 'packets/disconnect/received'}, % DISCONNECT Packets received + {counter, 'packets/disconnect/sent'}, % DISCONNECT Packets sent + {counter, 'packets/auth'} % Auth Packets received ]). %% Messages sent and received of broker @@ -194,7 +195,7 @@ received2(?UNSUBSCRIBE) -> received2(?PINGREQ) -> inc('packets/pingreq'); received2(?DISCONNECT) -> - inc('packets/disconnect'); + inc('packets/disconnect/received'); received2(_) -> ignore. qos_received(?QOS_0) -> @@ -233,6 +234,8 @@ sent2(?UNSUBACK) -> inc('packets/unsuback'); sent2(?PINGRESP) -> inc('packets/pingresp'); +sent2(?DISCONNECT) -> + inc('packets/disconnect/sent'); sent2(_Type) -> ignore. qos_sent(?QOS_0) -> From d819ec0b58a4170cc49d04aa6d9411ffe03426bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 7 Sep 2018 10:18:21 +0800 Subject: [PATCH 07/10] Comment unused function in emqx_frame.erl --- src/emqx_frame.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index f3cd33ddb..aa7aad064 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -236,8 +236,8 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, parse_will_message(Packet, Bin) -> {Packet, Bin}. -protocol_approved(Ver, Name) -> - lists:member({Ver, Name}, ?PROTOCOL_NAMES). +% protocol_approved(Ver, Name) -> +% lists:member({Ver, Name}, ?PROTOCOL_NAMES). parse_packet_id(<>) -> {PacketId, Rest}. From f8471afb97a1aa6e97e3d1406880ae3d9268ef0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 7 Sep 2018 13:50:12 +0800 Subject: [PATCH 08/10] Add handling of retain handling subscription option --- src/emqx_protocol.erl | 26 ++++++++++---------------- src/emqx_session.erl | 4 ++-- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 4f4f4bfb8..364cc0ec1 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -359,9 +359,15 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = Mountpoint}) -> + PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) -> + RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 -> + case IsBridge of + true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters]; + false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters] + end + end, case check_subscribe( - parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of + parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of {ok, TopicFilters} -> case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of {ok, TopicFilters1} -> @@ -490,10 +496,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) -> +deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> _ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg), Msg1 = emqx_message:update_expiry(Msg), - Msg2 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg1)), + Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), send(emqx_packet:from_message(PacketId, Msg2), PState); deliver({puback, PacketId, ReasonCode}, PState) -> @@ -741,18 +747,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) -> lists:map(fun emqx_topic:parse/1, RawTopicFilters). -%%----------------------------------------------------------------------------- -%% The retained flag should be propagated for bridge. -%%----------------------------------------------------------------------------- - -clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}) -> - case maps:get(retained, Headers, false) of - true -> Msg; - false -> emqx_message:set_flag(retain, false, Msg) - end; -clean_retain(_IsBridge, Msg) -> - Msg. - %%------------------------------------------------------------------------------ %% Update mountpoint diff --git a/src/emqx_session.erl b/src/emqx_session.erl index e8f8ed249..05142297e 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -448,11 +448,11 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), %% Why??? - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), maps:put(Topic, SubOpts, SubMap); error -> emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]), maps:put(Topic, SubOpts, SubMap) end} end, {[], Subscriptions}, TopicFilters), From 1326e8959321a49c231ddfdef5c26b3ca69c7dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 7 Sep 2018 15:16:32 +0800 Subject: [PATCH 09/10] Fix a bug in emqx_protocol.erl --- src/emqx_protocol.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 364cc0ec1..39929513e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -364,7 +364,9 @@ process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), case IsBridge of true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters]; false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters] - end + end; + true -> + RawTopicFilters end, case check_subscribe( parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of From 6f6e24592bebc71a35370caf5b79fc3161cbcd74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 7 Sep 2018 18:32:03 +0800 Subject: [PATCH 10/10] Fix the reverse match --- src/emqx_topic.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 3dcad0b33..b3b417717 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -190,8 +190,8 @@ parse(Topic = <<"$share/", Topic1/binary>>, Options) -> [_] -> error({invalid_topic, Topic}); [Group, Topic2] -> case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of - nomatch -> error({invalid_topic, Topic}); - _ -> {Topic2, maps:put(share, Group, Options)} + nomatch -> {Topic2, maps:put(share, Group, Options)}; + _ -> error({invalid_topic, Topic}) end end; parse(Topic, Options) ->