%%-------------------------------------------------------------------- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_coap_pubsub_SUITE). -compile(export_all). -compile(nowarn_export_all). -include_lib("gen_coap/include/coap.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). -define(LOGT(Format, Args), ct:pal(Format, Args)). all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> emqx_ct_helpers:start_apps([emqx_coap], fun set_special_cfg/1), Config. set_special_cfg(emqx_coap) -> application:set_env(emqx_coap, enable_stats, true); set_special_cfg(_) -> ok. end_per_suite(Config) -> emqx_ct_helpers:stop_apps([emqx_coap]), Config. %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- t_update_max_age(_Config) -> TopicInPayload = <<"topic1">>, Payload = <<";ct=42">>, Payload1 = <<";ct=50">>, URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret", URI2 = "coap://127.0.0.1/ps/topic1"++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT1), timer:sleep(50), %% post to create the same topic but with different max age and ct value in payload Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 70, format = <<"application/link-format">>, payload = Payload1}), {ok,created, #coap_content{location_path = LocPath}} = Reply1, ?assertEqual([<<"/ps/topic1">>] ,LocPath), [{TopicInPayload, MaxAge2, CT2, _ResPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?assertEqual(70, MaxAge2), ?assertEqual(<<"50">>, CT2), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI2). t_create_subtopic(_Config) -> TopicInPayload = <<"topic1">>, TopicInPayloadStr = "topic1", Payload = <<";ct=42">>, URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret", RealURI = "coap://127.0.0.1/ps/topic1"++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT1), timer:sleep(50), %% post to create the a sub topic SubPayload = <<";ct=42">>, SubTopicInPayloadStr = "subtopic", SubURI = "coap://127.0.0.1/ps/"++TopicInPayloadStr++"?c=client1&u=tom&p=secret", SubRealURI = "coap://127.0.0.1/ps/"++TopicInPayloadStr++"/"++SubTopicInPayloadStr++"?c=client1&u=tom&p=secret", FullTopic = list_to_binary(TopicInPayloadStr++"/"++SubTopicInPayloadStr), Reply1 = er_coap_client:request(post, SubURI, #coap_content{format = <<"application/link-format">>, payload = SubPayload}), ?LOGT("Reply =~p", [Reply1]), {ok,created, #coap_content{location_path = LocPath1}} = Reply1, ?assertEqual([<<"/ps/topic1/subtopic">>] ,LocPath1), [{FullTopic, MaxAge2, CT2, _ResPayload, _}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic), ?assertEqual(60, MaxAge2), ?assertEqual(<<"42">>, CT2), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, SubRealURI), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, RealURI). t_over_max_age(_Config) -> TopicInPayload = <<"topic1">>, Payload = <<";ct=42">>, URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{max_age = 2, format = <<"application/link-format">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(2, MaxAge1), ?assertEqual(<<"42">>, CT1), timer:sleep(3000), ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(TopicInPayload)). t_refreash_max_age(_Config) -> TopicInPayload = <<"topic1">>, Payload = <<";ct=42">>, Payload1 = <<";ct=50">>, URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret", RealURI = "coap://127.0.0.1/ps/topic1"++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/link-format">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("lookup topic info=~p", [TopicInfo]), ?LOGT("TimeStamp=~p", [TimeStamp]), ?assertEqual(5, MaxAge1), ?assertEqual(<<"42">>, CT1), timer:sleep(3000), %% post to create the same topic, the max age timer will be restarted with the new max age value Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/link-format">>, payload = Payload1}), {ok,created, #coap_content{location_path = LocPath}} = Reply1, ?assertEqual([<<"/ps/topic1">>] ,LocPath), [{TopicInPayload, MaxAge2, CT2, _ResPayload, TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("TimeStamp1=~p", [TimeStamp1]), ?assertEqual(5, MaxAge2), ?assertEqual(<<"50">>, CT2), timer:sleep(3000), ?assertEqual(false, emqx_coap_pubsub_topics:is_topic_timeout(TopicInPayload)), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, RealURI). t_case01_publish_post(_Config) -> timer:sleep(100), MainTopic = <<"maintopic">>, TopicInPayload = <<"topic1">>, Payload = <<";ct=42">>, MainTopicStr = binary_to_list(MainTopic), %% post to create topic maintopic/topic1 URI1 = "coap://127.0.0.1/ps/"++MainTopicStr++"?c=client1&u=tom&p=secret", FullTopic = list_to_binary(MainTopicStr++"/"++binary_to_list(TopicInPayload)), Reply1 = er_coap_client:request(post, URI1, #coap_content{format = <<"application/link-format">>, payload = Payload}), ?LOGT("Reply =~p", [Reply1]), {ok,created, #coap_content{location_path = LocPath1}} = Reply1, ?assertEqual([<<"/ps/maintopic/topic1">>] ,LocPath1), [{FullTopic, MaxAge, CT2, <<>>, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic), ?assertEqual(60, MaxAge), ?assertEqual(<<"42">>, CT2), %% post to publish message to topic maintopic/topic1 FullTopicStr = emqx_http_lib:uri_encode(binary_to_list(FullTopic)), URI2 = "coap://127.0.0.1/ps/"++FullTopicStr++"?c=client1&u=tom&p=secret", PubPayload = <<"PUBLISH">>, %% Sub topic first emqx:subscribe(FullTopic), Reply2 = er_coap_client:request(post, URI2, #coap_content{format = <<"application/octet-stream">>, payload = PubPayload}), ?LOGT("Reply =~p", [Reply2]), {ok,changed, _} = Reply2, TopicInfo = [{FullTopic, MaxAge, CT2, PubPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic), ?LOGT("the topic info =~p", [TopicInfo]), assert_recv(FullTopic, PubPayload), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI2). t_case02_publish_post(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"payload">>, %% Sub topic first emqx:subscribe(Topic), %% post to publish a new topic "topic1", and the topic is created URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(60, MaxAge), ?assertEqual(<<"42">>, CT), assert_recv(Topic, Payload), %% post to publish a new message to the same topic "topic1" with different payload NewPayload = <<"newpayload">>, Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/octet-stream">>, payload = NewPayload}), ?LOGT("Reply =~p", [Reply1]), {ok,changed, _} = Reply1, [{Topic, MaxAge, CT, NewPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), assert_recv(Topic, NewPayload), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI). t_case03_publish_post(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"payload">>, %% Sub topic first emqx:subscribe(Topic), %% post to publish a new topic "topic1", and the topic is created URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(60, MaxAge), ?assertEqual(<<"42">>, CT), assert_recv(Topic, Payload), %% post to publish a new message to the same topic "topic1", but the ct is not same as created NewPayload = <<"newpayload">>, Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/exi">>, payload = NewPayload}), ?LOGT("Reply =~p", [Reply1]), ?assertEqual({error,bad_request}, Reply1), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI). t_case04_publish_post(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"payload">>, %% post to publish a new topic "topic1", and the topic is created URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(5, MaxAge), ?assertEqual(<<"42">>, CT), %% after max age timeout, the topic still exists but the status is timeout timer:sleep(6000), ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI). t_case01_publish_put(_Config) -> MainTopic = <<"maintopic">>, TopicInPayload = <<"topic1">>, Payload = <<";ct=42">>, MainTopicStr = binary_to_list(MainTopic), %% post to create topic maintopic/topic1 URI1 = "coap://127.0.0.1/ps/"++MainTopicStr++"?c=client1&u=tom&p=secret", FullTopic = list_to_binary(MainTopicStr++"/"++binary_to_list(TopicInPayload)), Reply1 = er_coap_client:request(post, URI1, #coap_content{format = <<"application/link-format">>, payload = Payload}), ?LOGT("Reply =~p", [Reply1]), {ok,created, #coap_content{location_path = LocPath1}} = Reply1, ?assertEqual([<<"/ps/maintopic/topic1">>] ,LocPath1), [{FullTopic, MaxAge, CT2, <<>>, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic), ?assertEqual(60, MaxAge), ?assertEqual(<<"42">>, CT2), %% put to publish message to topic maintopic/topic1 FullTopicStr = emqx_http_lib:uri_encode(binary_to_list(FullTopic)), URI2 = "coap://127.0.0.1/ps/"++FullTopicStr++"?c=client1&u=tom&p=secret", PubPayload = <<"PUBLISH">>, %% Sub topic first emqx:subscribe(FullTopic), Reply2 = er_coap_client:request(put, URI2, #coap_content{format = <<"application/octet-stream">>, payload = PubPayload}), ?LOGT("Reply =~p", [Reply2]), {ok,changed, _} = Reply2, [{FullTopic, MaxAge, CT2, PubPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic), assert_recv(FullTopic, PubPayload), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI2). t_case02_publish_put(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"payload">>, %% Sub topic first emqx:subscribe(Topic), %% put to publish a new topic "topic1", and the topic is created URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(60, MaxAge), ?assertEqual(<<"42">>, CT), assert_recv(Topic, Payload), %% put to publish a new message to the same topic "topic1" with different payload NewPayload = <<"newpayload">>, Reply1 = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = NewPayload}), ?LOGT("Reply =~p", [Reply1]), {ok,changed, _} = Reply1, [{Topic, MaxAge, CT, NewPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), assert_recv(Topic, NewPayload), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI). t_case03_publish_put(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"payload">>, %% Sub topic first emqx:subscribe(Topic), %% put to publish a new topic "topic1", and the topic is created URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(60, MaxAge), ?assertEqual(<<"42">>, CT), assert_recv(Topic, Payload), %% put to publish a new message to the same topic "topic1", but the ct is not same as created NewPayload = <<"newpayload">>, Reply1 = er_coap_client:request(put, URI, #coap_content{format = <<"application/exi">>, payload = NewPayload}), ?LOGT("Reply =~p", [Reply1]), ?assertEqual({error,bad_request}, Reply1), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI). t_case04_publish_put(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"payload">>, %% put to publish a new topic "topic1", and the topic is created URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(put, URI, #coap_content{max_age = 5, format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(5, MaxAge), ?assertEqual(<<"42">>, CT), %% after max age timeout, no publish message to the same topic, the topic info will be deleted %%%%%%%%%%%%%%%%%%%%%%%%%% % but there is one thing to do is we don't count in the publish message received from emqx(from other node).TBD!!!!!!!!!!!!! %%%%%%%%%%%%%%%%%%%%%%%%%% timer:sleep(6000), ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI). t_case01_subscribe(_Config) -> Topic = <<"topic1">>, Payload1 = <<";ct=42">>, timer:sleep(100), %% First post to create a topic "topic1" Uri = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, Uri, #coap_content{format = <<"application/link-format">>, payload = Payload1}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = [LocPath]}} = Reply, ?assertEqual(<<"/ps/topic1">> ,LocPath), TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT1), %% Subscribe the topic Uri1 = "coap://127.0.0.1"++binary_to_list(LocPath)++"?c=client1&u=tom&p=secret", {ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri1), ?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]), [SubPid] = emqx:subscribers(Topic), ?assert(is_pid(SubPid)), %% Publish a message Payload = <<"123">>, emqx:publish(emqx_message:make(Topic, Payload)), Notif = receive_notification(), ?LOGT("observer get Notif=~p", [Notif]), {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv}} = Notif, ?assertEqual(Payload, PayloadRecv), %% GET to read the publish message of the topic Reply1 = er_coap_client:request(get, Uri1), ?LOGT("Reply=~p", [Reply1]), {ok,content, #coap_content{payload = <<"123">>}} = Reply1, er_coap_observer:stop(Pid), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri1). t_case02_subscribe(_Config) -> Topic = <<"a/b">>, TopicStr = binary_to_list(Topic), PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Payload = <<"payload">>, %% post to publish a new topic "a/b", and the topic is created URI = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/a/b">>] ,LocPath), [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(5, MaxAge), ?assertEqual(<<"42">>, CT), %% Wait for the max age of the timer expires timer:sleep(6000), ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)), %% Subscribe to the timeout topic "a/b", still successfully,got {ok, nocontent} Method Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", Reply1 = {ok, Pid, _N, nocontent, _} = er_coap_observer:observe(Uri), ?LOGT("Subscribe Reply=~p", [Reply1]), [SubPid] = emqx:subscribers(Topic), ?assert(is_pid(SubPid)), %% put to publish to topic "a/b" Reply2 = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), {ok,changed, #coap_content{}} = Reply2, [{Topic, MaxAge1, CT, Payload, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT), ?assertEqual(false, TimeStamp =:= timeout), %% Publish a message emqx:publish(emqx_message:make(Topic, Payload)), Notif = receive_notification(), ?LOGT("observer get Notif=~p", [Notif]), {coap_notify, _, _, {ok,content}, #coap_content{payload = Payload}} = Notif, er_coap_observer:stop(Pid), {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI). t_case03_subscribe(_Config) -> %% Subscribe to the unexisted topic "a/b", got not_found Topic = <<"a/b">>, TopicStr = binary_to_list(Topic), PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", {error, not_found} = er_coap_observer:observe(Uri), [] = emqx:subscribers(Topic). t_case04_subscribe(_Config) -> %% Subscribe to the wildcad topic "+/b", got bad_request Topic = <<"+/b">>, TopicStr = binary_to_list(Topic), PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", {error, bad_request} = er_coap_observer:observe(Uri), [] = emqx:subscribers(Topic). t_case01_read(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"PubPayload">>, timer:sleep(100), %% First post to create a topic "topic1" Uri = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, Uri, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = [LocPath]}} = Reply, ?assertEqual(<<"/ps/topic1">> ,LocPath), TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT1), %% GET to read the publish message of the topic timer:sleep(1000), Reply1 = er_coap_client:request(get, Uri), ?LOGT("Reply=~p", [Reply1]), {ok,content, #coap_content{payload = Payload}} = Reply1, {ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri). t_case02_read(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"PubPayload">>, timer:sleep(100), %% First post to publish a topic "topic1" Uri = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, Uri, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = [LocPath]}} = Reply, ?assertEqual(<<"/ps/topic1">> ,LocPath), TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT1), %% GET to read the publish message of unmatched format, got bad_request Reply1 = er_coap_client:request(get, Uri, #coap_content{format = <<"application/json">>}), ?LOGT("Reply=~p", [Reply1]), {error, bad_request} = Reply1, {ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri). t_case03_read(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Uri = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", timer:sleep(100), %% GET to read the nexisted topic "topic1", got not_found Reply = er_coap_client:request(get, Uri), ?LOGT("Reply=~p", [Reply]), {error, not_found} = Reply. t_case04_read(_Config) -> Topic = <<"topic1">>, TopicStr = binary_to_list(Topic), Payload = <<"PubPayload">>, timer:sleep(100), %% First post to publish a topic "topic1" Uri = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, Uri, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = [LocPath]}} = Reply, ?assertEqual(<<"/ps/topic1">> ,LocPath), TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT1), %% GET to read the publish message of wildcard topic, got bad_request WildTopic = binary_to_list(<<"+/topic1">>), Uri1 = "coap://127.0.0.1/ps/"++WildTopic++"?c=client1&u=tom&p=secret", Reply1 = er_coap_client:request(get, Uri1, #coap_content{format = <<"application/json">>}), ?LOGT("Reply=~p", [Reply1]), {error, bad_request} = Reply1, {ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri). t_case05_read(_Config) -> Topic = <<"a/b">>, TopicStr = binary_to_list(Topic), PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Payload = <<"payload">>, %% post to publish a new topic "a/b", and the topic is created URI = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", Reply = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/octet-stream">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/a/b">>] ,LocPath), [{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic), ?assertEqual(5, MaxAge), ?assertEqual(<<"42">>, CT), %% Wait for the max age of the timer expires timer:sleep(6000), ?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)), %% GET to read the expired publish message, supposed to get {ok, nocontent}, but now got {ok, content} Reply1 = er_coap_client:request(get, URI), ?LOGT("Reply=~p", [Reply1]), {ok, content, #coap_content{payload = <<>>}}= Reply1, {ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI). t_case01_delete(_Config) -> TopicInPayload = <<"a/b">>, TopicStr = binary_to_list(TopicInPayload), PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Payload = list_to_binary("<"++PercentEncodedTopic++">;ct=42"), URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret", %% Client post to CREATE topic "a/b" Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload}), ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/a/b">>] ,LocPath), %% Client post to CREATE topic "a/b/c" TopicInPayload1 = <<"a/b/c">>, PercentEncodedTopic1 = emqx_http_lib:uri_encode(binary_to_list(TopicInPayload1)), Payload1 = list_to_binary("<"++PercentEncodedTopic1++">;ct=42"), Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload1}), ?LOGT("Reply =~p", [Reply1]), {ok,created, #coap_content{location_path = LocPath1}} = Reply1, ?assertEqual([<<"/ps/a/b/c">>] ,LocPath1), timer:sleep(50), %% DELETE the topic "a/b" UriD = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", ReplyD = er_coap_client:request(delete, UriD), ?LOGT("Reply=~p", [ReplyD]), {ok, deleted, #coap_content{}}= ReplyD, timer:sleep(300), %% Waiting gen_server:cast/2 for deleting operation ?assertEqual(false, emqx_coap_pubsub_topics:is_topic_existed(TopicInPayload)), ?assertEqual(false, emqx_coap_pubsub_topics:is_topic_existed(TopicInPayload1)). t_case02_delete(_Config) -> TopicInPayload = <<"a/b">>, TopicStr = binary_to_list(TopicInPayload), PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), %% DELETE the unexisted topic "a/b" Uri1 = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", Reply1 = er_coap_client:request(delete, Uri1), ?LOGT("Reply=~p", [Reply1]), {error, not_found} = Reply1. t_case13_emit_stats_test(_Config) -> ok. %%-------------------------------------------------------------------- %% Internal functions receive_notification() -> receive {coap_notify, Pid, N2, Code2, Content2} -> {coap_notify, Pid, N2, Code2, Content2} after 2000 -> receive_notification_timeout end. assert_recv(Topic, Payload) -> receive {deliver, _, Msg} -> ?assertEqual(Topic, Msg#message.topic), ?assertEqual(Payload, Msg#message.payload) after 500 -> ?assert(false) end.