diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index ba5413dfc..a75c90c4d 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -349,8 +349,7 @@ handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) -> end}); handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - noreply(State). + with_proto(handle_info, [Info], State). terminate(Reason, #state{transport = Transport, socket = Sock, @@ -374,6 +373,8 @@ code_change(_OldVsn, State, _Extra) -> with_proto(Fun, Args, State = #state{pstate = PState}) -> case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [PState]) of + ok -> + noreply(State); {ok, NPState} -> noreply(State#state{pstate = NPState}); {F, Reason, NPState} when F == stop; diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index dc02ac232..c2851895b 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -40,6 +40,9 @@ , timeout/3 ]). +-export([ handle_info/2 + ]). + %% for trans callback -export([ handle_recv_send_frame/2 , handle_recv_ack_frame/2 @@ -60,7 +63,7 @@ %% Transaction transaction :: #{binary() => list()}, %% Subscriptions - subscriptions = [], + subscriptions = #{}, %% Send function sendfun :: function(), %% Heartbeat function @@ -109,7 +112,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, sockport => SockPort, clientid => undefined, username => undefined, - mountpoint => undefined, + mountpoint => undefined, %% XXX: not supported now is_bridge => false, is_superuser => false }, @@ -183,10 +186,7 @@ info(will_msg, _) -> undefined. session_info(#pstate{conninfo = ConnInfo, subscriptions = Subs}) -> - NSubs = lists:foldl(fun({_Id, Topic, _Ack}, Acc) -> - Acc#{Topic => ?DEFAULT_SUBOPTS} - end, #{}, Subs), - #{subscriptions => NSubs, + #{subscriptions => Subs, upgrade_qos => false, retry_interval => 0, await_rel_timeout => 0, @@ -195,7 +195,7 @@ session_info(#pstate{conninfo = ConnInfo, subscriptions = Subs}) -> -spec stats(pstate()) -> emqx_types:stats(). stats(#pstate{subscriptions = Subs}) -> - [{subscriptions_cnt, length(Subs)}, + [{subscriptions_cnt, maps:size(Subs)}, {subscriptions_max, 0}, {inflight_cnt, 0}, {inflight_max, 0}, @@ -284,29 +284,23 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, Topic = header(<<"destination">>, Headers), Ack = header(<<"ack">>, Headers, <<"auto">>), - case lists:keyfind(Id, 1, Subs) of - {Id, Topic, Ack} -> + case find_sub_by_id(Id, Subs) of + {Topic, #{sub_props := #{id := Id, ack := Ack}}} -> ?LOG(info, "Subscription has established: ~s", [Topic]), maybe_send_receipt(receipt_id(Headers), State); - false -> + undefined -> case check_acl(subscribe, Topic, State) of allow -> ClientInfo = State#pstate.clientinfo, - ClientId = maps:get(clientid, ClientInfo), - %% XXX: We don't parse the request topic name or filter - %% which meaning stomp does not support shared - %% subscription + + [{TopicFilter, SubOpts}] = parse_topic_filters( + [{Topic, ?DEFAULT_SUBOPTS} + ]), + NSubOpts = SubOpts#{sub_props => #{id => Id, ack => Ack}}, _ = run_hooks('client.subscribe', [ClientInfo, _SubProps = #{}], - [{Topic, _TopicOpts = #{}}]), - - emqx_broker:subscribe(Topic, ClientId), - - SubOpts = ?DEFAULT_SUBOPTS#{is_new => true}, - _ = run_hooks('session.subscribed', - [ClientInfo, Topic, SubOpts]), - - NState = put_subs({Id, Topic, Ack}, State), + [{TopicFilter, NSubOpts}]), + NState = do_subscribe(TopicFilter, NSubOpts, State), maybe_send_receipt(receipt_id(Headers), NState) end end; @@ -314,22 +308,17 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers}, State = #pstate{subscriptions = Subs, clientinfo = ClientInfo}) -> Id = header(<<"id">>, Headers), - {ok, State1} = case lists:keyfind(Id, 1, Subs) of - {Id, Topic, _Ack} -> + {ok, NState} = case find_sub_by_id(Id, Subs) of + {Topic, #{sub_props := #{id := Id}}} -> _ = run_hooks('client.unsubscribe', [ClientInfo, #{}], [{Topic, #{}}]), - - ok = emqx_broker:unsubscribe(Topic), - - _ = run_hooks('session.unsubscribe', - [ClientInfo, Topic, ?DEFAULT_SUBOPTS]), - - {ok, remove_subs(Id, State)}; - false -> + State1 = do_unsubscribe(Topic, ?DEFAULT_SUBOPTS, State), + {ok, State1}; + undefined -> {ok, State} end, - maybe_send_receipt(receipt_id(Headers), State1); + maybe_send_receipt(receipt_id(Headers), NState); %% ACK %% id:12345 @@ -407,8 +396,8 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) -> send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, State = #pstate{subscriptions = Subs}) -> - case lists:keyfind(Topic, 2, Subs) of - {Id, Topic, Ack} -> + case find_sub_by_topic(Topic, Subs) of + {Topic, #{sub_props := #{id := Id, ack := Ack}}} -> Headers0 = [{<<"subscription">>, Id}, {<<"message-id">>, next_msgid()}, {<<"destination">>, Topic}, @@ -423,7 +412,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, headers = Headers1 ++ maps:get(stomp_headers, Headers, []), body = Payload}, send(Frame, State); - false -> + undefined -> ?LOG(error, "Stomp dropped: ~p", [Msg]), {error, dropped, State} end; @@ -466,6 +455,27 @@ timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) -> NTrans = maps:filter(fun(_, {Ts, _}) -> Ts + ?TRANS_TIMEOUT < Now end, Trans), {ok, ensure_clean_trans_timer(State#pstate{transaction = NTrans})}. + +-spec(handle_info(Info :: term(), pstate()) + -> ok | {ok, pstate()} | {shutdown, Reason :: term(), pstate()}). + +handle_info({subscribe, TopicFilters}, State) -> + NState = lists:foldl( + fun({TopicFilter, SubOpts}, StateAcc) -> + do_subscribe(TopicFilter, SubOpts#{is_new => true}, StateAcc) + end, State, parse_topic_filters(TopicFilters)), + {ok, NState}; + +handle_info({unsubscribe, TopicFilters}, State) -> + NState = lists:foldl(fun({TopicFilter, SubOpts}, StateAcc) -> + do_unsubscribe(TopicFilter, SubOpts, StateAcc) + end, State, parse_topic_filters(TopicFilters)), + {ok, NState}; + +handle_info(Info, State) -> + ?LOG(warning, "Unexpected info ~p", [Info]), + {ok, State}. + negotiate_version(undefined) -> {ok, <<"1.0">>}; negotiate_version(Accepts) -> @@ -612,7 +622,10 @@ start_heartbeart_timer(Heartbeats, State) -> State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}). %%-------------------------------------------------------------------- -%% ... +%% pub & sub helpers + +parse_topic_filters(TopicFilters) -> + lists:map(fun emqx_topic:parse/1, TopicFilters). check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) -> case is_acl_enabled(State) andalso @@ -621,19 +634,47 @@ check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) -> Res -> Res end. -put_subs({Id, Topic, Ack}, State = #pstate{subscriptions = Subs}) -> - State#pstate{subscriptions = lists:keystore(Id, 1, Subs, {Id, Topic, Ack})}. +do_subscribe(TopicFilter, SubOpts, + State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) -> + ClientId = maps:get(clientid, ClientInfo), + _ = emqx_broker:subscribe(TopicFilter, ClientId), + NSubOpts = SubOpts#{is_new => true}, + _ = run_hooks('session.subscribed', + [ClientInfo, TopicFilter, NSubOpts]), -remove_subs(Id, State = #pstate{subscriptions = Subs}) -> - State#pstate{subscriptions = lists:keydelete(Id, 1, Subs)}. + State#pstate{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}. -%%-------------------------------------------------------------------- -%% ... +do_unsubscribe(TopicFilter, SubOpts, + State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) -> + ok = emqx_broker:unsubscribe(TopicFilter), + _ = run_hooks('session.unsubscribe', + [ClientInfo, TopicFilter, SubOpts]), + + State#pstate{subscriptions = maps:remove(TopicFilter, Subs)}. + +find_sub_by_topic(Topic, Subs) -> + case maps:get(Topic, Subs, undefined) of + undefined -> undefined; + SubOpts -> {Topic, SubOpts} + end. + +find_sub_by_id(Id, Subs) -> + Found = maps:filter(fun(_, SubOpts) -> + %% FIXME: datatype?? + maps:get(id, maps:get(sub_props, SubOpts, #{}), -1) == Id + end, Subs), + case maps:to_list(Found) of + [] -> undefined; + [Sub|_] -> Sub + end. is_acl_enabled(_) -> %% TODO: configs from somewhere true. +%%-------------------------------------------------------------------- +%% helpers + default_user(#pstate{default_user = DefaultUser}) -> DefaultUser. allow_anonymous(#pstate{allow_anonymous = AllowAnonymous}) ->