Merge branch 'emqx30-dev' into emqx30

This commit is contained in:
Feng Lee 2018-08-25 16:20:09 +08:00
commit 03d2d24949
7 changed files with 87 additions and 24 deletions

14
etc/acl.conf.paho Normal file
View File

@ -0,0 +1,14 @@
%%--------------------------------------------------------------------
%% For paho interoperability test cases
%%--------------------------------------------------------------------
{deny, {client, "myclientid"}, subscribe, ["test/nosubscribe"]}.
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.

View File

@ -140,7 +140,20 @@
-type(mqtt_packet_type() :: ?RESERVED..?AUTH).
%%--------------------------------------------------------------------
%% MQTT Reason Codes
%% MQTT V3.1.1 Connect Return Codes
%%--------------------------------------------------------------------
-define(CONNACK_ACCEPT, 0). %% Connection accepted
-define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version
-define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server
-define(CONNACK_SERVER, 3). %% Server unavailable
-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed
-define(CONNACK_AUTH, 5). %% Client is not authorized to connect
-type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH).
%%--------------------------------------------------------------------
%% MQTT V5.0 Reason Codes
%%--------------------------------------------------------------------
-define(RC_SUCCESS, 16#00).

View File

@ -331,7 +331,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
{Value + Len * Multiplier, Rest}.
parse_topic_filters(subscribe, Bin) ->
[{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}}
[{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0, subid => 0}}
|| <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
parse_topic_filters(unsubscribe, Bin) ->

View File

@ -73,6 +73,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
peercert = Peercert,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
client_id = <<>>,
client_pid = self(),
username = init_username(Peercert, Options),
is_super = false,
@ -201,6 +202,8 @@ process(?CONNECT_PACKET(
username = Username,
password = Password} = Connect), PState) ->
io:format("~p~n", [Connect]),
PState1 = set_username(Username,
PState#pstate{client_id = ClientId,
proto_ver = ProtoVer,
@ -334,8 +337,12 @@ process(?PACKET(?DISCONNECT), PState) ->
connack({?RC_SUCCESS, SP, PState}) ->
deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
connack({ReasonCode, PState}) ->
deliver({connack, ReasonCode, 0}, PState),
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
_ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 ->
ReasonCode;
true ->
emqx_reason_codes:compat(connack, ReasonCode)
end}, PState),
{error, emqx_reason_codes:name(ReasonCode), PState}.
%%------------------------------------------------------------------------------
@ -384,8 +391,13 @@ deliver({pubrel, PacketId}, PState) ->
deliver({pubrec, PacketId, ReasonCode}, PState) ->
send(?PUBREC_PACKET(PacketId, ReasonCode), PState);
deliver({suback, PacketId, ReasonCodes}, PState) ->
send(?SUBACK_PACKET(PacketId, ReasonCodes), PState);
deliver({suback, PacketId, ReasonCodes}, PState = #pstate{proto_ver = ProtoVer}) ->
send(?SUBACK_PACKET(PacketId,
if ProtoVer =:= ?MQTT_PROTO_V5 ->
ReasonCodes;
true ->
[emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes]
end), PState);
deliver({unsuback, PacketId, ReasonCodes}, PState) ->
send(?UNSUBACK_PACKET(PacketId, ReasonCodes), PState);
@ -401,12 +413,12 @@ deliver({disconnect, _ReasonCode}, PState) ->
%% Send Packet to Client
-spec(send(mqtt_packet(), state()) -> {ok, state()} | {error, term()}).
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver,
sendfun = SendFun}) ->
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
trace(send, Packet, PState),
case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of
ok -> emqx_metrics:sent(Packet),
trace(send, Packet, PState),
{ok, inc_stats(send, Type, PState)};
ok ->
emqx_metrics:sent(Packet),
{ok, inc_stats(send, Type, PState)};
{error, Reason} ->
{error, Reason}
end.
@ -415,7 +427,7 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver,
%% Assign a clientid
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) ->
ClientId = iolist_to_binary(["emqx_", emqx_guid:gen()]),
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
PState#pstate{client_id = ClientId, ackprops = AckProps1};
maybe_assign_client_id(PState) ->
@ -464,18 +476,20 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
false -> {error, ?RC_PROTOCOL_ERROR}
end.
%% Issue#599: Null clientId and clean_start = false
check_client_id(#mqtt_packet_connect{client_id = ClientId,
clean_start = false}, _PState)
when ClientId == undefined; ClientId == <<>> ->
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
%% MQTT3.1 does not allow null clientId
check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
client_id = ClientId}, _PState)
when ClientId == undefined; ClientId == <<>> ->
client_id = <<>>}, _PState) ->
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
%% Issue#599: Null clientId and clean_start = false
check_client_id(#mqtt_packet_connect{client_id = <<>>,
clean_start = false}, _PState) ->
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
check_client_id(#mqtt_packet_connect{client_id = <<>>,
clean_start = true}, _PState) ->
ok;
check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}) ->
Len = byte_size(ClientId),
MaxLen = emqx_zone:get_env(Zone, max_clientid_len),

