refactor(emqx_coap): refactor CoAP API

This commit is contained in:
firest 2022-02-23 14:32:19 +08:00
parent 20323a8f11
commit 6b6acaec43
1 changed files with 64 additions and 60 deletions

View File

@ -18,93 +18,89 @@
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
%% API
-export([api_spec/0]).
-export([api_spec/0, paths/0, schema/1, namespace/0]).
-export([request/2]).
-define(PREFIX, "/gateway/coap/clients/:clientid").
-define(DEF_WAIT_TIME, 10).
-import(emqx_mgmt_util, [ schema/1
, schema/2
, object_schema/1
, object_schema/2
, error_schema/2
, properties/1]).
-import(hoconsc, [mk/2, enum/1]).
-import(emqx_dashboard_swagger, [error_codes/2]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
api_spec() ->
{[request_api()], []}.
namespace() -> "gateway_coap".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
paths() ->
[?PREFIX ++ "/request"].
schema(?PREFIX ++ "/request") ->
#{operationId => request,
post => #{ tags => [<<"gateway|coap">>]
, description => <<"Send a CoAP request message to the client">>
, parameters => request_parameters()
, requestBody => request_body()
, responses => #{200 => coap_message(),
404 => error_codes(['CLIENT_NOT_FOUND'], <<"Client not found error">>),
504 => error_codes(['CLIENT_NOT_RESPONSE'], <<"Waiting for client response timeout">>)}
}
}.
request_api() ->
Metadata = #{post => request_method_meta()},
{?PREFIX ++ "/request", Metadata, request}.
request(post, #{body := Body, bindings := Bindings}) ->
ClientId = maps:get(clientid, Bindings, undefined),
Method = maps:get(<<"method">>, Body, <<"get">>),
CT = maps:get(<<"content_type">>, Body, <<"text/plain">>),
Method = maps:get(<<"method">>, Body, get),
AtomCT = maps:get(<<"content_type">>, Body),
Token = maps:get(<<"token">>, Body, <<>>),
Payload = maps:get(<<"payload">>, Body, <<>>),
BinWaitTime = maps:get(<<"timeout">>, Body, <<"10s">>),
{ok, WaitTime} = emqx_schema:to_duration_ms(BinWaitTime),
WaitTime = maps:get(<<"timeout">>, Body),
CT = erlang:atom_to_binary(AtomCT),
Payload2 = parse_payload(CT, Payload),
ReqType = erlang:binary_to_atom(Method),
Msg = emqx_coap_message:request(con,
ReqType, Payload2, #{content_format => CT}),
Method, Payload2, #{content_format => CT}),
Msg2 = Msg#coap_message{token = Token},
case call_client(ClientId, Msg2, timer:seconds(WaitTime)) of
case call_client(ClientId, Msg2, WaitTime) of
timeout ->
{504, #{code => 'CLIENT_NOT_RESPONSE'}};
not_found ->
{404, #{code => 'CLIENT_NOT_FOUND'}};
Response ->
{200, format_to_response(CT, Response)}
end.
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
request_parameters() ->
[#{name => clientid,
in => path,
schema => #{type => string},
required => true}].
[{clientid, mk(binary(), #{in => path, required => true})}].
request_properties() ->
properties([ {token, string, "message token, can be empty"}
, {method, string, "request method type", ["get", "put", "post", "delete"]}
, {timeout, string, "timespan for response"}
, {content_type, string, "payload type",
[<<"text/plain">>, <<"application/json">>, <<"application/octet-stream">>]}
, {payload, string, "payload"}]).
coap_message_properties() ->
properties([ {id, integer, "message id"}
, {token, string, "message token, can be empty"}
, {method, string, "response code"}
, {payload, string, "payload"}]).
request_method_meta() ->
#{description => <<"lookup matching messages">>,
parameters => request_parameters(),
'requestBody' => object_schema(request_properties(),
<<"request payload, binary must encode by base64">>),
responses => #{
<<"200">> => object_schema(coap_message_properties()),
<<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND']),
<<"504">> => error_schema("timeout", ['CLIENT_NOT_RESPONSE'])
}}.
request_body() ->
[ {token, mk(binary(), #{desc => "message token, can be empty"})}
, {method, mk(enum([get, put, post, delete]), #{desc => "request method type"})}
, {timeout, mk(emqx_schema:duration_ms(), #{desc => "timespan for response"})}
, {content_type, mk(enum(['text/plain', 'application/json', 'application/octet-stream']),
#{desc => "payload type"})}
, {payload, mk(binary(), #{desc => "the content of the payload"})}
].
coap_message() ->
[ {id, mk(integer(), #{desc => "message id"})}
, {token, mk(string(), #{desc => "message token, can be empty"})}
, {method, mk(string(), #{desc => "response code"})}
, {payload, mk(string(), #{desc => "payload"})}
].
format_to_response(ContentType, #coap_message{id = Id,
token = Token,
@ -131,15 +127,23 @@ parse_payload(_, Body) ->
Body.
call_client(ClientId, Msg, Timeout) ->
case emqx_gateway_cm_registry:lookup_channels(coap, ClientId) of
[Channel | _] ->
RequestId = emqx_coap_channel:send_request(Channel, Msg),
case gen_server:wait_response(RequestId, Timeout) of
{reply, Reply} ->
Reply;
_ ->
timeout
end;
_ ->
try
case emqx_gateway_cm_registry:lookup_channels(coap, ClientId) of
[Channel | _] ->
RequestId = emqx_coap_channel:send_request(Channel, Msg),
case gen_server:wait_response(RequestId, Timeout) of
{reply, Reply} ->
Reply;
_ ->
timeout
end;
_ ->
not_found
end
catch _:Error:Trace ->
?SLOG(warning, #{msg => "coap_client_call_exception",
clientid => ClientId,
error => Error,
stacktrace => Trace}),
not_found
end.