Merge pull request #12048 from SergeTupchiy/fix-coap_sub-opts

Fix COAP sub opts
This commit is contained in:
SergeTupchiy 2023-12-05 10:27:04 +02:00 committed by GitHub
commit 7b9d12f20e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 10 deletions

View File

@ -74,14 +74,16 @@ check_topic([]) ->
check_topic(Path) ->
{ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}.
get_sub_opts(#coap_message{options = Opts} = Msg) ->
SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts),
get_sub_opts(Msg) ->
SubOpts = maps:fold(
fun parse_sub_opts/3, #{}, emqx_coap_message:get_option(uri_query, Msg, #{})
),
case SubOpts of
#{qos := _} ->
maps:merge(SubOpts, ?SUBOPTS);
maps:merge(?SUBOPTS, SubOpts);
_ ->
CfgType = emqx_conf:get([gateway, coap, subscribe_qos], ?QOS_0),
maps:merge(SubOpts, ?SUBOPTS#{qos => type_to_qos(CfgType, Msg)})
maps:merge(?SUBOPTS#{qos => type_to_qos(CfgType, Msg)}, SubOpts)
end.
parse_sub_opts(<<"qos">>, V, Opts) ->

View File

@ -345,6 +345,45 @@ t_subscribe(_) ->
Topics
).
t_subscribe_with_qos_opt(_) ->
Topics = [
{<<"abc">>, 0},
{<<"/abc">>, 1},
{<<"abc/d">>, 2}
],
Fun = fun({Topic, Qos}, Channel, Token) ->
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic), Token) ++ "&qos=" ++ integer_to_list(Qos),
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
%% ensure subscribe succeed
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
?assertEqual(Qos, maps:get(qos, emqx_broker:get_subopts(SubPid, Topic))),
%% publish a message
emqx:publish(emqx_message:make(Topic, Payload)),
{ok, content, Notify} = with_response(Channel),
?LOGT("observer get Notif=~p", [Notify]),
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
end,
with_connection(Topics, Fun),
%% subscription removed if coap client disconnected
timer:sleep(100),
lists:foreach(
fun({Topic, _Qos}) ->
?assertEqual([], emqx:subscribers(Topic))
end,
Topics
).
t_un_subscribe(_) ->
%% can unsubscribe to a normal topic
Topics = [

View File

@ -57,14 +57,22 @@ all() ->
init_per_suite(Config) ->
application:load(emqx_gateway_coap),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_auth, emqx_gateway]),
Config.
Apps = emqx_cth_suite:start(
[
{emqx_conf, ?CONF_DEFAULT},
emqx_gateway,
emqx_auth,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
_ = emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
{ok, _} = emqx:remove_config([<<"gateway">>, <<"coap">>]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_auth]),
Config.
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files().
%%--------------------------------------------------------------------
%% Cases

View File

@ -0,0 +1 @@
Fix COAP gateway bug that caused it to ignore subscription options.