diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl
index ae74a614b..03f7ca6a2 100644
--- a/apps/emqx/src/emqx_message.erl
+++ b/apps/emqx/src/emqx_message.erl
@@ -74,7 +74,8 @@
to_map/1,
to_log_map/1,
to_list/1,
- from_map/1
+ from_map/1,
+ estimate_size/1
]).
-export_type([message_map/0]).
@@ -175,6 +176,18 @@ make(MsgId, From, QoS, Topic, Payload, Flags, Headers) when
timestamp = Now
}.
+%% optimistic esitmation of a message size after serialization
+%% not including MQTT v5 message headers/user properties etc.
+-spec estimate_size(emqx_types:message()) -> non_neg_integer().
+estimate_size(#message{topic = Topic, payload = Payload}) ->
+ FixedHeaderSize = 1,
+ VarLenSize = 4,
+ TopicSize = iolist_size(Topic),
+ PayloadSize = iolist_size(Payload),
+ PacketIdSize = 2,
+ TopicLengthSize = 2,
+ FixedHeaderSize + VarLenSize + TopicLengthSize + TopicSize + PacketIdSize + PayloadSize.
+
-spec id(emqx_types:message()) -> maybe(binary()).
id(#message{id = Id}) -> Id.
diff --git a/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf b/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf
new file mode 100644
index 000000000..2a7c9def8
--- /dev/null
+++ b/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf
@@ -0,0 +1,127 @@
+
+emqx_mgmt_api_publish {
+ publish_api {
+ desc {
+ en: """
+Publish one message.
+Possible HTTP status response codes are:
+200
: The message is delivered to at least one subscriber;
+202
: No matched subscribers;
+400
: Message is invalid. for example bad topic name, or QoS is out of range;
+503
: Failed to deliver the message to subscriber(s);
+"""
+ zh: """
+发布一个消息。
+可能的 HTTP 状态码如下:
+200: 消息被成功发送到至少一个订阅。
+202: 没有匹配到任何订阅。
+400: 消息编码错误,如非法主题,或 QoS 超出范围等。
+503: 服务重启等过程中导致转发失败。
+"""
+ }
+ }
+ publish_bulk_api {
+ desc {
+ en: """
+Publish a batch of messages.
+Possible HTTP response status code are:
+200: All messages are delivered to at least one subscriber;
+202: At least one message was not delivered to any subscriber;
+400: At least one message is invalid. For example bad topic name, or QoS is out of range;
+503: Failed to deliver at least one of the messages;
+
+In case there is at lest one invalid message in the batch, the HTTP response body
+is the same as for /publish
API.
+Otherwise the HTTP response body is an array of JSON objects indicating the publish
+result of each individual message in the batch.
+"""
+ zh: """
+批量发布一组消息。
+可能的 HTTP 状态码如下:
+200: 所有的消息都被成功发送到至少一个订阅。
+202: 至少有一个消息没有匹配到任何订阅。
+400: 至少有一个消息编码错误,如非法主题,或 QoS 超出范围等。
+503: 至少有一个小因为服务重启的原因导致转发失败。
+
+请求的 Body 或者 Body 中包含的某个消息无法通过 API 规范的类型检查时,HTTP 响应的消息与发布单个消息的 API
+ /publish
是一样的。
+如果所有的消息都是合法的,那么 HTTP 返回的内容是一个 JSON 数组,每个元素代表了该消息转发的状态。
+
+"""
+ }
+ }
+
+ topic_name {
+ desc {
+ en: "Topic Name"
+ zh: "主题名称"
+ }
+ }
+ qos {
+ desc {
+ en: "MQTT message QoS"
+ zh: "MQTT 消息的 QoS"
+ }
+ }
+ clientid {
+ desc {
+ en: "Each message can be published as if it is done on behalf of an MQTT client whos ID can be specified in this field."
+ zh: "每个消息都可以带上一个 MQTT 客户端 ID,用于模拟 MQTT 客户端的发布行为。"
+ }
+ }
+ payload {
+ desc {
+ en: "The MQTT message payload."
+ zh: "MQTT 消息体。"
+ }
+ }
+ retain {
+ desc {
+ en: "A boolean field to indicate if this message should be retained."
+ zh: "布尔型字段,用于表示该消息是否保留消息。"
+ }
+ }
+ payload_encoding {
+ desc {
+ en: "MQTT Payload Encoding, base64
or plain
. When set to base64
, the message is decoded before it is published."
+ zh: "MQTT 消息体的编码方式,可以是 base64
或 plain
。当设置为 base64
时,消息在发布前会先被解码。"
+ }
+ }
+ message_id {
+ desc {
+ en: "A globally unique message ID for correlation/tracing."
+ zh: "全局唯一的一个消息 ID,方便用于关联和追踪。"
+ }
+ }
+ reason_code {
+ desc {
+ en: """
+The MQTT reason code, as the same ones used in PUBACK packet.
+Currently supported codes are:
+
+16(0x10): No matching subscribers;
+131(0x81): Error happened when dispatching the message. e.g. during EMQX restart;
+144(0x90): Topic name invalid;
+151(0x97): Publish rate limited, or message size exceeded limit. The global size limit can be configured with mqtt.max_packet_size
+NOTE: The message size is estimated with the received topic and payload size, meaning the actual size of serialized bytes (when sent to MQTT subscriber)
+might be slightly over the limit.
+"""
+ zh: """
+MQTT 消息发布的错误码,这些错误码也是 MQTT 规范中 PUBACK 消息可能携带的错误码。
+当前支持如下错误码:
+
+16(0x10):没能匹配到任何订阅;
+131(0x81):消息转发时发生错误,例如 EMQX 服务重启;
+144(0x90):主题名称非法;
+151(0x97):受到了速率限制,或者消息尺寸过大。全局消息大小限制可以通过配置项 mqtt.max_packet_size
来进行修改。
+注意:消息尺寸的是通过主题和消息体的字节数进行估算的。具体发布时所占用的字节数可能会稍大于这个估算的值。
+"""
+ }
+ }
+ error_message {
+ desc {
+ en: "Describes the failure reason in detail."
+ zh: "失败的详细原因。"
+ }
+ }
+}
diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl
index 0e05a3875..1678c56e0 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_publish.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl
@@ -16,7 +16,15 @@
-module(emqx_mgmt_api_publish).
-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-define(ALL_IS_WELL, 200).
+-define(PARTIALLY_OK, 202).
+-define(BAD_REQUEST, 400).
+-define(DISPATCH_ERROR, 503).
-behaviour(minirest_api).
@@ -42,11 +50,14 @@ schema("/publish") ->
#{
'operationId' => publish,
post => #{
- description => <<"Publish Message">>,
+ description => ?DESC(publish_api),
tags => [<<"Publish">>],
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, publish_message)),
responses => #{
- 200 => hoconsc:mk(hoconsc:ref(?MODULE, publish_message_info))
+ ?ALL_IS_WELL => hoconsc:mk(hoconsc:ref(?MODULE, publish_ok)),
+ ?PARTIALLY_OK => hoconsc:mk(hoconsc:ref(?MODULE, publish_error)),
+ ?BAD_REQUEST => bad_request_schema(),
+ ?DISPATCH_ERROR => hoconsc:mk(hoconsc:ref(?MODULE, publish_error))
}
}
};
@@ -54,44 +65,58 @@ schema("/publish/bulk") ->
#{
'operationId' => publish_batch,
post => #{
- description => <<"Publish Messages">>,
+ description => ?DESC(publish_bulk_api),
tags => [<<"Publish">>],
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message)), #{}),
responses => #{
- 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message_info)), #{})
+ ?ALL_IS_WELL => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_ok)), #{}),
+ ?PARTIALLY_OK => hoconsc:mk(
+ hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
+ ),
+ ?BAD_REQUEST => bad_request_schema(),
+ ?DISPATCH_ERROR => hoconsc:mk(
+ hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
+ )
}
}
}.
+bad_request_schema() ->
+ Union = hoconsc:union([
+ hoconsc:ref(?MODULE, bad_request),
+ hoconsc:array(hoconsc:ref(?MODULE, publish_error))
+ ]),
+ hoconsc:mk(Union, #{}).
+
fields(message) ->
[
{topic,
hoconsc:mk(binary(), #{
- desc => <<"Topic Name">>,
+ desc => ?DESC(topic_name),
required => true,
example => <<"api/example/topic">>
})},
{qos,
hoconsc:mk(emqx_schema:qos(), #{
- desc => <<"MQTT QoS">>,
+ desc => ?DESC(qos),
required => false,
default => 0
})},
{clientid,
hoconsc:mk(binary(), #{
- desc => <<"From client ID">>,
+ desc => ?DESC(clientid),
required => false,
example => <<"api_example_client">>
})},
{payload,
hoconsc:mk(binary(), #{
- desc => <<"MQTT Payload">>,
+ desc => ?DESC(payload),
required => true,
example => <<"hello emqx api">>
})},
{retain,
hoconsc:mk(boolean(), #{
- desc => <<"MQTT Retain Message">>,
+ desc => ?DESC(retain),
required => false,
default => false
})}
@@ -100,53 +125,196 @@ fields(publish_message) ->
[
{payload_encoding,
hoconsc:mk(hoconsc:enum([plain, base64]), #{
- desc => <<"MQTT Payload Encoding, base64 or plain">>,
+ desc => ?DESC(payload_encoding),
required => false,
default => plain
})}
] ++ fields(message);
-fields(publish_message_info) ->
+fields(publish_ok) ->
[
{id,
hoconsc:mk(binary(), #{
- desc => <<"A globally unique message ID">>
+ desc => ?DESC(message_id)
+ })}
+ ];
+fields(publish_error) ->
+ [
+ {reason_code,
+ hoconsc:mk(integer(), #{
+ desc => ?DESC(reason_code),
+ example => 16
+ })},
+ {message,
+ hoconsc:mk(binary(), #{
+ desc => ?DESC(error_message),
+ example => <<"no_matching_subscribers">>
+ })}
+ ];
+fields(bad_request) ->
+ [
+ {code,
+ hoconsc:mk(string(), #{
+ desc => <<"BAD_REQUEST">>
+ })},
+ {message,
+ hoconsc:mk(binary(), #{
+ desc => ?DESC(error_message)
})}
].
publish(post, #{body := Body}) ->
case message(Body) of
{ok, Message} ->
- _ = emqx_mgmt:publish(Message),
- {200, format_message_response(Message)};
- {error, R} ->
- {400, 'BAD_REQUEST', to_binary(R)}
+ Res = emqx_mgmt:publish(Message),
+ publish_result_to_http_reply(Message, Res);
+ {error, Reason} ->
+ {?BAD_REQUEST, make_bad_req_reply(Reason)}
end.
publish_batch(post, #{body := Body}) ->
case messages(Body) of
{ok, Messages} ->
- _ = [emqx_mgmt:publish(Message) || Message <- Messages],
- {200, format_message_response(Messages)};
- {error, R} ->
- {400, 'BAD_REQUEST', to_binary(R)}
+ ResList = lists:map(
+ fun(Message) ->
+ Res = emqx_mgmt:publish(Message),
+ publish_result_to_http_reply(Message, Res)
+ end,
+ Messages
+ ),
+ publish_results_to_http_reply(ResList);
+ {error, Reason} ->
+ {?BAD_REQUEST, make_bad_req_reply(Reason)}
end.
+make_bad_req_reply(invalid_topic_name) ->
+ make_publish_error_response(?RC_TOPIC_NAME_INVALID);
+make_bad_req_reply(packet_too_large) ->
+ %% 0x95 RC_PACKET_TOO_LARGE is not a PUBACK reason code
+ %% This is why we use RC_QUOTA_EXCEEDED instead
+ make_publish_error_response(?RC_QUOTA_EXCEEDED, packet_too_large);
+make_bad_req_reply(Reason) ->
+ make_publish_error_response(?RC_IMPLEMENTATION_SPECIFIC_ERROR, to_binary(Reason)).
+
+-spec is_ok_deliver({_NodeOrShare, _MatchedTopic, emqx_types:deliver_result()}) -> boolean().
+is_ok_deliver({_NodeOrShare, _MatchedTopic, ok}) -> true;
+is_ok_deliver({_NodeOrShare, _MatchedTopic, {ok, _}}) -> true;
+is_ok_deliver({_NodeOrShare, _MatchedTopic, {error, _}}) -> false.
+
+%% @hidden Map MQTT publish result reason code to HTTP status code.
+%% MQTT reason code | Description | HTTP status code
+%% 0 Success 200
+%% 16 No matching subscribers 202
+%% 128 Unspecified error 406
+%% 131 Implementation specific error 406
+%% 144 Topic Name invalid 400
+%% 151 Quota exceeded 400
+%%
+%% %%%%%% Below error codes are not implemented so far %%%%
+%%
+%% If HTTP request passes HTTP authentication, it is considered trusted.
+%% In the future, we may choose to check ACL for the provided MQTT Client ID
+%% 135 Not authorized 401
+%%
+%% %%%%%% Below error codes are not applicable %%%%%%%
+%%
+%% No user specified packet ID, so there should be no packet ID error
+%% 145 Packet identifier is in use 400
+%%
+%% No preceding payload format indicator to compare against.
+%% Content-Type check should be done at HTTP layer but not here.
+%% 153 Payload format invalid 400
+publish_result_to_http_reply(_Message, []) ->
+ %% matched no subscriber
+ {?PARTIALLY_OK, make_publish_error_response(?RC_NO_MATCHING_SUBSCRIBERS)};
+publish_result_to_http_reply(Message, PublishResult) ->
+ case lists:any(fun is_ok_deliver/1, PublishResult) of
+ true ->
+ %% delivered to at least one subscriber
+ OkBody = make_publish_response(Message),
+ {?ALL_IS_WELL, OkBody};
+ false ->
+ %% this is quite unusual, matched, but failed to deliver
+ %% if this happens, the publish result log can be helpful
+ %% to idnetify the reason why publish failed
+ %% e.g. during emqx restart
+ ReasonString = <<"failed_to_dispatch">>,
+ ErrorBody = make_publish_error_response(
+ ?RC_IMPLEMENTATION_SPECIFIC_ERROR, ReasonString
+ ),
+ ?SLOG(warning, #{
+ msg => ReasonString,
+ message_id => emqx_message:id(Message),
+ results => PublishResult
+ }),
+ {?DISPATCH_ERROR, ErrorBody}
+ end.
+
+%% @hidden Reply batch publish result.
+%% 200 if all published OK.
+%% 202 if at least one message matched no subscribers.
+%% 503 for temp errors duing EMQX restart
+publish_results_to_http_reply([_ | _] = ResList) ->
+ {Codes0, BodyL} = lists:unzip(ResList),
+ Codes = lists:usort(Codes0),
+ HasFailure = lists:member(?DISPATCH_ERROR, Codes),
+ All200 = (Codes =:= [?ALL_IS_WELL]),
+ Code =
+ case All200 of
+ true ->
+ %% All OK
+ ?ALL_IS_WELL;
+ false when not HasFailure ->
+ %% Partially OK
+ ?PARTIALLY_OK;
+ false ->
+ %% At least one failed
+ ?DISPATCH_ERROR
+ end,
+ {Code, BodyL}.
+
message(Map) ->
+ try
+ make_message(Map)
+ catch
+ throw:Reason ->
+ {error, Reason}
+ end.
+
+make_message(Map) ->
Encoding = maps:get(<<"payload_encoding">>, Map, plain),
- case encode_payload(Encoding, maps:get(<<"payload">>, Map)) of
+ case decode_payload(Encoding, maps:get(<<"payload">>, Map)) of
{ok, Payload} ->
From = maps:get(<<"clientid">>, Map, http_api),
QoS = maps:get(<<"qos">>, Map, 0),
Topic = maps:get(<<"topic">>, Map),
Retain = maps:get(<<"retain">>, Map, false),
- {ok, emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{})};
+ try
+ _ = emqx_topic:validate(name, Topic)
+ catch
+ error:_Reason ->
+ throw(invalid_topic_name)
+ end,
+ Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}),
+ Size = emqx_message:estimate_size(Message),
+ (Size > size_limit()) andalso throw(packet_too_large),
+ {ok, Message};
{error, R} ->
{error, R}
end.
-encode_payload(plain, Payload) ->
+%% get the global packet size limit since HTTP API does not belong to any zone.
+size_limit() ->
+ try
+ emqx_config:get([mqtt, max_packet_size])
+ catch
+ _:_ ->
+ %% leave 1000 bytes for topic name etc.
+ ?MAX_PACKET_SIZE
+ end.
+
+decode_payload(plain, Payload) ->
{ok, Payload};
-encode_payload(base64, Payload) ->
+decode_payload(base64, Payload) ->
try
{ok, base64:decode(Payload)}
catch
@@ -154,6 +322,8 @@ encode_payload(base64, Payload) ->
{error, {decode_base64_payload_failed, Payload}}
end.
+messages([]) ->
+ {errror, <<"empty_batch">>};
messages(List) ->
messages(List, []).
@@ -167,10 +337,23 @@ messages([MessageMap | List], Res) ->
{error, R}
end.
-format_message_response(Messages) when is_list(Messages) ->
- [format_message_response(Message) || Message <- Messages];
-format_message_response(#message{id = ID}) ->
- #{id => emqx_guid:to_hexstr(ID)}.
+make_publish_response(#message{id = ID}) ->
+ #{
+ id => emqx_guid:to_hexstr(ID)
+ }.
+make_publish_error_response(ReasonCode) ->
+ make_publish_error_response(ReasonCode, emqx_reason_codes:name(ReasonCode)).
+
+make_publish_error_response(ReasonCode, Msg) ->
+ #{
+ reason_code => ReasonCode,
+ message => to_binary(Msg)
+ }.
+
+to_binary(Atom) when is_atom(Atom) ->
+ atom_to_binary(Atom);
+to_binary(Msg) when is_binary(Msg) ->
+ Msg;
to_binary(Term) ->
- list_to_binary(io_lib:format("~p", [Term])).
+ list_to_binary(io_lib:format("~0p", [Term])).
diff --git a/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl
index b4b3aa902..0ebaf7195 100644
--- a/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl
@@ -19,6 +19,7 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
@@ -36,6 +37,16 @@ init_per_suite(Config) ->
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite().
+init_per_testcase(Case, Config) ->
+ ?MODULE:Case({init, Config}).
+
+end_per_testcase(Case, Config) ->
+ ?MODULE:Case({'end', Config}).
+
+t_publish_api({init, Config}) ->
+ Config;
+t_publish_api({'end', _Config}) ->
+ ok;
t_publish_api(_) ->
{ok, Client} = emqtt:start_link(#{
username => <<"api_username">>, clientid => <<"api_clientid">>
@@ -48,11 +59,113 @@ t_publish_api(_) ->
Auth = emqx_mgmt_api_test_util:auth_header_(),
Body = #{topic => ?TOPIC1, payload => Payload},
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
- ResponseMap = emqx_json:decode(Response, [return_maps]),
- ?assertEqual([<<"id">>], maps:keys(ResponseMap)),
+ ResponseMap = decode_json(Response),
+ ?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
- emqtt:disconnect(Client).
+ emqtt:stop(Client).
+t_publish_no_subscriber({init, Config}) ->
+ Config;
+t_publish_no_subscriber({'end', _Config}) ->
+ ok;
+t_publish_no_subscriber(_) ->
+ Payload = <<"hello">>,
+ Path = emqx_mgmt_api_test_util:api_path(["publish"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ Body = #{topic => ?TOPIC1, payload => Payload},
+ {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
+ ResponseMap = decode_json(Response),
+ ?assertEqual([<<"message">>, <<"reason_code">>], lists:sort(maps:keys(ResponseMap))),
+ ?assertMatch(#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}, ResponseMap),
+ ok.
+
+t_publish_bad_topic({init, Config}) ->
+ Config;
+t_publish_bad_topic({'end', _Config}) ->
+ ok;
+t_publish_bad_topic(_) ->
+ Payload = <<"hello">>,
+ Path = emqx_mgmt_api_test_util:api_path(["publish"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ Body = #{topic => <<"not/a+/valid/topic">>, payload => Payload},
+ ?assertMatch(
+ {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
+ ).
+
+t_publish_bad_base64({init, Config}) ->
+ Config;
+t_publish_bad_base64({'end', _Config}) ->
+ ok;
+t_publish_bad_base64(_) ->
+ %% not a valid base64
+ Payload = <<"hello">>,
+ Path = emqx_mgmt_api_test_util:api_path(["publish"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ Body = #{
+ topic => <<"not/a+/valid/topic">>, payload => Payload, payload_encoding => <<"base64">>
+ },
+ ?assertMatch(
+ {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
+ ).
+
+t_publish_too_large({init, Config}) ->
+ MaxPacketSize = 100,
+ meck:new(emqx_config, [no_link, passthrough, no_history]),
+ meck:expect(emqx_config, get, fun
+ ([mqtt, max_packet_size]) ->
+ MaxPacketSize;
+ (Other) ->
+ meck:passthrough(Other)
+ end),
+ [{max_packet_size, MaxPacketSize} | Config];
+t_publish_too_large({'end', _Config}) ->
+ meck:unload(emqx_config),
+ ok;
+t_publish_too_large(Config) ->
+ MaxPacketSize = proplists:get_value(max_packet_size, Config),
+ Payload = lists:duplicate(MaxPacketSize, $0),
+ Path = emqx_mgmt_api_test_util:api_path(["publish"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ Body = #{topic => <<"random/topic">>, payload => Payload},
+ {error, {Summary, _Headers, ResponseBody}} =
+ emqx_mgmt_api_test_util:request_api(
+ post,
+ Path,
+ "",
+ Auth,
+ Body,
+ #{return_body => true}
+ ),
+ ?assertMatch({_, 400, _}, Summary),
+ ?assertMatch(
+ #{
+ <<"reason_code">> := ?RC_QUOTA_EXCEEDED,
+ <<"message">> := <<"packet_too_large">>
+ },
+ decode_json(ResponseBody)
+ ),
+ ok.
+
+t_publish_bad_topic_bulk({init, Config}) ->
+ Config;
+t_publish_bad_topic_bulk({'end', _Config}) ->
+ ok;
+t_publish_bad_topic_bulk(_Config) ->
+ Payload = <<"hello">>,
+ Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ Body = [
+ #{topic => <<"not/a+/valid/topic">>, payload => Payload},
+ #{topic => <<"good/topic">>, payload => Payload}
+ ],
+ ?assertMatch(
+ {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
+ ).
+
+t_publish_bulk_api({init, Config}) ->
+ Config;
+t_publish_bulk_api({'end', _Config}) ->
+ ok;
t_publish_bulk_api(_) ->
{ok, Client} = emqtt:start_link(#{
username => <<"api_username">>, clientid => <<"api_clientid">>
@@ -63,19 +176,135 @@ t_publish_bulk_api(_) ->
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
- Body = [#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}],
+ Body = [
+ #{
+ topic => ?TOPIC1,
+ payload => Payload,
+ payload_encoding => plain
+ },
+ #{
+ topic => ?TOPIC2,
+ payload => base64:encode(Payload),
+ payload_encoding => base64
+ }
+ ],
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
- ResponseList = emqx_json:decode(Response, [return_maps]),
+ ResponseList = decode_json(Response),
?assertEqual(2, erlang:length(ResponseList)),
lists:foreach(
fun(ResponseMap) ->
- ?assertEqual([<<"id">>], maps:keys(ResponseMap))
+ ?assertMatch(
+ [<<"id">>], lists:sort(maps:keys(ResponseMap))
+ )
end,
ResponseList
),
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
- emqtt:disconnect(Client).
+ emqtt:stop(Client).
+
+t_publish_no_subscriber_bulk({init, Config}) ->
+ Config;
+t_publish_no_subscriber_bulk({'end', _Config}) ->
+ ok;
+t_publish_no_subscriber_bulk(_) ->
+ {ok, Client} = emqtt:start_link(#{
+ username => <<"api_username">>, clientid => <<"api_clientid">>
+ }),
+ {ok, _} = emqtt:connect(Client),
+ {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
+ {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
+ Payload = <<"hello">>,
+ Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ Body = [
+ #{topic => ?TOPIC1, payload => Payload},
+ #{topic => ?TOPIC2, payload => Payload},
+ #{topic => <<"no/subscrbier/topic">>, payload => Payload}
+ ],
+ {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
+ ResponseList = decode_json(Response),
+ ?assertMatch(
+ [
+ #{<<"id">> := _},
+ #{<<"id">> := _},
+ #{<<"message">> := <<"no_matching_subscribers">>}
+ ],
+ ResponseList
+ ),
+ ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
+ ?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
+ emqtt:stop(Client).
+
+t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
+ Config;
+t_publish_bulk_dispatch_one_message_invalid_topic({'end', _Config}) ->
+ ok;
+t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) ->
+ Payload = <<"hello">>,
+ Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ Body = [
+ #{topic => ?TOPIC1, payload => Payload},
+ #{topic => ?TOPIC2, payload => Payload},
+ #{topic => <<"bad/#/topic">>, payload => Payload}
+ ],
+ {error, {Summary, _Headers, ResponseBody}} =
+ emqx_mgmt_api_test_util:request_api(
+ post,
+ Path,
+ "",
+ Auth,
+ Body,
+ #{return_body => true}
+ ),
+ ?assertMatch({_, 400, _}, Summary),
+ ?assertMatch(
+ #{<<"reason_code">> := ?RC_TOPIC_NAME_INVALID},
+ decode_json(ResponseBody)
+ ).
+
+t_publish_bulk_dispatch_failure({init, Config}) ->
+ meck:new(emqx, [no_link, passthrough, no_history]),
+ meck:expect(emqx, is_running, fun() -> false end),
+ Config;
+t_publish_bulk_dispatch_failure({'end', _Config}) ->
+ meck:unload(emqx),
+ ok;
+t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
+ {ok, Client} = emqtt:start_link(#{
+ username => <<"api_username">>, clientid => <<"api_clientid">>
+ }),
+ {ok, _} = emqtt:connect(Client),
+ {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
+ {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
+ Payload = <<"hello">>,
+ Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
+ Body = [
+ #{topic => ?TOPIC1, payload => Payload},
+ #{topic => ?TOPIC2, payload => Payload},
+ #{topic => <<"no/subscrbier/topic">>, payload => Payload}
+ ],
+ {error, {Summary, _Headers, ResponseBody}} =
+ emqx_mgmt_api_test_util:request_api(
+ post,
+ Path,
+ "",
+ Auth,
+ Body,
+ #{return_body => true}
+ ),
+ ?assertMatch({_, 503, _}, Summary),
+ ?assertMatch(
+ [
+ #{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR},
+ #{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR},
+ #{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
+ ],
+ decode_json(ResponseBody)
+ ),
+ emqtt:stop(Client).
receive_assert(Topic, Qos, Payload) ->
receive
@@ -90,3 +319,6 @@ receive_assert(Topic, Qos, Payload) ->
after 5000 ->
timeout
end.
+
+decode_json(In) ->
+ emqx_json:decode(In, [return_maps]).
diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl
index a8b04dc80..1bc29dfee 100644
--- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl
+++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl
@@ -65,8 +65,11 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, []) when
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
- do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)});
-request_api(Method, Url, QueryParams, AuthOrHeaders, Body) when
+ do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)}, #{});
+request_api(Method, Url, QueryParams, AuthOrHeaders, Body) ->
+ request_api(Method, Url, QueryParams, AuthOrHeaders, Body, #{}).
+
+request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when
(Method =:= post) orelse
(Method =:= patch) orelse
(Method =:= put) orelse
@@ -79,10 +82,12 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, Body) when
end,
do_request_api(
Method,
- {NewUrl, build_http_header(AuthOrHeaders), "application/json", emqx_json:encode(Body)}
+ {NewUrl, build_http_header(AuthOrHeaders), "application/json", emqx_json:encode(Body)},
+ Opts
).
-do_request_api(Method, Request) ->
+do_request_api(Method, Request, Opts) ->
+ ReturnBody = maps:get(return_body, Opts, false),
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
@@ -91,8 +96,9 @@ do_request_api(Method, Request) ->
Code >= 200 andalso Code =< 299
->
{ok, Return};
- {ok, {Reason, _, _} = Error} ->
- ct:pal("error: ~p~n", [Error]),
+ {ok, {Reason, Headers, Body}} when ReturnBody ->
+ {error, {Reason, Headers, Body}};
+ {ok, {Reason, _Headers, _Body}} ->
{error, Reason}
end.