Merge pull request #9170 from zmstone/fix-publish-api-return-error-code
feat(api/publish): return detailed publish results
This commit is contained in:
commit
01e0c9e64e
|
@ -74,7 +74,8 @@
|
||||||
to_map/1,
|
to_map/1,
|
||||||
to_log_map/1,
|
to_log_map/1,
|
||||||
to_list/1,
|
to_list/1,
|
||||||
from_map/1
|
from_map/1,
|
||||||
|
estimate_size/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([message_map/0]).
|
-export_type([message_map/0]).
|
||||||
|
@ -175,6 +176,18 @@ make(MsgId, From, QoS, Topic, Payload, Flags, Headers) when
|
||||||
timestamp = Now
|
timestamp = Now
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%% optimistic esitmation of a message size after serialization
|
||||||
|
%% not including MQTT v5 message headers/user properties etc.
|
||||||
|
-spec estimate_size(emqx_types:message()) -> non_neg_integer().
|
||||||
|
estimate_size(#message{topic = Topic, payload = Payload}) ->
|
||||||
|
FixedHeaderSize = 1,
|
||||||
|
VarLenSize = 4,
|
||||||
|
TopicSize = iolist_size(Topic),
|
||||||
|
PayloadSize = iolist_size(Payload),
|
||||||
|
PacketIdSize = 2,
|
||||||
|
TopicLengthSize = 2,
|
||||||
|
FixedHeaderSize + VarLenSize + TopicLengthSize + TopicSize + PacketIdSize + PayloadSize.
|
||||||
|
|
||||||
-spec id(emqx_types:message()) -> maybe(binary()).
|
-spec id(emqx_types:message()) -> maybe(binary()).
|
||||||
id(#message{id = Id}) -> Id.
|
id(#message{id = Id}) -> Id.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,127 @@
|
||||||
|
|
||||||
|
emqx_mgmt_api_publish {
|
||||||
|
publish_api {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
Publish one message.<br/>
|
||||||
|
Possible HTTP status response codes are:<br/>
|
||||||
|
<code>200</code>: The message is delivered to at least one subscriber;<br/>
|
||||||
|
<code>202</code>: No matched subscribers;<br/>
|
||||||
|
<code>400</code>: Message is invalid. for example bad topic name, or QoS is out of range;<br/>
|
||||||
|
<code>503</code>: Failed to deliver the message to subscriber(s);<br/>
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
发布一个消息。<br/>
|
||||||
|
可能的 HTTP 状态码如下:<br/>
|
||||||
|
200: 消息被成功发送到至少一个订阅。<br/>
|
||||||
|
202: 没有匹配到任何订阅。<br/>
|
||||||
|
400: 消息编码错误,如非法主题,或 QoS 超出范围等。<br/>
|
||||||
|
503: 服务重启等过程中导致转发失败。<br/><br/>
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
publish_bulk_api {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
Publish a batch of messages.<br/>
|
||||||
|
Possible HTTP response status code are:<br/>
|
||||||
|
200: All messages are delivered to at least one subscriber;<br/>
|
||||||
|
202: At least one message was not delivered to any subscriber;<br/>
|
||||||
|
400: At least one message is invalid. For example bad topic name, or QoS is out of range;<br/>
|
||||||
|
503: Failed to deliver at least one of the messages;<br/>
|
||||||
|
|
||||||
|
In case there is at lest one invalid message in the batch, the HTTP response body
|
||||||
|
is the same as for <code>/publish</code> API.<br/>
|
||||||
|
Otherwise the HTTP response body is an array of JSON objects indicating the publish
|
||||||
|
result of each individual message in the batch.
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
批量发布一组消息。<br/>
|
||||||
|
可能的 HTTP 状态码如下:<br/>
|
||||||
|
200: 所有的消息都被成功发送到至少一个订阅。<br/>
|
||||||
|
202: 至少有一个消息没有匹配到任何订阅。<br/>
|
||||||
|
400: 至少有一个消息编码错误,如非法主题,或 QoS 超出范围等。<br/>
|
||||||
|
503: 至少有一个小因为服务重启的原因导致转发失败。<br/>
|
||||||
|
|
||||||
|
请求的 Body 或者 Body 中包含的某个消息无法通过 API 规范的类型检查时,HTTP 响应的消息与发布单个消息的 API
|
||||||
|
<code>/publish</code> 是一样的。
|
||||||
|
如果所有的消息都是合法的,那么 HTTP 返回的内容是一个 JSON 数组,每个元素代表了该消息转发的状态。
|
||||||
|
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
topic_name {
|
||||||
|
desc {
|
||||||
|
en: "Topic Name"
|
||||||
|
zh: "主题名称"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
qos {
|
||||||
|
desc {
|
||||||
|
en: "MQTT message QoS"
|
||||||
|
zh: "MQTT 消息的 QoS"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
clientid {
|
||||||
|
desc {
|
||||||
|
en: "Each message can be published as if it is done on behalf of an MQTT client whos ID can be specified in this field."
|
||||||
|
zh: "每个消息都可以带上一个 MQTT 客户端 ID,用于模拟 MQTT 客户端的发布行为。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
payload {
|
||||||
|
desc {
|
||||||
|
en: "The MQTT message payload."
|
||||||
|
zh: "MQTT 消息体。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
retain {
|
||||||
|
desc {
|
||||||
|
en: "A boolean field to indicate if this message should be retained."
|
||||||
|
zh: "布尔型字段,用于表示该消息是否保留消息。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
payload_encoding {
|
||||||
|
desc {
|
||||||
|
en: "MQTT Payload Encoding, <code>base64</code> or <code>plain</code>. When set to <code>base64</code>, the message is decoded before it is published."
|
||||||
|
zh: "MQTT 消息体的编码方式,可以是 <code>base64</code> 或 <code>plain</code>。当设置为 <code>base64</code> 时,消息在发布前会先被解码。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
message_id {
|
||||||
|
desc {
|
||||||
|
en: "A globally unique message ID for correlation/tracing."
|
||||||
|
zh: "全局唯一的一个消息 ID,方便用于关联和追踪。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reason_code {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
The MQTT reason code, as the same ones used in PUBACK packet.<br/>
|
||||||
|
Currently supported codes are:<br/>
|
||||||
|
|
||||||
|
16(0x10): No matching subscribers;<br/>
|
||||||
|
131(0x81): Error happened when dispatching the message. e.g. during EMQX restart;<br/>
|
||||||
|
144(0x90): Topic name invalid;<br/>
|
||||||
|
151(0x97): Publish rate limited, or message size exceeded limit. The global size limit can be configured with <code>mqtt.max_packet_size</code><br/>
|
||||||
|
NOTE: The message size is estimated with the received topic and payload size, meaning the actual size of serialized bytes (when sent to MQTT subscriber)
|
||||||
|
might be slightly over the limit.
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
MQTT 消息发布的错误码,这些错误码也是 MQTT 规范中 PUBACK 消息可能携带的错误码。<br/>
|
||||||
|
当前支持如下错误码:<br/>
|
||||||
|
|
||||||
|
16(0x10):没能匹配到任何订阅;<br/>
|
||||||
|
131(0x81):消息转发时发生错误,例如 EMQX 服务重启;<br/>
|
||||||
|
144(0x90):主题名称非法;<br/>
|
||||||
|
151(0x97):受到了速率限制,或者消息尺寸过大。全局消息大小限制可以通过配置项 <code>mqtt.max_packet_size</code> 来进行修改。<br/>
|
||||||
|
注意:消息尺寸的是通过主题和消息体的字节数进行估算的。具体发布时所占用的字节数可能会稍大于这个估算的值。
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
error_message {
|
||||||
|
desc {
|
||||||
|
en: "Describes the failure reason in detail."
|
||||||
|
zh: "失败的详细原因。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,7 +16,15 @@
|
||||||
-module(emqx_mgmt_api_publish).
|
-module(emqx_mgmt_api_publish).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
|
||||||
|
-define(ALL_IS_WELL, 200).
|
||||||
|
-define(PARTIALLY_OK, 202).
|
||||||
|
-define(BAD_REQUEST, 400).
|
||||||
|
-define(DISPATCH_ERROR, 503).
|
||||||
|
|
||||||
-behaviour(minirest_api).
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
|
@ -42,11 +50,14 @@ schema("/publish") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => publish,
|
'operationId' => publish,
|
||||||
post => #{
|
post => #{
|
||||||
description => <<"Publish Message">>,
|
description => ?DESC(publish_api),
|
||||||
tags => [<<"Publish">>],
|
tags => [<<"Publish">>],
|
||||||
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, publish_message)),
|
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, publish_message)),
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => hoconsc:mk(hoconsc:ref(?MODULE, publish_message_info))
|
?ALL_IS_WELL => hoconsc:mk(hoconsc:ref(?MODULE, publish_ok)),
|
||||||
|
?PARTIALLY_OK => hoconsc:mk(hoconsc:ref(?MODULE, publish_error)),
|
||||||
|
?BAD_REQUEST => bad_request_schema(),
|
||||||
|
?DISPATCH_ERROR => hoconsc:mk(hoconsc:ref(?MODULE, publish_error))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -54,44 +65,58 @@ schema("/publish/bulk") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => publish_batch,
|
'operationId' => publish_batch,
|
||||||
post => #{
|
post => #{
|
||||||
description => <<"Publish Messages">>,
|
description => ?DESC(publish_bulk_api),
|
||||||
tags => [<<"Publish">>],
|
tags => [<<"Publish">>],
|
||||||
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message)), #{}),
|
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message)), #{}),
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message_info)), #{})
|
?ALL_IS_WELL => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_ok)), #{}),
|
||||||
|
?PARTIALLY_OK => hoconsc:mk(
|
||||||
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
|
||||||
|
),
|
||||||
|
?BAD_REQUEST => bad_request_schema(),
|
||||||
|
?DISPATCH_ERROR => hoconsc:mk(
|
||||||
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
bad_request_schema() ->
|
||||||
|
Union = hoconsc:union([
|
||||||
|
hoconsc:ref(?MODULE, bad_request),
|
||||||
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error))
|
||||||
|
]),
|
||||||
|
hoconsc:mk(Union, #{}).
|
||||||
|
|
||||||
fields(message) ->
|
fields(message) ->
|
||||||
[
|
[
|
||||||
{topic,
|
{topic,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
desc => <<"Topic Name">>,
|
desc => ?DESC(topic_name),
|
||||||
required => true,
|
required => true,
|
||||||
example => <<"api/example/topic">>
|
example => <<"api/example/topic">>
|
||||||
})},
|
})},
|
||||||
{qos,
|
{qos,
|
||||||
hoconsc:mk(emqx_schema:qos(), #{
|
hoconsc:mk(emqx_schema:qos(), #{
|
||||||
desc => <<"MQTT QoS">>,
|
desc => ?DESC(qos),
|
||||||
required => false,
|
required => false,
|
||||||
default => 0
|
default => 0
|
||||||
})},
|
})},
|
||||||
{clientid,
|
{clientid,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
desc => <<"From client ID">>,
|
desc => ?DESC(clientid),
|
||||||
required => false,
|
required => false,
|
||||||
example => <<"api_example_client">>
|
example => <<"api_example_client">>
|
||||||
})},
|
})},
|
||||||
{payload,
|
{payload,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
desc => <<"MQTT Payload">>,
|
desc => ?DESC(payload),
|
||||||
required => true,
|
required => true,
|
||||||
example => <<"hello emqx api">>
|
example => <<"hello emqx api">>
|
||||||
})},
|
})},
|
||||||
{retain,
|
{retain,
|
||||||
hoconsc:mk(boolean(), #{
|
hoconsc:mk(boolean(), #{
|
||||||
desc => <<"MQTT Retain Message">>,
|
desc => ?DESC(retain),
|
||||||
required => false,
|
required => false,
|
||||||
default => false
|
default => false
|
||||||
})}
|
})}
|
||||||
|
@ -100,53 +125,196 @@ fields(publish_message) ->
|
||||||
[
|
[
|
||||||
{payload_encoding,
|
{payload_encoding,
|
||||||
hoconsc:mk(hoconsc:enum([plain, base64]), #{
|
hoconsc:mk(hoconsc:enum([plain, base64]), #{
|
||||||
desc => <<"MQTT Payload Encoding, base64 or plain">>,
|
desc => ?DESC(payload_encoding),
|
||||||
required => false,
|
required => false,
|
||||||
default => plain
|
default => plain
|
||||||
})}
|
})}
|
||||||
] ++ fields(message);
|
] ++ fields(message);
|
||||||
fields(publish_message_info) ->
|
fields(publish_ok) ->
|
||||||
[
|
[
|
||||||
{id,
|
{id,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
desc => <<"A globally unique message ID">>
|
desc => ?DESC(message_id)
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(publish_error) ->
|
||||||
|
[
|
||||||
|
{reason_code,
|
||||||
|
hoconsc:mk(integer(), #{
|
||||||
|
desc => ?DESC(reason_code),
|
||||||
|
example => 16
|
||||||
|
})},
|
||||||
|
{message,
|
||||||
|
hoconsc:mk(binary(), #{
|
||||||
|
desc => ?DESC(error_message),
|
||||||
|
example => <<"no_matching_subscribers">>
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(bad_request) ->
|
||||||
|
[
|
||||||
|
{code,
|
||||||
|
hoconsc:mk(string(), #{
|
||||||
|
desc => <<"BAD_REQUEST">>
|
||||||
|
})},
|
||||||
|
{message,
|
||||||
|
hoconsc:mk(binary(), #{
|
||||||
|
desc => ?DESC(error_message)
|
||||||
})}
|
})}
|
||||||
].
|
].
|
||||||
|
|
||||||
publish(post, #{body := Body}) ->
|
publish(post, #{body := Body}) ->
|
||||||
case message(Body) of
|
case message(Body) of
|
||||||
{ok, Message} ->
|
{ok, Message} ->
|
||||||
_ = emqx_mgmt:publish(Message),
|
Res = emqx_mgmt:publish(Message),
|
||||||
{200, format_message_response(Message)};
|
publish_result_to_http_reply(Message, Res);
|
||||||
{error, R} ->
|
{error, Reason} ->
|
||||||
{400, 'BAD_REQUEST', to_binary(R)}
|
{?BAD_REQUEST, make_bad_req_reply(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
publish_batch(post, #{body := Body}) ->
|
publish_batch(post, #{body := Body}) ->
|
||||||
case messages(Body) of
|
case messages(Body) of
|
||||||
{ok, Messages} ->
|
{ok, Messages} ->
|
||||||
_ = [emqx_mgmt:publish(Message) || Message <- Messages],
|
ResList = lists:map(
|
||||||
{200, format_message_response(Messages)};
|
fun(Message) ->
|
||||||
{error, R} ->
|
Res = emqx_mgmt:publish(Message),
|
||||||
{400, 'BAD_REQUEST', to_binary(R)}
|
publish_result_to_http_reply(Message, Res)
|
||||||
|
end,
|
||||||
|
Messages
|
||||||
|
),
|
||||||
|
publish_results_to_http_reply(ResList);
|
||||||
|
{error, Reason} ->
|
||||||
|
{?BAD_REQUEST, make_bad_req_reply(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
make_bad_req_reply(invalid_topic_name) ->
|
||||||
|
make_publish_error_response(?RC_TOPIC_NAME_INVALID);
|
||||||
|
make_bad_req_reply(packet_too_large) ->
|
||||||
|
%% 0x95 RC_PACKET_TOO_LARGE is not a PUBACK reason code
|
||||||
|
%% This is why we use RC_QUOTA_EXCEEDED instead
|
||||||
|
make_publish_error_response(?RC_QUOTA_EXCEEDED, packet_too_large);
|
||||||
|
make_bad_req_reply(Reason) ->
|
||||||
|
make_publish_error_response(?RC_IMPLEMENTATION_SPECIFIC_ERROR, to_binary(Reason)).
|
||||||
|
|
||||||
|
-spec is_ok_deliver({_NodeOrShare, _MatchedTopic, emqx_types:deliver_result()}) -> boolean().
|
||||||
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, ok}) -> true;
|
||||||
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, {ok, _}}) -> true;
|
||||||
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, {error, _}}) -> false.
|
||||||
|
|
||||||
|
%% @hidden Map MQTT publish result reason code to HTTP status code.
|
||||||
|
%% MQTT reason code | Description | HTTP status code
|
||||||
|
%% 0 Success 200
|
||||||
|
%% 16 No matching subscribers 202
|
||||||
|
%% 128 Unspecified error 406
|
||||||
|
%% 131 Implementation specific error 406
|
||||||
|
%% 144 Topic Name invalid 400
|
||||||
|
%% 151 Quota exceeded 400
|
||||||
|
%%
|
||||||
|
%% %%%%%% Below error codes are not implemented so far %%%%
|
||||||
|
%%
|
||||||
|
%% If HTTP request passes HTTP authentication, it is considered trusted.
|
||||||
|
%% In the future, we may choose to check ACL for the provided MQTT Client ID
|
||||||
|
%% 135 Not authorized 401
|
||||||
|
%%
|
||||||
|
%% %%%%%% Below error codes are not applicable %%%%%%%
|
||||||
|
%%
|
||||||
|
%% No user specified packet ID, so there should be no packet ID error
|
||||||
|
%% 145 Packet identifier is in use 400
|
||||||
|
%%
|
||||||
|
%% No preceding payload format indicator to compare against.
|
||||||
|
%% Content-Type check should be done at HTTP layer but not here.
|
||||||
|
%% 153 Payload format invalid 400
|
||||||
|
publish_result_to_http_reply(_Message, []) ->
|
||||||
|
%% matched no subscriber
|
||||||
|
{?PARTIALLY_OK, make_publish_error_response(?RC_NO_MATCHING_SUBSCRIBERS)};
|
||||||
|
publish_result_to_http_reply(Message, PublishResult) ->
|
||||||
|
case lists:any(fun is_ok_deliver/1, PublishResult) of
|
||||||
|
true ->
|
||||||
|
%% delivered to at least one subscriber
|
||||||
|
OkBody = make_publish_response(Message),
|
||||||
|
{?ALL_IS_WELL, OkBody};
|
||||||
|
false ->
|
||||||
|
%% this is quite unusual, matched, but failed to deliver
|
||||||
|
%% if this happens, the publish result log can be helpful
|
||||||
|
%% to idnetify the reason why publish failed
|
||||||
|
%% e.g. during emqx restart
|
||||||
|
ReasonString = <<"failed_to_dispatch">>,
|
||||||
|
ErrorBody = make_publish_error_response(
|
||||||
|
?RC_IMPLEMENTATION_SPECIFIC_ERROR, ReasonString
|
||||||
|
),
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => ReasonString,
|
||||||
|
message_id => emqx_message:id(Message),
|
||||||
|
results => PublishResult
|
||||||
|
}),
|
||||||
|
{?DISPATCH_ERROR, ErrorBody}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @hidden Reply batch publish result.
|
||||||
|
%% 200 if all published OK.
|
||||||
|
%% 202 if at least one message matched no subscribers.
|
||||||
|
%% 503 for temp errors duing EMQX restart
|
||||||
|
publish_results_to_http_reply([_ | _] = ResList) ->
|
||||||
|
{Codes0, BodyL} = lists:unzip(ResList),
|
||||||
|
Codes = lists:usort(Codes0),
|
||||||
|
HasFailure = lists:member(?DISPATCH_ERROR, Codes),
|
||||||
|
All200 = (Codes =:= [?ALL_IS_WELL]),
|
||||||
|
Code =
|
||||||
|
case All200 of
|
||||||
|
true ->
|
||||||
|
%% All OK
|
||||||
|
?ALL_IS_WELL;
|
||||||
|
false when not HasFailure ->
|
||||||
|
%% Partially OK
|
||||||
|
?PARTIALLY_OK;
|
||||||
|
false ->
|
||||||
|
%% At least one failed
|
||||||
|
?DISPATCH_ERROR
|
||||||
|
end,
|
||||||
|
{Code, BodyL}.
|
||||||
|
|
||||||
message(Map) ->
|
message(Map) ->
|
||||||
|
try
|
||||||
|
make_message(Map)
|
||||||
|
catch
|
||||||
|
throw:Reason ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
make_message(Map) ->
|
||||||
Encoding = maps:get(<<"payload_encoding">>, Map, plain),
|
Encoding = maps:get(<<"payload_encoding">>, Map, plain),
|
||||||
case encode_payload(Encoding, maps:get(<<"payload">>, Map)) of
|
case decode_payload(Encoding, maps:get(<<"payload">>, Map)) of
|
||||||
{ok, Payload} ->
|
{ok, Payload} ->
|
||||||
From = maps:get(<<"clientid">>, Map, http_api),
|
From = maps:get(<<"clientid">>, Map, http_api),
|
||||||
QoS = maps:get(<<"qos">>, Map, 0),
|
QoS = maps:get(<<"qos">>, Map, 0),
|
||||||
Topic = maps:get(<<"topic">>, Map),
|
Topic = maps:get(<<"topic">>, Map),
|
||||||
Retain = maps:get(<<"retain">>, Map, false),
|
Retain = maps:get(<<"retain">>, Map, false),
|
||||||
{ok, emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{})};
|
try
|
||||||
|
_ = emqx_topic:validate(name, Topic)
|
||||||
|
catch
|
||||||
|
error:_Reason ->
|
||||||
|
throw(invalid_topic_name)
|
||||||
|
end,
|
||||||
|
Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}),
|
||||||
|
Size = emqx_message:estimate_size(Message),
|
||||||
|
(Size > size_limit()) andalso throw(packet_too_large),
|
||||||
|
{ok, Message};
|
||||||
{error, R} ->
|
{error, R} ->
|
||||||
{error, R}
|
{error, R}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
encode_payload(plain, Payload) ->
|
%% get the global packet size limit since HTTP API does not belong to any zone.
|
||||||
|
size_limit() ->
|
||||||
|
try
|
||||||
|
emqx_config:get([mqtt, max_packet_size])
|
||||||
|
catch
|
||||||
|
_:_ ->
|
||||||
|
%% leave 1000 bytes for topic name etc.
|
||||||
|
?MAX_PACKET_SIZE
|
||||||
|
end.
|
||||||
|
|
||||||
|
decode_payload(plain, Payload) ->
|
||||||
{ok, Payload};
|
{ok, Payload};
|
||||||
encode_payload(base64, Payload) ->
|
decode_payload(base64, Payload) ->
|
||||||
try
|
try
|
||||||
{ok, base64:decode(Payload)}
|
{ok, base64:decode(Payload)}
|
||||||
catch
|
catch
|
||||||
|
@ -154,6 +322,8 @@ encode_payload(base64, Payload) ->
|
||||||
{error, {decode_base64_payload_failed, Payload}}
|
{error, {decode_base64_payload_failed, Payload}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
messages([]) ->
|
||||||
|
{errror, <<"empty_batch">>};
|
||||||
messages(List) ->
|
messages(List) ->
|
||||||
messages(List, []).
|
messages(List, []).
|
||||||
|
|
||||||
|
@ -167,10 +337,23 @@ messages([MessageMap | List], Res) ->
|
||||||
{error, R}
|
{error, R}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
format_message_response(Messages) when is_list(Messages) ->
|
make_publish_response(#message{id = ID}) ->
|
||||||
[format_message_response(Message) || Message <- Messages];
|
#{
|
||||||
format_message_response(#message{id = ID}) ->
|
id => emqx_guid:to_hexstr(ID)
|
||||||
#{id => emqx_guid:to_hexstr(ID)}.
|
}.
|
||||||
|
|
||||||
|
make_publish_error_response(ReasonCode) ->
|
||||||
|
make_publish_error_response(ReasonCode, emqx_reason_codes:name(ReasonCode)).
|
||||||
|
|
||||||
|
make_publish_error_response(ReasonCode, Msg) ->
|
||||||
|
#{
|
||||||
|
reason_code => ReasonCode,
|
||||||
|
message => to_binary(Msg)
|
||||||
|
}.
|
||||||
|
|
||||||
|
to_binary(Atom) when is_atom(Atom) ->
|
||||||
|
atom_to_binary(Atom);
|
||||||
|
to_binary(Msg) when is_binary(Msg) ->
|
||||||
|
Msg;
|
||||||
to_binary(Term) ->
|
to_binary(Term) ->
|
||||||
list_to_binary(io_lib:format("~p", [Term])).
|
list_to_binary(io_lib:format("~0p", [Term])).
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
|
||||||
-define(CLIENTID, <<"api_clientid">>).
|
-define(CLIENTID, <<"api_clientid">>).
|
||||||
-define(USERNAME, <<"api_username">>).
|
-define(USERNAME, <<"api_username">>).
|
||||||
|
@ -36,6 +37,16 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_) ->
|
end_per_suite(_) ->
|
||||||
emqx_mgmt_api_test_util:end_suite().
|
emqx_mgmt_api_test_util:end_suite().
|
||||||
|
|
||||||
|
init_per_testcase(Case, Config) ->
|
||||||
|
?MODULE:Case({init, Config}).
|
||||||
|
|
||||||
|
end_per_testcase(Case, Config) ->
|
||||||
|
?MODULE:Case({'end', Config}).
|
||||||
|
|
||||||
|
t_publish_api({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_api({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
t_publish_api(_) ->
|
t_publish_api(_) ->
|
||||||
{ok, Client} = emqtt:start_link(#{
|
{ok, Client} = emqtt:start_link(#{
|
||||||
username => <<"api_username">>, clientid => <<"api_clientid">>
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
|
@ -48,11 +59,113 @@ t_publish_api(_) ->
|
||||||
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
Body = #{topic => ?TOPIC1, payload => Payload},
|
Body = #{topic => ?TOPIC1, payload => Payload},
|
||||||
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
ResponseMap = emqx_json:decode(Response, [return_maps]),
|
ResponseMap = decode_json(Response),
|
||||||
?assertEqual([<<"id">>], maps:keys(ResponseMap)),
|
?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
||||||
emqtt:disconnect(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_publish_no_subscriber({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_no_subscriber({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_no_subscriber(_) ->
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = #{topic => ?TOPIC1, payload => Payload},
|
||||||
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
|
ResponseMap = decode_json(Response),
|
||||||
|
?assertEqual([<<"message">>, <<"reason_code">>], lists:sort(maps:keys(ResponseMap))),
|
||||||
|
?assertMatch(#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}, ResponseMap),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_publish_bad_topic({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bad_topic({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_bad_topic(_) ->
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = #{topic => <<"not/a+/valid/topic">>, payload => Payload},
|
||||||
|
?assertMatch(
|
||||||
|
{error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_publish_bad_base64({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bad_base64({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_bad_base64(_) ->
|
||||||
|
%% not a valid base64
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = #{
|
||||||
|
topic => <<"not/a+/valid/topic">>, payload => Payload, payload_encoding => <<"base64">>
|
||||||
|
},
|
||||||
|
?assertMatch(
|
||||||
|
{error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_publish_too_large({init, Config}) ->
|
||||||
|
MaxPacketSize = 100,
|
||||||
|
meck:new(emqx_config, [no_link, passthrough, no_history]),
|
||||||
|
meck:expect(emqx_config, get, fun
|
||||||
|
([mqtt, max_packet_size]) ->
|
||||||
|
MaxPacketSize;
|
||||||
|
(Other) ->
|
||||||
|
meck:passthrough(Other)
|
||||||
|
end),
|
||||||
|
[{max_packet_size, MaxPacketSize} | Config];
|
||||||
|
t_publish_too_large({'end', _Config}) ->
|
||||||
|
meck:unload(emqx_config),
|
||||||
|
ok;
|
||||||
|
t_publish_too_large(Config) ->
|
||||||
|
MaxPacketSize = proplists:get_value(max_packet_size, Config),
|
||||||
|
Payload = lists:duplicate(MaxPacketSize, $0),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = #{topic => <<"random/topic">>, payload => Payload},
|
||||||
|
{error, {Summary, _Headers, ResponseBody}} =
|
||||||
|
emqx_mgmt_api_test_util:request_api(
|
||||||
|
post,
|
||||||
|
Path,
|
||||||
|
"",
|
||||||
|
Auth,
|
||||||
|
Body,
|
||||||
|
#{return_body => true}
|
||||||
|
),
|
||||||
|
?assertMatch({_, 400, _}, Summary),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"reason_code">> := ?RC_QUOTA_EXCEEDED,
|
||||||
|
<<"message">> := <<"packet_too_large">>
|
||||||
|
},
|
||||||
|
decode_json(ResponseBody)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_publish_bad_topic_bulk({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bad_topic_bulk({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_bad_topic_bulk(_Config) ->
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = [
|
||||||
|
#{topic => <<"not/a+/valid/topic">>, payload => Payload},
|
||||||
|
#{topic => <<"good/topic">>, payload => Payload}
|
||||||
|
],
|
||||||
|
?assertMatch(
|
||||||
|
{error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_publish_bulk_api({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bulk_api({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
t_publish_bulk_api(_) ->
|
t_publish_bulk_api(_) ->
|
||||||
{ok, Client} = emqtt:start_link(#{
|
{ok, Client} = emqtt:start_link(#{
|
||||||
username => <<"api_username">>, clientid => <<"api_clientid">>
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
|
@ -63,19 +176,135 @@ t_publish_bulk_api(_) ->
|
||||||
Payload = <<"hello">>,
|
Payload = <<"hello">>,
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
Body = [#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}],
|
Body = [
|
||||||
|
#{
|
||||||
|
topic => ?TOPIC1,
|
||||||
|
payload => Payload,
|
||||||
|
payload_encoding => plain
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
topic => ?TOPIC2,
|
||||||
|
payload => base64:encode(Payload),
|
||||||
|
payload_encoding => base64
|
||||||
|
}
|
||||||
|
],
|
||||||
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
ResponseList = emqx_json:decode(Response, [return_maps]),
|
ResponseList = decode_json(Response),
|
||||||
?assertEqual(2, erlang:length(ResponseList)),
|
?assertEqual(2, erlang:length(ResponseList)),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(ResponseMap) ->
|
fun(ResponseMap) ->
|
||||||
?assertEqual([<<"id">>], maps:keys(ResponseMap))
|
?assertMatch(
|
||||||
|
[<<"id">>], lists:sort(maps:keys(ResponseMap))
|
||||||
|
)
|
||||||
end,
|
end,
|
||||||
ResponseList
|
ResponseList
|
||||||
),
|
),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
||||||
emqtt:disconnect(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_publish_no_subscriber_bulk({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_no_subscriber_bulk({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_no_subscriber_bulk(_) ->
|
||||||
|
{ok, Client} = emqtt:start_link(#{
|
||||||
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
|
}),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = [
|
||||||
|
#{topic => ?TOPIC1, payload => Payload},
|
||||||
|
#{topic => ?TOPIC2, payload => Payload},
|
||||||
|
#{topic => <<"no/subscrbier/topic">>, payload => Payload}
|
||||||
|
],
|
||||||
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
|
ResponseList = decode_json(Response),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{<<"id">> := _},
|
||||||
|
#{<<"id">> := _},
|
||||||
|
#{<<"message">> := <<"no_matching_subscribers">>}
|
||||||
|
],
|
||||||
|
ResponseList
|
||||||
|
),
|
||||||
|
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
||||||
|
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
||||||
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bulk_dispatch_one_message_invalid_topic({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) ->
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = [
|
||||||
|
#{topic => ?TOPIC1, payload => Payload},
|
||||||
|
#{topic => ?TOPIC2, payload => Payload},
|
||||||
|
#{topic => <<"bad/#/topic">>, payload => Payload}
|
||||||
|
],
|
||||||
|
{error, {Summary, _Headers, ResponseBody}} =
|
||||||
|
emqx_mgmt_api_test_util:request_api(
|
||||||
|
post,
|
||||||
|
Path,
|
||||||
|
"",
|
||||||
|
Auth,
|
||||||
|
Body,
|
||||||
|
#{return_body => true}
|
||||||
|
),
|
||||||
|
?assertMatch({_, 400, _}, Summary),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"reason_code">> := ?RC_TOPIC_NAME_INVALID},
|
||||||
|
decode_json(ResponseBody)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_publish_bulk_dispatch_failure({init, Config}) ->
|
||||||
|
meck:new(emqx, [no_link, passthrough, no_history]),
|
||||||
|
meck:expect(emqx, is_running, fun() -> false end),
|
||||||
|
Config;
|
||||||
|
t_publish_bulk_dispatch_failure({'end', _Config}) ->
|
||||||
|
meck:unload(emqx),
|
||||||
|
ok;
|
||||||
|
t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
|
||||||
|
{ok, Client} = emqtt:start_link(#{
|
||||||
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
|
}),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = [
|
||||||
|
#{topic => ?TOPIC1, payload => Payload},
|
||||||
|
#{topic => ?TOPIC2, payload => Payload},
|
||||||
|
#{topic => <<"no/subscrbier/topic">>, payload => Payload}
|
||||||
|
],
|
||||||
|
{error, {Summary, _Headers, ResponseBody}} =
|
||||||
|
emqx_mgmt_api_test_util:request_api(
|
||||||
|
post,
|
||||||
|
Path,
|
||||||
|
"",
|
||||||
|
Auth,
|
||||||
|
Body,
|
||||||
|
#{return_body => true}
|
||||||
|
),
|
||||||
|
?assertMatch({_, 503, _}, Summary),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR},
|
||||||
|
#{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR},
|
||||||
|
#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
|
||||||
|
],
|
||||||
|
decode_json(ResponseBody)
|
||||||
|
),
|
||||||
|
emqtt:stop(Client).
|
||||||
|
|
||||||
receive_assert(Topic, Qos, Payload) ->
|
receive_assert(Topic, Qos, Payload) ->
|
||||||
receive
|
receive
|
||||||
|
@ -90,3 +319,6 @@ receive_assert(Topic, Qos, Payload) ->
|
||||||
after 5000 ->
|
after 5000 ->
|
||||||
timeout
|
timeout
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
decode_json(In) ->
|
||||||
|
emqx_json:decode(In, [return_maps]).
|
||||||
|
|
|
@ -65,8 +65,11 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, []) when
|
||||||
"" -> Url;
|
"" -> Url;
|
||||||
_ -> Url ++ "?" ++ QueryParams
|
_ -> Url ++ "?" ++ QueryParams
|
||||||
end,
|
end,
|
||||||
do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)});
|
do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)}, #{});
|
||||||
request_api(Method, Url, QueryParams, AuthOrHeaders, Body) when
|
request_api(Method, Url, QueryParams, AuthOrHeaders, Body) ->
|
||||||
|
request_api(Method, Url, QueryParams, AuthOrHeaders, Body, #{}).
|
||||||
|
|
||||||
|
request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when
|
||||||
(Method =:= post) orelse
|
(Method =:= post) orelse
|
||||||
(Method =:= patch) orelse
|
(Method =:= patch) orelse
|
||||||
(Method =:= put) orelse
|
(Method =:= put) orelse
|
||||||
|
@ -79,10 +82,12 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, Body) when
|
||||||
end,
|
end,
|
||||||
do_request_api(
|
do_request_api(
|
||||||
Method,
|
Method,
|
||||||
{NewUrl, build_http_header(AuthOrHeaders), "application/json", emqx_json:encode(Body)}
|
{NewUrl, build_http_header(AuthOrHeaders), "application/json", emqx_json:encode(Body)},
|
||||||
|
Opts
|
||||||
).
|
).
|
||||||
|
|
||||||
do_request_api(Method, Request) ->
|
do_request_api(Method, Request, Opts) ->
|
||||||
|
ReturnBody = maps:get(return_body, Opts, false),
|
||||||
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
||||||
case httpc:request(Method, Request, [], []) of
|
case httpc:request(Method, Request, [], []) of
|
||||||
{error, socket_closed_remotely} ->
|
{error, socket_closed_remotely} ->
|
||||||
|
@ -91,8 +96,9 @@ do_request_api(Method, Request) ->
|
||||||
Code >= 200 andalso Code =< 299
|
Code >= 200 andalso Code =< 299
|
||||||
->
|
->
|
||||||
{ok, Return};
|
{ok, Return};
|
||||||
{ok, {Reason, _, _} = Error} ->
|
{ok, {Reason, Headers, Body}} when ReturnBody ->
|
||||||
ct:pal("error: ~p~n", [Error]),
|
{error, {Reason, Headers, Body}};
|
||||||
|
{ok, {Reason, _Headers, _Body}} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue