792 lines
22 KiB
Erlang
792 lines
22 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_coap_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-import(
|
|
emqx_gateway_test_utils,
|
|
[
|
|
request/2,
|
|
request/3
|
|
]
|
|
).
|
|
|
|
-include_lib("er_coap_client/include/coap.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
%% Configuration text passed to emqx_conf when the suite boots (see
%% init_per_suite/1): a CoAP gateway with a single UDP listener bound to
%% port 5683 and `connection_required' switched on.
-define(CONF_DEFAULT, <<
    "\n"
    "gateway.coap\n"
    "{\n"
    " idle_timeout = 30s\n"
    " enable_stats = false\n"
    " mountpoint = \"\"\n"
    " notify_type = qos\n"
    " connection_required = true\n"
    " subscribe_qos = qos1\n"
    " publish_qos = qos1\n"
    "\n"
    " listeners.udp.default\n"
    " {bind = 5683}\n"
    "}\n"
>>).
|
|
|
|
%% Log helper: prints through ct:pal/2 with a "TEST_SUITE: " prefix.
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).

%% Base URIs used by the helpers below to build pub/sub and connection
%% requests.
-define(PS_PREFIX, "coap://127.0.0.1/ps").
-define(MQTT_PREFIX, "coap://127.0.0.1/mqtt").
|
|
|
|
%% Common Test callback. The list of test cases is delegated to
%% emqx_common_test_helpers:all/1, called with this module's name.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
|
|
|
|
%% Suite setup: loads the CoAP gateway application, starts the applications
%% the tests rely on through emqx_cth_suite, creates the default API app and
%% records the started applications under `suite_apps' in Config.
init_per_suite(Config) ->
    application:load(emqx_gateway_coap),
    AppSpecs = [
        {emqx_conf, ?CONF_DEFAULT},
        emqx_gateway,
        emqx_auth,
        emqx_management,
        {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
    ],
    StartOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
    Apps = emqx_cth_suite:start(AppSpecs, StartOpts),
    emqx_common_test_http:create_default_app(),
    [{suite_apps, Apps} | Config].
|
|
|
|
%% Suite teardown: stops the applications recorded by init_per_suite/1 and
%% deletes the override configuration files.
end_per_suite(Config) ->
    Apps = ?config(suite_apps, Config),
    emqx_cth_suite:stop(Apps),
    emqx_config:delete_override_conf_files(),
    ok.
|
|
|
|
%% Per-testcase setup.
%%
%% - t_connection_with_authn_failed: mocks emqx_access_control (passthrough)
%%   and makes authenticate/1 reject every client.
%% - t_heartbeat: sets the gateway heartbeat to 800ms and stores the previous
%%   raw config (`old_conf') and the new value in milliseconds
%%   (`new_heartbeat') in Config. emqx_access_control is not mocked here.
%% - any other case: mocks emqx_access_control with passthrough only.
init_per_testcase(t_connection_with_authn_failed, Config) ->
    ok = meck:new(emqx_access_control, [passthrough]),
    RejectAll = fun(_) -> {error, bad_username_or_password} end,
    ok = meck:expect(emqx_access_control, authenticate, RejectAll),
    Config;
init_per_testcase(t_heartbeat, Config) ->
    OldConf = emqx:get_raw_config([gateway, coap]),
    NewConf = OldConf#{<<"heartbeat">> => <<"800ms">>},
    {ok, _} = emqx_gateway_conf:update_gateway(coap, NewConf),
    [{old_conf, OldConf}, {new_heartbeat, 800} | Config];
init_per_testcase(_TestCase, Config) ->
    ok = meck:new(emqx_access_control, [passthrough]),
    Config.
|
|
|
|
%% Per-testcase teardown, mirroring init_per_testcase/2: t_heartbeat puts the
%% saved gateway config back; every other case unloads the
%% emqx_access_control mock.
end_per_testcase(t_heartbeat, Config) ->
    SavedConf = ?config(old_conf, Config),
    {ok, _} = emqx_gateway_conf:update_gateway(coap, SavedConf),
    ok;
end_per_testcase(_TestCase, Config) ->
    ok = meck:unload(emqx_access_control),
    Config.
|
|
|
|
%% Accessor for the ?CONF_DEFAULT configuration binary.
default_config() ->
    Conf = ?CONF_DEFAULT,
    Conf.
|
|
|
|
%% Accessor for the ?MQTT_PREFIX base URI.
mqtt_prefix() ->
    Prefix = ?MQTT_PREFIX,
    Prefix.
|
|
|
|
%% Accessor for the ?PS_PREFIX base URI.
ps_prefix() ->
    Prefix = ?PS_PREFIX,
    Prefix.
|
|
|
|
%% Updates the CoAP gateway with `connection_required' set to Bool (an atom,
%% stored as its binary form) on top of the current raw config. Returns
%% whatever emqx_gateway_conf:update_gateway/2 returns.
restart_coap_with_connection_mode(Bool) ->
    CurrentConf = emqx:get_raw_config([gateway, coap]),
    NewConf = CurrentConf#{<<"connection_required">> => atom_to_binary(Bool)},
    emqx_gateway_conf:update_gateway(coap, NewConf).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test Cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Connection life cycle with the long query parameter names: connect, send
%% one heartbeat, disconnect. The channel registry must list `client1' while
%% connected and be empty after the disconnect.
t_connection(_) ->
    Channels = fun() ->
        emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
    end,
    Scenario = fun(Channel) ->
        %% connection
        Token = connection(Channel),

        %% wait before inspecting the registry
        timer:sleep(100),
        ?assertNotEqual([], Channels()),

        %% heartbeat
        {ok, changed, _} = send_heartbeat(Token),

        disconnection(Channel, Token),

        timer:sleep(100),
        ?assertEqual([], Channels())
    end,
    do(Scenario).
|
|
|
|
%% Same life cycle as t_connection/1, but every request is built with the
%% shortened query parameter names (ShortenParamName = true).
t_connection_with_short_param_name(_) ->
    Channels = fun() ->
        emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
    end,
    Scenario = fun(Channel) ->
        %% connection
        Token = connection(Channel, true),

        %% wait before inspecting the registry
        timer:sleep(100),
        ?assertNotEqual([], Channels()),

        %% heartbeat
        {ok, changed, _} = send_heartbeat(Token, true),

        disconnection(Channel, Token, true),

        timer:sleep(100),
        ?assertEqual([], Channels())
    end,
    do(Scenario).
|
|
|
|
%% Heartbeat handling. `new_heartbeat' (milliseconds) is put into Config by
%% init_per_testcase/2. Five heartbeats sent at half that interval must keep
%% the channel registered; after a silence of twice the interval the channel
%% must be gone from the registry.
t_heartbeat(Config) ->
    Heartbeat = ?config(new_heartbeat, Config),
    Channels = fun() ->
        emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
    end,
    Scenario = fun(Channel) ->
        Token = connection(Channel),

        timer:sleep(100),
        ?assertNotEqual([], Channels()),

        %% must keep client connection alive
        Delay = Heartbeat div 2,
        Beat = fun(_Round) ->
            ?assertMatch({ok, changed, _}, send_heartbeat(Token)),
            timer:sleep(Delay)
        end,
        lists:foreach(Beat, lists:seq(1, 5)),

        ?assertNotEqual([], Channels()),

        %% stay silent for two heartbeat intervals
        timer:sleep(Heartbeat * 2),
        ?assertEqual([], Channels()),

        disconnection(Channel, Token),

        timer:sleep(100),
        ?assertEqual([], Channels())
    end,
    do(Scenario).
|
|
|
|
%% Query parameter requirements of the connection endpoint: a request that
%% carries only `clientid' is accepted, while a request without `clientid' is
%% answered with `bad_request'.
t_connection_optional_params(_) ->
    Channels = fun() ->
        emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
    end,
    UsernamePasswordAreOptional = fun(Channel) ->
        URI = ?MQTT_PREFIX ++ "/connection?clientid=client1",
        ConnReq = make_req(post),
        {ok, created, Data} = do_request(Channel, URI, ConnReq),
        #coap_content{payload = BinToken} = Data,
        Token = binary_to_list(BinToken),

        timer:sleep(100),
        ?assertNotEqual([], Channels()),

        disconnection(Channel, Token),

        timer:sleep(100),
        ?assertEqual([], Channels())
    end,
    ClientIdIsRequired = fun(Channel) ->
        URI = ?MQTT_PREFIX ++ "/connection",
        ConnReq = make_req(post),
        {error, bad_request, _} = do_request(Channel, URI, ConnReq)
    end,
    do(UsernamePasswordAreOptional),
    do(ClientIdIsRequired).
|
|
|
|
%% With emqx_access_control:authenticate/1 mocked to reject every client (see
%% init_per_testcase/2), a connection request must be answered with
%% `bad_request' and must not leave a channel in the registry.
%%
%% The request goes through do/1 so that the UDP socket and channel opened
%% for the test are closed again instead of being left running.
t_connection_with_authn_failed(_) ->
    Scenario = fun(Channel) ->
        URI =
            ?MQTT_PREFIX ++
                "/connection?clientid=client1&username=admin&password=public",
        Req = make_req(post),
        ?assertMatch({error, bad_request, _}, do_request(Channel, URI, Req)),

        timer:sleep(100),
        ?assertEqual(
            [],
            emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
        )
    end,
    do(Scenario),
    ok.
|
|
|
|
%% Publishing through the CoAP gateway: for each topic the test process
%% subscribes first, POSTs a payload to the pub/sub URI and expects the
%% message to be delivered to it within 500ms.
t_publish(_) ->
    Topics = [
        %% can publish to a normal topic
        <<"abc">>,
        %% can publish to a `/` leading topic
        <<"/abc">>
    ],
    PublishAndCheck = fun(Topic, Channel, Token) ->
        Payload = <<"123">>,
        URI = pubsub_uri(binary_to_list(Topic), Token),

        %% Sub topic first
        emqx:subscribe(Topic),

        PubReq = make_req(post, Payload),
        {ok, changed, _} = do_request(Channel, URI, PubReq),

        receive
            {deliver, Topic, Msg} ->
                ?assertEqual(Topic, Msg#message.topic),
                ?assertEqual(Payload, Msg#message.payload)
        after 500 ->
            ?assert(false)
        end
    end,
    with_connection(Topics, PublishAndCheck).
|
|
|
|
%% Publishing with the `retain', `qos' and `expiry' query parameters.
%%
%% emqx_access_control:authorize/3 is mocked with a single clause that only
%% accepts a publish action with qos 1 and retain true; any other call fails
%% with function_clause inside the mock. The final meck:validate/1 result is
%% asserted, so such a failed call fails the test instead of being ignored.
t_publish_with_retain_qos_expiry(_) ->
    _ = meck:expect(
        emqx_access_control,
        authorize,
        fun(_, #{action_type := publish, qos := 1, retain := true}, _) ->
            allow
        end
    ),

    Topics = [<<"abc">>],
    Action = fun(Topic, Channel, Token) ->
        Payload = <<"123">>,
        URI = pubsub_uri(binary_to_list(Topic), Token) ++ "&retain=true&qos=1&expiry=60",

        %% Sub topic first
        emqx:subscribe(Topic),

        Req = make_req(post, Payload),
        {ok, changed, _} = do_request(Channel, URI, Req),

        receive
            {deliver, Topic, Msg} ->
                ?assertEqual(Topic, Msg#message.topic),
                ?assertEqual(Payload, Msg#message.payload)
        after 500 ->
            ?assert(false)
        end
    end,
    with_connection(Topics, Action),

    %% the mock must have been used according to its expectation
    ?assert(meck:validate(emqx_access_control)).
|
|
|
|
%% Subscribing through a CoAP observe request: for each topic the client
%% registers an observer, a message is published on the broker and the
%% notification received on the channel must carry the same payload. After
%% the client disconnects, none of the topics may have a subscriber left.
t_subscribe(_) ->
    Topics = [
        %% can subscribe to a normal topic
        <<"abc">>,
        %% can subscribe to a `/` leading topic
        <<"/abc">>
    ],
    ObserveAndNotify = fun(Topic, Channel, Token) ->
        Payload = <<"123">>,
        URI = pubsub_uri(binary_to_list(Topic), Token),
        ObserveReq = make_req(get, Payload, [{observe, 0}]),
        {ok, content, _} = do_request(Channel, URI, ObserveReq),
        ?LOGT("observer topic:~ts~n", [Topic]),

        %% ensure subscribe succeed
        timer:sleep(100),
        [SubPid] = emqx:subscribers(Topic),
        ?assert(is_pid(SubPid)),

        %% publish a message
        emqx:publish(emqx_message:make(Topic, Payload)),
        {ok, content, Notify} = with_response(Channel),
        ?LOGT("observer get Notif=~p", [Notify]),

        #coap_content{payload = PayloadRecv} = Notify,

        ?assertEqual(Payload, PayloadRecv)
    end,

    with_connection(Topics, ObserveAndNotify),

    %% subscription removed if coap client disconnected
    timer:sleep(100),
    AssertNoSubscriber = fun(Topic) ->
        ?assertEqual([], emqx:subscribers(Topic))
    end,
    lists:foreach(AssertNoSubscriber, Topics).
|
|
|
|
%% Like t_subscribe/1, but each observe request carries an explicit `qos'
%% query parameter; the subscription options stored by the broker must report
%% that same QoS.
t_subscribe_with_qos_opt(_) ->
    Topics = [
        {<<"abc">>, 0},
        {<<"/abc">>, 1},
        {<<"abc/d">>, 2}
    ],
    ObserveAndNotify = fun({Topic, Qos}, Channel, Token) ->
        Payload = <<"123">>,
        URI = pubsub_uri(binary_to_list(Topic), Token) ++ "&qos=" ++ integer_to_list(Qos),
        ObserveReq = make_req(get, Payload, [{observe, 0}]),
        {ok, content, _} = do_request(Channel, URI, ObserveReq),
        ?LOGT("observer topic:~ts~n", [Topic]),

        %% ensure subscribe succeed
        timer:sleep(100),
        [SubPid] = emqx:subscribers(Topic),
        ?assert(is_pid(SubPid)),
        ?assertEqual(Qos, maps:get(qos, emqx_broker:get_subopts(SubPid, Topic))),

        %% publish a message
        emqx:publish(emqx_message:make(Topic, Payload)),
        {ok, content, Notify} = with_response(Channel),
        ?LOGT("observer get Notif=~p", [Notify]),

        #coap_content{payload = PayloadRecv} = Notify,

        ?assertEqual(Payload, PayloadRecv)
    end,

    with_connection(Topics, ObserveAndNotify),

    %% subscription removed if coap client disconnected
    timer:sleep(100),
    AssertNoSubscriber = fun({Topic, _Qos}) ->
        ?assertEqual([], emqx:subscribers(Topic))
    end,
    lists:foreach(AssertNoSubscriber, Topics).
|
|
|
|
%% Unsubscribing: an observe request with option value 0 creates the
%% subscription, a second one with option value 1 is answered with
%% `nocontent' and must leave the topic without subscribers.
t_un_subscribe(_) ->
    Topics = [
        %% can unsubscribe to a normal topic
        <<"abc">>,
        %% can unsubscribe to a `/` leading topic
        <<"/abc">>
    ],
    ObserveThenCancel = fun(Topic, Channel, Token) ->
        Payload = <<"123">>,
        URI = pubsub_uri(binary_to_list(Topic), Token),

        ObserveReq = make_req(get, Payload, [{observe, 0}]),
        {ok, content, _} = do_request(Channel, URI, ObserveReq),
        ?LOGT("observer topic:~ts~n", [Topic]),

        timer:sleep(100),
        [SubPid] = emqx:subscribers(Topic),
        ?assert(is_pid(SubPid)),

        CancelReq = make_req(get, Payload, [{observe, 1}]),
        {ok, nocontent, _} = do_request(Channel, URI, CancelReq),
        ?LOGT("un observer topic:~ts~n", [Topic]),
        timer:sleep(100),
        ?assertEqual([], emqx:subscribers(Topic))
    end,

    with_connection(Topics, ObserveThenCancel).
|
|
|
|
%% Observing a single-level wildcard topic (`abc/+'): a message published to
%% `abc/def' must reach the observer with its payload intact.
t_observe_wildcard(_) ->
    Scenario = fun(Channel, Token) ->
        %% resolve_url can't process wildcard with #
        Topic = <<"abc/+">>,
        Payload = <<"123">>,

        URI = pubsub_uri(binary_to_list(Topic), Token),
        ObserveReq = make_req(get, Payload, [{observe, 0}]),
        {ok, content, _} = do_request(Channel, URI, ObserveReq),
        ?LOGT("observer topic:~ts~n", [Topic]),

        timer:sleep(100),
        [SubPid] = emqx:subscribers(Topic),
        ?assert(is_pid(SubPid)),

        %% Publish a message
        PubTopic = <<"abc/def">>,
        emqx:publish(emqx_message:make(PubTopic, Payload)),
        {ok, content, Notify} = with_response(Channel),

        ?LOGT("observer get Notif=~p", [Notify]),

        #coap_content{payload = PayloadRecv} = Notify,

        ?assertEqual(Payload, PayloadRecv)
    end,

    with_connection(Scenario).
|
|
|
|
%% HTTP clients API of the CoAP gateway: listing, searching by exact and
%% fuzzy client id, and direct lookup must all return the same client;
%% deleting the client must leave the client list empty.
t_clients_api(_) ->
    Scenario = fun(_Channel, _Token) ->
        ClientId = <<"client1">>,
        ListPath = "/gateways/coap/clients",
        ClientPath = "/gateways/coap/clients/client1",
        %% list
        {200, #{data := [Client1]}} = request(get, ListPath),
        #{clientid := ClientId} = Client1,
        %% searching
        {200, #{data := [Client2]}} =
            request(get, ListPath, [{<<"clientid">>, ClientId}]),
        {200, #{data := [Client3]}} =
            request(get, ListPath, [{<<"like_clientid">>, <<"cli">>}]),
        %% lookup
        {200, Client4} = request(get, ClientPath),
        %% assert
        Client1 = Client2 = Client3 = Client4,
        %% kickout
        {204, _} = request(delete, ClientPath),
        timer:sleep(200),
        {200, #{data := []}} = request(get, ListPath)
    end,
    with_connection(Scenario).
|
|
|
|
%% HTTP subscriptions API for a connected CoAP client: the list starts empty,
%% a subscription on `tx' can be created and shows up in the list and in the
%% client's `subscriptions_cnt', and deleting it empties the list again.
t_clients_subscription_api(_) ->
    Scenario = fun(_Channel, _Token) ->
        Path = "/gateways/coap/clients/client1/subscriptions",
        %% list
        {200, []} = request(get, Path),
        %% create
        NewSub = #{
            topic => <<"tx">>,
            qos => 0,
            nl => 0,
            rap => 0,
            rh => 0
        },

        {201, Created} = request(post, Path, NewSub),
        {200, [Listed]} = request(get, Path),
        ?assertEqual(maps:get(topic, Created), maps:get(topic, Listed)),

        %% check subscription_cnt
        {200, #{subscriptions_cnt := 1}} = request(get, "/gateways/coap/clients/client1"),

        {204, _} = request(delete, Path ++ "/tx"),

        {200, []} = request(get, Path)
    end,
    with_connection(Scenario).
|
|
|
|
%% A subscription created over CoAP (observe on `coap/observe') must be
%% visible through the HTTP subscriptions API and disappear from it once the
%% observation is cancelled.
t_clients_get_subscription_api(_) ->
    Scenario = fun(Channel, Token) ->
        Path = "/gateways/coap/clients/client1/subscriptions",
        %% list
        {200, []} = request(get, Path),

        observe(Channel, Token, true),

        {200, [Sub]} = request(get, Path),

        ?assertEqual(<<"coap/observe">>, maps:get(topic, Sub)),

        observe(Channel, Token, false),

        {200, []} = request(get, Path)
    end,
    with_connection(Scenario).
|
|
|
|
%% Connected / disconnected events: with the emqx_sys hooks installed, the
%% test process subscribes to the gateway's $SYS client topics and expects
%% one delivery after connecting and one after disconnecting.
%%
%% The hooks are global state, so they are removed in an `after' section:
%% a failing assertion can no longer leave them installed.
t_on_offline_event(_) ->
    Fun = fun(Channel) ->
        emqx_hooks:add('client.connected', {emqx_sys, on_client_connected, []}, 1000),
        emqx_hooks:add('client.disconnected', {emqx_sys, on_client_disconnected, []}, 1000),
        try
            ConnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/connected">>,
            emqx_broker:subscribe(ConnectedSub),
            timer:sleep(100),

            Token = connection(Channel),
            ?assertMatch(#message{}, receive_deliver(500)),

            DisconnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/disconnected">>,
            emqx_broker:subscribe(DisconnectedSub),
            timer:sleep(100),

            disconnection(Channel, Token),

            ?assertMatch(#message{}, receive_deliver(500)),

            emqx_broker:unsubscribe(ConnectedSub),
            emqx_broker:unsubscribe(DisconnectedSub)
        after
            %% always remove the hooks added above
            emqx_hooks:del('client.connected', {emqx_sys, on_client_connected}),
            emqx_hooks:del('client.disconnected', {emqx_sys, on_client_disconnected})
        end,
        timer:sleep(500)
    end,
    do(Fun).
|
|
|
|
%% Pub/sub with `connection_required' switched off: the client observes and
%% publishes on `t/a' without a clientid/token and must be notified of its
%% own publication.
%%
%% The gateway is switched back to connection mode in an `after' section, so
%% a failing assertion does not leave it connectionless for later test cases.
t_connectionless_pubsub(_) ->
    {ok, _} = restart_coap_with_connection_mode(false),
    Fun = fun(Channel) ->
        Topic = <<"t/a">>,
        Payload = <<"123">>,
        URI = pubsub_uri(binary_to_list(Topic)),
        Req = make_req(get, Payload, [{observe, 0}]),
        {ok, content, _} = do_request(Channel, URI, Req),
        ?LOGT("observer topic:~ts~n", [Topic]),

        %% ensure subscribe succeed
        timer:sleep(100),
        [SubPid] = emqx:subscribers(Topic),
        ?assert(is_pid(SubPid)),

        %% publish a message
        Req2 = make_req(post, Payload),
        {ok, changed, _} = do_request(Channel, URI, Req2),

        {ok, content, Notify} = with_response(Channel),
        ?LOGT("observer get Notif=~p", [Notify]),

        #coap_content{payload = PayloadRecv} = Notify,

        ?assertEqual(Payload, PayloadRecv)
    end,
    try
        do(Fun)
    after
        %% restore the mode set by ?CONF_DEFAULT
        restart_coap_with_connection_mode(true)
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% helpers
|
|
|
|
%% Sends a heartbeat (PUT on the connection resource) for `client1' using the
%% given token, with the long query parameter names.
send_heartbeat(Token) ->
    send_heartbeat(Token, false).

%% Same as send_heartbeat/1; ShortenParamName is forwarded to compose_uri/3.
%% Returns the result of er_coap_client:request/2.
send_heartbeat(Token, ShortenParamName) ->
    Params = #{"clientid" => <<"client1">>, "token" => Token},
    URI = compose_uri(?MQTT_PREFIX ++ "/connection", Params, ShortenParamName),
    ?LOGT("send heartbeat request:~ts~n", [URI]),
    er_coap_client:request(put, URI).
|
|
|
|
%% Opens a connection for `client1' (username `admin', password `public')
%% over Channel and returns the token from the response payload as a string.
connection(Channel) ->
    connection(Channel, false).

%% Same as connection/1; ShortenParamName is forwarded to compose_uri/3.
%% Crashes with badmatch unless the gateway answers `created'.
connection(Channel, ShortenParamName) ->
    Credentials = #{
        "clientid" => <<"client1">>,
        "username" => <<"admin">>,
        "password" => <<"public">>
    },
    URI = compose_uri(?MQTT_PREFIX ++ "/connection", Credentials, ShortenParamName),
    ConnReq = make_req(post),
    {ok, created, Content} = do_request(Channel, URI, ConnReq),
    #coap_content{payload = BinToken} = Content,
    binary_to_list(BinToken).
|
|
|
|
%% Closes the connection of `client1' identified by Token (DELETE on the
%% connection resource), using the long query parameter names.
disconnection(Channel, Token) ->
    disconnection(Channel, Token, false).

%% Same as disconnection/2; ShortenParamName is forwarded to compose_uri/3.
%% Crashes with badmatch unless the gateway answers `deleted'.
disconnection(Channel, Token, ShortenParamName) ->
    Params = #{"clientid" => <<"client1">>, "token" => Token},
    URI = compose_uri(?MQTT_PREFIX ++ "/connection", Params, ShortenParamName),
    DelReq = make_req(delete),
    {ok, deleted, _} = do_request(Channel, URI, DelReq).
|
|
|
|
%% Starts (Observe = true) or cancels (Observe = false) an observation of
%% `coap/observe' for `client1', using the long query parameter names.
observe(Channel, Token, Observe) ->
    observe(Channel, Token, Observe, false).

%% Same as observe/3; ShortenParamName is forwarded to compose_uri/3.
%% Starting sends observe option 0 and expects `content'; cancelling sends
%% observe option 1 and expects `nocontent'. Returns ok.
observe(Channel, Token, Observe, ShortenParamName) when is_boolean(Observe) ->
    {ObserveOpt, ExpectedCode} =
        case Observe of
            true -> {0, content};
            false -> {1, nocontent}
        end,
    Params = #{"clientid" => <<"client1">>, "token" => Token},
    URI = compose_uri(?PS_PREFIX ++ "/coap/observe", Params, ShortenParamName),
    Req = make_req(get, <<>>, [{observe, ObserveOpt}]),
    %% ExpectedCode is already bound, so this asserts the response code
    {ok, ExpectedCode, _Data} = do_request(Channel, URI, Req),
    ok.
|
|
|
|
%% Pub/sub URI for Topic (a string) without any query parameters.
pubsub_uri(Topic) when is_list(Topic) ->
    Base = ?PS_PREFIX ++ "/",
    Base ++ Topic.

%% Pub/sub URI for Topic carrying `clientid' (client1) and `token', with the
%% long query parameter names.
pubsub_uri(Topic, Token) ->
    pubsub_uri(Topic, Token, false).

%% Same as pubsub_uri/2; ShortenParamName is forwarded to compose_uri/3.
%% Topic and Token must both be strings.
pubsub_uri(Topic, Token, ShortenParamName) when is_list(Topic), is_list(Token) ->
    Params = #{"clientid" => <<"client1">>, "token" => Token},
    compose_uri(?PS_PREFIX ++ "/" ++ Topic, Params, ShortenParamName).
|
|
|
|
%% Builds a confirmable (`con') CoAP request with an empty payload and no
%% options.
make_req(Method) ->
    make_req(Method, <<>>, []).

%% Builds a confirmable CoAP request with the given payload and no options.
make_req(Method, Payload) ->
    make_req(Method, Payload, []).

%% Builds a confirmable CoAP request through er_coap_message:request/4.
make_req(Method, Payload, Opts) ->
    MsgType = con,
    er_coap_message:request(MsgType, Method, Payload, Opts).
|
|
|
|
%% Sends Req over Channel after prepending the `uri_path' and `uri_query'
%% options resolved from URI to the request's existing options, then waits
%% for the reply with with_response/1 and returns its result.
do_request(Channel, URI, Req = #coap_message{options = Opts0}) ->
    {_, _, Path, Query} = er_coap_client:resolve_uri(URI),
    Addressed = Req#coap_message{
        options = [{uri_path, Path}, {uri_query, Query} | Opts0]
    },
    ?LOGT("send request:~ts~nReq:~p~n", [URI, Addressed]),

    {ok, _} = er_coap_channel:send(Channel, Addressed),
    with_response(Channel).
|
|
|
|
%% Waits up to two seconds for a message addressed to Channel. A
%% `coap_response' is converted with return_response/2, a `coap_error' with
%% reason `reset' yields {error, reset}, and no matching message in time
%% yields {error, timeout}.
with_response(Channel) ->
    receive
        {coap_response, _ChId, Channel, _Ref, #coap_message{method = Code} = Reply} ->
            return_response(Code, Reply);
        {coap_error, _ChId, Channel, _Ref, reset} ->
            {error, reset}
    after 2000 ->
        {error, timeout}
    end.
|
|
|
|
%% Turns a response code and message into the tuple used by the tests:
%% {ok, Code, Content} for success codes, {error, Code} for an error with an
%% empty payload and {error, Code, Content} for an error with a payload.
%% Content is obtained from er_coap_message:get_content/1.
return_response({ok, Code}, Message) ->
    {ok, Code, er_coap_message:get_content(Message)};
return_response({error, Code}, Message) ->
    case Message of
        #coap_message{payload = <<>>} ->
            {error, Code};
        _ ->
            {error, Code, er_coap_message:get_content(Message)}
    end.
|
|
|
|
%% Runs Fun(Channel) against the gateway listening on 127.0.0.1:5683 and
%% returns its result.
%%
%% A UDP socket and a channel are opened for the call. Both are closed in
%% `after' sections, so they are released even when Fun raises (for example
%% on a failed assertion); the exception is still propagated to the caller.
do(Fun) ->
    ChId = {{127, 0, 0, 1}, 5683},
    {ok, Sock} = er_coap_udp_socket:start_link(),
    try
        {ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
        try
            %% send and receive
            Fun(Channel)
        after
            %% terminate the channel process
            er_coap_channel:close(Channel)
        end
    after
        %% terminate the socket process
        er_coap_udp_socket:close(Sock)
    end.
|
|
|
|
%% Runs Action(Channel, Token) inside a connected session: connects
%% `client1', waits 100ms, runs Action, disconnects and waits another 100ms.
with_connection(Action) ->
    Session = fun(Channel) ->
        Token = connection(Channel),
        timer:sleep(100),
        Action(Channel, Token),
        disconnection(Channel, Token),
        timer:sleep(100)
    end,
    do(Session).

%% Runs Action(Check, Channel, Token) for every element of Checks, in order,
%% inside one connected session.
with_connection(Checks, Action) ->
    RunAll = fun(Channel, Token) ->
        lists:foreach(fun(Check) -> Action(Check, Channel, Token) end, Checks)
    end,
    with_connection(RunAll).
|
|
|
|
%% Takes the next {deliver, _, Msg} message from the caller's mailbox and
%% returns Msg, or {error, timeout} if none arrives within Wait milliseconds.
receive_deliver(Wait) ->
    receive
        {deliver, _Topic, Delivered} -> Delivered
    after Wait ->
        {error, timeout}
    end.
|
|
|
|
%% Reads the `type' or `method' field of a #coap_message{} record.
get_field(type, #coap_message{} = Message) ->
    Message#coap_message.type;
get_field(method, #coap_message{} = Message) ->
    Message#coap_message.method.
|
|
|
|
%% Appends Queries (a map of query parameters) to URI as a query string.
%% The parameter names are first passed through shorten_param_name/2; when
%% the resulting map is empty, URI is returned unchanged.
compose_uri(URI, Queries, ShortenParamName) ->
    Renamed = shorten_param_name(ShortenParamName, Queries),
    case maps:to_list(Renamed) of
        [] ->
            URI;
        Pairs ->
            URI ++ "?" ++ uri_string:compose_query(Pairs)
    end.
|
|
|
|
%% With `false', returns Queries untouched. With `true', walks the
%% {Short, Long} pairs of emqx_coap_message:query_params_mapping_table/0 and
%% moves the value of every Long key present in Queries to its Short key.
shorten_param_name(false, Queries) ->
    Queries;
shorten_param_name(true, Queries) ->
    Rename = fun({Short, Long}, Acc) ->
        case maps:find(Long, Acc) of
            {ok, Value} ->
                maps:put(Short, Value, maps:remove(Long, Acc));
            error ->
                Acc
        end
    end,
    lists:foldl(Rename, Queries, emqx_coap_message:query_params_mapping_table()).
|