emqx/src/emqx_message.erl

352 lines
11 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_message).
-compile(inline).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("types.hrl").
%% Create
-export([ make/2
, make/3
, make/4
, make/6
, make/7
]).
%% Fields
-export([ id/1
, qos/1
, from/1
, topic/1
, payload/1
, timestamp/1
]).
%% Flags
-export([ is_sys/1
, clean_dup/1
, get_flag/2
, get_flag/3
, get_flags/1
, set_flag/2
, set_flag/3
, set_flags/2
, unset_flag/2
]).
%% Headers
-export([ get_headers/1
, get_header/2
, get_header/3
, set_header/3
, set_headers/2
, remove_header/2
]).
-export([ is_expired/1
, update_expiry/1
]).
-export([ to_packet/2
, to_map/1
, to_list/1
, from_map/1
]).
-export_type([message_map/0]).
-type(message_map() :: #{id := binary(),
qos := 0 | 1 | 2,
from := atom() | binary(),
flags := emqx_types:flags(),
headers := emqx_types:headers(),
topic := emqx_types:topic(),
payload := emqx_types:payload(),
timestamp := integer()}
).
-export([format/1]).
-elvis([{elvis_style, god_modules, disable}]).
-spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
make(Topic, Payload) ->
make(undefined, Topic, Payload).
-spec(make(emqx_types:clientid(),
emqx_topic:topic(),
emqx_types:payload()) -> emqx_types:message()).
make(From, Topic, Payload) ->
make(From, ?QOS_0, Topic, Payload).
-spec(make(emqx_types:clientid(),
emqx_types:qos(),
emqx_topic:topic(),
emqx_types:payload()) -> emqx_types:message()).
make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
Now = erlang:system_time(millisecond),
#message{id = emqx_guid:gen(),
qos = QoS,
from = From,
topic = Topic,
payload = Payload,
timestamp = Now
}.
-spec(make(emqx_types:clientid(),
emqx_types:qos(),
emqx_topic:topic(),
emqx_types:payload(),
emqx_types:flags(),
emqx_types:headers()) -> emqx_types:message()).
make(From, QoS, Topic, Payload, Flags, Headers)
when ?QOS_0 =< QoS, QoS =< ?QOS_2,
is_map(Flags), is_map(Headers) ->
Now = erlang:system_time(millisecond),
#message{id = emqx_guid:gen(),
qos = QoS,
from = From,
flags = Flags,
headers = Headers,
topic = Topic,
payload = Payload,
timestamp = Now
}.
-spec(make(MsgId :: binary(),
emqx_types:clientid(),
emqx_types:qos(),
emqx_topic:topic(),
emqx_types:payload(),
emqx_types:flags(),
emqx_types:headers()) -> emqx_types:message()).
make(MsgId, From, QoS, Topic, Payload, Flags, Headers)
when ?QOS_0 =< QoS, QoS =< ?QOS_2,
is_map(Flags), is_map(Headers) ->
Now = erlang:system_time(millisecond),
#message{id = MsgId,
qos = QoS,
from = From,
flags = Flags,
headers = Headers,
topic = Topic,
payload = Payload,
timestamp = Now
}.
-spec(id(emqx_types:message()) -> maybe(binary())).
id(#message{id = Id}) -> Id.
-spec(qos(emqx_types:message()) -> emqx_types:qos()).
qos(#message{qos = QoS}) -> QoS.
-spec(from(emqx_types:message()) -> atom() | binary()).
from(#message{from = From}) -> From.
-spec(topic(emqx_types:message()) -> emqx_types:topic()).
topic(#message{topic = Topic}) -> Topic.
-spec(payload(emqx_types:message()) -> emqx_types:payload()).
payload(#message{payload = Payload}) -> Payload.
-spec(timestamp(emqx_types:message()) -> integer()).
timestamp(#message{timestamp = TS}) -> TS.
-spec(is_sys(emqx_types:message()) -> boolean()).
is_sys(#message{flags = #{sys := true}}) ->
true;
is_sys(#message{topic = <<"$SYS/", _/binary>>}) ->
true;
is_sys(_Msg) -> false.
-spec(clean_dup(emqx_types:message()) -> emqx_types:message()).
clean_dup(Msg = #message{flags = Flags = #{dup := true}}) ->
Msg#message{flags = Flags#{dup => false}};
clean_dup(Msg) -> Msg.
-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
set_flags(New, Msg = #message{flags = Old}) when is_map(New) ->
Msg#message{flags = maps:merge(Old, New)}.
-spec(get_flag(emqx_types:flag(), emqx_types:message()) -> boolean()).
get_flag(Flag, Msg) ->
get_flag(Flag, Msg, false).
get_flag(Flag, #message{flags = Flags}, Default) ->
maps:get(Flag, Flags, Default).
-spec(get_flags(emqx_types:message()) -> maybe(map())).
get_flags(#message{flags = Flags}) -> Flags.
-spec(set_flag(emqx_types:flag(), emqx_types:message()) -> emqx_types:message()).
set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) ->
Msg#message{flags = maps:put(Flag, true, Flags)}.
-spec(set_flag(emqx_types:flag(), boolean() | integer(), emqx_types:message())
-> emqx_types:message()).
set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) ->
Msg#message{flags = maps:put(Flag, Val, Flags)}.
-spec(unset_flag(emqx_types:flag(), emqx_types:message()) -> emqx_types:message()).
unset_flag(Flag, Msg = #message{flags = Flags}) ->
case maps:is_key(Flag, Flags) of
true -> Msg#message{flags = maps:remove(Flag, Flags)};
false -> Msg
end.
-spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()).
set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
Msg#message{headers = maps:merge(Old, New)}.
-spec(get_headers(emqx_types:message()) -> maybe(map())).
get_headers(Msg) -> Msg#message.headers.
-spec(get_header(term(), emqx_types:message()) -> term()).
get_header(Hdr, Msg) ->
get_header(Hdr, Msg, undefined).
-spec(get_header(term(), emqx_types:message(), term()) -> term()).
get_header(Hdr, #message{headers = Headers}, Default) ->
maps:get(Hdr, Headers, Default).
-spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()).
set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
Msg#message{headers = maps:put(Hdr, Val, Headers)}.
-spec(remove_header(term(), emqx_types:message()) -> emqx_types:message()).
remove_header(Hdr, Msg = #message{headers = Headers}) ->
case maps:is_key(Hdr, Headers) of
true -> Msg#message{headers = maps:remove(Hdr, Headers)};
false -> Msg
end.
-spec(is_expired(emqx_types:message()) -> boolean()).
is_expired(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
timestamp = CreatedAt}) ->
elapsed(CreatedAt) > timer:seconds(Interval);
is_expired(_Msg) -> false.
-spec(update_expiry(emqx_types:message()) -> emqx_types:message()).
update_expiry(Msg = #message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
timestamp = CreatedAt}) ->
Props = maps:get(properties, Msg#message.headers),
case elapsed(CreatedAt) of
Elapsed when Elapsed > 0 ->
Interval1 = max(1, Interval - (Elapsed div 1000)),
set_header(properties, Props#{'Message-Expiry-Interval' => Interval1}, Msg);
_ -> Msg
end;
update_expiry(Msg) -> Msg.
%% @doc Message to PUBLISH Packet.
-spec(to_packet(emqx_types:packet_id(), emqx_types:message())
-> emqx_types:packet()).
to_packet(PacketId, Msg = #message{qos = QoS, headers = Headers,
topic = Topic, payload = Payload}) ->
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = get_flag(dup, Msg),
qos = QoS,
retain = get_flag(retain, Msg)
},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId,
properties = filter_pub_props(
maps:get(properties, Headers, #{}))
},
payload = Payload
}.
filter_pub_props(Props) ->
maps:with(['Payload-Format-Indicator',
'Message-Expiry-Interval',
'Response-Topic',
'Correlation-Data',
'User-Property',
'Subscription-Identifier',
'Content-Type'
], Props).
%% @doc Message to map
-spec(to_map(emqx_types:message()) -> message_map()).
to_map(#message{
id = Id,
qos = QoS,
from = From,
flags = Flags,
headers = Headers,
topic = Topic,
payload = Payload,
timestamp = Timestamp
}) ->
#{id => Id,
qos => QoS,
from => From,
flags => Flags,
headers => Headers,
topic => Topic,
payload => Payload,
timestamp => Timestamp
}.
%% @doc Message to tuple list
-spec(to_list(emqx_types:message()) -> list()).
to_list(Msg) ->
lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
%% @doc Map to message
-spec(from_map(message_map()) -> emqx_types:message()).
from_map(#{id := Id,
qos := QoS,
from := From,
flags := Flags,
headers := Headers,
topic := Topic,
payload := Payload,
timestamp := Timestamp
}) ->
#message{
id = Id,
qos = QoS,
from = From,
flags = Flags,
headers = Headers,
topic = Topic,
payload = Payload,
timestamp = Timestamp
}.
%% MilliSeconds
elapsed(Since) ->
max(0, erlang:system_time(millisecond) - Since).
format(#message{id = Id,
qos = QoS,
topic = Topic,
from = From,
flags = Flags,
headers = Headers}) ->
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
[Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
format(flags, Flags) ->
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
format(headers, Headers) ->
io_lib:format("~p", [Headers]).