Merge remote-tracking branch 'origin/develop'
This commit is contained in:
commit
854a48d77c
|
@ -371,8 +371,8 @@ log.file = emqx.log
|
||||||
## Limits the total number of characters printed for each log event.
|
## Limits the total number of characters printed for each log event.
|
||||||
##
|
##
|
||||||
## Value: Integer
|
## Value: Integer
|
||||||
## Default: 8192
|
## Default: No Limit
|
||||||
log.chars_limit = 8192
|
#log.chars_limit = 8192
|
||||||
|
|
||||||
## Maximum size of each log file.
|
## Maximum size of each log file.
|
||||||
##
|
##
|
||||||
|
|
|
@ -421,7 +421,7 @@ end}.
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "log.chars_limit", "kernel.logger", [
|
{mapping, "log.chars_limit", "kernel.logger", [
|
||||||
{default, 8192},
|
{default, -1},
|
||||||
{datatype, integer}
|
{datatype, integer}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
|
|
@ -419,8 +419,7 @@ process(?CONNECT_PACKET(
|
||||||
{ReasonCode, PState1}
|
{ReasonCode, PState1}
|
||||||
end);
|
end);
|
||||||
|
|
||||||
process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
|
process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState = #pstate{zone = Zone}) ->
|
||||||
PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
|
|
||||||
case check_publish(Packet, PState) of
|
case check_publish(Packet, PState) of
|
||||||
ok ->
|
ok ->
|
||||||
do_publish(Packet, PState);
|
do_publish(Packet, PState);
|
||||||
|
@ -428,12 +427,10 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
|
||||||
?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s",
|
?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s",
|
||||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||||
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||||
ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState},
|
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState)
|
||||||
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm)
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
|
process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState = #pstate{zone = Zone}) ->
|
||||||
PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
|
|
||||||
case check_publish(Packet, PState) of
|
case check_publish(Packet, PState) of
|
||||||
ok ->
|
ok ->
|
||||||
do_publish(Packet, PState);
|
do_publish(Packet, PState);
|
||||||
|
@ -442,14 +439,12 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
|
||||||
case deliver({puback, PacketId, ReasonCode}, PState) of
|
case deliver({puback, PacketId, ReasonCode}, PState) of
|
||||||
{ok, PState1} ->
|
{ok, PState1} ->
|
||||||
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||||
ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
|
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState1);
|
||||||
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm);
|
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
|
||||||
process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
|
process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState = #pstate{zone = Zone}) ->
|
||||||
PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
|
|
||||||
case check_publish(Packet, PState) of
|
case check_publish(Packet, PState) of
|
||||||
ok ->
|
ok ->
|
||||||
do_publish(Packet, PState);
|
do_publish(Packet, PState);
|
||||||
|
@ -459,8 +454,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
|
||||||
case deliver({pubrec, PacketId, ReasonCode}, PState) of
|
case deliver({pubrec, PacketId, ReasonCode}, PState) of
|
||||||
{ok, PState1} ->
|
{ok, PState1} ->
|
||||||
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||||
ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
|
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState1);
|
||||||
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm);
|
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
@ -488,7 +482,7 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid})
|
||||||
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
|
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
|
||||||
|
|
||||||
process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
||||||
PState = #pstate{zone = Zone, proto_ver = ProtoVer, session = SPid, credentials = Credentials}) ->
|
PState = #pstate{zone = Zone, session = SPid, credentials = Credentials}) ->
|
||||||
case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of
|
case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of
|
||||||
{ok, TopicFilters} ->
|
{ok, TopicFilters} ->
|
||||||
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
|
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
|
||||||
|
@ -507,8 +501,7 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
||||||
case deliver({suback, PacketId, ReasonCodes}, PState) of
|
case deliver({suback, PacketId, ReasonCodes}, PState) of
|
||||||
{ok, PState1} ->
|
{ok, PState1} ->
|
||||||
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
|
||||||
ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
|
do_acl_deny_action(AclDenyAction, Packet, ReasonCodes, PState1);
|
||||||
do_acl_deny_action(AclDenyAction, Packet, ReasonCodes, ErrorTerm);
|
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end
|
end
|
||||||
|
@ -778,7 +771,8 @@ check_connect(Packet, PState) ->
|
||||||
fun check_client_id/2,
|
fun check_client_id/2,
|
||||||
fun check_flapping/2,
|
fun check_flapping/2,
|
||||||
fun check_banned/2,
|
fun check_banned/2,
|
||||||
fun check_will_topic/2], Packet, PState).
|
fun check_will_topic/2,
|
||||||
|
fun check_will_retain/2], Packet, PState).
|
||||||
|
|
||||||
check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
|
check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
|
||||||
proto_name = Name}, _PState) ->
|
proto_name = Name}, _PState) ->
|
||||||
|
@ -829,6 +823,16 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState)
|
||||||
{error, ?RC_TOPIC_NAME_INVALID}
|
{error, ?RC_TOPIC_NAME_INVALID}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
check_will_retain(#mqtt_packet_connect{will_retain = false, proto_ver = ?MQTT_PROTO_V5}, _PState) ->
|
||||||
|
ok;
|
||||||
|
check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) ->
|
||||||
|
case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
|
||||||
|
true -> {error, ?RC_RETAIN_NOT_SUPPORTED};
|
||||||
|
false -> ok
|
||||||
|
end;
|
||||||
|
check_will_retain(_Packet, _PState) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic},
|
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic},
|
||||||
#pstate{zone = Zone, credentials = Credentials}) ->
|
#pstate{zone = Zone, credentials = Credentials}) ->
|
||||||
EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
|
EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
|
||||||
|
@ -966,26 +970,33 @@ do_flapping_detect(Action, #pstate{zone = Zone,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_acl_deny_action(disconnect, ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
|
do_acl_deny_action(disconnect, ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
|
||||||
?RC_NOT_AUTHORIZED, ErrorTerm) ->
|
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer}) ->
|
||||||
ErrorTerm;
|
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||||
|
|
||||||
do_acl_deny_action(disconnect, ?PUBLISH_PACKET(QoS, _Topic, _PacketId, _Payload),
|
do_acl_deny_action(disconnect, ?PUBLISH_PACKET(QoS, _Topic, _PacketId, _Payload),
|
||||||
?RC_NOT_AUTHORIZED, ErrorTerm = {_Error, _CodeName, PState})
|
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer})
|
||||||
when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
|
when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
|
||||||
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
||||||
ErrorTerm;
|
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||||
|
|
||||||
do_acl_deny_action(disconnect, ?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters),
|
do_acl_deny_action(Action, ?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters), ReasonCodes, PState)
|
||||||
ReasonCodes, ErrorTerm = {_Error, _CodeName, PState}) ->
|
when is_list(ReasonCodes) ->
|
||||||
case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of
|
traverse_reason_codes(ReasonCodes, Action, PState);
|
||||||
true ->
|
do_acl_deny_action(_OtherAction, _PubSubPacket, ?RC_NOT_AUTHORIZED, PState) ->
|
||||||
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
{ok, PState};
|
||||||
ErrorTerm;
|
do_acl_deny_action(_OtherAction, _PubSubPacket, ReasonCode, PState = #pstate{proto_ver = ProtoVer}) ->
|
||||||
false ->
|
{error, emqx_reason_codes:name(ReasonCode, ProtoVer), PState}.
|
||||||
{ok, PState}
|
|
||||||
end;
|
traverse_reason_codes([], _Action, PState) ->
|
||||||
do_acl_deny_action(_OtherAction, _PubSupPacket, _ReasonCode, {_Error, _CodeName, PState}) ->
|
{ok, PState};
|
||||||
{ok, PState}.
|
traverse_reason_codes([?RC_SUCCESS | LeftReasonCodes], Action, PState) ->
|
||||||
|
traverse_reason_codes(LeftReasonCodes, Action, PState);
|
||||||
|
traverse_reason_codes([?RC_NOT_AUTHORIZED | _LeftReasonCodes], disconnect, PState = #pstate{proto_ver = ProtoVer}) ->
|
||||||
|
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||||
|
traverse_reason_codes([?RC_NOT_AUTHORIZED | LeftReasonCodes], Action, PState) ->
|
||||||
|
traverse_reason_codes(LeftReasonCodes, Action, PState);
|
||||||
|
traverse_reason_codes([OtherCode | _LeftReasonCodes], _Action, PState = #pstate{proto_ver = ProtoVer}) ->
|
||||||
|
{error, emqx_reason_codes:name(OtherCode, ProtoVer), PState}.
|
||||||
|
|
||||||
%% Reason code compat
|
%% Reason code compat
|
||||||
reason_codes_compat(_PktType, ReasonCodes, ?MQTT_PROTO_V5) ->
|
reason_codes_compat(_PktType, ReasonCodes, ?MQTT_PROTO_V5) ->
|
||||||
|
|
Loading…
Reference in New Issue