View File

@ -15,7 +15,10 @@
%% @doc MQTT5 reason codes
-module(emqx_reason_codes).
-include("emqx_mqtt.hrl").
-export([name/1, text/1]).
-export([compat/2]).
name(16#00) -> success;
name(16#01) -> granted_qos1;
@ -107,3 +110,24 @@ text(16#A1) -> <<"Subscription Identifiers not supported">>;
text(16#A2) -> <<"Wildcard Subscriptions not supported">>;
text(Code) -> iolist_to_binary(["Unkown Reason Code:", integer_to_list(Code)]).
compat(connack, 16#80) -> ?CONNACK_PROTO_VER;
compat(connack, 16#81) -> ?CONNACK_PROTO_VER;
compat(connack, 16#82) -> ?CONNACK_PROTO_VER;
compat(connack, 16#83) -> ?CONNACK_PROTO_VER;
compat(connack, 16#84) -> ?CONNACK_PROTO_VER;
compat(connack, 16#85) -> ?CONNACK_INVALID_ID;
compat(connack, 16#86) -> ?CONNACK_CREDENTIALS;
compat(connack, 16#87) -> ?CONNACK_AUTH;
compat(connack, 16#88) -> ?CONNACK_SERVER;
compat(connack, 16#89) -> ?CONNACK_SERVER;
compat(connack, 16#8A) -> ?CONNACK_AUTH;
compat(connack, 16#8B) -> ?CONNACK_SERVER;
compat(connack, 16#8C) -> ?CONNACK_AUTH;
compat(connack, 16#97) -> ?CONNACK_SERVER;
compat(connack, 16#9C) -> ?CONNACK_SERVER;
compat(connack, 16#9D) -> ?CONNACK_SERVER;
compat(connack, 16#9F) -> ?CONNACK_SERVER;
compat(suback, Code) when Code =< ?QOS2 -> Code;
compat(suback, Code) when Code > 16#80 -> 16#80.

View File

@ -433,7 +433,7 @@ handle_cast({subscribe, From, {PacketId, Properties, TopicFilters}},
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun(Topic, {RcAcc, SubMap}) ->
lists:foldr(fun({Topic, _Opts}, {RcAcc, SubMap}) ->
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
emqx_broker:unsubscribe(Topic, ClientId),
@ -649,7 +649,6 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
State;
false ->
Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
io:format("!!! Retry Delivery: ~p~n", [Msgs]),
retry_delivery(Force, Msgs, os:timestamp(), State)
end.

View File

@ -81,7 +81,7 @@ record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
%% TODO: dispatch strategy, ensure the delivery...
dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
dispatch({Group, _Node}, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
case pick(subscribers(Group, Topic)) of
false -> Delivery;
SubPid -> SubPid ! {dispatch, Topic, Msg},
@ -98,7 +98,6 @@ pick(SubPids) ->
subscribers(Group, Topic) ->
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
%%-----------------------------------------------------------------------------
%% gen_server callbacks
%%-----------------------------------------------------------------------------