feat(api/publish): return detailed publish results
Prior to this change, the publish API returns 200 in most of the cases. This change provides more insights to the publish result. For single message publish endpoint (`publish/`): HTTP error codes are: 200: Everything is OK 202: No subscriber for the topic 400: When mesage is invalid message. e.g. bad topic name or QoS out of range. 503: Failed to dispatch the message. e.g. during EMQX restart. The response body is a JSON object with two fields `message_id`, and `publish_result`. The `message_id` is a globally unique ID for tracing. `publish_result` is `"OK"` when the message is delivered to at least one subscriber. Otherwise `"no_subscriber"`. `publish_result` may also be some other informative message to hint the failure result, the content of which may change in the future. For `publish/bulk` endpoint: 200: When all message in the bulk are published OK 202: If at least one message in the bulk had `"no_subscriber"` result 400: When mesage is invalid message. e.g. bad topic name or QoS out of range. 503: When there is at least one message failed at dispatch. The reply body is a list of JSON objects having the same layout as for hte `publish` endpoint.
This commit is contained in:
parent
27de9998a8
commit
d1332b72e7
|
@ -74,7 +74,8 @@
|
||||||
to_map/1,
|
to_map/1,
|
||||||
to_log_map/1,
|
to_log_map/1,
|
||||||
to_list/1,
|
to_list/1,
|
||||||
from_map/1
|
from_map/1,
|
||||||
|
estimate_size/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([message_map/0]).
|
-export_type([message_map/0]).
|
||||||
|
@ -175,6 +176,18 @@ make(MsgId, From, QoS, Topic, Payload, Flags, Headers) when
|
||||||
timestamp = Now
|
timestamp = Now
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%% optimistic esitmation of a message size after serialization
|
||||||
|
%% not including MQTT v5 message headers/user properties etc.
|
||||||
|
-spec estimate_size(emqx_types:message()) -> non_neg_integer().
|
||||||
|
estimate_size(#message{topic = Topic, payload = Payload}) ->
|
||||||
|
FixedHeaderSize = 1,
|
||||||
|
VarLenSize = 4,
|
||||||
|
TopicSize = iolist_size(Topic),
|
||||||
|
PayloadSize = iolist_size(Payload),
|
||||||
|
PacketIdSize = 2,
|
||||||
|
TopicLengthSize = 2,
|
||||||
|
FixedHeaderSize + VarLenSize + TopicLengthSize + TopicSize + PacketIdSize + PayloadSize.
|
||||||
|
|
||||||
-spec id(emqx_types:message()) -> maybe(binary()).
|
-spec id(emqx_types:message()) -> maybe(binary()).
|
||||||
id(#message{id = Id}) -> Id.
|
id(#message{id = Id}) -> Id.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,127 @@
|
||||||
|
|
||||||
|
emqx_mgmt_api_publish {
|
||||||
|
publish_api {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
Publish one message.<br/>
|
||||||
|
Possible HTTP status response codes are:<br/>
|
||||||
|
<code>200</code>: The message is delivered to at least one subscriber;<br/>
|
||||||
|
<code>202</code>: No matched subscribers;<br/>
|
||||||
|
<code>400</code>: Message is invalid. for example bad topic name, or QoS is out of range;<br/>
|
||||||
|
<code>503</code>: Failed to deliver the message to subscriber(s);<br/>
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
发布一个消息。<br/>
|
||||||
|
可能的 HTTP 状态码如下:<br/>
|
||||||
|
200: 消息被成功发送到至少一个订阅。<br/>
|
||||||
|
202: 没有匹配到任何订阅。<br/>
|
||||||
|
400: 消息编码错误,如非法主题,或 QoS 超出范围等。<br/>
|
||||||
|
503: 服务重启等过程中导致转发失败。<br/><br/>
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
publish_bulk_api {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
Publish a batch of messages.<br/>
|
||||||
|
Possible HTTP response status code are:<br/>
|
||||||
|
200: All messages are delivered to at least one subscriber;<br/>
|
||||||
|
202: At least one message was not delivered to any subscriber;<br/>
|
||||||
|
400: At least one message is invalid. For example bad topic name, or QoS is out of range;<br/>
|
||||||
|
503: Failed to deliver at least one of the messages;<br/>
|
||||||
|
|
||||||
|
In case there is at lest one invalid message in the batch, the HTTP response body
|
||||||
|
is the same as for <code>/publish</code> API.<br/>
|
||||||
|
Otherwise the HTTP response body is an array of JSON objects indicating the publish
|
||||||
|
result of each individual message in the batch.
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
批量发布一组消息。<br/>
|
||||||
|
可能的 HTTP 状态码如下:<br/>
|
||||||
|
200: 所有的消息都被成功发送到至少一个订阅。<br/>
|
||||||
|
202: 至少有一个消息没有匹配到任何订阅。<br/>
|
||||||
|
400: 至少有一个消息编码错误,如非法主题,或 QoS 超出范围等。<br/>
|
||||||
|
503: 至少有一个小因为服务重启的原因导致转发失败。<br/>
|
||||||
|
|
||||||
|
请求的 Body 或者 Body 中包含的某个消息无法通过 API 规范的类型检查时,HTTP 响应的消息与发布单个消息的 API
|
||||||
|
<code>/publish</code> 是一样的。
|
||||||
|
如果所有的消息都是合法的,那么 HTTP 返回的内容是一个 JSON 数组,每个元素代表了该消息转发的状态。
|
||||||
|
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
topic_name {
|
||||||
|
desc {
|
||||||
|
en: "Topic Name"
|
||||||
|
zh: "主题名称"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
qos {
|
||||||
|
desc {
|
||||||
|
en: "MQTT message QoS"
|
||||||
|
zh: "MQTT 消息的 QoS"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
clientid {
|
||||||
|
desc {
|
||||||
|
en: "Each message can be published as if it is done on behalf of an MQTT client whos ID can be specified in this field."
|
||||||
|
zh: "每个消息都可以带上一个 MQTT 客户端 ID,用于模拟 MQTT 客户端的发布行为。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
payload {
|
||||||
|
desc {
|
||||||
|
en: "The MQTT message payload."
|
||||||
|
zh: "MQTT 消息体。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
retain {
|
||||||
|
desc {
|
||||||
|
en: "A boolean field to indicate if this message should be retained."
|
||||||
|
zh: "布尔型字段,用于表示该消息是否保留消息。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
payload_encoding {
|
||||||
|
desc {
|
||||||
|
en: "MQTT Payload Encoding, <code>base64</code> or <code>plain</code>. When set to <code>base64</code>, the message is decoded before it is published."
|
||||||
|
zh: "MQTT 消息体的编码方式,可以是 <code>base64</code> 或 <code>plain</code>。当设置为 <code>base64</code> 时,消息在发布前会先被解码。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
message_id {
|
||||||
|
desc {
|
||||||
|
en: "A globally unique message ID for correlation/tracing."
|
||||||
|
zh: "全局唯一的一个消息 ID,方便用于关联和追踪。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reason_code {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
The MQTT reason code, as the same ones used in PUBACK packet.<br/>
|
||||||
|
Currently supported codes are:<br/>
|
||||||
|
|
||||||
|
16(0x10): No matching subscribers;<br/>
|
||||||
|
131(0x81): Error happened when dispatching the message. e.g. during EMQX restart;<br/>
|
||||||
|
144(0x90): Topic name invalid;<br/>
|
||||||
|
151(0x97): Publish rate limited, or message size exceeded limit. The global size limit can be configured with <code>mqtt.max_packet_size</code><br/>
|
||||||
|
NOTE: The message size is estimated with the received topic and payload size, meaning the actual size of serialized bytes (when sent to MQTT subscriber)
|
||||||
|
might be slightly over the limit.
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
MQTT 消息发布的错误码,这些错误码也是 MQTT 规范中 PUBACK 消息可能携带的错误码。<br/>
|
||||||
|
当前支持如下错误码:<br/>
|
||||||
|
|
||||||
|
16(0x10):没能匹配到任何订阅;<br/>
|
||||||
|
131(0x81):消息转发时发生错误,例如 EMQX 服务重启;<br/>
|
||||||
|
144(0x90):主题名称非法;<br/>
|
||||||
|
151(0x97):受到了速率限制,或者消息尺寸过大。全局消息大小限制可以通过配置项 <code>mqtt.max_packet_size</code> 来进行修改。<br/>
|
||||||
|
注意:消息尺寸的是通过主题和消息体的字节数进行估算的。具体发布时所占用的字节数可能会稍大于这个估算的值。
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
error_message {
|
||||||
|
desc {
|
||||||
|
en: "Describes the failure reason in detail."
|
||||||
|
zh: "失败的详细原因。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,7 +16,15 @@
|
||||||
-module(emqx_mgmt_api_publish).
|
-module(emqx_mgmt_api_publish).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
|
||||||
|
-define(ALL_IS_WELL, 200).
|
||||||
|
-define(PARTIALLY_OK, 202).
|
||||||
|
-define(BAD_REQUEST, 400).
|
||||||
|
-define(DISPATCH_ERROR, 503).
|
||||||
|
|
||||||
-behaviour(minirest_api).
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
|
@ -42,11 +50,14 @@ schema("/publish") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => publish,
|
'operationId' => publish,
|
||||||
post => #{
|
post => #{
|
||||||
description => <<"Publish Message">>,
|
description => ?DESC(publish_api),
|
||||||
tags => [<<"Publish">>],
|
tags => [<<"Publish">>],
|
||||||
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, publish_message)),
|
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, publish_message)),
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => hoconsc:mk(hoconsc:ref(?MODULE, publish_message_info))
|
?ALL_IS_WELL => hoconsc:mk(hoconsc:ref(?MODULE, publish_ok)),
|
||||||
|
?PARTIALLY_OK => hoconsc:mk(hoconsc:ref(?MODULE, publish_error)),
|
||||||
|
?BAD_REQUEST => bad_request_schema(),
|
||||||
|
?DISPATCH_ERROR => hoconsc:mk(hoconsc:ref(?MODULE, publish_error))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -54,44 +65,58 @@ schema("/publish/bulk") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => publish_batch,
|
'operationId' => publish_batch,
|
||||||
post => #{
|
post => #{
|
||||||
description => <<"Publish Messages">>,
|
description => ?DESC(publish_bulk_api),
|
||||||
tags => [<<"Publish">>],
|
tags => [<<"Publish">>],
|
||||||
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message)), #{}),
|
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message)), #{}),
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message_info)), #{})
|
?ALL_IS_WELL => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_ok)), #{}),
|
||||||
|
?PARTIALLY_OK => hoconsc:mk(
|
||||||
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
|
||||||
|
),
|
||||||
|
?BAD_REQUEST => bad_request_schema(),
|
||||||
|
?DISPATCH_ERROR => hoconsc:mk(
|
||||||
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
bad_request_schema() ->
|
||||||
|
Union = hoconsc:union([
|
||||||
|
hoconsc:ref(?MODULE, bad_request),
|
||||||
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error))
|
||||||
|
]),
|
||||||
|
hoconsc:mk(Union, #{}).
|
||||||
|
|
||||||
fields(message) ->
|
fields(message) ->
|
||||||
[
|
[
|
||||||
{topic,
|
{topic,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
desc => <<"Topic Name">>,
|
desc => ?DESC(topic_name),
|
||||||
required => true,
|
required => true,
|
||||||
example => <<"api/example/topic">>
|
example => <<"api/example/topic">>
|
||||||
})},
|
})},
|
||||||
{qos,
|
{qos,
|
||||||
hoconsc:mk(emqx_schema:qos(), #{
|
hoconsc:mk(emqx_schema:qos(), #{
|
||||||
desc => <<"MQTT QoS">>,
|
desc => ?DESC(qos),
|
||||||
required => false,
|
required => false,
|
||||||
default => 0
|
default => 0
|
||||||
})},
|
})},
|
||||||
{clientid,
|
{clientid,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
desc => <<"From client ID">>,
|
desc => ?DESC(clientid),
|
||||||
required => false,
|
required => false,
|
||||||
example => <<"api_example_client">>
|
example => <<"api_example_client">>
|
||||||
})},
|
})},
|
||||||
{payload,
|
{payload,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
desc => <<"MQTT Payload">>,
|
desc => ?DESC(payload),
|
||||||
required => true,
|
required => true,
|
||||||
example => <<"hello emqx api">>
|
example => <<"hello emqx api">>
|
||||||
})},
|
})},
|
||||||
{retain,
|
{retain,
|
||||||
hoconsc:mk(boolean(), #{
|
hoconsc:mk(boolean(), #{
|
||||||
desc => <<"MQTT Retain Message">>,
|
desc => ?DESC(retain),
|
||||||
required => false,
|
required => false,
|
||||||
default => false
|
default => false
|
||||||
})}
|
})}
|
||||||
|
@ -100,53 +125,196 @@ fields(publish_message) ->
|
||||||
[
|
[
|
||||||
{payload_encoding,
|
{payload_encoding,
|
||||||
hoconsc:mk(hoconsc:enum([plain, base64]), #{
|
hoconsc:mk(hoconsc:enum([plain, base64]), #{
|
||||||
desc => <<"MQTT Payload Encoding, base64 or plain">>,
|
desc => ?DESC(payload_encoding),
|
||||||
required => false,
|
required => false,
|
||||||
default => plain
|
default => plain
|
||||||
})}
|
})}
|
||||||
] ++ fields(message);
|
] ++ fields(message);
|
||||||
fields(publish_message_info) ->
|
fields(publish_ok) ->
|
||||||
[
|
[
|
||||||
{id,
|
{id,
|
||||||
hoconsc:mk(binary(), #{
|
hoconsc:mk(binary(), #{
|
||||||
desc => <<"A globally unique message ID">>
|
desc => ?DESC(message_id)
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(publish_error) ->
|
||||||
|
[
|
||||||
|
{reason_code,
|
||||||
|
hoconsc:mk(integer(), #{
|
||||||
|
desc => ?DESC(reason_code),
|
||||||
|
example => 16
|
||||||
|
})},
|
||||||
|
{message,
|
||||||
|
hoconsc:mk(binary(), #{
|
||||||
|
desc => ?DESC(error_message),
|
||||||
|
example => <<"no_matching_subscribers">>
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(bad_request) ->
|
||||||
|
[
|
||||||
|
{code,
|
||||||
|
hoconsc:mk(string(), #{
|
||||||
|
desc => <<"BAD_REQUEST">>
|
||||||
|
})},
|
||||||
|
{message,
|
||||||
|
hoconsc:mk(binary(), #{
|
||||||
|
desc => ?DESC(error_message)
|
||||||
})}
|
})}
|
||||||
].
|
].
|
||||||
|
|
||||||
publish(post, #{body := Body}) ->
|
publish(post, #{body := Body}) ->
|
||||||
case message(Body) of
|
case message(Body) of
|
||||||
{ok, Message} ->
|
{ok, Message} ->
|
||||||
_ = emqx_mgmt:publish(Message),
|
Res = emqx_mgmt:publish(Message),
|
||||||
{200, format_message_response(Message)};
|
publish_result_to_http_reply(Message, Res);
|
||||||
{error, R} ->
|
{error, Reason} ->
|
||||||
{400, 'BAD_REQUEST', to_binary(R)}
|
{?BAD_REQUEST, make_bad_req_reply(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
publish_batch(post, #{body := Body}) ->
|
publish_batch(post, #{body := Body}) ->
|
||||||
case messages(Body) of
|
case messages(Body) of
|
||||||
{ok, Messages} ->
|
{ok, Messages} ->
|
||||||
_ = [emqx_mgmt:publish(Message) || Message <- Messages],
|
ResList = lists:map(
|
||||||
{200, format_message_response(Messages)};
|
fun(Message) ->
|
||||||
{error, R} ->
|
Res = emqx_mgmt:publish(Message),
|
||||||
{400, 'BAD_REQUEST', to_binary(R)}
|
publish_result_to_http_reply(Message, Res)
|
||||||
|
end,
|
||||||
|
Messages
|
||||||
|
),
|
||||||
|
publish_results_to_http_reply(ResList);
|
||||||
|
{error, Reason} ->
|
||||||
|
{?BAD_REQUEST, make_bad_req_reply(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
make_bad_req_reply(invalid_topic_name) ->
|
||||||
|
make_publish_error_response(?RC_TOPIC_NAME_INVALID);
|
||||||
|
make_bad_req_reply(packet_too_large) ->
|
||||||
|
%% 0x95 RC_PACKET_TOO_LARGE is not a PUBACK reason code
|
||||||
|
%% This is why we use RC_QUOTA_EXCEEDED instead
|
||||||
|
make_publish_error_response(?RC_QUOTA_EXCEEDED, packet_too_large);
|
||||||
|
make_bad_req_reply(Reason) ->
|
||||||
|
make_publish_error_response(?RC_IMPLEMENTATION_SPECIFIC_ERROR, to_binary(Reason)).
|
||||||
|
|
||||||
|
-spec is_ok_deliver({_NodeOrShare, _MatchedTopic, emqx_types:deliver_result()}) -> boolean().
|
||||||
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, ok}) -> true;
|
||||||
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, {ok, _}}) -> true;
|
||||||
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, {error, _}}) -> false.
|
||||||
|
|
||||||
|
%% @hidden Map MQTT publish result reason code to HTTP status code.
|
||||||
|
%% MQTT reason code | Description | HTTP status code
|
||||||
|
%% 0 Success 200
|
||||||
|
%% 16 No matching subscribers 202
|
||||||
|
%% 128 Unspecified error 406
|
||||||
|
%% 131 Implementation specific error 406
|
||||||
|
%% 144 Topic Name invalid 400
|
||||||
|
%% 151 Quota exceeded 400
|
||||||
|
%%
|
||||||
|
%% %%%%%% Below error codes are not implemented so far %%%%
|
||||||
|
%%
|
||||||
|
%% If HTTP request passes HTTP authentication, it is considered trusted.
|
||||||
|
%% In the future, we may choose to check ACL for the provided MQTT Client ID
|
||||||
|
%% 135 Not authorized 401
|
||||||
|
%%
|
||||||
|
%% %%%%%% Below error codes are not applicable %%%%%%%
|
||||||
|
%%
|
||||||
|
%% No user specified packet ID, so there should be no packet ID error
|
||||||
|
%% 145 Packet identifier is in use 400
|
||||||
|
%%
|
||||||
|
%% No preceding payload format indicator to compare against.
|
||||||
|
%% Content-Type check should be done at HTTP layer but not here.
|
||||||
|
%% 153 Payload format invalid 400
|
||||||
|
publish_result_to_http_reply(_Message, []) ->
|
||||||
|
%% matched no subscriber
|
||||||
|
{?PARTIALLY_OK, make_publish_error_response(?RC_NO_MATCHING_SUBSCRIBERS)};
|
||||||
|
publish_result_to_http_reply(Message, PublishResult) ->
|
||||||
|
case lists:any(fun is_ok_deliver/1, PublishResult) of
|
||||||
|
true ->
|
||||||
|
%% delivered to at least one subscriber
|
||||||
|
OkBody = make_publish_response(Message),
|
||||||
|
{?ALL_IS_WELL, OkBody};
|
||||||
|
false ->
|
||||||
|
%% this is quite unusual, matched, but failed to deliver
|
||||||
|
%% if this happens, the publish result log can be helpful
|
||||||
|
%% to idnetify the reason why publish failed
|
||||||
|
%% e.g. during emqx restart
|
||||||
|
ReasonString = <<"failed_to_dispatch">>,
|
||||||
|
ErrorBody = make_publish_error_response(
|
||||||
|
?RC_IMPLEMENTATION_SPECIFIC_ERROR, ReasonString
|
||||||
|
),
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => ReasonString,
|
||||||
|
message_id => emqx_message:id(Message),
|
||||||
|
results => PublishResult
|
||||||
|
}),
|
||||||
|
{?DISPATCH_ERROR, ErrorBody}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @hidden Reply batch publish result.
|
||||||
|
%% 200 if all published OK.
|
||||||
|
%% 202 if at least one message matched no subscribers.
|
||||||
|
%% 503 for temp errors duing EMQX restart
|
||||||
|
publish_results_to_http_reply([_ | _] = ResList) ->
|
||||||
|
{Codes0, BodyL} = lists:unzip(ResList),
|
||||||
|
Codes = lists:usort(Codes0),
|
||||||
|
HasFailure = lists:member(?DISPATCH_ERROR, Codes),
|
||||||
|
All200 = (Codes =:= [?ALL_IS_WELL]),
|
||||||
|
Code =
|
||||||
|
case All200 of
|
||||||
|
true ->
|
||||||
|
%% All OK
|
||||||
|
?ALL_IS_WELL;
|
||||||
|
false when not HasFailure ->
|
||||||
|
%% Partially OK
|
||||||
|
?PARTIALLY_OK;
|
||||||
|
false ->
|
||||||
|
%% At least one failed
|
||||||
|
?DISPATCH_ERROR
|
||||||
|
end,
|
||||||
|
{Code, BodyL}.
|
||||||
|
|
||||||
message(Map) ->
|
message(Map) ->
|
||||||
|
try
|
||||||
|
make_message(Map)
|
||||||
|
catch
|
||||||
|
throw:Reason ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
make_message(Map) ->
|
||||||
Encoding = maps:get(<<"payload_encoding">>, Map, plain),
|
Encoding = maps:get(<<"payload_encoding">>, Map, plain),
|
||||||
case encode_payload(Encoding, maps:get(<<"payload">>, Map)) of
|
case decode_payload(Encoding, maps:get(<<"payload">>, Map)) of
|
||||||
{ok, Payload} ->
|
{ok, Payload} ->
|
||||||
From = maps:get(<<"clientid">>, Map, http_api),
|
From = maps:get(<<"clientid">>, Map, http_api),
|
||||||
QoS = maps:get(<<"qos">>, Map, 0),
|
QoS = maps:get(<<"qos">>, Map, 0),
|
||||||
Topic = maps:get(<<"topic">>, Map),
|
Topic = maps:get(<<"topic">>, Map),
|
||||||
Retain = maps:get(<<"retain">>, Map, false),
|
Retain = maps:get(<<"retain">>, Map, false),
|
||||||
{ok, emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{})};
|
try
|
||||||
|
_ = emqx_topic:validate(name, Topic)
|
||||||
|
catch
|
||||||
|
error:_Reason ->
|
||||||
|
throw(invalid_topic_name)
|
||||||
|
end,
|
||||||
|
Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}),
|
||||||
|
Size = emqx_message:estimate_size(Message),
|
||||||
|
(Size > size_limit()) andalso throw(packet_too_large),
|
||||||
|
{ok, Message};
|
||||||
{error, R} ->
|
{error, R} ->
|
||||||
{error, R}
|
{error, R}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
encode_payload(plain, Payload) ->
|
%% get the global packet size limit since HTTP API does not belong to any zone.
|
||||||
|
size_limit() ->
|
||||||
|
try
|
||||||
|
emqx_config:get([mqtt, max_packet_size])
|
||||||
|
catch
|
||||||
|
_:_ ->
|
||||||
|
%% leave 1000 bytes for topic name etc.
|
||||||
|
?MAX_PACKET_SIZE
|
||||||
|
end.
|
||||||
|
|
||||||
|
decode_payload(plain, Payload) ->
|
||||||
{ok, Payload};
|
{ok, Payload};
|
||||||
encode_payload(base64, Payload) ->
|
decode_payload(base64, Payload) ->
|
||||||
try
|
try
|
||||||
{ok, base64:decode(Payload)}
|
{ok, base64:decode(Payload)}
|
||||||
catch
|
catch
|
||||||
|
@ -154,6 +322,8 @@ encode_payload(base64, Payload) ->
|
||||||
{error, {decode_base64_payload_failed, Payload}}
|
{error, {decode_base64_payload_failed, Payload}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
messages([]) ->
|
||||||
|
{errror, <<"empty_batch">>};
|
||||||
messages(List) ->
|
messages(List) ->
|
||||||
messages(List, []).
|
messages(List, []).
|
||||||
|
|
||||||
|
@ -167,10 +337,23 @@ messages([MessageMap | List], Res) ->
|
||||||
{error, R}
|
{error, R}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
format_message_response(Messages) when is_list(Messages) ->
|
make_publish_response(#message{id = ID}) ->
|
||||||
[format_message_response(Message) || Message <- Messages];
|
#{
|
||||||
format_message_response(#message{id = ID}) ->
|
id => emqx_guid:to_hexstr(ID)
|
||||||
#{id => emqx_guid:to_hexstr(ID)}.
|
}.
|
||||||
|
|
||||||
|
make_publish_error_response(ReasonCode) ->
|
||||||
|
make_publish_error_response(ReasonCode, emqx_reason_codes:name(ReasonCode)).
|
||||||
|
|
||||||
|
make_publish_error_response(ReasonCode, Msg) ->
|
||||||
|
#{
|
||||||
|
reason_code => ReasonCode,
|
||||||
|
message => to_binary(Msg)
|
||||||
|
}.
|
||||||
|
|
||||||
|
to_binary(Atom) when is_atom(Atom) ->
|
||||||
|
atom_to_binary(Atom);
|
||||||
|
to_binary(Msg) when is_binary(Msg) ->
|
||||||
|
Msg;
|
||||||
to_binary(Term) ->
|
to_binary(Term) ->
|
||||||
list_to_binary(io_lib:format("~p", [Term])).
|
list_to_binary(io_lib:format("~0p", [Term])).
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
|
||||||
-define(CLIENTID, <<"api_clientid">>).
|
-define(CLIENTID, <<"api_clientid">>).
|
||||||
-define(USERNAME, <<"api_username">>).
|
-define(USERNAME, <<"api_username">>).
|
||||||
|
@ -36,6 +37,16 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_) ->
|
end_per_suite(_) ->
|
||||||
emqx_mgmt_api_test_util:end_suite().
|
emqx_mgmt_api_test_util:end_suite().
|
||||||
|
|
||||||
|
init_per_testcase(Case, Config) ->
|
||||||
|
?MODULE:Case({init, Config}).
|
||||||
|
|
||||||
|
end_per_testcase(Case, Config) ->
|
||||||
|
?MODULE:Case({'end', Config}).
|
||||||
|
|
||||||
|
t_publish_api({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_api({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
t_publish_api(_) ->
|
t_publish_api(_) ->
|
||||||
{ok, Client} = emqtt:start_link(#{
|
{ok, Client} = emqtt:start_link(#{
|
||||||
username => <<"api_username">>, clientid => <<"api_clientid">>
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
|
@ -48,11 +59,113 @@ t_publish_api(_) ->
|
||||||
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
Body = #{topic => ?TOPIC1, payload => Payload},
|
Body = #{topic => ?TOPIC1, payload => Payload},
|
||||||
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
ResponseMap = emqx_json:decode(Response, [return_maps]),
|
ResponseMap = decode_json(Response),
|
||||||
?assertEqual([<<"id">>], maps:keys(ResponseMap)),
|
?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
||||||
emqtt:disconnect(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_publish_no_subscriber({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_no_subscriber({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_no_subscriber(_) ->
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = #{topic => ?TOPIC1, payload => Payload},
|
||||||
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
|
ResponseMap = decode_json(Response),
|
||||||
|
?assertEqual([<<"message">>, <<"reason_code">>], lists:sort(maps:keys(ResponseMap))),
|
||||||
|
?assertMatch(#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}, ResponseMap),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_publish_bad_topic({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bad_topic({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_bad_topic(_) ->
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = #{topic => <<"not/a+/valid/topic">>, payload => Payload},
|
||||||
|
?assertMatch(
|
||||||
|
{error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_publish_bad_base64({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bad_base64({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_bad_base64(_) ->
|
||||||
|
%% not a valid base64
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = #{
|
||||||
|
topic => <<"not/a+/valid/topic">>, payload => Payload, payload_encoding => <<"base64">>
|
||||||
|
},
|
||||||
|
?assertMatch(
|
||||||
|
{error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_publish_too_large({init, Config}) ->
|
||||||
|
MaxPacketSize = 100,
|
||||||
|
meck:new(emqx_config, [no_link, passthrough, no_history]),
|
||||||
|
meck:expect(emqx_config, get, fun
|
||||||
|
([mqtt, max_packet_size]) ->
|
||||||
|
MaxPacketSize;
|
||||||
|
(Other) ->
|
||||||
|
meck:passthrough(Other)
|
||||||
|
end),
|
||||||
|
[{max_packet_size, MaxPacketSize} | Config];
|
||||||
|
t_publish_too_large({'end', _Config}) ->
|
||||||
|
meck:unload(emqx_config),
|
||||||
|
ok;
|
||||||
|
t_publish_too_large(Config) ->
|
||||||
|
MaxPacketSize = proplists:get_value(max_packet_size, Config),
|
||||||
|
Payload = lists:duplicate(MaxPacketSize, $0),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = #{topic => <<"random/topic">>, payload => Payload},
|
||||||
|
{error, {Summary, _Headers, ResponseBody}} =
|
||||||
|
emqx_mgmt_api_test_util:request_api(
|
||||||
|
post,
|
||||||
|
Path,
|
||||||
|
"",
|
||||||
|
Auth,
|
||||||
|
Body,
|
||||||
|
#{return_body => true}
|
||||||
|
),
|
||||||
|
?assertMatch({_, 400, _}, Summary),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"reason_code">> := ?RC_QUOTA_EXCEEDED,
|
||||||
|
<<"message">> := <<"packet_too_large">>
|
||||||
|
},
|
||||||
|
decode_json(ResponseBody)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_publish_bad_topic_bulk({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bad_topic_bulk({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_bad_topic_bulk(_Config) ->
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = [
|
||||||
|
#{topic => <<"not/a+/valid/topic">>, payload => Payload},
|
||||||
|
#{topic => <<"good/topic">>, payload => Payload}
|
||||||
|
],
|
||||||
|
?assertMatch(
|
||||||
|
{error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_publish_bulk_api({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bulk_api({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
t_publish_bulk_api(_) ->
|
t_publish_bulk_api(_) ->
|
||||||
{ok, Client} = emqtt:start_link(#{
|
{ok, Client} = emqtt:start_link(#{
|
||||||
username => <<"api_username">>, clientid => <<"api_clientid">>
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
|
@ -63,19 +176,135 @@ t_publish_bulk_api(_) ->
|
||||||
Payload = <<"hello">>,
|
Payload = <<"hello">>,
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
Body = [#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}],
|
Body = [
|
||||||
|
#{
|
||||||
|
topic => ?TOPIC1,
|
||||||
|
payload => Payload,
|
||||||
|
payload_encoding => plain
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
topic => ?TOPIC2,
|
||||||
|
payload => base64:encode(Payload),
|
||||||
|
payload_encoding => base64
|
||||||
|
}
|
||||||
|
],
|
||||||
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
ResponseList = emqx_json:decode(Response, [return_maps]),
|
ResponseList = decode_json(Response),
|
||||||
?assertEqual(2, erlang:length(ResponseList)),
|
?assertEqual(2, erlang:length(ResponseList)),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(ResponseMap) ->
|
fun(ResponseMap) ->
|
||||||
?assertEqual([<<"id">>], maps:keys(ResponseMap))
|
?assertMatch(
|
||||||
|
[<<"id">>], lists:sort(maps:keys(ResponseMap))
|
||||||
|
)
|
||||||
end,
|
end,
|
||||||
ResponseList
|
ResponseList
|
||||||
),
|
),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
||||||
emqtt:disconnect(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_publish_no_subscriber_bulk({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_no_subscriber_bulk({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_no_subscriber_bulk(_) ->
|
||||||
|
{ok, Client} = emqtt:start_link(#{
|
||||||
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
|
}),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = [
|
||||||
|
#{topic => ?TOPIC1, payload => Payload},
|
||||||
|
#{topic => ?TOPIC2, payload => Payload},
|
||||||
|
#{topic => <<"no/subscrbier/topic">>, payload => Payload}
|
||||||
|
],
|
||||||
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
|
ResponseList = decode_json(Response),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{<<"id">> := _},
|
||||||
|
#{<<"id">> := _},
|
||||||
|
#{<<"message">> := <<"no_matching_subscribers">>}
|
||||||
|
],
|
||||||
|
ResponseList
|
||||||
|
),
|
||||||
|
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
||||||
|
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
||||||
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
|
||||||
|
Config;
|
||||||
|
t_publish_bulk_dispatch_one_message_invalid_topic({'end', _Config}) ->
|
||||||
|
ok;
|
||||||
|
t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) ->
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = [
|
||||||
|
#{topic => ?TOPIC1, payload => Payload},
|
||||||
|
#{topic => ?TOPIC2, payload => Payload},
|
||||||
|
#{topic => <<"bad/#/topic">>, payload => Payload}
|
||||||
|
],
|
||||||
|
{error, {Summary, _Headers, ResponseBody}} =
|
||||||
|
emqx_mgmt_api_test_util:request_api(
|
||||||
|
post,
|
||||||
|
Path,
|
||||||
|
"",
|
||||||
|
Auth,
|
||||||
|
Body,
|
||||||
|
#{return_body => true}
|
||||||
|
),
|
||||||
|
?assertMatch({_, 400, _}, Summary),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"reason_code">> := ?RC_TOPIC_NAME_INVALID},
|
||||||
|
decode_json(ResponseBody)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_publish_bulk_dispatch_failure({init, Config}) ->
|
||||||
|
meck:new(emqx, [no_link, passthrough, no_history]),
|
||||||
|
meck:expect(emqx, is_running, fun() -> false end),
|
||||||
|
Config;
|
||||||
|
t_publish_bulk_dispatch_failure({'end', _Config}) ->
|
||||||
|
meck:unload(emqx),
|
||||||
|
ok;
|
||||||
|
t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
|
||||||
|
{ok, Client} = emqtt:start_link(#{
|
||||||
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
|
}),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Body = [
|
||||||
|
#{topic => ?TOPIC1, payload => Payload},
|
||||||
|
#{topic => ?TOPIC2, payload => Payload},
|
||||||
|
#{topic => <<"no/subscrbier/topic">>, payload => Payload}
|
||||||
|
],
|
||||||
|
{error, {Summary, _Headers, ResponseBody}} =
|
||||||
|
emqx_mgmt_api_test_util:request_api(
|
||||||
|
post,
|
||||||
|
Path,
|
||||||
|
"",
|
||||||
|
Auth,
|
||||||
|
Body,
|
||||||
|
#{return_body => true}
|
||||||
|
),
|
||||||
|
?assertMatch({_, 503, _}, Summary),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR},
|
||||||
|
#{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR},
|
||||||
|
#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
|
||||||
|
],
|
||||||
|
decode_json(ResponseBody)
|
||||||
|
),
|
||||||
|
emqtt:stop(Client).
|
||||||
|
|
||||||
receive_assert(Topic, Qos, Payload) ->
|
receive_assert(Topic, Qos, Payload) ->
|
||||||
receive
|
receive
|
||||||
|
@ -90,3 +319,6 @@ receive_assert(Topic, Qos, Payload) ->
|
||||||
after 5000 ->
|
after 5000 ->
|
||||||
timeout
|
timeout
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
decode_json(In) ->
|
||||||
|
emqx_json:decode(In, [return_maps]).
|
||||||
|
|
|
@ -65,8 +65,11 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, []) when
|
||||||
"" -> Url;
|
"" -> Url;
|
||||||
_ -> Url ++ "?" ++ QueryParams
|
_ -> Url ++ "?" ++ QueryParams
|
||||||
end,
|
end,
|
||||||
do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)});
|
do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)}, #{});
|
||||||
request_api(Method, Url, QueryParams, AuthOrHeaders, Body) when
|
request_api(Method, Url, QueryParams, AuthOrHeaders, Body) ->
|
||||||
|
request_api(Method, Url, QueryParams, AuthOrHeaders, Body, #{}).
|
||||||
|
|
||||||
|
request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when
|
||||||
(Method =:= post) orelse
|
(Method =:= post) orelse
|
||||||
(Method =:= patch) orelse
|
(Method =:= patch) orelse
|
||||||
(Method =:= put) orelse
|
(Method =:= put) orelse
|
||||||
|
@ -79,10 +82,12 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, Body) when
|
||||||
end,
|
end,
|
||||||
do_request_api(
|
do_request_api(
|
||||||
Method,
|
Method,
|
||||||
{NewUrl, build_http_header(AuthOrHeaders), "application/json", emqx_json:encode(Body)}
|
{NewUrl, build_http_header(AuthOrHeaders), "application/json", emqx_json:encode(Body)},
|
||||||
|
Opts
|
||||||
).
|
).
|
||||||
|
|
||||||
do_request_api(Method, Request) ->
|
do_request_api(Method, Request, Opts) ->
|
||||||
|
ReturnBody = maps:get(return_body, Opts, false),
|
||||||
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
||||||
case httpc:request(Method, Request, [], []) of
|
case httpc:request(Method, Request, [], []) of
|
||||||
{error, socket_closed_remotely} ->
|
{error, socket_closed_remotely} ->
|
||||||
|
@ -91,8 +96,9 @@ do_request_api(Method, Request) ->
|
||||||
Code >= 200 andalso Code =< 299
|
Code >= 200 andalso Code =< 299
|
||||||
->
|
->
|
||||||
{ok, Return};
|
{ok, Return};
|
||||||
{ok, {Reason, _, _} = Error} ->
|
{ok, {Reason, Headers, Body}} when ReturnBody ->
|
||||||
ct:pal("error: ~p~n", [Error]),
|
{error, {Reason, Headers, Body}};
|
||||||
|
{ok, {Reason, _Headers, _Body}} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue