Merge pull request #940 from emqtt/emq20

V2.1.0-beta.2:Support pbkdf2 hash
This commit is contained in:
Feng Lee 2017-03-13 10:13:09 +08:00 committed by GitHub
commit 9b6c91cbad
32 changed files with 116 additions and 103 deletions

View File

@ -4,7 +4,7 @@ PROJECT_VERSION = 2.1.0
NO_AUTOPATCH = cuttlefish
DEPS = gproc lager esockd mochiweb lager_syslog
DEPS = gproc lager esockd mochiweb lager_syslog pbkdf2
dep_gproc = git https://github.com/uwiger/gproc
dep_getopt = git https://github.com/jcomellas/getopt v0.8.2
@ -12,6 +12,7 @@ dep_lager = git https://github.com/basho/lager master
dep_esockd = git https://github.com/emqtt/esockd master
dep_mochiweb = git https://github.com/emqtt/mochiweb
dep_lager_syslog = git https://github.com/basho/lager_syslog
dep_pbkdf2 = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0
ERLC_OPTS += +'{parse_transform, lager_transform}'

View File

@ -130,14 +130,14 @@ mqtt.client.enable_stats = off
## Upgrade QoS?
mqtt.session.upgrade_qos = off
## Max number of QoS 1 and 2 messages that can be “inflight” at one time.
## Max Size of the Inflight Window for QoS1 and QoS2 messages
## 0 means no limit
mqtt.session.max_inflight = 32
## Retry Interval for redelivering QoS1/2 messages.
mqtt.session.retry_interval = 20s
## Max Packets that Awaiting PUBREL, 0 means no limit
## Client -> Broker: Max Packets Awaiting PUBREL, 0 means no limit
mqtt.session.max_awaiting_rel = 100
## Awaiting PUBREL Timeout

View File

