fix(emqx_gateway_coap): take subscribe options from uri_query
Closes #12031
This commit is contained in:
parent
d93fdd8e9b
commit
3530ed66d5
|
@ -74,14 +74,16 @@ check_topic([]) ->
|
||||||
check_topic(Path) ->
|
check_topic(Path) ->
|
||||||
{ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}.
|
{ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}.
|
||||||
|
|
||||||
get_sub_opts(#coap_message{options = Opts} = Msg) ->
|
get_sub_opts(Msg) ->
|
||||||
SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts),
|
SubOpts = maps:fold(
|
||||||
|
fun parse_sub_opts/3, #{}, emqx_coap_message:get_option(uri_query, Msg, #{})
|
||||||
|
),
|
||||||
case SubOpts of
|
case SubOpts of
|
||||||
#{qos := _} ->
|
#{qos := _} ->
|
||||||
maps:merge(SubOpts, ?SUBOPTS);
|
maps:merge(?SUBOPTS, SubOpts);
|
||||||
_ ->
|
_ ->
|
||||||
CfgType = emqx_conf:get([gateway, coap, subscribe_qos], ?QOS_0),
|
CfgType = emqx_conf:get([gateway, coap, subscribe_qos], ?QOS_0),
|
||||||
maps:merge(SubOpts, ?SUBOPTS#{qos => type_to_qos(CfgType, Msg)})
|
maps:merge(?SUBOPTS#{qos => type_to_qos(CfgType, Msg)}, SubOpts)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_sub_opts(<<"qos">>, V, Opts) ->
|
parse_sub_opts(<<"qos">>, V, Opts) ->
|
||||||
|
|
|
@ -345,6 +345,45 @@ t_subscribe(_) ->
|
||||||
Topics
|
Topics
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_subscribe_with_qos_opt(_) ->
|
||||||
|
Topics = [
|
||||||
|
{<<"abc">>, 0},
|
||||||
|
{<<"/abc">>, 1},
|
||||||
|
{<<"abc/d">>, 2}
|
||||||
|
],
|
||||||
|
Fun = fun({Topic, Qos}, Channel, Token) ->
|
||||||
|
Payload = <<"123">>,
|
||||||
|
URI = pubsub_uri(binary_to_list(Topic), Token) ++ "&qos=" ++ integer_to_list(Qos),
|
||||||
|
Req = make_req(get, Payload, [{observe, 0}]),
|
||||||
|
{ok, content, _} = do_request(Channel, URI, Req),
|
||||||
|
?LOGT("observer topic:~ts~n", [Topic]),
|
||||||
|
|
||||||
|
%% ensure subscribe succeed
|
||||||
|
timer:sleep(100),
|
||||||
|
[SubPid] = emqx:subscribers(Topic),
|
||||||
|
?assert(is_pid(SubPid)),
|
||||||
|
?assertEqual(Qos, maps:get(qos, emqx_broker:get_subopts(SubPid, Topic))),
|
||||||
|
%% publish a message
|
||||||
|
emqx:publish(emqx_message:make(Topic, Payload)),
|
||||||
|
{ok, content, Notify} = with_response(Channel),
|
||||||
|
?LOGT("observer get Notif=~p", [Notify]),
|
||||||
|
|
||||||
|
#coap_content{payload = PayloadRecv} = Notify,
|
||||||
|
|
||||||
|
?assertEqual(Payload, PayloadRecv)
|
||||||
|
end,
|
||||||
|
|
||||||
|
with_connection(Topics, Fun),
|
||||||
|
|
||||||
|
%% subscription removed if coap client disconnected
|
||||||
|
timer:sleep(100),
|
||||||
|
lists:foreach(
|
||||||
|
fun({Topic, _Qos}) ->
|
||||||
|
?assertEqual([], emqx:subscribers(Topic))
|
||||||
|
end,
|
||||||
|
Topics
|
||||||
|
).
|
||||||
|
|
||||||
t_un_subscribe(_) ->
|
t_un_subscribe(_) ->
|
||||||
%% can unsubscribe to a normal topic
|
%% can unsubscribe to a normal topic
|
||||||
Topics = [
|
Topics = [
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix COAP gateway bug that caused it to ignore subscription options.
|
Loading…
Reference in New Issue