Rewrite the emqx_packet module and improve channel pipeline (#2903)

Add use_username_as_clientid/1 function and Improve function 'pipeline/3'
This commit is contained in:
Feng Lee 2019-09-16 14:17:36 +08:00 committed by tigercl
parent 681ae511a8
commit 4764a7707c
9 changed files with 439 additions and 333 deletions

View File

@ -50,8 +50,7 @@
]). ]).
-import(emqx_misc, -import(emqx_misc,
[ run_fold/2 [ run_fold/3
, run_fold/3
, pipeline/3 , pipeline/3
, maybe_apply/2 , maybe_apply/2
]). ]).
@ -106,7 +105,7 @@
%% Init the channel %% Init the channel
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(init(emqx_types:conn(), proplists:proplist()) -> channel()). -spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()).
init(ConnInfo, Options) -> init(ConnInfo, Options) ->
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
Peercert = maps:get(peercert, ConnInfo, undefined), Peercert = maps:get(peercert, ConnInfo, undefined),
@ -216,8 +215,7 @@ handle_in(?CONNECT_PACKET(_), Channel = #channel{connected = true}) ->
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
case pipeline([fun validate_packet/2, case pipeline([fun check_connpkt/2,
fun check_connect/2,
fun init_protocol/2, fun init_protocol/2,
fun enrich_client/2, fun enrich_client/2,
fun set_logger_meta/2, fun set_logger_meta/2,
@ -232,7 +230,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId),
Channel = #channel{protocol = Protocol}) -> Channel = #channel{protocol = Protocol}) ->
case pipeline([fun validate_packet/2, case pipeline([fun emqx_packet:check/1,
fun process_alias/2, fun process_alias/2,
fun check_publish/2], Packet, Channel) of fun check_publish/2], Packet, Channel) of
{ok, NPacket, NChannel} -> {ok, NPacket, NChannel} ->
@ -302,21 +300,26 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
{ok, Channel} {ok, Channel}
end; end;
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) -> handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
case validate_packet(Packet, Channel) of Channel = #channel{client = Client}) ->
ok -> case emqx_packet:check(Packet) of
TopicFilters = preprocess_subscribe(Properties, RawTopicFilters, Channel), ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
{ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel), [Client, Properties],
parse_topic_filters(TopicFilters)),
TopicFilters2 = enrich_subid(Properties, TopicFilters1),
{ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel),
handle_out({suback, PacketId, ReasonCodes}, NChannel); handle_out({suback, PacketId, ReasonCodes}, NChannel);
{error, ReasonCode} -> {error, ReasonCode} ->
handle_out({disconnect, ReasonCode}, Channel) handle_out({disconnect, ReasonCode}, Channel)
end; end;
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) -> handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
case validate_packet(Packet, Channel) of Channel = #channel{client = Client}) ->
ok -> case emqx_packet:check(Packet) of
TopicFilters = preprocess_unsubscribe(Properties, RawTopicFilters, Channel), ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel), [Client, Properties],
parse_topic_filters(TopicFilters)),
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
handle_out({unsuback, PacketId, ReasonCodes}, NChannel); handle_out({unsuback, PacketId, ReasonCodes}, NChannel);
{error, ReasonCode} -> {error, ReasonCode} ->
handle_out({disconnect, ReasonCode}, Channel) handle_out({disconnect, ReasonCode}, Channel)
@ -421,24 +424,18 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2},
handle_out({pubrec, PacketId, RC}, Channel) handle_out({pubrec, PacketId, RC}, Channel)
end. end.
publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint}}) -> publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint},
protocol = Protocol}) ->
Msg = emqx_packet:to_message(Client, Packet), Msg = emqx_packet:to_message(Client, Packet),
Msg1 = emqx_message:set_flag(dup, false, Msg), Msg1 = emqx_message:set_flag(dup, false, Msg),
emqx_mountpoint:mount(MountPoint, Msg1). ProtoVer = emqx_protocol:info(proto_ver, Protocol),
Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1),
emqx_mountpoint:mount(MountPoint, Msg2).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process Subscribe %% Process Subscribe
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-compile({inline, [preprocess_subscribe/3]}).
preprocess_subscribe(Properties, RawTopicFilters, #channel{client = Client}) ->
RunHook = fun(TopicFilters) ->
emqx_hooks:run_fold('client.subscribe',
[Client, Properties], TopicFilters)
end,
Enrich = fun(TopicFilters) -> enrich_subid(Properties, TopicFilters) end,
run_fold([fun parse_topic_filters/1, RunHook, Enrich], RawTopicFilters).
process_subscribe(TopicFilters, Channel) -> process_subscribe(TopicFilters, Channel) ->
process_subscribe(TopicFilters, [], Channel). process_subscribe(TopicFilters, [], Channel).
@ -468,14 +465,6 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
%% Process Unsubscribe %% Process Unsubscribe
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-compile({inline, [preprocess_unsubscribe/3]}).
preprocess_unsubscribe(Properties, RawTopicFilter, #channel{client = Client}) ->
RunHook = fun(TopicFilters) ->
emqx_hooks:run_fold('client.unsubscribe',
[Client, Properties], TopicFilters)
end,
run_fold([fun parse_topic_filters/1, RunHook], RawTopicFilter).
-compile({inline, [process_unsubscribe/2]}). -compile({inline, [process_unsubscribe/2]}).
process_unsubscribe(TopicFilters, Channel) -> process_unsubscribe(TopicFilters, Channel) ->
process_unsubscribe(TopicFilters, [], Channel). process_unsubscribe(TopicFilters, [], Channel).
@ -578,7 +567,7 @@ handle_out({publish, PacketId, Msg}, Channel =
Msg1 = emqx_message:update_expiry(Msg), Msg1 = emqx_message:update_expiry(Msg),
Msg2 = emqx_hooks:run_fold('message.delivered', [Client], Msg1), Msg2 = emqx_hooks:run_fold('message.delivered', [Client], Msg1),
Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2), Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
{ok, emqx_packet:from_message(PacketId, Msg3), Channel}; {ok, emqx_message:to_packet(PacketId, Msg3), Channel};
handle_out({puback, PacketId, ReasonCode}, Channel) -> handle_out({puback, PacketId, ReasonCode}, Channel) ->
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel}; {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
@ -671,16 +660,18 @@ handle_cast(Msg, Channel) ->
-spec(handle_info(Info :: term(), channel()) -spec(handle_info(Info :: term(), channel())
-> {ok, channel()} | {stop, Reason :: term(), channel()}). -> {ok, channel()} | {stop, Reason :: term(), channel()}).
handle_info({subscribe, RawTopicFilters}, Channel) -> handle_info({subscribe, TopicFilters}, Channel = #channel{client = Client}) ->
TopicFilters = preprocess_subscribe(#{'Internal' => true}, TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
RawTopicFilters, Channel), [Client, #{'Internal' => true}],
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel), parse_topic_filters(TopicFilters)),
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
{ok, NChannel}; {ok, NChannel};
handle_info({unsubscribe, RawTopicFilters}, Channel) -> handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) ->
TopicFilters = preprocess_unsubscribe(#{'Internal' => true}, TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
RawTopicFilters, Channel), [Client, #{'Internal' => true}],
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel), parse_topic_filters(TopicFilters)),
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
{ok, NChannel}; {ok, NChannel};
handle_info(disconnected, Channel = #channel{connected = undefined}) -> handle_info(disconnected, Channel = #channel{connected = undefined}) ->
@ -839,119 +830,45 @@ publish_will_msg(undefined) ->
publish_will_msg(Msg) -> publish_will_msg(Msg) ->
emqx_broker:publish(Msg). emqx_broker:publish(Msg).
%% @doc Validate incoming packet. %% @doc Check connect packet.
-spec(validate_packet(emqx_types:packet(), channel()) check_connpkt(ConnPkt, #channel{client = #{zone := Zone}}) ->
-> ok | {error, emqx_types:reason_code()}). emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)).
validate_packet(Packet, _Channel) ->
try emqx_packet:validate(Packet) of
true -> ok
catch
error:protocol_error ->
{error, ?RC_PROTOCOL_ERROR};
error:subscription_identifier_invalid ->
{error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED};
error:topic_alias_invalid ->
{error, ?RC_TOPIC_ALIAS_INVALID};
error:topic_filters_invalid ->
{error, ?RC_TOPIC_FILTER_INVALID};
error:topic_name_invalid ->
{error, ?RC_TOPIC_NAME_INVALID};
error:_Reason ->
{error, ?RC_MALFORMED_PACKET}
end.
%%--------------------------------------------------------------------
%% Check connect packet
%%--------------------------------------------------------------------
check_connect(ConnPkt, Channel) ->
pipeline([fun check_proto_ver/2,
fun check_client_id/2,
fun check_will_topic/2,
fun check_will_retain/2], ConnPkt, Channel).
check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
proto_name = Name}, _Channel) ->
case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
true -> ok;
false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
end.
%% MQTT3.1 does not allow null clientId
check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
client_id = <<>>
}, _Channel) ->
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
%% Issue#599: Null clientId and clean_start = false
check_client_id(#mqtt_packet_connect{client_id = <<>>,
clean_start = false}, _Channel) ->
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
check_client_id(#mqtt_packet_connect{client_id = <<>>,
clean_start = true}, _Channel) ->
ok;
check_client_id(#mqtt_packet_connect{client_id = ClientId},
#channel{client = #{zone := Zone}}) ->
Len = byte_size(ClientId),
MaxLen = emqx_zone:get_env(Zone, max_clientid_len),
case (1 =< Len) andalso (Len =< MaxLen) of
true -> ok;
false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
end.
check_will_topic(#mqtt_packet_connect{will_flag = false}, _Channel) ->
ok;
check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _Channel) ->
try emqx_topic:validate(WillTopic) of
true -> ok
catch error:_Error ->
{error, ?RC_TOPIC_NAME_INVALID}
end.
check_will_retain(#mqtt_packet_connect{will_retain = false}, _Channel) ->
ok;
check_will_retain(#mqtt_packet_connect{will_retain = true},
#channel{client = #{zone := Zone}}) ->
case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
true -> ok;
false -> {error, ?RC_RETAIN_NOT_SUPPORTED}
end.
%% @doc Init protocol record.
init_protocol(ConnPkt, Channel = #channel{client = #{zone := Zone}}) -> init_protocol(ConnPkt, Channel = #channel{client = #{zone := Zone}}) ->
{ok, Channel#channel{protocol = emqx_protocol:init(ConnPkt, Zone)}}. {ok, Channel#channel{protocol = emqx_protocol:init(ConnPkt, Zone)}}.
%%-------------------------------------------------------------------- %% @doc Enrich client
%% Enrich client enrich_client(ConnPkt, Channel = #channel{client = Client}) ->
%%--------------------------------------------------------------------
enrich_client(ConnPkt = #mqtt_packet_connect{is_bridge = IsBridge},
Channel = #channel{client = Client}) ->
{ok, NConnPkt, NClient} = pipeline([fun set_username/2, {ok, NConnPkt, NClient} = pipeline([fun set_username/2,
fun set_bridge_mode/2,
fun maybe_username_as_clientid/2, fun maybe_username_as_clientid/2,
fun maybe_assign_clientid/2, fun maybe_assign_clientid/2,
fun fix_mountpoint/2 fun fix_mountpoint/2
], ConnPkt, Client), ], ConnPkt, Client),
{ok, NConnPkt, Channel#channel{client = NClient#{is_bridge => IsBridge}}}. {ok, NConnPkt, Channel#channel{client = NClient}}.
%% Username may be not undefined if peer_cert_as_username
set_username(#mqtt_packet_connect{username = Username}, Client = #{username := undefined}) -> set_username(#mqtt_packet_connect{username = Username}, Client = #{username := undefined}) ->
{ok, Client#{username => Username}}; {ok, Client#{username => Username}};
set_username(_ConnPkt, Client) -> set_username(_ConnPkt, Client) ->
{ok, Client}. {ok, Client}.
set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, Client) ->
{ok, Client#{is_bridge => true}};
set_bridge_mode(_ConnPkt, _Client) -> ok.
maybe_username_as_clientid(_ConnPkt, Client = #{username := undefined}) -> maybe_username_as_clientid(_ConnPkt, Client = #{username := undefined}) ->
{ok, Client}; {ok, Client};
maybe_username_as_clientid(_ConnPkt, Client = #{zone := Zone, username := Username}) -> maybe_username_as_clientid(_ConnPkt, Client = #{zone := Zone, username := Username}) ->
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of case emqx_zone:use_username_as_clientid(Zone) of
true -> {ok, Client#{client_id => Username}}; true -> {ok, Client#{client_id => Username}};
false -> ok false -> ok
end. end.
maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, Client) -> maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, Client) ->
RandClientId = emqx_guid:to_base62(emqx_guid:gen()), %% Generate a rand clientId
{ok, Client#{client_id => RandClientId}}; RandId = emqx_guid:to_base62(emqx_guid:gen()),
{ok, Client#{client_id => RandId}};
maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, Client) -> maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, Client) ->
{ok, Client#{client_id => ClientId}}. {ok, Client#{client_id => ClientId}}.

View File

@ -104,7 +104,7 @@ info(CPid) when is_pid(CPid) ->
info(Conn = #connection{chan_state = ChanState}) -> info(Conn = #connection{chan_state = ChanState}) ->
ConnInfo = info(?INFO_KEYS, Conn), ConnInfo = info(?INFO_KEYS, Conn),
ChanInfo = emqx_channel:info(ChanState), ChanInfo = emqx_channel:info(ChanState),
maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}). maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}).
info(Keys, Conn) when is_list(Keys) -> info(Keys, Conn) when is_list(Keys) ->
[{Key, info(Key, Conn)} || Key <- Keys]; [{Key, info(Key, Conn)} || Key <- Keys];

View File

@ -57,7 +57,8 @@
, update_expiry/1 , update_expiry/1
]). ]).
-export([ to_map/1 -export([ to_packet/2
, to_map/1
, to_list/1 , to_list/1
]). ]).
@ -188,6 +189,34 @@ update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval},
end; end;
update_expiry(Msg) -> Msg. update_expiry(Msg) -> Msg.
%% @doc Message to PUBLISH Packet.
-spec(to_packet(emqx_types:packet_id(), emqx_types:message())
-> emqx_types:packet()).
to_packet(PacketId, #message{qos = QoS, flags = Flags, headers = Headers,
topic = Topic, payload = Payload}) ->
Flags1 = if Flags =:= undefined -> #{};
true -> Flags
end,
Dup = maps:get(dup, Flags1, false),
Retain = maps:get(retain, Flags1, false),
Publish = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId,
properties = publish_props(Headers)},
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = Dup,
qos = QoS,
retain = Retain},
variable = Publish, payload = Payload}.
publish_props(Headers) ->
maps:with(['Payload-Format-Indicator',
'Response-Topic',
'Correlation-Data',
'User-Property',
'Subscription-Identifier',
'Content-Type',
'Message-Expiry-Interval'], Headers).
%% @doc Message to map %% @doc Message to map
-spec(to_map(emqx_types:message()) -> map()). -spec(to_map(emqx_types:message()) -> map()).
to_map(#message{ to_map(#message{
@ -228,3 +257,4 @@ format(flags, Flags) ->
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
format(headers, Headers) -> format(headers, Headers) ->
io_lib:format("~p", [Headers]). io_lib:format("~p", [Headers]).

View File

@ -20,7 +20,6 @@
-export([ merge_opts/2 -export([ merge_opts/2
, maybe_apply/2 , maybe_apply/2
, run_fold/2
, run_fold/3 , run_fold/3
, pipeline/3 , pipeline/3
, start_timer/2 , start_timer/2
@ -60,11 +59,6 @@ maybe_apply(_Fun, undefined) ->
maybe_apply(Fun, Arg) when is_function(Fun) -> maybe_apply(Fun, Arg) when is_function(Fun) ->
erlang:apply(Fun, [Arg]). erlang:apply(Fun, [Arg]).
run_fold([], Acc) ->
Acc;
run_fold([Fun|More], Acc) ->
run_fold(More, Fun(Acc)).
%% @doc RunFold %% @doc RunFold
run_fold([], Acc, _State) -> run_fold([], Acc, _State) ->
Acc; Acc;
@ -76,18 +70,25 @@ pipeline([], Input, State) ->
{ok, Input, State}; {ok, Input, State};
pipeline([Fun|More], Input, State) -> pipeline([Fun|More], Input, State) ->
case Fun(Input, State) of case apply_fun(Fun, Input, State) of
ok -> pipeline(More, Input, State); ok -> pipeline(More, Input, State);
{ok, NState} -> {ok, NState} ->
pipeline(More, Input, NState); pipeline(More, Input, NState);
{ok, NInput, NState} -> {ok, Output, NState} ->
pipeline(More, NInput, NState); pipeline(More, Output, NState);
{error, Reason} -> {error, Reason} ->
{error, Reason, State}; {error, Reason, State};
{error, Reason, NState} -> {error, Reason, NState} ->
{error, Reason, NState} {error, Reason, NState}
end. end.
-compile({inline, [apply_fun/3]}).
apply_fun(Fun, Input, State) ->
case erlang:fun_info(Fun, arity) of
{arity, 1} -> Fun(Input);
{arity, 2} -> Fun(Input, State)
end.
-spec(start_timer(integer(), term()) -> reference()). -spec(start_timer(integer(), term()) -> reference()).
start_timer(Interval, Msg) -> start_timer(Interval, Msg) ->
start_timer(Interval, self(), Msg). start_timer(Interval, self(), Msg).

View File

@ -19,143 +19,224 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
%% Header APIs
-export([ type/1 -export([ type/1
, type_name/1
, dup/1
, qos/1 , qos/1
, retain/1
]). ]).
-export([ proto_name/1 -export([ proto_name/1
, type_name/1 , proto_ver/1
, validate/1 ]).
, format/1
, to_message/2 %% Check API
, from_message/2 -export([ check/1
, check/2
]).
-export([ to_message/2
, will_msg/1 , will_msg/1
]). ]).
-compile(inline). -export([format/1]).
-type(connect() :: #mqtt_packet_connect{}).
-type(publish() :: #mqtt_packet_publish{}).
-type(subscribe() :: #mqtt_packet_subscribe{}).
-type(unsubscribe() :: #mqtt_packet_unsubscribe{}).
%%--------------------------------------------------------------------
%% MQTT Packet Type and Flags.
%%--------------------------------------------------------------------
%% @doc MQTT packet type.
-spec(type(emqx_types:packet()) -> emqx_types:packet_type()).
type(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) -> type(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) ->
Type. Type.
%% @doc Name of MQTT packet type.
-spec(type_name(emqx_types:packet()) -> atom()).
type_name(Packet) when is_record(Packet, mqtt_packet) ->
lists:nth(type(Packet), ?TYPE_NAMES).
%% @doc Dup flag of MQTT packet.
-spec(dup(emqx_types:packet()) -> boolean()).
dup(#mqtt_packet{header = #mqtt_packet_header{dup = Dup}}) ->
Dup.
%% @doc QoS of MQTT packet type.
-spec(qos(emqx_types:packet()) -> emqx_types:qos()).
qos(#mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) -> qos(#mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) ->
QoS. QoS.
%% @doc Protocol name of the version. %% @doc Retain flag of MQTT packet.
-spec(proto_name(emqx_types:version()) -> binary()). -spec(retain(emqx_types:packet()) -> boolean()).
proto_name(?MQTT_PROTO_V3) -> retain(#mqtt_packet{header = #mqtt_packet_header{retain = Retain}}) ->
<<"MQIsdp">>; Retain.
proto_name(?MQTT_PROTO_V4) ->
<<"MQTT">>;
proto_name(?MQTT_PROTO_V5) ->
<<"MQTT">>.
%% @doc Name of MQTT packet type.
-spec(type_name(emqx_types:packet_type()) -> atom()).
type_name(Type) when ?RESERVED < Type, Type =< ?AUTH ->
lists:nth(Type, ?TYPE_NAMES).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Validate MQTT Packet %% Protocol name and version of MQTT CONNECT Packet.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(validate(emqx_types:packet()) -> true). %% @doc Protocol name of the CONNECT Packet.
validate(?SUBSCRIBE_PACKET(_PacketId, _Properties, [])) -> -spec(proto_name(emqx_types:packet()|connect()) -> binary()).
error(topic_filters_invalid); proto_name(?CONNECT_PACKET(ConnPkt)) ->
validate(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters)) -> proto_name(ConnPkt);
validate_packet_id(PacketId) proto_name(#mqtt_packet_connect{proto_name = Name}) ->
andalso validate_properties(?SUBSCRIBE, Properties) Name.
andalso ok == lists:foreach(fun validate_subscription/1, TopicFilters);
validate(?UNSUBSCRIBE_PACKET(_PacketId, [])) -> %% @doc Protocol version of the CONNECT Packet.
error(topic_filters_invalid); -spec(proto_ver(emqx_types:packet()|connect()) -> emqx_types:version()).
validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) -> proto_ver(?CONNACK_PACKET(ConnPkt)) ->
validate_packet_id(PacketId) proto_ver(ConnPkt);
andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters); proto_ver(#mqtt_packet_connect{proto_ver = Ver}) ->
Ver.
validate(?PUBLISH_PACKET(_QoS, <<>>, _, #{'Topic-Alias':= _I}, _)) -> %%--------------------------------------------------------------------
true; %% Check MQTT Packet
validate(?PUBLISH_PACKET(_QoS, <<>>, _, _, _)) -> %%--------------------------------------------------------------------
error(protocol_error);
validate(?PUBLISH_PACKET(QoS, Topic, _, Properties, _)) ->
((not (QoS =:= 3)) orelse error(qos_invalid))
andalso ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid))
andalso validate_properties(?PUBLISH, Properties);
validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = Properties})) -> %% @doc Check PubSub Packet.
validate_properties(?CONNECT, Properties); -spec(check(emqx_types:packet()|publish()|subscribe()|unsubscribe())
-> ok | {error, emqx_types:reason_code()}).
check(#mqtt_packet{variable = PubPkt}) when is_record(PubPkt, mqtt_packet_publish) ->
check(PubPkt);
validate(_Packet) -> check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscribe) ->
true. check(SubPkt);
validate_packet_id(0) -> check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) ->
error(packet_id_invalid); check(UnsubPkt);
validate_packet_id(_) ->
true.
validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) check(#mqtt_packet_publish{topic_name = TopicName, properties = Props}) ->
when I =< 0; I >= 16#FFFFFFF -> try emqx_topic:validate(name, TopicName) of
error(subscription_identifier_invalid); true -> check_pub_props(Props)
validate_properties(?PUBLISH, #{'Topic-Alias':= 0}) -> catch
error(topic_alias_invalid); error:_Error ->
validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> {error, ?RC_TOPIC_NAME_INVALID}
error(protocol_error);
validate_properties(?PUBLISH, #{'Response-Topic' := ResponseTopic}) ->
case emqx_topic:wildcard(ResponseTopic) of
true ->
error(protocol_error);
false ->
true
end; end;
validate_properties(?CONNECT, #{'Receive-Maximum' := 0}) ->
error(protocol_error); check(#mqtt_packet_subscribe{properties = #{'Subscription-Identifier' := I}})
validate_properties(?CONNECT, #{'Request-Response-Information' := ReqRespInfo}) when I =< 0; I >= 16#FFFFFFF ->
{error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED};
check(#mqtt_packet_subscribe{topic_filters = []}) ->
{error, ?RC_TOPIC_FILTER_INVALID};
check(#mqtt_packet_subscribe{topic_filters = TopicFilters}) ->
try validate_topic_filters(TopicFilters)
catch
error:_Error ->
{error, ?RC_TOPIC_FILTER_INVALID}
end;
check(#mqtt_packet_unsubscribe{topic_filters = []}) ->
{error, ?RC_TOPIC_FILTER_INVALID};
check(#mqtt_packet_unsubscribe{topic_filters = TopicFilters}) ->
try validate_topic_filters(TopicFilters)
catch
error:_Error ->
{error, ?RC_TOPIC_FILTER_INVALID}
end.
check_pub_props(#{'Topic-Alias' := 0}) ->
{error, ?RC_TOPIC_ALIAS_INVALID};
check_pub_props(#{'Subscription-Identifier' := 0}) ->
{error, ?RC_PROTOCOL_ERROR};
check_pub_props(#{'Response-Topic' := ResponseTopic}) ->
try emqx_topic:validate(name, ResponseTopic) of
true -> ok
catch
error:_Error ->
{error, ?RC_PROTOCOL_ERROR}
end;
check_pub_props(_Props) -> ok.
%% @doc Check CONNECT Packet.
-spec(check(emqx_types:packet()|connect(), Opts :: map())
-> ok | {error, emqx_types:reason_code()}).
check(?CONNECT_PACKET(ConnPkt), Opts) ->
check(ConnPkt, Opts);
check(ConnPkt, Opts) when is_record(ConnPkt, mqtt_packet_connect) ->
run_checks([fun check_proto_ver/2,
fun check_client_id/2,
fun check_conn_props/2,
fun check_will_msg/2], ConnPkt, Opts).
check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
proto_name = Name}, _Opts) ->
case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
true -> ok;
false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
end.
%% MQTT3.1 does not allow null clientId
check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
client_id = <<>>}, _Opts) ->
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
%% Issue#599: Null clientId and clean_start = false
check_client_id(#mqtt_packet_connect{client_id = <<>>,
clean_start = false}, _Opts) ->
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
check_client_id(#mqtt_packet_connect{client_id = <<>>,
clean_start = true}, _Opts) ->
ok;
check_client_id(#mqtt_packet_connect{client_id = ClientId},
_Opts = #{max_clientid_len := MaxLen}) ->
case (1 =< (Len = byte_size(ClientId))) andalso (Len =< MaxLen) of
true -> ok;
false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
end.
check_conn_props(#mqtt_packet_connect{properties = undefined}, _Opts) ->
ok;
check_conn_props(#mqtt_packet_connect{properties = #{'Receive-Maximum' := 0}}, _Opts) ->
{error, ?RC_PROTOCOL_ERROR};
check_conn_props(#mqtt_packet_connect{properties = #{'Request-Response-Information' := ReqRespInfo}}, _Opts)
when ReqRespInfo =/= 0, ReqRespInfo =/= 1 -> when ReqRespInfo =/= 0, ReqRespInfo =/= 1 ->
error(protocol_error); {error, ?RC_PROTOCOL_ERROR};
validate_properties(?CONNECT, #{'Request-Problem-Information' := ReqProInfo}) check_conn_props(#mqtt_packet_connect{properties = #{'Request-Problem-Information' := ReqProInfo}}, _Opts)
when ReqProInfo =/= 0, ReqProInfo =/= 1 -> when ReqProInfo =/= 0, ReqProInfo =/= 1 ->
error(protocol_error); {error, ?RC_PROTOCOL_ERROR};
validate_properties(_, _) -> check_conn_props(_ConnPkt, _Opts) -> ok.
true.
validate_subscription({Topic, #{qos := QoS}}) -> check_will_msg(#mqtt_packet_connect{will_flag = false}, _Caps) ->
emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). ok;
check_will_msg(#mqtt_packet_connect{will_retain = true},
_Opts = #{mqtt_retain_available := false}) ->
{error, ?RC_RETAIN_NOT_SUPPORTED};
check_will_msg(#mqtt_packet_connect{will_topic = WillTopic}, _Opts) ->
try emqx_topic:validate(name, WillTopic) of
true -> ok
catch error:_Error ->
{error, ?RC_TOPIC_NAME_INVALID}
end.
validate_qos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> run_checks([], _Packet, _Options) ->
true; ok;
validate_qos(_) -> error(bad_qos). run_checks([Check|More], Packet, Options) ->
case Check(Packet, Options) of
ok -> run_checks(More, Packet, Options);
{error, Reason} -> {error, Reason}
end.
%% @doc From message to packet %% @doc Validate MQTT Packet
-spec(from_message(emqx_types:packet_id(), emqx_types:message()) %% @private
-> emqx_types:packet()). validate_topic_filters(TopicFilters) ->
from_message(PacketId, #message{qos = QoS, flags = Flags, headers = Headers, lists:foreach(
topic = Topic, payload = Payload}) -> fun({TopicFilter, _SubOpts}) ->
Flags1 = if Flags =:= undefined -> emqx_topic:validate(TopicFilter);
#{}; (TopicFilter) ->
true -> Flags emqx_topic:validate(TopicFilter)
end, end, TopicFilters).
Dup = maps:get(dup, Flags1, false),
Retain = maps:get(retain, Flags1, false),
Publish = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId,
properties = publish_props(Headers)},
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = Dup,
qos = QoS,
retain = Retain},
variable = Publish, payload = Payload}.
publish_props(Headers) -> %% @doc Publish Packet to Message.
maps:with(['Payload-Format-Indicator', -spec(to_message(emqx_types:client(), emqx_ypes:packet()) -> emqx_types:message()).
'Response-Topic',
'Correlation-Data',
'User-Property',
'Subscription-Identifier',
'Content-Type',
'Message-Expiry-Interval'], Headers).
%% @doc Message from Packet
-spec(to_message(emqx_types:client(), emqx_ypes:packet())
-> emqx_types:message()).
to_message(#{client_id := ClientId, username := Username, peername := Peername}, to_message(#{client_id := ClientId, username := Username, peername := Peername},
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain, retain = Retain,
@ -201,7 +282,8 @@ format_header(#mqtt_packet_header{type = Type,
S == undefined -> <<>>; S == undefined -> <<>>;
true -> [", ", S] true -> [", ", S]
end, end,
io_lib:format("~s(Q~p, R~p, D~p~s)", [type_name(Type), QoS, i(Retain), i(Dup), S1]). io_lib:format("~s(Q~p, R~p, D~p~s)",
[lists:nth(Type, ?TYPE_NAMES), QoS, i(Retain), i(Dup), S1]).
format_variable(undefined, _) -> format_variable(undefined, _) ->
undefined; undefined;

View File

@ -97,7 +97,7 @@ attrs(WsPid) when is_pid(WsPid) ->
attrs(WsConn = #ws_connection{chan_state = ChanState}) -> attrs(WsConn = #ws_connection{chan_state = ChanState}) ->
ConnAttrs = info(?ATTR_KEYS, WsConn), ConnAttrs = info(?ATTR_KEYS, WsConn),
ChanAttrs = emqx_channel:attrs(ChanState), ChanAttrs = emqx_channel:attrs(ChanState),
maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}). maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}).
-spec(stats(pid()|ws_connection()) -> emqx_types:stats()). -spec(stats(pid()|ws_connection()) -> emqx_types:stats()).
stats(WsPid) when is_pid(WsPid) -> stats(WsPid) when is_pid(WsPid) ->

View File

@ -27,7 +27,8 @@
%% APIs %% APIs
-export([start_link/0, stop/0]). -export([start_link/0, stop/0]).
-export([ enable_acl/1 -export([ use_username_as_clientid/1
, enable_acl/1
, enable_banned/1 , enable_banned/1
, enable_flapping_detect/1 , enable_flapping_detect/1
]). ]).
@ -67,6 +68,10 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(use_username_as_clientid(zone()) -> boolean()).
use_username_as_clientid(Zone) ->
get_env(Zone, use_username_as_clientid, false).
-spec(enable_acl(zone()) -> boolean()). -spec(enable_acl(zone()) -> boolean()).
enable_acl(Zone) -> enable_acl(Zone) ->
get_env(Zone, enable_acl, true). get_env(Zone, enable_acl, true).

View File

@ -27,6 +27,7 @@
, t_header/1 , t_header/1
, t_format/1 , t_format/1
, t_expired/1 , t_expired/1
, t_to_packet/1
, t_to_map/1 , t_to_map/1
]). ]).
@ -91,6 +92,18 @@ t_expired(_) ->
Msg2 = emqx_message:update_expiry(Msg1), Msg2 = emqx_message:update_expiry(Msg1),
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
t_to_packet(_) ->
Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = ?QOS_0,
retain = false,
dup = false},
variable = #mqtt_packet_publish{topic_name = <<"topic">>,
packet_id = 10,
properties = #{}},
payload = <<"payload">>},
Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
?assertEqual(Pkt, emqx_message:to_packet(10, Msg)).
t_to_map(_) -> t_to_map(_) ->
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>), Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>),
List = [{id, emqx_message:id(Msg)}, List = [{id, emqx_message:id(Msg)},

View File

@ -26,63 +26,127 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
t_proto_name(_) -> t_type(_) ->
?assertEqual(<<"MQIsdp">>, emqx_packet:proto_name(3)), ?assertEqual(?CONNECT, emqx_packet:type(?CONNECT_PACKET(#mqtt_packet_connect{}))),
?assertEqual(<<"MQTT">>, emqx_packet:proto_name(4)), ?assertEqual(?CONNACK, emqx_packet:type(?CONNACK_PACKET(?RC_SUCCESS))),
?assertEqual(<<"MQTT">>, emqx_packet:proto_name(5)). ?assertEqual(?PUBLISH, emqx_packet:type(?PUBLISH_PACKET(?QOS_1))),
?assertEqual(?PUBACK, emqx_packet:type(?PUBACK_PACKET(1))),
?assertEqual(?PUBREC, emqx_packet:type(?PUBREC_PACKET(1))),
?assertEqual(?PUBREL, emqx_packet:type(?PUBREL_PACKET(1))),
?assertEqual(?PUBCOMP, emqx_packet:type(?PUBCOMP_PACKET(1))),
?assertEqual(?SUBSCRIBE, emqx_packet:type(?SUBSCRIBE_PACKET(1, []))),
?assertEqual(?SUBACK, emqx_packet:type(?SUBACK_PACKET(1, [0]))),
?assertEqual(?UNSUBSCRIBE, emqx_packet:type(?UNSUBSCRIBE_PACKET(1, []))),
?assertEqual(?UNSUBACK, emqx_packet:type(?UNSUBACK_PACKET(1))),
?assertEqual(?DISCONNECT, emqx_packet:type(?DISCONNECT_PACKET(?RC_SUCCESS))),
?assertEqual(?AUTH, emqx_packet:type(?AUTH_PACKET())).
t_type_name(_) -> t_type_name(_) ->
?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT_PACKET(#mqtt_packet_connect{}))),
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). ?assertEqual('CONNACK', emqx_packet:type_name(?CONNACK_PACKET(?RC_SUCCESS))),
?assertEqual('PUBLISH', emqx_packet:type_name(?PUBLISH_PACKET(?QOS_1))),
?assertEqual('PUBACK', emqx_packet:type_name(?PUBACK_PACKET(1))),
?assertEqual('PUBREC', emqx_packet:type_name(?PUBREC_PACKET(1))),
?assertEqual('PUBREL', emqx_packet:type_name(?PUBREL_PACKET(1))),
?assertEqual('PUBCOMP', emqx_packet:type_name(?PUBCOMP_PACKET(1))),
?assertEqual('SUBSCRIBE', emqx_packet:type_name(?SUBSCRIBE_PACKET(1, []))),
?assertEqual('SUBACK', emqx_packet:type_name(?SUBACK_PACKET(1, [0]))),
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE_PACKET(1, []))),
?assertEqual('UNSUBACK', emqx_packet:type_name(?UNSUBACK_PACKET(1))),
?assertEqual('DISCONNECT', emqx_packet:type_name(?DISCONNECT_PACKET(?RC_SUCCESS))),
?assertEqual('AUTH', emqx_packet:type_name(?AUTH_PACKET())).
t_validate(_) -> t_dup(_) ->
?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, ?assertEqual(false, emqx_packet:dup(?PUBLISH_PACKET(?QOS_1))).
[{<<"topic">>, #{qos => ?QOS_0}}]))),
?assert(emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), t_qos(_) ->
?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))), ?assertEqual(?QOS_0, emqx_packet:qos(?PUBLISH_PACKET(?QOS_0))),
?assertEqual(?QOS_1, emqx_packet:qos(?PUBLISH_PACKET(?QOS_1))),
?assertEqual(?QOS_2, emqx_packet:qos(?PUBLISH_PACKET(?QOS_2))).
t_retain(_) ->
?assertEqual(false, emqx_packet:retain(?PUBLISH_PACKET(?QOS_1))).
t_proto_name(_) ->
lists:foreach(
fun({Ver, Name}) ->
ConnPkt = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = Ver,
proto_name = Name}),
?assertEqual(Name, emqx_packet:proto_name(ConnPkt))
end, ?PROTOCOL_NAMES).
t_proto_ver(_) ->
lists:foreach(
fun(Ver) ->
?assertEqual(Ver, emqx_packet:proto_ver(#mqtt_packet_connect{proto_ver = Ver}))
end, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]).
t_check_publish(_) ->
Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
?assert(emqx_packet:validate(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>))), ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),
?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}))), ok = emqx_packet:check(#mqtt_packet_publish{packet_id = 1, topic_name = <<"t">>}),
?assertError(subscription_identifier_invalid, {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>)),
emqx_packet:validate( {error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"+/+">>, 1, #{}, <<"payload">>)),
?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1}, {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>)),
[{<<"topic">>, #{qos => ?QOS_0}}]))), %% TODO::
?assertError(topic_filters_invalid, %% {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)),
emqx_packet:validate(?UNSUBSCRIBE_PACKET(1,[]))), ok = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)),
?assertError(protocol_error, {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"+/+">>}, <<"payload">>)).
emqx_packet:validate(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>))),
?assertError(topic_name_invalid, t_check_subscribe(_) ->
emqx_packet:validate(?PUBLISH_PACKET ok = emqx_packet:check(?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 1},
(1, <<"+/+">>, 1, #{}, <<"payload">>))), [{<<"topic">>, #{qos => ?QOS_0}}])),
?assertError(topic_alias_invalid, {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED} =
emqx_packet:validate( emqx_packet:check(?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => -1},
?PUBLISH_PACKET [{<<"topic">>, #{qos => ?QOS_0, rp => 0}}])).
(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>))),
?assertError(protocol_error, t_check_unsubscribe(_) ->
emqx_packet:validate( ok = emqx_packet:check(?UNSUBSCRIBE_PACKET(1, [<<"topic">>])),
?PUBLISH_PACKET(1, <<"topic">>, 1, {error, ?RC_TOPIC_FILTER_INVALID} = emqx_packet:check(?UNSUBSCRIBE_PACKET(1,[])).
#{'Subscription-Identifier' => 10}, <<"payload">>))),
?assertError(protocol_error, t_check_connect(_) ->
emqx_packet:validate( Opts = #{max_clientid_len => 5, mqtt_retain_available => false},
?PUBLISH_PACKET(1, <<"topic">>, 1, ok = emqx_packet:check(#mqtt_packet_connect{}, Opts),
#{'Response-Topic' => <<"+/+">>}, <<"payload">>))), ok = emqx_packet:check(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}), Opts),
?assertError(protocol_error, ConnPkt1 = #mqtt_packet_connect{proto_name = <<"MQIsdp">>,
emqx_packet:validate( proto_ver = ?MQTT_PROTO_V5
},
{error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} = emqx_packet:check(ConnPkt1, Opts),
ConnPkt2 = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
proto_name = <<"MQIsdp">>,
client_id = <<>>
},
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt2, Opts),
ConnPkt3 = #mqtt_packet_connect{client_id = <<"123456">>},
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt3, Opts),
ConnPkt4 = #mqtt_packet_connect{will_flag = true,
will_retain = true
},
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_packet:check(ConnPkt4, Opts),
ConnPkt5 = #mqtt_packet_connect{will_flag = true,
will_topic = <<"#">>
},
{error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(ConnPkt5, Opts),
ConnPkt6 = ?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Request-Response-Information' => -1}}),
{error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(ConnPkt6, Opts),
{error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
?CONNECT_PACKET(#mqtt_packet_connect{ ?CONNECT_PACKET(#mqtt_packet_connect{
properties = properties = #{'Request-Problem-Information' => 2}}), Opts),
#{'Request-Response-Information' => -1}}))), {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
?assertError(protocol_error,
emqx_packet:validate(
?CONNECT_PACKET(#mqtt_packet_connect{ ?CONNECT_PACKET(#mqtt_packet_connect{
properties = properties = #{'Receive-Maximum' => 0}}), Opts).
#{'Request-Problem-Information' => 2}}))),
?assertError(protocol_error,
emqx_packet:validate(
?CONNECT_PACKET(#mqtt_packet_connect{
properties =
#{'Receive-Maximum' => 0}}))).
t_from_to_message(_) -> t_from_to_message(_) ->
ExpectedMsg = emqx_message:set_headers(
#{peername => {{127,0,0,1}, 9527}, username => <<"test">>},
emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>)),
ExpectedMsg1 = emqx_message:set_flag(retain, false, ExpectedMsg),
Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = ?QOS_0, qos = ?QOS_0,
retain = false, retain = false,
@ -91,30 +155,12 @@ t_from_to_message(_) ->
packet_id = 10, packet_id = 10,
properties = #{}}, properties = #{}},
payload = <<"payload">>}, payload = <<"payload">>},
Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), MsgFromPkt = emqx_packet:to_message(#{client_id => <<"clientid">>,
Msg2 = emqx_message:set_flag(retain, false, Msg), username => <<"test">>,
Pkt = emqx_packet:from_message(10, Msg2),
Msg3 = emqx_message:set_header(
peername, {{127,0,0,1}, 9527},
emqx_message:set_header(username, "test", Msg2)
),
Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>,
username => "test",
peername => {{127,0,0,1}, 9527}}, Pkt), peername => {{127,0,0,1}, 9527}}, Pkt),
Msg5 = Msg4#message{timestamp = Msg3#message.timestamp, id = Msg3#message.id}, ?assertEqual(ExpectedMsg1, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg),
Msg5 = Msg3. timestamp = emqx_message:timestamp(ExpectedMsg)
}).
t_packet_format(_) ->
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]),
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).
t_will_msg(_) -> t_will_msg(_) ->
Pkt = #mqtt_packet_connect{will_flag = true, Pkt = #mqtt_packet_connect{will_flag = true,
@ -130,3 +176,15 @@ t_will_msg(_) ->
?assertEqual(<<"clientid">>, Msg#message.from), ?assertEqual(<<"clientid">>, Msg#message.from),
?assertEqual(<<"topic">>, Msg#message.topic). ?assertEqual(<<"topic">>, Msg#message.topic).
t_format(_) ->
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]),
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).