diff --git a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src index 1fda99700..bcc018ad4 100644 --- a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src +++ b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_stomp, [ {description, "Stomp Gateway"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index eef30b3dd..3577c0a60 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -508,9 +508,13 @@ handle_in( handle_out_and_update(receipt, receipt_id(Headers), NChannel1) end; {error, subscription_id_inused, NChannel} -> - ErrMsg = io_lib:format("Subscription id ~w is in used", [SubId]), + ErrMsg = io_lib:format("Subscription id ~s is in used", [SubId]), ErrorFrame = error_frame(receipt_id(Headers), ErrMsg), shutdown(subscription_id_inused, ErrorFrame, NChannel); + {error, topic_already_subscribed, NChannel} -> + ErrMsg = io_lib:format("Topic ~s already in subscribed", [Topic]), + ErrorFrame = error_frame(receipt_id(Headers), ErrMsg), + shutdown(topic_already_subscribed, ErrorFrame, NChannel); {error, acl_denied, NChannel} -> ErrMsg = io_lib:format("Insufficient permissions for ~s", [Topic]), ErrorFrame = error_frame(receipt_id(Headers), ErrMsg), @@ -695,12 +699,15 @@ check_subscribed_status( ) -> MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), case lists:keyfind(SubId, 1, Subs) of - {SubId, MountedTopic, _Ack, _} -> - ok; - {SubId, _OtherTopic, _Ack, _} -> + {SubId, _MountedTopic, _Ack, _} -> {error, subscription_id_inused}; false -> - ok + case lists:keyfind(MountedTopic, 2, Subs) of + {_OtherSubId, MountedTopic, _Ack, _} -> + {error, topic_already_subscribed}; + false -> + ok + end end. check_sub_acl( @@ -826,13 +833,20 @@ handle_call( NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts} | Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, reply({ok, {MountedTopic, NSubOpts}}, [{event, updated}], NChannel1); - {error, ErrMsg, NChannel} -> + {error, ErrCode, NChannel} -> ?SLOG(error, #{ msg => "failed_to_subscribe_topic", topic => Topic, - reason => ErrMsg + reason => ErrCode }), - reply({error, ErrMsg}, NChannel) + ErrMsg = + case ErrCode of + subscription_id_inused -> + io_lib:format("Subscription id ~s is in used", [SubId]); + topic_already_subscribed -> + io_lib:format("Topic ~s already in subscribed", [Topic]) + end, + reply({error, lists:flatten(ErrMsg)}, NChannel) end end; handle_call( diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 47b191855..d9f0f4ce2 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -162,62 +162,28 @@ t_heartbeat(_) -> t_subscribe(_) -> with_connection(fun(Sock) -> - gen_tcp:send( - Sock, - serialize( - <<"CONNECT">>, - [ - {<<"accept-version">>, ?STOMP_VER}, - {<<"host">>, <<"127.0.0.1:61613">>}, - {<<"login">>, <<"guest">>}, - {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"0,0">>} - ] - ) - ), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, - #stomp_frame{ - command = <<"CONNECTED">>, - headers = _, - body = _ - }, - _, _} = parse(Data), + ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>), + ?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)), - %% Subscribe - gen_tcp:send( - Sock, - serialize( - <<"SUBSCRIBE">>, - [ - {<<"id">>, 0}, - {<<"destination">>, <<"/queue/foo">>}, - {<<"ack">>, <<"auto">>} - ] - ) - ), + ok = send_subscribe_frame(Sock, 0, <<"/queue/foo">>), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), %% 'user-defined' header will be retain - gen_tcp:send( - Sock, - serialize( - <<"SEND">>, - [ - {<<"destination">>, <<"/queue/foo">>}, - {<<"user-defined">>, <<"emq">>} - ], - <<"hello">> - ) - ), + ok = send_message_frame(Sock, <<"/queue/foo">>, <<"hello">>, [ + {<<"user-defined">>, <<"emq">>} + ]), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), - {ok, Data1} = gen_tcp:recv(Sock, 0, 1000), - {ok, - Frame = #stomp_frame{ + {ok, Frame} = recv_a_frame(Sock), + + ?assertMatch( + #stomp_frame{ command = <<"MESSAGE">>, headers = _, body = <<"hello">> }, - _, _} = parse(Data1), + Frame + ), lists:foreach( fun({Key, Val}) -> Val = proplists:get_value(Key, Frame#stomp_frame.headers) @@ -234,43 +200,80 @@ t_subscribe(_) -> ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1), %% Unsubscribe - gen_tcp:send( - Sock, - serialize( - <<"UNSUBSCRIBE">>, - [ - {<<"id">>, 0}, - {<<"receipt">>, <<"12345">>} - ] - ) - ), - - {ok, Data2} = gen_tcp:recv(Sock, 0, 1000), - - {ok, - #stomp_frame{ - command = <<"RECEIPT">>, - headers = [{<<"receipt-id">>, <<"12345">>}], - body = _ - }, - _, _} = parse(Data2), + ok = send_unsubscribe_frame(Sock, 0), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), %% assert subscription stats [ClientInfo2] = clients(), ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2), - gen_tcp:send( - Sock, - serialize( - <<"SEND">>, - [{<<"destination">>, <<"/queue/foo">>}], - <<"You will not receive this msg">> - ) - ), + ok = send_message_frame(Sock, <<"/queue/foo">>, <<"You will not receive this msg">>), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), {error, timeout} = gen_tcp:recv(Sock, 0, 500) end). +t_subscribe_inuse(_) -> + UsedTopic = <<"/queue/foo">>, + UsedSubId = <<"0">>, + Setup = + fun(Sock) -> + ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>), + ?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)), + ok = send_subscribe_frame(Sock, UsedSubId, UsedTopic), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)) + end, + TopicIdInuse = + fun(Sock) -> + Setup(Sock), + %% topic-id is in use + ok = send_subscribe_frame(Sock, UsedSubId, <<"/queue/bar">>), + + {ok, ErrorFrame} = recv_a_frame(Sock), + ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame), + ?assertEqual(<<"Subscription id 0 is in used">>, ErrorFrame#stomp_frame.body), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end, + + SubscriptionInuse = + fun(Sock) -> + Setup(Sock), + %% topic is in use + ok = send_subscribe_frame(Sock, 1, UsedTopic), + + {ok, ErrorFrame} = recv_a_frame(Sock), + ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame), + ?assertEqual(<<"Topic /queue/foo already in subscribed">>, ErrorFrame#stomp_frame.body), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end, + + TopicIdInuseViaHttp = + fun(Sock) -> + Setup(Sock), + %% assert subscription stats + [#{clientid := ClientId}] = clients(), + {error, ErrMsg} = create_subscription(ClientId, <<"/queue/bar">>, UsedSubId), + ?assertEqual(<<"Subscription id 0 is in used">>, ErrMsg), + + ok = send_disconnect_frame(Sock) + end, + + SubscriptionInuseViaHttp = + fun(Sock) -> + Setup(Sock), + %% assert subscription stats + [#{clientid := ClientId}] = clients(), + {error, ErrMsg} = create_subscription(ClientId, UsedTopic, <<"1">>), + ?assertEqual(<<"Topic /queue/foo already in subscribed">>, ErrMsg), + + ok = send_disconnect_frame(Sock) + end, + + with_connection(TopicIdInuse), + with_connection(SubscriptionInuse), + with_connection(TopicIdInuseViaHttp), + with_connection(SubscriptionInuseViaHttp). + t_transaction(_) -> with_connection(fun(Sock) -> gen_tcp:send( @@ -1072,6 +1075,7 @@ recv_a_frame(Sock) -> {ok, Frame, Rest, NParser} -> put(parser, NParser), put(rest, Rest), + ct:pal("recv_a_frame: ~p~n", [Frame]), {ok, Frame}; {error, _} = Err -> erase(parser), @@ -1124,11 +1128,23 @@ send_subscribe_frame(Sock, Id, Topic) -> ], ok = gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, Headers)). +send_unsubscribe_frame(Sock, Id) when is_integer(Id) -> + Headers = + [ + {<<"id">>, Id}, + {<<"receipt">>, <<"rp-", (integer_to_binary(Id))/binary>>} + ], + gen_tcp:send(Sock, serialize(<<"UNSUBSCRIBE">>, Headers)). + send_message_frame(Sock, Topic, Payload) -> + send_message_frame(Sock, Topic, Payload, []). + +send_message_frame(Sock, Topic, Payload, Headers0) -> Headers = [ {<<"destination">>, Topic}, {<<"receipt">>, <<"rp-", Topic/binary>>} + | Headers0 ], ok = gen_tcp:send(Sock, serialize(<<"SEND">>, Headers, Payload)). @@ -1142,3 +1158,17 @@ send_disconnect_frame(Sock, ReceiptId) -> clients() -> {200, Clients} = request(get, "/gateways/stomp/clients"), maps:get(data, Clients). + +create_subscription(ClientId, Topic, SubId) -> + Path = io_lib:format("/gateways/stomp/clients/~s/subscriptions", [ClientId]), + Body = #{ + topic => Topic, + qos => 1, + sub_props => #{subid => SubId} + }, + case request(post, Path, Body) of + {201, _} -> + ok; + {400, #{message := Message}} -> + {error, Message} + end. diff --git a/changes/ce/fix-11195.en.md b/changes/ce/fix-11195.en.md new file mode 100644 index 000000000..4c2d8b6b7 --- /dev/null +++ b/changes/ce/fix-11195.en.md @@ -0,0 +1 @@ +Avoid to create duplicated subscription by HTTP API or client in Stomp gateway