emqx/apps/emqx_gateway/test/emqx_coap_SUITE.erl

290 lines
9.6 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("er_coap_client/include/coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"
gateway.coap
{
idle_timeout = 30s
enable_stats = false
mountpoint = \"\"
notify_type = qos
connection_required = true
subscribe_qos = qos1
publish_qos = qos1
listeners.udp.default
{bind = 5683}
}
">>).
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-define(PS_PREFIX, "coap://127.0.0.1/ps").
-define(MQTT_PREFIX, "coap://127.0.0.1/mqtt").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1),
Config.
set_special_cfg(emqx_gateway) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT);
set_special_cfg(_) ->
ok.
end_per_suite(Config) ->
{ok, _} = emqx:remove_config([<<"gateway">>,<<"coap">>]),
emqx_common_test_helpers:stop_apps([emqx_gateway]),
Config.
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_connection(_Config) ->
Action = fun(Channel) ->
%% connection
Token = connection(Channel),
timer:sleep(100),
?assertNotEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)),
%% heartbeat
HeartURI = ?MQTT_PREFIX ++ "/connection?clientid=client1&token=" ++ Token,
?LOGT("send heartbeat request:~ts~n", [HeartURI]),
{ok, changed, _} = er_coap_client:request(put, HeartURI),
disconnection(Channel, Token),
timer:sleep(100),
?assertEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>))
end,
do(Action).
t_publish(_Config) ->
Action = fun(Channel, Token) ->
Topic = <<"/abc">>,
Payload = <<"123">>,
TopicStr = binary_to_list(Topic),
URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
%% Sub topic first
emqx:subscribe(Topic),
Req = make_req(post, Payload),
{ok, changed, _} = do_request(Channel, URI, Req),
receive
{deliver, Topic, Msg} ->
?assertEqual(Topic, Msg#message.topic),
?assertEqual(Payload, Msg#message.payload)
after
500 ->
?assert(false)
end
end,
with_connection(Action).
%t_publish_authz_deny(_Config) ->
% Action = fun(Channel, Token) ->
% Topic = <<"/abc">>,
% Payload = <<"123">>,
% InvalidToken = lists:reverse(Token),
%
% TopicStr = binary_to_list(Topic),
% URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ InvalidToken,
%
% %% Sub topic first
% emqx:subscribe(Topic),
%
% Req = make_req(post, Payload),
% Result = do_request(Channel, URI, Req),
% ?assertEqual({error, reset}, Result)
% end,
%
% with_connection(Action).
t_subscribe(_Config) ->
Topic = <<"/abc">>,
Fun = fun(Channel, Token) ->
TopicStr = binary_to_list(Topic),
Payload = <<"123">>,
URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
%% Publish a message
emqx:publish(emqx_message:make(Topic, Payload)),
{ok, content, Notify} = with_response(Channel),
?LOGT("observer get Notif=~p", [Notify]),
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
end,
with_connection(Fun),
timer:sleep(100),
?assertEqual([], emqx:subscribers(Topic)).
t_un_subscribe(_Config) ->
Topic = <<"/abc">>,
Fun = fun(Channel, Token) ->
TopicStr = binary_to_list(Topic),
Payload = <<"123">>,
URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
UnReq = make_req(get, Payload, [{observe, 1}]),
{ok, nocontent, _} = do_request(Channel, URI, UnReq),
?LOGT("un observer topic:~ts~n", [Topic]),
timer:sleep(100),
?assertEqual([], emqx:subscribers(Topic))
end,
with_connection(Fun).
t_observe_wildcard(_Config) ->
Fun = fun(Channel, Token) ->
%% resolve_url can't process wildcard with #
Topic = <<"/abc/+">>,
TopicStr = binary_to_list(Topic),
Payload = <<"123">>,
URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
%% Publish a message
PubTopic = <<"/abc/def">>,
emqx:publish(emqx_message:make(PubTopic, Payload)),
{ok, content, Notify} = with_response(Channel),
?LOGT("observer get Notif=~p", [Notify]),
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
end,
with_connection(Fun).
connection(Channel) ->
URI = ?MQTT_PREFIX ++ "/connection?clientid=client1&username=admin&password=public",
Req = make_req(post),
{ok, created, Data} = do_request(Channel, URI, Req),
#coap_content{payload = BinToken} = Data,
binary_to_list(BinToken).
disconnection(Channel, Token) ->
%% delete
URI = ?MQTT_PREFIX ++ "/connection?clientid=client1&token=" ++ Token,
Req = make_req(delete),
{ok, deleted, _} = do_request(Channel, URI, Req).
make_req(Method) ->
make_req(Method, <<>>).
make_req(Method, Payload) ->
make_req(Method, Payload, []).
make_req(Method, Payload, Opts) ->
er_coap_message:request(con, Method, Payload, Opts).
do_request(Channel, URI, #coap_message{options = Opts} = Req) ->
{_, _, Path, Query} = er_coap_client:resolve_uri(URI),
Opts2 = [{uri_path, Path}, {uri_query, Query} | Opts],
Req2 = Req#coap_message{options = Opts2},
?LOGT("send request:~ts~nReq:~p~n", [URI, Req2]),
{ok, _} = er_coap_channel:send(Channel, Req2),
with_response(Channel).
with_response(Channel) ->
receive
{coap_response, _ChId, Channel, _Ref, Message=#coap_message{method=Code}} ->
return_response(Code, Message);
{coap_error, _ChId, Channel, _Ref, reset} ->
{error, reset}
after 2000 ->
{error, timeout}
end.
return_response({ok, Code}, Message) ->
{ok, Code, er_coap_message:get_content(Message)};
return_response({error, Code}, #coap_message{payload= <<>>}) ->
{error, Code};
return_response({error, Code}, Message) ->
{error, Code, er_coap_message:get_content(Message)}.
do(Fun) ->
ChId = {{127, 0, 0, 1}, 5683},
{ok, Sock} = er_coap_udp_socket:start_link(),
{ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
%% send and receive
Res = Fun(Channel),
%% terminate the processes
er_coap_channel:close(Channel),
er_coap_udp_socket:close(Sock),
Res.
with_connection(Action) ->
Fun = fun(Channel) ->
Token = connection(Channel),
timer:sleep(100),
Action(Channel, Token),
disconnection(Channel, Token),
timer:sleep(100)
end,
do(Fun).