From 3530ed66d508897a04f7843df106dd02436f2d35 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 28 Nov 2023 19:28:53 +0200 Subject: [PATCH] fix(emqx_gateway_coap): take subscribe options from uri_query Closes #12031 --- .../src/emqx_coap_pubsub_handler.erl | 10 +++-- .../test/emqx_coap_SUITE.erl | 39 +++++++++++++++++++ changes/ce/fix-12048.en.md | 1 + 3 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 changes/ce/fix-12048.en.md diff --git a/apps/emqx_gateway_coap/src/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway_coap/src/emqx_coap_pubsub_handler.erl index 3070ea891..cbb5d7c94 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_pubsub_handler.erl @@ -74,14 +74,16 @@ check_topic([]) -> check_topic(Path) -> {ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}. -get_sub_opts(#coap_message{options = Opts} = Msg) -> - SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts), +get_sub_opts(Msg) -> + SubOpts = maps:fold( + fun parse_sub_opts/3, #{}, emqx_coap_message:get_option(uri_query, Msg, #{}) + ), case SubOpts of #{qos := _} -> - maps:merge(SubOpts, ?SUBOPTS); + maps:merge(?SUBOPTS, SubOpts); _ -> CfgType = emqx_conf:get([gateway, coap, subscribe_qos], ?QOS_0), - maps:merge(SubOpts, ?SUBOPTS#{qos => type_to_qos(CfgType, Msg)}) + maps:merge(?SUBOPTS#{qos => type_to_qos(CfgType, Msg)}, SubOpts) end. parse_sub_opts(<<"qos">>, V, Opts) -> diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index c066b84ff..8f93164b5 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -345,6 +345,45 @@ t_subscribe(_) -> Topics ). +t_subscribe_with_qos_opt(_) -> + Topics = [ + {<<"abc">>, 0}, + {<<"/abc">>, 1}, + {<<"abc/d">>, 2} + ], + Fun = fun({Topic, Qos}, Channel, Token) -> + Payload = <<"123">>, + URI = pubsub_uri(binary_to_list(Topic), Token) ++ "&qos=" ++ integer_to_list(Qos), + Req = make_req(get, Payload, [{observe, 0}]), + {ok, content, _} = do_request(Channel, URI, Req), + ?LOGT("observer topic:~ts~n", [Topic]), + + %% ensure subscribe succeed + timer:sleep(100), + [SubPid] = emqx:subscribers(Topic), + ?assert(is_pid(SubPid)), + ?assertEqual(Qos, maps:get(qos, emqx_broker:get_subopts(SubPid, Topic))), + %% publish a message + emqx:publish(emqx_message:make(Topic, Payload)), + {ok, content, Notify} = with_response(Channel), + ?LOGT("observer get Notif=~p", [Notify]), + + #coap_content{payload = PayloadRecv} = Notify, + + ?assertEqual(Payload, PayloadRecv) + end, + + with_connection(Topics, Fun), + + %% subscription removed if coap client disconnected + timer:sleep(100), + lists:foreach( + fun({Topic, _Qos}) -> + ?assertEqual([], emqx:subscribers(Topic)) + end, + Topics + ). + t_un_subscribe(_) -> %% can unsubscribe to a normal topic Topics = [ diff --git a/changes/ce/fix-12048.en.md b/changes/ce/fix-12048.en.md new file mode 100644 index 000000000..cecd62222 --- /dev/null +++ b/changes/ce/fix-12048.en.md @@ -0,0 +1 @@ +Fix COAP gateway bug that caused it to ignore subscription options.