@ -22,9 +22,9 @@
-define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0").
-define(PROTOCOL_VERSION, "MQTT/3.1.1").
-define(PROTOCOL_VERSION, "MQTT/5.0").
-define(ERTS_MINIMUM, "7.0").
-define(ERTS_MINIMUM, "8.0").
%%--------------------------------------------------------------------
%% Sys/Queue/Share Topics' Prefix
@ -42,7 +42,7 @@
-type(pubsub() :: publish | subscribe).
-define(PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)).
-define(PS(PS), (PS =:= publish orelse PS =:= subscribe)).
%%--------------------------------------------------------------------
%% MQTT Topic
@ -172,7 +172,7 @@
severity :: warning | error | critical,
title :: iolist() | binary(),
summary :: iolist() | binary(),
timestamp :: erlang:timestamp() %% Timestamp
timestamp :: erlang:timestamp()
}).
-type(mqtt_alarm() :: #mqtt_alarm{}).
@ -186,8 +186,7 @@
-type(mqtt_plugin() :: #mqtt_plugin{}).
%%--------------------------------------------------------------------
%% MQTT CLI Command
%% For example: 'broker metrics'
%% MQTT CLI Command. For example: 'broker metrics'
%%--------------------------------------------------------------------
-record(mqtt_cli, { name, action, args = [], opts = [], usage, descr }).

View File

@ -80,6 +80,7 @@
%%--------------------------------------------------------------------
%% MQTT Control Packet Types
%%--------------------------------------------------------------------
-define(RESERVED, 0). %% Reserved
-define(CONNECT, 1). %% Client request to connect to Server
-define(CONNACK, 2). %% Server to Client: Connect acknowledgment
@ -94,7 +95,7 @@
-define(UNSUBACK, 11). %% Unsubscribe acknowledgment
-define(PINGREQ, 12). %% PING request
-define(PINGRESP, 13). %% PING response
-define(DISCONNECT, 14). %% Client is disconnecting
-define(DISCONNECT, 14). %% Client or Server is disconnecting
-define(AUTH, 15). %% Authentication exchange
-define(TYPE_NAMES, [
@ -146,11 +147,12 @@
%% MQTT Packet Fixed Header
%%--------------------------------------------------------------------
-record(mqtt_packet_header, {
type = ?RESERVED :: mqtt_packet_type(),
dup = false :: boolean(),
qos = ?QOS_0 :: mqtt_qos(),
retain = false :: boolean()}).
-record(mqtt_packet_header,
{ type = ?RESERVED :: mqtt_packet_type(),
dup = false :: boolean(),
qos = ?QOS_0 :: mqtt_qos(),
retain = false :: boolean()
}).
%%--------------------------------------------------------------------
%% MQTT Packets
@ -165,7 +167,7 @@
proto_ver = ?MQTT_PROTO_V4 :: mqtt_vsn(),
proto_name = <<"MQTT">> :: binary(),
will_retain = false :: boolean(),
will_qos = ?QOS_0 :: mqtt_qos(),
will_qos = ?QOS_1 :: mqtt_qos(),
will_flag = false :: boolean(),
clean_sess = false :: boolean(),
keep_alive = 60 :: non_neg_integer(),
@ -199,25 +201,25 @@
}).
-record(mqtt_packet_suback,
{ packet_id :: mqtt_packet_id(),
qos_table :: list(mqtt_qos() | 128)
{ packet_id :: mqtt_packet_id(),
qos_table :: list(mqtt_qos() | 128)
}).
-record(mqtt_packet_unsuback,
{ packet_id :: mqtt_packet_id() }).
{ packet_id :: mqtt_packet_id() }).
%%--------------------------------------------------------------------
%% MQTT Control Packet
%%--------------------------------------------------------------------
-record(mqtt_packet,
{ header :: #mqtt_packet_header{},
variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
| #mqtt_packet_publish{} | #mqtt_packet_puback{}
| #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
| #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
| mqtt_packet_id() | undefined,
payload :: binary() | undefined
{ header :: #mqtt_packet_header{},
variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
| #mqtt_packet_publish{} | #mqtt_packet_puback{}
| #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
| #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
| mqtt_packet_id() | undefined,
payload :: binary() | undefined
}).
-type(mqtt_packet() :: #mqtt_packet{}).

View File

@ -17,10 +17,10 @@
-type(trie_node_id() :: binary() | atom()).
-record(trie_node,
{ node_id :: trie_node_id(),
edge_count = 0 :: non_neg_integer(),
topic :: binary() | undefined,
flags :: [retained | static]
{ node_id :: trie_node_id(),
edge_count = 0 :: non_neg_integer(),
topic :: binary() | undefined,
flags :: [retained | static]
}).
-record(trie_edge,

View File

@ -132,14 +132,14 @@ end}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html
{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [
{commented, 6000},
{commented, 6369},
{datatype, integer},
hidden
]}.
%% @see node.dist_listen_min
{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [
{commented, 6999},
{commented, 6369},
{datatype, integer},
hidden
]}.
@ -356,6 +356,7 @@ end}.
{default, off},
{datatype, flag}
]}.
%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
%% 0 means no limit
{mapping, "mqtt.session.max_inflight", "emqttd.session", [
@ -571,6 +572,11 @@ end}.
hidden
]}.
{mapping, "mqtt.listener.tcp.tune_buffer", "emqttd.listeners", [
{default, off},
{datatype, flag}
]}.
{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [
{datatype, {enum, [true, false]}},
hidden
@ -684,8 +690,8 @@ end}.
LisOpts = fun(Prefix) ->
Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
{max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)},
{rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}])
end,
{tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}])
end,
TcpOpts = fun(Prefix) ->
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},

View File

@ -70,7 +70,7 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
Client :: mqtt_client(),
PubSub :: pubsub(),
Topic :: binary()).
check_acl(Client, PubSub, Topic) when ?PUBSUB(PubSub) ->
check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
case lookup_mods(acl) of
[] -> case emqttd:env(allow_anonymous, false) of
true -> allow;

View File

@ -87,14 +87,14 @@ handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId,
severity = Severity,
title = Title,
summary = Summary}}, Alarms)->
Timestamp = os:timestamp(),
TS = os:timestamp(),
Json = mochijson2:encode([{id, AlarmId},
{severity, Severity},
{title, iolist_to_binary(Title)},
{summary, iolist_to_binary(Summary)},
{ts, emqttd_time:now_secs(Timestamp)}]),
{ts, emqttd_time:now_secs(TS)}]),
emqttd:publish(alarm_msg(alert, AlarmId, Json)),
{ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]};
{ok, [Alarm#mqtt_alarm{timestamp = TS} | Alarms]};
handle_event({clear_alarm, AlarmId}, Alarms) ->
Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_time:now_secs()}]),

View File

@ -22,7 +22,7 @@
-export([passwd_hash/2]).
-type(hash_type() :: plain | md5 | sha | sha256).
-type(hash_type() :: plain | md5 | sha | sha256 | pbkdf2).
%%--------------------------------------------------------------------
%% Authentication behavihour
@ -51,7 +51,7 @@ behaviour_info(_Other) ->
-endif.
%% @doc Password Hash
-spec(passwd_hash(hash_type(), binary()) -> binary()).
-spec(passwd_hash(hash_type(), binary() | tuple()) -> binary()).
passwd_hash(plain, Password) ->
Password;
passwd_hash(md5, Password) ->
@ -59,7 +59,10 @@ passwd_hash(md5, Password) ->
passwd_hash(sha, Password) ->
hexstring(crypto:hash(sha, Password));
passwd_hash(sha256, Password) ->
hexstring(crypto:hash(sha256, Password)).
hexstring(crypto:hash(sha256, Password));
passwd_hash(pbkdf2,{Salt, Password, Macfun, Iterations, Dklen}) ->
{ok,Hexstring} = pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen),
pbkdf2:to_hex(Hexstring).
hexstring(<<X:128/big-unsigned-integer>>) ->
iolist_to_binary(io_lib:format("~32.16.0b", [X]));

View File

@ -113,7 +113,7 @@ stop_tick(TRef) ->
timer:cancel(TRef).
%%--------------------------------------------------------------------
%% gen_server callbacks
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([]) ->

View File

@ -537,20 +537,21 @@ print({Topic, Node}) ->
?PRINT("~s -> ~s~n", [Topic, Node]);
print({ClientId, _ClientPid, _Persistent, SessInfo}) ->
Data = lists:append(SessInfo, emqttd_stats:get_session_stats(ClientId)),
InfoKeys = [clean_sess,
subscriptions,
max_inflight,
inflight_queue,
message_queue,
message_dropped,
awaiting_rel,
awaiting_ack,
awaiting_comp,
inflight_len,
mqueue_len,
mqueue_dropped,
awaiting_rel_len,
deliver_msg,
enqueue_msg,
created_at],
?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, "
"message_queue=~w, message_dropped=~w, "
"awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
"created_at=~w)~n",
[ClientId | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]).
?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight=~w, "
"mqueue_len=~w, mqueue_dropped=~w, awaiting_rel=~w, "
"deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n",
[ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]).
print(subscription, {Sub, Topic}) when is_pid(Sub) ->
?PRINT("~p -> ~s~n", [Sub, Topic]);

View File

@ -291,7 +291,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
%% Receive and parse tcp data
%% Receive and Parse TCP Data
received(<<>>, State) ->
{noreply, gc(State), hibernate};

View File

@ -28,7 +28,7 @@
%% @doc Serialise MQTT Packet
-spec(serialize(mqtt_packet()) -> iolist()).
serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
variable = Variable,
payload = Payload}) ->
serialize_header(Header,

View File

@ -129,12 +129,12 @@
%% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
awaiting_rel :: map(),
%% Awaiting PUBREL timeout
await_rel_timeout = 20000 :: timeout(),
%% Max Packets that Awaiting PUBREL
max_awaiting_rel = 100 :: non_neg_integer(),
%% Awaiting PUBREL timeout
await_rel_timeout = 20000 :: timeout(),
%% Awaiting PUBREL timer
await_rel_timer :: reference(),
@ -580,7 +580,7 @@ code_change(_OldVsn, Session, _Extra) ->
{ok, Session}.
%%--------------------------------------------------------------------
%% Kick old client
%% Kickout old client
%%--------------------------------------------------------------------
kick(_ClientId, undefined, _Pid) ->
ignore;

View File

@ -107,7 +107,7 @@ dispatch(ClientId, Topic, Msg) ->
try ets:lookup_element(mqtt_local_session, ClientId, 2) of
Pid -> Pid ! {dispatch, Topic, Msg}
catch
error:badarg -> io:format("Session Not Found: ~p~n", [ClientId]), ok %%TODO: How??
error:badarg -> ok %%FIXME Later.
end.
call(SM, Req) ->

View File

@ -37,8 +37,8 @@
%% @doc Handle WebSocket Request.
handle_request(Req) ->
{ok, Env} = emqttd:env(protocol),
PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE),
{ok, ProtoEnv} = emqttd:env(protocol),
PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
Parser = emqttd_parser:initial_state(PacketSize),
%% Upgrade WebSocket.
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),

View File

