commit
e9474bffcd
|
@ -16,12 +16,24 @@
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
-include("types.hrl").
|
||||||
|
|
||||||
|
%% Create
|
||||||
-export([ make/2
|
-export([ make/2
|
||||||
, make/3
|
, make/3
|
||||||
, make/4
|
, make/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% Fields
|
||||||
|
-export([ id/1
|
||||||
|
, qos/1
|
||||||
|
, from/1
|
||||||
|
, topic/1
|
||||||
|
, payload/1
|
||||||
|
, timestamp/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Flags
|
||||||
-export([ get_flag/2
|
-export([ get_flag/2
|
||||||
, get_flag/3
|
, get_flag/3
|
||||||
, set_flag/2
|
, set_flag/2
|
||||||
|
@ -30,6 +42,7 @@
|
||||||
, unset_flag/2
|
, unset_flag/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% Headers
|
||||||
-export([ get_headers/1
|
-export([ get_headers/1
|
||||||
, get_header/2
|
, get_header/2
|
||||||
, get_header/3
|
, get_header/3
|
||||||
|
@ -44,8 +57,6 @@
|
||||||
|
|
||||||
-export([ to_map/1
|
-export([ to_map/1
|
||||||
, to_list/1
|
, to_list/1
|
||||||
, to_bin_key_map/1
|
|
||||||
, to_bin_key_list/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([format/1]).
|
-export([format/1]).
|
||||||
|
@ -56,14 +67,17 @@
|
||||||
make(Topic, Payload) ->
|
make(Topic, Payload) ->
|
||||||
make(undefined, Topic, Payload).
|
make(undefined, Topic, Payload).
|
||||||
|
|
||||||
-spec(make(atom() | emqx_types:client_id(), emqx_topic:topic(), emqx_types:payload())
|
-spec(make(atom() | emqx_types:client_id(),
|
||||||
-> emqx_types:message()).
|
emqx_topic:topic(),
|
||||||
|
emqx_types:payload()) -> emqx_types:message()).
|
||||||
make(From, Topic, Payload) ->
|
make(From, Topic, Payload) ->
|
||||||
make(From, ?QOS_0, Topic, Payload).
|
make(From, ?QOS_0, Topic, Payload).
|
||||||
|
|
||||||
-spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(),
|
-spec(make(atom() | emqx_types:client_id(),
|
||||||
emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
|
emqx_mqtt_types:qos(),
|
||||||
make(From, QoS, Topic, Payload) ->
|
emqx_topic:topic(),
|
||||||
|
emqx_types:payload()) -> emqx_types:message()).
|
||||||
|
make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
|
||||||
#message{id = emqx_guid:gen(),
|
#message{id = emqx_guid:gen(),
|
||||||
qos = QoS,
|
qos = QoS,
|
||||||
from = From,
|
from = From,
|
||||||
|
@ -72,6 +86,24 @@ make(From, QoS, Topic, Payload) ->
|
||||||
payload = Payload,
|
payload = Payload,
|
||||||
timestamp = os:timestamp()}.
|
timestamp = os:timestamp()}.
|
||||||
|
|
||||||
|
-spec(id(emqx_types:message()) -> maybe(binary())).
|
||||||
|
id(#message{id = Id}) -> Id.
|
||||||
|
|
||||||
|
-spec(qos(emqx_types:message()) -> emqx_mqtt_types:qos()).
|
||||||
|
qos(#message{qos = QoS}) -> QoS.
|
||||||
|
|
||||||
|
-spec(from(emqx_types:message()) -> atom() | binary()).
|
||||||
|
from(#message{from = From}) -> From.
|
||||||
|
|
||||||
|
-spec(topic(emqx_types:message()) -> emqx_types:topic()).
|
||||||
|
topic(#message{topic = Topic}) -> Topic.
|
||||||
|
|
||||||
|
-spec(payload(emqx_types:message()) -> emqx_types:payload()).
|
||||||
|
payload(#message{payload = Payload}) -> Payload.
|
||||||
|
|
||||||
|
-spec(timestamp(emqx_types:message()) -> erlang:timestamp()).
|
||||||
|
timestamp(#message{timestamp = TS}) -> TS.
|
||||||
|
|
||||||
-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
|
-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
|
||||||
set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) ->
|
set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) ->
|
||||||
Msg#message{flags = Flags};
|
Msg#message{flags = Flags};
|
||||||
|
@ -159,24 +191,11 @@ update_expiry(Msg) -> Msg.
|
||||||
to_map(Msg) ->
|
to_map(Msg) ->
|
||||||
maps:from_list(to_list(Msg)).
|
maps:from_list(to_list(Msg)).
|
||||||
|
|
||||||
%% @doc Message to map
|
|
||||||
-spec(to_bin_key_map(emqx_types:message()) -> #{binary() => any()}).
|
|
||||||
to_bin_key_map(Msg) ->
|
|
||||||
maps:from_list(to_bin_key_list(Msg)).
|
|
||||||
|
|
||||||
%% @doc Message to tuple list
|
%% @doc Message to tuple list
|
||||||
-spec(to_list(emqx_types:message()) -> map()).
|
-spec(to_list(emqx_types:message()) -> map()).
|
||||||
to_list(Msg) ->
|
to_list(Msg) ->
|
||||||
lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
|
lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
|
||||||
|
|
||||||
%% @doc Message to tuple list
|
|
||||||
-spec(to_bin_key_list(emqx_types:message()) -> map()).
|
|
||||||
to_bin_key_list(Msg) ->
|
|
||||||
lists:zipwith(
|
|
||||||
fun(Key, Val) ->
|
|
||||||
{bin(Key), bin_key_map(Val)}
|
|
||||||
end, record_info(fields, message), tl(tuple_to_list(Msg))).
|
|
||||||
|
|
||||||
%% MilliSeconds
|
%% MilliSeconds
|
||||||
elapsed(Since) ->
|
elapsed(Since) ->
|
||||||
max(0, timer:now_diff(os:timestamp(), Since) div 1000).
|
max(0, timer:now_diff(os:timestamp(), Since) div 1000).
|
||||||
|
@ -191,14 +210,3 @@ format(flags, Flags) ->
|
||||||
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
|
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
|
||||||
format(headers, Headers) ->
|
format(headers, Headers) ->
|
||||||
io_lib:format("~p", [Headers]).
|
io_lib:format("~p", [Headers]).
|
||||||
|
|
||||||
bin_key_map(Map) when is_map(Map) ->
|
|
||||||
maps:fold(fun(Key, Val, Acc) ->
|
|
||||||
Acc#{bin(Key) => bin_key_map(Val)}
|
|
||||||
end, #{}, Map);
|
|
||||||
bin_key_map(Data) ->
|
|
||||||
Data.
|
|
||||||
|
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
|
||||||
bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom));
|
|
||||||
bin(Str) when is_list(Str) -> list_to_binary(Str).
|
|
||||||
|
|
|
@ -124,8 +124,8 @@ stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
|
||||||
|
|
||||||
%% @doc Enqueue a message.
|
%% @doc Enqueue a message.
|
||||||
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
|
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
|
||||||
in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
|
in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
|
||||||
{_Dropped = undefined, MQ};
|
{_Dropped = Msg, MQ};
|
||||||
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
|
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
|
||||||
p_table = PTab,
|
p_table = PTab,
|
||||||
q = Q,
|
q = Q,
|
||||||
|
|
|
@ -140,7 +140,7 @@ publish_props(Headers) ->
|
||||||
%% @doc Message from Packet
|
%% @doc Message from Packet
|
||||||
-spec(to_message(emqx_types:credentials(), emqx_mqtt_types:packet())
|
-spec(to_message(emqx_types:credentials(), emqx_mqtt_types:packet())
|
||||||
-> emqx_types:message()).
|
-> emqx_types:message()).
|
||||||
to_message(#{client_id := ClientId, username := Username},
|
to_message(#{client_id := ClientId, username := Username, peername := Peername},
|
||||||
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
retain = Retain,
|
retain = Retain,
|
||||||
qos = QoS,
|
qos = QoS,
|
||||||
|
@ -150,7 +150,8 @@ to_message(#{client_id := ClientId, username := Username},
|
||||||
payload = Payload}) ->
|
payload = Payload}) ->
|
||||||
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
|
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
|
||||||
Msg#message{flags = #{dup => Dup, retain => Retain},
|
Msg#message{flags = #{dup => Dup, retain => Retain},
|
||||||
headers = merge_props(#{username => Username}, Props)}.
|
headers = merge_props(#{username => Username,
|
||||||
|
peername => Peername}, Props)}.
|
||||||
|
|
||||||
-spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()).
|
-spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()).
|
||||||
will_msg(#mqtt_packet_connect{will_flag = false}) ->
|
will_msg(#mqtt_packet_connect{will_flag = false}) ->
|
||||||
|
|
|
@ -276,7 +276,11 @@ plugin_unloaded(_Name, false) ->
|
||||||
ok;
|
ok;
|
||||||
plugin_unloaded(Name, true) ->
|
plugin_unloaded(Name, true) ->
|
||||||
case read_loaded() of
|
case read_loaded() of
|
||||||
{ok, Names} ->
|
{ok, Names0} ->
|
||||||
|
Names = lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
|
||||||
|
({Name1, true}) -> {true, Name1};
|
||||||
|
({_Name1, false}) -> false
|
||||||
|
end, Names0),
|
||||||
case lists:member(Name, Names) of
|
case lists:member(Name, Names) of
|
||||||
true ->
|
true ->
|
||||||
write_loaded(lists:delete(Name, Names));
|
write_loaded(lists:delete(Name, Names));
|
||||||
|
@ -306,4 +310,3 @@ write_loaded(AppNames) ->
|
||||||
?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]),
|
?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]),
|
||||||
{error, Error}
|
{error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -812,7 +812,11 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel,
|
||||||
%% Dispatch messages
|
%% Dispatch messages
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap}) ->
|
handle_dispatch(Msgs, State = #state{inflight = Inflight,
|
||||||
|
client_id = ClientId,
|
||||||
|
username = Username,
|
||||||
|
subscriptions = SubMap}) ->
|
||||||
|
SessProps = #{client_id => ClientId, username => Username},
|
||||||
%% Drain the mailbox and batch deliver
|
%% Drain the mailbox and batch deliver
|
||||||
Msgs1 = drain_m(batch_n(Inflight), Msgs),
|
Msgs1 = drain_m(batch_n(Inflight), Msgs),
|
||||||
%% Ack the messages for shared subscription
|
%% Ack the messages for shared subscription
|
||||||
|
@ -823,7 +827,9 @@ handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap
|
||||||
SubOpts = find_subopts(Topic, SubMap),
|
SubOpts = find_subopts(Topic, SubMap),
|
||||||
case process_subopts(SubOpts, Msg, State) of
|
case process_subopts(SubOpts, Msg, State) of
|
||||||
{ok, Msg1} -> [Msg1|Acc];
|
{ok, Msg1} -> [Msg1|Acc];
|
||||||
ignore -> Acc
|
ignore ->
|
||||||
|
emqx_hooks:run('message.dropped', [SessProps, Msg]),
|
||||||
|
Acc
|
||||||
end
|
end
|
||||||
end, [], Msgs2),
|
end, [], Msgs2),
|
||||||
NState = batch_process(Msgs3, State),
|
NState = batch_process(Msgs3, State),
|
||||||
|
@ -957,7 +963,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use
|
||||||
if
|
if
|
||||||
Dropped =/= undefined ->
|
Dropped =/= undefined ->
|
||||||
SessProps = #{client_id => ClientId, username => Username},
|
SessProps = #{client_id => ClientId, username => Username},
|
||||||
ok = emqx_hooks:run('message.dropped', [SessProps, Msg]);
|
ok = emqx_hooks:run('message.dropped', [SessProps, Dropped]);
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
State#state{mqueue = NewQ}.
|
State#state{mqueue = NewQ}.
|
||||||
|
|
|
@ -228,7 +228,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
|
||||||
{noreply, start_timer(State#state{updates = Updates1}), hibernate};
|
{noreply, start_timer(State#state{updates = Updates1}), hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG("error, [Stats] Unexpected info: ~p", [Info]),
|
?LOG(error, "[Stats] Unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, #state{timer = TRef}) ->
|
terminate(_Reason, #state{timer = TRef}) ->
|
||||||
|
|
|
@ -172,7 +172,7 @@ uptime(days, D) ->
|
||||||
publish(uptime, Uptime) ->
|
publish(uptime, Uptime) ->
|
||||||
safe_publish(systop(uptime), Uptime);
|
safe_publish(systop(uptime), Uptime);
|
||||||
publish(datetime, Datetime) ->
|
publish(datetime, Datetime) ->
|
||||||
safe_publish(systop(datatype), Datetime);
|
safe_publish(systop(datetime), Datetime);
|
||||||
publish(version, Version) ->
|
publish(version, Version) ->
|
||||||
safe_publish(systop(version), #{retain => true}, Version);
|
safe_publish(systop(version), #{retain => true}, Version);
|
||||||
publish(sysdescr, Descr) ->
|
publish(sysdescr, Descr) ->
|
||||||
|
|
|
@ -113,7 +113,7 @@ call(WSPid, Req) when is_pid(WSPid) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init(Req, Opts) ->
|
init(Req, Opts) ->
|
||||||
IdleTimeout = proplists:get_value(idle_timeout, Opts, 60000),
|
IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000),
|
||||||
DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])),
|
DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])),
|
||||||
MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of
|
MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of
|
||||||
0 -> infinity;
|
0 -> infinity;
|
||||||
|
|
|
@ -87,6 +87,7 @@ t_rpc(Config) when is_list(Config) ->
|
||||||
%% message from a different client, to avoid getting terminated by no-local
|
%% message from a different client, to avoid getting terminated by no-local
|
||||||
Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>),
|
Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>),
|
||||||
ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]),
|
ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]),
|
||||||
|
ct:sleep(100),
|
||||||
PacketId = 1,
|
PacketId = 1,
|
||||||
emqx_session:publish(SPid, PacketId, Msg1),
|
emqx_session:publish(SPid, PacketId, Msg1),
|
||||||
?wait(case emqx_mock_client:get_last_message(ConnPid) of
|
?wait(case emqx_mock_client:get_last_message(ConnPid) of
|
||||||
|
|
|
@ -14,34 +14,38 @@
|
||||||
|
|
||||||
-module(emqx_message_SUITE).
|
-module(emqx_message_SUITE).
|
||||||
|
|
||||||
-compile(export_all).
|
|
||||||
-compile(nowarn_export_all).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
all() ->
|
-export([ t_make/1
|
||||||
[ message_make
|
, t_flag/1
|
||||||
, message_flag
|
, t_header/1
|
||||||
, message_header
|
, t_format/1
|
||||||
, message_format
|
, t_expired/1
|
||||||
, message_expired
|
, t_to_map/1
|
||||||
, message_to_map
|
]).
|
||||||
].
|
|
||||||
|
|
||||||
message_make(_) ->
|
-export([ all/0
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"payload">>),
|
, suite/0
|
||||||
?assertEqual(0, Msg#message.qos),
|
]).
|
||||||
|
|
||||||
|
t_make(_) ->
|
||||||
|
Msg = emqx_message:make(<<"topic">>, <<"payload">>),
|
||||||
|
?assertEqual(0, emqx_message:qos(Msg)),
|
||||||
|
?assertEqual(undefined, emqx_message:from(Msg)),
|
||||||
|
?assertEqual(<<"payload">>, emqx_message:payload(Msg)),
|
||||||
Msg1 = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
Msg1 = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
?assertEqual(0, Msg1#message.qos),
|
?assertEqual(0, emqx_message:qos(Msg1)),
|
||||||
|
?assertEqual(<<"topic">>, emqx_message:topic(Msg1)),
|
||||||
Msg2 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>),
|
Msg2 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>),
|
||||||
?assert(is_binary(Msg2#message.id)),
|
?assert(is_binary(emqx_message:id(Msg2))),
|
||||||
?assertEqual(2, Msg2#message.qos).
|
?assertEqual(2, emqx_message:qos(Msg2)),
|
||||||
|
?assertEqual(<<"clientid">>, emqx_message:from(Msg2)),
|
||||||
|
?assertEqual(<<"topic">>, emqx_message:topic(Msg2)),
|
||||||
|
?assertEqual(<<"payload">>, emqx_message:payload(Msg2)).
|
||||||
|
|
||||||
message_flag(_) ->
|
t_flag(_) ->
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
Msg2 = emqx_message:set_flag(retain, false, Msg),
|
Msg2 = emqx_message:set_flag(retain, false, Msg),
|
||||||
Msg3 = emqx_message:set_flag(dup, Msg2),
|
Msg3 = emqx_message:set_flag(dup, Msg2),
|
||||||
|
@ -55,7 +59,7 @@ message_flag(_) ->
|
||||||
?assert(emqx_message:get_flag(dup, Msg6)),
|
?assert(emqx_message:get_flag(dup, Msg6)),
|
||||||
?assert(emqx_message:get_flag(retain, Msg6)).
|
?assert(emqx_message:get_flag(retain, Msg6)).
|
||||||
|
|
||||||
message_header(_) ->
|
t_header(_) ->
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
|
Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
|
||||||
Msg2 = emqx_message:set_header(c, 3, Msg1),
|
Msg2 = emqx_message:set_header(c, 3, Msg1),
|
||||||
|
@ -64,10 +68,10 @@ message_header(_) ->
|
||||||
Msg3 = emqx_message:remove_header(a, Msg2),
|
Msg3 = emqx_message:remove_header(a, Msg2),
|
||||||
?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg3)).
|
?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg3)).
|
||||||
|
|
||||||
message_format(_) ->
|
t_format(_) ->
|
||||||
io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]).
|
io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]).
|
||||||
|
|
||||||
message_expired(_) ->
|
t_expired(_) ->
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
|
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
|
@ -78,7 +82,7 @@ message_expired(_) ->
|
||||||
Msg2 = emqx_message:update_expiry(Msg1),
|
Msg2 = emqx_message:update_expiry(Msg1),
|
||||||
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
|
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
|
||||||
|
|
||||||
message_to_map(_) ->
|
t_to_map(_) ->
|
||||||
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>),
|
||||||
List = [{id, Msg#message.id},
|
List = [{id, Msg#message.id},
|
||||||
{qos, ?QOS_1},
|
{qos, ?QOS_1},
|
||||||
|
@ -91,3 +95,10 @@ message_to_map(_) ->
|
||||||
?assertEqual(List, emqx_message:to_list(Msg)),
|
?assertEqual(List, emqx_message:to_list(Msg)),
|
||||||
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
|
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
IsTestCase = fun("t_" ++ _) -> true; (_) -> false end,
|
||||||
|
[F || {F, _A} <- module_info(exports), IsTestCase(atom_to_list(F))].
|
||||||
|
|
||||||
|
suite() ->
|
||||||
|
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
|
||||||
|
|
||||||
|
|
|
@ -100,8 +100,13 @@ packet_message(_) ->
|
||||||
Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
|
||||||
Msg2 = emqx_message:set_flag(retain, false, Msg),
|
Msg2 = emqx_message:set_flag(retain, false, Msg),
|
||||||
Pkt = emqx_packet:from_message(10, Msg2),
|
Pkt = emqx_packet:from_message(10, Msg2),
|
||||||
Msg3 = emqx_message:set_header(username, "test", Msg2),
|
Msg3 = emqx_message:set_header(
|
||||||
Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>, username => "test"}, Pkt),
|
peername, {{127,0,0,1}, 9527},
|
||||||
|
emqx_message:set_header(username, "test", Msg2)
|
||||||
|
),
|
||||||
|
Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>,
|
||||||
|
username => "test",
|
||||||
|
peername => {{127,0,0,1}, 9527}}, Pkt),
|
||||||
Msg5 = Msg4#message{timestamp = Msg3#message.timestamp, id = Msg3#message.id},
|
Msg5 = Msg4#message{timestamp = Msg3#message.timestamp, id = Msg3#message.id},
|
||||||
Msg5 = Msg3.
|
Msg5 = Msg3.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue