commit
66e9f9b02a
|
@ -43,7 +43,9 @@
|
|||
{mqtt_wildcard_subscription, true}]).
|
||||
|
||||
-define(PUBCAP_KEYS, [max_qos_allowed,
|
||||
mqtt_retain_available]).
|
||||
mqtt_retain_available,
|
||||
mqtt_topic_alias
|
||||
]).
|
||||
-define(SUBCAP_KEYS, [max_qos_allowed,
|
||||
max_topic_levels,
|
||||
mqtt_shared_subscription,
|
||||
|
@ -60,8 +62,15 @@ do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) ->
|
|||
true -> {error, ?RC_QOS_NOT_SUPPORTED};
|
||||
false -> do_check_pub(Props, Caps)
|
||||
end;
|
||||
do_check_pub(Props = #{ topic_alias := TopicAlias}, [{max_topic_alias, MaxTopicAlias}| Caps]) ->
|
||||
case TopicAlias =< MaxTopicAlias andalso TopicAlias > 0 of
|
||||
false -> {error, ?RC_TOPIC_ALIAS_INVALID};
|
||||
true -> do_check_pub(Props, Caps)
|
||||
end;
|
||||
do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) ->
|
||||
{error, ?RC_RETAIN_NOT_SUPPORTED};
|
||||
do_check_pub(Props, [{max_topic_alias, _} | Caps]) ->
|
||||
do_check_pub(Props, Caps);
|
||||
do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) ->
|
||||
do_check_pub(Props, Caps).
|
||||
|
||||
|
@ -136,4 +145,3 @@ with_env(Zone, Key, InitFun) ->
|
|||
Caps;
|
||||
ZoneCaps -> ZoneCaps
|
||||
end.
|
||||
|
||||
|
|
|
@ -72,9 +72,11 @@ validate_packet_id(_) ->
|
|||
validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I})
|
||||
when I =< 0; I >= 16#FFFFFFF ->
|
||||
error(subscription_identifier_invalid);
|
||||
validate_properties(?PUBLISH, # {'Topic-Alias':= I})
|
||||
validate_properties(?PUBLISH, #{'Topic-Alias':= I})
|
||||
when I =:= 0 ->
|
||||
error(topic_alias_invalid);
|
||||
validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) ->
|
||||
error(protocol_error);
|
||||
validate_properties(_, _) ->
|
||||
true.
|
||||
|
||||
|
@ -236,4 +238,3 @@ format_password(_Password) -> '******'.
|
|||
i(true) -> 1;
|
||||
i(false) -> 0;
|
||||
i(I) when is_integer(I) -> I.
|
||||
|
||||
|
|
|
@ -33,36 +33,36 @@
|
|||
-export([shutdown/2]).
|
||||
|
||||
-record(pstate, {
|
||||
zone,
|
||||
sendfun,
|
||||
peername,
|
||||
peercert,
|
||||
proto_ver,
|
||||
proto_name,
|
||||
ackprops,
|
||||
client_id,
|
||||
is_assigned,
|
||||
conn_pid,
|
||||
conn_props,
|
||||
ack_props,
|
||||
username,
|
||||
session,
|
||||
clean_start,
|
||||
topic_aliases,
|
||||
packet_size,
|
||||
will_topic,
|
||||
will_msg,
|
||||
keepalive,
|
||||
mountpoint,
|
||||
is_super,
|
||||
is_bridge,
|
||||
enable_ban,
|
||||
enable_acl,
|
||||
recv_stats,
|
||||
send_stats,
|
||||
connected,
|
||||
connected_at
|
||||
}).
|
||||
zone,
|
||||
sendfun,
|
||||
peername,
|
||||
peercert,
|
||||
proto_ver,
|
||||
proto_name,
|
||||
ackprops,
|
||||
client_id,
|
||||
is_assigned,
|
||||
conn_pid,
|
||||
conn_props,
|
||||
ack_props,
|
||||
username,
|
||||
session,
|
||||
clean_start,
|
||||
topic_aliases,
|
||||
packet_size,
|
||||
will_topic,
|
||||
will_msg,
|
||||
keepalive,
|
||||
mountpoint,
|
||||
is_super,
|
||||
is_bridge,
|
||||
enable_ban,
|
||||
enable_acl,
|
||||
recv_stats,
|
||||
send_stats,
|
||||
connected,
|
||||
connected_at
|
||||
}).
|
||||
|
||||
-type(state() :: #pstate{}).
|
||||
-export_type([state/0]).
|
||||
|
@ -315,9 +315,12 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt
|
|||
case check_publish(Packet, PState) of
|
||||
{ok, PState1} ->
|
||||
do_publish(Packet, PState1);
|
||||
{error, ?RC_TOPIC_ALIAS_INVALID} ->
|
||||
?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID], PState),
|
||||
{error, ?RC_TOPIC_ALIAS_INVALID, PState};
|
||||
{error, ReasonCode} ->
|
||||
?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, ReasonCode], PState),
|
||||
{ok, PState}
|
||||
{error, ReasonCode, PState}
|
||||
end;
|
||||
|
||||
process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
|
||||
|
@ -644,9 +647,11 @@ check_publish(Packet, PState) ->
|
|||
run_check_steps([fun check_pub_caps/2,
|
||||
fun check_pub_acl/2], Packet, PState).
|
||||
|
||||
check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = R}},
|
||||
check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain},
|
||||
variable = #mqtt_packet_publish{ properties = Properties}},
|
||||
#pstate{zone = Zone}) ->
|
||||
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => R}).
|
||||
#{'Topic-Alias' := TopicAlias} = Properties,
|
||||
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic_alias => TopicAlias}).
|
||||
|
||||
check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
|
||||
when IsSuper orelse (not EnableAcl) ->
|
||||
|
|
|
@ -38,7 +38,7 @@ t_get_set_caps(_) ->
|
|||
mqtt_wildcard_subscription => true
|
||||
},
|
||||
Caps2 = Caps#{max_packet_size => 1048576},
|
||||
case emqx_mqtt_caps:get_caps(zone) of
|
||||
case emqx_mqtt_caps:get_caps(zone) of
|
||||
Caps -> ok;
|
||||
Caps2 -> ok
|
||||
end,
|
||||
|
@ -64,20 +64,28 @@ t_check_pub(_) ->
|
|||
{ok, _} = emqx_zone:start_link(),
|
||||
PubCaps = #{
|
||||
max_qos_allowed => ?QOS_1,
|
||||
mqtt_retain_available => false
|
||||
mqtt_retain_available => false,
|
||||
max_topic_alias => 4
|
||||
},
|
||||
emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
|
||||
timer:sleep(100),
|
||||
ct:log("~p", [emqx_mqtt_caps:get_caps(zone, publish)]),
|
||||
BadPubProps1 = #{
|
||||
qos => ?QOS_2,
|
||||
retain => false
|
||||
},
|
||||
},
|
||||
{error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps1),
|
||||
BadPubProps2 = #{
|
||||
qos => ?QOS_1,
|
||||
retain => true
|
||||
},
|
||||
},
|
||||
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps2),
|
||||
BadPubProps3 = #{
|
||||
qos => ?QOS_1,
|
||||
retain => false,
|
||||
topic_alias => 5
|
||||
},
|
||||
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(zone, BadPubProps3),
|
||||
PubProps = #{
|
||||
qos => ?QOS_1,
|
||||
retain => false
|
||||
|
@ -96,19 +104,19 @@ t_check_sub(_) ->
|
|||
mqtt_wildcard_subscription => true
|
||||
},
|
||||
|
||||
ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]),
|
||||
ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]),
|
||||
ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]),
|
||||
ok = do_check_sub(Caps#{max_topic_levels => 1},
|
||||
[{<<"client/stat">>, Opts}],
|
||||
ok = do_check_sub(Caps#{max_topic_levels => 1},
|
||||
[{<<"client/stat">>, Opts}],
|
||||
[{<<"client/stat">>, Opts#{rc => ?RC_TOPIC_FILTER_INVALID}}]),
|
||||
ok = do_check_sub(Caps#{mqtt_shared_subscription => false},
|
||||
[{<<"client/stat">>, Opts}],
|
||||
ok = do_check_sub(Caps#{mqtt_shared_subscription => false},
|
||||
[{<<"client/stat">>, Opts}],
|
||||
[{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]),
|
||||
|
||||
ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false},
|
||||
[{<<"vlient/+/dsofi">>, Opts}],
|
||||
[{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]),
|
||||
emqx_zone:stop().
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue