Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-04-03 04:24:31 +08:00
commit 21e31ab1c8
2 changed files with 75 additions and 29 deletions

View File

@ -16,12 +16,24 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("types.hrl").
%% Create
-export([ make/2
, make/3
, make/4
]).
%% Fields
-export([ id/1
, qos/1
, from/1
, topic/1
, payload/1
, timestamp/1
]).
%% Flags
-export([ get_flag/2
, get_flag/3
, set_flag/2
@ -30,6 +42,7 @@
, unset_flag/2
]).
%% Headers
-export([ get_headers/1
, get_header/2
, get_header/3
@ -56,14 +69,17 @@
make(Topic, Payload) ->
make(undefined, Topic, Payload).
-spec(make(atom() | emqx_types:client_id(), emqx_topic:topic(), emqx_types:payload())
-> emqx_types:message()).
-spec(make(atom() | emqx_types:client_id(),
emqx_topic:topic(),
emqx_types:payload()) -> emqx_types:message()).
make(From, Topic, Payload) ->
make(From, ?QOS_0, Topic, Payload).
-spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(),
emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
make(From, QoS, Topic, Payload) ->
-spec(make(atom() | emqx_types:client_id(),
emqx_mqtt_types:qos(),
emqx_topic:topic(),
emqx_types:payload()) -> emqx_types:message()).
make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
#message{id = emqx_guid:gen(),
qos = QoS,
from = From,
@ -72,6 +88,24 @@ make(From, QoS, Topic, Payload) ->
payload = Payload,
timestamp = os:timestamp()}.
-spec(id(emqx_types:message()) -> maybe(binary())).
id(#message{id = Id}) -> Id.
-spec(qos(emqx_types:message()) -> emqx_mqtt_types:qos()).
qos(#message{qos = QoS}) -> QoS.
-spec(from(emqx_types:message()) -> atom() | binary()).
from(#message{from = From}) -> From.
-spec(topic(emqx_types:message()) -> emqx_types:topic()).
topic(#message{topic = Topic}) -> Topic.
-spec(payload(emqx_types:message()) -> emqx_types:payload()).
payload(#message{payload = Payload}) -> Payload.
-spec(timestamp(emqx_types:message()) -> erlang:timestamp()).
timestamp(#message{timestamp = TS}) -> TS.
-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) ->
Msg#message{flags = Flags};
@ -202,3 +236,4 @@ bin_key_map(Data) ->
bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom));
bin(Str) when is_list(Str) -> list_to_binary(Str).

View File

@ -14,34 +14,38 @@
-module(emqx_message_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() ->
[ message_make
, message_flag
, message_header
, message_format
, message_expired
, message_to_map
].
-export([ t_make/1
, t_flag/1
, t_header/1
, t_format/1
, t_expired/1
, t_to_map/1
]).
message_make(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"payload">>),
?assertEqual(0, Msg#message.qos),
-export([ all/0
, suite/0
]).
t_make(_) ->
Msg = emqx_message:make(<<"topic">>, <<"payload">>),
?assertEqual(0, emqx_message:qos(Msg)),
?assertEqual(undefined, emqx_message:from(Msg)),
?assertEqual(<<"payload">>, emqx_message:payload(Msg)),
Msg1 = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
?assertEqual(0, Msg1#message.qos),
?assertEqual(0, emqx_message:qos(Msg1)),
?assertEqual(<<"topic">>, emqx_message:topic(Msg1)),
Msg2 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>),
?assert(is_binary(Msg2#message.id)),
?assertEqual(2, Msg2#message.qos).
?assert(is_binary(emqx_message:id(Msg2))),
?assertEqual(2, emqx_message:qos(Msg2)),
?assertEqual(<<"clientid">>, emqx_message:from(Msg2)),
?assertEqual(<<"topic">>, emqx_message:topic(Msg2)),
?assertEqual(<<"payload">>, emqx_message:payload(Msg2)).
message_flag(_) ->
t_flag(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
Msg2 = emqx_message:set_flag(retain, false, Msg),
Msg3 = emqx_message:set_flag(dup, Msg2),
@ -55,7 +59,7 @@ message_flag(_) ->
?assert(emqx_message:get_flag(dup, Msg6)),
?assert(emqx_message:get_flag(retain, Msg6)).
message_header(_) ->
t_header(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
Msg2 = emqx_message:set_header(c, 3, Msg1),
@ -64,10 +68,10 @@ message_header(_) ->
Msg3 = emqx_message:remove_header(a, Msg2),
?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg3)).
message_format(_) ->
t_format(_) ->
io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]).
message_expired(_) ->
t_expired(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
timer:sleep(500),
@ -78,7 +82,7 @@ message_expired(_) ->
Msg2 = emqx_message:update_expiry(Msg1),
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
message_to_map(_) ->
t_to_map(_) ->
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>),
List = [{id, Msg#message.id},
{qos, ?QOS_1},
@ -91,3 +95,10 @@ message_to_map(_) ->
?assertEqual(List, emqx_message:to_list(Msg)),
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
all() ->
IsTestCase = fun("t_" ++ _) -> true; (_) -> false end,
[F || {F, _A} <- module_info(exports), IsTestCase(atom_to_list(F))].
suite() ->
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].