352 lines
11 KiB
Erlang
352 lines
11 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_message).
|
|
|
|
-compile(inline).
|
|
|
|
-include("emqx.hrl").
|
|
-include("emqx_mqtt.hrl").
|
|
-include("types.hrl").
|
|
|
|
%% Create
|
|
-export([ make/2
|
|
, make/3
|
|
, make/4
|
|
, make/6
|
|
, make/7
|
|
]).
|
|
|
|
%% Fields
|
|
-export([ id/1
|
|
, qos/1
|
|
, from/1
|
|
, topic/1
|
|
, payload/1
|
|
, timestamp/1
|
|
]).
|
|
|
|
%% Flags
|
|
-export([ is_sys/1
|
|
, clean_dup/1
|
|
, get_flag/2
|
|
, get_flag/3
|
|
, get_flags/1
|
|
, set_flag/2
|
|
, set_flag/3
|
|
, set_flags/2
|
|
, unset_flag/2
|
|
]).
|
|
|
|
%% Headers
|
|
-export([ get_headers/1
|
|
, get_header/2
|
|
, get_header/3
|
|
, set_header/3
|
|
, set_headers/2
|
|
, remove_header/2
|
|
]).
|
|
|
|
-export([ is_expired/1
|
|
, update_expiry/1
|
|
]).
|
|
|
|
-export([ to_packet/2
|
|
, to_map/1
|
|
, to_list/1
|
|
, from_map/1
|
|
]).
|
|
|
|
-export_type([message_map/0]).
|
|
|
|
-type(message_map() :: #{id := binary(),
|
|
qos := 0 | 1 | 2,
|
|
from := atom() | binary(),
|
|
flags := emqx_types:flags(),
|
|
headers := emqx_types:headers(),
|
|
topic := emqx_types:topic(),
|
|
payload := emqx_types:payload(),
|
|
timestamp := integer()}
|
|
).
|
|
|
|
-export([format/1]).
|
|
|
|
-elvis([{elvis_style, god_modules, disable}]).
|
|
|
|
-spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
|
|
make(Topic, Payload) ->
|
|
make(undefined, Topic, Payload).
|
|
|
|
-spec(make(emqx_types:clientid(),
|
|
emqx_topic:topic(),
|
|
emqx_types:payload()) -> emqx_types:message()).
|
|
make(From, Topic, Payload) ->
|
|
make(From, ?QOS_0, Topic, Payload).
|
|
|
|
-spec(make(emqx_types:clientid(),
|
|
emqx_types:qos(),
|
|
emqx_topic:topic(),
|
|
emqx_types:payload()) -> emqx_types:message()).
|
|
make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
|
|
Now = erlang:system_time(millisecond),
|
|
#message{id = emqx_guid:gen(),
|
|
qos = QoS,
|
|
from = From,
|
|
topic = Topic,
|
|
payload = Payload,
|
|
timestamp = Now
|
|
}.
|
|
|
|
-spec(make(emqx_types:clientid(),
|
|
emqx_types:qos(),
|
|
emqx_topic:topic(),
|
|
emqx_types:payload(),
|
|
emqx_types:flags(),
|
|
emqx_types:headers()) -> emqx_types:message()).
|
|
make(From, QoS, Topic, Payload, Flags, Headers)
|
|
when ?QOS_0 =< QoS, QoS =< ?QOS_2,
|
|
is_map(Flags), is_map(Headers) ->
|
|
Now = erlang:system_time(millisecond),
|
|
#message{id = emqx_guid:gen(),
|
|
qos = QoS,
|
|
from = From,
|
|
flags = Flags,
|
|
headers = Headers,
|
|
topic = Topic,
|
|
payload = Payload,
|
|
timestamp = Now
|
|
}.
|
|
|
|
-spec(make(MsgId :: binary(),
|
|
emqx_types:clientid(),
|
|
emqx_types:qos(),
|
|
emqx_topic:topic(),
|
|
emqx_types:payload(),
|
|
emqx_types:flags(),
|
|
emqx_types:headers()) -> emqx_types:message()).
|
|
make(MsgId, From, QoS, Topic, Payload, Flags, Headers)
|
|
when ?QOS_0 =< QoS, QoS =< ?QOS_2,
|
|
is_map(Flags), is_map(Headers) ->
|
|
Now = erlang:system_time(millisecond),
|
|
#message{id = MsgId,
|
|
qos = QoS,
|
|
from = From,
|
|
flags = Flags,
|
|
headers = Headers,
|
|
topic = Topic,
|
|
payload = Payload,
|
|
timestamp = Now
|
|
}.
|
|
|
|
-spec(id(emqx_types:message()) -> maybe(binary())).
|
|
id(#message{id = Id}) -> Id.
|
|
|
|
-spec(qos(emqx_types:message()) -> emqx_types:qos()).
|
|
qos(#message{qos = QoS}) -> QoS.
|
|
|
|
-spec(from(emqx_types:message()) -> atom() | binary()).
|
|
from(#message{from = From}) -> From.
|
|
|
|
-spec(topic(emqx_types:message()) -> emqx_types:topic()).
|
|
topic(#message{topic = Topic}) -> Topic.
|
|
|
|
-spec(payload(emqx_types:message()) -> emqx_types:payload()).
|
|
payload(#message{payload = Payload}) -> Payload.
|
|
|
|
-spec(timestamp(emqx_types:message()) -> integer()).
|
|
timestamp(#message{timestamp = TS}) -> TS.
|
|
|
|
-spec(is_sys(emqx_types:message()) -> boolean()).
|
|
is_sys(#message{flags = #{sys := true}}) ->
|
|
true;
|
|
is_sys(#message{topic = <<"$SYS/", _/binary>>}) ->
|
|
true;
|
|
is_sys(_Msg) -> false.
|
|
|
|
-spec(clean_dup(emqx_types:message()) -> emqx_types:message()).
|
|
clean_dup(Msg = #message{flags = Flags = #{dup := true}}) ->
|
|
Msg#message{flags = Flags#{dup => false}};
|
|
clean_dup(Msg) -> Msg.
|
|
|
|
-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
|
|
set_flags(New, Msg = #message{flags = Old}) when is_map(New) ->
|
|
Msg#message{flags = maps:merge(Old, New)}.
|
|
|
|
-spec(get_flag(emqx_types:flag(), emqx_types:message()) -> boolean()).
|
|
get_flag(Flag, Msg) ->
|
|
get_flag(Flag, Msg, false).
|
|
|
|
get_flag(Flag, #message{flags = Flags}, Default) ->
|
|
maps:get(Flag, Flags, Default).
|
|
|
|
-spec(get_flags(emqx_types:message()) -> maybe(map())).
|
|
get_flags(#message{flags = Flags}) -> Flags.
|
|
|
|
-spec(set_flag(emqx_types:flag(), emqx_types:message()) -> emqx_types:message()).
|
|
set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) ->
|
|
Msg#message{flags = maps:put(Flag, true, Flags)}.
|
|
|
|
-spec(set_flag(emqx_types:flag(), boolean() | integer(), emqx_types:message())
|
|
-> emqx_types:message()).
|
|
set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) ->
|
|
Msg#message{flags = maps:put(Flag, Val, Flags)}.
|
|
|
|
-spec(unset_flag(emqx_types:flag(), emqx_types:message()) -> emqx_types:message()).
|
|
unset_flag(Flag, Msg = #message{flags = Flags}) ->
|
|
case maps:is_key(Flag, Flags) of
|
|
true -> Msg#message{flags = maps:remove(Flag, Flags)};
|
|
false -> Msg
|
|
end.
|
|
|
|
-spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()).
|
|
set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
|
|
Msg#message{headers = maps:merge(Old, New)}.
|
|
|
|
-spec(get_headers(emqx_types:message()) -> maybe(map())).
|
|
get_headers(Msg) -> Msg#message.headers.
|
|
|
|
-spec(get_header(term(), emqx_types:message()) -> term()).
|
|
get_header(Hdr, Msg) ->
|
|
get_header(Hdr, Msg, undefined).
|
|
-spec(get_header(term(), emqx_types:message(), term()) -> term()).
|
|
get_header(Hdr, #message{headers = Headers}, Default) ->
|
|
maps:get(Hdr, Headers, Default).
|
|
|
|
-spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()).
|
|
set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
|
|
Msg#message{headers = maps:put(Hdr, Val, Headers)}.
|
|
|
|
-spec(remove_header(term(), emqx_types:message()) -> emqx_types:message()).
|
|
remove_header(Hdr, Msg = #message{headers = Headers}) ->
|
|
case maps:is_key(Hdr, Headers) of
|
|
true -> Msg#message{headers = maps:remove(Hdr, Headers)};
|
|
false -> Msg
|
|
end.
|
|
|
|
-spec(is_expired(emqx_types:message()) -> boolean()).
|
|
is_expired(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
|
|
timestamp = CreatedAt}) ->
|
|
elapsed(CreatedAt) > timer:seconds(Interval);
|
|
is_expired(_Msg) -> false.
|
|
|
|
-spec(update_expiry(emqx_types:message()) -> emqx_types:message()).
|
|
update_expiry(Msg = #message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
|
|
timestamp = CreatedAt}) ->
|
|
Props = maps:get(properties, Msg#message.headers),
|
|
case elapsed(CreatedAt) of
|
|
Elapsed when Elapsed > 0 ->
|
|
Interval1 = max(1, Interval - (Elapsed div 1000)),
|
|
set_header(properties, Props#{'Message-Expiry-Interval' => Interval1}, Msg);
|
|
_ -> Msg
|
|
end;
|
|
update_expiry(Msg) -> Msg.
|
|
|
|
%% @doc Message to PUBLISH Packet.
|
|
-spec(to_packet(emqx_types:packet_id(), emqx_types:message())
|
|
-> emqx_types:packet()).
|
|
to_packet(PacketId, Msg = #message{qos = QoS, headers = Headers,
|
|
topic = Topic, payload = Payload}) ->
|
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
|
dup = get_flag(dup, Msg),
|
|
qos = QoS,
|
|
retain = get_flag(retain, Msg)
|
|
},
|
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
|
packet_id = PacketId,
|
|
properties = filter_pub_props(
|
|
maps:get(properties, Headers, #{}))
|
|
},
|
|
payload = Payload
|
|
}.
|
|
|
|
filter_pub_props(Props) ->
|
|
maps:with(['Payload-Format-Indicator',
|
|
'Message-Expiry-Interval',
|
|
'Response-Topic',
|
|
'Correlation-Data',
|
|
'User-Property',
|
|
'Subscription-Identifier',
|
|
'Content-Type'
|
|
], Props).
|
|
|
|
%% @doc Message to map
|
|
-spec(to_map(emqx_types:message()) -> message_map()).
|
|
to_map(#message{
|
|
id = Id,
|
|
qos = QoS,
|
|
from = From,
|
|
flags = Flags,
|
|
headers = Headers,
|
|
topic = Topic,
|
|
payload = Payload,
|
|
timestamp = Timestamp
|
|
}) ->
|
|
#{id => Id,
|
|
qos => QoS,
|
|
from => From,
|
|
flags => Flags,
|
|
headers => Headers,
|
|
topic => Topic,
|
|
payload => Payload,
|
|
timestamp => Timestamp
|
|
}.
|
|
|
|
%% @doc Message to tuple list
|
|
-spec(to_list(emqx_types:message()) -> list()).
|
|
to_list(Msg) ->
|
|
lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
|
|
|
|
%% @doc Map to message
|
|
-spec(from_map(message_map()) -> emqx_types:message()).
|
|
from_map(#{id := Id,
|
|
qos := QoS,
|
|
from := From,
|
|
flags := Flags,
|
|
headers := Headers,
|
|
topic := Topic,
|
|
payload := Payload,
|
|
timestamp := Timestamp
|
|
}) ->
|
|
#message{
|
|
id = Id,
|
|
qos = QoS,
|
|
from = From,
|
|
flags = Flags,
|
|
headers = Headers,
|
|
topic = Topic,
|
|
payload = Payload,
|
|
timestamp = Timestamp
|
|
}.
|
|
|
|
%% MilliSeconds
|
|
elapsed(Since) ->
|
|
max(0, erlang:system_time(millisecond) - Since).
|
|
|
|
format(#message{id = Id,
|
|
qos = QoS,
|
|
topic = Topic,
|
|
from = From,
|
|
flags = Flags,
|
|
headers = Headers}) ->
|
|
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
|
|
[Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
|
|
|
|
format(flags, Flags) ->
|
|
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
|
|
format(headers, Headers) ->
|
|
io_lib:format("~p", [Headers]).
|
|
|