emqx/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl

792 lines
22 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-import(
emqx_gateway_test_utils,
[
request/2,
request/3
]
).
-include_lib("er_coap_client/include/coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<
"\n"
"gateway.coap\n"
"{\n"
" idle_timeout = 30s\n"
" enable_stats = false\n"
" mountpoint = \"\"\n"
" notify_type = qos\n"
" connection_required = true\n"
" subscribe_qos = qos1\n"
" publish_qos = qos1\n"
"\n"
" listeners.udp.default\n"
" {bind = 5683}\n"
"}\n"
>>).
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-define(PS_PREFIX, "coap://127.0.0.1/ps").
-define(MQTT_PREFIX, "coap://127.0.0.1/mqtt").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_gateway_coap),
Apps = emqx_cth_suite:start(
[
{emqx_conf, ?CONF_DEFAULT},
emqx_gateway,
emqx_auth,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files(),
ok.
init_per_testcase(t_connection_with_authn_failed, Config) ->
ok = meck:new(emqx_access_control, [passthrough]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) -> {error, bad_username_or_password} end
),
Config;
init_per_testcase(t_heartbeat, Config) ->
NewHeartbeat = 800,
OldConf = emqx:get_raw_config([gateway, coap]),
{ok, _} = emqx_gateway_conf:update_gateway(
coap,
OldConf#{<<"heartbeat">> => <<"800ms">>}
),
[
{old_conf, OldConf},
{new_heartbeat, NewHeartbeat}
| Config
];
init_per_testcase(_, Config) ->
ok = meck:new(emqx_access_control, [passthrough]),
Config.
end_per_testcase(t_heartbeat, Config) ->
OldConf = ?config(old_conf, Config),
{ok, _} = emqx_gateway_conf:update_gateway(coap, OldConf),
ok;
end_per_testcase(_, Config) ->
ok = meck:unload(emqx_access_control),
Config.
default_config() ->
?CONF_DEFAULT.
mqtt_prefix() ->
?MQTT_PREFIX.
ps_prefix() ->
?PS_PREFIX.
restart_coap_with_connection_mode(Bool) ->
Conf = emqx:get_raw_config([gateway, coap]),
emqx_gateway_conf:update_gateway(
coap,
Conf#{<<"connection_required">> => atom_to_binary(Bool)}
).
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_connection(_) ->
Action = fun(Channel) ->
%% connection
Token = connection(Channel),
timer:sleep(100),
?assertNotEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
%% heartbeat
{ok, changed, _} = send_heartbeat(Token),
disconnection(Channel, Token),
timer:sleep(100),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
)
end,
do(Action).
t_connection_with_short_param_name(_) ->
Action = fun(Channel) ->
%% connection
Token = connection(Channel, true),
timer:sleep(100),
?assertNotEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
%% heartbeat
{ok, changed, _} = send_heartbeat(Token, true),
disconnection(Channel, Token, true),
timer:sleep(100),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
)
end,
do(Action).
t_heartbeat(Config) ->
Heartbeat = ?config(new_heartbeat, Config),
Action = fun(Channel) ->
Token = connection(Channel),
timer:sleep(100),
?assertNotEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
%% must keep client connection alive
Delay = Heartbeat div 2,
lists:foreach(
fun(_) ->
?assertMatch({ok, changed, _}, send_heartbeat(Token)),
timer:sleep(Delay)
end,
lists:seq(1, 5)
),
?assertNotEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
timer:sleep(Heartbeat * 2),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
disconnection(Channel, Token),
timer:sleep(100),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
)
end,
do(Action).
t_connection_optional_params(_) ->
UsernamePasswordAreOptional =
fun(Channel) ->
URI =
?MQTT_PREFIX ++
"/connection?clientid=client1",
Req = make_req(post),
{ok, created, Data} = do_request(Channel, URI, Req),
#coap_content{payload = Token0} = Data,
Token = binary_to_list(Token0),
timer:sleep(100),
?assertNotEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
disconnection(Channel, Token),
timer:sleep(100),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
)
end,
ClientIdIsRequired =
fun(Channel) ->
URI =
?MQTT_PREFIX ++
"/connection",
Req = make_req(post),
{error, bad_request, _} = do_request(Channel, URI, Req)
end,
do(UsernamePasswordAreOptional),
do(ClientIdIsRequired).
t_connection_with_authn_failed(_) ->
ChId = {{127, 0, 0, 1}, 5683},
{ok, Sock} = er_coap_udp_socket:start_link(),
{ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
URI =
?MQTT_PREFIX ++
"/connection?clientid=client1&username=admin&password=public",
Req = make_req(post),
?assertMatch({error, bad_request, _}, do_request(Channel, URI, Req)),
timer:sleep(100),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
ok.
t_publish(_) ->
%% can publish to a normal topic
Topics = [
<<"abc">>,
%% can publish to a `/` leading topic
<<"/abc">>
],
Action = fun(Topic, Channel, Token) ->
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic), Token),
%% Sub topic first
emqx:subscribe(Topic),
Req = make_req(post, Payload),
{ok, changed, _} = do_request(Channel, URI, Req),
receive
{deliver, Topic, Msg} ->
?assertEqual(Topic, Msg#message.topic),
?assertEqual(Payload, Msg#message.payload)
after 500 ->
?assert(false)
end
end,
with_connection(Topics, Action).
t_publish_with_retain_qos_expiry(_) ->
_ = meck:expect(
emqx_access_control,
authorize,
fun(_, #{action_type := publish, qos := 1, retain := true}, _) ->
allow
end
),
Topics = [<<"abc">>],
Action = fun(Topic, Channel, Token) ->
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic), Token) ++ "&retain=true&qos=1&expiry=60",
%% Sub topic first
emqx:subscribe(Topic),
Req = make_req(post, Payload),
{ok, changed, _} = do_request(Channel, URI, Req),
receive
{deliver, Topic, Msg} ->
?assertEqual(Topic, Msg#message.topic),
?assertEqual(Payload, Msg#message.payload)
after 500 ->
?assert(false)
end
end,
with_connection(Topics, Action),
_ = meck:validate(emqx_access_control).
t_subscribe(_) ->
%% can subscribe to a normal topic
Topics = [
<<"abc">>,
%% can subscribe to a `/` leading topic
<<"/abc">>
],
Fun = fun(Topic, Channel, Token) ->
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic), Token),
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
%% ensure subscribe succeed
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
%% publish a message
emqx:publish(emqx_message:make(Topic, Payload)),
{ok, content, Notify} = with_response(Channel),
?LOGT("observer get Notif=~p", [Notify]),
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
end,
with_connection(Topics, Fun),
%% subscription removed if coap client disconnected
timer:sleep(100),
lists:foreach(
fun(Topic) ->
?assertEqual([], emqx:subscribers(Topic))
end,
Topics
).
t_subscribe_with_qos_opt(_) ->
Topics = [
{<<"abc">>, 0},
{<<"/abc">>, 1},
{<<"abc/d">>, 2}
],
Fun = fun({Topic, Qos}, Channel, Token) ->
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic), Token) ++ "&qos=" ++ integer_to_list(Qos),
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
%% ensure subscribe succeed
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
?assertEqual(Qos, maps:get(qos, emqx_broker:get_subopts(SubPid, Topic))),
%% publish a message
emqx:publish(emqx_message:make(Topic, Payload)),
{ok, content, Notify} = with_response(Channel),
?LOGT("observer get Notif=~p", [Notify]),
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
end,
with_connection(Topics, Fun),
%% subscription removed if coap client disconnected
timer:sleep(100),
lists:foreach(
fun({Topic, _Qos}) ->
?assertEqual([], emqx:subscribers(Topic))
end,
Topics
).
t_un_subscribe(_) ->
%% can unsubscribe to a normal topic
Topics = [
<<"abc">>,
%% can unsubscribe to a `/` leading topic
<<"/abc">>
],
Fun = fun(Topic, Channel, Token) ->
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic), Token),
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
UnReq = make_req(get, Payload, [{observe, 1}]),
{ok, nocontent, _} = do_request(Channel, URI, UnReq),
?LOGT("un observer topic:~ts~n", [Topic]),
timer:sleep(100),
?assertEqual([], emqx:subscribers(Topic))
end,
with_connection(Topics, Fun).
t_observe_wildcard(_) ->
Fun = fun(Channel, Token) ->
%% resolve_url can't process wildcard with #
Topic = <<"abc/+">>,
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic), Token),
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
%% Publish a message
PubTopic = <<"abc/def">>,
emqx:publish(emqx_message:make(PubTopic, Payload)),
{ok, content, Notify} = with_response(Channel),
?LOGT("observer get Notif=~p", [Notify]),
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
end,
with_connection(Fun).
t_clients_api(_) ->
Fun = fun(_Channel, _Token) ->
ClientId = <<"client1">>,
%% list
{200, #{data := [Client1]}} = request(get, "/gateways/coap/clients"),
#{clientid := ClientId} = Client1,
%% searching
{200, #{data := [Client2]}} =
request(
get,
"/gateways/coap/clients",
[{<<"clientid">>, ClientId}]
),
{200, #{data := [Client3]}} =
request(
get,
"/gateways/coap/clients",
[{<<"like_clientid">>, <<"cli">>}]
),
%% lookup
{200, Client4} =
request(get, "/gateways/coap/clients/client1"),
%% assert
Client1 = Client2 = Client3 = Client4,
%% kickout
{204, _} =
request(delete, "/gateways/coap/clients/client1"),
timer:sleep(200),
{200, #{data := []}} = request(get, "/gateways/coap/clients")
end,
with_connection(Fun).
t_clients_subscription_api(_) ->
Fun = fun(_Channel, _Token) ->
Path = "/gateways/coap/clients/client1/subscriptions",
%% list
{200, []} = request(get, Path),
%% create
SubReq = #{
topic => <<"tx">>,
qos => 0,
nl => 0,
rap => 0,
rh => 0
},
{201, SubsResp} = request(post, Path, SubReq),
{200, [SubsResp2]} = request(get, Path),
?assertEqual(
maps:get(topic, SubsResp),
maps:get(topic, SubsResp2)
),
%% check subscription_cnt
{200, #{subscriptions_cnt := 1}} = request(get, "/gateways/coap/clients/client1"),
{204, _} = request(delete, Path ++ "/tx"),
{200, []} = request(get, Path)
end,
with_connection(Fun).
t_clients_get_subscription_api(_) ->
Fun = fun(Channel, Token) ->
Path = "/gateways/coap/clients/client1/subscriptions",
%% list
{200, []} = request(get, Path),
observe(Channel, Token, true),
{200, [Subs]} = request(get, Path),
?assertEqual(<<"coap/observe">>, maps:get(topic, Subs)),
observe(Channel, Token, false),
{200, []} = request(get, Path)
end,
with_connection(Fun).
t_on_offline_event(_) ->
Fun = fun(Channel) ->
emqx_hooks:add('client.connected', {emqx_sys, on_client_connected, []}, 1000),
emqx_hooks:add('client.disconnected', {emqx_sys, on_client_disconnected, []}, 1000),
ConnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/connected">>,
emqx_broker:subscribe(ConnectedSub),
timer:sleep(100),
Token = connection(Channel),
?assertMatch(#message{}, receive_deliver(500)),
DisconnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/disconnected">>,
emqx_broker:subscribe(DisconnectedSub),
timer:sleep(100),
disconnection(Channel, Token),
?assertMatch(#message{}, receive_deliver(500)),
emqx_broker:unsubscribe(ConnectedSub),
emqx_broker:unsubscribe(DisconnectedSub),
emqx_hooks:del('client.connected', {emqx_sys, on_client_connected}),
emqx_hooks:del('client.disconnected', {emqx_sys, on_client_disconnected}),
timer:sleep(500)
end,
do(Fun).
t_connectionless_pubsub(_) ->
restart_coap_with_connection_mode(false),
Fun = fun(Channel) ->
Topic = <<"t/a">>,
Payload = <<"123">>,
URI = pubsub_uri(binary_to_list(Topic)),
Req = make_req(get, Payload, [{observe, 0}]),
{ok, content, _} = do_request(Channel, URI, Req),
?LOGT("observer topic:~ts~n", [Topic]),
%% ensure subscribe succeed
timer:sleep(100),
[SubPid] = emqx:subscribers(Topic),
?assert(is_pid(SubPid)),
%% publish a message
Req2 = make_req(post, Payload),
{ok, changed, _} = do_request(Channel, URI, Req2),
{ok, content, Notify} = with_response(Channel),
?LOGT("observer get Notif=~p", [Notify]),
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
end,
do(Fun),
restart_coap_with_connection_mode(true).
%%--------------------------------------------------------------------
%% helpers
send_heartbeat(Token) ->
send_heartbeat(Token, false).
send_heartbeat(Token, ShortenParamName) ->
Prefix = ?MQTT_PREFIX ++ "/connection",
Queries = #{
"clientid" => <<"client1">>,
"token" => Token
},
URI = compose_uri(Prefix, Queries, ShortenParamName),
?LOGT("send heartbeat request:~ts~n", [URI]),
er_coap_client:request(put, URI).
connection(Channel) ->
connection(Channel, false).
connection(Channel, ShortenParamName) ->
Prefix = ?MQTT_PREFIX ++ "/connection",
Queries = #{
"clientid" => <<"client1">>,
"username" => <<"admin">>,
"password" => <<"public">>
},
URI = compose_uri(Prefix, Queries, ShortenParamName),
Req = make_req(post),
{ok, created, Data} = do_request(Channel, URI, Req),
#coap_content{payload = BinToken} = Data,
binary_to_list(BinToken).
disconnection(Channel, Token) ->
disconnection(Channel, Token, false).
disconnection(Channel, Token, ShortenParamName) ->
Prefix = ?MQTT_PREFIX ++ "/connection",
Queries = #{
"clientid" => <<"client1">>,
"token" => Token
},
URI = compose_uri(Prefix, Queries, ShortenParamName),
Req = make_req(delete),
{ok, deleted, _} = do_request(Channel, URI, Req).
observe(Channel, Token, Observe) ->
observe(Channel, Token, Observe, false).
observe(Channel, Token, true, ShortenParamName) ->
Prefix = ?PS_PREFIX ++ "/coap/observe",
Queries = #{
"clientid" => <<"client1">>,
"token" => Token
},
URI = compose_uri(Prefix, Queries, ShortenParamName),
Req = make_req(get, <<>>, [{observe, 0}]),
{ok, content, _Data} = do_request(Channel, URI, Req),
ok;
observe(Channel, Token, false, ShortenParamName) ->
Prefix = ?PS_PREFIX ++ "/coap/observe",
Queries = #{
"clientid" => <<"client1">>,
"token" => Token
},
URI = compose_uri(Prefix, Queries, ShortenParamName),
Req = make_req(get, <<>>, [{observe, 1}]),
{ok, nocontent, _Data} = do_request(Channel, URI, Req),
ok.
pubsub_uri(Topic) when is_list(Topic) ->
?PS_PREFIX ++ "/" ++ Topic.
pubsub_uri(Topic, Token) ->
pubsub_uri(Topic, Token, false).
pubsub_uri(Topic, Token, ShortenParamName) when is_list(Topic), is_list(Token) ->
Prefix = ?PS_PREFIX ++ "/" ++ Topic,
Queries = #{
"clientid" => <<"client1">>,
"token" => Token
},
compose_uri(Prefix, Queries, ShortenParamName).
make_req(Method) ->
make_req(Method, <<>>).
make_req(Method, Payload) ->
make_req(Method, Payload, []).
make_req(Method, Payload, Opts) ->
er_coap_message:request(con, Method, Payload, Opts).
do_request(Channel, URI, #coap_message{options = Opts} = Req) ->
{_, _, Path, Query} = er_coap_client:resolve_uri(URI),
Opts2 = [{uri_path, Path}, {uri_query, Query} | Opts],
Req2 = Req#coap_message{options = Opts2},
?LOGT("send request:~ts~nReq:~p~n", [URI, Req2]),
{ok, _} = er_coap_channel:send(Channel, Req2),
with_response(Channel).
with_response(Channel) ->
receive
{coap_response, _ChId, Channel, _Ref, Message = #coap_message{method = Code}} ->
return_response(Code, Message);
{coap_error, _ChId, Channel, _Ref, reset} ->
{error, reset}
after 2000 ->
{error, timeout}
end.
return_response({ok, Code}, Message) ->
{ok, Code, er_coap_message:get_content(Message)};
return_response({error, Code}, #coap_message{payload = <<>>}) ->
{error, Code};
return_response({error, Code}, Message) ->
{error, Code, er_coap_message:get_content(Message)}.
do(Fun) ->
ChId = {{127, 0, 0, 1}, 5683},
{ok, Sock} = er_coap_udp_socket:start_link(),
{ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
%% send and receive
Res = Fun(Channel),
%% terminate the processes
er_coap_channel:close(Channel),
er_coap_udp_socket:close(Sock),
Res.
with_connection(Action) ->
Fun = fun(Channel) ->
Token = connection(Channel),
timer:sleep(100),
Action(Channel, Token),
disconnection(Channel, Token),
timer:sleep(100)
end,
do(Fun).
with_connection(Checks, Action) ->
Fun = fun(Channel) ->
Token = connection(Channel),
timer:sleep(100),
lists:foreach(fun(E) -> Action(E, Channel, Token) end, Checks),
disconnection(Channel, Token),
timer:sleep(100)
end,
do(Fun).
receive_deliver(Wait) ->
receive
{deliver, _, Msg} ->
Msg
after Wait ->
{error, timeout}
end.
get_field(type, #coap_message{type = Type}) ->
Type;
get_field(method, #coap_message{method = Method}) ->
Method.
compose_uri(URI, Queries, ShortenParamName) ->
Queries1 = shorten_param_name(ShortenParamName, Queries),
case maps:size(Queries1) of
0 ->
URI;
_ ->
URI ++ "?" ++ uri_string:compose_query(maps:to_list(Queries1))
end.
shorten_param_name(false, Queries) ->
Queries;
shorten_param_name(true, Queries) ->
lists:foldl(
fun({Short, Long}, Acc) ->
case maps:take(Long, Acc) of
error ->
Acc;
{Value, Acc1} ->
maps:put(Short, Value, Acc1)
end
end,
Queries,
emqx_coap_message:query_params_mapping_table()
).