From 6b6acaec43b97c9a9419dd68123b2c2ba4af7a7f Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 23 Feb 2022 14:32:19 +0800 Subject: [PATCH] refactor(emqx_coap): refactor CoAP API --- apps/emqx_gateway/src/coap/emqx_coap_api.erl | 124 ++++++++++--------- 1 file changed, 64 insertions(+), 60 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_api.erl b/apps/emqx_gateway/src/coap/emqx_coap_api.erl index 92a2cbcae..e982a6e2c 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_api.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_api.erl @@ -18,93 +18,89 @@ -behaviour(minirest_api). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). -include("src/coap/include/emqx_coap.hrl"). %% API --export([api_spec/0]). +-export([api_spec/0, paths/0, schema/1, namespace/0]). -export([request/2]). -define(PREFIX, "/gateway/coap/clients/:clientid"). --define(DEF_WAIT_TIME, 10). --import(emqx_mgmt_util, [ schema/1 - , schema/2 - , object_schema/1 - , object_schema/2 - , error_schema/2 - , properties/1]). +-import(hoconsc, [mk/2, enum/1]). +-import(emqx_dashboard_swagger, [error_codes/2]). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -api_spec() -> - {[request_api()], []}. +namespace() -> "gateway_coap". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). + +paths() -> + [?PREFIX ++ "/request"]. + +schema(?PREFIX ++ "/request") -> + #{operationId => request, + post => #{ tags => [<<"gateway|coap">>] + , description => <<"Send a CoAP request message to the client">> + , parameters => request_parameters() + , requestBody => request_body() + , responses => #{200 => coap_message(), + 404 => error_codes(['CLIENT_NOT_FOUND'], <<"Client not found error">>), + 504 => error_codes(['CLIENT_NOT_RESPONSE'], <<"Waiting for client response timeout">>)} + } + }. -request_api() -> - Metadata = #{post => request_method_meta()}, - {?PREFIX ++ "/request", Metadata, request}. request(post, #{body := Body, bindings := Bindings}) -> ClientId = maps:get(clientid, Bindings, undefined), - - Method = maps:get(<<"method">>, Body, <<"get">>), - CT = maps:get(<<"content_type">>, Body, <<"text/plain">>), + Method = maps:get(<<"method">>, Body, get), + AtomCT = maps:get(<<"content_type">>, Body), Token = maps:get(<<"token">>, Body, <<>>), Payload = maps:get(<<"payload">>, Body, <<>>), - BinWaitTime = maps:get(<<"timeout">>, Body, <<"10s">>), - {ok, WaitTime} = emqx_schema:to_duration_ms(BinWaitTime), + WaitTime = maps:get(<<"timeout">>, Body), + CT = erlang:atom_to_binary(AtomCT), Payload2 = parse_payload(CT, Payload), - ReqType = erlang:binary_to_atom(Method), Msg = emqx_coap_message:request(con, - ReqType, Payload2, #{content_format => CT}), + Method, Payload2, #{content_format => CT}), Msg2 = Msg#coap_message{token = Token}, - case call_client(ClientId, Msg2, timer:seconds(WaitTime)) of + case call_client(ClientId, Msg2, WaitTime) of timeout -> {504, #{code => 'CLIENT_NOT_RESPONSE'}}; not_found -> {404, #{code => 'CLIENT_NOT_FOUND'}}; Response -> {200, format_to_response(CT, Response)} - end. + end. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- request_parameters() -> - [#{name => clientid, - in => path, - schema => #{type => string}, - required => true}]. + [{clientid, mk(binary(), #{in => path, required => true})}]. -request_properties() -> - properties([ {token, string, "message token, can be empty"} - , {method, string, "request method type", ["get", "put", "post", "delete"]} - , {timeout, string, "timespan for response"} - , {content_type, string, "payload type", - [<<"text/plain">>, <<"application/json">>, <<"application/octet-stream">>]} - , {payload, string, "payload"}]). - -coap_message_properties() -> - properties([ {id, integer, "message id"} - , {token, string, "message token, can be empty"} - , {method, string, "response code"} - , {payload, string, "payload"}]). - -request_method_meta() -> - #{description => <<"lookup matching messages">>, - parameters => request_parameters(), - 'requestBody' => object_schema(request_properties(), - <<"request payload, binary must encode by base64">>), - responses => #{ - <<"200">> => object_schema(coap_message_properties()), - <<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND']), - <<"504">> => error_schema("timeout", ['CLIENT_NOT_RESPONSE']) - }}. +request_body() -> + [ {token, mk(binary(), #{desc => "message token, can be empty"})} + , {method, mk(enum([get, put, post, delete]), #{desc => "request method type"})} + , {timeout, mk(emqx_schema:duration_ms(), #{desc => "timespan for response"})} + , {content_type, mk(enum(['text/plain', 'application/json', 'application/octet-stream']), + #{desc => "payload type"})} + , {payload, mk(binary(), #{desc => "the content of the payload"})} + ]. +coap_message() -> + [ {id, mk(integer(), #{desc => "message id"})} + , {token, mk(string(), #{desc => "message token, can be empty"})} + , {method, mk(string(), #{desc => "response code"})} + , {payload, mk(string(), #{desc => "payload"})} + ]. format_to_response(ContentType, #coap_message{id = Id, token = Token, @@ -131,15 +127,23 @@ parse_payload(_, Body) -> Body. call_client(ClientId, Msg, Timeout) -> - case emqx_gateway_cm_registry:lookup_channels(coap, ClientId) of - [Channel | _] -> - RequestId = emqx_coap_channel:send_request(Channel, Msg), - case gen_server:wait_response(RequestId, Timeout) of - {reply, Reply} -> - Reply; - _ -> - timeout - end; - _ -> + try + case emqx_gateway_cm_registry:lookup_channels(coap, ClientId) of + [Channel | _] -> + RequestId = emqx_coap_channel:send_request(Channel, Msg), + case gen_server:wait_response(RequestId, Timeout) of + {reply, Reply} -> + Reply; + _ -> + timeout + end; + _ -> + not_found + end + catch _:Error:Trace -> + ?SLOG(warning, #{msg => "coap_client_call_exception", + clientid => ClientId, + error => Error, + stacktrace => Trace}), not_found end.