From 89f48f89ebe18e24ddfebbc7cf375b9c32151f07 Mon Sep 17 00:00:00 2001 From: lafirest Date: Fri, 3 Sep 2021 13:55:05 +0800 Subject: [PATCH] feat(emqx_coap): add emqx_coap_api 1. add a request api for emqx_coap 2. fix some emqx_coap logic error --- apps/emqx_gateway/etc/emqx_gateway.conf | 1 - apps/emqx_gateway/src/coap/emqx_coap_api.erl | 145 ++++++++++++ .../src/coap/emqx_coap_channel.erl | 69 ++++-- .../src/coap/emqx_coap_session.erl | 10 +- apps/emqx_gateway/src/coap/emqx_coap_tm.erl | 12 +- .../src/coap/emqx_coap_transport.erl | 11 +- .../coap/handler/emqx_coap_pubsub_handler.erl | 28 ++- apps/emqx_gateway/src/emqx_gateway_app.erl | 2 +- .../emqx_gateway/src/emqx_gateway_metrics.erl | 2 +- .../emqx_gateway/test/emqx_coap_api_SUITE.erl | 224 ++++++++++++++++++ .../test/emqx_mgmt_api_test_util.erl | 27 ++- 11 files changed, 478 insertions(+), 53 deletions(-) create mode 100644 apps/emqx_gateway/src/coap/emqx_coap_api.erl create mode 100644 apps/emqx_gateway/test/emqx_coap_api_SUITE.erl diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index 6fdadcc3b..5212d319f 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -59,7 +59,6 @@ gateway.coap { ## When publishing or subscribing, prefix all topics with a mountpoint string. mountpoint = "" - heartbeat = 30s notify_type = qos ## if true, you need to establish a connection before use diff --git a/apps/emqx_gateway/src/coap/emqx_coap_api.erl b/apps/emqx_gateway/src/coap/emqx_coap_api.erl new file mode 100644 index 000000000..428e99ac5 --- /dev/null +++ b/apps/emqx_gateway/src/coap/emqx_coap_api.erl @@ -0,0 +1,145 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_coap_api). + +-behaviour(minirest_api). + +-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). + +%% API +-export([api_spec/0]). + +-export([request/2]). + +-define(PREFIX, "/gateway/coap/:clientid"). +-define(DEF_WAIT_TIME, 10). + +-import(emqx_mgmt_util, [ schema/1 + , schema/2 + , object_schema/1 + , object_schema/2 + , error_schema/2 + , properties/1]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +api_spec() -> + {[request_api()], []}. + +request_api() -> + Metadata = #{post => request_method_meta()}, + {?PREFIX ++ "/request", Metadata, request}. + +request(post, #{body := Body, bindings := Bindings}) -> + ClientId = maps:get(clientid, Bindings, undefined), + + Method = maps:get(<<"method">>, Body, <<"get">>), + CT = maps:get(<<"content_type">>, Body, <<"text/plain">>), + Token = maps:get(<<"token">>, Body, <<>>), + Payload = maps:get(<<"payload">>, Body, <<>>), + WaitTime = maps:get(<<"timeout">>, Body, ?DEF_WAIT_TIME), + + Payload2 = parse_payload(CT, Payload), + ReqType = erlang:binary_to_atom(Method), + + Msg = emqx_coap_message:request(con, + ReqType, Payload2, #{content_format => CT}), + + Msg2 = Msg#coap_message{token = Token}, + + case call_client(ClientId, Msg2, timer:seconds(WaitTime)) of + timeout -> + {504}; + not_found -> + {404}; + Response -> + {200, format_to_response(CT, Response)} + end. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +request_parameters() -> + [#{name => clientid, + in => path, + schema => #{type => string}, + required => true}]. + +request_properties() -> + properties([ {token, string, "message token, can be empty"} + , {method, string, "request method type", ["get", "put", "post", "delete"]} + , {timeout, integer, "timespan for response"} + , {content_type, string, "payload type", + [<<"text/plain">>, <<"application/json">>, <<"application/octet-stream">>]} + , {payload, string, "payload"}]). + +coap_message_properties() -> + properties([ {id, integer, "message id"} + , {token, string, "message token, can be empty"} + , {method, string, "response code"} + , {payload, string, "payload"}]). + +request_method_meta() -> + #{description => <<"lookup matching messages">>, + parameters => request_parameters(), + 'requestBody' => object_schema(request_properties(), + <<"request payload, binary must encode by base64">>), + responses => #{ + <<"200">> => object_schema(coap_message_properties()), + <<"404">> => schema(<<"NotFound">>), + <<"504">> => schema(<<"Timeout">>) + }}. + + +format_to_response(ContentType, #coap_message{id = Id, + token = Token, + method = Method, + payload = Payload}) -> + #{id => Id, + token => Token, + method => format_to_binary(Method), + payload => format_payload(ContentType, Payload)}. + +format_to_binary(Obj) -> + erlang:list_to_binary(io_lib:format("~p", [Obj])). + +format_payload(<<"application/octet-stream">>, Payload) -> + base64:encode(Payload); + +format_payload(_, Payload) -> + Payload. + +parse_payload(<<"application/octet-stream">>, Body) -> + base64:decode(Body); + +parse_payload(_, Body) -> + Body. + +call_client(ClientId, Msg, Timeout) -> + case emqx_gateway_cm_registry:lookup_channels(coap, ClientId) of + [Channel | _] -> + RequestId = emqx_coap_channel:send_request(Channel, Msg), + case gen_server:wait_response(RequestId, Timeout) of + {reply, Reply} -> + Reply; + _ -> + timeout + end; + _ -> + not_found + end. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index 11aca8cc8..112efdc44 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -25,7 +25,10 @@ -export([ info/1 , info/2 , stats/1 - , validator/4]). + , validator/4 + , metrics_inc/2 + , run_hooks/3 + , send_request/2]). -export([ init/2 , handle_in/2 @@ -57,7 +60,7 @@ connection_required :: boolean(), - conn_state :: idle | connected, + conn_state :: idle | connected | disconnected, token :: binary() | undefined }). @@ -99,7 +102,7 @@ init(ConnInfo = #{peername := {PeerHost, _}, sockname := {_, SockPort}}, #{ctx := Ctx} = Config) -> Peercert = maps:get(peercert, ConnInfo, undefined), - Mountpoint = maps:get(mountpoint, Config, undefined), + Mountpoint = maps:get(mountpoint, Config, <<>>), ClientInfo = set_peercert_infos( Peercert, #{ zone => default @@ -128,6 +131,10 @@ init(ConnInfo = #{peername := {PeerHost, _}, validator(Type, Topic, Ctx, ClientInfo) -> emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic). +-spec send_request(pid(), emqx_coap_message()) -> any(). +send_request(Channel, Request) -> + gen_server:send_request(Channel, {?FUNCTION_NAME, Request}). + %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- @@ -143,8 +150,9 @@ handle_in(Msg, ChannleT) -> %%-------------------------------------------------------------------- %% Handle Delivers from broker to client %%-------------------------------------------------------------------- -handle_deliver(Delivers, Channel) -> - call_session(deliver, Delivers, Channel). +handle_deliver(Delivers, #channel{session = Session, + ctx = Ctx} = Channel) -> + handle_result(emqx_coap_session:deliver(Delivers, Ctx, Session), Channel). %%-------------------------------------------------------------------- %% Handle timeout @@ -155,7 +163,7 @@ handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel Channel2 = ensure_keepalive_timer(fun make_timer/4, Channel), {ok, Channel2#channel{keepalive = NewKeepAlive}}; {error, timeout} -> - {shutdown, timeout, Channel} + {shutdown, timeout, ensure_disconnected(keepalive_timeout, Channel)} end; handle_timeout(_, {transport, Msg}, Channel) -> @@ -170,6 +178,10 @@ handle_timeout(_, _, Channel) -> %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- +handle_call({send_request, Msg}, From, Channel) -> + Result = call_session(handle_out, {{send_request, From}, Msg}, Channel), + erlang:setelement(1, Result, noreply); + handle_call(Req, _From, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, Channel}. @@ -184,6 +196,9 @@ handle_cast(Req, Channel) -> %%-------------------------------------------------------------------- %% Handle Info %%-------------------------------------------------------------------- +handle_info({subscribe, _}, Channel) -> + {ok, Channel}; + handle_info(Info, Channel) -> ?LOG(error, "Unexpected info: ~p", [Info]), {ok, Channel}. @@ -191,8 +206,10 @@ handle_info(Info, Channel) -> %%-------------------------------------------------------------------- %% Terminate %%-------------------------------------------------------------------- -terminate(_Reason, _Channel) -> - ok. +terminate(Reason, #channel{clientinfo = ClientInfo, + ctx = Ctx, + session = Session}) -> + run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]). %%-------------------------------------------------------------------- %% Internal functions @@ -242,17 +259,17 @@ check_token(true, try_takeover(CState, DesireId, Msg, Channel); _ -> Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg), - {ok, {outgoing, Reply}, Msg} + {ok, {outgoing, Reply}, Channel} end; check_token(false, Msg, Channel) -> case emqx_coap_message:get_option(uri_query, Msg) of #{<<"clientid">> := _} -> Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg), - {ok, {outgoing, Reply}, Msg}; + {ok, {outgoing, Reply}, Channel}; #{<<"token">> := _} -> Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg), - {ok, {outgoing, Reply}, Msg}; + {ok, {outgoing, Reply}, Channel}; _ -> call_session(handle_request, Msg, Channel) end. @@ -322,11 +339,9 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx, {error, Reason} end. -fix_mountpoint(_Packet, #{mountpoint := undefined} = ClientInfo) -> +fix_mountpoint(_Packet, #{mountpoint := <<>>} = ClientInfo) -> {ok, ClientInfo}; fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> - %% TODO: Enrich the varibale replacement???? - %% i.e: ${ClientInfo.auth_result.productKey} Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), {ok, ClientInfo#{mountpoint := Mountpoint1}}. @@ -338,6 +353,7 @@ ensure_connected(Channel = #channel{ctx = Ctx, , proto_ver => <<"1">> }, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), + _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]), Channel#channel{conninfo = NConnInfo}. process_connect(#channel{ctx = Ctx, @@ -374,19 +390,32 @@ run_hooks(Ctx, Name, Args, Acc) -> emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_hooks:run_fold(Name, Args, Acc). +metrics_inc(Name, Ctx) -> + emqx_gateway_ctx:metrics_inc(Ctx, Name). + +ensure_disconnected(Reason, Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]), + Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. + %%-------------------------------------------------------------------- %% Call Chain %%-------------------------------------------------------------------- -call_session(Fun, - Msg, - #channel{session = Session} = Channel) -> +call_session(Fun, Msg, #channel{session = Session} = Channel) -> + Result = emqx_coap_session:Fun(Msg, Session), + handle_result(Result, Channel). + +handle_result(Result, Channel) -> iter([ session, fun process_session/4 , proto, fun process_protocol/4 , reply, fun process_reply/4 , out, fun process_out/4 , fun process_nothing/3 ], - emqx_coap_session:Fun(Msg, Session), + Result, Channel). call_handler(request, Msg, Result, @@ -406,6 +435,10 @@ call_handler(request, Msg, Result, maps:merge(Result, HandlerResult), Channel); +call_handler(response, {{send_request, From}, Response}, Result, Channel, Iter) -> + gen_server:reply(From, Response), + iter(Iter, Result, Channel); + call_handler(_, _, Result, Channel, Iter) -> iter(Iter, Result, Channel). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index b7e6c53f4..0fbc47cf8 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -33,7 +33,7 @@ , handle_response/2 , handle_out/2 , set_reply/2 - , deliver/2 + , deliver/3 , timeout/2]). -export_type([session/0]). @@ -66,6 +66,7 @@ ]). -import(emqx_coap_medium, [iter/3]). +-import(emqx_coap_channel, [metrics_inc/2]). %%%------------------------------------------------------------------- %%% API @@ -147,13 +148,16 @@ set_reply(Msg, #session{transport_manager = TM} = Session) -> TM2 = emqx_coap_tm:set_reply(Msg, TM), Session#session{transport_manager = TM2}. -deliver(Delivers, #session{observe_manager = OM, - transport_manager = TM} = Session) -> +deliver(Delivers, Ctx, #session{observe_manager = OM, + transport_manager = TM} = Session) -> Fun = fun({_, Topic, Message}, {OutAcc, OMAcc, TMAcc} = Acc) -> case emqx_coap_observe_res:res_changed(Topic, OMAcc) of undefined -> + metrics_inc('delivery.dropped', Ctx), + metrics_inc('delivery.dropped.no_subid', Ctx), Acc; {Token, SeqId, OM2} -> + metrics_inc('messages.delivered', Ctx), Msg = mqtt_to_coap(Message, Token, SeqId), #{out := Out, tm := TM2} = emqx_coap_tm:handle_out(Msg, TMAcc), {Out ++ OutAcc, OM2, TM2} diff --git a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl index bdc061b1d..b5e4deb7f 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl @@ -108,6 +108,9 @@ handle_response(#coap_message{type = Type, id = MsgId, token = Token} = Msg, TM) end. %% send to a client, msg can be request/piggyback/separate/notify +handle_out({Ctx, Msg}, TM) -> + handle_out(Msg, Ctx, TM); + handle_out(Msg, TM) -> handle_out(Msg, undefined, TM). @@ -119,8 +122,8 @@ handle_out(#coap_message{token = Token} = MsgT, Ctx, TM) -> %% TODO why find by token ? case find_machine_by_keys([Id, TokenId], TM2) of undefined -> - {Machine, TM3} = new_out_machine(Id, Msg, TM), - process_event(out, {Ctx, Msg}, TM3, Machine); + {Machine, TM3} = new_out_machine(Id, Ctx, Msg, TM2), + process_event(out, Msg, TM3, Machine); _ -> %% ignore repeat send empty() @@ -293,9 +296,10 @@ new_in_machine(MachineId, #{seq_id := SeqId} = Manager) -> SeqId => Machine, MachineId => SeqId}}. --spec new_out_machine(state_machine_key(), emqx_coap_message(), manager()) -> +-spec new_out_machine(state_machine_key(), any(), emqx_coap_message(), manager()) -> {state_machine(), manager()}. new_out_machine(MachineId, + Ctx, #coap_message{type = Type, token = Token, options = Opts}, #{seq_id := SeqId} = Manager) -> Observe = maps:get(observe, Opts, undefined), @@ -305,7 +309,7 @@ new_out_machine(MachineId, , observe = Observe , state = idle , timers = #{} - , transport = emqx_coap_transport:new()}, + , transport = emqx_coap_transport:new(Ctx)}, Manager2 = Manager#{seq_id := SeqId + 1, SeqId => Machine, diff --git a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl index eb7ce9bd4..2e858a2e1 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl @@ -20,7 +20,7 @@ -type transport() :: #transport{}. --export([ new/0, idle/3, maybe_reset/3, set_cache/2 +-export([ new/0, new/1, idle/3, maybe_reset/3, set_cache/2 , maybe_resend_4request/3, wait_ack/3, until_stop/3 , observe/3, maybe_resend_4response/3]). @@ -33,9 +33,13 @@ -spec new() -> transport(). new() -> + new(undefined). + +new(ReqCtx) -> #transport{cache = undefined, retry_interval = 0, - retry_count = 0}. + retry_count = 0, + req_context = ReqCtx}. idle(in, #coap_message{type = non, method = Method} = Msg, @@ -62,9 +66,6 @@ idle(in, timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]}) end; -idle(out, {Ctx, Msg}, Transport) -> - idle(out, Msg, Transport#transport{req_context = Ctx}); - idle(out, #coap_message{type = non} = Msg, _) -> out(Msg, #{next => maybe_reset, timeouts => [{stop_timeout, ?NON_LIFETIME}]}); diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl index ca734993a..85cf32c6d 100644 --- a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -24,11 +24,14 @@ -import(emqx_coap_message, [response/2, response/3]). -import(emqx_coap_medium, [reply/2, reply/3]). +-import(emqx_coap_channel, [run_hooks/3]). -define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}). -define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}). -define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}). +%% TODO maybe can merge this code into emqx_coap_session, simplify the call chain + handle_request(Path, #coap_message{method = Method} = Msg, Ctx, CInfo) -> case check_topic(Path) of {ok, Topic} -> @@ -42,7 +45,7 @@ handle_method(get, Topic, Msg, Ctx, CInfo) -> 0 -> subscribe(Msg, Topic, Ctx, CInfo); 1 -> - unsubscribe(Msg, Topic, CInfo); + unsubscribe(Msg, Topic, Ctx, CInfo); _ -> reply({error, bad_request}, <<"invalid observe value">>, Msg) end; @@ -51,8 +54,9 @@ handle_method(post, Topic, #coap_message{payload = Payload} = Msg, Ctx, CInfo) - case emqx_coap_channel:validator(publish, Topic, Ctx, CInfo) of allow -> #{clientid := ClientId} = CInfo, + MountTopic = mount(CInfo, Topic), QOS = get_publish_qos(Msg), - MQTTMsg = emqx_message:make(ClientId, QOS, Topic, Payload), + MQTTMsg = emqx_message:make(ClientId, QOS, MountTopic, Payload), MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg), _ = emqx_broker:publish(MQTTMsg2), reply({ok, changed}, Msg); @@ -139,15 +143,19 @@ subscribe(#coap_message{token = Token} = Msg, Topic, Ctx, CInfo) -> allow -> #{clientid := ClientId} = CInfo, SubOpts = get_sub_opts(Msg), - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', - [CInfo, Topic, SubOpts]), - ?SUB(Topic, Token, Msg); + MountTopic = mount(CInfo, Topic), + emqx_broker:subscribe(MountTopic, ClientId, SubOpts), + run_hooks(Ctx, 'session.subscribed', [CInfo, Topic, SubOpts]), + ?SUB(MountTopic, Token, Msg); _ -> reply({error, unauthorized}, Msg) end. -unsubscribe(Msg, Topic, CInfo) -> - emqx_broker:unsubscribe(Topic), - emqx_hooks:run('session.unsubscribed', [CInfo, Topic, ?SUBOPTS]), - ?UNSUB(Topic, Msg). +unsubscribe(Msg, Topic, Ctx, CInfo) -> + MountTopic = mount(CInfo, Topic), + emqx_broker:unsubscribe(MountTopic), + run_hooks(Ctx, 'session.unsubscribed', [CInfo, Topic, ?SUBOPTS]), + ?UNSUB(MountTopic, Msg). + +mount(#{mountpoint := Mountpoint}, Topic) -> + <>. diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl index 1ecd9cf26..d90942220 100644 --- a/apps/emqx_gateway/src/emqx_gateway_app.erl +++ b/apps/emqx_gateway/src/emqx_gateway_app.erl @@ -83,4 +83,4 @@ load_gateway_by_default([{Type, Confs}|More]) -> load_gateway_by_default(More). confs() -> - maps:to_list(emqx:get_config([gateway], [])). + maps:to_list(emqx:get_config([gateway], #{})). diff --git a/apps/emqx_gateway/src/emqx_gateway_metrics.erl b/apps/emqx_gateway/src/emqx_gateway_metrics.erl index 458017118..77b97a6a1 100644 --- a/apps/emqx_gateway/src/emqx_gateway_metrics.erl +++ b/apps/emqx_gateway/src/emqx_gateway_metrics.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --include("include/emqx_gateway.hrl"). +-include_lib("emqx_gateway/include/emqx_gateway.hrl"). %% APIs diff --git a/apps/emqx_gateway/test/emqx_coap_api_SUITE.erl b/apps/emqx_gateway/test/emqx_coap_api_SUITE.erl new file mode 100644 index 000000000..74b0cadc8 --- /dev/null +++ b/apps/emqx_gateway/test/emqx_coap_api_SUITE.erl @@ -0,0 +1,224 @@ +%%-------------------------------------------------------------------- +%% Copyright (C) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_coap_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(CONF_DEFAULT, <<" +gateway.coap { + idle_timeout = 30s + enable_stats = false + mountpoint = \"\" + notify_type = qos + connection_required = true + subscribe_qos = qos1 + publish_qos = qos1 + authentication = undefined + + listeners.udp.default { + bind = 5683 + } + } + ">>). + +-define(HOST, "127.0.0.1"). +-define(PORT, 5683). +-define(CONN_URI, "coap://127.0.0.1/mqtt/connection?clientid=client1&username=admin&password=public"). + +-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_mgmt_api_test_util:init_suite([emqx_gateway]), + Config. + +end_per_suite(Config) -> + emqx_mgmt_api_test_util:end_suite([emqx_gateway]), + Config. + +set_special_configs(emqx_gatewway) -> + ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT); + +set_special_configs(_) -> + ok. + +%%-------------------------------------------------------------------- +%% Cases +%%-------------------------------------------------------------------- +t_send_request_api(_) -> + ClientId = start_client(), + timer:sleep(200), + Path = emqx_mgmt_api_test_util:api_path(["gateway/coap/client1/request"]), + Token = <<"atoken">>, + Payload = <<"simple echo this">>, + Req = #{token => Token, + payload => Payload, + timeout => 10, + content_type => <<"text/plain">>, + method => <<"get">>}, + Auth = emqx_mgmt_api_test_util:auth_header_(), + {ok, Response} = emqx_mgmt_api_test_util:request_api(post, + Path, + "method=get", + Auth, + Req + ), + #{<<"token">> := RToken, <<"payload">> := RPayload} = + emqx_json:decode(Response, [return_maps]), + ?assertEqual(Token, RToken), + ?assertEqual(Payload, RPayload), + erlang:exit(ClientId, kill), + ok. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Internal Functions +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +start_client() -> + spawn(fun coap_client/0). + +coap_client() -> + {ok, CSock} = gen_udp:open(0, [binary, {active, false}]), + test_send_coap_request(CSock, post, <<>>, [], 1), + Response = test_recv_coap_response(CSock), + ?assertEqual({ok, created}, Response#coap_message.method), + echo_loop(CSock). + +echo_loop(CSock) -> + #coap_message{payload = Payload} = Req = test_recv_coap_request(CSock), + test_send_coap_response(CSock, ?HOST, ?PORT, {ok, content}, Payload, Req), + echo_loop(CSock). + +test_send_coap_request(UdpSock, Method, Content, Options, MsgId) -> + is_list(Options) orelse error("Options must be a list"), + case resolve_uri(?CONN_URI) of + {coap, {IpAddr, Port}, Path, Query} -> + Request0 = emqx_coap_message:request(con, Method, Content, + [{uri_path, Path}, + {uri_query, Query} | Options]), + Request = Request0#coap_message{id = MsgId}, + ?LOGT("send_coap_request Request=~p", [Request]), + RequestBinary = emqx_coap_frame:serialize_pkt(Request, undefined), + ?LOGT("test udp socket send to ~p:~p, data=~p", [IpAddr, Port, RequestBinary]), + ok = gen_udp:send(UdpSock, IpAddr, Port, RequestBinary); + {SchemeDiff, ChIdDiff, _, _} -> + error(lists:flatten(io_lib:format("scheme ~s or ChId ~s does not match with socket", [SchemeDiff, ChIdDiff]))) + end. + +test_recv_coap_response(UdpSock) -> + {ok, {Address, Port, Packet}} = gen_udp:recv(UdpSock, 0, 2000), + {ok, Response, _, _} = emqx_coap_frame:parse(Packet, undefined), + ?LOGT("test udp receive from ~p:~p, data1=~p, Response=~p", [Address, Port, Packet, Response]), + #coap_message{type = ack, method = Method, id=Id, token = Token, options = Options, payload = Payload} = Response, + ?LOGT("receive coap response Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]), + Response. + +test_recv_coap_request(UdpSock) -> + case gen_udp:recv(UdpSock, 0) of + {ok, {_Address, _Port, Packet}} -> + {ok, Request, _, _} = emqx_coap_frame:parse(Packet, undefined), + #coap_message{type = con, method = Method, id=Id, token = Token, payload = Payload, options = Options} = Request, + ?LOGT("receive coap request Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]), + Request; + {error, Reason} -> + ?LOGT("test_recv_coap_request failed, Reason=~p", [Reason]), + timeout_test_recv_coap_request + end. + +test_send_coap_response(UdpSock, Host, Port, Code, Content, Request) -> + is_list(Host) orelse error("Host is not a string"), + {ok, IpAddr} = inet:getaddr(Host, inet), + Response = emqx_coap_message:piggyback(Code, Content, Request), + ?LOGT("test_send_coap_response Response=~p", [Response]), + Binary = emqx_coap_frame:serialize_pkt(Response, undefined), + ok = gen_udp:send(UdpSock, IpAddr, Port, Binary). + +resolve_uri(Uri) -> + {ok, #{scheme := Scheme, + host := Host, + port := PortNo, + path := Path} = URIMap} = emqx_http_lib:uri_parse(Uri), + Query = maps:get(query, URIMap, undefined), + {ok, PeerIP} = inet:getaddr(Host, inet), + {Scheme, {PeerIP, PortNo}, split_path(Path), split_query(Query)}. + +split_path([]) -> []; +split_path([$/]) -> []; +split_path([$/ | Path]) -> split_segments(Path, $/, []). + +split_query(undefined) -> #{}; +split_query(Path) -> + split_segments(Path, $&, []). + +split_segments(Path, Char, Acc) -> + case string:rchr(Path, Char) of + 0 -> + [make_segment(Path) | Acc]; + N when N > 0 -> + split_segments(string:substr(Path, 1, N-1), Char, + [make_segment(string:substr(Path, N+1)) | Acc]) + end. + +make_segment(Seg) -> + list_to_binary(emqx_http_lib:uri_decode(Seg)). + + +get_coap_path(Options) -> + get_path(Options, <<>>). + +get_coap_query(Options) -> + proplists:get_value(uri_query, Options, []). + +get_coap_observe(Options) -> + get_observe(Options). + + +get_path([], Acc) -> + %?LOGT("get_path Acc=~p", [Acc]), + Acc; +get_path([{uri_path, Path1}|T], Acc) -> + %?LOGT("Path=~p, Acc=~p", [Path1, Acc]), + get_path(T, join_path(Path1, Acc)); +get_path([{_, _}|T], Acc) -> + get_path(T, Acc). + +get_observe([]) -> + undefined; +get_observe([{observe, V}|_T]) -> + V; +get_observe([{_, _}|T]) -> + get_observe(T). + +join_path([], Acc) -> Acc; +join_path([<<"/">>|T], Acc) -> + join_path(T, Acc); +join_path([H|T], Acc) -> + join_path(T, <>). + +sprintf(Format, Args) -> + lists:flatten(io_lib:format(Format, Args)). diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index fa924a71c..34fa731c7 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -21,23 +21,30 @@ -define(BASE_PATH, "/api/v5"). init_suite() -> + init_suite([]). + +init_suite(Apps) -> ekka_mnesia:start(), application:load(emqx_management), - emqx_ct_helpers:start_apps([emqx_dashboard], fun set_special_configs/1). + emqx_ct_helpers:start_apps(Apps ++ [emqx_dashboard], fun set_special_configs/1). + end_suite() -> + end_suite([]). + +end_suite(Apps) -> application:unload(emqx_management), - emqx_ct_helpers:stop_apps([emqx_dashboard]). + emqx_ct_helpers:stop_apps(Apps ++ [emqx_dashboard]). set_special_configs(emqx_dashboard) -> Config = #{ - default_username => <<"admin">>, - default_password => <<"public">>, - listeners => [#{ - protocol => http, - port => 18083 - }] - }, + default_username => <<"admin">>, + default_password => <<"public">>, + listeners => [#{ + protocol => http, + port => 18083 + }] + }, emqx_config:put([emqx_dashboard], Config), ok; set_special_configs(_App) -> @@ -53,7 +60,7 @@ request_api(Method, Url, QueryParams, Auth) -> request_api(Method, Url, QueryParams, Auth, []). request_api(Method, Url, QueryParams, Auth, []) - when (Method =:= options) orelse + when (Method =:= options) orelse (Method =:= get) orelse (Method =:= put) orelse (Method =:= head) orelse