From 95f3e49edb586c771b83c27a34be73f62a81bcf5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 20 Apr 2024 08:33:55 +0800 Subject: [PATCH 1/3] fix(stomp): pass the Content-Type from the MQTT message --- apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src | 2 +- apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src index 08214aee2..c7c9b6143 100644 --- a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src +++ b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_stomp, [ {description, "Stomp Gateway"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 20d769378..71458f15e 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -1039,7 +1039,7 @@ handle_deliver( {<<"subscription">>, Id}, {<<"message-id">>, next_msgid()}, {<<"destination">>, emqx_message:topic(NMessage)}, - {<<"content-type">>, <<"text/plain">>} + {<<"content-type">>, content_type_from_mqtt_message(NMessage)} ], Headers1 = case Ack of @@ -1080,6 +1080,13 @@ handle_deliver( ), {ok, [{outgoing, lists:reverse(Frames0)}], Channel}. +content_type_from_mqtt_message(Message) -> + Properties = emqx_message:get_header(properties, Message, #{}), + case maps:get('Content-Type', Properties, undefined) of + undefined -> <<"text/plain">>; + ContentType -> ContentType + end. + %%-------------------------------------------------------------------- %% Handle timeout %%-------------------------------------------------------------------- From 5520e54147b5cf69532fd0f7d7dd65bbf01fcca0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 20 Apr 2024 08:59:36 +0800 Subject: [PATCH 2/3] chore: add tests --- .../test/emqx_stomp_SUITE.erl | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 44c498405..64d95dc42 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -289,6 +289,67 @@ t_subscribe_inuse(_) -> with_connection(TopicIdInuseViaHttp), with_connection(SubscriptionInuseViaHttp). +t_receive_from_mqtt_publish(_) -> + with_connection(fun(Sock) -> + ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>), + ?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)), + + ok = send_subscribe_frame(Sock, 0, <<"/queue/foo">>), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), + + %% send mqtt publish with content-type + Msg = emqx_message:make( + _From = from_testsuite, + _QoS = 1, + _Topic = <<"/queue/foo">>, + _Payload = <<"hello">>, + _Flags = #{}, + _Headers = #{properties => #{'Content-Type' => <<"application/json">>}} + ), + emqx:publish(Msg), + + {ok, Frame} = recv_a_frame(Sock), + ?assertEqual( + <<"application/json">>, + proplists:get_value(<<"content-type">>, Frame#stomp_frame.headers) + ), + + ?assertMatch( + #stomp_frame{ + command = <<"MESSAGE">>, + headers = _, + body = <<"hello">> + }, + Frame + ), + lists:foreach( + fun({Key, Val}) -> + Val = proplists:get_value(Key, Frame#stomp_frame.headers) + end, + [ + {<<"destination">>, <<"/queue/foo">>}, + {<<"subscription">>, <<"0">>} + ] + ), + + %% assert subscription stats + [ClientInfo1] = clients(), + ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1), + + %% Unsubscribe + ok = send_unsubscribe_frame(Sock, 0), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), + + %% assert subscription stats + [ClientInfo2] = clients(), + ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2), + + ok = send_message_frame(Sock, <<"/queue/foo">>, <<"You will not receive this msg">>), + ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)), + + {error, timeout} = gen_tcp:recv(Sock, 0, 500) + end). + t_transaction(_) -> with_connection(fun(Sock) -> gen_tcp:send( From 2e21bc51806903657ead4196c387dd2848c560c9 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 20 Apr 2024 09:03:13 +0800 Subject: [PATCH 3/3] chore: update changes --- changes/ce/fix-12902.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-12902.md diff --git a/changes/ce/fix-12902.md b/changes/ce/fix-12902.md new file mode 100644 index 000000000..83409ee6d --- /dev/null +++ b/changes/ce/fix-12902.md @@ -0,0 +1 @@ +Pass the Content-type of MQTT message to the Stomp message.