emqx/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl

678 lines
29 KiB
Erlang
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_pubsub_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("gen_coap/include/coap.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-define(LOGT(Format, Args), ct:pal(Format, Args)).
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_coap], fun set_special_cfg/1),
Config.
set_special_cfg(emqx_coap) ->
application:set_env(emqx_coap, enable_stats, true);
set_special_cfg(_) ->
ok.
end_per_suite(Config) ->
emqx_ct_helpers:stop_apps([emqx_coap]),
Config.
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_update_max_age(_Config) ->
TopicInPayload = <<"topic1">>,
Payload = <<"<topic1>;ct=42">>,
Payload1 = <<"<topic1>;ct=50">>,
URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret",
URI2 = "coap://127.0.0.1/ps/topic1"++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
?LOGT("lookup topic info=~p", [TopicInfo]),
?assertEqual(60, MaxAge1),
?assertEqual(<<"42">>, CT1),
timer:sleep(50),
%% post to create the same topic but with different max age and ct value in payload
Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 70, format = <<"application/link-format">>, payload = Payload1}),
{ok,created, #coap_content{location_path = LocPath}} = Reply1,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
[{TopicInPayload, MaxAge2, CT2, _ResPayload2, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
?assertEqual(70, MaxAge2),
?assertEqual(<<"50">>, CT2),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI2).
t_create_subtopic(_Config) ->
TopicInPayload = <<"topic1">>,
TopicInPayloadStr = "topic1",
Payload = <<"<topic1>;ct=42">>,
URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret",
RealURI = "coap://127.0.0.1/ps/topic1"++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
?LOGT("lookup topic info=~p", [TopicInfo]),
?assertEqual(60, MaxAge1),
?assertEqual(<<"42">>, CT1),
timer:sleep(50),
%% post to create the a sub topic
SubPayload = <<"<subtopic>;ct=42">>,
SubTopicInPayloadStr = "subtopic",
SubURI = "coap://127.0.0.1/ps/"++TopicInPayloadStr++"?c=client1&u=tom&p=secret",
SubRealURI = "coap://127.0.0.1/ps/"++TopicInPayloadStr++"/"++SubTopicInPayloadStr++"?c=client1&u=tom&p=secret",
FullTopic = list_to_binary(TopicInPayloadStr++"/"++SubTopicInPayloadStr),
Reply1 = er_coap_client:request(post, SubURI, #coap_content{format = <<"application/link-format">>, payload = SubPayload}),
?LOGT("Reply =~p", [Reply1]),
{ok,created, #coap_content{location_path = LocPath1}} = Reply1,
?assertEqual([<<"/ps/topic1/subtopic">>] ,LocPath1),
[{FullTopic, MaxAge2, CT2, _ResPayload2, _}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
?assertEqual(60, MaxAge2),
?assertEqual(<<"42">>, CT2),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, SubRealURI),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, RealURI).
t_over_max_age(_Config) ->
TopicInPayload = <<"topic1">>,
Payload = <<"<topic1>;ct=42">>,
URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{max_age = 2, format = <<"application/link-format">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
?LOGT("lookup topic info=~p", [TopicInfo]),
?assertEqual(2, MaxAge1),
?assertEqual(<<"42">>, CT1),
timer:sleep(3000),
?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(TopicInPayload)).
t_refreash_max_age(_Config) ->
TopicInPayload = <<"topic1">>,
Payload = <<"<topic1>;ct=42">>,
Payload1 = <<"<topic1>;ct=50">>,
URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret",
RealURI = "coap://127.0.0.1/ps/topic1"++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/link-format">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
?LOGT("lookup topic info=~p", [TopicInfo]),
?LOGT("TimeStamp=~p", [TimeStamp]),
?assertEqual(5, MaxAge1),
?assertEqual(<<"42">>, CT1),
timer:sleep(3000),
%% post to create the same topic, the max age timer will be restarted with the new max age value
Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/link-format">>, payload = Payload1}),
{ok,created, #coap_content{location_path = LocPath}} = Reply1,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
[{TopicInPayload, MaxAge2, CT2, _ResPayload2, TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
?LOGT("TimeStamp1=~p", [TimeStamp1]),
?assertEqual(5, MaxAge2),
?assertEqual(<<"50">>, CT2),
timer:sleep(3000),
?assertEqual(false, emqx_coap_pubsub_topics:is_topic_timeout(TopicInPayload)),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, RealURI).
t_case01_publish_post(_Config) ->
timer:sleep(100),
MainTopic = <<"maintopic">>,
TopicInPayload = <<"topic1">>,
Payload = <<"<topic1>;ct=42">>,
MainTopicStr = binary_to_list(MainTopic),
%% post to create topic maintopic/topic1
URI1 = "coap://127.0.0.1/ps/"++MainTopicStr++"?c=client1&u=tom&p=secret",
FullTopic = list_to_binary(MainTopicStr++"/"++binary_to_list(TopicInPayload)),
Reply1 = er_coap_client:request(post, URI1, #coap_content{format = <<"application/link-format">>, payload = Payload}),
?LOGT("Reply =~p", [Reply1]),
{ok,created, #coap_content{location_path = LocPath1}} = Reply1,
?assertEqual([<<"/ps/maintopic/topic1">>] ,LocPath1),
[{FullTopic, MaxAge, CT2, <<>>, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
?assertEqual(60, MaxAge),
?assertEqual(<<"42">>, CT2),
%% post to publish message to topic maintopic/topic1
FullTopicStr = emqx_http_lib:uri_encode(binary_to_list(FullTopic)),
URI2 = "coap://127.0.0.1/ps/"++FullTopicStr++"?c=client1&u=tom&p=secret",
PubPayload = <<"PUBLISH">>,
%% Sub topic first
emqx:subscribe(FullTopic),
Reply2 = er_coap_client:request(post, URI2, #coap_content{format = <<"application/octet-stream">>, payload = PubPayload}),
?LOGT("Reply =~p", [Reply2]),
{ok,changed, _} = Reply2,
TopicInfo = [{FullTopic, MaxAge, CT2, PubPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
?LOGT("the topic info =~p", [TopicInfo]),
assert_recv(FullTopic, PubPayload),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI2).
t_case02_publish_post(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"payload">>,
%% Sub topic first
emqx:subscribe(Topic),
%% post to publish a new topic "topic1", and the topic is created
URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
[{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(60, MaxAge),
?assertEqual(<<"42">>, CT),
assert_recv(Topic, Payload),
%% post to publish a new message to the same topic "topic1" with different payload
NewPayload = <<"newpayload">>,
Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/octet-stream">>, payload = NewPayload}),
?LOGT("Reply =~p", [Reply1]),
{ok,changed, _} = Reply1,
[{Topic, MaxAge, CT, NewPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
assert_recv(Topic, NewPayload),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
t_case03_publish_post(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"payload">>,
%% Sub topic first
emqx:subscribe(Topic),
%% post to publish a new topic "topic1", and the topic is created
URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
[{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(60, MaxAge),
?assertEqual(<<"42">>, CT),
assert_recv(Topic, Payload),
%% post to publish a new message to the same topic "topic1", but the ct is not same as created
NewPayload = <<"newpayload">>,
Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/exi">>, payload = NewPayload}),
?LOGT("Reply =~p", [Reply1]),
?assertEqual({error,bad_request}, Reply1),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
t_case04_publish_post(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"payload">>,
%% post to publish a new topic "topic1", and the topic is created
URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
[{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(5, MaxAge),
?assertEqual(<<"42">>, CT),
%% after max age timeout, the topic still exists but the status is timeout
timer:sleep(6000),
?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
t_case01_publish_put(_Config) ->
MainTopic = <<"maintopic">>,
TopicInPayload = <<"topic1">>,
Payload = <<"<topic1>;ct=42">>,
MainTopicStr = binary_to_list(MainTopic),
%% post to create topic maintopic/topic1
URI1 = "coap://127.0.0.1/ps/"++MainTopicStr++"?c=client1&u=tom&p=secret",
FullTopic = list_to_binary(MainTopicStr++"/"++binary_to_list(TopicInPayload)),
Reply1 = er_coap_client:request(post, URI1, #coap_content{format = <<"application/link-format">>, payload = Payload}),
?LOGT("Reply =~p", [Reply1]),
{ok,created, #coap_content{location_path = LocPath1}} = Reply1,
?assertEqual([<<"/ps/maintopic/topic1">>] ,LocPath1),
[{FullTopic, MaxAge, CT2, <<>>, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
?assertEqual(60, MaxAge),
?assertEqual(<<"42">>, CT2),
%% put to publish message to topic maintopic/topic1
FullTopicStr = emqx_http_lib:uri_encode(binary_to_list(FullTopic)),
URI2 = "coap://127.0.0.1/ps/"++FullTopicStr++"?c=client1&u=tom&p=secret",
PubPayload = <<"PUBLISH">>,
%% Sub topic first
emqx:subscribe(FullTopic),
Reply2 = er_coap_client:request(put, URI2, #coap_content{format = <<"application/octet-stream">>, payload = PubPayload}),
?LOGT("Reply =~p", [Reply2]),
{ok,changed, _} = Reply2,
[{FullTopic, MaxAge, CT2, PubPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
assert_recv(FullTopic, PubPayload),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI2).
t_case02_publish_put(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"payload">>,
%% Sub topic first
emqx:subscribe(Topic),
%% put to publish a new topic "topic1", and the topic is created
URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
[{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(60, MaxAge),
?assertEqual(<<"42">>, CT),
assert_recv(Topic, Payload),
%% put to publish a new message to the same topic "topic1" with different payload
NewPayload = <<"newpayload">>,
Reply1 = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = NewPayload}),
?LOGT("Reply =~p", [Reply1]),
{ok,changed, _} = Reply1,
[{Topic, MaxAge, CT, NewPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
assert_recv(Topic, NewPayload),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
t_case03_publish_put(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"payload">>,
%% Sub topic first
emqx:subscribe(Topic),
%% put to publish a new topic "topic1", and the topic is created
URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
[{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(60, MaxAge),
?assertEqual(<<"42">>, CT),
assert_recv(Topic, Payload),
%% put to publish a new message to the same topic "topic1", but the ct is not same as created
NewPayload = <<"newpayload">>,
Reply1 = er_coap_client:request(put, URI, #coap_content{format = <<"application/exi">>, payload = NewPayload}),
?LOGT("Reply =~p", [Reply1]),
?assertEqual({error,bad_request}, Reply1),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
t_case04_publish_put(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"payload">>,
%% put to publish a new topic "topic1", and the topic is created
URI = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(put, URI, #coap_content{max_age = 5, format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/topic1">>] ,LocPath),
[{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(5, MaxAge),
?assertEqual(<<"42">>, CT),
%% after max age timeout, no publish message to the same topic, the topic info will be deleted
%%%%%%%%%%%%%%%%%%%%%%%%%%
% but there is one thing to do is we don't count in the publish message received from emqx(from other node).TBD!!!!!!!!!!!!!
%%%%%%%%%%%%%%%%%%%%%%%%%%
timer:sleep(6000),
?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
t_case01_subscribe(_Config) ->
Topic = <<"topic1">>,
Payload1 = <<"<topic1>;ct=42">>,
timer:sleep(100),
%% First post to create a topic "topic1"
Uri = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, Uri, #coap_content{format = <<"application/link-format">>, payload = Payload1}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = [LocPath]}} = Reply,
?assertEqual(<<"/ps/topic1">> ,LocPath),
TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?LOGT("lookup topic info=~p", [TopicInfo]),
?assertEqual(60, MaxAge1),
?assertEqual(<<"42">>, CT1),
%% Subscribe the topic
Uri1 = "coap://127.0.0.1"++binary_to_list(LocPath)++"?c=client1&u=tom&p=secret",
{ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri1),
?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
%% Publish a message
Payload = <<"123">>,
emqx:publish(emqx_message:make(Topic, Payload)),
Notif = receive_notification(),
?LOGT("observer get Notif=~p", [Notif]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv}} = Notif,
?assertEqual(Payload, PayloadRecv),
%% GET to read the publish message of the topic
Reply1 = er_coap_client:request(get, Uri1),
?LOGT("Reply=~p", [Reply1]),
{ok,content, #coap_content{payload = <<"123">>}} = Reply1,
er_coap_observer:stop(Pid),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri1).
t_case02_subscribe(_Config) ->
Topic = <<"a/b">>,
TopicStr = binary_to_list(Topic),
PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
Payload = <<"payload">>,
%% post to publish a new topic "a/b", and the topic is created
URI = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/a/b">>] ,LocPath),
[{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(5, MaxAge),
?assertEqual(<<"42">>, CT),
%% Wait for the max age of the timer expires
timer:sleep(6000),
?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)),
%% Subscribe to the timeout topic "a/b", still successfullygot {ok, nocontent} Method
Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
Reply1 = {ok, Pid, _N, nocontent, _} = er_coap_observer:observe(Uri),
?LOGT("Subscribe Reply=~p", [Reply1]),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
%% put to publish to topic "a/b"
Reply2 = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
{ok,changed, #coap_content{}} = Reply2,
[{Topic, MaxAge1, CT, Payload, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(60, MaxAge1),
?assertEqual(<<"42">>, CT),
?assertEqual(false, TimeStamp =:= timeout),
%% Publish a message
emqx:publish(emqx_message:make(Topic, Payload)),
Notif = receive_notification(),
?LOGT("observer get Notif=~p", [Notif]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = Payload}} = Notif,
er_coap_observer:stop(Pid),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
t_case03_subscribe(_Config) ->
%% Subscribe to the unexisted topic "a/b", got not_found
Topic = <<"a/b">>,
TopicStr = binary_to_list(Topic),
PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
{error, not_found} = er_coap_observer:observe(Uri),
[] = emqx:subscribers(Topic).
t_case04_subscribe(_Config) ->
%% Subscribe to the wildcad topic "+/b", got bad_request
Topic = <<"+/b">>,
TopicStr = binary_to_list(Topic),
PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
{error, bad_request} = er_coap_observer:observe(Uri),
[] = emqx:subscribers(Topic).
t_case01_read(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"PubPayload">>,
timer:sleep(100),
%% First post to create a topic "topic1"
Uri = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, Uri, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = [LocPath]}} = Reply,
?assertEqual(<<"/ps/topic1">> ,LocPath),
TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?LOGT("lookup topic info=~p", [TopicInfo]),
?assertEqual(60, MaxAge1),
?assertEqual(<<"42">>, CT1),
%% GET to read the publish message of the topic
timer:sleep(1000),
Reply1 = er_coap_client:request(get, Uri),
?LOGT("Reply=~p", [Reply1]),
{ok,content, #coap_content{payload = Payload}} = Reply1,
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri).
t_case02_read(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"PubPayload">>,
timer:sleep(100),
%% First post to publish a topic "topic1"
Uri = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, Uri, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = [LocPath]}} = Reply,
?assertEqual(<<"/ps/topic1">> ,LocPath),
TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?LOGT("lookup topic info=~p", [TopicInfo]),
?assertEqual(60, MaxAge1),
?assertEqual(<<"42">>, CT1),
%% GET to read the publish message of unmatched format, got bad_request
Reply1 = er_coap_client:request(get, Uri, #coap_content{format = <<"application/json">>}),
?LOGT("Reply=~p", [Reply1]),
{error, bad_request} = Reply1,
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri).
t_case03_read(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Uri = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
timer:sleep(100),
%% GET to read the nexisted topic "topic1", got not_found
Reply = er_coap_client:request(get, Uri),
?LOGT("Reply=~p", [Reply]),
{error, not_found} = Reply.
t_case04_read(_Config) ->
Topic = <<"topic1">>,
TopicStr = binary_to_list(Topic),
Payload = <<"PubPayload">>,
timer:sleep(100),
%% First post to publish a topic "topic1"
Uri = "coap://127.0.0.1/ps/"++TopicStr++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, Uri, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = [LocPath]}} = Reply,
?assertEqual(<<"/ps/topic1">> ,LocPath),
TopicInfo = [{Topic, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?LOGT("lookup topic info=~p", [TopicInfo]),
?assertEqual(60, MaxAge1),
?assertEqual(<<"42">>, CT1),
%% GET to read the publish message of wildcard topic, got bad_request
WildTopic = binary_to_list(<<"+/topic1">>),
Uri1 = "coap://127.0.0.1/ps/"++WildTopic++"?c=client1&u=tom&p=secret",
Reply1 = er_coap_client:request(get, Uri1, #coap_content{format = <<"application/json">>}),
?LOGT("Reply=~p", [Reply1]),
{error, bad_request} = Reply1,
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri).
t_case05_read(_Config) ->
Topic = <<"a/b">>,
TopicStr = binary_to_list(Topic),
PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
Payload = <<"payload">>,
%% post to publish a new topic "a/b", and the topic is created
URI = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
Reply = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/octet-stream">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/a/b">>] ,LocPath),
[{Topic, MaxAge, CT, Payload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(Topic),
?assertEqual(5, MaxAge),
?assertEqual(<<"42">>, CT),
%% Wait for the max age of the timer expires
timer:sleep(6000),
?assertEqual(true, emqx_coap_pubsub_topics:is_topic_timeout(Topic)),
%% GET to read the expired publish message, supposed to get {ok, nocontent}, but now got {ok, content}
Reply1 = er_coap_client:request(get, URI),
?LOGT("Reply=~p", [Reply1]),
{ok, content, #coap_content{payload = <<>>}}= Reply1,
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, URI).
t_case01_delete(_Config) ->
TopicInPayload = <<"a/b">>,
TopicStr = binary_to_list(TopicInPayload),
PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
Payload = list_to_binary("<"++PercentEncodedTopic++">;ct=42"),
URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret",
%% Client post to CREATE topic "a/b"
Reply = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload}),
?LOGT("Reply =~p", [Reply]),
{ok,created, #coap_content{location_path = LocPath}} = Reply,
?assertEqual([<<"/ps/a/b">>] ,LocPath),
%% Client post to CREATE topic "a/b/c"
TopicInPayload1 = <<"a/b/c">>,
PercentEncodedTopic1 = emqx_http_lib:uri_encode(binary_to_list(TopicInPayload1)),
Payload1 = list_to_binary("<"++PercentEncodedTopic1++">;ct=42"),
Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload1}),
?LOGT("Reply =~p", [Reply1]),
{ok,created, #coap_content{location_path = LocPath1}} = Reply1,
?assertEqual([<<"/ps/a/b/c">>] ,LocPath1),
timer:sleep(50),
%% DELETE the topic "a/b"
UriD = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
ReplyD = er_coap_client:request(delete, UriD),
?LOGT("Reply=~p", [ReplyD]),
{ok, deleted, #coap_content{}}= ReplyD,
timer:sleep(300), %% Waiting gen_server:cast/2 for deleting operation
?assertEqual(false, emqx_coap_pubsub_topics:is_topic_existed(TopicInPayload)),
?assertEqual(false, emqx_coap_pubsub_topics:is_topic_existed(TopicInPayload1)).
t_case02_delete(_Config) ->
TopicInPayload = <<"a/b">>,
TopicStr = binary_to_list(TopicInPayload),
PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
%% DELETE the unexisted topic "a/b"
Uri1 = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
Reply1 = er_coap_client:request(delete, Uri1),
?LOGT("Reply=~p", [Reply1]),
{error, not_found} = Reply1.
t_case13_emit_stats_test(_Config) ->
ok.
%%--------------------------------------------------------------------
%% Internal functions
receive_notification() ->
receive
{coap_notify, Pid, N2, Code2, Content2} ->
{coap_notify, Pid, N2, Code2, Content2}
after 2000 ->
receive_notification_timeout
end.
assert_recv(Topic, Payload) ->
receive
{deliver, _, Msg} ->
?assertEqual(Topic, Msg#message.topic),
?assertEqual(Payload, Msg#message.payload)
after
500 ->
?assert(false)
end.