diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 44c498405..64d95dc42 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -289,6 +289,67 @@ t_subscribe_inuse(_) -> with_connection(TopicIdInuseViaHttp), with_connection(SubscriptionInuseViaHttp). +t_receive_from_mqtt_publish(_) -> + with_connection(fun(Sock) -> + ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>), + ?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)), + + ok = send_subscribe_frame(Sock, 0, <<"/queue/foo">>), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), + + %% send mqtt publish with content-type + Msg = emqx_message:make( + _From = from_testsuite, + _QoS = 1, + _Topic = <<"/queue/foo">>, + _Payload = <<"hello">>, + _Flags = #{}, + _Headers = #{properties => #{'Content-Type' => <<"application/json">>}} + ), + emqx:publish(Msg), + + {ok, Frame} = recv_a_frame(Sock), + ?assertEqual( + <<"application/json">>, + proplists:get_value(<<"content-type">>, Frame#stomp_frame.headers) + ), + + ?assertMatch( + #stomp_frame{ + command = <<"MESSAGE">>, + headers = _, + body = <<"hello">> + }, + Frame + ), + lists:foreach( + fun({Key, Val}) -> + Val = proplists:get_value(Key, Frame#stomp_frame.headers) + end, + [ + {<<"destination">>, <<"/queue/foo">>}, + {<<"subscription">>, <<"0">>} + ] + ), + + %% assert subscription stats + [ClientInfo1] = clients(), + ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1), + + %% Unsubscribe + ok = send_unsubscribe_frame(Sock, 0), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), + + %% assert subscription stats + [ClientInfo2] = clients(), + ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2), + + ok = send_message_frame(Sock, <<"/queue/foo">>, <<"You will not receive this msg">>), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), + + {error, timeout} = gen_tcp:recv(Sock, 0, 500) + end). + t_transaction(_) -> with_connection(fun(Sock) -> gen_tcp:send(