@ -92,14 +92,15 @@ init([Env, WsPid, Req, ReplyChannel]) ->
{ok, Peername} = Req:get(peername),
Headers = mochiweb_headers:to_list(
mochiweb_request:get(headers, Req)),
Conn = Req:get(connection),
ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel),
[{ws_initial_headers, Headers} | Env]),
IdleTimeout = get_value(client_idle_timeout, Env, 30000),
EnableStats = get_value(client_enable_stats, Env, false),
ForceGcCount = emqttd_gc:conn_max_gc_count(),
{ok, #wsclient_state{ws_pid = WsPid,
{ok, #wsclient_state{connection = Conn,
ws_pid = WsPid,
peername = Peername,
connection = Req:get(connection),
proto_state = ProtoState,
enable_stats = EnableStats,
force_gc_count = ForceGcCount},

View File

@ -29,7 +29,7 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc Start a WebSocket Client
%% @doc Start a WebSocket Connection.
-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
start_client(WsPid, Req, ReplyChannel) ->
supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -48,3 +48,4 @@ t_is_full(_) ->
t_is_empty(_) ->
Inflight = ((emqttd_inflight:new(1)):insert(k, v1)),
?assertNot(Inflight:is_empty()).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2016-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2016-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -260,7 +260,7 @@ serialize_connect(_) ->
serialize_connack(_) ->
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
<<32,2,0,0>> = iolist_to_binary(serialize(ConnAck)).
?assertEqual(<<32,2,0,0>>, iolist_to_binary(serialize(ConnAck))).
serialize_publish(_) ->
serialize(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
@ -303,20 +303,20 @@ long_payload() ->
%%--------------------------------------------------------------------
packet_proto_name(_) ->
<<"MQIsdp">> = emqttd_packet:protocol_name(3),
<<"MQTT">> = emqttd_packet:protocol_name(4).
?assertEqual(<<"MQIsdp">>, emqttd_packet:protocol_name(3)),
?assertEqual(<<"MQTT">>, emqttd_packet:protocol_name(4)).
packet_type_name(_) ->
'CONNECT' = emqttd_packet:type_name(?CONNECT),
'UNSUBSCRIBE' = emqttd_packet:type_name(?UNSUBSCRIBE).
?assertEqual('CONNECT', emqttd_packet:type_name(?CONNECT)),
?assertEqual('UNSUBSCRIBE', emqttd_packet:type_name(?UNSUBSCRIBE)).
packet_connack_name(_) ->
'CONNACK_ACCEPT' = emqttd_packet:connack_name(?CONNACK_ACCEPT),
'CONNACK_PROTO_VER' = emqttd_packet:connack_name(?CONNACK_PROTO_VER),
'CONNACK_INVALID_ID' = emqttd_packet:connack_name(?CONNACK_INVALID_ID),
'CONNACK_SERVER' = emqttd_packet:connack_name(?CONNACK_SERVER),
'CONNACK_CREDENTIALS' = emqttd_packet:connack_name(?CONNACK_CREDENTIALS),
'CONNACK_AUTH' = emqttd_packet:connack_name(?CONNACK_AUTH).
?assertEqual('CONNACK_ACCEPT', emqttd_packet:connack_name(?CONNACK_ACCEPT)),
?assertEqual('CONNACK_PROTO_VER', emqttd_packet:connack_name(?CONNACK_PROTO_VER)),
?assertEqual('CONNACK_INVALID_ID', emqttd_packet:connack_name(?CONNACK_INVALID_ID)),
?assertEqual('CONNACK_SERVER', emqttd_packet:connack_name(?CONNACK_SERVER)),
?assertEqual('CONNACK_CREDENTIALS', emqttd_packet:connack_name(?CONNACK_CREDENTIALS)),
?assertEqual('CONNACK_AUTH', emqttd_packet:connack_name(?CONNACK_AUTH)).
packet_format(_) ->
io:format("~s", [emqttd_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
@ -336,26 +336,25 @@ packet_format(_) ->
message_make(_) ->
Msg = emqttd_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
0 = Msg#mqtt_message.qos,
?assertEqual(0, Msg#mqtt_message.qos),
Msg1 = emqttd_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>),
true = is_binary(Msg1#mqtt_message.id),
2 = Msg1#mqtt_message.qos.
?assert(is_binary(Msg1#mqtt_message.id)),
?assertEqual(2, Msg1#mqtt_message.qos).
message_from_packet(_) ->
Msg = emqttd_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
1 = Msg#mqtt_message.qos,
10 = Msg#mqtt_message.pktid,
<<"topic">> = Msg#mqtt_message.topic,
?assertEqual(1, Msg#mqtt_message.qos),
?assertEqual(10, Msg#mqtt_message.packet_id),
?assertEqual(<<"topic">>, Msg#mqtt_message.topic),
WillMsg = emqttd_message:from_packet(#mqtt_packet_connect{will_flag = true,
will_topic = <<"WillTopic">>,
will_msg = <<"WillMsg">>}),
<<"WillTopic">> = WillMsg#mqtt_message.topic,
<<"WillMsg">> = WillMsg#mqtt_message.payload,
?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic),
?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload),
Msg2 = emqttd_message:from_packet(<<"username">>, <<"clientid">>,
?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)),
{<<"clientid">>, <<"username">>} = Msg2#mqtt_message.from,
?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from),
io:format("~s", [emqttd_message:format(Msg2)]).
message_flag(_) ->
@ -363,13 +362,13 @@ message_flag(_) ->
Msg2 = emqttd_message:from_packet(<<"clientid">>, Pkt),
Msg3 = emqttd_message:set_flag(retain, Msg2),
Msg4 = emqttd_message:set_flag(dup, Msg3),
true = Msg4#mqtt_message.dup,
true = Msg4#mqtt_message.retain,
?assert(Msg4#mqtt_message.dup),
?assert(Msg4#mqtt_message.retain),
Msg5 = emqttd_message:set_flag(Msg4),
Msg6 = emqttd_message:unset_flag(dup, Msg5),
Msg7 = emqttd_message:unset_flag(retain, Msg6),
false = Msg7#mqtt_message.dup,
false = Msg7#mqtt_message.retain,
?assertNot(Msg7#mqtt_message.dup),
?assertNot(Msg7#mqtt_message.retain),
emqttd_message:unset_flag(Msg7),
emqttd_message:to_packet(Msg7).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2016-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.