From 425bc2157e58a01a34c1992c2a1f8398dfa76bd5 Mon Sep 17 00:00:00 2001 From: erylee Date: Sat, 22 Dec 2012 16:42:32 +0800 Subject: [PATCH] add auth support --- bin/emqtt_ctl | 36 +++++++++++ etc/emqtt.config | 1 + src/.emqtt_frame.erl.swp | Bin 20480 -> 0 bytes src/emqtt_auth.erl | 67 ++++++++++++++++++++ src/emqtt_auth_anonymous.erl | 18 ++++++ src/emqtt_auth_internal.erl | 29 +++++++++ src/emqtt_client.erl | 10 +-- src/emqtt_ctl.erl | 5 ++ src/emqtt_router.erl | 107 ++++++++++++++++++++++++------- src/emqtt_sup.erl | 2 +- src/emqtt_topic.erl | 119 ----------------------------------- 11 files changed, 244 insertions(+), 150 deletions(-) delete mode 100644 src/.emqtt_frame.erl.swp create mode 100644 src/emqtt_auth.erl create mode 100644 src/emqtt_auth_anonymous.erl create mode 100644 src/emqtt_auth_internal.erl delete mode 100644 src/emqtt_topic.erl diff --git a/bin/emqtt_ctl b/bin/emqtt_ctl index edd24c2dc..b056316c1 100755 --- a/bin/emqtt_ctl +++ b/bin/emqtt_ctl @@ -111,11 +111,47 @@ case "$1" in $NODETOOL rpc emqtt_ctl cluster $@ ;; + add_user) + if [ $# -ne 3 ]; then + echo "Usage: $SCRIPT add_user " + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqtt is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqtt_ctl add_user $@ + ;; + + delete_user) + if [ $# -ne 2 ]; then + echo "Usage: $SCRIPT delete_user " + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqtt is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqtt_ctl delete_user $@ + ;; + *) echo "Usage: $SCRIPT" echo " status #query emqtt status" echo " cluster_info #query cluster nodes" echo " cluster #cluster node" + echo " add_user #add user" + echo " delete_user #delete user" exit 1 ;; diff --git a/etc/emqtt.config b/etc/emqtt.config index 4512e5848..c995ebc38 100644 --- a/etc/emqtt.config +++ b/etc/emqtt.config @@ -22,6 +22,7 @@ ]} ]}, {emqtt, [ + {auth, {internal, []}}, %internal, anonymous {listeners, [ {1883, [ binary, diff --git a/src/.emqtt_frame.erl.swp b/src/.emqtt_frame.erl.swp deleted file mode 100644 index 6c2212e6f44145ebc2606d4ccdb4d3037c977ad9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 20480 zcmeHNTZ|-C87@V+C?Ho86B7@!!R>~gp6*$eU1xh{GBZ27+u7}z-Rap|$kNtySNGIz zPgPS@J;TyF7-I|xA_)OC7(sjxqKOhBCO&{*4DbLe;w6d?1{5#rgF$@~ynX*Ur>ahM zSNF_W;o*WDRFfF9QZh ziV_Ya9Ej&Y+p7=nyiD6Rn$J;%EkoC_tFJm7UpM)ha3JA8!hwVX2?r7mBpgUMkZ>U3 z!2cx&yw(NUr!X^w`}{d7f8W#f`#$-8T>c*GD!)L!e?tB~(N!*Alb?hG2?r7mBpgUM zkZ>U3K*E890|^Hb4kR2%IFNAQf4~8~rfEAc?&B2jmYz@xx2a09RvxDvPm_}itL_E+F6@Lk|Z;9;NxoB$?)>i`B^ z1Y8LG<%62`L*NXs9eDEtpa-Y}dx1T`5b(Q8H0=k#7k~$V6TlSkQQ(b>HSK<27Wnf; zn)X$o0n~x*z?&Cp+Ao0zfeNq{xCr>e`!($a;4$DXpaJX!hJd$_2zddx8z=#n0xu%@ z@kQWH;8q|9{2d9E?*sP$W#GfWZ@^>X^SAxv%K6Ki^?#~O)3CgXSwod^HpAbFwT#xk z1v_OJt%}|>R}E^iA8%9iGO$gxY3NqPHC&h4EZKH5HAA1u_+r{tdSK%Ty13ZE6`-C`?SO6Wdz4;~EuQ1V}9yuD2t@ zsL~qh#PGj!*fpFv+{Vc;#%r(lbZ~QqQ6&qN;yNI1m)r*0^A2tfJ6LfWoPzBcF3En4 z7wwi=&G5$C!^S@OH}!8E=)M^FrryTedK7@>xNT&~v~=h6!~|#_=L3&+OFS?V644mT z-;6em?n)K(W;m+`nIcu{E>{gEuH3f?YTmCL`p=6|-a}-l=<)4Ux%c*8D5zqMD_2bd zJL#+($uU;2jG0!W(lo48p^zKP zXQ=iVY?yiqrXF;u^pat`e@=KvMzqTaIUxa{W^!L~@4o3GxZjt`U45v`Z%Ac0Qa6#E z72AQ->TQb{Otee(Md{|um-10N^JQgNwe^1FO-uybifbxY;f~=rwv%C|wTg|hR#8@Y z$O4@+wzdqpi4oZZn=DNos~nh}3-u&Qa)r}ekwjNwhE}yD+BPC4s6eidhA8y{v7a|b z(`-}R;{yl{5VkO5TOJn}{lTS|1(VM6iEi9-(`M^w;`ezz+?r`UKV+A?7vfEHRcp7X zmUMM?w_9@S5Y`+gpMHx0by9+tq&Ez)D)25~&dSFzY-<>UEC=L{jgozn3qv2eEZwp# zV?*f*k%+2_I58-xf6#@7)x|UQX2tb%&q#6cQDIiohHFOMw2ah&`J-3{${9AgC6^Zu zw9hXMR*BIq`J8x^QfU$yY?oW=;!#SYo=k=fl@1l%T5CwDvM^hoT{tqkTd2G?Y~0?m z9WQkYc3Dtuy2eQU1XcIHbIy){dy#pK3G>`(umsdid}0r zjg(S`&(~j%e{9BXop#K|vd2=@G#kz3b_|c^@}ulXX-%i)s%TZSj7>G0jMs9RBmDsC zfGsW?sKNQnb{nfk({91VwCl`6!8}^P`DkK{n!!vr(x)7rI_RmlpfBP6b?J~#W7AwQ zAw)QQ+58?hHFuotFU~GM9r!i7GcwmIas@gQTZdDDo2EtSZWZ7}M%~tf&%wFQ5f{xlPya}mDwoxZC0$}-U zo~4L|L$Yuvy^EceomjTV+HlvsWe{FBn+8}=HCi6@kos6eGsFfS;xsUYhVZKqC*1bZ zNkk3IvpIz&F%x#Mz0`zY6jJ1$W1>sH3;c$}f70Pf+kqM2D&R`s6~z7b0Q-T9f!`y(e;RlgI0NhlwgImrhJOwC z60iu20+#?kMJ)dl;A_BVfa5?O$N~RCJbxDW3Qz+6i5UJY@HlWUPzJ6At^wXgO#dA4 zP2lrD1E>R&z%cL;;K!KL%fMs6T>!=Ydx1Rw&38ADJP8L94kR2%IFNAQU2wo|d8uP* z5ek}a#l#-pFdX`X$n{uOqjE~_rF$X{l;?Nx^19x1dn=$kC(6k?B3HZ?=>+AIzI(h& zgpP#*54}@!@IP0A2tG~Of>hu!^HjndysOo)Mmk6&WVNn}L8KwG4|8KXCMI_E*5bkN z92+mNNf|LvAgs}}3k6D9$cz~)!|Sse2!@q0sTNOxjDmvu+4ddS^YkkDP?$vxQ+t-={nPEoYy)6|F#Mj?Gj9wjIcYb-S> zJtaw|1nTLgt=Ga&6B8oXEfgrkqi8NgqNlJiO)(sQLt+VUh)(#M3~s4d3x~xlLi(D> z`hXfa(=HDA35Q-5DHNxXUgB&EDdzRLxDJ;GR(qiJn}uBccE?gt+S__mGo7lz^QW@K#fW}>B9kwE}A#`RHq(+K0=!9aM17^+8g-2>B9y$~`JUunD zU&-1Ic@-SBd%BI+z3PmWcFoXZ@<#& zsh~Se|1LD=K(V}Uuz@QJD&^-~XpYQCiWg8*DBxePA&=pV?9kXK%>K+7c8C>*`JsYu zMM`(PWgzK@wecmo++Lcho?_!;Q1Y@+awN#%FEF*_@SH4*UyEbbA0Gd2h2!&>JpcFa z_rHW#|I0uFxDmJ-_%-7Fv%tN;F(3_`L#+Q0a1xjX{(%^u&iC&J4g)FRRmAob@81d( zfy;qkA-=y4m|Z8NT2PTo#OgLwzJ>>0r|z@15>b=!82~-8)t2RkVa#nVJqGiY4K^ z;tO_@Y1KXVD+fM<8XoYihjtQ;coL4V>iZJXb@5r8QR?^|OTMM!of_&_)+C*im2p(l z$F-sQ{^+P53>+@ps(;f#78S>-)W|=G)hO-h&x9wsHI3gS$#dS2cbpX;a{L1it0V(D z{6Yd9q^$dAt8^J58kwLDoi7JYe%HB6C74+!V7WM7_D>qAOYUHC0tD+jLVy&9!1?hg z+(?Tfoq!VBw4;=uQh!P~85(g$97R!c+K~K-$@dJ~nBbtQO0az@a;}X9LWn}jfjoD~ zNaaQg{}eK)w~si{Z>r-!QT_`N8@4w(2AVu0B~QjghSC4oHTN0QVHOtLm4SPSe)Ct zFndT{%R91yv=_}MXtTdujM7Pw?mzC?NV|$oLYyj1_tjh98LQ7e9Y z2j&fGH+u&vB9?x9+(aZUR@ifs$0@jUL zB=q(jMXiI<1F>{>O3<}UuUQrH2_cjqx4NLJsL*dMcAVaiJo%tHHoQ5%=M+}%gl6VD ze6-?srzZRc9lnl>Jx+8NSR#2IzTPPI#}$Xi + gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). + +check(Username, Password) when is_binary(Username) -> + gen_server2:call(?MODULE, {check, Username, Password}). + +add(Username, Password) when is_binary(Username) -> + gen_server2:call(?MODULE, {add, Username, Password}). + +delete(Username) when is_binary(Username) -> + gen_server2:cast(?MODULE, {delete, Username}). + +init([]) -> + {ok, {Name, Opts}} = application:get_env(auth), + AuthMod = authmod(Name), + ok = AuthMod:init(Opts), + ?INFO("authmod is ~p", [AuthMod]), + ?INFO("~p is started", [?MODULE]), + {ok, #state{authmod=AuthMod, authopts=Opts}}. + +authmod(Name) when is_atom(Name) -> + list_to_atom(lists:concat(["emqtt_auth_", Name])). + +handle_call({check, Username, Password}, _From, #state{authmod=AuthMod} = State) -> + {reply, AuthMod:check(Username, Password), State}; + +handle_call({add, Username, Password}, _From, #state{authmod=AuthMod} = State) -> + {reply, AuthMod:add(Username, Password), State}; + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast({delete, Username}, #state{authmod=AuthMod} = State) -> + AuthMod:delete(Username), + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/emqtt_auth_anonymous.erl b/src/emqtt_auth_anonymous.erl new file mode 100644 index 000000000..f17dec334 --- /dev/null +++ b/src/emqtt_auth_anonymous.erl @@ -0,0 +1,18 @@ +-module(emqtt_auth_anonymous). + +-export([init/1, + add/2, + check/2, + delete/1]). + +init(_Opts) -> + ok. + +check(Username, _) when is_binary(Username) -> + true. + +add(Username, _Password) when is_binary(Username) -> + ok. + +delete(Username) when is_binary(Username) -> + ok. diff --git a/src/emqtt_auth_internal.erl b/src/emqtt_auth_internal.erl new file mode 100644 index 000000000..66b3a8e3e --- /dev/null +++ b/src/emqtt_auth_internal.erl @@ -0,0 +1,29 @@ +-module(emqtt_auth_internal). + +-include("emqtt.hrl"). + +-export([init/1, + add/2, + check/2, + delete/1]). + +init(_Opts) -> + mnesia:create_table(internal_user, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, internal_user)}]), + mnesia:add_table_copy(internal_user, node(), ram_copies), + ok. + +check(Username, Password) when is_binary(Username) -> + PasswdHash = crypto:md5(Password), + case mnesia:dirty_read(internal_user, Username) of + [#internal_user{passwdhash=PasswdHash}] -> true; + _ -> false + end. + +add(Username, Password) -> + mnesia:dirty_write(#internal_user{username=Username, passwdhash=crypto:md5(Password)}). + +delete(Username) -> + mnesia:dirty_delete(internal_user, Username). + diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 8615d6679..4cc9eca20 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -68,7 +68,7 @@ handle_call({go, Sock}, _From, _State) -> parse_state = emqtt_frame:initial_state(), message_id = 1, subscriptions = dict:new(), - awaiting_ack = gb_tree:empty()})}. + awaiting_ack = gb_trees:empty()})}. handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. @@ -241,11 +241,8 @@ process_request(?SUBSCRIBE, [SupportedQos | QosList] end, [], Topics), - [emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics], + [emqtt_router:subscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], - [emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()}) - || #mqtt_topic{name=Name} <- Topics], - send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, variable = #mqtt_frame_suback{ message_id = MessageId, @@ -261,8 +258,7 @@ process_request(?UNSUBSCRIBE, subscriptions = Subs0} = State) -> - [emqtt_router:delete(#subscriber{topic=Name, pid=self()}) - || #mqtt_topic{name=Name} <- Topics], + [emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, variable = #mqtt_frame_suback{ message_id = MessageId }}), diff --git a/src/emqtt_ctl.erl b/src/emqtt_ctl.erl index e70ff41c0..c6673aefa 100644 --- a/src/emqtt_ctl.erl +++ b/src/emqtt_ctl.erl @@ -26,3 +26,8 @@ cluster(Node) -> ?PRINT("failed to cluster with ~p~n", [Node]) end. +add_user(Username, Password) -> + ?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]). + +delete_user(Username) -> + ?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]). diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index 461356621..00c413920 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -6,10 +6,9 @@ -export([start_link/0]). --export([route/1, - route/2, - insert/1, - delete/1]). +-export([subscribe/2, + unsubscribe/2, + publish/2]). -behaviour(gen_server). @@ -23,41 +22,72 @@ -record(state, {}). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). -binary(S) when is_list(S) -> - list_to_binary(S); +subscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> + gen_server2:call(?MODULE, {subscribe, Topic, Client}). -binary(B) when is_binary(B) -> - B. +unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> + gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}). -route(#mqtt_msg{topic=Topic}=Msg) when is_record(Msg, mqtt_msg) -> - error_logger:info_msg("route msg: ~p~n", [Msg]), - [ Pid ! {route, Msg} || #subscriber{pid=Pid} <- ets:lookup(subscriber, binary(Topic)) ]. +publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) -> + [ + [Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Path)] + || #topic{path=Path} <- match(Topic)]. -route(Topic, Msg) -> - [ Pid ! {route, Msg} || #subscriber{pid=Pid} <- ets:lookup(subscriber, Topic) ]. -insert(Sub) when is_record(Sub, subscriber) -> - gen_server:call(?MODULE, {insert, Sub}). - -delete(Sub) when is_record(Sub, subscriber) -> - gen_server:cast(?MODULE, {delete, Sub}). +match(Topic) when is_binary(Topic) -> + Words = topic_split(Topic), + DirectMatches = mnesia:dirty_read(direct_topic, Words), + WildcardMatches = lists:append([ + mnesia:dirty_read(wildcard_topic, Key) || + Key <- mnesia:dirty_all_keys(wildcard_topic), + topic_match(Words, Key) + ]), + DirectMatches ++ WildcardMatches. init([]) -> + mnesia:create_table( + direct_topic, [ + {record_name, topic}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic)}]), + mnesia:add_table_copy(direct_topic, node(), ram_copies), + mnesia:create_table( + wildcard_topic, [ + {record_name, topic}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic)}]), + mnesia:add_table_copy(wildcard_topic, node(), ram_copies), ets:new(subscriber, [bag, named_table, {keypos, 2}]), ?INFO_MSG("emqtt_router is started."), {ok, #state{}}. -handle_call({insert, Sub}, _From, State) -> - ets:insert(subscriber, Sub), +handle_call({subscribe, Topic, Client}, _From, State) -> + Words = topic_split(Topic), + case topic_type(Words) of + direct -> + ok = mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic}); + wildcard -> + ok = mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic}) + end, + ets:insert(subscriber, #subscriber{topic=Topic, client=Client}), {reply, ok, State}; handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. -handle_cast({delete, Sub}, State) -> - ets:delete_object(subscriber, Sub), +handle_cast({unsubscribe, Topic, Client}, State) -> + ets:delete_object(subscriber, #subscriber{topic=Topic, client=Client}), + %TODO: how to remove topic + % + %Words = topic_split(Topic), + %case topic_type(Words) of + %direct -> + % mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic}); + %wildcard -> + % mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic}) + %end, {noreply, State}; handle_cast(Msg, State) -> @@ -72,4 +102,35 @@ terminate(_Reason, _State) -> code_change(_OldVsn, _State, _Extra) -> ok. +%-------------------------------------- +% internal functions +%-------------------------------------- + +topic_type([]) -> + direct; +topic_type([<<"#">>]) -> + wildcard; +topic_type([<<"+">>|_T]) -> + wildcard; +topic_type([_|T]) -> + topic_type(T). + +topic_match([], []) -> + true; + +topic_match([H|T1], [H|T2]) -> + topic_match(T1, T2); + +topic_match([_H|T1], [<<"+">>|T2]) -> + topic_match(T1, T2); + +topic_match(_, [<<"#">>]) -> + true; + +topic_match([], [_H|_T2]) -> + false. + +topic_split(S) -> + binary:split(S, [<<"/">>], [global]). + diff --git a/src/emqtt_sup.erl b/src/emqtt_sup.erl index 4d12fdf35..ed9f44148 100644 --- a/src/emqtt_sup.erl +++ b/src/emqtt_sup.erl @@ -26,7 +26,7 @@ start_link(Listeners) -> init([Listeners]) -> {ok, { {one_for_all, 5, 10}, [ - ?CHILD(emqtt_topic, worker), + ?CHILD(emqtt_auth, worker), ?CHILD(emqtt_router, worker), ?CHILD(emqtt_client_sup, supervisor) | listener_children(Listeners) ]} diff --git a/src/emqtt_topic.erl b/src/emqtt_topic.erl deleted file mode 100644 index f287e4d84..000000000 --- a/src/emqtt_topic.erl +++ /dev/null @@ -1,119 +0,0 @@ - --module(emqtt_topic). - --include("emqtt.hrl"). - --export([start_link/0, - match/1, - insert/1, - delete/1]). - --behaviour(gen_server). - --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --record(state, {}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -match(Topic0) -> - Topic = emqtt_util:binary(Topic0), - Words = topic_split(Topic), - DirectMatches = mnesia:dirty_read(direct_topic, Words), - WildcardMatches = lists:append([ - mnesia:dirty_read(wildcard_topic, Key) || - Key <- mnesia:dirty_all_keys(wildcard_topic), - topic_match(Words, Key) - ]), - DirectMatches ++ WildcardMatches. - -insert(Topic) -> - gen_server:call(?MODULE, {insert, emqtt_util:binary(Topic)}). - -delete(Topic) -> - gen_server:cast(?MODULE, {delete, emqtt_util:binary(Topic)}). - -init([]) -> - {atomic, ok} = mnesia:create_table( - direct_topic, [ - {record_name, topic}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic)}]), - {atomic, ok} = mnesia:create_table( - wildcard_topic, [ - {record_name, topic}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic)}]), - ?INFO_MSG("emqtt_topic is started."), - {ok, #state{}}. - -handle_call({insert, Topic}, _From, State) -> - Words = topic_split(Topic), - Reply = - case topic_type(Words) of - direct -> - mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic}); - wildcard -> - mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic}) - end, - {reply, Reply, State}; - -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. - -handle_cast({delete, Topic}, State) -> - Words = topic_split(Topic), - case topic_type(Words) of - direct -> - mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic}); - wildcard -> - mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic}) - end, - {noreply, State}; - -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. - -handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, _State, _Extra) -> - ok. - -topic_type([]) -> - direct; -topic_type([<<"#">>]) -> - wildcard; -topic_type([<<"+">>|_T]) -> - wildcard; -topic_type([_|T]) -> - topic_type(T). - -topic_match([], []) -> - true; - -topic_match([H|T1], [H|T2]) -> - topic_match(T1, T2); - -topic_match([_H|T1], [<<"+">>|T2]) -> - topic_match(T1, T2); - -topic_match(_, [<<"#">>]) -> - true; - -topic_match([], [_H|_T2]) -> - false. - -topic_split(S) -> - binary:split(S, [<<"/">>], [global]). - -