431 lines
14 KiB
Erlang
431 lines
14 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_mgmt_api_publish).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
-define(ALL_IS_WELL, 200).
|
|
-define(PARTIALLY_OK, 202).
|
|
-define(BAD_REQUEST, 400).
|
|
-define(DISPATCH_ERROR, 503).
|
|
|
|
-behaviour(minirest_api).
|
|
|
|
-export([
|
|
api_spec/0,
|
|
paths/0,
|
|
schema/1,
|
|
fields/1,
|
|
namespace/0
|
|
]).
|
|
|
|
-export([
|
|
publish/2,
|
|
publish_batch/2
|
|
]).
|
|
|
|
namespace() -> undefined.
|
|
|
|
api_spec() ->
|
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
|
|
|
|
paths() ->
|
|
["/publish", "/publish/bulk"].
|
|
|
|
schema("/publish") ->
|
|
#{
|
|
'operationId' => publish,
|
|
post => #{
|
|
summary => <<"Publish a message">>,
|
|
description => ?DESC(publish_api),
|
|
tags => [<<"Publish">>],
|
|
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, publish_message)),
|
|
responses => #{
|
|
?ALL_IS_WELL => hoconsc:mk(hoconsc:ref(?MODULE, publish_ok)),
|
|
?PARTIALLY_OK => hoconsc:mk(hoconsc:ref(?MODULE, publish_error)),
|
|
?BAD_REQUEST => hoconsc:mk(hoconsc:ref(?MODULE, bad_request)),
|
|
?DISPATCH_ERROR => hoconsc:mk(hoconsc:ref(?MODULE, publish_error))
|
|
}
|
|
}
|
|
};
|
|
schema("/publish/bulk") ->
|
|
#{
|
|
'operationId' => publish_batch,
|
|
post => #{
|
|
summary => <<"Publish a batch of messages">>,
|
|
description => ?DESC(publish_bulk_api),
|
|
tags => [<<"Publish">>],
|
|
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message)), #{}),
|
|
responses => #{
|
|
?ALL_IS_WELL => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_ok)), #{}),
|
|
?PARTIALLY_OK => hoconsc:mk(
|
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
|
|
),
|
|
?BAD_REQUEST => bad_request_schema(),
|
|
?DISPATCH_ERROR => hoconsc:mk(
|
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
|
|
)
|
|
}
|
|
}
|
|
}.
|
|
|
|
bad_request_schema() ->
|
|
Union = hoconsc:union([
|
|
hoconsc:ref(?MODULE, bad_request),
|
|
hoconsc:array(hoconsc:ref(?MODULE, publish_error))
|
|
]),
|
|
hoconsc:mk(Union, #{}).
|
|
|
|
fields(message) ->
|
|
[
|
|
{topic,
|
|
hoconsc:mk(binary(), #{
|
|
desc => ?DESC(topic_name),
|
|
required => true,
|
|
example => <<"api/example/topic">>
|
|
})},
|
|
{qos,
|
|
hoconsc:mk(emqx_schema:qos(), #{
|
|
desc => ?DESC(qos),
|
|
required => false,
|
|
default => 0
|
|
})},
|
|
{clientid,
|
|
hoconsc:mk(binary(), #{
|
|
deprecated => {since, "v5.0.14"}
|
|
})},
|
|
{payload,
|
|
hoconsc:mk(binary(), #{
|
|
desc => ?DESC(payload),
|
|
required => true,
|
|
example => <<"hello emqx api">>
|
|
})},
|
|
{properties,
|
|
hoconsc:mk(hoconsc:ref(?MODULE, message_properties), #{
|
|
desc => ?DESC(message_properties),
|
|
required => false
|
|
})},
|
|
{retain,
|
|
hoconsc:mk(boolean(), #{
|
|
desc => ?DESC(retain),
|
|
required => false,
|
|
default => false
|
|
})}
|
|
];
|
|
fields(publish_message) ->
|
|
[
|
|
{payload_encoding,
|
|
hoconsc:mk(hoconsc:enum([plain, base64]), #{
|
|
desc => ?DESC(payload_encoding),
|
|
required => false,
|
|
default => plain
|
|
})}
|
|
] ++ fields(message);
|
|
fields(message_properties) ->
|
|
[
|
|
{'payload_format_indicator',
|
|
hoconsc:mk(typerefl:range(0, 1), #{
|
|
desc => ?DESC(msg_payload_format_indicator),
|
|
required => false,
|
|
example => 0
|
|
})},
|
|
{'message_expiry_interval',
|
|
hoconsc:mk(integer(), #{
|
|
desc => ?DESC(msg_message_expiry_interval),
|
|
required => false
|
|
})},
|
|
{'response_topic',
|
|
hoconsc:mk(binary(), #{
|
|
desc => ?DESC(msg_response_topic),
|
|
required => false,
|
|
example => <<"some_other_topic">>
|
|
})},
|
|
{'correlation_data',
|
|
hoconsc:mk(binary(), #{
|
|
desc => ?DESC(msg_correlation_data),
|
|
required => false
|
|
})},
|
|
{'user_properties',
|
|
hoconsc:mk(map(), #{
|
|
desc => ?DESC(msg_user_properties),
|
|
required => false,
|
|
example => #{<<"foo">> => <<"bar">>}
|
|
})},
|
|
{'content_type',
|
|
hoconsc:mk(binary(), #{
|
|
desc => ?DESC(msg_content_type),
|
|
required => false,
|
|
example => <<"text/plain">>
|
|
})}
|
|
];
|
|
fields(publish_ok) ->
|
|
[
|
|
{id,
|
|
hoconsc:mk(binary(), #{
|
|
desc => ?DESC(message_id)
|
|
})}
|
|
];
|
|
fields(publish_error) ->
|
|
[
|
|
{reason_code,
|
|
hoconsc:mk(integer(), #{
|
|
desc => ?DESC(reason_code),
|
|
example => 16
|
|
})},
|
|
{message,
|
|
hoconsc:mk(binary(), #{
|
|
desc => ?DESC(error_message),
|
|
example => <<"no_matching_subscribers">>
|
|
})}
|
|
];
|
|
fields(bad_request) ->
|
|
[
|
|
{code,
|
|
hoconsc:mk(string(), #{
|
|
desc => <<"BAD_REQUEST">>,
|
|
example => ?RC_TOPIC_NAME_INVALID
|
|
})},
|
|
{message,
|
|
hoconsc:mk(binary(), #{
|
|
desc => ?DESC(error_message),
|
|
example => to_binary(emqx_reason_codes:name(?RC_TOPIC_NAME_INVALID))
|
|
})}
|
|
].
|
|
|
|
publish(post, #{body := Body}) ->
|
|
case message(Body) of
|
|
{ok, Message} ->
|
|
Res = emqx_mgmt:publish(Message),
|
|
publish_result_to_http_reply(Message, Res);
|
|
{error, Reason} ->
|
|
{?BAD_REQUEST, make_bad_req_reply(Reason)}
|
|
end.
|
|
|
|
publish_batch(post, #{body := Body}) ->
|
|
case messages(Body) of
|
|
{ok, Messages} ->
|
|
ResList = lists:map(
|
|
fun(Message) ->
|
|
Res = emqx_mgmt:publish(Message),
|
|
publish_result_to_http_reply(Message, Res)
|
|
end,
|
|
Messages
|
|
),
|
|
publish_results_to_http_reply(ResList);
|
|
{error, Reason} ->
|
|
{?BAD_REQUEST, make_bad_req_reply(Reason)}
|
|
end.
|
|
|
|
make_bad_req_reply(invalid_topic_name) ->
|
|
make_publish_error_response(?RC_TOPIC_NAME_INVALID);
|
|
make_bad_req_reply(packet_too_large) ->
|
|
%% 0x95 RC_PACKET_TOO_LARGE is not a PUBACK reason code
|
|
%% This is why we use RC_QUOTA_EXCEEDED instead
|
|
make_publish_error_response(?RC_QUOTA_EXCEEDED, packet_too_large);
|
|
make_bad_req_reply(Reason) ->
|
|
make_publish_error_response(?RC_IMPLEMENTATION_SPECIFIC_ERROR, to_binary(Reason)).
|
|
|
|
-spec is_ok_deliver({_NodeOrShare, _MatchedTopic, emqx_types:deliver_result()}) -> boolean().
|
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, ok}) -> true;
|
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, {ok, _}}) -> true;
|
|
is_ok_deliver({_NodeOrShare, _MatchedTopic, {error, _}}) -> false.
|
|
|
|
%% @hidden Map MQTT publish result reason code to HTTP status code.
|
|
%% MQTT reason code | Description | HTTP status code
|
|
%% 0 Success 200
|
|
%% 16 No matching subscribers 202
|
|
%% 128 Unspecified error 406
|
|
%% 131 Implementation specific error 406
|
|
%% 144 Topic Name invalid 400
|
|
%% 151 Quota exceeded 400
|
|
%%
|
|
%% %%%%%% Below error codes are not implemented so far %%%%
|
|
%%
|
|
%% If HTTP request passes HTTP authentication, it is considered trusted.
|
|
%% 135 Not authorized 401
|
|
%%
|
|
%% %%%%%% Below error codes are not applicable %%%%%%%
|
|
%%
|
|
%% No user specified packet ID, so there should be no packet ID error
|
|
%% 145 Packet identifier is in use 400
|
|
%%
|
|
%% No preceding payload format indicator to compare against.
|
|
%% Content-Type check should be done at HTTP layer but not here.
|
|
%% 153 Payload format invalid 400
|
|
publish_result_to_http_reply(_Message, []) ->
|
|
%% matched no subscriber
|
|
{?PARTIALLY_OK, make_publish_error_response(?RC_NO_MATCHING_SUBSCRIBERS)};
|
|
publish_result_to_http_reply(Message, PublishResult) ->
|
|
case lists:any(fun is_ok_deliver/1, PublishResult) of
|
|
true ->
|
|
%% delivered to at least one subscriber
|
|
OkBody = make_publish_response(Message),
|
|
{?ALL_IS_WELL, OkBody};
|
|
false ->
|
|
%% this is quite unusual, matched, but failed to deliver
|
|
%% if this happens, the publish result log can be helpful
|
|
%% to idnetify the reason why publish failed
|
|
%% e.g. during emqx restart
|
|
ReasonString = <<"failed_to_dispatch">>,
|
|
ErrorBody = make_publish_error_response(
|
|
?RC_IMPLEMENTATION_SPECIFIC_ERROR, ReasonString
|
|
),
|
|
?SLOG(warning, #{
|
|
msg => ReasonString,
|
|
message_id => emqx_message:id(Message),
|
|
results => PublishResult
|
|
}),
|
|
{?DISPATCH_ERROR, ErrorBody}
|
|
end.
|
|
|
|
%% @hidden Reply batch publish result.
|
|
%% 200 if all published OK.
|
|
%% 202 if at least one message matched no subscribers.
|
|
%% 503 for temp errors duing EMQX restart
|
|
publish_results_to_http_reply([_ | _] = ResList) ->
|
|
{Codes0, BodyL} = lists:unzip(ResList),
|
|
Codes = lists:usort(Codes0),
|
|
HasFailure = lists:member(?DISPATCH_ERROR, Codes),
|
|
All200 = (Codes =:= [?ALL_IS_WELL]),
|
|
Code =
|
|
case All200 of
|
|
true ->
|
|
%% All OK
|
|
?ALL_IS_WELL;
|
|
false when not HasFailure ->
|
|
%% Partially OK
|
|
?PARTIALLY_OK;
|
|
false ->
|
|
%% At least one failed
|
|
?DISPATCH_ERROR
|
|
end,
|
|
{Code, BodyL}.
|
|
|
|
message(Map) ->
|
|
try
|
|
make_message(Map)
|
|
catch
|
|
throw:Reason ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
make_message(Map) ->
|
|
Encoding = maps:get(<<"payload_encoding">>, Map, plain),
|
|
case decode_payload(Encoding, maps:get(<<"payload">>, Map)) of
|
|
{ok, Payload} ->
|
|
QoS = maps:get(<<"qos">>, Map, 0),
|
|
Topic = maps:get(<<"topic">>, Map),
|
|
Retain = maps:get(<<"retain">>, Map, false),
|
|
Headers =
|
|
case maps:get(<<"properties">>, Map, #{}) of
|
|
Properties when
|
|
is_map(Properties) andalso
|
|
map_size(Properties) > 0
|
|
->
|
|
#{properties => to_msg_properties(Properties)};
|
|
_ ->
|
|
#{}
|
|
end,
|
|
try
|
|
_ = emqx_topic:validate(name, Topic)
|
|
catch
|
|
error:_Reason ->
|
|
throw(invalid_topic_name)
|
|
end,
|
|
Message = emqx_message:make(
|
|
http_api, QoS, Topic, Payload, #{retain => Retain}, Headers
|
|
),
|
|
Size = emqx_message:estimate_size(Message),
|
|
(Size > size_limit()) andalso throw(packet_too_large),
|
|
{ok, Message};
|
|
{error, R} ->
|
|
{error, R}
|
|
end.
|
|
|
|
to_msg_properties(Properties) ->
|
|
maps:fold(
|
|
fun to_property/3,
|
|
#{},
|
|
Properties
|
|
).
|
|
|
|
to_property(<<"payload_format_indicator">>, V, M) -> M#{'Payload-Format-Indicator' => V};
|
|
to_property(<<"message_expiry_interval">>, V, M) -> M#{'Message-Expiry-Interval' => V};
|
|
to_property(<<"response_topic">>, V, M) -> M#{'Response-Topic' => V};
|
|
to_property(<<"correlation_data">>, V, M) -> M#{'Correlation-Data' => V};
|
|
to_property(<<"user_properties">>, V, M) -> M#{'User-Property' => maps:to_list(V)};
|
|
to_property(<<"content_type">>, V, M) -> M#{'Content-Type' => V}.
|
|
|
|
%% get the global packet size limit since HTTP API does not belong to any zone.
|
|
size_limit() ->
|
|
try
|
|
emqx_config:get([mqtt, max_packet_size])
|
|
catch
|
|
_:_ ->
|
|
%% leave 1000 bytes for topic name etc.
|
|
?MAX_PACKET_SIZE
|
|
end.
|
|
|
|
decode_payload(plain, Payload) ->
|
|
{ok, Payload};
|
|
decode_payload(base64, Payload) ->
|
|
try
|
|
{ok, base64:decode(Payload)}
|
|
catch
|
|
_:_ ->
|
|
{error, {decode_base64_payload_failed, Payload}}
|
|
end.
|
|
|
|
messages([]) ->
|
|
{errror, <<"empty_batch">>};
|
|
messages(List) ->
|
|
messages(List, []).
|
|
|
|
messages([], Res) ->
|
|
{ok, lists:reverse(Res)};
|
|
messages([MessageMap | List], Res) ->
|
|
case message(MessageMap) of
|
|
{ok, Message} ->
|
|
messages(List, [Message | Res]);
|
|
{error, R} ->
|
|
{error, R}
|
|
end.
|
|
|
|
make_publish_response(#message{id = ID}) ->
|
|
#{
|
|
id => emqx_guid:to_hexstr(ID)
|
|
}.
|
|
|
|
make_publish_error_response(ReasonCode) ->
|
|
make_publish_error_response(ReasonCode, emqx_reason_codes:name(ReasonCode)).
|
|
|
|
make_publish_error_response(ReasonCode, Msg) ->
|
|
#{
|
|
reason_code => ReasonCode,
|
|
message => to_binary(Msg)
|
|
}.
|
|
|
|
to_binary(Atom) when is_atom(Atom) ->
|
|
atom_to_binary(Atom);
|
|
to_binary(Msg) when is_binary(Msg) ->
|
|
Msg;
|
|
to_binary(Term) ->
|
|
list_to_binary(io_lib:format("~0p", [Term])).
|