Merge branch 'emqx30-dev' of github.com:emqtt/emqttd into emqx30-dev
This commit is contained in:
commit
51ea864526
|
@ -0,0 +1,14 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% For paho interoperability test cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
{deny, {client, "myclientid"}, subscribe, ["test/nosubscribe"]}.
|
||||||
|
|
||||||
|
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
|
||||||
|
|
||||||
|
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
|
||||||
|
|
||||||
|
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
|
||||||
|
|
||||||
|
{allow, all}.
|
||||||
|
|
|
@ -140,7 +140,20 @@
|
||||||
-type(mqtt_packet_type() :: ?RESERVED..?AUTH).
|
-type(mqtt_packet_type() :: ?RESERVED..?AUTH).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% MQTT Reason Codes
|
%% MQTT V3.1.1 Connect Return Codes
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(CONNACK_ACCEPT, 0). %% Connection accepted
|
||||||
|
-define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version
|
||||||
|
-define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server
|
||||||
|
-define(CONNACK_SERVER, 3). %% Server unavailable
|
||||||
|
-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed
|
||||||
|
-define(CONNACK_AUTH, 5). %% Client is not authorized to connect
|
||||||
|
|
||||||
|
-type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT V5.0 Reason Codes
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-define(RC_SUCCESS, 16#00).
|
-define(RC_SUCCESS, 16#00).
|
||||||
|
|
|
@ -331,7 +331,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
|
||||||
{Value + Len * Multiplier, Rest}.
|
{Value + Len * Multiplier, Rest}.
|
||||||
|
|
||||||
parse_topic_filters(subscribe, Bin) ->
|
parse_topic_filters(subscribe, Bin) ->
|
||||||
[{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}}
|
[{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0, subid => 0}}
|
||||||
|| <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
|
|| <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
|
||||||
|
|
||||||
parse_topic_filters(unsubscribe, Bin) ->
|
parse_topic_filters(unsubscribe, Bin) ->
|
||||||
|
|
|
@ -73,6 +73,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
|
||||||
peercert = Peercert,
|
peercert = Peercert,
|
||||||
proto_ver = ?MQTT_PROTO_V4,
|
proto_ver = ?MQTT_PROTO_V4,
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
|
client_id = <<>>,
|
||||||
client_pid = self(),
|
client_pid = self(),
|
||||||
username = init_username(Peercert, Options),
|
username = init_username(Peercert, Options),
|
||||||
is_super = false,
|
is_super = false,
|
||||||
|
@ -201,6 +202,8 @@ process(?CONNECT_PACKET(
|
||||||
username = Username,
|
username = Username,
|
||||||
password = Password} = Connect), PState) ->
|
password = Password} = Connect), PState) ->
|
||||||
|
|
||||||
|
io:format("~p~n", [Connect]),
|
||||||
|
|
||||||
PState1 = set_username(Username,
|
PState1 = set_username(Username,
|
||||||
PState#pstate{client_id = ClientId,
|
PState#pstate{client_id = ClientId,
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
|
@ -334,8 +337,12 @@ process(?PACKET(?DISCONNECT), PState) ->
|
||||||
connack({?RC_SUCCESS, SP, PState}) ->
|
connack({?RC_SUCCESS, SP, PState}) ->
|
||||||
deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
|
deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
|
||||||
|
|
||||||
connack({ReasonCode, PState}) ->
|
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
|
||||||
deliver({connack, ReasonCode, 0}, PState),
|
_ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
|
ReasonCode;
|
||||||
|
true ->
|
||||||
|
emqx_reason_codes:compat(connack, ReasonCode)
|
||||||
|
end}, PState),
|
||||||
{error, emqx_reason_codes:name(ReasonCode), PState}.
|
{error, emqx_reason_codes:name(ReasonCode), PState}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -384,8 +391,13 @@ deliver({pubrel, PacketId}, PState) ->
|
||||||
deliver({pubrec, PacketId, ReasonCode}, PState) ->
|
deliver({pubrec, PacketId, ReasonCode}, PState) ->
|
||||||
send(?PUBREC_PACKET(PacketId, ReasonCode), PState);
|
send(?PUBREC_PACKET(PacketId, ReasonCode), PState);
|
||||||
|
|
||||||
deliver({suback, PacketId, ReasonCodes}, PState) ->
|
deliver({suback, PacketId, ReasonCodes}, PState = #pstate{proto_ver = ProtoVer}) ->
|
||||||
send(?SUBACK_PACKET(PacketId, ReasonCodes), PState);
|
send(?SUBACK_PACKET(PacketId,
|
||||||
|
if ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
|
ReasonCodes;
|
||||||
|
true ->
|
||||||
|
[emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes]
|
||||||
|
end), PState);
|
||||||
|
|
||||||
deliver({unsuback, PacketId, ReasonCodes}, PState) ->
|
deliver({unsuback, PacketId, ReasonCodes}, PState) ->
|
||||||
send(?UNSUBACK_PACKET(PacketId, ReasonCodes), PState);
|
send(?UNSUBACK_PACKET(PacketId, ReasonCodes), PState);
|
||||||
|
@ -401,12 +413,12 @@ deliver({disconnect, _ReasonCode}, PState) ->
|
||||||
%% Send Packet to Client
|
%% Send Packet to Client
|
||||||
|
|
||||||
-spec(send(mqtt_packet(), state()) -> {ok, state()} | {error, term()}).
|
-spec(send(mqtt_packet(), state()) -> {ok, state()} | {error, term()}).
|
||||||
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver,
|
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
|
||||||
sendfun = SendFun}) ->
|
trace(send, Packet, PState),
|
||||||
case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of
|
case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of
|
||||||
ok -> emqx_metrics:sent(Packet),
|
ok ->
|
||||||
trace(send, Packet, PState),
|
emqx_metrics:sent(Packet),
|
||||||
{ok, inc_stats(send, Type, PState)};
|
{ok, inc_stats(send, Type, PState)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -415,7 +427,7 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver,
|
||||||
%% Assign a clientid
|
%% Assign a clientid
|
||||||
|
|
||||||
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) ->
|
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) ->
|
||||||
ClientId = iolist_to_binary(["emqx_", emqx_guid:gen()]),
|
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
|
||||||
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
|
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
|
||||||
PState#pstate{client_id = ClientId, ackprops = AckProps1};
|
PState#pstate{client_id = ClientId, ackprops = AckProps1};
|
||||||
maybe_assign_client_id(PState) ->
|
maybe_assign_client_id(PState) ->
|
||||||
|
@ -464,18 +476,20 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
|
||||||
false -> {error, ?RC_PROTOCOL_ERROR}
|
false -> {error, ?RC_PROTOCOL_ERROR}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Issue#599: Null clientId and clean_start = false
|
|
||||||
check_client_id(#mqtt_packet_connect{client_id = ClientId,
|
|
||||||
clean_start = false}, _PState)
|
|
||||||
when ClientId == undefined; ClientId == <<>> ->
|
|
||||||
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
|
|
||||||
|
|
||||||
%% MQTT3.1 does not allow null clientId
|
%% MQTT3.1 does not allow null clientId
|
||||||
check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
|
check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
|
||||||
client_id = ClientId}, _PState)
|
client_id = <<>>}, _PState) ->
|
||||||
when ClientId == undefined; ClientId == <<>> ->
|
|
||||||
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
|
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
|
||||||
|
|
||||||
|
%% Issue#599: Null clientId and clean_start = false
|
||||||
|
check_client_id(#mqtt_packet_connect{client_id = <<>>,
|
||||||
|
clean_start = false}, _PState) ->
|
||||||
|
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
|
||||||
|
|
||||||
|
check_client_id(#mqtt_packet_connect{client_id = <<>>,
|
||||||
|
clean_start = true}, _PState) ->
|
||||||
|
ok;
|
||||||
|
|
||||||
check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}) ->
|
check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}) ->
|
||||||
Len = byte_size(ClientId),
|
Len = byte_size(ClientId),
|
||||||
MaxLen = emqx_zone:get_env(Zone, max_clientid_len),
|
MaxLen = emqx_zone:get_env(Zone, max_clientid_len),
|
||||||
|
|
|
@ -15,7 +15,10 @@
|
||||||
%% @doc MQTT5 reason codes
|
%% @doc MQTT5 reason codes
|
||||||
-module(emqx_reason_codes).
|
-module(emqx_reason_codes).
|
||||||
|
|
||||||
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
-export([name/1, text/1]).
|
-export([name/1, text/1]).
|
||||||
|
-export([compat/2]).
|
||||||
|
|
||||||
name(16#00) -> success;
|
name(16#00) -> success;
|
||||||
name(16#01) -> granted_qos1;
|
name(16#01) -> granted_qos1;
|
||||||
|
@ -107,3 +110,24 @@ text(16#A1) -> <<"Subscription Identifiers not supported">>;
|
||||||
text(16#A2) -> <<"Wildcard Subscriptions not supported">>;
|
text(16#A2) -> <<"Wildcard Subscriptions not supported">>;
|
||||||
text(Code) -> iolist_to_binary(["Unkown Reason Code:", integer_to_list(Code)]).
|
text(Code) -> iolist_to_binary(["Unkown Reason Code:", integer_to_list(Code)]).
|
||||||
|
|
||||||
|
compat(connack, 16#80) -> ?CONNACK_PROTO_VER;
|
||||||
|
compat(connack, 16#81) -> ?CONNACK_PROTO_VER;
|
||||||
|
compat(connack, 16#82) -> ?CONNACK_PROTO_VER;
|
||||||
|
compat(connack, 16#83) -> ?CONNACK_PROTO_VER;
|
||||||
|
compat(connack, 16#84) -> ?CONNACK_PROTO_VER;
|
||||||
|
compat(connack, 16#85) -> ?CONNACK_INVALID_ID;
|
||||||
|
compat(connack, 16#86) -> ?CONNACK_CREDENTIALS;
|
||||||
|
compat(connack, 16#87) -> ?CONNACK_AUTH;
|
||||||
|
compat(connack, 16#88) -> ?CONNACK_SERVER;
|
||||||
|
compat(connack, 16#89) -> ?CONNACK_SERVER;
|
||||||
|
compat(connack, 16#8A) -> ?CONNACK_AUTH;
|
||||||
|
compat(connack, 16#8B) -> ?CONNACK_SERVER;
|
||||||
|
compat(connack, 16#8C) -> ?CONNACK_AUTH;
|
||||||
|
compat(connack, 16#97) -> ?CONNACK_SERVER;
|
||||||
|
compat(connack, 16#9C) -> ?CONNACK_SERVER;
|
||||||
|
compat(connack, 16#9D) -> ?CONNACK_SERVER;
|
||||||
|
compat(connack, 16#9F) -> ?CONNACK_SERVER;
|
||||||
|
|
||||||
|
compat(suback, Code) when Code =< ?QOS2 -> Code;
|
||||||
|
compat(suback, Code) when Code > 16#80 -> 16#80.
|
||||||
|
|
||||||
|
|
|
@ -433,7 +433,7 @@ handle_cast({subscribe, From, {PacketId, Properties, TopicFilters}},
|
||||||
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
||||||
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
|
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
|
||||||
{ReasonCodes, Subscriptions1} =
|
{ReasonCodes, Subscriptions1} =
|
||||||
lists:foldr(fun(Topic, {RcAcc, SubMap}) ->
|
lists:foldr(fun({Topic, _Opts}, {RcAcc, SubMap}) ->
|
||||||
case maps:find(Topic, SubMap) of
|
case maps:find(Topic, SubMap) of
|
||||||
{ok, SubOpts} ->
|
{ok, SubOpts} ->
|
||||||
emqx_broker:unsubscribe(Topic, ClientId),
|
emqx_broker:unsubscribe(Topic, ClientId),
|
||||||
|
@ -649,7 +649,6 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
|
||||||
State;
|
State;
|
||||||
false ->
|
false ->
|
||||||
Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
|
Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
|
||||||
io:format("!!! Retry Delivery: ~p~n", [Msgs]),
|
|
||||||
retry_delivery(Force, Msgs, os:timestamp(), State)
|
retry_delivery(Force, Msgs, os:timestamp(), State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ record(Group, Topic, SubPid) ->
|
||||||
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
||||||
|
|
||||||
%% TODO: dispatch strategy, ensure the delivery...
|
%% TODO: dispatch strategy, ensure the delivery...
|
||||||
dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
|
dispatch({Group, _Node}, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
|
||||||
case pick(subscribers(Group, Topic)) of
|
case pick(subscribers(Group, Topic)) of
|
||||||
false -> Delivery;
|
false -> Delivery;
|
||||||
SubPid -> SubPid ! {dispatch, Topic, Msg},
|
SubPid -> SubPid ! {dispatch, Topic, Msg},
|
||||||
|
@ -98,7 +98,6 @@ pick(SubPids) ->
|
||||||
|
|
||||||
subscribers(Group, Topic) ->
|
subscribers(Group, Topic) ->
|
||||||
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue